Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:14

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2021
0009 
0010 #try:
0011 #    # import dask
0012 #    import dask_kubernetes
0013 ##except ModuleNotFoundError:  # Python 3
0014 #except Exception:
0015 #    pass
0016 
0017 #from pilot.common.exception import NotDefined, NotSameLength, UnknownException
0018 #from pilot.util.container import execute
0019 #from pilot.util.filehandling import establish_logging, write_file
0020 
0021 #import os
0022 #import re
0023 #from time import sleep
0024 
0025 import logging
0026 logger = logging.getLogger(__name__)  # module-level logger, named after this module
0027 
0028 
class DaskSubmitter(object):
    """
    Interface class for submitting to Dask.

    The individual installation steps are currently placeholders that always report success;
    install() chains them together and stops at the first step that fails.
    """

    def __init__(self, **kwargs):
        """
        Set up the submitter.

        No state is initialised at the moment; any keyword arguments are accepted but ignored.

        :param kwargs: optional keyword arguments (currently unused).
        """

    def install(self, job_definition):
        """
        Install the Pilot X pod and the pods for the dask scheduler and workers.

        Note: Pilot X is currently a simplified PanDA Pilot, but is likely to be absorbed into the main
        PanDA Pilot code base as a special workflow for Dask on Kubernetes resources.

        Intended workflow: the Pilot X pod is installed on the dask cluster first. Once it is running,
        Pilot X waits for a job definition to appear on the shared file system and then proceeds with staging
        any input files. Meanwhile this function (which also knows about the job definition) is meant to
        install the dask scheduler and all required workers asynchronously.

        The steps are executed in order and the first unsuccessful one aborts the installation; its status is
        returned as-is.

        :param job_definition: job definition dictionary.
        :return: True for successfully installed pods (Boolean).
        """

        # ordered installation steps; each one is wrapped in a lambda so that nothing is looked up or executed
        # before its turn comes
        steps = (
            # install Pilot X pod
            lambda: self.install_pilotx_pod(),
            # copy bundle (job definition etc):
            # copy job definition to shared directory
            # (copy to Pilot X pod which has the shared directory mounted)
            lambda: self.copy_job_definition(job_definition),
            # install dask scheduler
            lambda: self.install_dask_scheduler(),
            # install dask worker(s)
            lambda: self.install_dask_workers(job_definition),
        )

        outcome = None
        for step in steps:
            outcome = step()
            if not outcome:
                # abort on the first failing step and hand its status back to the caller
                break

        return outcome

    def uninstall(self):
        """
        Uninstall all pods.

        Placeholder: nothing is uninstalled yet and nothing is returned.
        """

        # uninstall all pods
        # ..

    def install_pilotx_pod(self):
        """
        Install the Pilot X pod.

        Placeholder: no installation is performed yet.

        :return: True (Boolean), always.
        """

        return True

    def copy_job_definition(self, job_definition):
        """
        Copy the job definition.

        Placeholder: nothing is copied yet and the job definition is not inspected.

        :param job_definition: job definition dictionary (currently unused).
        :return: True (Boolean), always.
        """

        return True

    def install_dask_scheduler(self):
        """
        Install the dask scheduler.

        Placeholder: no installation is performed yet.

        :return: True (Boolean), always.
        """

        return True

    def install_dask_workers(self, job_definition):
        """
        Install the dask worker(s).

        Placeholder: no installation is performed yet and the job definition is not inspected.

        :param job_definition: job definition dictionary (currently unused).
        :return: True (Boolean), always.
        """

        # get number of workers from job definition
        # issue all install commands at once, then wait for pod status to be 'running'

        return True