Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 from pandaharvester.harvestercore import core_utils
0002 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0003 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0004 from pandaharvester.harvestercore.work_spec import WorkSpec
0005 
0006 from .base_worker_maker import BaseWorkerMaker
0007 
# multinode worker maker. one job per node. aprun as executor (initially)
# static parameters collected from queue config file
# dynamic parameters from infrastructure through plugins


# module-level logger shared by all worker-maker methods below
baseLogger = core_utils.setup_logger("multinode_workermaker")
0014 
0015 
class MultiNodeWorkerMaker(BaseWorkerMaker):
    """Multi-node worker maker: one job per node, launched by one executor.

    aprun is the initially supported executor. Static parameters are collected
    from the queue configuration file; dynamic parameters are obtained from
    the infrastructure through a resource plugin (see get_resources).
    """

    # constructor
    def __init__(self, **kwarg):
        """Set up plugin factory, queue-config mapper and worker sizing.

        In "dynamic" mode nNodes/walltimelimit are refreshed from the resource
        plugin; in "static" mode the configured values are used as-is.
        """
        BaseWorkerMaker.__init__(self, **kwarg)
        self.pluginFactory = PluginFactory()
        self.queue_config_mapper = QueueConfigMapper()
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info("Multinode workermaker: created.")
        tmpLog.debug(f"Queue name: {self.queueName}")
        if self.mode == "static":
            # nNodes and walltimelimit come from the queue configuration
            tmpLog.info("Static configuration")
        elif self.mode == "dynamic":
            # nNodes and walltimelimit are provided by the resource plugin
            tmpLog.info("Dynamic configuration")
            self.nNodes, self.walltimelimit = self.get_resources()
        # total jobs handled by one worker: nJobsPerNode jobs on each node
        self.nJobsPerWorker = self.nNodes * self.nJobsPerNode

    def _get_executable(self):
        """Return the body of the scheduler script: environment setup followed
        by the executor command line with its parameters."""
        exe_str = ""

        tmpLog = self.make_logger(baseLogger, method_name="_get_executable")

        # prepare static environment: comma-separated settings, one per line
        env_str = ""
        if self.env not in (None, "NULL"):
            env_str = "\n".join(map(lambda s: s.strip(), self.env.split(", ")))

        # prepare executor
        try:
            if self.executor == "aprun":  # "aprun -n [number of required nodes/jobs] -d [number of cpu per node/job]" - for one multicore job per node
                exe_str = self.executor + f" -n {self.nJobsPerWorker} -d {self.nCorePerJob} "
                exe_str += self.pilot
            else:
                exe_str = self.executor + " " + self.pilot
            if self.pilot_params:
                exe_str = " ".join([exe_str, self.pilot_params])
        except Exception as e:
            # a missing configuration attribute lands here; log the cause
            # instead of swallowing it silently
            tmpLog.error(f"Unable to build executor command check configuration: {e}")
            exe_str = ""

        exe_str = "\n".join([env_str, exe_str])
        tmpLog.debug(f"Shell script body: \n{exe_str}")

        return exe_str

    # make a worker from jobs
    def make_worker(self, jobspec_list, queue_config, job_type, resource_type, **kwargs):
        """Build a WorkSpec sized for this multi-node worker.

        In the push case (non-empty jobspec_list) RAM and disk requirements
        are accumulated from the individual jobs. job_type and resource_type
        are part of the worker-maker interface and are not used here.
        """
        tmpLog = core_utils.make_logger(baseLogger, f"queue={queue_config.queueName}", method_name="make_worker")

        tmpLog.info("Multi node worker preparation started.")
        tmpLog.info(f"Worker size: {self.nJobsPerWorker} jobs on {self.nNodes} nodes for {self.walltimelimit} sec.")

        workSpec = WorkSpec()
        workSpec.nCore = self.nNodes * queue_config.submitter["nCorePerNode"]
        workSpec.minRamCount = 0
        workSpec.maxDiskCount = 0
        workSpec.maxWalltime = self.walltimelimit
        workSpec.workParams = self._get_executable()

        if len(jobspec_list) > 0:
            # push case: we know the jobs, so accumulate their resource requests
            for jobSpec in jobspec_list:
                try:
                    workSpec.minRamCount += jobSpec.jobParams["minRamCount"]
                except Exception:
                    # jobParams may lack the key or hold a non-numeric value
                    pass
                try:
                    workSpec.maxDiskCount += jobSpec.jobParams["maxDiskCount"]
                except Exception:
                    pass
        tmpLog.info(f"Worker for {self.nNodes} nodes with {self.nJobsPerWorker} jobs with walltime {workSpec.maxWalltime} sec. defined")

        return workSpec

    def get_resources(self):
        """Return (nodes, walltime) available for the next worker.

        The values come from the resource plugin when one is configured;
        otherwise fall back to the statically configured nNodes/walltimelimit.
        """
        tmpLog = core_utils.make_logger(baseLogger, f"queue={self.queueName}", method_name="get_resources")
        walltime = self.walltimelimit
        queue_config = self.queue_config_mapper.get_queue(self.queueName)
        resource_utils = self.pluginFactory.get_plugin(queue_config.resource)
        if resource_utils:
            nodes, walltime = resource_utils.get_resources()
        else:
            tmpLog.info("Resource plugin is not defined")
            nodes = self.nNodes

        return nodes, walltime