File indexing completed on 2026-04-09 08:38:45
0001 import json
0002 import re
0003
0004 from pandaserver.srvcore.CoreUtils import NonJsonObjectEncoder, as_python_object
0005 from pandaserver.taskbuffer.JobSpec import JobSpec
0006
0007
# prodSourceLabels that behave as production test sources
list_ptest_prod_sources = ["ptest", "rc_test", "rc_test2", "rc_alrb"]


# prodSourceLabel groupings used to classify jobs as analysis or production
analy_sources = ["user", "panda"]
prod_sources = ["managed", "prod_test"]
# labels that can run on either side; classified by the queue type instead
neutral_sources = ["install"] + list_ptest_prod_sources

# canonical prodSourceLabel for each job type
ANALY_PS = "user"
PROD_PS = "managed"

# JEDI task type values
ANALY_TASKTYPE = "anal"
PROD_TASKTYPE = "prod"

# factor applied to requested RAM (see compensate_ram_count / decompensate_ram_count)
MEMORY_COMPENSATION = 0.9

job_labels = [ANALY_PS, PROD_PS]


# priority threshold above which tasks may jump over others in the queue
priorityTasksToJumpOver = 1500
0027
0028
def translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel):
    """
    Map a prodSourceLabel to a job type label (ANALY_PS or PROD_PS).

    Neutral labels take the flavour of the queue they land on; labels that
    cannot be classified are returned unchanged.
    """
    if prodsourcelabel in analy_sources:
        return ANALY_PS

    if prodsourcelabel in prod_sources:
        return PROD_PS

    if prodsourcelabel in neutral_sources:
        if queue_type in ("unified", "production"):
            return PROD_PS
        if queue_type == "analysis":
            return ANALY_PS

    # unknown label: pass it through untouched
    return prodsourcelabel
0044
0045
def translate_tasktype_to_jobtype(task_type):
    """Convert a JEDI task type into the corresponding job type label."""
    # anything that is not an analysis task is treated as production
    return ANALY_PS if task_type == ANALY_TASKTYPE else PROD_PS
0052
0053
0054
def getCoreCount(actualCoreCount, defCoreCount, jobMetrics):
    """
    Resolve the number of cores used by a job.

    Resolution order: actualCoreCount if set, otherwise a "coreCount=N"
    token embedded in jobMetrics, otherwise the queue default defCoreCount
    (when it is neither None nor 0), otherwise 1.

    :param actualCoreCount: core count reported by the pilot, or None
    :param jobMetrics: job metrics string possibly containing "coreCount=N"
    :param defCoreCount: default core count configured for the queue
    :return: resolved core count (int); falls back to 1 on any error
    """
    coreCount = 1
    try:
        if actualCoreCount is not None:
            coreCount = actualCoreCount
        else:
            tmpMatch = None
            if jobMetrics is not None:
                # the pilot may embed the real core count in the metrics string
                # (raw string: "\d" is an invalid escape in a plain literal)
                tmpMatch = re.search(r"coreCount=(\d+)", jobMetrics)
            if tmpMatch is not None:
                coreCount = int(tmpMatch.group(1))
            elif defCoreCount not in [None, 0]:
                coreCount = defCoreCount
    except Exception:
        # best effort: keep whatever was resolved so far (at least 1)
        pass
    return coreCount
0074
0075
0076
def getHS06sec(startTime, endTime, corePower, coreCount, baseWalltime=0, cpuEfficiency=100):
    """
    Compute the HS06-seconds consumed between startTime and endTime.

    Returns 0 when the CPU efficiency is zero or the walltime does not
    exceed the base walltime, and None when the inputs are unusable.
    """
    try:
        # zero efficiency means no accountable CPU work
        if cpuEfficiency == 0:
            return 0

        delta = endTime - startTime
        # whole seconds of walltime (sub-second remainder is dropped)
        walltime = delta.days * 24 * 3600 + delta.seconds
        if walltime <= baseWalltime:
            return 0
        billable = float(walltime - baseWalltime)
        return billable * corePower * coreCount * float(cpuEfficiency) / 100.0
    except Exception:
        return None
0091
0092
def get_job_co2(start_time, end_time, core_count, energy_emissions, watts_per_core):
    """
    Estimate the grams of CO2 emitted by a job over its lifetime.

    :param start_time: job start timestamp (datetime)
    :param end_time: job end timestamp (datetime)
    :param core_count: number of cores used by the job
    :param energy_emissions: list of (timestamp, region, g_CO2_per_kWh) entries
    :param watts_per_core: assumed power draw per core, in watts
    :return: grams of CO2, or None on any error
    """
    # index the emission values by timestamp; the region field is not used here
    energy_emissions_by_ts = {}
    for entry in energy_emissions:
        aux_timestamp, region, value = entry
        energy_emissions_by_ts[aux_timestamp] = {"value": value}

    try:
        timestamps = sorted([entry[0] for entry in energy_emissions])

        started = False
        ended = False
        i = 0  # manual index kept in sync with the loop to peek at timestamps[i + 1]

        g_co2_job = 0

        for timestamp in timestamps:
            try:
                # the job overlaps the current interval once it begins before the next boundary
                if start_time < timestamps[i + 1] and not started:
                    started = True
            except IndexError:
                # last timestamp: there is no next boundary to compare against
                pass

            if end_time < timestamp and not ended:
                ended = True

            # accumulate while the job overlaps this interval; the final interval is open-ended
            if started and not ended or i == len(timestamps) - 1:
                # clamp the interval boundaries to the job's lifetime
                bottom = max(start_time, timestamp)
                try:
                    top = min(end_time, timestamps[i + 1])
                except IndexError:
                    top = end_time

                g_co2_perkWh = energy_emissions_by_ts[timestamp]["value"]

                # seconds * (gCO2/kWh) * cores * (W/core) / 3600 (s/h) / 1000 (W/kW) -> grams
                duration = max((top - bottom).total_seconds(), 0)
                g_co2_job = g_co2_job + (duration * g_co2_perkWh * core_count * watts_per_core / 3600 / 1000)

            if ended:
                break

            i = i + 1

        return g_co2_job

    except Exception:
        # best effort: malformed input yields None rather than raising
        return None
0139
0140
0141
def parseNumStandby(catchall):
    """
    Extract the nStandby specification from a catchall string.

    The catchall may contain a comma-separated token of the form
    "nStandby=<sw_id>:<resource_type>:<num>|..." — only the first such
    token is parsed. An empty <num> means 0; numeric sw_ids are converted
    to int, non-numeric ones are kept as strings.

    :param catchall: catchall string, or None
    :return: {sw_id: {resource_type: num}} mapping (possibly empty)
    """
    ret_map = {}
    if catchall is None:
        return ret_map
    for token in catchall.split(","):
        match = re.search("^nStandby=(.+)", token)
        if match is None:
            continue
        for definition in match.group(1).split("|"):
            fields = definition.split(":")
            # each entry must be exactly sw_id:resource_type:num
            if len(fields) != 3:
                continue
            sw_id, resource_type, num = fields
            try:
                sw_id = int(sw_id)
            except Exception:
                # keep non-numeric identifiers as-is
                pass
            ret_map.setdefault(sw_id, {})[resource_type] = 0 if num == "" else int(num)
        # only the first nStandby token is honoured
        break
    return ret_map
0166
0167
0168
def compensate_ram_count(ram_count):
    """Scale a requested RAM count down by the MEMORY_COMPENSATION factor."""
    if ram_count == "NULL" or ram_count is None:
        return None
    # truncate to an integer number of MB after compensation
    return int(ram_count * MEMORY_COMPENSATION)
0174
0175
0176
def decompensate_ram_count(ram_count):
    """Invert compensate_ram_count: scale a RAM count back up by MEMORY_COMPENSATION."""
    if ram_count == "NULL" or ram_count is None:
        return None
    # truncate to an integer number of MB after de-compensation
    return int(ram_count / MEMORY_COMPENSATION)
0182
0183
0184
def dump_jobs_json(jobs):
    """Serialize a list of JobSpec objects into a JSON string."""
    serializable = [job_spec.dump_to_json_serializable() for job_spec in jobs]
    # NonJsonObjectEncoder handles values json cannot encode natively
    return json.dumps(serializable, cls=NonJsonObjectEncoder)
0190
0191
0192
def load_jobs_json(state):
    """Deserialize a JSON string (as produced by dump_jobs_json) into JobSpec objects."""
    jobs = []
    # as_python_object restores values that NonJsonObjectEncoder serialized
    for job_state in json.loads(state, object_hook=as_python_object):
        spec = JobSpec()
        spec.load_from_json_serializable(job_state)
        jobs.append(spec)
    return jobs
0201
0202
0203
def get_resource_type_job(resource_map: list, job_spec: JobSpec) -> str:
    """
    Get the resource type for a job based on the job's resource type and the list of resource types.
    :param resource_map: The list of resource types.
    :param job_spec: The job.
    :return: The resource type, or "Undefined" when nothing matches.
    """
    # first matching resource spec wins
    matches = (spec.resource_name for spec in resource_map if spec.match_job(job_spec))
    return next(matches, "Undefined")
0215
0216
0217
def getJobMinRamCount(taskSpec, inputChunk, siteSpec, coreCount):
    """
    Resolve the minimum RAM requirement and its unit for a job.

    Merge jobs use a flat MB requirement; otherwise the task's unit is
    honoured, with per-core requests scaled by the core count plus the
    task's base RAM. The result is reduced by the global memory
    compensation factor. siteSpec is accepted for interface compatibility.

    :return: (minRamCount, minRamUnit) tuple
    """
    minRamCount = inputChunk.getMaxRamCount()
    if inputChunk.isMerging:
        # merging always uses a plain MB requirement
        minRamUnit = "MB"
    else:
        minRamUnit = taskSpec.ramUnit
        if minRamUnit in [None, "", "NULL"]:
            minRamUnit = "MB"
        if taskSpec.ramPerCore():
            # per-core request: scale by cores, add the base, and drop the
            # "PerCore" suffix from the unit
            minRamCount = minRamCount * coreCount + taskSpec.baseRamCount
            minRamUnit = re.sub("PerCore.*$", "", minRamUnit)

    # apply the global memory compensation factor
    minRamCount = compensate_ram_count(minRamCount)
    return minRamCount, minRamUnit