File indexing completed on 2026-04-20 07:59:01
0001 from pandaharvester.harvestercore import core_utils
0002 from pandaharvester.harvestercore.work_spec import WorkSpec
0003
0004 from .base_worker_maker import BaseWorkerMaker
0005
0006
0007
0008
0009 baseLogger = core_utils.setup_logger("multijob_workermaker")
0010
0011
0012 class MultiJobWorkerMaker(BaseWorkerMaker):
0013
0014 def __init__(self, **kwarg):
0015 BaseWorkerMaker.__init__(self, **kwarg)
0016 tmpLog = self.make_logger(baseLogger, method_name="__init__")
0017 tmpLog.info("Multijob workermaker")
0018
0019 def _get_executable(self, queue_config):
0020
0021 exe_str = ""
0022
0023 tmpLog = self.make_logger(baseLogger, method_name="_get_executable")
0024
0025
0026 env_str = ""
0027 if self.env not in (None, "NULL"):
0028 env_str = "\n".join(map(lambda s: s.strip(), self.env.split(", ")))
0029
0030
0031 try:
0032 if self.executor == "aprun":
0033 exe_str = self.executor + f" -n {self.nJobsPerWorker} -d {queue_config.submitter['nCorePerNode']} "
0034 exe_str += self.pilot
0035 else:
0036 exe_str = self.executor + " " + self.pilot
0037 if self.pilot_params:
0038 exe_str = " ".join([exe_str, self.pilot_params])
0039 except Exception:
0040 tmpLog.error("Unable to build executor command check configuration")
0041 exe_str = ""
0042
0043 exe_str = "\n".join([env_str, exe_str])
0044 tmpLog.debug(f"Shell script body: \n{exe_str}")
0045
0046 return exe_str
0047
0048
0049 def make_worker(self, jobspec_list, queue_config, job_type, resource_type, **kwargs):
0050 tmpLog = self.make_logger(baseLogger, method_name="make_worker")
0051 workSpec = WorkSpec()
0052 self.nJobsPerWorker = len(jobspec_list)
0053 tmpLog.info(f"Worker for {self.nJobsPerWorker} jobs will be prepared")
0054 if self.nJobsPerWorker > 0:
0055 workSpec.nCore = int(queue_config.submitter["nCorePerNode"]) * self.nJobsPerWorker
0056 workSpec.minRamCount = 0
0057 workSpec.maxDiskCount = 0
0058 workSpec.maxWalltime = 0
0059 if queue_config.walltimeLimit:
0060 workSpec.maxWalltime = queue_config.walltimeLimit
0061 tmpLog.debug(f"Wall time limit for worker: {workSpec.maxWalltime}")
0062 for jobSpec in jobspec_list:
0063 try:
0064 workSpec.minRamCount = max(workSpec.minRamCount, jobSpec.jobParams["minRamCount"])
0065 except Exception:
0066 pass
0067 try:
0068 workSpec.maxDiskCount += jobSpec.jobParams["maxDiskCount"]
0069 except Exception:
0070 pass
0071
0072
0073
0074
0075
0076
0077 workSpec.workParams = self._get_executable(queue_config)
0078
0079 return workSpec