File indexing completed on 2026-04-10 08:39:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 """
0012 The implementation of data structure to host Job definition.
0013
The main reasons for such encapsulation are to
- apply in one place all data validation actions (for attributes and values)
- introduce an internal information schema (names of attributes) to remove the dependency
  on the data structures, formats and names used by the external source (PanDA)
0018
0019 :author: Alexey Anisenkov
0020 :contact: anisyonk@cern.ch
0021 :date: February 2018
0022 """
0023
0024 import os
0025 import re
0026 import ast
0027 import shlex
0028 import pipes
0029 from time import sleep
0030
0031 from .basedata import BaseData
0032 from .filespec import FileSpec
0033 from pilot.util.auxiliary import get_object_size, get_key_value
0034 from pilot.util.constants import LOG_TRANSFER_NOT_DONE
0035 from pilot.util.filehandling import get_guid, get_valid_path_from_list
0036 from pilot.util.timing import get_elapsed_real_time
0037
0038 import logging
0039 logger = logging.getLogger(__name__)
0040
0041
class JobData(BaseData):
    """
    High-level object to host Job definition/settings
    """

    # ## PanDA job identity
    jobid = None             # unique job identifier (validated to str by BaseData via _keys)
    taskid = None            # task identifier this job belongs to (validated to str)
    jobparams = ""           # job parameters defining the payload execution
    transformation = ""      # transformation (payload script) name

    # ## execution settings
    status = {'LOG_TRANSFER': LOG_TRANSFER_NOT_DONE}  # pilot status dictionary, see get_status()
    corecount = 1            # number of cores, see clean__corecount()
    platform = ""            # cmtconfig value, see clean__platform()
    is_eventservice = False
    is_eventservicemerge = False
    is_hpo = False
    transfertype = ""        # 'direct' enables direct access, see prepare_infiles()
    accessmode = ""          # access instruction from jobparams, see set_accessmode()
    processingtype = ""
    maxcpucount = 0
    allownooutput = ""

    # ## set by the pilot (not from the job definition)
    workdir = ""             # job work directory, used e.g. in add_workdir_size()
    workdirsizes = []        # workdir size measurements, see add_workdir_size()
    fileinfo = {}
    piloterrorcode = 0
    piloterrorcodes = []
    piloterrordiag = ""
    piloterrordiags = []
    transexitcode = 0
    exeerrorcode = 0
    exeerrordiag = ""
    exitcode = 0
    exitmsg = ""
    state = ""
    serverstate = ""
    stageout = ""
    metadata = {}
    cpuconsumptionunit = "s"
    cpuconsumptiontime = -1
    cpuconversionfactor = 1
    nevents = 0
    neventsw = 0
    dbtime = None
    dbdata = None
    resimevents = None
    payload = ""
    utilities = {}
    pid = None               # payload pid
    pgrp = None              # payload process group
    sizes = {}               # job object size measurements, see add_size()
    currentsize = 0          # last measured job object size, see get_size()
    command = ""
    setup = ""
    zombies = []             # zombie child pids, see collect_zombies()
    memorymonitor = ""
    actualcorecount = 0
    corecounts = []
    looping_check = True

    # time reference for measurements, set lazily by add_size() via os.times()
    t0 = None

    overwrite_queuedata = {}    # --overwriteQueueData extracted from jobparams
    overwrite_storagedata = {}  # --overwriteStorageData extracted from jobparams

    zipmap = ""              # <ZIP_MAP> structure extracted from jobparams
    imagename = ""           # container image extracted from jobparams (--containerImage)
    imagename_jobdef = ""    # container image from the job definition (container_name)
    usecontainer = False

    # ## from the job definition
    attemptnr = 0
    destinationdblock = ""
    datasetin = ""
    debug = False
    debug_command = ''
    produserid = ""
    jobdefinitionid = ""
    infilesguids = ""
    indata = []              # list of input FileSpec objects, see prepare_infiles()
    outdata = []             # list of output FileSpec objects, see prepare_outfiles()
    logdata = []             # list of log FileSpec objects, see prepare_outfiles()

    # pre/post/co-process instructions (dicts)
    preprocess = {}
    postprocess = {}
    coprocess = {}

    containeroptions = {}
    use_vp = False

    # ## release/package information
    homepackage = ""
    jobsetid = ""
    noexecstrcnv = None
    swrelease = ""
    writetofile = ""         # input file list instruction, see process_writetofile()

    alrbuserplatform = ""    # user platform encoded after '@' in cmtconfig, see clean__platform()

    # raw job definition dict from the server, kept for legacy dict-style access
    _rawdata = {}

    # attribute types for data validation/casting (consumed by BaseData._load_data)
    # NOTE: a comma was missing after 'exeerrordiag', which merged the adjacent
    # string literals into one bogus key 'exeerrordiagstate' and silently
    # excluded both 'exeerrordiag' and 'state' from str validation -- fixed here
    _keys = {int: ['corecount', 'piloterrorcode', 'transexitcode', 'exitcode', 'cpuconversionfactor', 'exeerrorcode',
                   'attemptnr', 'nevents', 'neventsw', 'pid', 'cpuconsumptiontime', 'maxcpucount', 'actualcorecount'],
             str: ['jobid', 'taskid', 'jobparams', 'transformation', 'destinationdblock', 'exeerrordiag',
                   'state', 'serverstate', 'workdir', 'stageout',
                   'platform', 'piloterrordiag', 'exitmsg', 'produserid', 'jobdefinitionid', 'writetofile',
                   'cpuconsumptionunit', 'homepackage', 'jobsetid', 'payload', 'processingtype',
                   'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
                   'datasetin',
                   'infilesguids', 'memorymonitor', 'allownooutput'],
             list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts'],
             dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
                    'postprocess', 'coprocess', 'containeroptions'],
             bool: ['is_eventservice', 'is_eventservicemerge', 'is_hpo', 'noexecstrcnv', 'debug', 'usecontainer',
                    'use_vp', 'looping_check']
             }
0169
    def __init__(self, data, use_kmap=True):
        """
        Initialize the job object from a raw job definition dict.

        :param data: input dictionary of data settings (raw job definition).
        :param use_kmap: if True, translate server key names to internal attribute names (boolean).
        """

        self.infosys = None  # job-specific information service reference, set later via init()
        self._rawdata = data  # kept for legacy dict-style access (__getitem__ etc.)
        self.load(data, use_kmap=use_kmap)

        # promotion of HPO jobs to event service is currently disabled by the
        # 'and False' guard -- presumably intentional, confirm before re-enabling
        if self.is_hpo and False:
            self.is_eventservice = True
0182
    def init(self, infosys):
        """
        Complete initialization once the information service is available.

        Builds the input/output/log FileSpec lists and resolves the container
        image name (jobparams value takes precedence over the job definition).

        :param infosys: infosys object
        """
        self.infosys = infosys
        self.indata = self.prepare_infiles(self._rawdata)
        self.outdata, self.logdata = self.prepare_outfiles(self._rawdata)

        # self.imagename is extracted from jobparams (clean__jobparams);
        # imagename_jobdef comes from the job definition ('container_name')
        if self.imagename_jobdef and not self.imagename:
            logger.debug('using imagename_jobdef as imagename (\"%s\")' % self.imagename_jobdef)
            self.imagename = self.imagename_jobdef
        elif self.imagename_jobdef and self.imagename:
            logger.debug('using imagename from jobparams (ignoring imagename_jobdef)')
        elif not self.imagename_jobdef and self.imagename:
            logger.debug('using imagename from jobparams (imagename_jobdef not set)')

        if self.imagename:
            # prefer a locally available image: IMAGE_BASE can come from the
            # environment or from the queuedata catchall field
            image_base = os.environ.get('IMAGE_BASE', '')
            if not image_base and 'IMAGE_BASE' in infosys.queuedata.catchall:
                image_base = get_key_value(infosys.queuedata.catchall, key='IMAGE_BASE')
            if image_base:
                os.environ['ALRB_CONT_UNPACKEDDIR'] = image_base  # consumed by container setup
                # try both the bare file name and the full image name under image_base
                paths = [os.path.join(image_base, os.path.basename(self.imagename)),
                         os.path.join(image_base, self.imagename)]
                local_path = get_valid_path_from_list(paths)
                if local_path:
                    self.imagename = local_path
0214
0215 def prepare_infiles(self, data):
0216 """
0217 Construct FileSpec objects for input files from raw dict `data`
0218 :return: list of validated `FileSpec` objects
0219 """
0220
0221
0222 self.set_accessmode()
0223
0224 access_keys = ['allow_lan', 'allow_wan', 'direct_access_lan', 'direct_access_wan']
0225 if not self.infosys or not self.infosys.queuedata:
0226 self.show_access_settings(access_keys)
0227
0228
0229 kmap = self.get_kmap()
0230
0231 try:
0232 ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in list(kmap.values()))
0233 except Exception:
0234 ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in kmap.itervalues())
0235
0236 ret, lfns = [], set()
0237 for ind, lfn in enumerate(ksources.get('inFiles', [])):
0238 if lfn in ['', 'NULL'] or lfn in lfns:
0239 continue
0240 lfns.add(lfn)
0241 idat = {}
0242
0243 try:
0244 for attrname, k in list(kmap.items()):
0245 idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0246 except Exception:
0247 for attrname, k in kmap.iteritems():
0248 idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0249
0250 accessmode = 'copy'
0251
0252
0253
0254 if (self.is_analysis() or self.transfertype == 'direct') and idat.get('storage_token') != 'local':
0255 accessmode = 'direct'
0256 if self.accessmode:
0257 accessmode = self.accessmode
0258
0259 idat['accessmode'] = accessmode
0260
0261 if self.infosys and self.infosys.queuedata:
0262 for key in access_keys:
0263 idat[key] = getattr(self.infosys.queuedata, key)
0264
0265 finfo = FileSpec(filetype='input', **idat)
0266 logger.info('added file \'%s\' with accessmode \'%s\'' % (lfn, accessmode))
0267 ret.append(finfo)
0268
0269 return ret
0270
0271 def set_accessmode(self):
0272 """
0273 Set the accessmode field using jobparams.
0274
0275 :return:
0276 """
0277 self.accessmode = None
0278 if '--accessmode=direct' in self.jobparams:
0279 self.accessmode = 'direct'
0280 if '--accessmode=copy' in self.jobparams or '--useLocalIO' in self.jobparams:
0281 self.accessmode = 'copy'
0282
0283 @staticmethod
0284 def show_access_settings(access_keys):
0285 """
0286 Show access settings for the case job.infosys.queuedata is not initialized.
0287
0288 :param access_keys: list of access keys (list).
0289 :return:
0290 """
0291 dat = dict([k, getattr(FileSpec, k, None)] for k in access_keys)
0292 try:
0293 msg = ', '.join(["%s=%s" % (k, v) for k, v in sorted(dat.iteritems())])
0294 except Exception:
0295 msg = ', '.join(["%s=%s" % (k, v) for k, v in sorted(dat.items())])
0296 logger.info('job.infosys.queuedata is not initialized: the following access settings will be used by default: %s' % msg)
0297
0298 @staticmethod
0299 def get_kmap():
0300 """
0301 Return the kmap dictionary for server data to pilot conversions.
0302
0303 :return: kmap (dict).
0304 """
0305 kmap = {
0306
0307 'lfn': 'inFiles',
0308
0309 'dataset': 'realDatasetsIn', 'guid': 'GUID',
0310 'filesize': 'fsize', 'checksum': 'checksum', 'scope': 'scopeIn',
0311
0312 'storage_token': 'prodDBlockToken',
0313 'ddmendpoint': 'ddmEndPointIn',
0314 }
0315
0316 return kmap
0317
0318 def prepare_outfiles(self, data):
0319 """
0320 Construct validated FileSpec objects for output and log files from raw dict `data`
0321 Note: final preparation for output files can only be done after the payload has finished in case the payload
0322 has produced a job report with e.g. output file guids. This is verified in
0323 pilot/control/payload/process_job_report().
0324
0325 :param data:
0326 :return: (list of `FileSpec` for output, list of `FileSpec` for log)
0327 """
0328
0329
0330 kmap = {
0331
0332 'lfn': 'outFiles',
0333
0334 'dataset': 'realDatasets', 'scope': 'scopeOut',
0335
0336 'ddmendpoint': 'ddmEndPointOut',
0337 }
0338
0339 try:
0340 ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in list(kmap.values()))
0341 except Exception:
0342 ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in kmap.itervalues())
0343
0344
0345 pilot_logfile = os.environ.get('PILOT_LOGFILE', None)
0346 if pilot_logfile:
0347
0348 old_logfile = data.get('logFile')
0349 data['logFile'] = pilot_logfile
0350
0351 outfiles = ksources.get('outFiles', None)
0352 if outfiles and old_logfile in outfiles:
0353
0354 ksources['outFiles'] = [pilot_logfile if x == old_logfile else x for x in ksources.get('outFiles')]
0355
0356 log_lfn = data.get('logFile')
0357 if log_lfn:
0358
0359 scope_out = []
0360 for lfn in ksources.get('outFiles', []):
0361 if lfn == log_lfn:
0362 scope_out.append(data.get('scopeLog'))
0363 else:
0364 if not ksources['scopeOut']:
0365 raise Exception('Failed to extract scopeOut parameter from Job structure sent by Panda, please check input format!')
0366 scope_out.append(ksources['scopeOut'].pop(0))
0367 ksources['scopeOut'] = scope_out
0368
0369 return self._get_all_output(ksources, kmap, log_lfn, data)
0370
0371 def _get_all_output(self, ksources, kmap, log_lfn, data):
0372 """
0373 Create lists of FileSpecs for output + log files.
0374 Helper function for prepare_output().
0375
0376 :param ksources:
0377 :param kmap:
0378 :param log_lfn: log file name (string).
0379 :param data:
0380 :return: ret_output (list of FileSpec), ret_log (list of FileSpec)
0381 """
0382
0383 ret_output, ret_log = [], []
0384
0385 lfns = set()
0386 for ind, lfn in enumerate(ksources['outFiles']):
0387 if lfn in ['', 'NULL'] or lfn in lfns:
0388 continue
0389 lfns.add(lfn)
0390 idat = {}
0391 try:
0392 for attrname, k in list(kmap.items()):
0393 idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0394 except Exception:
0395 for attrname, k in kmap.iteritems():
0396 idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0397
0398 ftype = 'output'
0399 ret = ret_output
0400 if lfn == log_lfn:
0401 ftype = 'log'
0402 idat['guid'] = data.get('logGUID')
0403 ret = ret_log
0404 elif lfn.endswith('.lib.tgz'):
0405 idat['guid'] = get_guid()
0406
0407 finfo = FileSpec(filetype=ftype, **idat)
0408 ret.append(finfo)
0409
0410 return ret_output, ret_log
0411
0412 def __getitem__(self, key):
0413 """
0414 Temporary Integration function to keep dict-based access for old logic in compatible way
0415 TO BE REMOVED ONCE all fields will be moved to Job object attributes
0416 """
0417
0418 if key == 'infosys':
0419 return self.infosys
0420
0421
0422
0423
0424 return self._rawdata[key]
0425
0426 def __setitem__(self, key, val):
0427 """
0428 Temporary Integration function to keep dict-based access for old logic in compatible way
0429 TO BE REMOVED ONCE all fields will be moved to Job object attributes
0430 """
0431
0432 self._rawdata[key] = val
0433
0434 def __contains__(self, key):
0435 """
0436 Temporary Integration function to keep dict-based access for old logic in compatible way
0437 TO BE REMOVED ONCE all fields will be moved to Job object attributes
0438 """
0439
0440 return key in self._rawdata
0441
0442 def get(self, key, defval=None):
0443 """
0444 Temporary Integration function to keep dict-based access for old logic in compatible way
0445 TO BE REMOVED ONCE all fields will be moved to Job object attributes
0446 """
0447
0448 return self._rawdata.get(key, defval)
0449
    def load(self, data, use_kmap=True):
        """
        Construct and initialize data from ext source.

        :param data: input dictionary of job data settings
        :param use_kmap: if True, apply the server-to-pilot key mapping below (boolean).
        """

        # the translation map between the external (PanDA server) schema and the
        # internal attribute names: 'internal_name': 'external_name'
        kmap = {
            'jobid': 'PandaID',
            'taskid': 'taskID',
            'jobparams': 'jobPars',
            'corecount': 'coreCount',
            'platform': 'cmtConfig',
            'infilesguids': 'GUID',
            'attemptnr': 'attemptNr',
            'datasetin': 'realDatasetsIn',
            'processingtype': 'processingType',
            'transfertype': 'transferType',
            'destinationdblock': 'destinationDblock',
            'noexecstrcnv': 'noExecStrCnv',
            'swrelease': 'swRelease',
            'jobsetid': 'jobsetID',
            'produserid': 'prodUserID',
            'jobdefinitionid': 'jobDefinitionID',
            'writetofile': 'writeToFile',
            'is_eventservice': 'eventService',
            'is_eventservicemerge': 'eventServiceMerge',
            'is_hpo': 'isHPO',
            'use_vp': 'useVP',
            'maxcpucount': 'maxCpuCount',
            'allownooutput': 'allowNoOutput',
            'imagename_jobdef': 'container_name',
            'containeroptions': 'containerOptions',
            'looping_check': 'loopingCheck'
        } if use_kmap else {}

        # generic loader/validator provided by BaseData (applies _keys casting
        # and the clean__* hooks)
        self._load_data(data, kmap)
0494
0495 def is_analysis(self):
0496 """
0497 Determine whether the job is an analysis user job or not.
0498 :return: True in case of user analysis job
0499 """
0500
0501 is_analysis = self.transformation.startswith('https://') or self.transformation.startswith('http://')
0502
0503
0504
0505 return is_analysis
0506
0507 def is_build_job(self):
0508 """
0509 Check if the job is a build job.
0510 (i.e. check if the job has an output file that is a lib file).
0511
0512 :return: boolean
0513 """
0514
0515 for fspec in self.outdata:
0516 if '.lib.' in fspec.lfn and '.log.' not in fspec.lfn:
0517 return True
0518
0519 return False
0520
0521 def is_local(self):
0522 """
0523 Should the input files be accessed locally?
0524 Note: all input files will have storage_token set to local in that case.
0525
0526 :return: boolean.
0527 """
0528
0529 for fspec in self.indata:
0530 if fspec.storage_token == 'local' and '.lib.' not in fspec.lfn:
0531 return True
0532
0533 def has_remoteio(self):
0534 """
0535 Check status of input file transfers and determine either direct access mode will be used or not.
0536 :return: True if at least one file should use direct access mode
0537 """
0538
0539 return any([fspec.status == 'remote_io' for fspec in self.indata])
0540
    def clean(self):
        """
        Validate and finally clean up required data values (object properties) if need.

        Hook invoked by the generic BaseData loader after all attributes have
        been set; currently a no-op for JobData.

        :return: None
        """

        pass
0548
0549
0550
0551
0552
0553
0554
0555
0556 def clean__corecount(self, raw, value):
0557 """
0558 Verify and validate value for the corecount key (set to 1 if not set)
0559 """
0560
0561
0562
0563
0564 athena_corecount = os.environ.get('ATHENA_PROC_NUMBER')
0565 if athena_corecount:
0566 try:
0567 value = int(athena_corecount)
0568 except Exception:
0569 logger.info("ATHENA_PROC_NUMBER is not properly set.. ignored, data=%s" % athena_corecount)
0570
0571 return value if value else 1
0572
0573 def clean__platform(self, raw, value):
0574 """
0575 Verify and validate value for the platform key.
0576 Set the alrbuserplatform value if encoded in platform/cmtconfig string.
0577
0578 :param raw: (unused).
0579 :param value: platform (string).
0580 :return: updated platform (string).
0581 """
0582
0583 v = value if value.lower() not in ['null', 'none'] else ''
0584
0585 if '@' in v:
0586 self.alrbuserplatform = v.split('@')[1]
0587 v = v.split('@')[0]
0588
0589 return v
0590
    def clean__jobparams(self, raw, value):
        """
        Verify and validate value for the jobparams key
        Extract value from jobparams not related to job options.
        The function will in particular extract and remove --overwriteQueueData, ZIP_MAP and --containerimage.
        It will remove the old Pilot 1 option --overwriteQueuedata which should be replaced with --overwriteQueueData.

        :param raw: (unused).
        :param value: job parameters (string).
        :return: updated job parameters (string).
        """

        logger.info('cleaning jobparams: %s' % value)

        # user-community-specific pre-filtering; exclusions hold protected values
        # that are restored by the matching postfiltering call below
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        try:
            user = __import__('pilot.user.%s.jobdata' % pilot_user, globals(), locals(), [pilot_user], 0)
            exclusions, value = user.jobparams_prefiltering(value)
        except Exception as e:
            logger.warning('caught exception in user code: %s' % e)
            exclusions = {}

        # drop the deprecated Pilot 1 option entirely (not parsed)
        ret = re.sub(r"--overwriteQueuedata={.*?}", "", value)

        # extract and remove the queuedata/storagedata overwrite options
        # (their values are dict literals, hence the literal_eval cast)
        options, ret = self.parse_args(ret, {'--overwriteQueueData': lambda x: ast.literal_eval(x) if x else {},
                                             '--overwriteStorageData': lambda x: ast.literal_eval(x) if x else {}}, remove=True)
        self.overwrite_queuedata = options.get('--overwriteQueueData', {})
        self.overwrite_storagedata = options.get('--overwriteStorageData', {})

        # extract the ZIP MAP structure, if present
        pattern = r" \'?<ZIP_MAP>(.+)<\/ZIP_MAP>\'?"
        pattern = re.compile(pattern)

        result = re.findall(pattern, ret)
        if result:
            self.zipmap = result[0]
        # remove the ZIP MAP (no-op when there was no match)
        ret = re.sub(pattern, '', ret)

        # extract and remove any --containerImage option
        ret, imagename = self.extract_container_image(ret)
        if imagename != "":
            self.imagename = imagename

        # restore values protected by the user prefiltering above
        try:
            ret = user.jobparams_postfiltering(ret, exclusions=exclusions)
        except Exception as e:
            # note: also reached when the user module import failed above
            logger.warning('caught exception in user code: %s' % e)

        logger.info('cleaned jobparams: %s' % ret)

        return ret
0657
    def extract_container_image(self, jobparams):
        """
        Extract the container image from the job parameters if present, and remove it.

        :param jobparams: job parameters (string).
        :return: updated job parameters (string), extracted image name (string).
        """

        imagename = ""

        # matches the full option including its value, e.g. " --containerImage=foo" or " --containerImage foo"
        _pattern = r'(\ \-\-containerImage\=?\s?[\S]+)'
        pattern = re.compile(_pattern)
        image_option = re.findall(pattern, jobparams)

        if image_option and image_option[0] != "":
            # capture only the image value itself (optionally quoted)
            imagepattern = re.compile(r" \'?\-\-containerImage\=?\ ?([\S]+)\ ?\'?")
            image = re.findall(imagepattern, jobparams)
            if image and image[0] != "":
                try:
                    imagename = image[0]
                except Exception as e:
                    logger.warning('failed to extract image name: %s' % e)
                else:
                    logger.info("extracted image from jobparams: %s" % imagename)
            else:
                logger.warning("image could not be extract from %s" % jobparams)

            # strip the whole option from the job parameters
            jobparams = re.sub(_pattern, "", jobparams)
            logger.info("removed the %s option from job parameters: %s" % (image_option[0], jobparams))

        return jobparams, imagename
0693
0694 @classmethod
0695 def parse_args(self, data, options, remove=False):
0696 """
0697 Extract option/values from string containing command line options (arguments)
0698 :param data: input command line arguments (raw string)
0699 :param options: dict of option names to be considered: (name, type), type is a cast function to be applied with result value
0700 :param remove: boolean, if True then exclude specified options from returned raw string of command line arguments
0701 :return: tuple: (dict of extracted options, raw string of final command line options)
0702 """
0703
0704 logger.debug('extract options=%s from data=%s' % (list(options.keys()), data))
0705
0706 if not options:
0707 return {}, data
0708
0709 opts, pargs = self.get_opts_pargs(data)
0710 if not opts:
0711 return {}, data
0712
0713 ret = self.get_ret(options, opts)
0714
0715
0716 rawdata = data
0717 if remove:
0718 final_args = []
0719 for arg in pargs:
0720 if isinstance(arg, (tuple, list)):
0721 if arg[0] not in options:
0722 if arg[1] is None:
0723 arg.pop()
0724 final_args.extend(arg)
0725 else:
0726 final_args.append(arg)
0727 rawdata = " ".join(pipes.quote(e) for e in final_args)
0728
0729 return ret, rawdata
0730
    @staticmethod
    def get_opts_pargs(data):
        """
        Get the opts and pargs variables.

        Tokenizes `data` shell-style and pairs each option flag (token starting
        with '-') with its following value token, if any.

        :param data: input command line arguments (raw string)
        :return: opts (dict of option -> value or None), pargs (list of positional strings and [option, value] pairs)
        """

        try:
            args = shlex.split(data)
        except ValueError as e:
            logger.error('Failed to parse input arguments from data=%s, error=%s .. skipped.' % (data, e))
            return {}, data

        opts, curopt, pargs = {}, None, []
        for arg in args:
            if arg.startswith('-'):
                # previous flag had no value
                if curopt is not None:
                    opts[curopt] = None
                    pargs.append([curopt, None])
                curopt = arg
                continue
            if curopt is None:
                # positional argument (not preceded by an option flag)
                pargs.append(arg)
            else:
                # value for the pending option flag
                opts[curopt] = arg
                pargs.append([curopt, arg])
                curopt = None
        if curopt:
            # trailing flag without a value (note: recorded in pargs only)
            pargs.append([curopt, None])

        return opts, pargs
0764
0765 @staticmethod
0766 def get_ret(options, opts):
0767 """
0768 Get the ret variable from the options.
0769
0770 :param options:
0771 :param opts:
0772 :return: ret (dict).
0773 """
0774
0775 ret = {}
0776 try:
0777 _items = list(options.items())
0778 except Exception:
0779 _items = options.iteritems()
0780 for opt, fcast in _items:
0781 val = opts.get(opt)
0782 try:
0783 val = fcast(val) if callable(fcast) else val
0784 except Exception as e:
0785 logger.error('Failed to extract value for option=%s from data=%s: cast function=%s failed, exception=%s .. skipped' % (opt, val, fcast, e))
0786 continue
0787 ret[opt] = val
0788
0789 return ret
0790
    def add_workdir_size(self, workdir_size):
        """
        Add a measured workdir size to the workdirsizes field.
        The function will deduce any input and output file sizes from the workdir size.

        :param workdir_size: workdir size (int).
        :return:
        """

        # normalize the size to an integer; on Python 2 the long branch is used,
        # on Python 3 long() is undefined (NameError caught by the outer except)
        # and the plain int branch applies
        try:
            if not isinstance(workdir_size, (int, long)):  # noqa: F821
                try:
                    workdir_size = long(workdir_size)  # noqa: F821
                except Exception as e:
                    logger.warning('failed to convert %s to long: %s' % (workdir_size, e))
                    return
        except Exception:
            if not isinstance(workdir_size, int):
                try:
                    workdir_size = int(workdir_size)
                except Exception as e:
                    logger.warning('failed to convert %s to int: %s' % (workdir_size, e))
                    return
        # same Python 2/3 dance for the accumulator
        try:
            total_size = long(0)  # noqa: F821
        except Exception:
            total_size = 0

        if os.path.exists(self.workdir):
            # subtract the sizes of input/output files currently present in the
            # workdir (untransferred input files are not expected to be there)
            for fspec in self.indata + self.outdata:
                if fspec.filetype == 'input' and fspec.status != 'transferred':
                    continue
                pfn = os.path.join(self.workdir, fspec.lfn)
                if not os.path.isfile(pfn):
                    msg = "pfn file=%s does not exist (skip from workdir size calculation)" % pfn
                    logger.info(msg)
                else:
                    total_size += os.path.getsize(pfn)

            logger.info("total size of present input+output files: %d B (workdir size: %d B)" %
                        (total_size, workdir_size))
            workdir_size -= total_size

        self.workdirsizes.append(workdir_size)
0845
0846 def get_max_workdir_size(self):
0847 """
0848 Return the maximum disk space used by the payload.
0849
0850 :return: workdir size (int).
0851 """
0852
0853 try:
0854 maxdirsize = long(0)
0855 except Exception:
0856 maxdirsize = 0
0857
0858 if self.workdirsizes != []:
0859
0860 maxdirsize = max(self.workdirsizes)
0861 else:
0862 logger.warning("found no stored workdir sizes")
0863
0864 return maxdirsize
0865
0866 def get_lfns_and_guids(self):
0867 """
0868 Return ordered lists with the input file LFNs and GUIDs.
0869
0870 :return: list of input files, list of corresponding GUIDs.
0871 """
0872
0873 lfns = []
0874 guids = []
0875
0876 for fspec in self.indata:
0877 lfns.append(fspec.lfn)
0878 guids.append(fspec.guid)
0879
0880 return lfns, guids
0881
0882 def get_status(self, key):
0883 """
0884
0885 Return the value for the given key (e.g. LOG_TRANSFER) from the status dictionary.
0886 LOG_TRANSFER_NOT_DONE is returned if job object is not defined for key='LOG_TRANSFER'.
0887 If no key is found, None will be returned.
0888
0889 :param key: key name (string).
0890 :return: corresponding key value in job.status dictionary (string).
0891 """
0892
0893 log_transfer = self.status.get(key, None)
0894
0895 if not log_transfer:
0896 if key == 'LOG_TRANSFER':
0897 log_transfer = LOG_TRANSFER_NOT_DONE
0898
0899 return log_transfer
0900
0901 def get_job_option_for_input_name(self, input_name):
0902 """
0903 Expecting something like --inputHitsFile=@input_name in jobparams.
0904
0905 :returns: job_option such as --inputHitsFile
0906 """
0907 job_options = self.jobparams.split(' ')
0908 input_name_option = '=@%s' % input_name
0909 for job_option in job_options:
0910 if input_name_option in job_option:
0911 return job_option.split("=")[0]
0912 return None
0913
0914 def process_writetofile(self):
0915 """
0916 Expecting writetofile from the job definition.
0917 The format is 'inputFor_file1:lfn1,lfn2^inputFor_file2:lfn3,lfn4'
0918
0919 format writetofile_dictionary = {'inputFor_file1': [lfn1, lfn2], 'inputFor_file2': [lfn3, lfn4]}
0920 """
0921 writetofile_dictionary = {}
0922 if self.writetofile:
0923 fileinfos = self.writetofile.split("^")
0924 for fileinfo in fileinfos:
0925 if ':' in fileinfo:
0926 input_name, input_list = fileinfo.split(":")
0927 writetofile_dictionary[input_name] = input_list.split(',')
0928 else:
0929 logger.error("writeToFile doesn't have the correct format, expecting a separator \':\' for %s" % fileinfo)
0930
0931 if writetofile_dictionary:
0932 for input_name in writetofile_dictionary:
0933 input_name_new = input_name + '.txt'
0934 input_name_full = os.path.join(self.workdir, input_name_new)
0935 f = open(input_name_full, 'w')
0936 job_option = self.get_job_option_for_input_name(input_name)
0937 if not job_option:
0938 logger.error("Unknown job option format, expected job options such as \'--inputHitsFile\' for input file: %s" % input_name)
0939 else:
0940 f.write("%s\n" % job_option)
0941 for input_file in writetofile_dictionary[input_name]:
0942 f.write("%s\n" % input_file)
0943 f.close()
0944 logger.info("Wrote input file list to file %s: %s" % (input_name_full, writetofile_dictionary[input_name]))
0945
0946 self.jobparams = self.jobparams.replace(input_name, input_name_new)
0947 if job_option:
0948 self.jobparams = self.jobparams.replace('%s=' % job_option, '')
0949 self.jobparams = self.jobparams.replace('--autoConfiguration=everything', '')
0950 logger.info("jobparams after processing writeToFile: %s" % self.jobparams)
0951
    def add_size(self, size):
        """
        Add a size measurement to the sizes field at the current time stamp.
        A size measurement is in Bytes.

        :param size: size of object in Bytes (int).
        :return:
        """

        # lazily set the time reference used for all measurements
        if not self.t0:
            self.t0 = os.times()

        # elapsed real time since t0 is used as the dictionary key
        time_stamp = get_elapsed_real_time(t0=self.t0)

        self.sizes[time_stamp] = size
0970
    def get_size(self):
        """
        Determine the size (B) of the job object.

        Best effort: if get_object_size() fails for any reason, the previously
        recorded size (self.currentsize) is returned unchanged.

        :return: size (int).
        """

        try:
            self.currentsize = get_object_size(self)
        except Exception:
            # deliberate best-effort: keep the last known size on failure
            pass
        return self.currentsize
0984
0985 def collect_zombies(self, tn=None):
0986 """
0987 Collect zombie child processes, tn is the max number of loops, plus 1,
0988 to avoid infinite looping even if some child processes really get wedged;
0989 tn=None means it will keep going until all child zombies have been collected.
0990
0991 :param tn: max depth (int).
0992 :return:
0993 """
0994
0995 sleep(1)
0996
0997 if self.zombies and tn > 1:
0998 logger.info("--- collectZombieJob: --- %d, %s" % (tn, str(self.zombies)))
0999 tn -= 1
1000 for x in self.zombies:
1001 try:
1002 logger.info("zombie collector trying to kill pid %s" % str(x))
1003 _id, rc = os.waitpid(x, os.WNOHANG)
1004 except OSError as e:
1005 logger.info("harmless exception when collecting zombies: %s" % e)
1006 self.zombies.remove(x)
1007 else:
1008 if _id:
1009 self.zombies.remove(x)
1010 self.collect_zombies(tn=tn)
1011
1012 if self.zombies and not tn:
1013
1014
1015 for x in self.zombies:
1016 try:
1017 _id, rc = os.waitpid(x, 0)
1018 except OSError as e:
1019 logger.info("harmless exception when collecting zombie jobs, %s" % str(e))
1020 self.zombies.remove(x)
1021 else:
1022 if _id:
1023 self.zombies.remove(x)
1024 self.collect_zombies(tn=tn)
1025
1026 def only_copy_to_scratch(self):
1027 """
1028 Determine if the payload only has copy-to-scratch input.
1029 In this case, there should be no --usePFCTurl or --directIn in the job parameters.
1030
1031 :return: True if only copy-to-scratch. False if at least one file should use direct access mode
1032 """
1033
1034 for fspec in self.indata:
1035 if fspec.status == 'remote_io':
1036 return False
1037
1038 return True
1039
1040 def reset_errors(self):
1041 """
1042
1043 :return:
1044 """
1045
1046 self.piloterrorcode = 0
1047 self.piloterrorcodes = []
1048 self.piloterrordiag = ""
1049 self.piloterrordiags = []
1050 self.transexitcode = 0
1051 self.exeerrorcode = 0
1052 self.exeerrordiag = ""
1053 self.exitcode = 0
1054 self.exitmsg = ""
1055 self.corecounts = []
1056
1057 def to_json(self):
1058 from json import dumps
1059 return dumps(self, default=lambda o: o.__dict__)