File indexing completed on 2026-04-20 07:59:01
0001 import math
0002 import random
0003
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.job_spec import JobSpec
0006 from pandaharvester.harvestercore.resource_type_constants import (
0007 BASIC_RESOURCE_TYPE_MULTI_CORE,
0008 BASIC_RESOURCE_TYPE_SINGLE_CORE,
0009 )
0010 from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
0011 from pandaharvester.harvestercore.work_spec import WorkSpec
0012 from pandaharvester.harvestermisc.info_utils import PandaQueuesDict
0013
0014 from .base_worker_maker import BaseWorkerMaker
0015
0016
# Module-level logger; make_worker() wraps it with per-call context.
_logger = core_utils.setup_logger("simple_worker_maker")
0018
0019
0020
class SimpleWorkerMaker(BaseWorkerMaker):
    """Build ``WorkSpec`` objects sized from the PanDA queue and, if given, its jobs.

    Resource requirements start from the queue definition returned by
    ``PandaQueuesDict`` and are then overridden by the summed requirements of
    the supplied jobs (when ``make_worker`` receives a non-empty job list).
    """

    def __init__(self, **kwarg):
        """Set the default job attributes to honour, then run the base initialiser."""
        # Assigned before the base constructor runs.
        # NOTE(review): presumably so plugin parameters in ``kwarg`` can replace
        # this default list -- confirm against BaseWorkerMaker.__init__.
        self.jobAttributesToUse = ["nCore", "minRamCount", "maxDiskCount", "maxWalltime", "ioIntensity"]
        BaseWorkerMaker.__init__(self, **kwarg)
        self.rt_mapper = ResourceTypeMapper()

    def get_job_core_and_memory(self, queue_dict, job_spec):
        """Return ``(core_count, memory)`` requested by a single job.

        Both values are read from ``job_spec.jobParams`` (``coreCount`` and
        ``minRamCount``); missing or falsy values become 1 core and 0 memory.
        When the job states no memory and the queue's ``capability`` is
        ``"ucore"``, memory falls back to the queue's ``maxrss``: divided by
        the queue's ``corecount`` (rounded up) for a 1-core job, or the full
        ``maxrss`` otherwise.

        :param queue_dict: queue definition (dict-like)
        :param job_spec: object exposing a ``jobParams`` dict
        :return: tuple ``(job_core_count, job_memory)``
        """
        job_memory = job_spec.jobParams.get("minRamCount", 0) or 0
        job_core_count = job_spec.jobParams.get("coreCount", 1) or 1

        is_ucore = queue_dict.get("capability", "") == "ucore"

        if not job_memory and is_ucore:
            site_maxrss = queue_dict.get("maxrss", 0) or 0
            site_corecount = queue_dict.get("corecount", 1) or 1

            if job_core_count == 1:
                job_memory = int(math.ceil(site_maxrss / site_corecount))
            else:
                job_memory = site_maxrss

        return job_core_count, job_memory

    def get_job_type(self, job_spec, job_type, queue_dict, tmp_prod_source_label="ANY"):
        """Decide the job type (prodSourceLabel) to record on a worker.

        Sources are tried in this order:

        1. ``prodSourceLabel`` in ``job_spec.jobParams`` when a job is given;
        2. the explicit ``job_type`` argument -- replaced by ``"managed"`` when
           a concrete ``tmp_prod_source_label`` (not ``"ANY"``) other than
           ``user``/``panda``/``managed`` was requested on a queue whose type
           is not ``"analysis"``;
        3. the queue ``type``: ``"analysis"`` -> ``"user"``,
           ``"production"`` -> ``"managed"``, anything else -> ``None``.

        :param job_spec: job object with ``jobParams`` or ``None``
        :param job_type: job type requested by the caller (may be falsy)
        :param queue_dict: queue definition (dict-like)
        :param tmp_prod_source_label: prodSourceLabel chosen for the worker
        :return: the resolved job type string or ``None``
        """
        queue_type = queue_dict.get("type", None)

        if job_spec and "prodSourceLabel" in job_spec.jobParams:
            job_type_final = job_spec.jobParams["prodSourceLabel"]

        elif job_type:
            job_type_final = job_type
            if tmp_prod_source_label != "ANY":
                if queue_type != "analysis" and tmp_prod_source_label not in ("user", "panda", "managed"):
                    job_type_final = "managed"

        else:
            if queue_type == "analysis":
                job_type_final = "user"
            elif queue_type == "production":
                job_type_final = "managed"
            else:
                job_type_final = None

        return job_type_final

    @staticmethod
    def _add_job_param(total, job_spec, key):
        """Return ``total`` plus ``job_spec.jobParams[key]``; ``total`` unchanged if unusable.

        Summation is best-effort: a missing key or a value that cannot be
        added to a number (``None``, a string, ...) is skipped instead of
        aborting worker creation. Only those two failure modes are caught so
        that unrelated errors are no longer hidden.
        """
        try:
            return total + job_spec.jobParams[key]
        except (KeyError, TypeError):
            return total

    def _set_queue_defaults(self, work_spec, queue_dict, is_ucore, has_jobs, resource_type, tmp_log):
        """Fill ``work_spec`` requirements from the queue; return the resource type to use.

        The returned resource type equals the input except when an invalid
        type was given without jobs on a ucore queue, in which case the basic
        single-core type is substituted.
        """
        if not is_ucore:
            work_spec.nCore = queue_dict.get("corecount", 1) or 1
            work_spec.minRamCount = queue_dict.get("maxrss", 1) or 1
        elif "useMaxRam" in (queue_dict.get("catchall", "") or ""):
            # ``or ""`` keeps a queue with ``catchall: None`` from raising TypeError.
            site_corecount = queue_dict.get("corecount", 1) or 1
            site_maxrss = queue_dict.get("maxrss", 1) or 1

            if self.rt_mapper.is_single_core_resource_type(resource_type):
                work_spec.nCore = 1
                work_spec.minRamCount = int(math.ceil(site_maxrss / site_corecount))
            else:
                work_spec.nCore = site_corecount
                work_spec.minRamCount = site_maxrss
        else:
            if not has_jobs and not self.rt_mapper.is_valid_resource_type(resource_type):
                tmp_log.warning(f'Invalid resource type "{resource_type}" (perhaps due to ucore with pure pull); default to the basic 1-core resource type')
                resource_type = BASIC_RESOURCE_TYPE_SINGLE_CORE
            work_spec.nCore, work_spec.minRamCount = self.rt_mapper.calculate_worker_requirements(resource_type, queue_dict)

        work_spec.maxWalltime = queue_dict.get("maxtime", 1)
        work_spec.maxDiskCount = queue_dict.get("maxwdir", 1)

        return resource_type

    def _apply_job_requirements(self, work_spec, jobspec_list, queue_dict, associated_params_dict, is_ucore, job_type, resource_type):
        """Override ``work_spec`` with totals summed over the jobs; return the resource type.

        ``jobspec_list`` must be non-empty. The resource type of the first job,
        when set, replaces the one passed in.
        """
        n_core = 0
        min_ram_count = 0
        max_disk_count = 0
        max_walltime = 0
        io_intensity = 0
        for job_spec in jobspec_list:
            job_core_count, job_memory = self.get_job_core_and_memory(queue_dict, job_spec)
            n_core += job_core_count
            min_ram_count += job_memory
            max_disk_count = self._add_job_param(max_disk_count, job_spec, "maxDiskCount")
            max_walltime = self._add_job_param(max_walltime, job_spec, "maxWalltime")
            io_intensity = self._add_job_param(io_intensity, job_spec, "ioIntensity")

        job_resource_type = getattr(jobspec_list[0], "resourceType", None)
        if job_resource_type:
            resource_type = job_resource_type

        # On ucore queues cores and memory always follow the jobs; elsewhere a
        # positive total is used only if the attribute is enabled, either in
        # jobAttributesToUse or through the queue's harvester parameters.
        if is_ucore or (n_core > 0 and "nCore" in self.jobAttributesToUse):
            work_spec.nCore = n_core
        if is_ucore or (min_ram_count > 0 and ("minRamCount" in self.jobAttributesToUse or associated_params_dict.get("job_minramcount") is True)):
            work_spec.minRamCount = min_ram_count
        if max_disk_count > 0 and ("maxDiskCount" in self.jobAttributesToUse or associated_params_dict.get("job_maxdiskcount") is True):
            work_spec.maxDiskCount = max_disk_count
        if max_walltime > 0 and ("maxWalltime" in self.jobAttributesToUse or associated_params_dict.get("job_maxwalltime") is True):
            work_spec.maxWalltime = max_walltime
        if io_intensity > 0 and ("ioIntensity" in self.jobAttributesToUse or associated_params_dict.get("job_iointensity") is True):
            work_spec.ioIntensity = io_intensity

        work_spec.pilotType = jobspec_list[0].get_pilot_type()
        work_spec.jobType = self.get_job_type(jobspec_list[0], job_type, queue_dict)

        return resource_type

    def _set_pull_attributes(self, work_spec, queue_config, queue_dict, job_type, prod_source_label, tmp_log):
        """Set ``pilotType`` and ``jobType`` on a worker created without jobs."""
        tmp_prod_source_label = prod_source_label
        if tmp_prod_source_label == "ANY":
            # No label requested: draw one using the queue's configured
            # weights, with "managed" as the default choice.
            pdpm = getattr(queue_config, "prodSourceLabelRandomWeightsPermille", {})
            choice_list = core_utils.make_choice_list(pdpm=pdpm, default="managed")
            tmp_prod_source_label = random.choice(choice_list)

        # A throw-away JobSpec is used only to translate the label into a pilot type.
        fake_job = JobSpec()
        fake_job.jobParams = {"prodSourceLabel": tmp_prod_source_label}
        work_spec.pilotType = fake_job.get_pilot_type()
        if work_spec.pilotType in ["RC", "ALRB", "PT"]:
            tmp_log.info(f"a worker has pilotType={work_spec.pilotType}")

        work_spec.jobType = self.get_job_type(None, job_type, queue_dict, tmp_prod_source_label)
        queue_type = queue_dict.get("type", None)
        tmp_log.debug(
            f"get_job_type decided for job_type: {work_spec.jobType} "
            f"(input job_type: {job_type}, queue_type: {queue_type}, tmp_prod_source_label: {tmp_prod_source_label})"
        )

    def make_worker(self, jobspec_list, queue_config, job_type, resource_type, prod_source_label="ANY"):
        """Create a ``WorkSpec`` for the given queue and (possibly empty) job list.

        Steps: take default requirements from the queue definition; if jobs
        are supplied, override them with the jobs' summed requirements and
        take pilot/job type from the first job; otherwise derive pilot/job
        type from ``prod_source_label``; finally settle the worker's resource
        type.

        :param jobspec_list: jobs to run in the worker; empty (or ``None``) for a worker without jobs
        :param queue_config: queue configuration exposing ``queueName``
        :param job_type: job type requested for the worker (may be falsy)
        :param resource_type: requested resource type name (may be ``"ANY"`` or falsy)
        :param prod_source_label: prodSourceLabel for job-less workers; ``"ANY"`` picks one at random
        :return: the populated ``WorkSpec``
        """
        tmp_log = self.make_logger(
            _logger,
            f"queue={queue_config.queueName}:{job_type}:{resource_type}:{prod_source_label}",
            method_name="make_worker",
        )

        tmp_log.debug(f"jobspec_list: {jobspec_list}")

        work_spec = WorkSpec()
        work_spec.creationTime = core_utils.naive_utcnow()

        panda_queues_dict = PandaQueuesDict()
        queue_dict = panda_queues_dict.get(queue_config.queueName, {})
        associated_params_dict = panda_queues_dict.get_harvester_params(queue_config.queueName)

        is_ucore = queue_dict.get("capability", "") == "ucore"
        has_jobs = bool(jobspec_list)

        resource_type = self._set_queue_defaults(work_spec, queue_dict, is_ucore, has_jobs, resource_type, tmp_log)

        if has_jobs:
            resource_type = self._apply_job_requirements(
                work_spec, jobspec_list, queue_dict, associated_params_dict, is_ucore, job_type, resource_type
            )
        else:
            self._set_pull_attributes(work_spec, queue_config, queue_dict, job_type, prod_source_label, tmp_log)

        # Resource type precedence: explicit/job value, then the queue's own
        # type, then a basic type chosen from the final core count.
        queue_rtype = self.rt_mapper.get_rtype_for_queue(queue_dict)

        if resource_type and resource_type != "ANY":
            work_spec.resourceType = resource_type
        elif queue_rtype:
            work_spec.resourceType = queue_rtype
        elif work_spec.nCore == 1:
            work_spec.resourceType = BASIC_RESOURCE_TYPE_SINGLE_CORE
        else:
            work_spec.resourceType = BASIC_RESOURCE_TYPE_MULTI_CORE

        return work_spec
0196
0197 return work_spec