Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 from pandaharvester.harvestercore import core_utils
0002 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0003 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0004 
0005 # logger
0006 _logger = core_utils.setup_logger("worker_maker")
0007 
0008 
0009 # class to make worker
0010 class WorkerMaker(object):
0011     # constructor
0012     def __init__(self):
0013         self.pluginFactory = PluginFactory()
0014         self.dbProxy = DBProxy()
0015 
0016     # get plugin
0017     def get_plugin(self, queue_config):
0018         return self.pluginFactory.get_plugin(queue_config.workerMaker)
0019 
0020     # make workers
0021     def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, prod_source_label="ANY", maker=None):
0022         tmpLog = core_utils.make_logger(
0023             _logger, f"queue={queue_config.queueName} jtype={job_type} rtype={resource_type} pslabel={prod_source_label}", method_name="make_workers"
0024         )
0025         tmpLog.debug("start")
0026         try:
0027             # get plugin
0028             if maker is None:
0029                 maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0030             if maker is None:
0031                 # not found
0032                 tmpLog.error(f"plugin for {queue_config.queueName} not found")
0033                 return [], jobchunk_list
0034             # get ready workers
0035             readyWorkers = self.dbProxy.get_ready_workers(queue_config.queueName, n_ready)
0036             # loop over all chunks
0037             okChunks = []
0038             ngChunks = []
0039             for iChunk, jobChunk in enumerate(jobchunk_list):
0040                 # make a worker
0041                 if iChunk >= n_ready:
0042                     workSpec = maker.make_worker(jobChunk, queue_config, job_type, resource_type, prod_source_label=prod_source_label)
0043                 else:
0044                     # use ready worker
0045                     if iChunk < len(readyWorkers):
0046                         workSpec = readyWorkers[iChunk]
0047                     else:
0048                         workSpec = None
0049                 # failed
0050                 if workSpec is None:
0051                     ngChunks.append(jobChunk)
0052                     continue
0053                 # set workerID
0054                 if workSpec.workerID is None:
0055                     workSpec.workerID = self.dbProxy.get_next_seq_number("SEQ_workerID")
0056                     workSpec.configID = queue_config.configID
0057                     workSpec.isNew = True
0058                 okChunks.append((workSpec, jobChunk))
0059             # dump
0060             tmpLog.debug(f"made {len(okChunks)} workers while {len(ngChunks)} chunks failed")
0061             return okChunks, ngChunks
0062         except Exception:
0063             # dump error
0064             core_utils.dump_error_message(tmpLog)
0065             return [], jobchunk_list
0066 
0067     # get number of jobs per worker
0068     def get_num_jobs_per_worker(self, queue_config, n_workers, job_type, resource_type, prod_source_label="ANY", maker=None):
0069         # get plugin
0070         if maker is None:
0071             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0072         return maker.get_num_jobs_per_worker(n_workers)
0073 
0074     # get number of workers per job
0075     def get_num_workers_per_job(self, queue_config, n_workers, job_type, resource_type, prod_source_label="ANY", maker=None):
0076         # get plugin
0077         if maker is None:
0078             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0079         return maker.get_num_workers_per_job(n_workers)
0080 
0081     # check number of ready resources
0082     def num_ready_resources(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
0083         # get plugin
0084         if maker is None:
0085             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0086         return maker.num_ready_resources()
0087 
0088     # get upper limit on the cumulative total of workers per job
0089     def get_max_workers_per_job_in_total(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
0090         # get plugin
0091         if maker is None:
0092             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0093         return maker.get_max_workers_per_job_in_total()
0094 
0095     # get upper limit on the number of new workers per job in a cycle
0096     def get_max_workers_per_job_per_cycle(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
0097         # get plugin
0098         if maker is None:
0099             maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0100         return maker.get_max_workers_per_job_per_cycle()