File indexing completed on 2026-04-20 07:58:57
0001 from pandaharvester.harvestercore import core_utils
0002 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0003 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0004
0005
0006 _logger = core_utils.setup_logger("worker_maker")
0007
0008
0009
0010 class WorkerMaker(object):
0011
0012 def __init__(self):
0013 self.pluginFactory = PluginFactory()
0014 self.dbProxy = DBProxy()
0015
0016
0017 def get_plugin(self, queue_config):
0018 return self.pluginFactory.get_plugin(queue_config.workerMaker)
0019
0020
0021 def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, prod_source_label="ANY", maker=None):
0022 tmpLog = core_utils.make_logger(
0023 _logger, f"queue={queue_config.queueName} jtype={job_type} rtype={resource_type} pslabel={prod_source_label}", method_name="make_workers"
0024 )
0025 tmpLog.debug("start")
0026 try:
0027
0028 if maker is None:
0029 maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0030 if maker is None:
0031
0032 tmpLog.error(f"plugin for {queue_config.queueName} not found")
0033 return [], jobchunk_list
0034
0035 readyWorkers = self.dbProxy.get_ready_workers(queue_config.queueName, n_ready)
0036
0037 okChunks = []
0038 ngChunks = []
0039 for iChunk, jobChunk in enumerate(jobchunk_list):
0040
0041 if iChunk >= n_ready:
0042 workSpec = maker.make_worker(jobChunk, queue_config, job_type, resource_type, prod_source_label=prod_source_label)
0043 else:
0044
0045 if iChunk < len(readyWorkers):
0046 workSpec = readyWorkers[iChunk]
0047 else:
0048 workSpec = None
0049
0050 if workSpec is None:
0051 ngChunks.append(jobChunk)
0052 continue
0053
0054 if workSpec.workerID is None:
0055 workSpec.workerID = self.dbProxy.get_next_seq_number("SEQ_workerID")
0056 workSpec.configID = queue_config.configID
0057 workSpec.isNew = True
0058 okChunks.append((workSpec, jobChunk))
0059
0060 tmpLog.debug(f"made {len(okChunks)} workers while {len(ngChunks)} chunks failed")
0061 return okChunks, ngChunks
0062 except Exception:
0063
0064 core_utils.dump_error_message(tmpLog)
0065 return [], jobchunk_list
0066
0067
0068 def get_num_jobs_per_worker(self, queue_config, n_workers, job_type, resource_type, prod_source_label="ANY", maker=None):
0069
0070 if maker is None:
0071 maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0072 return maker.get_num_jobs_per_worker(n_workers)
0073
0074
0075 def get_num_workers_per_job(self, queue_config, n_workers, job_type, resource_type, prod_source_label="ANY", maker=None):
0076
0077 if maker is None:
0078 maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0079 return maker.get_num_workers_per_job(n_workers)
0080
0081
0082 def num_ready_resources(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
0083
0084 if maker is None:
0085 maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0086 return maker.num_ready_resources()
0087
0088
0089 def get_max_workers_per_job_in_total(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
0090
0091 if maker is None:
0092 maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0093 return maker.get_max_workers_per_job_in_total()
0094
0095
0096 def get_max_workers_per_job_per_cycle(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
0097
0098 if maker is None:
0099 maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
0100 return maker.get_max_workers_per_job_per_cycle()