Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2020
0009 
0010 import re
0011 import json
0012 import numbers
0013 import traceback
0014 import threading
0015 
0016 from pilot.util.auxiliary import is_python3
0017 
0018 import logging
0019 logger = logging.getLogger(__name__)
0020 
0021 
def camel_to_snake(name):
    """
    Converts a CamelCase name to snake_case, as used by python.

    :param name: name to change
    :return: name in snake_case
    """
    # first pass: put an underscore before each capitalized word
    partial = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # second pass: separate remaining lower/digit-to-upper transitions, then lowercase
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', partial).lower()
0031 
0032 
def snake_to_camel(snake_str):
    """
    Converts snake_case to firstLowCamelCase, as used by the server.

    :param snake_str: name to change
    :return: name in camelCase
    """
    parts = snake_str.split('_')
    # the first component keeps its case; every following one gets title-cased
    return parts[0] + "".join(word.title() for word in parts[1:])
0044 
0045 
def split(val, separator=",", min_len=0, fill_last=False):
    """
    Splits separator-delimited values and parses them.

    :param val:         string of values to split, or None
    :param separator:   separator string, comma by default
    :param min_len:     minimum needed length of array, array is filled up to this value
    :param fill_last:   Flag stating the array filler, if min_len is greater than extracted array length.
                        If True, array is padded with its last value, else with Nones.
    :return: parsed array
    """
    if val is None:
        return [None for _ in range(min_len)]

    v_arr = [parse_value(v) for v in val.split(separator)]

    if min_len > len(v_arr):
        # pad with the *last* parsed value as documented (previously the first
        # element was used, contradicting the fill_last contract)
        filler = v_arr[-1] if fill_last and v_arr else None
        v_arr.extend([filler] * (min_len - len(v_arr)))

    return v_arr
0070 
0071 
def get_nulls(val):
    """
    Converts the "NULL" string to python's None.

    :param val: string or whatever
    :return: None if val equals "NULL", otherwise val unchanged
    """
    if val == "NULL":
        return None
    return val
0080 
0081 
def is_float(val):
    """
    Tests whether the value can be converted to float.

    :param val: string or whatever
    :return: True if the value may be converted to float
    """
    try:
        float(val)
        return True
    except (ValueError, TypeError):
        # TypeError covers non-numeric, non-string inputs (e.g. None, lists),
        # which float() rejects with TypeError rather than ValueError
        return False
0094 
0095 
def is_int(val):
    """
    Tests whether the value can be converted to int.

    :param val: string or whatever
    :return: True if the value may be converted to int
    """
    try:
        int(val)
        return True
    except (ValueError, TypeError):
        # TypeError covers non-numeric, non-string inputs (e.g. None, lists),
        # which int() rejects with TypeError rather than ValueError
        return False
0108 
0109 
def is_long(s):
    """
    Test value to be convertable to integer (Python 2 ``long``).

    On Python 3 ``basestring`` is undefined, so the first isinstance check
    raises NameError, which is caught below and makes this function always
    return False there; it is only meaningful on Python 2.

    :param s: string or whatever
    :return: True if the value may be converted to Long
    """

    try:
        if not isinstance(s, basestring):  # Python 2 # noqa: F821
            # non-string input: let long() itself decide convertibility
            try:
                long(s)  # noqa: F821
                return True
            except ValueError:
                return False
    except Exception:
        return False  # Python 3 - this function should not be used on Python 3

    # Python 2, string input: an optional leading sign followed by digits only
    if s and s[0] in ('-', '+'):
        return s[1:].isdigit()
    return s.isdigit()
0131 
0132 
def parse_value(value):
    """
    Tries to parse value as number or None. If some of this can be done, parsed value is returned. Otherwise returns
    value unparsed.

    Non-string inputs are returned as-is. String inputs are converted to
    int (Python 3) or long (Python 2) or float when possible; the literal
    "NULL" becomes None.

    :param value: value to parse
    :return: mixed
    """

    try:
        if not isinstance(value, basestring):  # Python 2 # noqa: F821
            return value
    except Exception:
        # basestring is undefined on Python 3 - fall back to plain str check
        if not isinstance(value, str):  # Python 3
            return value

    if is_python3():  # Python 3
        if is_int(value):  # Python 3
            return int(value)
    else:
        if is_long(value):  # Python 2
            return long(value)  # noqa: F821
    if is_float(value):
        return float(value)

    # not a number: convert "NULL" to None, otherwise return the string unchanged
    return get_nulls(value)
0159 
0160 
def stringify_weird(arg):
    """
    Converts None to "NULL"; numbers pass through unchanged, everything
    else is converted to its string representation.

    :param arg: any value
    :return: "NULL" for None, the number itself, or str(arg)
    """
    if arg is None:
        return "NULL"
    return arg if isinstance(arg, numbers.Number) else str(arg)
0173 
0174 
def join(arr):
    """
    Joins array contents into a comma-separated string, converting each
    element to a string (with None rendered as "NULL").

    :param arr: iterable of values
    :return: joined string
    """
    pieces = [str(stringify_weird(item)) for item in arr]
    return ",".join(pieces)
0183 
0184 
def get_input_files(description):
    """
    Extracts input files from the description.

    :param description: job description dictionary
    :return: dict mapping file name to its per-file properties
    """
    logger.info("Extracting input files from job description")
    files = {}
    raw_list = description['inFiles']
    if raw_list and raw_list != "NULL":
        in_files = split(raw_list)
        length = len(in_files)

        # per-file property columns, each padded to the number of input files;
        # datasets, scopes and GUIDs may be sent once for all files
        columns = {
            "ddm_endpoint": split(description.get("ddmEndPointIn"), min_len=length),
            "storage_element": split(description.get("destinationSE"), min_len=length),
            "dispatch_dblock": split(description.get("dispatchDblock"), min_len=length),
            "dispatch_dblock_token": split(description.get("dispatchDBlockToken"), min_len=length),
            "dataset": split(description.get("realDatasetsIn"), min_len=length, fill_last=True),
            "dblock": split(description.get("prodDBlocks"), min_len=length),
            "dblock_token": split(description.get("prodDBlockToken"), min_len=length),
            "size": split(description.get("fsize"), min_len=length),
            "checksum": split(description.get("checksum"), min_len=length),
            "scope": split(description.get("scopeIn"), min_len=length, fill_last=True),
            "guid": split(description.get("GUID"), min_len=length, fill_last=True),
        }

        for index, file_name in enumerate(in_files):
            if file_name is not None:
                files[file_name] = {prop: column[index] for prop, column in columns.items()}
    return files
0225 
0226 
def fix_log(description, files):
    """
    Fixes log file description in output files (changes GUID and scope).

    :param description: job description dictionary
    :param files: output files
    :return: fixed output files
    """
    logger.info("modifying log-specific values in a log file description")
    log_file = description["logFile"]
    if log_file and log_file != "NULL":
        log_guid = description["logGUID"]
        # only patch when the log file is actually among the output files
        if log_guid and log_guid != "NULL" and log_file in files:
            files[log_file]["guid"] = log_guid
            files[log_file]["scope"] = description["scopeLog"]

    return files
0243 
0244 
def get_output_files(description):
    """
    Extracts output files from the description.

    :param description: job description dictionary
    :return: output files (with log-file specific fixes applied)
    """
    logger.info("Extracting output files in description")
    files = {}
    raw_list = description['outFiles']
    if raw_list and raw_list != "NULL":
        out_files = split(raw_list)
        length = len(out_files)

        # per-file property columns, each padded to the number of output files;
        # the scope may be sent once for all files
        columns = {
            "ddm_endpoint": split(description.get("ddmEndPointOut"), min_len=length),
            "storage_element": split(description.get("fileDestinationSE"), min_len=length),
            "dispatch_dblock_token": split(description.get("dispatchDBlockTokenForOut"), min_len=length),
            "destination_dblock_token": split(description.get("destinationDBlockToken"), min_len=length),
            "dblock_token": split(description.get("prodDBlockTokenForOut"), min_len=length),
            "dataset": split(description.get("realDatasets"), min_len=length),
            "dblock": split(description.get("destinationDblock"), min_len=length),
            "scope": split(description.get("scopeOut"), min_len=length, fill_last=True),
        }

        for index, file_name in enumerate(out_files):
            if file_name is not None:
                files[file_name] = {prop: column[index] for prop, column in columns.items()}

    return fix_log(description, files)
0280 
0281 
def one_or_set(array):
    """
    Collapses the array to a single stringified value when all elements are
    equal; otherwise joins the array into a comma-separated string.

    :param array: list of values
    :return: single stringified value or joined string
    """
    if array and all(item == array[0] for item in array):
        return stringify_weird(array[0])

    # empty array or differing elements: fall back to plain joining
    return join(array)
0293 
0294 
class JobDescription(object):
    """
    Holds a job description and exposes it through attribute-style access.

    On ``load`` a server-side (camelCase) description is converted to
    snake_case keys via the alias tables below, values are parsed, and the
    input/output file lists are exploded into the ``input_files`` and
    ``output_files`` dictionaries. Old camelCase keys keep working through
    ``__getattr__``/``__setattr__``, logging a deprecation-style warning.
    """
    __holder = None  # dict with the (converted) description values
    # server key -> preferred snake_case alias
    __key_aliases = {
        'PandaID': 'jobid',  # it is job id, not PanDA
        'transformation': 'script',  # making it more convenient
        'jobPars': 'script_parameters',  # -.-
        'coreCount': 'number_of_cores',
        'prodUserID': 'user_dn',
        'prodSourceLabel': 'label',  # We don't have any other labels in there. And this is The Label, or just label
        'homepackage': 'home_package',  # lowercase, all of a sudden, splitting words
        "nSent": 'throttle',  # as it's usage says
        'minRamCount': 'minimum_ram',  # reads better
        'maxDiskCount': 'maximum_input_file_size',
        'maxCpuCount': 'maximum_cpu_usage_time',
        'attemptNr': 'attempt_number',  # bad practice to strip words API needs to be readable
    }
    # snake_case key -> server key, for names snake_to_camel cannot reconstruct
    __key_back_aliases = {
        'task_id': 'taskID',  # all ID's are to be placed here, because snake case lacks of all-caps abbrev info
        'jobset_id': 'jobsetID',
        'job_definition_id': 'jobDefinitionID',
        'status_code': 'StatusCode',  # uppercase starting names also should be here
    }
    # convenience aliases resolved recursively through the description
    __soft_key_aliases = {
        'id': 'jobid',
        'command': 'script',
        'command_parameters': 'script_parameters'
    }

    __input_file_keys = {   # corresponding fields in input_files
        'inFiles': '',
        "ddmEndPointIn": 'ddm_endpoint',
        "destinationSE": 'storage_element',
        "dispatchDBlockToken": 'dispatch_dblock_token',
        "realDatasetsIn": 'dataset',
        "prodDBlocks": 'dblock',
        "fsize": 'size',
        "dispatchDblock": 'dispatch_dblock',
        'prodDBlockToken': 'dblock_token',
        "GUID": 'guid',
        "checksum": 'checksum',
        "scopeIn": 'scope'
    }
    __may_be_united = ['guid', 'scope', 'dataset']  # can be sent as one for all files, if is the same

    __output_file_keys = {   # corresponding fields in output_files
        'outFiles': '',
        'ddmEndPointOut': 'ddm_endpoint',
        'fileDestinationSE': 'storage_element',
        'dispatchDBlockTokenForOut': 'dispatch_dblock_token',
        'prodDBlockTokenForOut': 'dblock_token',
        'realDatasets': 'dataset',
        'destinationDblock': 'dblock',
        'destinationDBlockToken': 'destination_dblock_token',
        'scopeOut': 'scope',
        'logGUID': 'guid',
        'scopeLog': 'scope'
    }

    # per-instance copies/derivatives of the alias tables, filled in __init__
    __key_back_aliases_from_forward = None
    __key_reverse_aliases = None
    __key_aliases_snake = None
    input_files = None
    output_files = None

    def __init__(self):
        super(JobDescription, self).__init__()

        # build per-instance alias tables so loaded keys can be mapped back
        self.__key_back_aliases_from_forward = self.__key_back_aliases.copy()
        self.__key_reverse_aliases = {}
        self.__key_aliases_snake = {}
        self.input_files = {}
        self.output_files = {}

        for key in self.__key_aliases:
            alias = self.__key_aliases[key]
            self.__key_back_aliases_from_forward[alias] = key
            self.__key_aliases_snake[camel_to_snake(key)] = alias

    def get_input_file_prop(self, key):
        """
        Collects the input-file property corresponding to the given server key.

        :param key: server-side key present in __input_file_keys
        :return: single value or comma-separated string of per-file values
        """
        corresponding_key = self.__input_file_keys[key]
        ret = []

        for f in self.input_files:
            # empty corresponding key means "the file name itself" (inFiles)
            ret.append(f if corresponding_key == '' else self.input_files[f][corresponding_key])

        if corresponding_key in self.__may_be_united:
            return one_or_set(ret)

        return join(ret)

    def get_output_file_prop(self, key):
        """
        Collects the output-file property corresponding to the given server key.

        :param key: server-side key present in __output_file_keys
        :return: single value or comma-separated string of per-file values
        """
        # resolved through __getattr__ from the loaded description (snake of "logFile")
        log_file = self.log_file

        # log-specific keys map onto the log file's entry only
        if key == 'logGUID':
            return stringify_weird(self.output_files[log_file]['guid'])
        if key == 'scopeLog':
            return stringify_weird(self.output_files[log_file]['scope'])

        corresponding_key = self.__output_file_keys[key]
        ret = []

        for f in self.output_files:
            # scopeOut excludes the log file (its scope is reported via scopeLog)
            if key != 'scopeOut' or f != log_file:
                ret.append(f if corresponding_key == '' else self.output_files[f][corresponding_key])

        if corresponding_key in self.__may_be_united:
            return one_or_set(ret)

        return join(ret)

    def load(self, new_desc):
        """
        Loads a job description from a JSON string or a dictionary.

        A server-style description (recognized by the "PandaID" key) is
        converted: keys are aliased/snake_cased, values parsed, and file
        lists extracted into input_files/output_files. Otherwise the
        description is assumed to be already converted and is used as-is.

        :param new_desc: JSON string or dict with the description
        """
        try:
            if isinstance(new_desc, basestring):  # Python 2 # noqa: F821
                new_desc = json.loads(new_desc)
        except Exception:
            if isinstance(new_desc, str):  # Python 3
                new_desc = json.loads(new_desc)

        if "PandaID" in new_desc:
            logger.info("Parsing description to be of readable, easy to use format")

            fixed = {}

            self.input_files = get_input_files(new_desc)
            self.output_files = get_output_files(new_desc)

            for key in new_desc:
                value = new_desc[key]

                # file-related keys live in input_files/output_files, not in the holder
                if key not in self.__input_file_keys and key not in self.__output_file_keys:
                    old_key = key
                    if key in self.__key_aliases:
                        key = self.__key_aliases[key]
                    else:
                        key = camel_to_snake(key)

                    # remember how to convert the key back for to_json(decompose=True)
                    if key != old_key:
                        self.__key_back_aliases_from_forward[key] = old_key

                    self.__key_reverse_aliases[old_key] = key

                    fixed[key] = parse_value(value)

            new_desc = fixed
        else:
            self.input_files = new_desc['input_files']
            self.output_files = new_desc['output_files']

        self.__holder = new_desc

    def to_json(self, decompose=False, **kwargs):
        """
        Serializes the description to a JSON string.

        :param decompose: if True, converts back to the server-side
                          (camelCase) representation, folding the file
                          dictionaries into comma-separated fields
        :param kwargs: passed through to json.dumps
        :return: JSON string
        """
        if decompose:
            prep = {}

            for k in self.__holder:
                if k not in ['input_files', 'output_files']:
                    if k in self.__key_back_aliases_from_forward:
                        rev = self.__key_back_aliases_from_forward[k]
                    else:
                        rev = snake_to_camel(k)
                    prep[rev] = stringify_weird(self.__holder[k])

            # fold the file dictionaries back into the server's flat fields
            for k in self.__output_file_keys:
                prep[k] = self.get_output_file_prop(k)
            for k in self.__input_file_keys:
                prep[k] = self.get_input_file_prop(k)

        else:
            prep = self.__holder.copy()
            prep['input_files'] = self.input_files
            prep['output_files'] = self.output_files

        return json.dumps(prep, **kwargs)

    def get_description_parameter(self, key):
        """
        Resolves a description parameter by its (possibly legacy) key.

        :param key: converted key, legacy server key, or soft alias
        :return: parameter value
        :raises AttributeError: if the parameter is unknown or nothing is loaded
        """
        if self.__holder is not None:
            if key in self.__holder:
                return self.__holder[key]

            # legacy file-related keys are reconstructed from the file dicts
            if key in self.__input_file_keys:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.input_files[][%s] to "
                                "access and manipulate this value.\n" % (key, self.__input_file_keys[key])) + self.get_traceback())
                return self.get_input_file_prop(key)
            if key in self.__output_file_keys:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.output_files[][%s] to"
                                " access and manipulate this value.\n" % (key, self.__output_file_keys[key])) + self.get_traceback())
                return self.get_output_file_prop(key)

            # legacy camelCase keys are mapped through their snake_case form
            snake_key = camel_to_snake(key)
            if snake_key in self.__key_aliases_snake:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.%s to access and "
                                "manipulate this value.\n" % (key, self.__key_aliases_snake[snake_key])) + self.get_traceback())
                return stringify_weird(self.__holder[self.__key_aliases_snake[snake_key]])

            if key in self.__soft_key_aliases:
                return self.get_description_parameter(self.__soft_key_aliases[key])

        raise AttributeError("Description parameter not found")

    def set_description_parameter(self, key, value):
        """
        Sets a description parameter by its (possibly legacy) key.

        :param key: converted key, legacy server key, or soft alias
        :param value: value to set
        :return: True if the parameter was stored in the description
        :raises AttributeError: for read-only legacy file-related keys
        """
        if self.__holder is not None:
            if key in self.__holder:
                self.__holder[key] = value
                return True

            if key in self.__input_file_keys:
                err = "Key JobDescription.%s is read-only\n" % key
                if key == 'inFiles':
                    err += "Use JobDescription.input_files to manipulate input files"
                else:
                    err += "Use JobDescription.input_files[][%s] to set up this parameter in files description" %\
                           self.__input_file_keys[key]
                raise AttributeError(err)

            if key in self.__output_file_keys:
                err = "Key JobDescription.%s is read-only\n" % key
                if key == 'outFiles':
                    err += "Use JobDescription.output_files to manipulate output files"
                else:
                    err += "Use JobDescription.output_files[][%s] to set up this parameter in files description" %\
                           self.__output_file_keys[key]
                raise AttributeError(err)

            snake_key = camel_to_snake(key)
            if snake_key in self.__key_aliases_snake:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.%s to access and"
                                "manipulate this value.\n" % (key, self.__key_aliases_snake[snake_key])) + self.get_traceback())
                self.__holder[self.__key_aliases_snake[snake_key]] = parse_value(value)
                # NOTE(review): no "return True" here, so __setattr__ will also set
                # a plain instance attribute that shadows the holder value - confirm
                # whether this fall-through is intended

            if key in self.__soft_key_aliases:
                return self.set_description_parameter(self.__soft_key_aliases[key], value)

        return False

    def get_traceback(self):
        """
        Builds a readable traceback of the current call stack (latest call
        first), skipping the innermost frames of this helper itself.

        :return: formatted traceback string including the current thread
        """
        tb = list(reversed(traceback.extract_stack()))

        tb_str = '\n'
        for ii in enumerate(tb):
            if ii[0] < 3:
                continue  # we don't need inner scopes of this and subsequent calls
            i = ii[1]
            tb_str += '{file}:{line} (in {module}): {call}\n'.format(file=i[0],
                                                                     line=i[1],
                                                                     module=i[2],
                                                                     call=i[3])
        # NOTE(review): currentThread()/getName() are deprecated aliases on
        # modern Python 3 (use current_thread()/name) - kept for Py2 support
        thread = threading.currentThread()
        return 'Traceback: (latest call first)' + tb_str + 'Thread: %s(%d)' % (thread.getName(), thread.ident)

    def __getattr__(self, key):
        """
        Reflection of description values into Job instance properties if they are not shadowed.
        If there is no own property with corresponding name, the value of Description is used.
        Params and return described in __getattr__ interface.
        """
        try:
            return object.__getattribute__(self, key)
        except AttributeError:
            return self.get_description_parameter(key)

    def __setattr__(self, key, value):
        """
        Reflection of description values into Job instance properties if they are not shadowed.
        If there is no own property with corresponding name, the value of Description is set.
        Params and return described in __setattr__ interface.
        """
        try:
            object.__getattribute__(self, key)
            return object.__setattr__(self, key, value)
        except AttributeError:
            if not self.set_description_parameter(key, value):
                return object.__setattr__(self, key, value)
0567 
0568 
if __name__ == "__main__":
    # Manual smoke test: load a job description from the JSON file given as
    # the first command-line argument and log a few of its fields.
    import sys
    logging.basicConfig()
    logger.setLevel(logging.DEBUG)

    description = JobDescription()
    with open(sys.argv[1], "r") as desc_file:
        raw_contents = desc_file.read()

    description.load(raw_contents)

    logger.debug(description.id)
    logger.debug(description.command)
    logger.debug(description.PandaID)
    logger.debug(description.scopeOut)
    logger.debug(description.scopeLog)
    logger.debug(description.fileDestinationSE)
    logger.debug(description.inFiles)
    logger.debug(json.dumps(description.output_files, indent=4, sort_keys=True))

    logger.debug(description.to_json(True, indent=4, sort_keys=True))