File indexing completed on 2026-04-20 07:58:57
"""
Utility functions and helper classes shared by harvester core components.

"""
0005
0006 import base64
0007 import codecs
0008 import fcntl
0009 import functools
0010 import inspect
0011 import json
0012 import math
0013 import os
0014 import pickle
0015 import random
0016 import socket
0017 import sys
0018 import threading
0019 import time
0020 import traceback
0021 import uuid
0022 import zlib
0023 from contextlib import contextmanager
0024 from datetime import datetime, timedelta, timezone
0025 from threading import get_ident
0026
0027 import Cryptodome.Cipher.AES
0028 import Cryptodome.Hash.HMAC
0029 import Cryptodome.Random
0030 from pandalogger.LogWrapper import LogWrapper
0031 from pandalogger.PandaLogger import PandaLogger
0032
0033 from pandaharvester.harvesterconfig import harvester_config
0034
0035 from .event_spec import EventSpec
0036 from .file_spec import FileSpec
0037 from .work_spec import WorkSpec
0038
# Flag toggled by enable_memory_profiling(); when True, loggers created via
# make_logger() report memory usage (LogWrapper seeMem).
with_memory_profile = False


# Module-wide lock used by the @synchronize decorator to serialize calls.
sync_lock = threading.Lock()
0044
0045
0046
0047
0048
0049
0050
def synchronize(func):
    """Decorator serializing every call to *func* through the module-wide sync_lock."""

    @functools.wraps(func)
    def _locked(*args, **kwargs):
        sync_lock.acquire()
        try:
            return func(*args, **kwargs)
        finally:
            sync_lock.release()

    return _locked
0060
0061
0062
0063
0064
0065
0066
0067
class StopWatch(object):
    """Simple stopwatch based on the monotonic clock."""

    def __init__(self):
        # remember when measurement began
        self.start_time = time.monotonic()

    def get_elapsed_time(self):
        """Return the elapsed time as a human-readable suffix string."""
        return f" : took {self.get_elapsed_time_in_sec():.3f} sec"

    def get_elapsed_time_in_sec(self):
        """Return seconds elapsed since construction or the last reset()."""
        return time.monotonic() - self.start_time

    def reset(self):
        """Restart the measurement from now."""
        self.start_time = time.monotonic()
0085
0086
0087
class MapWithLock(object):
    """Dict-like container paired with an explicit lock.

    Item access does NOT take the lock automatically; callers are expected to
    bracket compound operations with acquire()/release().
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.dataMap = dict()

    def __getitem__(self, item):
        return self.dataMap[item]

    def __setitem__(self, item, value):
        self.dataMap[item] = value

    def __contains__(self, item):
        return item in self.dataMap

    def acquire(self):
        """Take the guarding lock."""
        self.lock.acquire()

    def release(self):
        """Release the guarding lock."""
        self.lock.release()

    def iteritems(self):
        """Iterate over (key, value) pairs (py2-style name kept for callers)."""
        return self.dataMap.items()
0112
0113
0114
0115 global_dict = MapWithLock()
0116
0117
0118
class SingletonWithID(type):
    """Metaclass caching one instance per distinct 'id' keyword argument."""

    def __init__(cls, *args, **kwargs):
        # per-class instance cache keyed by the stringified 'id' kwarg
        cls.__instance = {}
        super(SingletonWithID, cls).__init__(*args, **kwargs)

    @synchronize
    def __call__(cls, *args, **kwargs):
        cache_key = str(kwargs.get("id", ""))
        if cache_key not in cls.__instance:
            cls.__instance[cache_key] = super(SingletonWithID, cls).__call__(*args, **kwargs)
        return cls.__instance.get(cache_key)
0129 return cls.__instance.get(obj_id)
0130
0131
0132
class SingletonWithThreadAndID(type):
    """Metaclass caching one instance per (calling thread, 'id' keyword) pair."""

    def __init__(cls, *args, **kwargs):
        # per-class instance cache keyed by (thread id, stringified 'id' kwarg)
        cls.__instance = {}
        super(SingletonWithThreadAndID, cls).__init__(*args, **kwargs)

    @synchronize
    def __call__(cls, *args, **kwargs):
        cache_key = (get_ident(), str(kwargs.get("id", "")))
        if cache_key not in cls.__instance:
            cls.__instance[cache_key] = super(SingletonWithThreadAndID, cls).__call__(*args, **kwargs)
        return cls.__instance.get(cache_key)
0144 return cls.__instance.get(obj_id)
0145
0146
0147
class DictTupleHybrid(tuple):
    """Tuple that can expose its elements as a dict once attribute names are set."""

    def set_attributes(self, attributes):
        # ordered attribute names matching the tuple elements
        self.attributes = attributes

    def _asdict(self):
        """Return {attribute_name: value} built from the stored names."""
        return {name: value for name, value in zip(self.attributes, self)}
0153 return dict(zip(self.attributes, self))
0154
0155
0156
class SafeDict(dict):
    """Dict for str.format_map that leaves unknown placeholders untouched."""

    def __missing__(self, key):
        # re-wrap the key in braces so the placeholder survives formatting
        return f"{{{key}}}"
0160
0161
0162
0163
0164
0165
0166
0167
def enable_memory_profiling():
    """Turn on memory-usage reporting for loggers created afterwards."""
    global with_memory_profile
    with_memory_profile = True
0171
0172
0173
def setup_logger(name=None):
    """Create a PandaLogger, deriving the name from the caller's module when omitted.

    A per-name log level from harvester_config.log_level is applied when defined;
    any failure falls back to the default logger configuration.
    """
    if name is None:
        caller_frame = inspect.stack()[1][0]
        caller_module = inspect.getmodule(caller_frame)
        name = caller_module.__name__.split(".")[-1]
    try:
        return PandaLogger().getLogger(name, log_level=getattr(harvester_config.log_level, name))
    except Exception:
        return PandaLogger().getLogger(name)
0185
0186
0187
def make_logger(tmp_log, token=None, method_name=None, hook=None):
    """Create a LogWrapper around tmp_log labeled with the calling method name.

    Args:
        tmp_log: base logger to wrap
        token: optional token appended to the label as " <token>"
        method_name: explicit label; when None the caller's function name is used
        hook: optional hook passed through to LogWrapper

    Returns:
        LogWrapper: wrapped logger (reports memory usage when profiling is enabled)
    """
    if method_name is None:
        # derive the label from the immediate caller's frame
        tmp_str = inspect.stack()[1][3]
    else:
        tmp_str = method_name
    if token is not None:
        tmp_str += f" <{token}>"
    else:
        # no token: bare " :" suffix (the old `" :".format(token)` was a no-op format)
        tmp_str += " :"
    return LogWrapper(tmp_log, tmp_str, seeMem=with_memory_profile, hook=hook)
0200
0201
0202
def dump_error_message(tmp_log, err_str=None, no_message=False):
    """Log the current exception (or err_str) and return the message text.

    Args:
        tmp_log: logger; when it is not a LogWrapper the caller's method name
            is prefixed to the generated message
        err_str: pre-built message; when None it is composed from sys.exc_info()
        no_message: when True, only build and return the text without logging

    Returns:
        str: the error message that was (or would have been) logged
    """
    if not isinstance(tmp_log, LogWrapper):
        method_name = f"{inspect.stack()[1][3]} : "
    else:
        method_name = ""

    if err_str is None:
        err_type, err_value = sys.exc_info()[:2]
        # guard against being called outside an except block, where exc_info()
        # returns (None, None, None) and err_type.__name__ would raise
        type_name = err_type.__name__ if err_type is not None else "None"
        err_str = f"{method_name} {type_name} {err_value} {traceback.format_exc()}"
    if not no_message:
        tmp_log.error(err_str)
    return err_str
0215
0216
0217
def sleep(interval, stop_event, randomize=True):
    """Wait roughly *interval* seconds, optionally interruptible by stop_event.

    When randomize is True the actual wait is drawn from [0.4, 1.4] * interval.

    Returns:
        bool: True when stop_event was set during the wait, False otherwise.
    """
    if randomize and interval > 0:
        wait_seconds = random.randint(int(interval * 0.4), int(interval * 1.4))
    else:
        wait_seconds = interval
    if stop_event is None:
        # uninterruptible plain sleep
        time.sleep(wait_seconds)
        return False
    # poll the event once per second so a set() is noticed promptly
    elapsed = 0
    while not stop_event.is_set():
        if elapsed >= wait_seconds:
            return False
        stop_event.wait(1)
        elapsed += 1
    return True
0235
0236
0237
def make_pool_file_catalog(jobspec_list):
    """Build a POOL file catalog XML string for the input files of the given jobs.

    Duplicate LFNs across jobs are emitted only once.
    """
    xml_str = """<?xml version="1.0" ?>
<!DOCTYPE POOLFILECATALOG SYSTEM "InMemory">
<POOLFILECATALOG>
"""
    seen_lfns = set()
    for job_spec in jobspec_list:
        for in_lfn, in_file in job_spec.get_input_file_attributes().items():
            # skip LFNs already written to the catalog
            if in_lfn in seen_lfns:
                continue
            seen_lfns.add(in_lfn)
            xml_str += f""" <File ID="{in_file['guid']}">
 <physical>
 <pfn filetype="ROOT_All" name="{in_lfn}"/>
 </physical>
 <logical/>
 </File>
 """
    xml_str += "</POOLFILECATALOG>"
    return xml_str
0261
0262
0263
def calc_adler32(file_name):
    """Return the Adler-32 checksum of a file as an 8-char lowercase hex string."""
    checksum = 1
    chunk_size = 32 * 1024 * 1024
    with open(file_name, "rb") as file_obj:
        # stream the file in large chunks to bound memory usage
        for chunk in iter(lambda: file_obj.read(chunk_size), b""):
            checksum = zlib.adler32(chunk, checksum)
    # fold into the unsigned 32-bit range and format zero-padded
    checksum &= 0xFFFFFFFF
    return f"{checksum:08x}"
0276
0277
0278
def get_output_file_report(jobspec):
    """Build a JSON report {lfn: {guid, fsize, adler32}} for finished output files.

    A report already attached to the job (outputFilesToReport) is returned as-is.
    """
    if jobspec.outputFilesToReport is not None:
        return jobspec.outputFilesToReport

    report = {}
    for file_spec in jobspec.outFiles:
        # only successfully staged-out files are reported
        if file_spec.status != "finished":
            continue
        # pick a GUID: explicit attribute, log-file info, or a fresh one
        if "guid" in file_spec.fileAttributes:
            guid = file_spec.fileAttributes["guid"]
        elif file_spec.fileType == "log":
            guid = jobspec.get_logfile_info()["guid"]
        else:
            guid = str(uuid.uuid4())
        # strip a "type:" prefix from the checksum when present
        chksum = file_spec.chksum
        if chksum is not None and ":" in chksum:
            chksum = chksum.split(":")[-1]
        report[file_spec.lfn] = {"guid": guid, "fsize": file_spec.fsize, "adler32": chksum}
    return json.dumps(report)
0302
0303
def create_shards(input_list, size):
    """Yield successive shards (lists) of at most *size* elements from input_list."""
    buffer = []
    for element in input_list:
        buffer.append(element)
        if len(buffer) == size:
            yield buffer
            buffer = []
    # flush the final partial shard, if any
    if buffer:
        yield buffer
0318
0319
0320
def _attach_worker_outputs(jobSpec, files_to_stage_out_list, events_to_update_list):
    """Attach staged-out file specs and updated event specs from workers to a job.

    Args:
        jobSpec: job to update
        files_to_stage_out_list: {workerID: {PandaID: {lfn: [fileAttrs, ...]}}}
        events_to_update_list: [{PandaID: [eventData, ...]}, ...]
    """
    outFileAttrs = jobSpec.get_output_file_attributes()
    # register output files produced by any of the job's workers
    for tmpWorkerID, files_to_stage_out in files_to_stage_out_list.items():
        if jobSpec.PandaID not in files_to_stage_out:
            continue
        for lfn, fileAttersList in files_to_stage_out[jobSpec.PandaID].items():
            for fileAtters in fileAttersList:
                fileSpec = FileSpec()
                fileSpec.lfn = lfn
                fileSpec.PandaID = jobSpec.PandaID
                fileSpec.taskID = jobSpec.taskID
                fileSpec.path = fileAtters["path"]
                fileSpec.fsize = fileAtters["fsize"]
                fileSpec.fileType = fileAtters["type"]
                fileSpec.fileAttributes = fileAtters
                fileSpec.workerID = tmpWorkerID
                if "isZip" in fileAtters:
                    fileSpec.isZip = fileAtters["isZip"]
                if "chksum" in fileAtters:
                    fileSpec.chksum = fileAtters["chksum"]
                if "eventRangeID" in fileAtters:
                    fileSpec.eventRangeID = fileAtters["eventRangeID"]
                # the provenance ID is the third token of the event range ID, when present
                try:
                    provenanceID = fileSpec.eventRangeID.split("-")[2]
                except Exception:
                    provenanceID = None
                fileSpec.provenanceID = provenanceID
                if lfn in outFileAttrs:
                    fileSpec.scope = outFileAttrs[lfn]["scope"]
                jobSpec.add_out_file(fileSpec)
    # register updated events
    for events_to_update in events_to_update_list:
        if jobSpec.PandaID in events_to_update:
            for data in events_to_update[jobSpec.PandaID]:
                eventSpec = EventSpec()
                eventSpec.from_data(data, jobSpec.PandaID)
                jobSpec.add_event(eventSpec, None)


def update_job_attributes_with_workers(map_type, jobspec_list, workspec_list, files_to_stage_out_list, events_to_update_list):
    """Propagate worker attributes, output files and events onto job specs.

    Args:
        map_type: WorkSpec mapping type (one-to-one, multi-jobs or multi-workers)
        jobspec_list: jobs to update
        workspec_list: workers serving those jobs
        files_to_stage_out_list: {workerID: {PandaID: {lfn: [fileAttrs, ...]}}}
        events_to_update_list: [{PandaID: [eventData, ...]}, ...]

    Returns:
        bool: always True
    """
    if map_type in [WorkSpec.MT_OneToOne, WorkSpec.MT_MultiJobs]:
        workSpec = workspec_list[0]
        for jobSpec in jobspec_list:
            jobSpec.set_attributes(workSpec.workAttributes)
            # drop bulky metadata once it has been merged into the job
            try:
                del workSpec.workAttributes[jobSpec.PandaID]["metaData"]
            except Exception:
                pass
            # mirror the worker's lifecycle into the job's start/end times
            if workSpec.status in [WorkSpec.ST_running]:
                jobSpec.set_start_time()
            elif workSpec.pilot_closed:
                jobSpec.reset_start_end_time()
            elif workSpec.is_final_status():
                jobSpec.set_end_time()
            # share the worker's cores evenly among its jobs (at least 1 each)
            if workSpec.nCore is not None and jobSpec.nCore is None:
                try:
                    jobSpec.nCore = int(workSpec.nCore / len(jobspec_list))
                    if jobSpec.nCore == 0:
                        jobSpec.nCore = 1
                except Exception:
                    pass
            if not jobSpec.has_attribute("batchID"):
                if workSpec.batchID is not None:
                    jobSpec.set_one_attribute("batchID", workSpec.batchID)
            _attach_worker_outputs(jobSpec, files_to_stage_out_list, events_to_update_list)
            statusInJobAttr = jobSpec.get_job_status_from_attributes()
            jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(statusInJobAttr)
            if workSpec.pilot_closed:
                jobSpec.set_pilot_closed()
            if workSpec.new_status:
                jobSpec.trigger_propagation()
    elif map_type == WorkSpec.MT_MultiWorkers:
        jobSpec = jobspec_list[0]
        # aggregate the state of all workers serving this single job
        allDone = True
        isRunning = False
        oneFinished = False
        oneFailed = False
        nCore = 0
        nCoreTime = 0
        for workSpec in workspec_list:
            if workSpec.new_status:
                jobSpec.trigger_propagation()
            if workSpec.status in [WorkSpec.ST_running]:
                isRunning = True
                jobSpec.set_start_time()
                nCore += workSpec.nCore
            if workSpec.is_final_status():
                # accumulate core-seconds of finished workers
                if workSpec.startTime is not None and workSpec.endTime is not None:
                    nCoreTime += workSpec.nCore * (workSpec.endTime - workSpec.startTime).total_seconds()
                if workSpec.status == WorkSpec.ST_finished:
                    oneFinished = True
                elif workSpec.status == WorkSpec.ST_failed:
                    oneFailed = True
            else:
                allDone = False
        if allDone:
            jobSpec.set_end_time()
            # effective core count = core-seconds / wall-clock seconds
            if jobSpec.startTime is not None:
                total_time = (jobSpec.endTime - jobSpec.startTime).total_seconds()
                if total_time > 0:
                    jobSpec.nCore = float(nCoreTime) / float(total_time)
                    jobSpec.nCore = int(math.ceil(jobSpec.nCore))
            jobSpec.moreWorkers = 0
        else:
            jobSpec.nCore = nCore
        _attach_worker_outputs(jobSpec, files_to_stage_out_list, events_to_update_list)
        workSpec = workspec_list[0]
        # combine the workers' outcomes into a single job status
        if allDone:
            if oneFinished:
                jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_finished)
            elif oneFailed:
                jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_failed)
            else:
                jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_cancelled)
        else:
            if isRunning or jobSpec.status == "running":
                jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_running)
            else:
                jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_submitted)
    return True
0493 return True
0494
0495
0496
def do_log_rollover():
    """Trigger rollover of all PandaLogger log files."""
    PandaLogger.doRollOver()
0499
0500
0501
def get_stopwatch():
    """Return a freshly started StopWatch."""
    return StopWatch()
0504
0505
0506
def get_global_dict():
    """Return the process-wide MapWithLock shared dictionary."""
    return global_dict
0509
0510
0511
@contextmanager
def get_file_lock(file_name, lock_interval):
    """Context manager: take an exclusive flock on file_name, rate-limited.

    The file body stores the timestamp of the last successful acquisition; if
    that timestamp is younger than lock_interval seconds, an IOError is raised
    so the caller skips the protected section. A BlockingIOError propagates
    when another process already holds the lock (non-blocking flock).
    """
    if os.path.exists(file_name):
        opt = "r+"
    else:
        opt = "w+"
    with open(file_name, opt) as f:
        locked = False
        try:
            # non-blocking exclusive lock; raises immediately if already held
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            locked = True

            timeNow = naive_utcnow()
            toSkip = False
            try:
                # previous acquisition time is stored in the file body
                s = f.read()
                pTime = datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f")
                if timeNow - pTime < timedelta(seconds=lock_interval):
                    toSkip = True
            except Exception:
                # missing/corrupt timestamp: treat the interval as expired
                pass

            if toSkip:
                raise IOError("skipped since still in locked interval")

            # record this acquisition time, replacing the old content
            f.seek(0)
            f.write(timeNow.strftime("%Y-%m-%d %H:%M:%S.%f"))
            f.truncate()

            yield
        finally:
            # always release the flock if we managed to take it
            if locked:
                fcntl.flock(f, fcntl.LOCK_UN)
0547
0548
0549
def convert_phrase_to_key(key_phrase):
    """Derive a cipher key from a pass phrase via an HMAC hex digest."""
    digest = Cryptodome.Hash.HMAC.new(key_phrase)
    return digest.hexdigest()
0553
0554
0555
def encrypt_string(key_phrase, plain_text):
    """Encrypt plain_text with AES-CFB; return base64(IV + ciphertext)."""
    aes_key = convert_phrase_to_key(key_phrase)
    # random initialization vector, prepended to the ciphertext
    init_vector = Cryptodome.Random.new().read(Cryptodome.Cipher.AES.block_size)
    cipher = Cryptodome.Cipher.AES.new(aes_key, Cryptodome.Cipher.AES.MODE_CFB, init_vector)
    return base64.b64encode(init_vector + cipher.encrypt(plain_text))
0561
0562
0563
def decrypt_string(key_phrase, cipher_text):
    """Decrypt the output of encrypt_string; return the plain bytes."""
    raw = base64.b64decode(cipher_text)
    aes_key = convert_phrase_to_key(key_phrase)
    # leading block is the initialization vector, remainder is the payload
    block_size = Cryptodome.Cipher.AES.block_size
    init_vector = raw[:block_size]
    cipher = Cryptodome.Cipher.AES.new(aes_key, Cryptodome.Cipher.AES.MODE_CFB, init_vector)
    return cipher.decrypt(raw[block_size:])
0571
0572
0573
def set_file_permission(path):
    """Best-effort: set rw permissions (0o666 honoring the umask) and ownership.

    When path is a directory, applies to every file beneath it recursively.
    Per-file errors are ignored; a missing path is a no-op.
    """
    if not os.path.exists(path):
        return
    targets = []
    if os.path.isfile(path):
        targets += [path]
    else:
        for root, dirs, files in os.walk(path):
            targets += [os.path.join(root, f) for f in files]
    umask = os.umask(0)
    uid = os.getuid()
    gid = os.getgid()
    for f in targets:
        try:
            # mask (not subtract) the umask bits; the old `0o666 - umask` could
            # borrow across permission groups and grant unintended bits
            # (e.g. umask 0o027 gave 0o637 instead of 0o640)
            os.chmod(f, 0o666 & ~umask)
            os.chown(f, uid, gid)
        except Exception:
            pass
    os.umask(umask)
0593
0594
0595
def get_queues_config_url():
    """Return the queue-config URL from the environment, or None when unset."""
    return os.environ.get("HARVESTER_QUEUE_CONFIG_URL")
0601
0602
0603
def get_unique_queue_name(queue_name, resource_type, job_type):
    """Compose the unique queue identifier 'queue:resource_type:job_type'."""
    return f"{queue_name}:{resource_type}:{job_type}"
0606
0607
0608
def dynamic_plugin_change():
    """Whether plugins may be reloaded dynamically (config switch, default True)."""
    try:
        enabled = harvester_config.master.dynamic_plugin_change
    except Exception:
        # config attribute missing: dynamic reloading stays enabled
        enabled = True
    return enabled
0614
0615
0616
def make_choice_list(pdpm=None, default=None):
    """Build a weighted candidate list of 1000 entries for random selection.

    Args:
        pdpm: {candidate: weight} map; weights are scaled down proportionally
            when their sum exceeds 1000 so the total fits in 1000 slots
        default: filler candidate used for the remaining slots

    Returns:
        list: candidates repeated according to their (scaled) weights, padded
        with `default` up to 1000 entries
    """
    # avoid the shared mutable default-argument pitfall (was pdpm={})
    if pdpm is None:
        pdpm = {}
    weight_sum = sum(pdpm.values())
    weight_default = 1000
    ret_list = []
    for candidate, weight in pdpm.items():
        if weight_sum > 1000:
            # scale down proportionally so the total fits in 1000 slots
            real_weight = int(weight * 1000 / weight_sum)
        else:
            real_weight = int(weight)
        ret_list.extend([candidate] * real_weight)
        weight_default -= real_weight
    ret_list.extend([default] * weight_default)
    return ret_list
0630
0631
0632
def pickle_to_text(data):
    """Serialize data with pickle and encode it as base64 text."""
    pickled = pickle.dumps(data)
    return codecs.encode(pickled, "base64").decode()
0635
0636
0637
def unpickle_from_text(text):
    """Inverse of pickle_to_text: decode the base64 text and unpickle the object."""
    raw = codecs.decode(text.encode(), "base64")
    return pickle.loads(raw)
0640
0641
0642
def retry_period_sec(nth_retry, increment=1, max_retries=None, max_seconds=None, min_seconds=1):
    """Linear back-off: seconds to wait before the nth retry.

    Returns False when max_retries is exceeded; otherwise
    min(max(min_seconds, 1) + (n-1)*increment, max_seconds).
    """
    if max_retries and nth_retry > max_retries:
        # retry budget exhausted
        return False
    attempt = max(nth_retry, 1)
    period = max(min_seconds, 1) + (attempt - 1) * increment
    if max_seconds:
        period = min(period, max_seconds)
    return period
0653
0654
0655
def get_pid():
    """Return a unique identifier of the current thread: "<host>_<pid>-<tid hex>".

    Returns:
        str: hostname, OS process id and thread id (hex) combined.
    """
    hostname = socket.gethostname()
    os_pid = os.getpid()
    # single get_ident() call; the old code re-queried it and carried a dead
    # None-check (get_ident always returns a non-zero int for a live thread)
    thread_id = get_ident()
    return f"{hostname}_{os_pid}-{thread_id:x}"
0663
0664
def aware_utcnow() -> datetime:
    """Return the current UTC date and time as a timezone-aware datetime.

    Returns:
        datetime: current UTC time with tzinfo set to timezone.utc
    """
    return datetime.now(tz=timezone.utc)
0673
0674
def aware_utcfromtimestamp(timestamp: float) -> datetime:
    """Convert a POSIX timestamp to a timezone-aware UTC datetime.

    Args:
        timestamp (float): POSIX timestamp

    Returns:
        datetime: corresponding UTC time with tzinfo set to timezone.utc
    """
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)
0686
0687
def naive_utcnow() -> datetime:
    """Return the current UTC date and time as a naive datetime (tzinfo stripped).

    Returns:
        datetime: current UTC time without tzinfo
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)
0696
0697
def naive_utcfromtimestamp(timestamp: float) -> datetime:
    """Convert a POSIX timestamp to a naive UTC datetime (tzinfo stripped).

    Args:
        timestamp (float): POSIX timestamp

    Returns:
        datetime: corresponding UTC time without tzinfo
    """
    return datetime.fromtimestamp(timestamp, timezone.utc).replace(tzinfo=None)
0709
0710
def special_pilot_type_to_prod_source_label(pilot_type: str) -> str:
    """Map a special worker pilotType to the prodSourceLabel of a PanDA job.

    Args:
        pilot_type (str): pilotType of worker, e.g. "RC", "ALRB", "PT"
            (except "PR" which is production)

    Returns:
        str: prodSourceLabel, e.g. "rc_test2", "rc_alrb", "ptest"; "ANY" when
        no mapping is defined for the given pilot_type
    """
    mapping = {
        "RC": "rc_test2",
        "ALRB": "rc_alrb",
        "PT": "ptest",
    }
    return mapping.get(pilot_type, "ANY")
0727
0728
def prod_source_label_to_pilot_type(prod_source_label: str) -> str:
    """Map a PanDA job prodSourceLabel back to the worker pilotType.

    Args:
        prod_source_label (str): prodSourceLabel, e.g. "rc_test2", "rc_alrb", "ptest"

    Returns:
        str: pilotType, e.g. "RC", "ALRB", "PT"; defaults to "PR" (production)
        when no mapping is defined for the given prod_source_label
    """
    mapping = {
        "rc_test2": "RC",
        "rc_alrb": "ALRB",
        "ptest": "PT",
    }
    return mapping.get(prod_source_label, "PR")