# File indexing completed on 2026-04-20 07:58:56
0001 import datetime
0002 import math
0003 import random
0004 import socket
0005
0006 from pandaharvester.harvesterbody.agent_base import AgentBase
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0010 from pandaharvester.harvestercore.file_spec import FileSpec
0011 from pandaharvester.harvestercore.job_spec import JobSpec
0012 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0013 from pandaharvester.harvestercore.resource_type_constants import (
0014 BASIC_RESOURCE_TYPE_MULTI_CORE,
0015 BASIC_RESOURCE_TYPE_SINGLE_CORE,
0016 )
0017 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0018 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0019
0020
# module-level logger for all JobFetcher instances
_logger = core_utils.setup_logger("job_fetcher")


# module-level resource-type mapper and the list of all known resource types;
# neither is referenced elsewhere in this module — presumably kept for future
# per-resource-type fetching logic (TODO confirm before removing)
rt_mapper = ResourceTypeMapper()
all_resource_types = rt_mapper.get_all_resource_types()
0026
0027
0028
class JobFetcher(AgentBase):
    """Agent that fetches new jobs from the PanDA server.

    Each cycle it asks the local DB how many jobs every queue needs, pulls
    that many jobs from PanDA through the communicator plugin, converts them
    into JobSpec/FileSpec records, and inserts them into the harvester DB.
    """

    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """
        :param communicator: communicator plugin used to talk to the PanDA server
        :param queue_config_mapper: mapper providing per-queue configuration
        :param single_mode: run a single cycle instead of looping (forwarded to AgentBase)
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.communicator = communicator
        # this host's name; also reported to PanDA as the computing element below
        self.nodeName = socket.gethostname()
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()

    def run(self):
        """Main loop: fetch jobs for queues that need them until terminated."""
        while True:
            mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="run")
            mainLog.debug("getting number of jobs to be fetched")

            # {queueName: {"jobs": n, "cores": n}} for queues below their fill level
            job_limit_to_fetch_dict = self.dbProxy.get_num_jobs_to_fetch(
                harvester_config.jobfetcher.nQueues, harvester_config.jobfetcher.lookupTime, self.queueConfigMapper
            )
            mainLog.debug(f"got {len(job_limit_to_fetch_dict)} queues")

            pandaQueueDict = PandaQueuesDict(filter_site_list=job_limit_to_fetch_dict.keys())

            for queueName, value_dict in job_limit_to_fetch_dict.items():
                n_jobs = value_dict["jobs"]
                n_cores = value_dict["cores"]
                # None means no core limit; normalized to infinity
                # (n_cores is currently unused in this loop — TODO confirm it can go)
                if n_cores is None:
                    n_cores = math.inf

                # skip queues that disappeared from the configuration
                if not self.queueConfigMapper.has_queue(queueName):
                    continue
                tmpLog = self.make_logger(_logger, f"queueName={queueName}", method_name="run")

                queueConfig = self.queueConfigMapper.get_queue(queueName)
                siteName = queueConfig.siteName

                # cap the number of jobs fetched per queue per cycle
                if n_jobs > harvester_config.jobfetcher.maxJobs:
                    n_jobs = harvester_config.jobfetcher.maxJobs
                if n_jobs == 0:
                    tmpLog.debug("no job to fetch; skip")
                    continue

                # best-effort lookup; fall back to a non-unified queue on any failure
                try:
                    is_grandly_unified_queue = pandaQueueDict.is_grandly_unified_queue(siteName)
                except Exception:
                    is_grandly_unified_queue = False
                default_prodSourceLabel = queueConfig.get_source_label(is_gu=is_grandly_unified_queue)

                # optionally pick the prodSourceLabel at random with configured
                # permille weights; otherwise the choice list only holds the default
                pdpm = getattr(queueConfig, "prodSourceLabelRandomWeightsPermille", {})
                choice_list = core_utils.make_choice_list(pdpm=pdpm, default=default_prodSourceLabel)
                prodSourceLabel = random.choice(choice_list)

                # collect integer "resource_type_limits.<rtype>" parameters from AGIS/CRIC
                # (currently unused in this loop — TODO confirm it can go)
                resource_type_limits_dict = dict()
                prefix = "resource_type_limits."
                for key, val in pandaQueueDict.get_harvester_params(siteName).items():
                    key_str = str(key)
                    if key_str.startswith(prefix):
                        # NOTE: str.lstrip(prefix) would strip a *character set*,
                        # mangling keys; slice the prefix off explicitly instead
                        new_key = key_str[len(prefix):]
                        if isinstance(val, int):
                            resource_type_limits_dict[new_key] = val

                def _get_jobs(resource_type=None, n_jobs=0):
                    """Fetch up to n_jobs jobs from PanDA for this queue, convert
                    them to JobSpecs with their input FileSpecs, and insert them
                    into the local DB. Returns the number of jobs received."""
                    additional_criteria = queueConfig.getJobCriteria
                    if resource_type:
                        # merge with the queue's own criteria instead of discarding them
                        additional_criteria = dict(additional_criteria or {})
                        additional_criteria["resourceType"] = resource_type

                    tmpLog.debug(f"getting {n_jobs} jobs for prodSourceLabel={prodSourceLabel} rtype={resource_type}")
                    sw = core_utils.get_stopwatch()
                    if n_jobs > 0:
                        # nodeName is passed twice: once as the node, once as the computing element
                        jobs, errStr = self.communicator.get_jobs(siteName, self.nodeName, prodSourceLabel, self.nodeName, n_jobs, additional_criteria)
                    else:
                        jobs, errStr = [], "no need to get job"
                    tmpLog.info(f"got {len(jobs)} jobs for prodSourceLabel={prodSourceLabel} rtype={resource_type} with {errStr} {sw.get_elapsed_time()}")

                    if len(jobs) > 0:
                        # optional extractor plugin providing auxiliary input files
                        if hasattr(queueConfig, "extractor"):
                            extractorCore = self.pluginFactory.get_plugin(queueConfig.extractor)
                        else:
                            extractorCore = None
                        jobSpecs = []
                        # cache of per-LFN file status to avoid repeated DB lookups
                        fileStatMap = dict()
                        sw_startconvert = core_utils.get_stopwatch()
                        for job in jobs:
                            timeNow = core_utils.naive_utcnow()
                            jobSpec = JobSpec()
                            jobSpec.convert_job_json(job)
                            jobSpec.computingSite = queueName
                            jobSpec.status = "starting"
                            jobSpec.subStatus = "fetched"
                            jobSpec.creationTime = timeNow
                            jobSpec.stateChangeTime = timeNow
                            jobSpec.configID = queueConfig.configID
                            jobSpec.set_one_attribute("schedulerID", f"harvester-{harvester_config.master.harvester_id}")
                            # queue-level zipPerMB is only a default; job value wins
                            if queueConfig.zipPerMB is not None and jobSpec.zipPerMB is None:
                                jobSpec.zipPerMB = queueConfig.zipPerMB
                            fileGroupDictList = [jobSpec.get_input_file_attributes()]
                            if extractorCore is not None:
                                fileGroupDictList.append(extractorCore.get_aux_inputs(jobSpec))
                            for fileGroupDict in fileGroupDictList:
                                for tmpLFN, fileAttrs in fileGroupDict.items():
                                    # make a FileSpec per input file
                                    fileSpec = FileSpec()
                                    fileSpec.PandaID = jobSpec.PandaID
                                    fileSpec.taskID = jobSpec.taskID
                                    fileSpec.lfn = tmpLFN
                                    fileSpec.endpoint = queueConfig.ddmEndpointIn
                                    fileSpec.scope = fileAttrs["scope"]
                                    # aux input files flag the job accordingly
                                    if "INTERNAL_FileType" in fileAttrs:
                                        fileSpec.fileType = fileAttrs["INTERNAL_FileType"]
                                        jobSpec.auxInput = JobSpec.AUX_hasAuxInput
                                    else:
                                        fileSpec.fileType = "input"
                                    # lookup the file status once per LFN
                                    if tmpLFN not in fileStatMap:
                                        fileStatMap[tmpLFN] = self.dbProxy.get_file_status(tmpLFN, fileSpec.fileType, queueConfig.ddmEndpointIn, "starting")
                                    # if the file is already being handled elsewhere, just wait for it
                                    if [x for x in ["ready", "preparing", "to_prepare", "triggered"] if x in fileStatMap[tmpLFN]]:
                                        fileSpec.status = "preparing"
                                    else:
                                        fileSpec.status = "to_prepare"
                                    fileStatMap[tmpLFN].setdefault(fileSpec.status, None)
                                    if "INTERNAL_URL" in fileAttrs:
                                        fileSpec.url = fileAttrs["INTERNAL_URL"]
                                    jobSpec.add_in_file(fileSpec)
                            jobSpec.trigger_propagation()
                            jobSpecs.append(jobSpec)

                        tmpLog.debug(f"Converting of {len(jobs)} jobs {sw_startconvert.get_elapsed_time()}")
                        sw_insertdb = core_utils.get_stopwatch()
                        self.dbProxy.insert_jobs(jobSpecs)
                        tmpLog.debug(f"Insert of {len(jobSpecs)} jobs {sw_insertdb.get_elapsed_time()}")

                    return len(jobs)

                _get_jobs(n_jobs=n_jobs)

            mainLog.debug("done")

            if self.terminated(harvester_config.jobfetcher.sleepTime):
                mainLog.debug("terminated")
                return