File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import re
0011 import json
0012 import numbers
0013 import traceback
0014 import threading
0015
0016 from pilot.util.auxiliary import is_python3
0017
0018 import logging
0019 logger = logging.getLogger(__name__)
0020
0021
def camel_to_snake(name):
    """
    Converts a CamelCase name to snake_case, as used on the python side.

    :param name: name to convert
    :return: name in snake_case
    """
    first_pass = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    second_pass = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', first_pass)
    return second_pass.lower()
0031
0032
def snake_to_camel(snake_str):
    """
    Converts a snake_case name to firstLowerCamelCase, as used by the server.

    :param snake_str: name to convert
    :return: name in camelCase
    """
    parts = snake_str.split('_')
    camel = parts[0]
    for part in parts[1:]:
        camel += part.title()
    return camel
0044
0045
def split(val, separator=",", min_len=0, fill_last=False):
    """
    Splits separated values and parses them.

    :param val: string of separated values, or None
    :param separator: separator, comma by default
    :param min_len: minimum needed length of the resulting array; the array is
                    filled up to this length
    :param fill_last: flag stating the array filler, if min_len is greater than
                      the extracted array length. If True, the array is padded
                      with its last value, else with Nones.
    :return: list of parsed values
    """
    if val is None:
        return [None for _ in range(min_len)]

    v_arr = val.split(separator)

    for i, v in enumerate(v_arr):
        v_arr[i] = parse_value(v)

    if min_len > len(v_arr):
        # pad with the LAST parsed value, as documented (was v_arr[0], which
        # padded with the first value instead)
        filler = None if not fill_last or len(v_arr) < 1 else v_arr[-1]
        v_arr.extend([filler for _ in range(min_len - len(v_arr))])

    return v_arr
0070
0071
def get_nulls(val):
    """
    Maps the literal string "NULL" to python's None, passing all else through.

    :param val: string or whatever
    :return: None if val is "NULL", otherwise val unchanged
    """
    if val == "NULL":
        return None
    return val
0080
0081
def is_float(val):
    """
    Tests whether the value can be converted to float.

    :param val: string or whatever
    :return: True if the value may be converted to float
    """
    try:
        float(val)
        return True
    except (ValueError, TypeError):
        # TypeError covers non-numeric, non-string input such as None,
        # which previously escaped the ValueError-only handler
        return False
0094
0095
def is_int(val):
    """
    Tests whether the value can be converted to int.

    :param val: string or whatever
    :return: True if the value may be converted to int
    """
    try:
        int(val)
        return True
    except (ValueError, TypeError):
        # TypeError covers non-numeric, non-string input such as None,
        # which previously escaped the ValueError-only handler
        return False
0108
0109
def is_long(s):
    """
    Tests whether the value can be converted to a (long) integer.

    Works on both Python 2 (basestring/long) and Python 3 (str/int). The
    original code swallowed the Python 3 NameError on ``basestring`` and thus
    always returned False under Python 3, even for digit strings.

    :param s: string or whatever
    :return: True if the value may be converted to a long integer
    """
    try:
        string_types = basestring  # noqa: F821 (Python 2)
        long_type = long  # noqa: F821 (Python 2)
    except NameError:
        string_types = str  # Python 3: basestring/long do not exist
        long_type = int

    if not isinstance(s, string_types):
        try:
            long_type(s)
            return True
        except (ValueError, TypeError, OverflowError):
            # same outcomes the old broad outer handler produced for
            # unconvertible values (None, inf, ...)
            return False

    # string input: accept an optional sign followed by digits only
    if s and s[0] in ('-', '+'):
        return s[1:].isdigit()
    return s.isdigit()
0131
0132
def parse_value(value):
    """
    Tries to parse a string value as a number or None. Non-string input is
    returned untouched; strings that parse as integers or floats are converted,
    "NULL" becomes None, and anything else is returned as-is.

    :param value: value to parse
    :return: mixed
    """
    try:
        string_types = basestring  # noqa: F821 (Python 2)
    except NameError:
        string_types = str  # Python 3: basestring does not exist

    if not isinstance(value, string_types):
        return value

    # integers: py3 uses int(), py2 uses long() to avoid overflow
    if is_python3():
        if is_int(value):
            return int(value)
    else:
        if is_long(value):
            return long(value)  # noqa: F821 (Python 2 branch only)

    if is_float(value):
        return float(value)

    return get_nulls(value)
0159
0160
def stringify_weird(arg):
    """
    Converts None to the string "NULL"; numbers pass through untouched,
    everything else is stringified.

    :param arg: value to convert
    :return: "NULL", the number itself, or str(arg)
    """
    if arg is None:
        return "NULL"
    return arg if isinstance(arg, numbers.Number) else str(arg)
0173
0174
def join(arr):
    """
    Joins an array into a comma-separated string, converting each element
    through stringify_weird (None becomes "NULL").

    :param arr: iterable to join
    :return: comma-joined string
    """
    pieces = [str(stringify_weird(item)) for item in arr]
    return ",".join(pieces)
0183
0184
def get_input_files(description):
    """
    Extracts input files from the job description.

    :param description: job description dictionary (server-side keys)
    :return: dict mapping input file name -> properties dict
    """
    logger.info("Extracting input files from job description")
    files = {}
    raw_names = description['inFiles']
    if not raw_names or raw_names == "NULL":
        return files

    names = split(raw_names)
    count = len(names)

    # every per-file property arrives as a comma-separated string aligned with
    # inFiles; properties that may be sent once for all files are filled up
    columns = {
        "ddm_endpoint": split(description.get("ddmEndPointIn"), min_len=count),
        "storage_element": split(description.get("destinationSE"), min_len=count),
        "dispatch_dblock": split(description.get("dispatchDblock"), min_len=count),
        "dispatch_dblock_token": split(description.get("dispatchDBlockToken"), min_len=count),
        "dataset": split(description.get("realDatasetsIn"), min_len=count, fill_last=True),
        "dblock": split(description.get("prodDBlocks"), min_len=count),
        "dblock_token": split(description.get("prodDBlockToken"), min_len=count),
        "size": split(description.get("fsize"), min_len=count),
        "checksum": split(description.get("checksum"), min_len=count),
        "scope": split(description.get("scopeIn"), min_len=count, fill_last=True),
        "guid": split(description.get("GUID"), min_len=count, fill_last=True),
    }

    for index, name in enumerate(names):
        if name is None:
            continue
        files[name] = dict((prop, values[index]) for prop, values in columns.items())

    return files
0225
0226
def fix_log(description, files):
    """
    Fixes the log file entry in the output files (overrides its GUID and scope
    with the dedicated logGUID/scopeLog values).

    :param description: job description dictionary (server-side keys)
    :param files: output files dictionary
    :return: fixed output files
    """
    logger.info("modifying log-specific values in a log file description")
    log_file = description["logFile"]
    if log_file and log_file != "NULL":
        log_guid = description["logGUID"]
        if log_guid and log_guid != "NULL" and log_file in files:
            files[log_file]["guid"] = log_guid
            files[log_file]["scope"] = description["scopeLog"]

    return files
0243
0244
def get_output_files(description):
    """
    Extracts output files from the job description.

    :param description: job description dictionary (server-side keys)
    :return: dict mapping output file name -> properties dict, with the log
             file entry fixed up via fix_log()
    """
    logger.info("Extracting output files in description")
    files = {}
    raw_names = description['outFiles']
    if raw_names and raw_names != "NULL":
        names = split(raw_names)
        count = len(names)

        # per-file properties, each a comma-separated string aligned with outFiles
        columns = {
            "ddm_endpoint": split(description.get("ddmEndPointOut"), min_len=count),
            "storage_element": split(description.get("fileDestinationSE"), min_len=count),
            "dispatch_dblock_token": split(description.get("dispatchDBlockTokenForOut"), min_len=count),
            "dblock_token": split(description.get("prodDBlockTokenForOut"), min_len=count),
            "dataset": split(description.get("realDatasets"), min_len=count),
            "dblock": split(description.get("destinationDblock"), min_len=count),
            "destination_dblock_token": split(description.get("destinationDBlockToken"), min_len=count),
            "scope": split(description.get("scopeOut"), min_len=count, fill_last=True),
        }

        for index, name in enumerate(names):
            if name is None:
                continue
            files[name] = dict((prop, values[index]) for prop, values in columns.items())

    return fix_log(description, files)
0280
0281
def one_or_set(array):
    """
    Collapses the array to a single stringified value when all of its elements
    are equal; otherwise returns the comma-joined array.

    :param array: list of values
    :return: single value or joined string
    """
    if not array:
        return join(array)

    first = array[0]
    if all(item == first for item in array):
        return stringify_weird(first)

    return join(array)
0293
0294
class JobDescription(object):
    """
    Pilot-side job description.

    Wraps the dictionary received from the server (camelCase keys) and exposes
    it with snake_case keys plus structured ``input_files``/``output_files``
    dictionaries. Legacy (server-side) key names remain readable and writable
    through ``__getattr__``/``__setattr__``, with a deprecation warning and a
    traceback pointing at the caller.
    """

    # parsed description dictionary, set up by load()
    __holder = None
    # server key -> pilot-side name, for keys whose automatic snake_case
    # conversion would not give the desired pilot-side name
    __key_aliases = {
        'PandaID': 'jobid',
        'transformation': 'script',
        'jobPars': 'script_parameters',
        'coreCount': 'number_of_cores',
        'prodUserID': 'user_dn',
        'prodSourceLabel': 'label',
        'homepackage': 'home_package',
        "nSent": 'throttle',
        'minRamCount': 'minimum_ram',
        'maxDiskCount': 'maximum_input_file_size',
        'maxCpuCount': 'maximum_cpu_usage_time',
        'attemptNr': 'attempt_number',
    }
    # pilot-side name -> server key, for keys whose automatic camelCase
    # conversion would not restore the server spelling
    __key_back_aliases = {
        'task_id': 'taskID',
        'jobset_id': 'jobsetID',
        'job_definition_id': 'jobDefinitionID',
        'status_code': 'StatusCode',
    }
    # convenience aliases, resolved recursively to real pilot-side keys
    __soft_key_aliases = {
        'id': 'jobid',
        'command': 'script',
        'command_parameters': 'script_parameters'
    }

    # server keys describing input files and the corresponding per-file
    # property name in input_files ('' marks the file name itself)
    __input_file_keys = {
        'inFiles': '',
        "ddmEndPointIn": 'ddm_endpoint',
        "destinationSE": 'storage_element',
        "dispatchDBlockToken": 'dispatch_dblock_token',
        "realDatasetsIn": 'dataset',
        "prodDBlocks": 'dblock',
        "fsize": 'size',
        "dispatchDblock": 'dispatch_dblock',
        'prodDBlockToken': 'dblock_token',
        "GUID": 'guid',
        "checksum": 'checksum',
        "scopeIn": 'scope'
    }
    # per-file properties that may be collapsed back into one value on
    # serialization when equal for all files
    __may_be_united = ['guid', 'scope', 'dataset']

    # server keys describing output files and the corresponding per-file
    # property name in output_files ('' marks the file name itself)
    __output_file_keys = {
        'outFiles': '',
        'ddmEndPointOut': 'ddm_endpoint',
        'fileDestinationSE': 'storage_element',
        'dispatchDBlockTokenForOut': 'dispatch_dblock_token',
        'prodDBlockTokenForOut': 'dblock_token',
        'realDatasets': 'dataset',
        'destinationDblock': 'dblock',
        'destinationDBlockToken': 'destination_dblock_token',
        'scopeOut': 'scope',
        'logGUID': 'guid',
        'scopeLog': 'scope'
    }

    __key_back_aliases_from_forward = None  # pilot-side name -> server key, filled in __init__/load
    __key_reverse_aliases = None  # server key -> pilot-side name, filled on load
    __key_aliases_snake = None  # snake_case form of aliased server key -> pilot-side name
    input_files = None  # file name -> properties, see get_input_files()
    output_files = None  # file name -> properties, see get_output_files()

    def __init__(self):
        super(JobDescription, self).__init__()

        self.__key_back_aliases_from_forward = self.__key_back_aliases.copy()
        self.__key_reverse_aliases = {}
        self.__key_aliases_snake = {}
        self.input_files = {}
        self.output_files = {}

        # precompute both directions of the alias mapping
        for key in self.__key_aliases:
            alias = self.__key_aliases[key]
            self.__key_back_aliases_from_forward[alias] = key
            self.__key_aliases_snake[camel_to_snake(key)] = alias

    def get_input_file_prop(self, key):
        """
        Serializes one input-file property back to the server representation.

        :param key: server-side key (e.g. "ddmEndPointIn")
        :return: comma-joined values, or a single value for unitable properties
        """
        corresponding_key = self.__input_file_keys[key]
        ret = []

        for f in self.input_files:
            ret.append(f if corresponding_key == '' else self.input_files[f][corresponding_key])

        if corresponding_key in self.__may_be_united:
            return one_or_set(ret)

        return join(ret)

    def get_output_file_prop(self, key):
        """
        Serializes one output-file property back to the server representation.

        The log file is special-cased: its GUID and scope are reported through
        logGUID/scopeLog, and the log scope is excluded from scopeOut.

        :param key: server-side key (e.g. "ddmEndPointOut")
        :return: comma-joined values, or a single value for unitable properties
        """
        log_file = self.log_file

        if key == 'logGUID':
            return stringify_weird(self.output_files[log_file]['guid'])
        if key == 'scopeLog':
            return stringify_weird(self.output_files[log_file]['scope'])

        corresponding_key = self.__output_file_keys[key]
        ret = []

        for f in self.output_files:
            if key != 'scopeOut' or f != log_file:
                ret.append(f if corresponding_key == '' else self.output_files[f][corresponding_key])

        if corresponding_key in self.__may_be_united:
            return one_or_set(ret)

        return join(ret)

    def load(self, new_desc):
        """
        Loads a description from a dict or a JSON string.

        A server-originated description (detected by the presence of "PandaID")
        is converted: keys are renamed to snake_case (or their aliases), values
        are parsed, and file-related keys are extracted into
        input_files/output_files. A previously saved pilot-side description is
        taken as-is.

        :param new_desc: dict or JSON string with the description
        """
        try:
            if isinstance(new_desc, basestring):  # noqa: F821 (Python 2)
                new_desc = json.loads(new_desc)
        except Exception:  # Python 3: basestring is not defined
            if isinstance(new_desc, str):
                new_desc = json.loads(new_desc)

        if "PandaID" in new_desc:
            logger.info("Parsing description to be of readable, easy to use format")

            fixed = {}

            self.input_files = get_input_files(new_desc)
            self.output_files = get_output_files(new_desc)

            for key in new_desc:
                value = new_desc[key]

                # file-related keys live in input_files/output_files only
                if key not in self.__input_file_keys and key not in self.__output_file_keys:
                    old_key = key
                    if key in self.__key_aliases:
                        key = self.__key_aliases[key]
                    else:
                        key = camel_to_snake(key)

                    if key != old_key:
                        self.__key_back_aliases_from_forward[key] = old_key

                    self.__key_reverse_aliases[old_key] = key

                    fixed[key] = parse_value(value)

            new_desc = fixed
        else:
            self.input_files = new_desc['input_files']
            self.output_files = new_desc['output_files']

        self.__holder = new_desc

    def to_json(self, decompose=False, **kwargs):
        """
        Serializes the description to JSON.

        :param decompose: if True, converts the description back to the
                          server-side (camelCase) representation, including the
                          comma-joined file properties; otherwise dumps the
                          pilot-side representation with structured file dicts.
        :param kwargs: passed through to json.dumps
        :return: JSON string
        """
        if decompose:
            prep = {}

            for k in self.__holder:
                if k not in ['input_files', 'output_files']:
                    if k in self.__key_back_aliases_from_forward:
                        rev = self.__key_back_aliases_from_forward[k]
                    else:
                        rev = snake_to_camel(k)
                    prep[rev] = stringify_weird(self.__holder[k])

            for k in self.__output_file_keys:
                prep[k] = self.get_output_file_prop(k)
            for k in self.__input_file_keys:
                prep[k] = self.get_input_file_prop(k)

        else:
            prep = self.__holder.copy()
            prep['input_files'] = self.input_files
            prep['output_files'] = self.output_files

        return json.dumps(prep, **kwargs)

    def get_description_parameter(self, key):
        """
        Resolves a description parameter by pilot-side or legacy server-side key.

        :param key: parameter name
        :return: parameter value
        :raises AttributeError: if the parameter is not present
        """
        if self.__holder is not None:
            if key in self.__holder:
                return self.__holder[key]

            if key in self.__input_file_keys:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.input_files[][%s] to "
                                "access and manipulate this value.\n" % (key, self.__input_file_keys[key])) +
                               self.get_traceback())
                return self.get_input_file_prop(key)
            if key in self.__output_file_keys:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.output_files[][%s] to"
                                " access and manipulate this value.\n" % (key, self.__output_file_keys[key])) +
                               self.get_traceback())
                return self.get_output_file_prop(key)

            snake_key = camel_to_snake(key)
            if snake_key in self.__key_aliases_snake:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.%s to access and "
                                "manipulate this value.\n" % (key, self.__key_aliases_snake[snake_key])) +
                               self.get_traceback())
                return stringify_weird(self.__holder[self.__key_aliases_snake[snake_key]])

            if key in self.__soft_key_aliases:
                return self.get_description_parameter(self.__soft_key_aliases[key])

        raise AttributeError("Description parameter not found")

    def set_description_parameter(self, key, value):
        """
        Sets a description parameter by pilot-side or legacy server-side key.

        :param key: parameter name
        :param value: value to set
        :return: True if the parameter was stored into the description
        :raises AttributeError: on attempts to set read-only file-related keys
        """
        if self.__holder is not None:
            if key in self.__holder:
                self.__holder[key] = value
                return True

            if key in self.__input_file_keys:
                err = "Key JobDescription.%s is read-only\n" % key
                if key == 'inFiles':
                    err += "Use JobDescription.input_files to manipulate input files"
                else:
                    err += "Use JobDescription.input_files[][%s] to set up this parameter in files description" %\
                           self.__input_file_keys[key]
                raise AttributeError(err)

            if key in self.__output_file_keys:
                err = "Key JobDescription.%s is read-only\n" % key
                if key == 'outFiles':
                    err += "Use JobDescription.output_files to manipulate output files"
                else:
                    err += "Use JobDescription.output_files[][%s] to set up this parameter in files description" %\
                           self.__output_file_keys[key]
                raise AttributeError(err)

            snake_key = camel_to_snake(key)
            if snake_key in self.__key_aliases_snake:
                logger.warning(("Old key JobDescription.%s is used. Better to use JobDescription.%s to access and "
                                "manipulate this value.\n" % (key, self.__key_aliases_snake[snake_key])) +
                               self.get_traceback())
                self.__holder[self.__key_aliases_snake[snake_key]] = parse_value(value)
                # report success, otherwise __setattr__ would additionally create
                # a shadowing instance attribute holding the raw (unparsed) value
                return True

            if key in self.__soft_key_aliases:
                return self.set_description_parameter(self.__soft_key_aliases[key], value)

        return False

    def get_traceback(self):
        """
        Returns a formatted stack trace of the caller (most recent call first),
        used to point out where a deprecated key was accessed.

        :return: multiline string with the trace and the current thread
        """
        tb = list(reversed(traceback.extract_stack()))

        tb_str = '\n'
        for ii in enumerate(tb):
            if ii[0] < 3:  # skip the frames of the warning machinery itself
                continue
            i = ii[1]
            tb_str += '{file}:{line} (in {module}): {call}\n'.format(file=i[0],
                                                                     line=i[1],
                                                                     module=i[2],
                                                                     call=i[3])
        # current_thread()/.name replace the deprecated currentThread()/getName()
        thread = threading.current_thread()
        return 'Traceback: (latest call first)' + tb_str + 'Thread: %s(%d)' % (thread.name, thread.ident)

    def __getattr__(self, key):
        """
        Reflection of description values into Job instance properties if they are not shadowed.
        If there is no own property with corresponding name, the value of Description is used.
        Params and return described in __getattr__ interface.
        """
        try:
            return object.__getattribute__(self, key)
        except AttributeError:
            return self.get_description_parameter(key)

    def __setattr__(self, key, value):
        """
        Reflection of description values into Job instance properties if they are not shadowed.
        If there is no own property with corresponding name, the value of Description is set.
        Params and return described in __setattr__ interface.
        """
        try:
            object.__getattribute__(self, key)
            return object.__setattr__(self, key, value)
        except AttributeError:
            if not self.set_description_parameter(key, value):
                return object.__setattr__(self, key, value)
0567
0568
if __name__ == "__main__":
    # manual smoke test: load a JSON job description from the file given on the
    # command line and dump it through both representations
    import sys

    logging.basicConfig()
    logger.setLevel(logging.DEBUG)

    description = JobDescription()
    with open(sys.argv[1], "r") as job_file:
        raw_description = job_file.read()

    description.load(raw_description)

    logger.debug(description.id)
    logger.debug(description.command)
    logger.debug(description.PandaID)
    logger.debug(description.scopeOut)
    logger.debug(description.scopeLog)
    logger.debug(description.fileDestinationSE)
    logger.debug(description.inFiles)
    logger.debug(json.dumps(description.output_files, indent=4, sort_keys=True))

    logger.debug(description.to_json(True, indent=4, sort_keys=True))