Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:14

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2021
0009 
0010 try:
0011     # import dask
0012     import dask_kubernetes
0013 #except ModuleNotFoundError:  # Python 3
0014 except Exception:
0015     pass
0016 
0017 #from pilot.common.exception import NotDefined, NotSameLength, UnknownException
0018 from pilot.util.container import execute
0019 from pilot.util.filehandling import establish_logging, write_file
0020 
0021 import os
0022 import re
0023 from time import sleep
0024 
0025 import logging
0026 logger = logging.getLogger(__name__)
0027 
0028 
class Dask(object):
    """
    Dask interface class.

    Drives the helm and kubectl command line tools to install, monitor, scale and
    uninstall a dask helm release (chart 'dask/dask').

    The `status` attribute records the outcome of the last operation; the values
    set by this class are 'validated', 'pending', 'running', 'uninstalled' and
    'failed'.
    """

    servicename = 'single-dask'  # helm release name
    status = None
    loadbalancerip = None
    servicetype = "LoadBalancer"  # scheduler serviceType written to the overrides file
    jupyter = False  # False -> the overrides file disables the chart's jupyter server
    overrides = "override_values.yaml"  # helm values file name, relative to _workdir
    _workdir = os.getcwd()  # NOTE: evaluated once, when the class is defined
    cluster = None  # cluster manager object created by connect_cluster()

    # dotted-quad IPv4 address, every octet in 0-255
    _IPV4_PATTERN = re.compile(
        r"^((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}"
        r"(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])$"
    )

    def __init__(self, **kwargs):
        """
        Init function.

        Recognised keyword arguments: servicename, servicetype, jupyter, overrides.
        Each one only replaces the class default when given a truthy value.

        :param kwargs: optional settings (see above).
        """

        for attribute in ('servicename', 'servicetype', 'jupyter', 'overrides'):
            value = kwargs.get(attribute)
            if value:
                setattr(self, attribute, value)

    def uninstall(self, block=True):
        """
        Request uninstallation of the helm release.

        :param block: blocking mode is not implemented yet; the call returns right
                      after 'helm uninstall' has been executed (Boolean).
        """

        logger.info('uninstalling service %s', self.servicename)
        if block:
            logger.warning('blocking mode not yet implemented')

        cmd = 'helm uninstall %s' % self.servicename
        exit_code, _, stderr = execute(cmd, mute=True)
        if exit_code:
            # report the failure instead of silently ignoring it; status is unchanged
            logger.warning('failed to uninstall service %s: %s', self.servicename, stderr)
            return

        self.status = 'uninstalled'
        logger.info('uninstall of service %s has been requested', self.servicename)

    def install(self, block=True, timeout=None):
        """
        Install the dask helm release unless its scheduler service is already running.

        :param block: wait until the scheduler service reports a valid external IP (Boolean).
        :param timeout: approximate maximum number of seconds to wait in blocking mode;
                        None waits forever (int/float or None).
        """

        # can dask be installed?
        if not self._validate():
            logger.warning('validation failed')
            self.status = 'failed'
            return

        logger.debug('dask has been validated')
        self.status = 'validated'

        # is the single-dask cluster already running?
        name = '%s-scheduler' % self.servicename
        if self.is_running(name=name):
            logger.info('service %s is already running - nothing to install', name)
            self.status = 'running'
            return

        logger.info('service %s is not yet running - proceed with installation', name)

        parts = ['helm', 'install']
        if self.overrides:
            # point helm at the same location _generate_override_script() writes to,
            # independent of the current working directory
            parts += ['-f', os.path.join(self._workdir, self.overrides)]
        parts += [self.servicename, 'dask/dask']

        exit_code, _, stderr = execute(' '.join(parts), mute=True)
        if exit_code:
            logger.warning('installation of service %s failed: %s', self.servicename, stderr)
            self.status = 'failed'
            return

        logger.info('installation of service %s is in progress', self.servicename)
        if block:
            self._wait_until_running(name, timeout=timeout)
        # note: in non-blocking mode, status is not getting updated

    def _wait_until_running(self, name, timeout=None, interval=2):
        """
        Poll until the given service is running or the timeout expires.

        Status is 'pending' while waiting and ends up 'running' or 'failed'.
        Only the time spent sleeping counts towards the timeout.

        :param name: service name (string).
        :param timeout: maximum wait in seconds, None to wait forever.
        :param interval: seconds to sleep between polls.
        :return: True if the service is running (Boolean).
        """

        waited = 0
        while not self.is_running(name=name):
            self.status = 'pending'
            if timeout is not None and waited >= timeout:
                logger.warning('service %s is not running after %s s - giving up', name, waited)
                self.status = 'failed'
                return False
            sleep(interval)
            waited += interval

        logger.info('service %s is running', name)
        self.status = 'running'
        return True

    def is_running(self, name='single-dask-scheduler'):
        """
        Check whether a service has been assigned a valid external IP.

        :param name: service name as listed by 'kubectl get services' (string).
        :return: True if the service is listed and its EXTERNAL-IP column holds a
                 valid IPv4 address (Boolean).
        """

        service = self._get_dictionary(cmd='kubectl get services').get(name)
        if not service:
            return False
        # a missing column or a non-IP placeholder both count as "not running"
        return self._is_valid_ip(service.get('EXTERNAL-IP', ''))

    def _is_valid_ip(self, ip):
        """
        Verify that the given IP number is a valid dotted-quad IPv4 address.

        :param ip: IP number (string; anything else is treated as invalid).
        :return: Boolean.
        """

        if not isinstance(ip, str):
            return False
        return self._IPV4_PATTERN.search(ip) is not None

    def _get_dictionary(self, cmd=None):
        """
        Execute a command with tabular output and parse it into a dictionary.

        Sets status to 'failed' if the command fails.

        :param cmd: command to execute (string).
        :return: rows keyed by their first column (see _convert_to_dict); empty if
                 no command is given or the command fails (dict).
        """

        if not cmd:
            return {}

        exit_code, stdout, stderr = execute(cmd, mute=True)
        if exit_code:
            logger.warning('failed to execute \'%s\': %s', cmd, stderr or stdout)
            self.status = 'failed'
            return {}

        return self._convert_to_dict(stdout)

    def _validate(self):
        """
        Make sure that pre-conditions are met before any installation can be attempted.

        Pre-conditions:
        1. command: helm
        2. command: kubectl
        3. the helm overrides yaml file exists or can be created

        Note: the dask and dask_kubernetes libraries are not verified here.

        :return: True if all pre-conditions are met (Boolean).
        """

        establish_logging(debug=True)

        # verify relevant commands
        for cmd in ('helm', 'kubectl'):
            exit_code, stdout, stderr = execute('which %s' % cmd, mute=True)
            # 'which' may report a missing command only through its exit code,
            # or print a '... not found' message instead
            if exit_code or 'not found' in stdout:
                logger.warning('command %s not found: %s', cmd, stderr or stdout)
                return False
            logger.debug('%s verified', cmd)

        # create yaml file(s) honouring the settings given to the constructor
        return self._generate_override_script(jupyter=self.jupyter, servicetype=self.servicetype)

    def _generate_override_script(self, jupyter=False, servicetype='LoadBalancer'):
        """
        Generate a helm values yaml script, unless it already exists.

        If there is nothing to override, self.overrides is reset to None so that
        no values file is passed to helm.

        :param jupyter: False if jupyter notebook server should be disabled (Boolean).
        :param servicetype: name of the scheduler service type; empty/None omits the
                            scheduler section (string).
        :return: False if the file could not be written, True otherwise (Boolean).
        """

        if not self.overrides:
            logger.debug('no overrides file configured - nothing to generate')
            return True

        filename = os.path.join(self._workdir, self.overrides)
        if os.path.exists(filename):
            logger.info('file \'%s\' already exists - will not override', filename)
            return True

        script = ""
        if not jupyter:
            script += 'jupyter:\n    enabled: false\n\n'
        if servicetype:
            script += 'scheduler:\n    serviceType: \"%s\"\n' % servicetype

        if not script:
            self.overrides = None
            return True

        if not write_file(filename, script):
            logger.warning('failed to write overrides file: %s', filename)
            return False

        logger.debug('generated script: %s', filename)
        return True

    def _convert_to_dict(self, output):
        """
        Convert tabular command output (e.g. 'kubectl get services') into a dictionary.

        The first non-empty line is the header. Every following row is stored under
        its first column, mapping the remaining header names to the remaining columns:
          {'single-dask-scheduler': {'TYPE': 'LoadBalancer', 'EXTERNAL-IP': ..., ...}}

        :param output: command output (string).
        :return: dictionary of rows (dict).
        """

        dictionary = {}
        header = None
        for line in (output or '').splitlines():
            # split on any run of spaces/tabs, dropping empty entries
            fields = line.split()
            if not fields:
                continue  # blank lines, e.g. after the trailing newline
            if header is None:  # "NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE"
                header = fields[1:]
                continue
            values = fields[1:]
            if len(values) > len(header):
                logger.warning("unexpected format of utility output: %s", line)
            dictionary[fields[0]] = dict(zip(header, values))

        return dictionary

    def connect_cluster(self, release_name=None, manager=None):
        """
        Connect to the dask cluster of the given helm release.

        :param release_name: helm release name; defaults to self.servicename (string).
        :param manager: cluster manager class, called as manager(release_name=...);
                        defaults to dask_kubernetes.HelmCluster.
        """

        if not release_name:
            release_name = self.servicename
        if manager is None:
            # resolved at call time: as a default argument it would be evaluated when
            # the class is defined and break the module if dask_kubernetes is missing
            try:
                from dask_kubernetes import HelmCluster
            except ImportError as exc:
                logger.warning('dask_kubernetes is not available: %s', exc)
                return
            manager = HelmCluster

        self.cluster = manager(release_name=release_name)
        logger.info('connected to %s', manager.__name__)

    def scale(self, number, max_scale=2):
        """
        Scale the connected dask cluster via cluster.scale(number).

        Connects to the cluster first if necessary; sets status to 'failed' if no
        cluster could be connected.

        :param number: requested scale (int).
        :param max_scale: largest accepted value of number (int).
        """

        if number > max_scale:
            logger.warning('too large scale: %d (please use <= %d for now)', number, max_scale)
            return
        if not self.cluster:
            self.connect_cluster()
        if not self.cluster:
            logger.warning('cluster not connected - cannot proceed')
            self.status = 'failed'
            return

        logger.info('setting scale to: %d', number)
        self.cluster.scale(number)

    def shutdown(self):
        """
        Shutdown logging.

        Flushes and closes all logging handlers, then detaches the handlers of
        the root logger.
        """

        logging.shutdown()
        # presumably establish_logging() attaches its handlers to the root logger - TODO confirm
        root = logging.getLogger()
        for handler in list(root.handlers):
            root.removeHandler(handler)