Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import math
0002 import random
0003 
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.job_spec import JobSpec
0006 from pandaharvester.harvestercore.resource_type_constants import (
0007     BASIC_RESOURCE_TYPE_MULTI_CORE,
0008     BASIC_RESOURCE_TYPE_SINGLE_CORE,
0009 )
0010 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0011 from pandaharvester.harvestercore.work_spec import WorkSpec
0012 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0013 
0014 from .base_worker_maker import BaseWorkerMaker
0015 
0016 # logger
0017 _logger = core_utils.setup_logger("simple_worker_maker")
0018 
0019 
0020 # simple maker
0021 class SimpleWorkerMaker(BaseWorkerMaker):
0022     # constructor
0023     def __init__(self, **kwarg):
0024         self.jobAttributesToUse = ["nCore", "minRamCount", "maxDiskCount", "maxWalltime", "ioIntensity"]
0025         BaseWorkerMaker.__init__(self, **kwarg)
0026         self.rt_mapper = ResourceTypeMapper()
0027 
0028     def get_job_core_and_memory(self, queue_dict, job_spec):
0029         job_memory = job_spec.jobParams.get("minRamCount", 0) or 0
0030         job_core_count = job_spec.jobParams.get("coreCount", 1) or 1
0031 
0032         is_ucore = queue_dict.get("capability", "") == "ucore"
0033 
0034         if not job_memory and is_ucore:
0035             site_maxrss = queue_dict.get("maxrss", 0) or 0
0036             site_corecount = queue_dict.get("corecount", 1) or 1
0037 
0038             if job_core_count == 1:
0039                 job_memory = int(math.ceil(site_maxrss / site_corecount))
0040             else:
0041                 job_memory = site_maxrss
0042 
0043         return job_core_count, job_memory
0044 
0045     def get_job_type(self, job_spec, job_type, queue_dict, tmp_prod_source_label="ANY"):
0046         queue_type = queue_dict.get("type", None)
0047 
0048         # 1. get prodSourceLabel from job (PUSH)
0049         if job_spec and "prodSourceLabel" in job_spec.jobParams:
0050             job_type_final = job_spec.jobParams["prodSourceLabel"]
0051 
0052         # 2. get prodSourceLabel from the specified job_type (PULL UPS)
0053         elif job_type:
0054             job_type_final = job_type
0055             if tmp_prod_source_label != "ANY":
0056                 if queue_type != "analysis" and tmp_prod_source_label not in ("user", "panda", "managed"):
0057                     # for production, unified or other types of queues we need to run neutral prod_source_labels
0058                     # with production proxy since they can't be distinguished and can fail
0059                     job_type_final = "managed"
0060 
0061         # 3. convert the prodSourcelabel from the queue configuration or leave it empty (PULL)
0062         else:
0063             # map CRIC types to PanDA types
0064             if queue_type == "analysis":
0065                 job_type_final = "user"
0066             elif queue_type == "production":
0067                 job_type_final = "managed"
0068             else:
0069                 job_type_final = None
0070 
0071         return job_type_final
0072 
0073     # make a worker from jobs
0074     def make_worker(self, jobspec_list, queue_config, job_type, resource_type, prod_source_label="ANY"):
0075         tmp_log = self.make_logger(_logger, f"queue={queue_config.queueName}:{job_type}:{resource_type}:{prod_source_label}", method_name="make_worker")
0076 
0077         tmp_log.debug(f"jobspec_list: {jobspec_list}")
0078 
0079         work_spec = WorkSpec()
0080         work_spec.creationTime = core_utils.naive_utcnow()
0081 
0082         # get the queue configuration from CRIC
0083         panda_queues_dict = PandaQueuesDict()
0084         queue_dict = panda_queues_dict.get(queue_config.queueName, {})
0085         associated_params_dict = panda_queues_dict.get_harvester_params(queue_config.queueName)
0086 
0087         is_ucore = queue_dict.get("capability", "") == "ucore"
0088         # case of traditional (non-ucore) queue: look at the queue configuration
0089         if not is_ucore:
0090             work_spec.nCore = queue_dict.get("corecount", 1) or 1
0091             work_spec.minRamCount = queue_dict.get("maxrss", 1) or 1
0092 
0093         # case of unified queue: look at the job & resource type and queue configuration
0094         else:
0095             catchall = queue_dict.get("catchall", "")
0096             if "useMaxRam" in catchall:
0097                 # some sites require to always set the maximum memory due to memory killing jobs
0098                 site_corecount = queue_dict.get("corecount", 1) or 1
0099                 site_maxrss = queue_dict.get("maxrss", 1) or 1
0100 
0101                 # some cases need to overwrite those values
0102                 if self.rt_mapper.is_single_core_resource_type(resource_type):
0103                     work_spec.nCore = 1
0104                     work_spec.minRamCount = int(math.ceil(site_maxrss / site_corecount))
0105                 else:
0106                     # default values
0107                     work_spec.nCore = site_corecount
0108                     work_spec.minRamCount = site_maxrss
0109             else:
0110                 if not len(jobspec_list) and not self.rt_mapper.is_valid_resource_type(resource_type):
0111                     # some testing PQs have ucore + pure pull, need to default to the basic 1-core resource type
0112                     tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type')
0113                     resource_type = BASIC_RESOURCE_TYPE_SINGLE_CORE
0114                 work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict)
0115 
0116         # parameters that are independent on traditional vs unified
0117         work_spec.maxWalltime = queue_dict.get("maxtime", 1)
0118         work_spec.maxDiskCount = queue_dict.get("maxwdir", 1)
0119 
0120         if len(jobspec_list) > 0:
0121             # get info from jobs
0122             nCore = 0
0123             minRamCount = 0
0124             maxDiskCount = 0
0125             maxWalltime = 0
0126             ioIntensity = 0
0127             for jobSpec in jobspec_list:
0128                 job_core_count, job_memory = self.get_job_core_and_memory(queue_dict, jobSpec)
0129                 nCore += job_core_count
0130                 minRamCount += job_memory
0131                 try:
0132                     maxDiskCount += jobSpec.jobParams["maxDiskCount"]
0133                 except Exception:
0134                     pass
0135                 try:
0136                     maxWalltime += jobSpec.jobParams["maxWalltime"]
0137                 except Exception:
0138                     pass
0139                 try:
0140                     ioIntensity += jobSpec.jobParams["ioIntensity"]
0141                 except Exception:
0142                     pass
0143             # update resource type
0144             job_resource_type = getattr(jobspec_list[0], "resourceType", None)
0145             if job_resource_type:
0146                 resource_type = job_resource_type
0147             # fill in worker attributes
0148             if is_ucore or (nCore > 0 and "nCore" in self.jobAttributesToUse):
0149                 work_spec.nCore = nCore
0150             if is_ucore or (minRamCount > 0 and ("minRamCount" in self.jobAttributesToUse or associated_params_dict.get("job_minramcount") is True)):
0151                 work_spec.minRamCount = minRamCount
0152             if maxDiskCount > 0 and ("maxDiskCount" in self.jobAttributesToUse or associated_params_dict.get("job_maxdiskcount") is True):
0153                 work_spec.maxDiskCount = maxDiskCount
0154             if maxWalltime > 0 and ("maxWalltime" in self.jobAttributesToUse or associated_params_dict.get("job_maxwalltime") is True):
0155                 work_spec.maxWalltime = maxWalltime
0156             if ioIntensity > 0 and ("ioIntensity" in self.jobAttributesToUse or associated_params_dict.get("job_iointensity") is True):
0157                 work_spec.ioIntensity = ioIntensity
0158 
0159             work_spec.pilotType = jobspec_list[0].get_pilot_type()
0160             work_spec.jobType = self.get_job_type(jobspec_list[0], job_type, queue_dict)
0161 
0162         else:
0163             # when no job
0164             tmp_prod_source_label = prod_source_label
0165             if tmp_prod_source_label == "ANY":
0166                 # no specified prod_source_label; randomize pilot type with weighting
0167                 pdpm = getattr(queue_config, "prodSourceLabelRandomWeightsPermille", {})
0168                 choice_list = core_utils.make_choice_list(pdpm=pdpm, default="managed")
0169                 tmp_prod_source_label = random.choice(choice_list)
0170 
0171             fake_job = JobSpec()
0172             fake_job.jobParams = {"prodSourceLabel": tmp_prod_source_label}
0173             work_spec.pilotType = fake_job.get_pilot_type()
0174             del fake_job
0175             if work_spec.pilotType in ["RC", "ALRB", "PT"]:
0176                 tmp_log.info(f"a worker has pilotType={work_spec.pilotType}")
0177 
0178             work_spec.jobType = self.get_job_type(None, job_type, queue_dict, tmp_prod_source_label)
0179             tmp_log.debug(
0180                 "get_job_type decided for job_type: {0} (input job_type: {1}, queue_type: {2}, tmp_prod_source_label: {3})".format(
0181                     work_spec.jobType, job_type, queue_dict.get("type", None), tmp_prod_source_label
0182                 )
0183             )
0184 
0185         # retrieve queue resource types
0186         queue_rtype = self.rt_mapper.get_rtype_for_queue(queue_dict)
0187 
0188         if resource_type and resource_type != "ANY":
0189             work_spec.resourceType = resource_type
0190         elif queue_rtype:
0191             work_spec.resourceType = queue_rtype
0192         elif work_spec.nCore == 1:
0193             work_spec.resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE
0194         else:
0195             work_spec.resourceType = BASIC_RESOURCE_TYPE_MULTI_CORE
0196 
0197         return work_spec