File indexing completed on 2026-04-10 08:39:14
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 try:
0011
0012 import dask_kubernetes
0013
0014 except Exception:
0015 pass
0016
0017
0018 from pilot.util.container import execute
0019 from pilot.util.filehandling import establish_logging, write_file
0020
0021 import os
0022 import re
0023 from time import sleep
0024
0025 import logging
0026 logger = logging.getLogger(__name__)
0027
0028
class Dask(object):
    """
    Dask interface class.

    Wraps the ``helm`` and ``kubectl`` command-line tools (run through ``execute``)
    to install, monitor and uninstall a Dask helm release. It also holds an optional
    ``dask_kubernetes`` cluster handle that is used for scaling.

    Class-level defaults (overridable per instance through ``__init__`` kwargs):
        servicename: helm release name. The scheduler service is looked up as
            '<servicename>-scheduler'.
        status: last state recorded by the methods below ('validated', 'failed',
            'pending', 'running', 'uninstalled'), or None.
        loadbalancerip: not read or written by any method of this class.
        servicetype: value written to 'scheduler.serviceType' in the override file.
        jupyter: False means the generated override file disables jupyter.
        overrides: helm values file name, resolved relative to ``_workdir``;
            None means "install without an override file".
        cluster: object created by ``connect_cluster()``, or None.
    """

    servicename = 'single-dask'
    status = None
    loadbalancerip = None
    servicetype = "LoadBalancer"
    jupyter = False
    overrides = "override_values.yaml"
    # NOTE: evaluated once, when the class body runs (module import time), not per instance
    _workdir = os.getcwd()
    cluster = None

    # IPv4 dotted quad, octets 0-255 without leading zeros (used with fullmatch)
    _IP_REGEX = re.compile(
        r"^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$"
    )

    def __init__(self, **kwargs):
        """
        Init function.

        Only truthy keyword values replace the class-level defaults. Unknown
        keywords are ignored.

        :param kwargs: optional 'servicename', 'servicetype', 'jupyter', 'overrides'.
        """

        for attribute in ('servicename', 'servicetype', 'jupyter', 'overrides'):
            value = kwargs.get(attribute, None)
            if value:
                setattr(self, attribute, value)

    def uninstall(self, block=True):
        """
        Request 'helm uninstall' of the release.

        :param block: blocking mode is not implemented yet; a warning is logged (Boolean).
        """

        logger.info('uninstalling service %s', self.servicename)
        if block:
            logger.warning('blocking mode not yet implemented')

        cmd = 'helm uninstall %s' % self.servicename
        exit_code, stdout, stderr = execute(cmd, mute=True)
        if exit_code:
            logger.warning('failed to uninstall service %s: %s', self.servicename, stderr or stdout)
            return

        self.status = 'uninstalled'
        logger.info('uninstall of service %s has been requested', self.servicename)

    def install(self, block=True, timeout=None):
        """
        Validate the environment and 'helm install' the dask/dask chart unless the scheduler already runs.

        Sets self.status to:
        - 'failed' if validation or the helm command fails, or if the wait times out;
        - 'validated' when the install was only requested (block=False);
        - 'pending' or 'running' while waiting / once the scheduler has an external IP.
        Assumes the 'dask' helm repository is already configured; this is not checked here.

        :param block: wait until the scheduler service has a valid external IP (Boolean).
        :param timeout: maximum wait in seconds when blocking; None waits forever (number or None).
        """

        if not self._validate():
            logger.warning('validation failed')
            self.status = 'failed'
            return
        logger.debug('dask has been validated')
        self.status = 'validated'

        name = '%s-scheduler' % self.servicename
        if self.is_running(name=name):
            logger.info('service %s is already running - nothing to install', name)
            self.status = 'running'
            return
        logger.info('service %s is not yet running - proceed with installation', name)

        # the override file is generated under _workdir, so pass that exact path to helm
        override_option = ""
        if self.overrides:
            override_option = "-f %s" % os.path.join(self._workdir, self.overrides)
        cmd = 'helm install %s %s dask/dask' % (override_option, self.servicename)
        exit_code, stdout, stderr = execute(cmd, mute=True)
        if exit_code:
            logger.warning('failed to install service %s: %s', self.servicename, stderr or stdout)
            self.status = 'failed'
            return
        logger.info('installation of service %s is in progress', self.servicename)

        if block:
            self._wait_until_running(name, timeout=timeout)

    def _wait_until_running(self, name, timeout=None, interval=2):
        """
        Poll is_running() until the named service reports a valid external IP.

        :param name: service name (string).
        :param timeout: give up after this many seconds; None means never (number or None).
        :param interval: seconds to sleep between polls (number).
        :return: True once running, False on timeout (Boolean).
        """

        from time import monotonic

        deadline = None if timeout is None else monotonic() + timeout
        while not self.is_running(name=name):
            self.status = 'pending'
            if deadline is not None and monotonic() >= deadline:
                logger.warning('service %s is not running after %s s - giving up', name, timeout)
                self.status = 'failed'
                return False
            sleep(interval)

        logger.info('service %s is running', name)
        self.status = 'running'
        return True

    def is_running(self, name=None):
        """
        Check whether a kubernetes service is listed with a valid IPv4 EXTERNAL-IP.

        Placeholders such as '<pending>' or '<none>' count as not running.
        NOTE(review): a LoadBalancer that publishes a hostname instead of an IP is
        also reported as not running.

        :param name: service name; defaults to '<servicename>-scheduler' (string).
        :return: Boolean.
        """

        if name is None:
            name = '%s-scheduler' % self.servicename

        services = self._get_dictionary(cmd='kubectl get services')
        service = services.get(name)
        if not service:
            return False
        return self._is_valid_ip(service.get('EXTERNAL-IP', ''))

    def _is_valid_ip(self, ip):
        """
        Verify that the given IP number is a valid IPv4 dotted quad.

        :param ip: IP number (string). Non-strings are rejected.
        :return: Boolean.
        """

        if not isinstance(ip, str):
            return False
        # fullmatch: unlike search() with '$', a trailing newline is not accepted
        return self._IP_REGEX.fullmatch(ip) is not None

    def _get_dictionary(self, cmd=None):
        """
        Run a command and parse its tabular output with _convert_to_dict().

        On command failure the error is logged, self.status is set to 'failed'
        and an empty dictionary is returned.

        :param cmd: command to execute (string).
        :return: dictionary (empty if cmd is empty or fails).
        """

        if not cmd:
            return {}

        exit_code, stdout, stderr = execute(cmd, mute=True)
        if exit_code:
            logger.warning('failed to execute \'%s\': %s', cmd, stderr or stdout)
            self.status = 'failed'
            return {}

        return self._convert_to_dict(stdout)

    def _validate(self):
        """
        Make sure that pre-conditions are met before any installation can be attempted.

        Checks that the 'helm' and 'kubectl' commands can be located with 'which',
        then generates the helm override file from self.jupyter / self.servicetype.
        Library availability (dask, dask_kubernetes) is not checked here.
        NOTE: also (re)configures logging via establish_logging(debug=True).

        :return: True if all required commands were found (Boolean).
        """

        establish_logging(debug=True)

        for cmd in ('helm', 'kubectl'):
            exit_code, stdout, stderr = execute('which %s' % cmd, mute=True)
            output = stdout or ''
            # GNU which prints nothing on stdout and exits non-zero when a command is missing;
            # some shells print '<cmd> not found' instead, so check both
            if exit_code or not output.strip() or 'not found' in output:
                logger.warning('required command \'%s\' not found: %s', cmd, (stderr or output).strip())
                return False
            logger.debug('%s verified', cmd)

        self._generate_override_script(jupyter=self.jupyter, servicetype=self.servicetype)

        return True

    def _generate_override_script(self, jupyter=False, servicetype='LoadBalancer'):
        """
        Generate a helm values yaml file under _workdir, unless it already exists.

        If there is nothing to write, or writing fails, self.overrides is reset to
        None so install() does not reference a missing file.

        :param jupyter: False if jupyter notebook server should be disabled (Boolean).
        :param servicetype: name of scheduler service type; empty skips it (string).
        """

        if not self.overrides:
            logger.debug('no override file configured - nothing to generate')
            return

        filename = os.path.join(self._workdir, self.overrides)
        if os.path.exists(filename):
            logger.info('file \'%s\' already exists - will not override', filename)
            return

        script = ""
        if not jupyter:
            script += 'jupyter:\n enabled: false\n\n'
        if servicetype:
            script += 'scheduler:\n serviceType: \"%s\"\n' % servicetype

        if not script:
            self.overrides = None
            return

        if write_file(filename, script):
            logger.debug('generated script: %s', filename)
        else:
            logger.warning('failed to write %s - proceeding without overrides', filename)
            self.overrides = None

    def _convert_to_dict(self, output):
        """
        Convert whitespace-separated tabular command output into a dictionary.

        The first non-blank line is the header; its first column (e.g. NAME) is
        dropped. Each following non-blank line becomes an entry keyed by its first
        field, mapping the remaining header names to the remaining fields. Fields
        are split on whitespace, so a value containing spaces is split too.

        :param output: command output (string).
        :return: {first field: {header name: value}} (dictionary).
        """

        dictionary = {}
        if not output:
            return dictionary

        header = None
        for line in output.split('\n'):
            fields = line.split()
            if not fields:
                # blank lines (e.g. after the final newline) carry no data
                continue
            if header is None:
                header = fields[1:]
                continue
            key, values = fields[0], fields[1:]
            if len(values) > len(header):
                # surplus columns have no header name; keep the named ones and report the line
                logger.warning("unexpected format of utility output: %s", line)
            dictionary[key] = dict(zip(header, values))

        return dictionary

    def connect_cluster(self, release_name=None, manager=None):
        """
        Create a cluster handle with manager(release_name=...) and store it in self.cluster.

        :param release_name: helm release name; defaults to self.servicename (string).
        :param manager: cluster class/factory; None means dask_kubernetes.HelmCluster,
            imported lazily so this module still loads without dask_kubernetes.
        """

        if not release_name:
            release_name = self.servicename

        if manager is None:
            try:
                from dask_kubernetes import HelmCluster
            except ImportError as exc:
                logger.warning('cannot connect to cluster - dask_kubernetes is not available: %s', exc)
                return
            manager = HelmCluster

        self.cluster = manager(release_name=release_name)
        logger.info('connected to %s', getattr(manager, '__name__', repr(manager)))

    def scale(self, number, max_number=2):
        """
        Scale the connected cluster to the given number of workers, connecting first if needed.

        :param number: requested number of workers (int).
        :param max_number: largest accepted value (int).
        """

        if number > max_number:
            logger.warning('too large scale: %d (please use <= %d for now)', number, max_number)
            return
        if number < 0:
            logger.warning('invalid scale: %d (must not be negative)', number)
            return
        if not self.cluster:
            self.connect_cluster()
        if not self.cluster:
            logger.warning('cluster not connected - cannot proceed')
            self.status = 'failed'
            return

        logger.info('setting scale to: %d', number)
        self.cluster.scale(number)

    def shutdown(self):
        """
        Shutdown logging.

        Flushes and closes all handlers with logging.shutdown(), then detaches the
        handlers from the root logger. The assumption is that establish_logging()
        attaches its handlers there (TODO confirm).
        """

        logging.shutdown()
        root = logging.getLogger()
        for handler in list(root.handlers):
            root.removeHandler(handler)