Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 """
0002 utilities
0003 
0004 """
0005 
0006 import base64
0007 import codecs
0008 import fcntl
0009 import functools
0010 import inspect
0011 import json
0012 import math
0013 import os
0014 import pickle
0015 import random
0016 import socket
0017 import sys
0018 import threading
0019 import time
0020 import traceback
0021 import uuid
0022 import zlib
0023 from contextlib import contextmanager
0024 from datetime import datetime, timedelta, timezone
0025 from threading import get_ident
0026 
0027 import Cryptodome.Cipher.AES
0028 import Cryptodome.Hash.HMAC
0029 import Cryptodome.Random
0030 from pandalogger.LogWrapper import LogWrapper
0031 from pandalogger.PandaLogger import PandaLogger
0032 
0033 from pandaharvester.harvesterconfig import harvester_config
0034 
0035 from .event_spec import EventSpec
0036 from .file_spec import FileSpec
0037 from .work_spec import WorkSpec
0038 
0039 with_memory_profile = False
0040 
0041 
0042 # lock for synchronization
0043 sync_lock = threading.Lock()
0044 
0045 ##############
0046 # Decorators #
0047 ##############
0048 
0049 
0050 # synchronize decorator
0051 def synchronize(func):
0052     """synchronize decorator"""
0053 
0054     @functools.wraps(func)
0055     def wrapper(*args, **kwargs):
0056         with sync_lock:
0057             return func(*args, **kwargs)
0058 
0059     return wrapper
0060 
0061 
0062 ###########
0063 # Classes #
0064 ###########
0065 
0066 
0067 # stopwatch class
0068 class StopWatch(object):
0069     # constructor
0070     def __init__(self):
0071         self.start_time = time.monotonic()
0072 
0073     # get string message about elapsed time
0074     def get_elapsed_time(self):
0075         time_diff = self.get_elapsed_time_in_sec()
0076         return f" : took {time_diff:.3f} sec"
0077 
0078     # get elapsed time in seconds
0079     def get_elapsed_time_in_sec(self):
0080         return time.monotonic() - self.start_time
0081 
0082     # reset
0083     def reset(self):
0084         self.start_time = time.monotonic()
0085 
0086 
0087 # map with lock
0088 class MapWithLock(object):
0089     def __init__(self):
0090         self.lock = threading.Lock()
0091         self.dataMap = dict()
0092 
0093     def __getitem__(self, item):
0094         ret = self.dataMap.__getitem__(item)
0095         return ret
0096 
0097     def __setitem__(self, item, value):
0098         self.dataMap.__setitem__(item, value)
0099 
0100     def __contains__(self, item):
0101         ret = self.dataMap.__contains__(item)
0102         return ret
0103 
0104     def acquire(self):
0105         self.lock.acquire()
0106 
0107     def release(self):
0108         self.lock.release()
0109 
0110     def iteritems(self):
0111         return self.dataMap.items()
0112 
0113 
0114 # global dict for all threads
0115 global_dict = MapWithLock()
0116 
0117 
0118 # singleton distinguishable with id
0119 class SingletonWithID(type):
0120     def __init__(cls, *args, **kwargs):
0121         cls.__instance = {}
0122         super(SingletonWithID, cls).__init__(*args, **kwargs)
0123 
0124     @synchronize
0125     def __call__(cls, *args, **kwargs):
0126         obj_id = str(kwargs.get("id", ""))
0127         if obj_id not in cls.__instance:
0128             cls.__instance[obj_id] = super(SingletonWithID, cls).__call__(*args, **kwargs)
0129         return cls.__instance.get(obj_id)
0130 
0131 
0132 # singleton distinguishable with each thread and id
0133 class SingletonWithThreadAndID(type):
0134     def __init__(cls, *args, **kwargs):
0135         cls.__instance = {}
0136         super(SingletonWithThreadAndID, cls).__init__(*args, **kwargs)
0137 
0138     @synchronize
0139     def __call__(cls, *args, **kwargs):
0140         thread_id = get_ident()
0141         obj_id = (thread_id, str(kwargs.get("id", "")))
0142         if obj_id not in cls.__instance:
0143             cls.__instance[obj_id] = super(SingletonWithThreadAndID, cls).__call__(*args, **kwargs)
0144         return cls.__instance.get(obj_id)
0145 
0146 
0147 # replacement for slow namedtuple in python 2
0148 class DictTupleHybrid(tuple):
0149     def set_attributes(self, attributes):
0150         self.attributes = attributes
0151 
0152     def _asdict(self):
0153         return dict(zip(self.attributes, self))
0154 
0155 
0156 # safe dictionary to retrun original strings for missing keys
0157 class SafeDict(dict):
0158     def __missing__(self, key):
0159         return "{" + key + "}"
0160 
0161 
0162 #############
0163 # Functions #
0164 #############
0165 
0166 
0167 # enable memory profiling
0168 def enable_memory_profiling():
0169     global with_memory_profile
0170     with_memory_profile = True
0171 
0172 
0173 # setup logger
0174 def setup_logger(name=None):
0175     if name is None:
0176         frm = inspect.stack()[1][0]
0177         mod = inspect.getmodule(frm)
0178         name = mod.__name__.split(".")[-1]
0179     try:
0180         log_level = getattr(harvester_config.log_level, name)
0181         return PandaLogger().getLogger(name, log_level=log_level)
0182     except Exception:
0183         pass
0184     return PandaLogger().getLogger(name)
0185 
0186 
0187 # make logger
0188 def make_logger(tmp_log, token=None, method_name=None, hook=None):
0189     # get method name of caller
0190     if method_name is None:
0191         tmp_str = inspect.stack()[1][3]
0192     else:
0193         tmp_str = method_name
0194     if token is not None:
0195         tmp_str += f" <{token}>"
0196     else:
0197         tmp_str += " :".format(token)
0198     new_log = LogWrapper(tmp_log, tmp_str, seeMem=with_memory_profile, hook=hook)
0199     return new_log
0200 
0201 
0202 # dump error message
0203 def dump_error_message(tmp_log, err_str=None, no_message=False):
0204     if not isinstance(tmp_log, LogWrapper):
0205         method_name = f"{inspect.stack()[1][3]} : "
0206     else:
0207         method_name = ""
0208     # error
0209     if err_str is None:
0210         err_type, err_value = sys.exc_info()[:2]
0211         err_str = f"{method_name} {err_type.__name__} {err_value} {traceback.format_exc()}"
0212     if not no_message:
0213         tmp_log.error(err_str)
0214     return err_str
0215 
0216 
0217 # sleep for random duration and return True if no more sleep is needed
0218 def sleep(interval, stop_event, randomize=True):
0219     if randomize and interval > 0:
0220         random_interval = random.randint(int(interval * 0.4), int(interval * 1.4))
0221     else:
0222         random_interval = interval
0223     if stop_event is None:
0224         time.sleep(random_interval)
0225     else:
0226         i = 0
0227         while True:
0228             if stop_event.is_set():
0229                 return True
0230             if i >= random_interval:
0231                 break
0232             stop_event.wait(1)
0233             i += 1
0234     return False
0235 
0236 
0237 # make PFC
0238 def make_pool_file_catalog(jobspec_list):
0239     xml_str = """<?xml version="1.0" ?>
0240 <!DOCTYPE POOLFILECATALOG  SYSTEM "InMemory">
0241 <POOLFILECATALOG>
0242     """
0243 
0244     done_lfns = set()
0245     for jobSpec in jobspec_list:
0246         input_files = jobSpec.get_input_file_attributes()
0247         for input_lfn, input_file in input_files.items():
0248             if input_lfn in done_lfns:
0249                 continue
0250             done_lfns.add(input_lfn)
0251             xml_str += f"""  <File ID="{input_file['guid']}">
0252     <physical>
0253       <pfn filetype="ROOT_All" name="{input_lfn}"/>
0254     </physical>
0255     <logical/>
0256   </File>
0257   """
0258 
0259     xml_str += "</POOLFILECATALOG>"
0260     return xml_str
0261 
0262 
0263 # calculate adler32
0264 def calc_adler32(file_name):
0265     val = 1
0266     block_size = 32 * 1024 * 1024
0267     with open(file_name, "rb") as fp:
0268         while True:
0269             data = fp.read(block_size)
0270             if not data:
0271                 break
0272             val = zlib.adler32(data, val)
0273     if val < 0:
0274         val += 2**32
0275     return hex(val)[2:10].zfill(8).lower()
0276 
0277 
0278 # get output file report
0279 def get_output_file_report(jobspec):
0280     if jobspec.outputFilesToReport is not None:
0281         return jobspec.outputFilesToReport
0282     report = {}
0283     # body
0284     for fileSpec in jobspec.outFiles:
0285         # only successful files
0286         if fileSpec.status != "finished":
0287             continue
0288         # extract guid
0289         if "guid" in fileSpec.fileAttributes:
0290             guid = fileSpec.fileAttributes["guid"]
0291         elif fileSpec.fileType == "log":
0292             guid = jobspec.get_logfile_info()["guid"]
0293         else:
0294             guid = str(uuid.uuid4())
0295         # checksum
0296         if fileSpec.chksum is not None and ":" in fileSpec.chksum:
0297             chksum = fileSpec.chksum.split(":")[-1]
0298         else:
0299             chksum = fileSpec.chksum
0300         report[fileSpec.lfn] = {"guid": guid, "fsize": fileSpec.fsize, "adler32": chksum}
0301     return json.dumps(report)
0302 
0303 
0304 def create_shards(input_list, size):
0305     """
0306     Creates shards of size n from the input list.
0307     """
0308     shard, i = [], 0
0309     for element in input_list:
0310         shard.append(element)
0311         i += 1
0312         if i == size:
0313             yield shard
0314             shard, i = [], 0
0315 
0316     if i > 0:
0317         yield shard
0318 
0319 
0320 # update job attributes with workers
0321 def update_job_attributes_with_workers(map_type, jobspec_list, workspec_list, files_to_stage_out_list, events_to_update_list):
0322     if map_type in [WorkSpec.MT_OneToOne, WorkSpec.MT_MultiJobs]:
0323         workSpec = workspec_list[0]
0324         for jobSpec in jobspec_list:
0325             jobSpec.set_attributes(workSpec.workAttributes)
0326             # delete job metadata from worker attributes
0327             try:
0328                 del workSpec.workAttributes[jobSpec.PandaID]["metaData"]
0329             except Exception:
0330                 pass
0331             # set start and end times
0332             if workSpec.status in [WorkSpec.ST_running]:
0333                 jobSpec.set_start_time()
0334             elif workSpec.pilot_closed:
0335                 jobSpec.reset_start_end_time()
0336             elif workSpec.is_final_status():
0337                 jobSpec.set_end_time()
0338             # core count
0339             if workSpec.nCore is not None and jobSpec.nCore is None:
0340                 try:
0341                     jobSpec.nCore = int(workSpec.nCore / len(jobspec_list))
0342                     if jobSpec.nCore == 0:
0343                         jobSpec.nCore = 1
0344                 except Exception:
0345                     pass
0346             # batch ID
0347             if not jobSpec.has_attribute("batchID"):
0348                 if workSpec.batchID is not None:
0349                     jobSpec.set_one_attribute("batchID", workSpec.batchID)
0350             # add files
0351             outFileAttrs = jobSpec.get_output_file_attributes()
0352             for tmpWorkerID, files_to_stage_out in files_to_stage_out_list.items():
0353                 if jobSpec.PandaID in files_to_stage_out:
0354                     for lfn, fileAttersList in files_to_stage_out[jobSpec.PandaID].items():
0355                         for fileAtters in fileAttersList:
0356                             fileSpec = FileSpec()
0357                             fileSpec.lfn = lfn
0358                             fileSpec.PandaID = jobSpec.PandaID
0359                             fileSpec.taskID = jobSpec.taskID
0360                             fileSpec.path = fileAtters["path"]
0361                             fileSpec.fsize = fileAtters["fsize"]
0362                             fileSpec.fileType = fileAtters["type"]
0363                             fileSpec.fileAttributes = fileAtters
0364                             fileSpec.workerID = tmpWorkerID
0365                             if "isZip" in fileAtters:
0366                                 fileSpec.isZip = fileAtters["isZip"]
0367                             if "chksum" in fileAtters:
0368                                 fileSpec.chksum = fileAtters["chksum"]
0369                             if "eventRangeID" in fileAtters:
0370                                 fileSpec.eventRangeID = fileAtters["eventRangeID"]
0371                                 # use input fileID as provenanceID
0372                                 try:
0373                                     provenanceID = fileSpec.eventRangeID.split("-")[2]
0374                                 except Exception:
0375                                     provenanceID = None
0376                                 fileSpec.provenanceID = provenanceID
0377                             if lfn in outFileAttrs:
0378                                 fileSpec.scope = outFileAttrs[lfn]["scope"]
0379                             jobSpec.add_out_file(fileSpec)
0380             # add events
0381             for events_to_update in events_to_update_list:
0382                 if jobSpec.PandaID in events_to_update:
0383                     for data in events_to_update[jobSpec.PandaID]:
0384                         eventSpec = EventSpec()
0385                         eventSpec.from_data(data, jobSpec.PandaID)
0386                         jobSpec.add_event(eventSpec, None)
0387             statusInJobAttr = jobSpec.get_job_status_from_attributes()
0388             jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(statusInJobAttr)
0389             if workSpec.pilot_closed:
0390                 jobSpec.set_pilot_closed()
0391             if workSpec.new_status:
0392                 jobSpec.trigger_propagation()
0393     elif map_type == WorkSpec.MT_MultiWorkers:
0394         jobSpec = jobspec_list[0]
0395         # scan all workers
0396         allDone = True
0397         isRunning = False
0398         oneFinished = False
0399         oneFailed = False
0400         nCore = 0
0401         nCoreTime = 0
0402         for workSpec in workspec_list:
0403             if workSpec.new_status:
0404                 jobSpec.trigger_propagation()
0405             # the worker is running
0406             if workSpec.status in [WorkSpec.ST_running]:
0407                 isRunning = True
0408                 # set start time
0409                 jobSpec.set_start_time()
0410                 nCore += workSpec.nCore
0411             # the worker is done
0412             if workSpec.is_final_status():
0413                 if workSpec.startTime is not None and workSpec.endTime is not None:
0414                     nCoreTime += workSpec.nCore * (workSpec.endTime - workSpec.startTime).total_seconds()
0415                 if workSpec.status == WorkSpec.ST_finished:
0416                     oneFinished = True
0417                 elif workSpec.status == WorkSpec.ST_failed:
0418                     oneFailed = True
0419             else:
0420                 # the worker is still active
0421                 allDone = False
0422         # set final values
0423         if allDone:
0424             # set end time
0425             jobSpec.set_end_time()
0426             # time-averaged core count
0427             if jobSpec.startTime is not None:
0428                 total_time = (jobSpec.endTime - jobSpec.startTime).total_seconds()
0429                 if total_time > 0:
0430                     jobSpec.nCore = float(nCoreTime) / float(total_time)
0431                     jobSpec.nCore = int(math.ceil(jobSpec.nCore))
0432             # disable to get more workers
0433             jobSpec.moreWorkers = 0
0434         else:
0435             # live core count
0436             jobSpec.nCore = nCore
0437         # combine worker attributes and set it to job
0438         # NOTE: Setting worker attributes is commented out for MultiWorkers map type
0439         # as it requires aggregating attributes from multiple workers, which needs
0440         # careful implementation to avoid conflicts
0441         # jobSpec.set_attributes(workAttributes)
0442         # add files
0443         outFileAttrs = jobSpec.get_output_file_attributes()
0444         for tmpWorkerID, files_to_stage_out in files_to_stage_out_list.items():
0445             if jobSpec.PandaID in files_to_stage_out:
0446                 for lfn, fileAttersList in files_to_stage_out[jobSpec.PandaID].items():
0447                     for fileAtters in fileAttersList:
0448                         fileSpec = FileSpec()
0449                         fileSpec.lfn = lfn
0450                         fileSpec.PandaID = jobSpec.PandaID
0451                         fileSpec.taskID = jobSpec.taskID
0452                         fileSpec.path = fileAtters["path"]
0453                         fileSpec.fsize = fileAtters["fsize"]
0454                         fileSpec.fileType = fileAtters["type"]
0455                         fileSpec.fileAttributes = fileAtters
0456                         fileSpec.workerID = tmpWorkerID
0457                         if "isZip" in fileAtters:
0458                             fileSpec.isZip = fileAtters["isZip"]
0459                         if "chksum" in fileAtters:
0460                             fileSpec.chksum = fileAtters["chksum"]
0461                         if "eventRangeID" in fileAtters:
0462                             fileSpec.eventRangeID = fileAtters["eventRangeID"]
0463                             # use input fileID as provenanceID
0464                             try:
0465                                 provenanceID = fileSpec.eventRangeID.split("-")[2]
0466                             except Exception:
0467                                 provenanceID = None
0468                             fileSpec.provenanceID = provenanceID
0469                         if lfn in outFileAttrs:
0470                             fileSpec.scope = outFileAttrs[lfn]["scope"]
0471                         jobSpec.add_out_file(fileSpec)
0472         # add events
0473         for events_to_update in events_to_update_list:
0474             if jobSpec.PandaID in events_to_update:
0475                 for data in events_to_update[jobSpec.PandaID]:
0476                     eventSpec = EventSpec()
0477                     eventSpec.from_data(data, jobSpec.PandaID)
0478                     jobSpec.add_event(eventSpec, None)
0479         # set job status
0480         workSpec = workspec_list[0]
0481         if allDone:
0482             if oneFinished:
0483                 jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_finished)
0484             elif oneFailed:
0485                 jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_failed)
0486             else:
0487                 jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_cancelled)
0488         else:
0489             if isRunning or jobSpec.status == "running":
0490                 jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_running)
0491             else:
0492                 jobSpec.status, jobSpec.subStatus = workSpec.convert_to_job_status(WorkSpec.ST_submitted)
0493     return True
0494 
0495 
0496 # rollover for log files
0497 def do_log_rollover():
0498     PandaLogger.doRollOver()
0499 
0500 
0501 # get stopwatch
0502 def get_stopwatch():
0503     return StopWatch()
0504 
0505 
0506 # get global dict
0507 def get_global_dict():
0508     return global_dict
0509 
0510 
0511 # get file lock
0512 @contextmanager
0513 def get_file_lock(file_name, lock_interval):
0514     if os.path.exists(file_name):
0515         opt = "r+"
0516     else:
0517         opt = "w+"
0518     with open(file_name, opt) as f:
0519         locked = False
0520         try:
0521             # lock file
0522             fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
0523             locked = True
0524             # read timestamp
0525             timeNow = naive_utcnow()
0526             toSkip = False
0527             try:
0528                 s = f.read()
0529                 pTime = datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f")
0530                 if timeNow - pTime < timedelta(seconds=lock_interval):
0531                     toSkip = True
0532             except Exception:
0533                 pass
0534             # skip if still in locked interval
0535             if toSkip:
0536                 raise IOError("skipped since still in locked interval")
0537             # write timestamp
0538             f.seek(0)
0539             f.write(timeNow.strftime("%Y-%m-%d %H:%M:%S.%f"))
0540             f.truncate()
0541             # execute with block
0542             yield
0543         finally:
0544             # unlock
0545             if locked:
0546                 fcntl.flock(f, fcntl.LOCK_UN)
0547 
0548 
0549 # convert a key phrase to a cipher key
0550 def convert_phrase_to_key(key_phrase):
0551     h = Cryptodome.Hash.HMAC.new(key_phrase)
0552     return h.hexdigest()
0553 
0554 
0555 # encrypt a string
0556 def encrypt_string(key_phrase, plain_text):
0557     k = convert_phrase_to_key(key_phrase)
0558     v = Cryptodome.Random.new().read(Cryptodome.Cipher.AES.block_size)
0559     c = Cryptodome.Cipher.AES.new(k, Cryptodome.Cipher.AES.MODE_CFB, v)
0560     return base64.b64encode(v + c.encrypt(plain_text))
0561 
0562 
0563 # decrypt a string
0564 def decrypt_string(key_phrase, cipher_text):
0565     cipher_text = base64.b64decode(cipher_text)
0566     k = convert_phrase_to_key(key_phrase)
0567     v = cipher_text[: Cryptodome.Cipher.AES.block_size]
0568     c = Cryptodome.Cipher.AES.new(k, Cryptodome.Cipher.AES.MODE_CFB, v)
0569     cipher_text = cipher_text[Cryptodome.Cipher.AES.block_size :]
0570     return c.decrypt(cipher_text)
0571 
0572 
0573 # set permission
0574 def set_file_permission(path):
0575     if not os.path.exists(path):
0576         return
0577     targets = []
0578     if os.path.isfile(path):
0579         targets += [path]
0580     else:
0581         for root, dirs, files in os.walk(path):
0582             targets += [os.path.join(root, f) for f in files]
0583     umask = os.umask(0)
0584     uid = os.getuid()
0585     gid = os.getgid()
0586     for f in targets:
0587         try:
0588             os.chmod(f, 0o666 - umask)
0589             os.chown(f, uid, gid)
0590         except Exception:
0591             pass
0592     os.umask(umask)
0593 
0594 
0595 # get URL of queues config file
0596 def get_queues_config_url():
0597     try:
0598         return os.environ["HARVESTER_QUEUE_CONFIG_URL"]
0599     except Exception:
0600         return None
0601 
0602 
0603 # get unique queue name
0604 def get_unique_queue_name(queue_name, resource_type, job_type):
0605     return f"{queue_name}:{resource_type}:{job_type}"
0606 
0607 
0608 # capability to dynamically change plugins
0609 def dynamic_plugin_change():
0610     try:
0611         return harvester_config.master.dynamic_plugin_change
0612     except Exception:
0613         return True
0614 
0615 
0616 # Make a list of choice candidates according to permille weight
0617 def make_choice_list(pdpm={}, default=None):
0618     weight_sum = sum(pdpm.values())
0619     weight_default = 1000
0620     ret_list = []
0621     for candidate, weight in pdpm.items():
0622         if weight_sum > 1000:
0623             real_weight = int(weight * 1000 / weight_sum)
0624         else:
0625             real_weight = int(weight)
0626         ret_list.extend([candidate] * real_weight)
0627         weight_default -= real_weight
0628     ret_list.extend([default] * weight_default)
0629     return ret_list
0630 
0631 
0632 # pickle to text
0633 def pickle_to_text(data):
0634     return codecs.encode(pickle.dumps(data), "base64").decode()
0635 
0636 
0637 # unpickle from text
0638 def unpickle_from_text(text):
0639     return pickle.loads(codecs.decode(text.encode(), "base64"))
0640 
0641 
0642 # increasing retry period after timeout or failure
0643 def retry_period_sec(nth_retry, increment=1, max_retries=None, max_seconds=None, min_seconds=1):
0644     nth = max(nth_retry, 1)
0645     ret_period = max(min_seconds, 1)
0646     if max_retries and nth_retry > max_retries:
0647         return False
0648     else:
0649         ret_period += (nth - 1) * increment
0650         if max_seconds:
0651             ret_period = min(ret_period, max_seconds)
0652         return ret_period
0653 
0654 
0655 # get process identifier on the fly
0656 def get_pid():
0657     hostname = socket.gethostname()
0658     os_pid = os.getpid()
0659     thread_id = get_ident()
0660     if thread_id is None:
0661         thread_id = 0
0662     return f"{hostname}_{os_pid}-{format(get_ident(), 'x')}"
0663 
0664 
0665 def aware_utcnow() -> datetime:
0666     """
0667     Return the current UTC date and time, with tzinfo timezone.utc
0668 
0669     Returns:
0670         datetime: current UTC date and time, with tzinfo timezone.utc
0671     """
0672     return datetime.now(timezone.utc)
0673 
0674 
0675 def aware_utcfromtimestamp(timestamp: float) -> datetime:
0676     """
0677     Return the local date and time, with tzinfo timezone.utc, corresponding to the POSIX timestamp
0678 
0679     Args:
0680         timestamp (float): POSIX timestamp
0681 
0682     Returns:
0683         datetime: current UTC date and time, with tzinfo timezone.utc
0684     """
0685     return datetime.fromtimestamp(timestamp, timezone.utc)
0686 
0687 
0688 def naive_utcnow() -> datetime:
0689     """
0690     Return the current UTC date and time, without tzinfo
0691 
0692     Returns:
0693         datetime: current UTC date and time, without tzinfo
0694     """
0695     return aware_utcnow().replace(tzinfo=None)
0696 
0697 
0698 def naive_utcfromtimestamp(timestamp: float) -> datetime:
0699     """
0700     Return the local date and time, without tzinfo, corresponding to the POSIX timestamp
0701 
0702     Args:
0703         timestamp (float): POSIX timestamp
0704 
0705     Returns:
0706         datetime: current UTC date and time, without tzinfo
0707     """
0708     return aware_utcfromtimestamp(timestamp).replace(tzinfo=None)
0709 
0710 
0711 def special_pilot_type_to_prod_source_label(pilot_type: str) -> str:
0712     """
0713     Convert special pilotType of worker to prodSourceLabel of PanDA job.
0714 
0715     Args:
0716         pilot_type (str): pilotType of worker, e.g. "RC", "ALRB", "PT" (except "PR" which is production)
0717 
0718     Returns:
0719         str: prodSourceLabel of PanDA job, e.g. "rc_test2", "rc_alrb", "ptest"; "ANY" if no mapping is defined for the given pilot_type
0720     """
0721     pilot_type_to_prod_source_label_map = {
0722         "RC": "rc_test2",
0723         "ALRB": "rc_alrb",
0724         "PT": "ptest",
0725     }
0726     return pilot_type_to_prod_source_label_map.get(pilot_type, "ANY")
0727 
0728 
0729 def prod_source_label_to_pilot_type(prod_source_label: str) -> str:
0730     """
0731     Convert prodSourceLabel of PanDA job to pilotType of worker.
0732 
0733     Args:
0734         prod_source_label (str): prodSourceLabel of PanDA job, e.g. "rc_test2", "rc_alrb", "ptest"
0735 
0736     Returns:
0737         str: pilotType of worker, e.g. "RC", "ALRB", "PT"; default to "PR" (production) if no mapping is defined for the given prod_source_label
0738     """
0739     prod_source_label_to_pilot_type_map = {
0740         "rc_test2": "RC",
0741         "rc_alrb": "ALRB",
0742         "ptest": "PT",
0743     }
0744     return prod_source_label_to_pilot_type_map.get(prod_source_label, "PR")