Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 import datetime
0002 import math
0003 import random
0004 import socket
0005 
0006 from pandaharvester.harvesterbody.agent_base import AgentBase
0007 from pandaharvester.harvesterconfig import harvester_config
0008 from pandaharvester.harvestercore import core_utils
0009 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0010 from pandaharvester.harvestercore.file_spec import FileSpec
0011 from pandaharvester.harvestercore.job_spec import JobSpec
0012 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0013 from pandaharvester.harvestercore.resource_type_constants import (
0014     BASIC_RESOURCE_TYPE_MULTI_CORE,
0015     BASIC_RESOURCE_TYPE_SINGLE_CORE,
0016 )
0017 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0018 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0019 
# module-level logger named "job_fetcher"; JobFetcher.run passes it to make_logger
# to build the per-cycle and per-queue loggers
_logger = core_utils.setup_logger("job_fetcher")

# resource type mapper, instantiated once when this module is imported
# NOTE(review): neither rt_mapper nor all_resource_types is referenced by the code
# visible in this file section - confirm whether they are still needed
rt_mapper = ResourceTypeMapper()
all_resource_types = rt_mapper.get_all_resource_types()
0026 
0027 
0028 # class to fetch jobs
class JobFetcher(AgentBase):
    """Agent that fetches new jobs through the communicator and stores them in the harvester DB.

    In every cycle the agent asks the DB which queues need jobs, fetches up to
    ``harvester_config.jobfetcher.maxJobs`` jobs per queue with
    ``communicator.get_jobs``, converts them to ``JobSpec``/``FileSpec`` objects
    and inserts them with ``DBProxy.insert_jobs``.
    """

    # prefix of the harvester params of a PanDA queue that carry per-resource-type caps
    _RESOURCE_TYPE_LIMITS_PREFIX = "resource_type_limits."
    # file statuses meaning that stage-in of the file is (being) taken care of by another job
    _FILE_STATUSES_HANDLED_ELSEWHERE = ("ready", "preparing", "to_prepare", "triggered")

    # constructor
    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """Set up the DB proxy, the plugin factory and remember the collaborators.

        :param communicator: object whose ``get_jobs`` method is used to fetch jobs
        :param queue_config_mapper: object queried with ``has_queue``/``get_queue`` and
            handed to ``DBProxy.get_num_jobs_to_fetch``
        :param single_mode: forwarded unchanged to ``AgentBase.__init__``
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.communicator = communicator
        # host name is passed twice to communicator.get_jobs
        self.nodeName = socket.gethostname()
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()

    # main loop
    def run(self):
        """Fetch jobs for all queues that need them, repeatedly, until the agent is terminated.

        The method returns only when ``self.terminated(harvester_config.jobfetcher.sleepTime)``
        evaluates to true at the end of a cycle.
        """
        while True:
            mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="run")
            mainLog.debug("getting number of jobs to be fetched")
            # get number of jobs to be fetched
            job_limit_to_fetch_dict = self.dbProxy.get_num_jobs_to_fetch(
                harvester_config.jobfetcher.nQueues, harvester_config.jobfetcher.lookupTime, self.queueConfigMapper
            )
            mainLog.debug(f"got {len(job_limit_to_fetch_dict)} queues")
            # get up to date queue configuration
            pandaQueueDict = PandaQueuesDict(filter_site_list=job_limit_to_fetch_dict.keys())
            # loop over all queues
            for queueName, value_dict in job_limit_to_fetch_dict.items():
                n_jobs = value_dict["jobs"]
                # NOTE(review): n_cores is not consumed by the code in this class - confirm whether it is still needed
                n_cores = value_dict["cores"]
                if n_cores is None:
                    n_cores = math.inf
                # check queue
                if not self.queueConfigMapper.has_queue(queueName):
                    continue
                tmpLog = self.make_logger(_logger, f"queueName={queueName}", method_name="run")
                # get queue
                queueConfig = self.queueConfigMapper.get_queue(queueName)
                siteName = queueConfig.siteName
                # upper limit
                n_jobs = min(n_jobs, harvester_config.jobfetcher.maxJobs)
                if n_jobs == 0:
                    tmpLog.debug("no job to fetch; skip")
                    continue
                # prod_source_label
                try:
                    is_grandly_unified_queue = pandaQueueDict.is_grandly_unified_queue(siteName)
                except Exception as exc:
                    # best effort: fall back to a non-unified queue, but leave a trace of the reason
                    tmpLog.debug(f"failed to check if {siteName} is grandly unified; assuming it is not: {exc}")
                    is_grandly_unified_queue = False
                default_prodSourceLabel = queueConfig.get_source_label(is_gu=is_grandly_unified_queue)
                # randomize prod_source_label if configured
                pdpm = getattr(queueConfig, "prodSourceLabelRandomWeightsPermille", {})
                choice_list = core_utils.make_choice_list(pdpm=pdpm, default=default_prodSourceLabel)
                prodSourceLabel = random.choice(choice_list)
                # caps from resource_type_limit params on CRIC of the PQ
                # NOTE(review): the resulting dict is not consumed by the code in this class - confirm intended use
                resource_type_limits_dict = self._parse_resource_type_limits(pandaQueueDict.get_harvester_params(siteName))

                # call get jobs
                self._fetch_jobs(queueName, queueConfig, prodSourceLabel, tmpLog, n_jobs=n_jobs)

            # done loop
            mainLog.debug("done")
            # check if being terminated
            if self.terminated(harvester_config.jobfetcher.sleepTime):
                mainLog.debug("terminated")
                return

    @staticmethod
    def _parse_resource_type_limits(harvester_params):
        """Extract the integer ``resource_type_limits.<name>`` entries as ``{<name>: value}``.

        :param harvester_params: mapping of harvester parameter names to values
        :return: dict keyed by the part of the parameter name after the prefix;
            entries whose value is not an ``int`` are ignored
        """
        prefix = JobFetcher._RESOURCE_TYPE_LIMITS_PREFIX
        limits = {}
        for key, val in harvester_params.items():
            key = str(key)
            if key.startswith(prefix) and isinstance(val, int):
                # slice the prefix off; str.lstrip() would strip a *set of characters*
                # and eat the beginning of names such as "score"
                limits[key[len(prefix) :]] = val
        return limits

    def _fetch_jobs(self, queue_name, queue_config, prod_source_label, tmp_log, resource_type=None, n_jobs=0):
        """Fetch jobs for one queue, convert them to JobSpecs and insert them into the DB.

        :param queue_name: queue name, stored as ``computingSite`` of every fetched job
        :param queue_config: queue configuration object of that queue
        :param prod_source_label: prodSourceLabel passed to ``communicator.get_jobs``
        :param tmp_log: logger used for the messages about this queue
        :param resource_type: optional resource type used as the getJob criteria
        :param n_jobs: number of jobs to ask for; the communicator is not called when it is not positive
        :return: number of jobs returned by the communicator
        """
        # custom criteria from queueconfig
        additional_criteria = queue_config.getJobCriteria
        if resource_type:
            # addition criteria for getJob on resourcetype
            # NOTE(review): this replaces, rather than extends, the criteria of the queue config - confirm intent
            additional_criteria = {"resourceType": resource_type}
        # call get jobs
        tmp_log.debug(f"getting {n_jobs} jobs for prodSourceLabel={prod_source_label} rtype={resource_type}")
        sw = core_utils.get_stopwatch()
        if n_jobs > 0:
            jobs, err_str = self.communicator.get_jobs(
                queue_config.siteName, self.nodeName, prod_source_label, self.nodeName, n_jobs, additional_criteria
            )
        else:
            jobs, err_str = [], "no need to get job"
        tmp_log.info(f"got {len(jobs)} jobs for prodSourceLabel={prod_source_label} rtype={resource_type} with {err_str} {sw.get_elapsed_time()}")
        if len(jobs) == 0:
            return 0
        # get extractor plugin
        if hasattr(queue_config, "extractor"):
            extractor_core = self.pluginFactory.get_plugin(queue_config.extractor)
        else:
            extractor_core = None
        # convert to JobSpec; file statuses are cached per LFN across all jobs of this call
        file_stat_map = dict()
        sw_startconvert = core_utils.get_stopwatch()
        job_specs = [self._make_job_spec(job, queue_name, queue_config, extractor_core, file_stat_map) for job in jobs]
        tmp_log.debug(f"Converting of {len(jobs)} jobs {sw_startconvert.get_elapsed_time()}")
        # insert to DB
        sw_insertdb = core_utils.get_stopwatch()
        self.dbProxy.insert_jobs(job_specs)
        tmp_log.debug(f"Insert of {len(job_specs)} jobs {sw_insertdb.get_elapsed_time()}")
        # return len jobs
        return len(jobs)

    def _make_job_spec(self, job, queue_name, queue_config, extractor_core, file_stat_map):
        """Build the JobSpec, including its input FileSpecs, for one job returned by the communicator.

        :param job: job description as accepted by ``JobSpec.convert_job_json``
        :param queue_name: value for ``JobSpec.computingSite``
        :param queue_config: queue configuration object (configID, zipPerMB, ddmEndpointIn are read)
        :param extractor_core: extractor plugin providing ``get_aux_inputs``, or None
        :param file_stat_map: per-LFN cache of file statuses, updated in place
        :return: the populated JobSpec
        """
        time_now = core_utils.naive_utcnow()
        job_spec = JobSpec()
        job_spec.convert_job_json(job)
        job_spec.computingSite = queue_name
        job_spec.status = "starting"
        job_spec.subStatus = "fetched"
        job_spec.creationTime = time_now
        job_spec.stateChangeTime = time_now
        job_spec.configID = queue_config.configID
        job_spec.set_one_attribute("schedulerID", f"harvester-{harvester_config.master.harvester_id}")
        # the queue-level zipPerMB is only a default for jobs which do not carry their own
        if queue_config.zipPerMB is not None and job_spec.zipPerMB is None:
            job_spec.zipPerMB = queue_config.zipPerMB
        # regular inputs first, then auxiliary inputs from the extractor plugin if there is one
        file_group_dicts = [job_spec.get_input_file_attributes()]
        if extractor_core is not None:
            file_group_dicts.append(extractor_core.get_aux_inputs(job_spec))
        for file_group_dict in file_group_dicts:
            for lfn, file_attrs in file_group_dict.items():
                job_spec.add_in_file(self._make_file_spec(job_spec, lfn, file_attrs, queue_config, file_stat_map))
        job_spec.trigger_propagation()
        return job_spec

    def _make_file_spec(self, job_spec, lfn, file_attrs, queue_config, file_stat_map):
        """Build the FileSpec of one input file of a job.

        Sets ``job_spec.auxInput`` as a side effect when the file attributes carry
        ``INTERNAL_FileType``.

        :param job_spec: JobSpec the file belongs to
        :param lfn: logical file name
        :param file_attrs: attribute dict of the file; ``scope`` is required
        :param queue_config: queue configuration object (ddmEndpointIn is read)
        :param file_stat_map: per-LFN cache of file statuses, updated in place
        :return: the populated FileSpec
        """
        # make file spec
        file_spec = FileSpec()
        file_spec.PandaID = job_spec.PandaID
        file_spec.taskID = job_spec.taskID
        file_spec.lfn = lfn
        file_spec.endpoint = queue_config.ddmEndpointIn
        file_spec.scope = file_attrs["scope"]
        if "INTERNAL_FileType" in file_attrs:
            file_spec.fileType = file_attrs["INTERNAL_FileType"]
            job_spec.auxInput = JobSpec.AUX_hasAuxInput
        else:
            file_spec.fileType = "input"
        # check file status
        if lfn not in file_stat_map:
            file_stat_map[lfn] = self.dbProxy.get_file_status(lfn, file_spec.fileType, queue_config.ddmEndpointIn, "starting")
        # set preparing to skip stage-in if the file is (being) taken care of by another job
        if any(status in file_stat_map[lfn] for status in self._FILE_STATUSES_HANDLED_ELSEWHERE):
            file_spec.status = "preparing"
        else:
            file_spec.status = "to_prepare"
        # remember the status so that later jobs using the same file see it
        file_stat_map[lfn].setdefault(file_spec.status, None)
        if "INTERNAL_URL" in file_attrs:
            file_spec.url = file_attrs["INTERNAL_URL"]
        return file_spec