from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
from pandaharvester.harvestercore.work_spec import WorkSpec

from .base_worker_maker import BaseWorkerMaker


baseLogger = core_utils.setup_logger("multinode_workermaker")


class MultiNodeWorkerMaker(BaseWorkerMaker):
    """Worker maker that packs multiple jobs into a single multi-node worker."""

    def __init__(self, **kwarg):
        BaseWorkerMaker.__init__(self, **kwarg)
        self.pluginFactory = PluginFactory()
        self.queue_config_mapper = QueueConfigMapper()
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info("Multinode workermaker: created.")
        tmpLog.debug(f"Queue name: {self.queueName}")
        if self.mode == "static":
            # static mode: number of nodes and walltime limit are fixed in the configuration
            tmpLog.info("Static configuration")
        elif self.mode == "dynamic":
            # dynamic mode: number of nodes and walltime limit are obtained from the resource plugin
            tmpLog.info("Dynamic configuration")
            self.nNodes, self.walltimelimit = self.get_resources()
        self.nJobsPerWorker = self.nNodes * self.nJobsPerNode

    def _get_executable(self):
        """Build the shell script body: environment setup followed by the pilot launch command."""
        exe_str = ""
        tmpLog = self.make_logger(baseLogger, method_name="_get_executable")

        # environment setup commands, given as a comma-separated list in the configuration
        env_str = ""
        if self.env not in (None, "NULL"):
            env_str = "\n".join(s.strip() for s in self.env.split(", "))

        # launch command: the configured executor followed by the pilot and its optional parameters
        try:
            if self.executor == "aprun":
                exe_str = self.executor + f" -n {self.nJobsPerWorker} -d {self.nCorePerJob} "
                exe_str += self.pilot
            else:
                exe_str = self.executor + " " + self.pilot
            if self.pilot_params:
                exe_str = " ".join([exe_str, self.pilot_params])
        except Exception:
            tmpLog.error("Unable to build executor command; check the configuration")
            exe_str = ""

        exe_str = "\n".join([env_str, exe_str])
        tmpLog.debug(f"Shell script body: \n{exe_str}")

        return exe_str

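    # For illustration only (hypothetical values, not defaults): with executor="aprun",
    # nJobsPerWorker=8, nCorePerJob=16, pilot="/path/to/pilot.sh",
    # pilot_params="--harvester" and env="export A=1, export B=2",
    # _get_executable() builds a script body along the lines of:
    #
    #     export A=1
    #     export B=2
    #     aprun -n 8 -d 16 /path/to/pilot.sh --harvester
    #
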
    def make_worker(self, jobspec_list, queue_config, job_type, resource_type, **kwargs):
        tmpLog = core_utils.make_logger(baseLogger, f"queue={queue_config.queueName}", method_name="make_worker")

        tmpLog.info("Multi node worker preparation started.")
        tmpLog.info(f"Worker size: {self.nJobsPerWorker} jobs on {self.nNodes} nodes for {self.walltimelimit} sec.")

        workSpec = WorkSpec()
        workSpec.nCore = self.nNodes * queue_config.submitter["nCorePerNode"]
        workSpec.minRamCount = 0
        workSpec.maxDiskCount = 0
        workSpec.maxWalltime = self.walltimelimit
        workSpec.workParams = self._get_executable()

        if len(jobspec_list) > 0:
            # sum the RAM and disk requirements of the jobs assigned to this worker
            for jobSpec in jobspec_list:
                try:
                    workSpec.minRamCount += jobSpec.jobParams["minRamCount"]
                except Exception:
                    pass
                try:
                    workSpec.maxDiskCount += jobSpec.jobParams["maxDiskCount"]
                except Exception:
                    pass

        tmpLog.info(f"Worker for {self.nNodes} nodes with {self.nJobsPerWorker} jobs and walltime {workSpec.maxWalltime} sec. defined")

        return workSpec
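
    # Illustrative sizing with hypothetical numbers: for nNodes=4, nJobsPerNode=2 and
    # a submitter configuration with nCorePerNode=64, the worker above would carry
    # nJobsPerWorker = 4 * 2 = 8 jobs and request workSpec.nCore = 4 * 64 = 256 cores,
    # with maxWalltime taken from walltimelimit and RAM/disk summed over the assigned jobs.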
    def get_resources(self):
        """
        Get the available resources and map them to a number of nodes and a walltime limit.
        """
        tmpLog = core_utils.make_logger(baseLogger, f"queue={self.queueName}", method_name="get_resources")
        walltime = self.walltimelimit
        queue_config = self.queue_config_mapper.get_queue(self.queueName)
        resource_utils = self.pluginFactory.get_plugin(queue_config.resource)
        if resource_utils:
            # ask the resource plugin how many nodes are available and for how long
            nodes, walltime = resource_utils.get_resources()
        else:
            tmpLog.info("Resource plugin is not defined")
            nodes = self.nNodes

        return nodes, walltime
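

# For illustration only: this worker maker reads its parameters from the plugin
# configuration passed to the constructor (mode, nNodes, nJobsPerNode, nCorePerJob,
# walltimelimit, executor, pilot, pilot_params, env, and optionally a resource plugin).
# A hypothetical configuration fragment could look like the following; the exact key
# names and surrounding structure of the Harvester queue configuration are an
# assumption here, not something defined in this module:
#
#     "workerMaker": {
#         "name": "MultiNodeWorkerMaker",
#         "module": "pandaharvester.harvesterworkermaker.multinode_workermaker",
#         "mode": "static",
#         "nNodes": 4,
#         "nJobsPerNode": 2,
#         "nCorePerJob": 16,
#         "walltimelimit": 86400,
#         "executor": "aprun",
#         "pilot": "/path/to/pilot.sh",
#         "pilot_params": "--harvester",
#         "env": "export A=1, export B=2"
#     }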