File indexing completed on 2026-04-10 08:39:14
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025 import logging
0026 logger = logging.getLogger(__name__)
0027
0028
0029 class DaskSubmitter(object):
0030 """
0031 Dask submitter interface class.
0032 """
0033
0034 def __init__(self, **kwargs):
0035 """
0036 Init function.
0037
0038 :param kwargs:
0039 """
0040
0041 pass
0042
0043 def install(self, job_definition):
0044 """
0045 Install the pods for the dask scheduler and workers, and Pilot X
0046
0047 Note: Pilot X is currently a simplified PanDA Pilot, but is likely to be absorbed into the main
0048 PanDA Pilot code base as a special workflow for Dask on Kubernetes resources.
0049
0050 The install function will start by installing the Pilot X pod on the dask cluster. When it starts running,
0051 Pilot X will wait for a job definition to appear on the shared file system. It will then proceed staging any
0052 input files.
0053
0054 In the meantime, this function (who also knows about the job definition) will asynchronously install the dask
0055 scheduler and all required workers
0056
0057 :param job_definition: job definition dictionary.
0058 :return: True for successfully installed pods (Boolean).
0059 """
0060
0061
0062 status = self.install_pilotx_pod()
0063 if not status:
0064 return status
0065
0066
0067
0068
0069
0070 status = self.copy_job_definition(job_definition)
0071 if not status:
0072 return status
0073
0074
0075 status = self.install_dask_scheduler()
0076 if not status:
0077 return status
0078
0079
0080 status = self.install_dask_workers(job_definition)
0081 if not status:
0082 return status
0083
0084 return status
0085
0086 def uninstall(self):
0087 """
0088 Uninstall all pods.
0089 """
0090
0091
0092
0093
0094 pass
0095
0096 def install_pilotx_pod(self):
0097 """
0098
0099 """
0100
0101 status = True
0102
0103 return status
0104
0105 def copy_job_definition(self, job_definition):
0106 """
0107
0108 """
0109
0110 status = True
0111
0112 return status
0113
0114 def install_dask_scheduler(self):
0115 """
0116
0117 """
0118
0119 status = True
0120
0121 return status
0122
0123 def install_dask_workers(self, job_definition):
0124 """
0125
0126 """
0127
0128 status = True
0129
0130
0131
0132
0133 return status