Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 08:38:41

0001 import copy
0002 import datetime
0003 import json
0004 import math
0005 import os
0006 import re
0007 import subprocess
0008 from threading import Lock
0009 
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 
0012 
# replacement for commands
def commands_get_status_output(com):
    """Run *com* through the shell and return (exit_status, output).

    Mimics the legacy commands.getstatusoutput(): stdout and stderr are
    merged, a single trailing newline is stripped, and the status is 0 on
    success or the command's return code on failure.
    """
    output = ""
    status = 0
    try:
        proc = subprocess.Popen(
            com,
            shell=True,
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        output, _ = proc.communicate()
        ret = proc.poll()
        if ret:
            # non-zero exit: surface it via CalledProcessError below
            raise subprocess.CalledProcessError(ret, com)
    except subprocess.CalledProcessError as exc:
        status = exc.returncode
    # drop a single trailing newline, as commands.getstatusoutput did
    if output.endswith("\n"):
        output = output[:-1]
    return status, output
0035 
0036 
# extract name from DN
def clean_user_id(id):
    """Extract a human-readable user name from a grid certificate DN.

    Successively strips DC/O/OU/C/L components, numeric CNs, proxy markers,
    Robot/nickname CNs, surrounding punctuation and a trailing email address,
    e.g. "/DC=ch/DC=cern/CN=jdoe/CN=12345/CN=John Doe" -> "John Doe".
    On any error the original input is returned unchanged.

    Note: the parameter keeps the name "id" (shadowing the builtin) so the
    public interface stays unchanged for keyword callers.
    """
    try:
        # raw strings throughout: "\/" in a plain string is an invalid
        # escape sequence (SyntaxWarning since Python 3.12); "/" needs no
        # escaping inside a character class anyway
        up = re.compile(r"/(DC|O|OU|C|L)=[^/]+")
        username = up.sub("", id)
        # drop numeric CN components, e.g. "/CN=12345"
        up2 = re.compile(r"/CN=[0-9]+")
        username = up2.sub("", username)
        # drop space- or underscore-prefixed numeric tokens
        up3 = re.compile(r" [0-9]+")
        username = up3.sub("", username)
        up4 = re.compile(r"_[0-9]+")
        username = up4.sub("", username)
        # remove proxy markers
        username = username.replace("/CN=proxy", "")
        username = username.replace("/CN=limited proxy", "")
        username = username.replace("limited proxy", "")
        # remove robot / nickname CNs
        username = re.sub(r"/CN=Robot:[^/]+", "", username)
        username = re.sub(r"/CN=nickname:[^/]+", "", username)
        # when two CNs remain, keep the second one
        pat = re.compile(r".*/CN=([^/]+)/CN=([^/]+)")
        mat = pat.match(username)
        if mat:
            username = mat.group(2)
        else:
            username = username.replace("/CN=", "")
        # truncate anything from "/email" onward
        if username.lower().find("/email") > 0:
            username = username[: username.lower().find("/email")]
        # a leftover "limited ... proxy" marker wins over everything else
        pat = re.compile(".*(limited.*proxy).*")
        mat = pat.match(username)
        if mat:
            username = mat.group(1)
        # strip punctuation
        username = username.replace("(", "")
        username = username.replace(")", "")
        username = username.replace("'", "")
        # drop a trailing email address when a multi-word name remains without it
        name_wo_email = re.sub(r" [a-z][\w\.-]+@[\w\.-]+(?:\.\w+)+", "", username).strip()
        if " " in name_wo_email:
            username = name_wo_email
        return username
    except Exception:
        return id
0074 
0075 
# extract bare string from DN
def get_bare_dn(dn, keep_proxy=False, keep_digits=True):
    """Strip proxy markers (and optionally trailing numeric CNs) from a DN.

    :param dn: distinguished name string
    :param keep_proxy: collapse repeated "/CN=proxy" into one instead of removing them
    :param keep_digits: when False, drop trailing "/CN=<digits>" components
    """
    bare = re.sub("/CN=limited proxy", "", dn)
    # either collapse repeated proxy CNs to a single one or remove them all
    if keep_proxy:
        bare = re.sub("/CN=proxy(/CN=proxy)+", "/CN=proxy", bare)
    else:
        bare = re.sub("(/CN=proxy)+", "", bare)
    if not keep_digits:
        # trailing numeric CNs carry no identity information
        bare = re.sub(r"(/CN=\d+)+$", "", bare)
    return bare
0086 
0087 
# extract id string from DN
def get_id_from_dn(dn, keep_proxy=False, keep_digits=True):
    """Return the nickname embedded in the DN if present, otherwise the bare DN."""
    nickname = re.search("/CN=nickname:([^/]+)", dn)
    # a nickname CN takes precedence over the stripped DN form
    return nickname.group(1) if nickname else get_bare_dn(dn, keep_proxy, keep_digits)
0094 
0095 
def get_distinguished_name_list(distinguished_name: str) -> list:
    """
    Get a list of possible distinguished names from a string, including legacy and RFC formats.

    Args:
        distinguished_name (str): The distinguished name string.

    Returns:
        list: A list of possible distinguished names.
    """
    # special characters that must be backslash-escaped in the RFC form
    escape_map = str.maketrans({",": r"\,", "+": r"\+", '"': r"\"", "\\": r"\\", "<": r"\<", ">": r"\>", ";": r"\;"})
    results = []
    # consider both the full DN and the form without trailing numeric CNs
    for candidate in (distinguished_name, get_bare_dn(distinguished_name, keep_digits=False)):
        results.append(candidate)
        # split the slash-separated legacy form, escape each component, and
        # reverse the order to build the comma-separated RFC representation
        components = [part.translate(escape_map) for part in candidate.split("/") if part]
        results.append(",".join(reversed(components)))
    return results
0116 
0117 
def normalize_cpu_model(cpu_model):
    """Normalize a CPU model string: uppercase, then drop clock speed, cache sizes and redundant words."""
    model = cpu_model.upper()
    # noise patterns removed in order (order matters: specific before generic)
    noise_patterns = (
        r"@\s*\d+(\.\d+)?\s*GHZ",  # clock speed, e.g. "@ 2.40GHZ"
        r"\d+\s*KB",  # cache size
        r"CORE PROCESSOR\s*\([^)]*\)",  # "... CORE PROCESSOR (...)"
        r"\b\d+-CORE\b",  # core count, e.g. "16-CORE"
        r"CPU|CORE|PROCESSOR|SCALABLE|\(R\)|\(TM\)",  # marketing / trademark noise
    )
    for pattern in noise_patterns:
        model = re.sub(pattern, "", model)
    # collapse the whitespace runs left behind by the removals
    return re.sub(r"\s+", " ", model).strip()
0129 
0130 
def clean_host_name(host_name):
    """Normalize a worker node host name."""
    # strip a leading batch-slot prefix, e.g. slot1@worker1.example.com -> worker1.example.com
    slot_match = re.search(r"@(.+)", host_name)
    if slot_match:
        host_name = slot_match.group(1)

    # ATLAS worker nodes: the first two dash-separated fields are not unique,
    # so keep only the third field, e.g. atlprd55-xyz-<third_field>.cern.ch
    atlas_match = re.match(r"^atlprd\d+-[^-]+-([^.]+\.cern\.ch)$", host_name)
    if atlas_match:
        host_name = atlas_match.group(1)

    return host_name
0142 
0143 
# resolve string bool
def resolve_bool(param):
    """Convert the strings "True"/"False" to booleans; pass everything else through unchanged."""
    if isinstance(param, bool):
        return param
    # only the exact literal strings are converted
    if param in ("True", "False"):
        return param == "True"
    return param
0153 
0154 
# cached object
class CachedObject:
    """Thread-safe holder for an object that is renewed periodically.

    The wrapped object is refreshed by calling update_func — expected to
    return a (status, object) tuple; the object is kept only when status is
    truthy — at most once per time_interval seconds. Lookup helpers
    (__contains__, __getitem__) refresh lazily before delegating.
    """

    # constructor
    def __init__(self, name, time_interval, update_func, log_stream):
        # name used in log messages
        self.name = name
        # cached object (None until the first successful update)
        self.cachedObj = None
        # datetime of the last update attempt
        self.lastUpdated = naive_utcnow()
        # minimum interval between update attempts
        self.timeInterval = datetime.timedelta(seconds=time_interval)
        # lock protecting cachedObj
        self.lock = Lock()
        # function to update the object; returns (status, new_object)
        self.updateFunc = update_func
        # logger
        self.log_stream = log_stream

    # update obj
    def update(self):
        # hold the lock for the whole check-and-renew sequence; the
        # with-block guarantees release even if logging or updateFunc
        # raises (the previous manual acquire/release could leak the lock)
        with self.lock:
            current = naive_utcnow()
            # renew only when never filled or older than the update interval
            if self.cachedObj is None or current - self.lastUpdated > self.timeInterval:
                self.log_stream.debug(f"PID={os.getpid()} renewing {self.name} cache")
                try:
                    tmp_stat, tmp_out = self.updateFunc()
                    self.log_stream.debug(f"PID={os.getpid()} got for {self.name} {tmp_stat} {str(tmp_out)}")
                    if tmp_stat:
                        self.cachedObj = tmp_out
                except Exception as e:
                    self.log_stream.error(f"PID={os.getpid()} failed to renew {self.name} due to {str(e)}")
                # record the attempt time even on failure to avoid retrying
                # on every single access
                self.lastUpdated = current
        return

    # contains
    def __contains__(self, item):
        self.update()
        try:
            return item in self.cachedObj
        except TypeError:
            # cachedObj may still be None when no update has succeeded yet
            return False

    # get item
    def __getitem__(self, name):
        self.update()
        return self.cachedObj[name]

    # get method
    # NOTE(review): unlike __getitem__, this neither refreshes nor locks;
    # callers get the object as-is
    def get(self, *var):
        return self.cachedObj.get(*var)

    # acquire the lock and return the object; the caller MUST pair this
    # with release_object()
    def get_object(self):
        self.lock.acquire()
        return self.cachedObj

    # release the lock taken by get_object
    def release_object(self):
        self.lock.release()
0223 
0224 
# dictionary of caches
class CacheDict:
    """
    Dictionary of caches with periodic cleanup
    """

    # constructor
    def __init__(self, update_interval=10, cleanup_interval=60):
        # monotonically increasing cache id, used only in log messages
        self.idx = 0
        self.lock = Lock()
        self.cache_dict = {}
        # how often a cached object is refreshed (minutes)
        self.update_interval = datetime.timedelta(minutes=update_interval)
        # how often stale caches are purged (minutes)
        self.cleanup_interval = datetime.timedelta(minutes=cleanup_interval)
        self.last_cleanup = naive_utcnow()

    def cleanup(self, tmp_log):
        """
        Cleanup caches
        :param tmp_log: logger
        """
        with self.lock:
            now = naive_utcnow()
            if now - self.last_cleanup <= self.cleanup_interval:
                # cleaned up recently enough; nothing to do
                return
            # drop every cache not refreshed within the cleanup interval
            for key in list(self.cache_dict):
                entry = self.cache_dict[key]
                if now - entry["last_updated"] > self.cleanup_interval:
                    tmp_log.debug(f"""deleting cache #{entry["idx"]}""")
                    del self.cache_dict[key]
            self.last_cleanup = now

    def get(self, name, tmp_log, update_func, *update_args, **update_kwargs):
        """
        Get updated object
        :param name: name of cache
        :param tmp_log: logger
        :param update_func: function to update object
        :param update_args: arguments for update function
        :param update_kwargs: keyword arguments for update function
        :return: object or None
        """
        self.cleanup(tmp_log)
        with self.lock:
            now = naive_utcnow()
            entry = self.cache_dict.get(name)
            if entry is None:
                # first request for this name: build a brand-new cache entry,
                # remembering the function and arguments for later refreshes
                tmp_log.debug(f"creating new cache #{self.idx}")
                entry = {
                    "obj": update_func(*update_args, **update_kwargs),
                    "last_updated": now,
                    "update_func": update_func,
                    "update_args": update_args,
                    "update_kwargs": update_kwargs,
                    "idx": self.idx,
                }
                self.cache_dict[name] = entry
                self.idx += 1
            elif now - entry["last_updated"] > self.update_interval:
                # stale: refresh with the stored function and arguments
                tmp_log.debug(f"""updating cache #{entry["idx"]}""")
                entry["obj"] = entry["update_func"](*entry["update_args"], **entry["update_kwargs"])
                entry["last_updated"] = now
            else:
                tmp_log.debug(f"""reusing cache #{entry["idx"]}""")
            return entry["obj"]
0291 
0292 
# convert datetime to string
class NonJsonObjectEncoder(json.JSONEncoder):
    """JSON encoder that serializes datetime objects as tagged dicts."""

    def default(self, obj):
        # tag datetimes so as_python_object() can revive them on decode
        if isinstance(obj, datetime.datetime):
            return {"_datetime_object": obj.strftime("%Y-%m-%d %H:%M:%S.%f")}
        # anything else: defer to the base class (which raises TypeError)
        return super().default(obj)
0299 
0300 
# hook for json decoder
def as_python_object(dct):
    """object_hook that revives datetimes encoded by NonJsonObjectEncoder."""
    if "_datetime_object" not in dct:
        return dct
    # tagged dict: parse the stored timestamp back into a datetime
    return datetime.datetime.strptime(str(dct["_datetime_object"]), "%Y-%m-%d %H:%M:%S.%f")
0306 
0307 
# get effective file size
def getEffectiveFileSize(fsize, startEvent, endEvent, nEvents):
    """Return the effective file size in MB, scaled to the used event range.

    Pseudo inputs (fsize None or 0) and zero-size results fall back to a
    1 MB dummy size.
    """
    one_mb = 1024 * 1024
    if fsize in [None, 0]:
        # pseudo input: use a dummy 1 MB size
        effective = one_mb
    elif nEvents is not None and startEvent is not None and endEvent is not None:
        # scale by the fraction of events actually processed
        effective = int(float(fsize) * float(endEvent - startEvent + 1) / float(nEvents))
    else:
        effective = fsize
    # guard against a zero size after scaling
    if effective == 0:
        effective = one_mb
    # convert bytes -> MB
    return float(effective) / one_mb
0326 
0327 
# get effective number of events
def getEffectiveNumEvents(startEvent, endEvent, nEvents):
    """Return the event count of [startEvent, endEvent], else nEvents, else 1 (never less than 1)."""
    if startEvent is not None and endEvent is not None:
        # explicit range takes precedence; guard against non-positive counts
        return max(endEvent - startEvent + 1, 1)
    if nEvents is not None:
        return nEvents
    return 1
0338 
0339 
# get memory usage
def getMemoryUsage():
    """Return the resident set size (VmRSS) of this process in MB, or None on failure.

    Reads /proc/<pid>/status, so it only works on Linux-style /proc systems;
    on other platforms (or any read error) None is returned.
    """
    try:
        # with-block guarantees the file handle is closed even if reading
        # raises (the previous open/read/close could leak on error)
        with open(f"/proc/{os.getpid()}/status") as status_file:
            for line in status_file:
                if line.startswith("VmRSS"):
                    items = line.split()
                    value = int(items[1])
                    if items[2] in ["kB", "KB"]:
                        # convert kB -> MB
                        value /= 1024
                    elif items[2] in ["mB", "MB"]:
                        # already in MB
                        pass
                    return int(value)
        # VmRSS line not found
        return 0
    except Exception:
        return None
0359 
0360 
# check process
def checkProcess(pid):
    """Return True if a process with the given pid has a /proc status entry."""
    status_path = f"/proc/{pid}/status"
    return os.path.exists(status_path)
0364 
0365 
# offset for walltime in seconds (10 minutes); consumers are outside this view
wallTimeOffset = 10 * 60
0368 
0369 
# convert config parameters
def convert_config_params(itemStr):
    """Parse a colon-separated config string into typed items.

    Each field is converted: "" -> None, comma-separated -> list of strings,
    numeric -> int, anything else stays a string.
    """

    def _convert(field):
        # empty field means "parameter not set"
        if field == "":
            return None
        # comma-separated field becomes a list of strings
        if "," in field:
            return field.split(",")
        # numeric field becomes an int, otherwise keep the string
        try:
            return int(field)
        except Exception:
            return field

    return [_convert(field) for field in itemStr.split(":")]
0385 
0386 
# parse init params
def parse_init_params(par):
    """Normalize an init parameter into a list.

    Lists pass through unchanged, strings are split on '|', and anything
    that cannot be split is wrapped in a single-element list.
    """
    if isinstance(par, list):
        return par
    try:
        items = par.split("|")
    except Exception:
        # not splittable (e.g. None or a number): wrap it
        items = [par]
    return items
0395 
0396 
# get config param for vo and prodSourceLabel
def getConfigParam(configStr, vo, sourceLabel):
    """Return the value part of a "vos:sourceLabels:value" config string when
    it applies to the given vo and sourceLabel, otherwise None.

    vos and sourceLabels are '|'-separated lists; an empty string or "any"
    on either side acts as a wildcard. Any parsing error yields None.
    """
    try:
        # NOTE(review): the loop re-parses the full configStr on every pass
        # instead of the comma-separated piece, so only the first parse is
        # meaningful; kept as-is to preserve existing behavior
        for _ in configStr.split(","):
            fields = configStr.split(":")
            vos = fields[0].split("|")
            sourceLabels = fields[1].split("|")
            vo_mismatch = vo not in ["", "any"] and vo not in vos and None not in vos and "any" not in vos and "" not in vos
            if vo_mismatch:
                continue
            label_mismatch = (
                sourceLabel not in ["", "any"]
                and sourceLabel not in sourceLabels
                and None not in sourceLabels
                and "any" not in sourceLabels
                and "" not in sourceLabels
            )
            if label_mismatch:
                continue
            # remaining fields are re-joined with commas
            return ",".join(fields[2:])
    except Exception:
        pass
    return None
0418 
0419 
# get percentile until numpy 1.5.X becomes available
def percentile(inList, percent, idMap):
    """Return the percent-th percentile of inList using linear interpolation.

    :param inList: input values (not modified; a sorted copy is used)
    :param percent: percentile in [0, 100]
    :param idMap: unused, kept for interface compatibility
    :return: (percentile value, list of the one or two source values used)
    """
    ordered = sorted(copy.copy(inList))
    # fractional rank of the requested percentile
    rank = (len(ordered) - 1) * float(percent) / 100
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        # the rank falls exactly on an element
        exact = ordered[int(lower)]
        return exact, [exact]
    # interpolate linearly between the two neighboring elements
    low_val = ordered[int(lower)]
    high_val = ordered[int(upper)]
    interpolated = low_val * (upper - rank) + high_val * (rank - lower)
    return interpolated, [low_val, high_val]
0435 
0436 
# get max walltime and cpu count
def getJobMaxWalltime(taskSpec, inputChunk, totalMasterEvents, jobSpec, siteSpec):
    """Set jobSpec.maxWalltime (and possibly jobSpec.maxCpuCount) in place.

    When the task defines no CPU time, the site's maxtime is used for both
    fields. Otherwise the CPU time is scaled by totalMasterEvents and
    normalized by the site's core count and core power, adjusted for CPU
    efficiency and base walltime. Errors are silently swallowed and may
    leave jobSpec partially updated.

    NOTE(review): inputChunk is unused in this body — presumably kept for
    interface compatibility with callers; confirm before removing.
    """
    try:
        if taskSpec.getCpuTime() is None:
            # use PQ maxtime when CPU time is not defined
            jobSpec.maxWalltime = siteSpec.maxtime
            jobSpec.maxCpuCount = siteSpec.maxtime
        else:
            jobSpec.maxWalltime = taskSpec.getCpuTime()
            if jobSpec.maxWalltime is not None and jobSpec.maxWalltime > 0:
                # scale by master events (CPU time appears to be per-event),
                # then normalize per core and by core power when available
                jobSpec.maxWalltime *= totalMasterEvents
                if siteSpec.coreCount > 0:
                    jobSpec.maxWalltime /= float(siteSpec.coreCount)
                if siteSpec.corepower not in [0, None]:
                    jobSpec.maxWalltime /= siteSpec.corepower
            # cpuEfficiency is treated as a percentage
            if taskSpec.cpuEfficiency not in [None, 0]:
                jobSpec.maxWalltime /= float(taskSpec.cpuEfficiency) / 100.0
            if taskSpec.baseWalltime is not None:
                jobSpec.maxWalltime += taskSpec.baseWalltime
            jobSpec.maxWalltime = int(jobSpec.maxWalltime)
            if taskSpec.useHS06():
                jobSpec.maxCpuCount = jobSpec.maxWalltime
    except Exception:
        # NOTE(review): broad silent swallow — any missing attribute leaves
        # jobSpec unchanged or partially set; looks like best-effort intent,
        # confirm before tightening
        pass
0461 
0462 
0463 # use direct IO for job
0464 def use_direct_io_for_job(task_spec, site_spec, input_chunk):
0465     # not for merging
0466     if input_chunk and input_chunk.isMerging:
0467         return False
0468     # always
0469     if site_spec.always_use_direct_io():
0470         return True
0471     # force copy-to-scratch
0472     if task_spec.useLocalIO():
0473         return False
0474     # depends on task and site specs
0475     if task_spec.allowInputLAN() is not None and site_spec.isDirectIO():
0476         return True
0477     return False
0478 
0479 
# stopwatch
class StopWatch:
    """Utility class to measure timing information."""

    def __init__(self, identifier: str = None):
        # optional label prepended to every report
        self.identifier = identifier
        self.start_time = datetime.datetime.now()
        self.checkpoint = self.start_time
        # name of the step currently being timed (None before the first step)
        self.step_name = None

    def reset(self):
        """Reset the stopwatch."""
        self.start_time = datetime.datetime.now()
        self.checkpoint = self.start_time
        self.step_name = None

    def get_elapsed_time(self, new_step_name: str) -> str:
        """Get the elapsed time since the stopwatch was started and the duration since the last checkpoint.

        :param new_step_name: The name of the next step (falsy means measurement is done).
        Returns:
            str: A string with the elapsed time and the duration since the last checkpoint.
        """
        now = datetime.datetime.now()
        total_delta = now - self.start_time
        duration_delta = now - self.checkpoint
        return_str = ""
        if self.identifier:
            return_str += f"{self.identifier}: "
        # use total_seconds() rather than timedelta.seconds: the latter wraps
        # at 24 hours (days are a separate field) and would under-report
        # long-running measurements; identical output below one day
        return_str += f"elapsed {int(total_delta.total_seconds())}.{int(total_delta.microseconds / 1000):03d} sec. "
        if self.step_name is not None:
            return_str += f"{self.step_name} took {int(duration_delta.total_seconds())}.{int(duration_delta.microseconds / 1000):03d} sec. "
        if new_step_name:
            return_str += f"{new_step_name} started."
        else:
            return_str += "done."
        self.checkpoint = now
        self.step_name = new_step_name
        return return_str
0518 
0519 
# construct execution comment to reassign a task
def make_reassign_comment(site: str = None, cloud: str = None, nucleus: str = None, mode: str = None) -> str:
    """
    Construct execution comment to reassign a task to a site, cloud or nucleus with different modes.

    :param site: target site for reassignment
    :param cloud: target cloud for reassignment
    :param nucleus: target nucleus for reassignment
    :param mode: can be "nokill" to avoid killing running jobs and "soft" to trigger rebrokerage without killing running jobs

    :return: execution comment for task reassignment
    """
    # reassign to site, nucleus or cloud. Note that Prodsys use blah="" to trigger rebrokerage
    if site is not None:
        # 'y' sends the task back to oldStatus, bypassing rebrokerage
        comment = f"site:{site}:y"
    elif nucleus is not None:
        # 'n' triggers rebrokerage
        comment = f"nucleus:{nucleus}:n"
    else:
        comment = f"cloud:{cloud}:n"

    # append the optional mode suffix
    if mode == "nokill":
        comment += ":nokill reassign"
    elif mode == "soft":
        comment += ":soft reassign"

    return comment
0550 
0551 
# parse execution comment to get task reassignment instructions
def parse_reassign_comment(comment: str) -> dict:
    """
    Parse execution comment to get task reassignment instructions, including target site/cloud/nucleus, whether to go back to old status and additional modes.

    :param comment: execution comment for task reassignment

    :return: a dictionary with keys "target", "value", "back_to_old_status" and "mode" (if any)
    """
    parsed = {"target": "site", "value": None, "back_to_old_status": False, "mode": None}
    try:
        fields = comment.split(":")
    except Exception:
        # non-string input: return the defaults untouched
        return parsed
    if len(fields) >= 3:
        parsed["target"] = fields[0]
        parsed["value"] = fields[1]
        parsed["back_to_old_status"] = fields[2] == "y"
        if len(fields) > 3:
            parsed["mode"] = fields[3]
    return parsed