from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.work_spec import WorkSpec

from .base_worker_maker import BaseWorkerMaker

# Multi-job worker maker: one job per node, with aprun as the executor (initially).
# Static parameters (pilot command, etc.) are collected from the queue config file.
baseLogger = core_utils.setup_logger("multijob_workermaker")


class MultiJobWorkerMaker(BaseWorkerMaker):
    # constructor
    def __init__(self, **kwarg):
        BaseWorkerMaker.__init__(self, **kwarg)
        tmpLog = self.make_logger(baseLogger, method_name="__init__")
        tmpLog.info("Multijob workermaker")

    def _get_executable(self, queue_config):
        # return a string containing the body of the script for the scheduler: specific environment setup plus the executor with its parameters
        exe_str = ""

        tmpLog = self.make_logger(baseLogger, method_name="_get_executable")

        # prepare the static environment
        env_str = ""
        if self.env not in (None, "NULL"):
            env_str = "\n".join(map(lambda s: s.strip(), self.env.split(", ")))

        # prepare the executor
        try:
            if self.executor == "aprun":  # "aprun -n [number of required nodes/jobs] -d [number of CPUs per node/job]" - one multi-core job per node
                exe_str = self.executor + f" -n {self.nJobsPerWorker} -d {queue_config.submitter['nCorePerNode']} "
                exe_str += self.pilot
            else:
                exe_str = self.executor + " " + self.pilot
            if self.pilot_params:
                exe_str = " ".join([exe_str, self.pilot_params])
        except Exception:
            tmpLog.error("Unable to build executor command; check configuration")
            exe_str = ""

        exe_str = "\n".join([env_str, exe_str])
        tmpLog.debug(f"Shell script body: \n{exe_str}")

        return exe_str
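    # Illustration of the produced script body (assumed, made-up values): with
    # env = "export ATHENA_PROC_NUMBER=16", executor = "aprun", nJobsPerWorker = 4,
    # nCorePerNode = 16, pilot = "pilot.py" and no pilot_params, _get_executable
    # would return:
    #
    #   export ATHENA_PROC_NUMBER=16
    #   aprun -n 4 -d 16 pilot.py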

    # make a worker from a list of jobs with a disk access point
    def make_worker(self, jobspec_list, queue_config, job_type, resource_type, **kwargs):
        tmpLog = self.make_logger(baseLogger, method_name="make_worker")
        workSpec = WorkSpec()
        self.nJobsPerWorker = len(jobspec_list)
        tmpLog.info(f"Worker for {self.nJobsPerWorker} jobs will be prepared")
        if self.nJobsPerWorker > 0:
            workSpec.nCore = int(queue_config.submitter["nCorePerNode"]) * self.nJobsPerWorker
            workSpec.minRamCount = 0
            workSpec.maxDiskCount = 0
            workSpec.maxWalltime = 0
            if queue_config.walltimeLimit:
                workSpec.maxWalltime = queue_config.walltimeLimit
                tmpLog.debug(f"Wall time limit for worker: {workSpec.maxWalltime}")
            for jobSpec in jobspec_list:
                try:
                    workSpec.minRamCount = max(workSpec.minRamCount, jobSpec.jobParams["minRamCount"])
                except Exception:
                    pass
                try:
                    workSpec.maxDiskCount += jobSpec.jobParams["maxDiskCount"]
                except Exception:
                    pass
                # try:  we should not rely on job parameters yet (not reliable)
                #     if jobSpec.jobParams['maxWalltime'] not in (None, "NULL"):
                #         workSpec.maxWalltime = max(workSpec.maxWalltime, jobSpec.jobParams['maxWalltime'])
                # except Exception:
                #     pass

            workSpec.workParams = self._get_executable(queue_config)

        return workSpec
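

For reference, a minimal standalone sketch of the aggregation that make_worker performs above, using made-up job parameters rather than real Harvester JobSpec objects: cores scale with the number of bunched jobs (one full node each), RAM takes the per-job maximum, and disk takes the per-job sum.

# Standalone illustration only; the values below are assumptions, not Harvester defaults.
n_core_per_node = 16  # would come from queue_config.submitter["nCorePerNode"]
job_params = [        # stand-ins for jobSpec.jobParams of the bunched jobs
    {"minRamCount": 2000, "maxDiskCount": 300},
    {"minRamCount": 4000, "maxDiskCount": 500},
    {"minRamCount": 1000, "maxDiskCount": 200},
]

n_jobs_per_worker = len(job_params)
n_core = n_core_per_node * n_jobs_per_worker                 # 48: one node per job
min_ram_count = max(j["minRamCount"] for j in job_params)    # 4000: largest single-job request
max_disk_count = sum(j["maxDiskCount"] for j in job_params)  # 1000: per-job requests add up
print(n_core, min_ram_count, max_disk_count)                 # -> 48 4000 1000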