Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 08:38:45

0001 import json
0002 import re
0003 
0004 from pandaserver.srvcore.CoreUtils import NonJsonObjectEncoder, as_python_object
0005 from pandaserver.taskbuffer.JobSpec import JobSpec
0006 
# prodSourceLabel values used for pilot tests
list_ptest_prod_sources = ["ptest", "rc_test", "rc_test2", "rc_alrb"]

# prodSourceLabel values grouped by the job type they are translated to;
# see translate_prodsourcelabel_to_jobtype for how each group is resolved
analy_sources = ["user", "panda"]
prod_sources = ["managed", "prod_test"]
# labels in this group are resolved according to the queue type
neutral_sources = ["install"] + list_ptest_prod_sources

# job type labels returned by the translate_* helpers
ANALY_PS = "user"
PROD_PS = "managed"

# task type values; translate_tasktype_to_jobtype compares against ANALY_TASKTYPE
ANALY_TASKTYPE = "anal"
PROD_TASKTYPE = "prod"

# factor applied by compensate_ram_count and reversed by decompensate_ram_count
MEMORY_COMPENSATION = 0.9

# the two job type labels
job_labels = [ANALY_PS, PROD_PS]

# priority of tasks to jump over others
priorityTasksToJumpOver = 1500
0027 
0028 
def translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel):
    """
    Translate a prodSourceLabel into a job type label.

    :param queue_type: type of the queue; only consulted for neutral labels.
    :param prodsourcelabel: the prodSourceLabel to translate.
    :return: ANALY_PS or PROD_PS when a mapping exists, otherwise the
             prodsourcelabel itself, unchanged.
    """
    # anything without a mapping is passed through as is
    job_type = prodsourcelabel

    if prodsourcelabel in analy_sources:
        job_type = ANALY_PS
    elif prodsourcelabel in prod_sources:
        job_type = PROD_PS
    elif prodsourcelabel in neutral_sources:
        # neutral labels follow the type of the queue they run on
        if queue_type in ("unified", "production"):
            job_type = PROD_PS
        elif queue_type == "analysis":
            job_type = ANALY_PS

    return job_type
0044 
0045 
def translate_tasktype_to_jobtype(task_type):
    """
    Translate a task type into a job type label.

    :param task_type: the task type to translate.
    :return: ANALY_PS for the analysis task type; PROD_PS for everything else,
             including unrecognized task types.
    """
    return ANALY_PS if task_type == ANALY_TASKTYPE else PROD_PS
0052 
0053 
# get core count
def getCoreCount(actualCoreCount, defCoreCount, jobMetrics):
    """
    Determine the number of cores to attribute to a job.

    The first available source wins: the explicitly given core count, then a
    ``coreCount=<n>`` token found in the job metrics string, then the value
    from the job definition.

    :param actualCoreCount: core count to use as is when it is not None.
    :param defCoreCount: core count from the job definition; None and 0 are ignored.
    :param jobMetrics: string that may contain ``coreCount=<n>``, or None.
    :return: the resolved core count, or 1 when no source provides one or the
             job metrics cannot be parsed.
    """
    if actualCoreCount is not None:
        return actualCoreCount

    if jobMetrics is not None:
        try:
            # raw string: "\d" in a plain literal is an invalid escape sequence
            tmpMatch = re.search(r"coreCount=(\d+)", jobMetrics)
            if tmpMatch is not None:
                return int(tmpMatch.group(1))
        except (TypeError, ValueError):
            # best effort: unparsable job metrics fall back to a single core
            return 1

    if defCoreCount not in [None, 0]:
        return defCoreCount
    return 1
0074 
0075 
# get HS06sec
def getHS06sec(startTime, endTime, corePower, coreCount, baseWalltime=0, cpuEfficiency=100):
    """
    Compute the HS06sec value for an execution interval.

    The result is (wall seconds - baseWalltime) * corePower * coreCount *
    cpuEfficiency / 100. Wall seconds are taken from the days and seconds of
    endTime - startTime, so any sub-second part is ignored.

    :param startTime: start of the interval.
    :param endTime: end of the interval.
    :param corePower: power per core used as a multiplier.
    :param coreCount: number of cores used as a multiplier.
    :param baseWalltime: seconds subtracted from the wall time before scaling.
    :param cpuEfficiency: percentage applied to the result.
    :return: the value as a float; 0 when cpuEfficiency is 0 or the wall time
             does not exceed baseWalltime; None if anything goes wrong.
    """
    try:
        # zero efficiency means nothing to account for
        if cpuEfficiency == 0:
            return 0
        elapsed = endTime - startTime
        wall_seconds = elapsed.days * 24 * 3600 + elapsed.seconds
        if wall_seconds <= baseWalltime:
            return 0
        return float(wall_seconds - baseWalltime) * corePower * coreCount * float(cpuEfficiency) / 100.0
    except Exception:
        return None
0091 
0092 
def get_job_co2(start_time, end_time, core_count, energy_emissions, watts_per_core):
    """
    Sum the emissions of a job over the emission intervals it overlaps.

    Each entry of energy_emissions is a (timestamp, region, value) triple. The
    sorted timestamps delimit intervals; the value at a timestamp applies until
    the next timestamp, and the value at the last timestamp applies until
    end_time. For every interval the overlap with [start_time, end_time] in
    seconds is multiplied by value * core_count * watts_per_core / 3600 / 1000.

    :param start_time: start of the job.
    :param end_time: end of the job.
    :param core_count: number of cores used as a multiplier.
    :param energy_emissions: iterable of (timestamp, region, value) triples.
    :param watts_per_core: watts per core used as a multiplier.
    :return: the accumulated value, 0 when there are no entries, or None if
             the calculation fails.
    """
    intensity_by_ts = {ts: {"value": intensity} for ts, _region, intensity in energy_emissions}

    try:
        ordered = sorted(entry[0] for entry in energy_emissions)
        last_index = len(ordered) - 1

        has_started = False
        total = 0

        for idx, ts in enumerate(ordered):
            is_last = idx == last_index

            # the job has started once its start precedes the end of the current interval
            if not is_last and start_time < ordered[idx + 1] and not has_started:
                has_started = True

            # the job ended before this interval begins
            has_ended = False
            if end_time < ts:
                has_ended = True

            # the last interval is always evaluated; a non-overlapping one contributes zero
            if is_last or (has_started and not has_ended):
                bottom = max(start_time, ts)
                top = end_time if is_last else min(end_time, ordered[idx + 1])

                intensity = intensity_by_ts[ts]["value"]
                duration = max((top - bottom).total_seconds(), 0)
                total = total + (duration * intensity * core_count * watts_per_core / 3600 / 1000)

            if has_ended:
                break

        return total

    except Exception:
        return None
0139 
0140 
# parse string for number of standby jobs
def parseNumStandby(catchall):
    """
    Extract the standby job numbers from a catchall string.

    The catchall is a comma-separated list; only the first item starting with
    ``nStandby=`` is used. Its value is a ``|``-separated list of
    ``sw_id:resource_type:num`` triples. Entries that do not have exactly
    three fields are skipped, and an empty num counts as 0.

    :param catchall: the catchall string, or None.
    :return: dict mapping sw_id (int when numeric, otherwise str) to a dict of
             resource_type -> number.
    """
    standby_map = {}
    if catchall is None:
        return standby_map

    for item in catchall.split(","):
        matched = re.search("^nStandby=(.+)", item)
        if matched is None:
            continue
        for spec in matched.group(1).split("|"):
            fields = spec.split(":")
            if len(fields) != 3:
                continue
            sw_id, resource_type, num = fields
            # numeric IDs become int; anything else is kept as a string
            try:
                sw_id = int(sw_id)
            except ValueError:
                pass
            count = 0 if num == "" else int(num)
            standby_map.setdefault(sw_id, {})[resource_type] = count
        # only the first nStandby item is considered
        break
    return standby_map
0166 
0167 
# compensate memory count to prevent jobs with ramCount close to the HIMEM border from going to HIMEM PQs
def compensate_ram_count(ram_count):
    """
    Scale a RAM count by MEMORY_COMPENSATION.

    :param ram_count: the RAM count; None and "NULL" mean no value.
    :return: the scaled value truncated to int, or None when there is no value.
    """
    if ram_count in ("NULL", None):
        return None
    return int(ram_count * MEMORY_COMPENSATION)
0174 
0175 
# undo the memory count compensation
def decompensate_ram_count(ram_count):
    """
    Divide a RAM count by MEMORY_COMPENSATION.

    :param ram_count: the compensated RAM count; None and "NULL" mean no value.
    :return: the divided value truncated to int, or None when there is no value.
    """
    if ram_count in ("NULL", None):
        return None
    return int(ram_count / MEMORY_COMPENSATION)
0182 
0183 
# dump jobs to serialized json
def dump_jobs_json(jobs):
    """
    Serialize jobs into a JSON string.

    :param jobs: iterable of objects providing dump_to_json_serializable().
    :return: JSON string encoded with NonJsonObjectEncoder.
    """
    serializable_states = [job.dump_to_json_serializable() for job in jobs]
    return json.dumps(serializable_states, cls=NonJsonObjectEncoder)
0190 
0191 
# load serialized json to jobs
def load_jobs_json(state):
    """
    Rebuild a list of JobSpec objects from a JSON string.

    :param state: JSON string holding a list of job states.
    :return: list of JobSpec objects, one per job state, in the same order.
    """

    def _restore(job_state):
        # populate a fresh JobSpec from one decoded job state
        spec = JobSpec()
        spec.load_from_json_serializable(job_state)
        return spec

    decoded_states = json.loads(state, object_hook=as_python_object)
    return [_restore(job_state) for job_state in decoded_states]
0201 
0202 
# get resource type for a job
def get_resource_type_job(resource_map: list, job_spec: JobSpec) -> str:
    """
    Find the name of the first resource type that matches a job.

    :param resource_map: The list of resource types, checked in order.
    :param job_spec: The job.
    :return: The resource_name of the first entry whose match_job accepts the job,
             or "Undefined" when none does.
    """
    matching_names = (spec.resource_name for spec in resource_map if spec.match_job(job_spec))
    return next(matching_names, "Undefined")
0215 
0216 
# get min ram count for job
def getJobMinRamCount(taskSpec, inputChunk, siteSpec, coreCount):
    """
    Compute the minimum RAM count and its unit for a job.

    The starting value is the maximum RAM count of the input chunk. For merging
    chunks the unit is "MB". Otherwise the unit comes from the task (falling
    back to "MB" when unset), and when the task uses RAM per core the count is
    multiplied by coreCount, the task's base RAM count is added, and "PerCore"
    and anything after it is stripped from the unit. The count is finally
    passed through compensate_ram_count.

    :param taskSpec: task providing ramUnit, ramPerCore() and baseRamCount.
    :param inputChunk: input chunk providing getMaxRamCount() and isMerging.
    :param siteSpec: not used by this function.
    :param coreCount: number of cores used for per-core scaling.
    :return: tuple of (compensated RAM count, RAM unit).
    """
    ram_count = inputChunk.getMaxRamCount()
    ram_unit = "MB"

    if not inputChunk.isMerging:
        task_unit = taskSpec.ramUnit
        if task_unit not in [None, "", "NULL"]:
            ram_unit = task_unit
        if taskSpec.ramPerCore():
            ram_count = ram_count * coreCount + taskSpec.baseRamCount
            ram_unit = re.sub("PerCore.*$", "", ram_unit)

    # apply the memory compensation factor
    return compensate_ram_count(ram_count), ram_unit