File indexing completed on 2026-04-09 08:38:41
0001 import copy
0002 import datetime
0003 import json
0004 import math
0005 import os
0006 import re
0007 import subprocess
0008 from threading import Lock
0009
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011
0012
0013
def commands_get_status_output(com):
    """
    Run a shell command and return (status, output), mimicking the old
    commands.getstatusoutput: stderr is merged into stdout and a single
    trailing newline is stripped from the output.

    :param com: command string, executed through the shell
    :return: tuple of (exit status, combined stdout/stderr text)
    """
    data = ""
    try:
        proc = subprocess.Popen(
            com,
            shell=True,  # the helper's contract is a shell command string
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        data, _ = proc.communicate()
        retcode = proc.poll()
        if retcode:
            raise subprocess.CalledProcessError(retcode, com)
        status = 0
    except subprocess.CalledProcessError as err:
        status = err.returncode
    # drop exactly one trailing newline, as getstatusoutput did
    if data.endswith("\n"):
        data = data[:-1]
    return status, data
0035
0036
0037
def clean_user_id(id):
    """
    Extract a human-readable user name from a grid certificate DN.

    Strips DC/O/OU/C/L components, numeric CN components, proxy markers,
    robot/nickname CNs and a trailing e-mail address, e.g.
    "/DC=ch/DC=cern/CN=jdoe/CN=123456/CN=John Doe" -> "John Doe".

    :param id: DN string (name kept for backward compatibility although it
               shadows the builtin)
    :return: cleaned user name, or the original string if anything fails
    """
    try:
        # fix: use raw strings so "\/" is a literal class member instead of an
        # invalid escape sequence (SyntaxWarning on modern Python)
        up = re.compile(r"/(DC|O|OU|C|L)=[^/]+")
        username = up.sub("", id)
        up2 = re.compile(r"/CN=[0-9]+")
        username = up2.sub("", username)
        up3 = re.compile(r" [0-9]+")
        username = up3.sub("", username)
        up4 = re.compile(r"_[0-9]+")
        username = up4.sub("", username)
        username = username.replace("/CN=proxy", "")
        username = username.replace("/CN=limited proxy", "")
        username = username.replace("limited proxy", "")
        username = re.sub(r"/CN=Robot:[^/]+", "", username)
        username = re.sub(r"/CN=nickname:[^/]+", "", username)
        # with two consecutive CN components keep the last one (the real name)
        pat = re.compile(r".*/CN=([^/]+)/CN=([^/]+)")
        mat = pat.match(username)
        if mat:
            username = mat.group(2)
        else:
            username = username.replace("/CN=", "")
        if username.lower().find("/email") > 0:
            username = username[: username.lower().find("/email")]
        pat = re.compile(".*(limited.*proxy).*")
        mat = pat.match(username)
        if mat:
            username = mat.group(1)
        username = username.replace("(", "")
        username = username.replace(")", "")
        username = username.replace("'", "")
        # drop a trailing e-mail address, but only if a full name remains
        name_wo_email = re.sub(r" [a-z][\w\.-]+@[\w\.-]+(?:\.\w+)+", "", username).strip()
        if " " in name_wo_email:
            username = name_wo_email
        return username
    except Exception:
        # best effort: fall back to the raw input on any parsing error
        return id
0074
0075
0076
def get_bare_dn(dn, keep_proxy=False, keep_digits=True):
    """
    Strip proxy components (and optionally trailing numeric CNs) from a DN.

    :param dn: distinguished name string
    :param keep_proxy: if True, collapse repeated "/CN=proxy" into one instead of removing them
    :param keep_digits: if False, also remove trailing "/CN=<digits>" components
    :return: the stripped DN
    """
    bare = re.sub("/CN=limited proxy", "", dn)
    if keep_proxy:
        # collapse a chain of proxy CNs into a single one
        bare = re.sub("/CN=proxy(/CN=proxy)+", "/CN=proxy", bare)
    else:
        bare = re.sub("(/CN=proxy)+", "", bare)
    if keep_digits:
        return bare
    # drop trailing numeric CN components such as "/CN=123456"
    return re.sub(r"(/CN=\d+)+$", "", bare)
0086
0087
0088
def get_id_from_dn(dn, keep_proxy=False, keep_digits=True):
    """
    Return the nickname embedded in a DN if present, else the bare DN.

    :param dn: distinguished name string
    :param keep_proxy: passed through to get_bare_dn
    :param keep_digits: passed through to get_bare_dn
    :return: nickname string or stripped DN
    """
    nickname_match = re.search("/CN=nickname:([^/]+)", dn)
    if nickname_match is None:
        return get_bare_dn(dn, keep_proxy, keep_digits)
    return nickname_match.group(1)
0094
0095
def get_distinguished_name_list(distinguished_name: str) -> list:
    """
    Get a list of possible distinguished names from a string, including legacy and RFC formats.

    Args:
        distinguished_name (str): The distinguished name string.

    Returns:
        list: A list of possible distinguished names.
    """
    # characters that must be escaped in RFC-style attribute values
    escape_map = str.maketrans({",": r"\,", "+": r"\+", '"': r"\"", "\\": r"\\", "<": r"\<", ">": r"\>", ";": r"\;"})

    candidates = []
    for legacy_name in (distinguished_name, get_bare_dn(distinguished_name, keep_digits=False)):
        candidates.append(legacy_name)
        # RFC form: escape each slash-separated component and reverse the order
        components = [part.translate(escape_map) for part in legacy_name.split("/") if part]
        components.reverse()
        candidates.append(",".join(components))
    return candidates
0116
0117
def normalize_cpu_model(cpu_model):
    """
    Normalize a CPU model string: uppercase it and strip marketing noise
    (clock speed, cache size, core counts, vendor decorations), then
    collapse whitespace.

    :param cpu_model: raw CPU model string
    :return: normalized model string
    """
    normalized = cpu_model.upper()
    # order matters: "CORE PROCESSOR (...)" must go before the generic CORE/PROCESSOR removal
    noise_patterns = (
        r"@\s*\d+(\.\d+)?\s*GHZ",
        r"\d+\s*KB",
        r"CORE PROCESSOR\s*\([^)]*\)",
        r"\b\d+-CORE\b",
        r"CPU|CORE|PROCESSOR|SCALABLE|\(R\)|\(TM\)",
    )
    for pattern in noise_patterns:
        normalized = re.sub(pattern, "", normalized)
    return re.sub(r"\s+", " ", normalized).strip()
0129
0130
def clean_host_name(host_name):
    """
    Normalize a worker-node host name.

    :param host_name: raw host name, possibly with a batch-slot prefix
    :return: cleaned host name
    """
    # drop a batch-slot prefix such as "slot1@host.example.com"
    at_match = re.search(r"@(.+)", host_name)
    if at_match:
        host_name = at_match.group(1)
    # unwrap names of the form "atlprdNN-<middle>-<host>.cern.ch"
    wrapped = re.match(r"^atlprd\d+-[^-]+-([^.]+\.cern\.ch)$", host_name)
    if wrapped:
        host_name = wrapped.group(1)
    return host_name
0142
0143
0144
def resolve_bool(param):
    """
    Convert the strings "True"/"False" to real booleans; pass through
    booleans and anything else unchanged.

    :param param: value to resolve
    :return: bool when resolvable, otherwise the original value
    """
    if isinstance(param, bool):
        return param
    for text, value in (("True", True), ("False", False)):
        if param == text:
            return value
    return param
0153
0154
0155
class CachedObject:
    """Thread-safe, lazily refreshed cache around a single object."""

    def __init__(self, name, time_interval, update_func, log_stream):
        """
        :param name: cache name, used in log messages
        :param time_interval: refresh interval in seconds
        :param update_func: callable returning (status, obj); obj is cached when status is truthy
        :param log_stream: logger-like object with debug() and error()
        """
        self.name = name
        # cached payload; stays None until the first successful update
        self.cachedObj = None
        self.lastUpdated = naive_utcnow()
        self.timeInterval = datetime.timedelta(seconds=time_interval)
        self.lock = Lock()
        self.updateFunc = update_func
        self.log_stream = log_stream

    def update(self):
        """Refresh the cached object if it is missing or stale."""
        # fix: hold the lock via a context manager so it is always released,
        # even if logging or the time comparison raises (the old manual
        # acquire/release could leave the lock held forever)
        with self.lock:
            current = naive_utcnow()
            if self.cachedObj is None or current - self.lastUpdated > self.timeInterval:
                self.log_stream.debug(f"PID={os.getpid()} renewing {self.name} cache")
                try:
                    tmp_stat, tmp_out = self.updateFunc()
                    self.log_stream.debug(f"PID={os.getpid()} got for {self.name} {tmp_stat} {str(tmp_out)}")
                    if tmp_stat:
                        self.cachedObj = tmp_out
                except Exception as e:
                    # best effort: keep the stale object on failure
                    self.log_stream.error(f"PID={os.getpid()} failed to renew {self.name} due to {str(e)}")
                self.lastUpdated = current
        return

    def __contains__(self, item):
        """Membership test on the (refreshed) cached object; False when unusable."""
        self.update()
        contains = False
        try:
            contains = item in self.cachedObj
        except TypeError:
            # cachedObj is None or not a container
            pass
        return contains

    def __getitem__(self, name):
        """Item access on the (refreshed) cached object."""
        self.update()
        return self.cachedObj[name]

    def get(self, *var):
        """dict-style get on the cached object; note: does NOT trigger a refresh."""
        return self.cachedObj.get(*var)

    def get_object(self):
        """Lock and return the raw cached object; pair with release_object()."""
        self.lock.acquire()
        return self.cachedObj

    def release_object(self):
        """Release the lock taken by get_object()."""
        self.lock.release()
0223
0224
0225
class CacheDict:
    """
    Dictionary of caches with periodic cleanup
    """

    def __init__(self, update_interval=10, cleanup_interval=60):
        """
        :param update_interval: minutes after which a cached object is refreshed
        :param cleanup_interval: minutes after which an untouched cache is deleted
        """
        self.idx = 0
        self.lock = Lock()
        self.cache_dict = {}
        self.update_interval = datetime.timedelta(minutes=update_interval)
        self.cleanup_interval = datetime.timedelta(minutes=cleanup_interval)
        self.last_cleanup = naive_utcnow()

    def cleanup(self, tmp_log):
        """
        Cleanup caches
        :param tmp_log: logger
        """
        with self.lock:
            now = naive_utcnow()
            if now - self.last_cleanup <= self.cleanup_interval:
                return
            # collect names first so we do not mutate while iterating
            stale_names = [name for name, cache in self.cache_dict.items() if now - cache["last_updated"] > self.cleanup_interval]
            for name in stale_names:
                tmp_log.debug(f"""deleting cache #{self.cache_dict[name]["idx"]}""")
                del self.cache_dict[name]
            self.last_cleanup = now

    def get(self, name, tmp_log, update_func, *update_args, **update_kwargs):
        """
        Get updated object
        :param name: name of cache
        :param tmp_log: logger
        :param update_func: function to update object
        :param update_args: arguments for update function
        :param update_kwargs: keyword arguments for update function
        :return: object or None
        """
        self.cleanup(tmp_log)
        with self.lock:
            entry = self.cache_dict.get(name)
            now = naive_utcnow()
            if not entry:
                # first request for this name: create and fill a new entry
                tmp_log.debug(f"creating new cache #{self.idx}")
                entry = {
                    "obj": update_func(*update_args, **update_kwargs),
                    "last_updated": now,
                    "update_func": update_func,
                    "update_args": update_args,
                    "update_kwargs": update_kwargs,
                    "idx": self.idx,
                }
                self.cache_dict[name] = entry
                self.idx += 1
            elif now - entry["last_updated"] > self.update_interval:
                # stale: re-run the stored update function with its stored arguments
                tmp_log.debug(f"""updating cache #{entry["idx"]}""")
                entry["obj"] = entry["update_func"](*entry["update_args"], **entry["update_kwargs"])
                entry["last_updated"] = now
            else:
                tmp_log.debug(f"""reusing cache #{entry["idx"]}""")
            return entry["obj"]
0291
0292
0293
class NonJsonObjectEncoder(json.JSONEncoder):
    """JSON encoder that serializes datetime objects as tagged dicts."""

    def default(self, obj):
        # datetimes become {"_datetime_object": "<formatted>"}; anything else is
        # delegated to the base class, which raises TypeError
        if not isinstance(obj, datetime.datetime):
            return super().default(obj)
        return {"_datetime_object": obj.strftime("%Y-%m-%d %H:%M:%S.%f")}
0299
0300
0301
def as_python_object(dct):
    """
    object_hook counterpart of NonJsonObjectEncoder: revive dicts tagged
    with "_datetime_object" into datetime instances; pass other dicts through.
    """
    if "_datetime_object" not in dct:
        return dct
    return datetime.datetime.strptime(str(dct["_datetime_object"]), "%Y-%m-%d %H:%M:%S.%f")
0306
0307
0308
def getEffectiveFileSize(fsize, startEvent, endEvent, nEvents):
    """
    Return the effective size, in MB, of a (possibly partially read) file.

    :param fsize: file size in bytes, or None/0 when unknown
    :param startEvent: first event to process, or None
    :param endEvent: last event to process, or None
    :param nEvents: total events in the file, or None
    :return: effective size in MB as a float (minimum 1 MB)
    """
    one_mb = 1024 * 1024
    if fsize in [None, 0]:
        # unknown size: assume 1 MB
        effective = one_mb
    elif nEvents is not None and startEvent is not None and endEvent is not None:
        # scale by the fraction of events actually processed
        effective = int(float(fsize) * float(endEvent - startEvent + 1) / float(nEvents))
    else:
        effective = fsize
    # never report zero (protects downstream arithmetic)
    if effective == 0:
        effective = one_mb
    return float(effective) / one_mb
0326
0327
0328
def getEffectiveNumEvents(startEvent, endEvent, nEvents):
    """
    Return the number of events a job will actually process.

    An explicit [startEvent, endEvent] range wins over nEvents; a
    non-positive range falls back to 1, as does all-None input.
    """
    if endEvent is not None and startEvent is not None:
        span = endEvent - startEvent + 1
        return span if span > 0 else 1
    return nEvents if nEvents is not None else 1
0338
0339
0340
def getMemoryUsage():
    """
    Return the resident set size (VmRSS) of the current process in MB,
    or None if /proc is unavailable or parsing fails.
    """
    try:
        # fix: context manager guarantees the file is closed even if read() raises
        with open(f"/proc/{os.getpid()}/status") as status_file:
            status_text = status_file.read()
        value = 0
        for line in status_text.split("\n"):
            if line.startswith("VmRSS"):
                items = line.split()
                value = int(items[1])
                if items[2] in ["kB", "KB"]:
                    # convert kB to MB
                    value /= 1024
                elif items[2] in ["mB", "MB"]:
                    # already in MB
                    pass
                break
        return int(value)
    except Exception:
        return None
0359
0360
0361
def checkProcess(pid):
    """Return True if a process with the given pid exists (Linux /proc check)."""
    status_path = os.path.join("/proc", str(pid), "status")
    return os.path.exists(status_path)
0364
0365
0366
# 10 minutes expressed in seconds — presumably a safety margin applied to
# walltime limits; confirm exact usage with the callers of this constant
wallTimeOffset = 10 * 60
0368
0369
0370
def convert_config_params(itemStr):
    """
    Parse a colon-separated config string into a list of typed values.

    Empty fields become None, comma-separated fields become lists of
    strings, integer fields become ints, everything else stays a string.

    :param itemStr: colon-separated configuration string
    :return: list of converted values
    """

    def _convert(token):
        # one field of the colon-separated string
        if token == "":
            return None
        if "," in token:
            return token.split(",")
        try:
            return int(token)
        except Exception:
            return token

    return [_convert(token) for token in itemStr.split(":")]
0385
0386
0387
def parse_init_params(par):
    """
    Normalize an init parameter into a list.

    Lists pass through unchanged, string-likes are split on "|", and
    anything else is wrapped in a single-element list.
    """
    if isinstance(par, list):
        return par
    try:
        # EAFP: assume a string-like object with split()
        return par.split("|")
    except Exception:
        return [par]
0395
0396
0397
def getConfigParam(configStr, vo, sourceLabel):
    """
    Look up a config value for a given VO and source label.

    configStr is a comma-separated list of entries of the form
    "vo1|vo2:label1|label2:value". The value of the first entry matching
    both vo and sourceLabel is returned; "", "any" (and an entry-side
    wildcard) match everything. Returns None when nothing matches or
    configStr is unparsable.
    """
    try:
        # fix: split each comma-separated entry — the old loop variable was
        # unused and the whole string was re-split on ":" every iteration,
        # so entries after the first could never be reached
        for tmp_entry in configStr.split(","):
            items = tmp_entry.split(":")
            vos = items[0].split("|")
            sourceLabels = items[1].split("|")
            if vo not in ["", "any"] and vo not in vos and None not in vos and "any" not in vos and "" not in vos:
                continue
            if (
                sourceLabel not in ["", "any"]
                and sourceLabel not in sourceLabels
                and None not in sourceLabels
                and "any" not in sourceLabels
                and "" not in sourceLabels
            ):
                continue
            # keep the original ",".join for backward compatibility with
            # single-entry strings whose value contained colons
            return ",".join(items[2:])
    except Exception:
        pass
    return None
0418
0419
0420
def percentile(inList, percent, idMap):
    """
    Linear-interpolated percentile of a list of values.

    :param inList: input values (not modified; a sorted copy is used)
    :param percent: percentile in [0, 100]
    :param idMap: accepted for interface compatibility but not used here
    :return: tuple of (percentile value, list of the one or two source values used)
    """
    ordered = sorted(copy.copy(inList))
    rank = (len(ordered) - 1) * float(percent) / 100
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        # rank falls exactly on an element
        exact = ordered[int(lower)]
        return exact, [exact]
    lo_val = ordered[int(lower)]
    hi_val = ordered[int(upper)]
    # weight the two neighbours by their distance to the rank
    interpolated = lo_val * (upper - rank) + hi_val * (rank - lower)
    return interpolated, [lo_val, hi_val]
0435
0436
0437
def getJobMaxWalltime(taskSpec, inputChunk, totalMasterEvents, jobSpec, siteSpec):
    """
    Set jobSpec.maxWalltime (and possibly jobSpec.maxCpuCount) in place.

    Without a task-level cpu time both limits fall back to the site's
    maxtime. Otherwise the per-event cpu time is scaled by the number of
    master events and divided by core count, core power and cpu efficiency,
    then the base walltime is added.

    Note: inputChunk is accepted but not used in this implementation.
    Any exception is swallowed — best effort; the limits are left as-is.
    """
    try:
        if taskSpec.getCpuTime() is None:
            # no task cpu time: use the site limit for both values
            jobSpec.maxWalltime = siteSpec.maxtime
            jobSpec.maxCpuCount = siteSpec.maxtime
        else:
            jobSpec.maxWalltime = taskSpec.getCpuTime()
            if jobSpec.maxWalltime is not None and jobSpec.maxWalltime > 0:
                # per-event cpu time -> whole-job walltime
                jobSpec.maxWalltime *= totalMasterEvents
                if siteSpec.coreCount > 0:
                    # spread across the site's cores
                    jobSpec.maxWalltime /= float(siteSpec.coreCount)
                if siteSpec.corepower not in [0, None]:
                    # cpu time is in normalized units; divide by core power
                    jobSpec.maxWalltime /= siteSpec.corepower
                if taskSpec.cpuEfficiency not in [None, 0]:
                    # cpuEfficiency is a percentage
                    jobSpec.maxWalltime /= float(taskSpec.cpuEfficiency) / 100.0
                if taskSpec.baseWalltime is not None:
                    jobSpec.maxWalltime += taskSpec.baseWalltime
                jobSpec.maxWalltime = int(jobSpec.maxWalltime)
        if taskSpec.useHS06():
            # HS06-based tasks express the cpu limit as the walltime value
            jobSpec.maxCpuCount = jobSpec.maxWalltime
    except Exception:
        # best effort: silently keep whatever was already set
        pass
0461
0462
0463
def use_direct_io_for_job(task_spec, site_spec, input_chunk):
    """
    Decide whether a job should read its input via direct I/O.

    Merging jobs never use direct I/O; a site-level "always" flag wins over
    everything else; an explicit local-I/O request on the task disables it;
    otherwise direct I/O is used when the task allows LAN input and the site
    is configured for direct I/O.

    :return: True to use direct I/O, False otherwise
    """
    # merge jobs always copy their inputs locally
    if input_chunk and input_chunk.isMerging:
        return False
    # site-level override takes precedence over task preferences
    if site_spec.always_use_direct_io():
        return True
    if task_spec.useLocalIO():
        return False
    return task_spec.allowInputLAN() is not None and site_spec.isDirectIO()
0478
0479
0480
class StopWatch:
    """Utility class to measure timing information."""

    def __init__(self, identifier: str = None):
        # optional tag prepended to every report
        self.identifier = identifier
        self.start_time = datetime.datetime.now()
        self.checkpoint = self.start_time
        self.step_name = None

    def reset(self):
        """Reset the stopwatch."""
        self.start_time = datetime.datetime.now()
        self.checkpoint = self.start_time
        self.step_name = None

    @staticmethod
    def _format_delta(delta) -> str:
        """Render a timedelta as 'S.mmm sec. ' (sub-day durations only)."""
        return f"{delta.seconds}.{int(delta.microseconds / 1000):03d} sec. "

    def get_elapsed_time(self, new_step_name: str) -> str:
        """Get the elapsed time since the stopwatch was started and the duration since the last checkpoint.
        :param new_step_name: The name of the next step (falsy means done).
        Returns:
            str: A string with the elapsed time and the duration since the last checkpoint.
        """
        now = datetime.datetime.now()
        parts = []
        if self.identifier:
            parts.append(f"{self.identifier}: ")
        parts.append(f"elapsed {self._format_delta(now - self.start_time)}")
        if self.step_name is not None:
            parts.append(f"{self.step_name} took {self._format_delta(now - self.checkpoint)}")
        parts.append(f"{new_step_name} started." if new_step_name else "done.")
        # roll the checkpoint forward for the next step
        self.checkpoint = now
        self.step_name = new_step_name
        return "".join(parts)
0518
0519
0520
def make_reassign_comment(site: str = None, cloud: str = None, nucleus: str = None, mode: str = None) -> str:
    """
    Construct execution comment to reassign a task to a site, cloud or nucleus with different modes.

    :param site: target site for reassignment
    :param cloud: target cloud for reassignment
    :param nucleus: target nucleus for reassignment
    :param mode: can be "nokill" to avoid killing running jobs and "soft" to trigger rebrokerage without killing running jobs

    :return: execution comment for task reassignment
    """
    # third field is the "go back to old status" flag: y for site, n otherwise
    if site is not None:
        comment = f"site:{site}:y"
    elif nucleus is not None:
        comment = f"nucleus:{nucleus}:n"
    else:
        comment = f"cloud:{cloud}:n"

    # append the optional mode suffix
    mode_suffix = {"nokill": ":nokill reassign", "soft": ":soft reassign"}.get(mode, "")
    return comment + mode_suffix
0550
0551
0552
def parse_reassign_comment(comment: str) -> dict:
    """
    Parse execution comment to get task reassignment instructions, including target site/cloud/nucleus, whether to go back to old status and additional modes.

    :param comment: execution comment for task reassignment

    :return: a dictionary with keys "target", "value", "back_to_old_status" and "mode" (if any)
    """
    parsed = {"target": "site", "value": None, "back_to_old_status": False, "mode": None}
    try:
        fields = comment.split(":")
        if len(fields) >= 3:
            parsed["target"], parsed["value"] = fields[0], fields[1]
            parsed["back_to_old_status"] = fields[2] == "y"
            if len(fields) > 3:
                parsed["mode"] = fields[3]
    except Exception:
        # malformed or non-string comment: keep the defaults
        pass
    return parsed