Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:16

0001 # Licensed under the Apache License, Version 2.0 (the "License");
0002 # you may not use this file except in compliance with the License.
0003 # You may obtain a copy of the License at
0004 # http://www.apache.org/licenses/LICENSE-2.0
0005 #
0006 # Authors:
0007 # - Alexey Anisenkov, anisyonk@cern.ch, 2018-2019
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2019
0009 # - Wen Guan, wen.guan@cern.ch, 2018
0010 
0011 """
0012 The implementation of data structure to host Job definition.
0013 
0014 The main reasons for such incapsulation are to
0015  - apply in one place all data validation actions (for attributes and values)
0016  - introduce internal information schema (names of attributes) to remove dependency
0017  with data structure, formats, names from external source (PanDA)
0018 
0019 :author: Alexey Anisenkov
0020 :contact: anisyonk@cern.ch
0021 :date: February 2018
0022 """
0023 
0024 import os
0025 import re
0026 import ast
0027 import shlex
0028 import pipes
0029 from time import sleep
0030 
0031 from .basedata import BaseData
0032 from .filespec import FileSpec
0033 from pilot.util.auxiliary import get_object_size, get_key_value
0034 from pilot.util.constants import LOG_TRANSFER_NOT_DONE
0035 from pilot.util.filehandling import get_guid, get_valid_path_from_list
0036 from pilot.util.timing import get_elapsed_real_time
0037 
0038 import logging
0039 logger = logging.getLogger(__name__)
0040 
0041 
0042 class JobData(BaseData):
0043     """
0044         High-level object to host Job definition/settings
0045     """
0046 
0047     # ## put explicit list of all the attributes with comments for better inline-documentation by Sphinx
0048     # ## FIX ME LATER: use proper doc format
0049     # ## incomplete list of attributes .. to be extended once becomes used
0050 
0051     jobid = None                   # unique Job identifier (forced to be a string)
0052     taskid = None                  # unique Task identifier, the task that this job belongs to (forced to be a string)
0053     jobparams = ""                 # job parameters defining the execution of the job
0054     transformation = ""            # script execution name
0055     # current job status; format = {key: value, ..} e.g. key='LOG_TRANSFER', value='DONE'
0056     status = {'LOG_TRANSFER': LOG_TRANSFER_NOT_DONE}
0057     corecount = 1                  # Number of cores as requested by the task
0058     platform = ""                  # cmtconfig value from the task definition
0059     is_eventservice = False        # True for event service jobs
0060     is_eventservicemerge = False   # True for event service merge jobs
0061     is_hpo = False                 # True for HPO jobs
0062     transfertype = ""              # direct access instruction from server
0063     accessmode = ""                # direct access instruction from jobparams
0064     processingtype = ""            # e.g. nightlies
0065     maxcpucount = 0                # defines what is a looping job (seconds)
0066     allownooutput = ""             # used to disregard empty files from job report
0067 
0068     # set by the pilot (not from job definition)
0069     workdir = ""                   # working directory for this job
0070     workdirsizes = []              # time ordered list of work dir sizes
0071     fileinfo = {}                  #
0072     piloterrorcode = 0             # current pilot error code
0073     piloterrorcodes = []           # ordered list of stored pilot error codes
0074     piloterrordiag = ""            # current pilot error diagnostics
0075     piloterrordiags = []           # ordered list of stored pilot error diagnostics
0076     transexitcode = 0              # payload/trf exit code
0077     exeerrorcode = 0               #
0078     exeerrordiag = ""              #
0079     exitcode = 0                   #
0080     exitmsg = ""                   #
0081     state = ""                     # internal pilot states; running, failed, finished, holding, stagein, stageout
0082     serverstate = ""               # server job states; starting, running, finished, holding, failed
0083     stageout = ""                  # stage-out identifier, e.g. log
0084     metadata = {}                  # payload metadata (job report)
0085     cpuconsumptionunit = "s"       #
0086     cpuconsumptiontime = -1        #
0087     cpuconversionfactor = 1        #
0088     nevents = 0                    # number of events
0089     neventsw = 0                   # number of events written
0090     dbtime = None                  #
0091     dbdata = None                  #
0092     resimevents = None             # ReSim events from job report (ATLAS)
0093     payload = ""                   # payload name
0094     utilities = {}                 # utility processes { <name>: [<process handle>, number of launches, command string], .. }
0095     pid = None                     # payload pid
0096     pgrp = None                    # payload process group
0097     sizes = {}                     # job object sizes { timestamp: size, .. }
0098     currentsize = 0                # current job object size
0099     command = ""                   # full payload command (set for container jobs)
0100     setup = ""                     # full payload setup (needed by postprocess command)
0101     zombies = []                   # list of zombie process ids
0102     memorymonitor = ""             # memory monitor name, e.g. prmon
0103     actualcorecount = 0            # number of cores actually used by the payload
0104     corecounts = []                # keep track of all actual core count measurements
0105     looping_check = True           # perform looping payload check
0106 
0107     # time variable used for on-the-fly cpu consumption time measurements done by job monitoring
0108     t0 = None                      # payload startup time
0109 
0110     overwrite_queuedata = {}       # custom settings extracted from job parameters (--overwriteQueueData) to be used as master values for `QueueData`
0111     overwrite_storagedata = {}     # custom settings extracted from job parameters (--overwriteStorageData) to be used as master values for `StorageData`
0112 
0113     zipmap = ""                    # ZIP MAP values extracted from jobparameters
0114     imagename = ""                 # container image name extracted from job parameters or job definition
0115     imagename_jobdef = ""
0116     usecontainer = False           # boolean, True if a container is to be used for the payload
0117 
0118     # from job definition
0119     attemptnr = 0                  # job attempt number
0120     destinationdblock = ""         ## to be moved to FileSpec (job.outdata)
0121     datasetin = ""                 ## TO BE DEPRECATED: moved to FileSpec (job.indata)
0122     debug = False                  # debug mode, when True, pilot will send debug info back to the server
0123     debug_command = ''             # debug command (can be defined on the task side)
0124     produserid = ""                # the user DN (added to trace report)
0125     jobdefinitionid = ""           # the job definition id (added to trace report)
0126     infilesguids = ""              #
0127     indata = []                    # list of `FileSpec` objects for input files (aggregated inFiles, ddmEndPointIn, scopeIn, filesizeIn, etc)
0128     outdata = []                   # list of `FileSpec` objects for output files
0129     logdata = []                   # list of `FileSpec` objects for log file(s)
0130     # preprocess = {u'args': u'preprocess', u'command': u'echo'}
0131     # postprocess = {u'args': u'postprocess', u'command': u'echo'}
0132     preprocess = {}                # preprocess dictionary with command to execute before payload, {'command': '..', 'args': '..'}
0133     postprocess = {}               # postprocess dictionary with command to execute after payload, {'command': '..', 'args': '..'}
0134     coprocess = {}                 # coprocess dictionary with command to execute during payload, {'command': '..', 'args': '..'}
0135     # coprocess = {u'args': u'coprocess', u'command': u'echo'}
0136     containeroptions = {}          #
0137     use_vp = False                 # True for VP jobs
0138 
0139     # home package string with additional payload release information; does not need to be added to
0140     # the conversion function since it's already lower case
0141     homepackage = ""               #
0142     jobsetid = ""                  # job set id
0143     noexecstrcnv = None            # server instruction to the pilot if it should take payload setup from job parameters
0144     swrelease = ""                 # software release string
0145     writetofile = ""               #
0146 
0147     # cmtconfig encoded info
0148     alrbuserplatform = ""          # ALRB_USER_PLATFORM encoded in platform/cmtconfig value
0149 
0150     # RAW data to keep backward compatible behavior for a while ## TO BE REMOVED once all job attributes will be covered
0151     _rawdata = {}
0152 
0153     # specify the type of attributes for proper data validation and casting
0154     _keys = {int: ['corecount', 'piloterrorcode', 'transexitcode', 'exitcode', 'cpuconversionfactor', 'exeerrorcode',
0155                    'attemptnr', 'nevents', 'neventsw', 'pid', 'cpuconsumptiontime', 'maxcpucount', 'actualcorecount'],
0156              str: ['jobid', 'taskid', 'jobparams', 'transformation', 'destinationdblock', 'exeerrordiag'
0157                    'state', 'serverstate', 'workdir', 'stageout',
0158                    'platform', 'piloterrordiag', 'exitmsg', 'produserid', 'jobdefinitionid', 'writetofile',
0159                    'cpuconsumptionunit', 'homepackage', 'jobsetid', 'payload', 'processingtype',
0160                    'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
0161                    'datasetin',    ## TO BE DEPRECATED: moved to FileSpec (job.indata)
0162                    'infilesguids', 'memorymonitor', 'allownooutput'],
0163              list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts'],
0164              dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
0165                     'postprocess', 'coprocess', 'containeroptions'],
0166              bool: ['is_eventservice', 'is_eventservicemerge', 'is_hpo', 'noexecstrcnv', 'debug', 'usecontainer',
0167                     'use_vp', 'looping_check']
0168              }
0169 
0170     def __init__(self, data, use_kmap=True):
0171         """
0172             :param data: input dictionary of data settings
0173         """
0174 
0175         self.infosys = None  # reference to Job specific InfoService instance
0176         self._rawdata = data
0177         self.load(data, use_kmap=use_kmap)
0178 
0179         # for native HPO pilot support
0180         if self.is_hpo and False:
0181             self.is_eventservice = True
0182 
0183     def init(self, infosys):
0184         """
0185             :param infosys: infosys object
0186         """
0187         self.infosys = infosys
0188         self.indata = self.prepare_infiles(self._rawdata)
0189         self.outdata, self.logdata = self.prepare_outfiles(self._rawdata)
0190 
0191         # overwrites
0192         if self.imagename_jobdef and not self.imagename:
0193             logger.debug('using imagename_jobdef as imagename (\"%s\")' % self.imagename_jobdef)
0194             self.imagename = self.imagename_jobdef
0195         elif self.imagename_jobdef and self.imagename:
0196             logger.debug('using imagename from jobparams (ignoring imagename_jobdef)')
0197         elif not self.imagename_jobdef and self.imagename:
0198             logger.debug('using imagename from jobparams (imagename_jobdef not set)')
0199 
0200         if self.imagename:
0201             # prepend IMAGE_BASE to imagename if necessary (for testing purposes)
0202             image_base = os.environ.get('IMAGE_BASE', '')
0203             if not image_base and 'IMAGE_BASE' in infosys.queuedata.catchall:
0204                 image_base = get_key_value(infosys.queuedata.catchall, key='IMAGE_BASE')
0205             if image_base:
0206                 os.environ['ALRB_CONT_UNPACKEDDIR'] = image_base
0207                 paths = [os.path.join(image_base, os.path.basename(self.imagename)),
0208                          os.path.join(image_base, self.imagename)]
0209                 local_path = get_valid_path_from_list(paths)
0210                 if local_path:
0211                     self.imagename = local_path
0212             #if image_base and not os.path.isabs(self.imagename) and not self.imagename.startswith('docker'):
0213             #    self.imagename = os.path.join(image_base, self.imagename)
0214 
0215     def prepare_infiles(self, data):
0216         """
0217             Construct FileSpec objects for input files from raw dict `data`
0218             :return: list of validated `FileSpec` objects
0219         """
0220 
0221         # direct access handling
0222         self.set_accessmode()
0223 
0224         access_keys = ['allow_lan', 'allow_wan', 'direct_access_lan', 'direct_access_wan']
0225         if not self.infosys or not self.infosys.queuedata:
0226             self.show_access_settings(access_keys)
0227 
0228         # form raw list data from input comma-separated values for further validation by FileSpec
0229         kmap = self.get_kmap()
0230 
0231         try:
0232             ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in list(kmap.values()))  # Python 3
0233         except Exception:
0234             ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in kmap.itervalues())  # Python 2
0235 
0236         ret, lfns = [], set()
0237         for ind, lfn in enumerate(ksources.get('inFiles', [])):
0238             if lfn in ['', 'NULL'] or lfn in lfns:  # exclude null data and duplicates
0239                 continue
0240             lfns.add(lfn)
0241             idat = {}
0242 
0243             try:
0244                 for attrname, k in list(kmap.items()):  # Python 3
0245                     idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0246             except Exception:
0247                 for attrname, k in kmap.iteritems():  # Python 2
0248                     idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0249 
0250             accessmode = 'copy'  ## default settings
0251 
0252             # for prod jobs: use remoteio if transferType=direct and prodDBlockToken!=local
0253             # for analy jobs: use remoteio if prodDBlockToken!=local
0254             if (self.is_analysis() or self.transfertype == 'direct') and idat.get('storage_token') != 'local':  ## Job settings
0255                 accessmode = 'direct'
0256             if self.accessmode:  ## Job input options (job params) overwrite any other settings
0257                 accessmode = self.accessmode
0258 
0259             idat['accessmode'] = accessmode
0260             # init access setting from queuedata
0261             if self.infosys and self.infosys.queuedata:
0262                 for key in access_keys:
0263                     idat[key] = getattr(self.infosys.queuedata, key)
0264 
0265             finfo = FileSpec(filetype='input', **idat)
0266             logger.info('added file \'%s\' with accessmode \'%s\'' % (lfn, accessmode))
0267             ret.append(finfo)
0268 
0269         return ret
0270 
0271     def set_accessmode(self):
0272         """
0273         Set the accessmode field using jobparams.
0274 
0275         :return:
0276         """
0277         self.accessmode = None
0278         if '--accessmode=direct' in self.jobparams:
0279             self.accessmode = 'direct'
0280         if '--accessmode=copy' in self.jobparams or '--useLocalIO' in self.jobparams:
0281             self.accessmode = 'copy'
0282 
0283     @staticmethod
0284     def show_access_settings(access_keys):
0285         """
0286         Show access settings for the case job.infosys.queuedata is not initialized.
0287 
0288         :param access_keys: list of access keys (list).
0289         :return:
0290         """
0291         dat = dict([k, getattr(FileSpec, k, None)] for k in access_keys)
0292         try:
0293             msg = ', '.join(["%s=%s" % (k, v) for k, v in sorted(dat.iteritems())])  # Python 2
0294         except Exception:
0295             msg = ', '.join(["%s=%s" % (k, v) for k, v in sorted(dat.items())])  # Python 3
0296         logger.info('job.infosys.queuedata is not initialized: the following access settings will be used by default: %s' % msg)
0297 
0298     @staticmethod
0299     def get_kmap():
0300         """
0301         Return the kmap dictionary for server data to pilot conversions.
0302 
0303         :return: kmap (dict).
0304         """
0305         kmap = {
0306             # 'internal_name': 'ext_key_structure'
0307             'lfn': 'inFiles',
0308             ##'??': 'dispatchDblock', '??define_proper_internal_name': 'dispatchDBlockToken',
0309             'dataset': 'realDatasetsIn', 'guid': 'GUID',
0310             'filesize': 'fsize', 'checksum': 'checksum', 'scope': 'scopeIn',
0311             ##'??define_internal_key': 'prodDBlocks',
0312             'storage_token': 'prodDBlockToken',
0313             'ddmendpoint': 'ddmEndPointIn',
0314         }
0315 
0316         return kmap
0317 
0318     def prepare_outfiles(self, data):
0319         """
0320         Construct validated FileSpec objects for output and log files from raw dict `data`
0321         Note: final preparation for output files can only be done after the payload has finished in case the payload
0322         has produced a job report with e.g. output file guids. This is verified in
0323         pilot/control/payload/process_job_report().
0324 
0325         :param data:
0326         :return: (list of `FileSpec` for output, list of `FileSpec` for log)
0327         """
0328 
0329         # form raw list data from input comma-separated values for further validataion by FileSpec
0330         kmap = {
0331             # 'internal_name': 'ext_key_structure'
0332             'lfn': 'outFiles',
0333             ##'??': 'destinationDblock', '??define_proper_internal_name': 'destinationDBlockToken',
0334             'dataset': 'realDatasets', 'scope': 'scopeOut',
0335             ##'??define_internal_key':'prodDBlocks', '??':'dispatchDBlockTokenForOut',
0336             'ddmendpoint': 'ddmEndPointOut',
0337         }
0338 
0339         try:
0340             ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in list(kmap.values()))  # Python 3
0341         except Exception:
0342             ksources = dict([k, self.clean_listdata(data.get(k, ''), list, k, [])] for k in kmap.itervalues())  # Python 2
0343 
0344         # take the logfile name from the environment first (in case of raythena and aborted pilots)
0345         pilot_logfile = os.environ.get('PILOT_LOGFILE', None)
0346         if pilot_logfile:
0347             # update the data with the new name
0348             old_logfile = data.get('logFile')
0349             data['logFile'] = pilot_logfile
0350             # note: the logFile also appears in the outFiles list
0351             outfiles = ksources.get('outFiles', None)
0352             if outfiles and old_logfile in outfiles:
0353                 # search and replace the old logfile name with the new from the environment
0354                 ksources['outFiles'] = [pilot_logfile if x == old_logfile else x for x in ksources.get('outFiles')]
0355 
0356         log_lfn = data.get('logFile')
0357         if log_lfn:
0358             # unify scopeOut structure: add scope of log file
0359             scope_out = []
0360             for lfn in ksources.get('outFiles', []):
0361                 if lfn == log_lfn:
0362                     scope_out.append(data.get('scopeLog'))
0363                 else:
0364                     if not ksources['scopeOut']:
0365                         raise Exception('Failed to extract scopeOut parameter from Job structure sent by Panda, please check input format!')
0366                     scope_out.append(ksources['scopeOut'].pop(0))
0367             ksources['scopeOut'] = scope_out
0368 
0369         return self._get_all_output(ksources, kmap, log_lfn, data)
0370 
0371     def _get_all_output(self, ksources, kmap, log_lfn, data):
0372         """
0373         Create lists of FileSpecs for output + log files.
0374         Helper function for prepare_output().
0375 
0376         :param ksources:
0377         :param kmap:
0378         :param log_lfn: log file name (string).
0379         :param data:
0380         :return: ret_output (list of FileSpec), ret_log (list of FileSpec)
0381         """
0382 
0383         ret_output, ret_log = [], []
0384 
0385         lfns = set()
0386         for ind, lfn in enumerate(ksources['outFiles']):
0387             if lfn in ['', 'NULL'] or lfn in lfns:  # exclude null data and duplicates
0388                 continue
0389             lfns.add(lfn)
0390             idat = {}
0391             try:
0392                 for attrname, k in list(kmap.items()):  # Python 3
0393                     idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0394             except Exception:
0395                 for attrname, k in kmap.iteritems():  # Python 2
0396                     idat[attrname] = ksources[k][ind] if len(ksources[k]) > ind else None
0397 
0398             ftype = 'output'
0399             ret = ret_output
0400             if lfn == log_lfn:  # log file case
0401                 ftype = 'log'
0402                 idat['guid'] = data.get('logGUID')
0403                 ret = ret_log
0404             elif lfn.endswith('.lib.tgz'):  # build job case, generate a guid for the lib file
0405                 idat['guid'] = get_guid()
0406 
0407             finfo = FileSpec(filetype=ftype, **idat)
0408             ret.append(finfo)
0409 
0410         return ret_output, ret_log
0411 
0412     def __getitem__(self, key):
0413         """
0414             Temporary Integration function to keep dict-based access for old logic in compatible way
0415             TO BE REMOVED ONCE all fields will be moved to Job object attributes
0416         """
0417 
0418         if key == 'infosys':
0419             return self.infosys
0420 
0421         #if hasattr(self, key):
0422         #    return getattr(self, key)
0423 
0424         return self._rawdata[key]
0425 
0426     def __setitem__(self, key, val):
0427         """
0428             Temporary Integration function to keep dict-based access for old logic in compatible way
0429             TO BE REMOVED ONCE all fields will be moved to Job object attributes
0430         """
0431 
0432         self._rawdata[key] = val
0433 
0434     def __contains__(self, key):
0435         """
0436             Temporary Integration function to keep dict-based access for old logic in compatible way
0437             TO BE REMOVED ONCE all fields will be moved to Job object attributes
0438         """
0439 
0440         return key in self._rawdata
0441 
0442     def get(self, key, defval=None):
0443         """
0444             Temporary Integration function to keep dict-based access for old logic in compatible way
0445             TO BE REMOVED ONCE all fields will be moved to Job object attributes
0446         """
0447 
0448         return self._rawdata.get(key, defval)
0449 
0450     def load(self, data, use_kmap=True):
0451         """
0452             Construct and initialize data from ext source
0453             :param data: input dictionary of job data settings
0454         """
0455 
0456         ## the translation map of the container attributes from external data to internal schema
0457         ## 'internal_name':('ext_name1', 'extname2_if_any')
0458         ## 'internal_name2':'ext_name3'
0459 
0460         ## first defined ext field will be used
0461         ## if key is not explicitly specified then ext name will be used as is
0462         ## fix me later to proper internal names if need
0463 
0464         kmap = {
0465             'jobid': 'PandaID',
0466             'taskid': 'taskID',
0467             'jobparams': 'jobPars',
0468             'corecount': 'coreCount',
0469             'platform': 'cmtConfig',
0470             'infilesguids': 'GUID',                      ## TO BE DEPRECATED: moved to FileSpec
0471             'attemptnr': 'attemptNr',
0472             'datasetin': 'realDatasetsIn',               ## TO BE DEPRECATED: moved to FileSpec
0473             'processingtype': 'processingType',
0474             'transfertype': 'transferType',
0475             'destinationdblock': 'destinationDblock',
0476             'noexecstrcnv': 'noExecStrCnv',
0477             'swrelease': 'swRelease',
0478             'jobsetid': 'jobsetID',
0479             'produserid': 'prodUserID',
0480             'jobdefinitionid': 'jobDefinitionID',
0481             'writetofile': 'writeToFile',
0482             'is_eventservice': 'eventService',
0483             'is_eventservicemerge': 'eventServiceMerge',
0484             'is_hpo': 'isHPO',
0485             'use_vp': 'useVP',
0486             'maxcpucount': 'maxCpuCount',
0487             'allownooutput': 'allowNoOutput',
0488             'imagename_jobdef': 'container_name',
0489             'containeroptions': 'containerOptions',
0490             'looping_check': 'loopingCheck'
0491         } if use_kmap else {}
0492 
0493         self._load_data(data, kmap)
0494 
0495     def is_analysis(self):  ## if it's experiment specific logic then it could be isolated into extended JobDataATLAS class
0496         """
0497             Determine whether the job is an analysis user job or not.
0498             :return: True in case of user analysis job
0499         """
0500 
0501         is_analysis = self.transformation.startswith('https://') or self.transformation.startswith('http://')
0502 
0503         # apply addons checks later if need
0504 
0505         return is_analysis
0506 
0507     def is_build_job(self):
0508         """
0509         Check if the job is a build job.
0510         (i.e. check if the job has an output file that is a lib file).
0511 
0512         :return: boolean
0513         """
0514 
0515         for fspec in self.outdata:
0516             if '.lib.' in fspec.lfn and '.log.' not in fspec.lfn:
0517                 return True
0518 
0519         return False
0520 
0521     def is_local(self):  ## confusing function, since it does not consider real status of applied transfer, TOBE DEPRECATED, use `has_remoteio()` instead of
0522         """
0523         Should the input files be accessed locally?
0524         Note: all input files will have storage_token set to local in that case.
0525 
0526         :return: boolean.
0527         """
0528 
0529         for fspec in self.indata:
0530             if fspec.storage_token == 'local' and '.lib.' not in fspec.lfn:
0531                 return True
0532 
0533     def has_remoteio(self):
0534         """
0535         Check status of input file transfers and determine either direct access mode will be used or not.
0536         :return: True if at least one file should use direct access mode
0537         """
0538 
0539         return any([fspec.status == 'remote_io' for fspec in self.indata])
0540 
0541     def clean(self):
0542         """
0543             Validate and finally clean up required data values (object properties) if need
0544             :return: None
0545         """
0546 
0547         pass
0548 
0549     ## custom function pattern to apply extra validation to the key values
0550     ##def clean__keyname(self, raw, value):
0551     ##  :param raw: raw value passed from ext source as input
0552     ##  :param value: preliminary cleaned and casted to proper type value
0553     ##
0554     ##    return value
0555 
0556     def clean__corecount(self, raw, value):
0557         """
0558             Verify and validate value for the corecount key (set to 1 if not set)
0559         """
0560 
0561         # note: experiment specific
0562 
0563         # Overwrite the corecount value with ATHENA_PROC_NUMBER if it is set
0564         athena_corecount = os.environ.get('ATHENA_PROC_NUMBER')
0565         if athena_corecount:
0566             try:
0567                 value = int(athena_corecount)
0568             except Exception:
0569                 logger.info("ATHENA_PROC_NUMBER is not properly set.. ignored, data=%s" % athena_corecount)
0570 
0571         return value if value else 1
0572 
0573     def clean__platform(self, raw, value):
0574         """
0575         Verify and validate value for the platform key.
0576         Set the alrbuserplatform value if encoded in platform/cmtconfig string.
0577 
0578         :param raw: (unused).
0579         :param value: platform (string).
0580         :return: updated platform (string).
0581         """
0582 
0583         v = value if value.lower() not in ['null', 'none'] else ''
0584         # handle encoded alrbuserplatform in cmtconfig/platform string
0585         if '@' in v:
0586             self.alrbuserplatform = v.split('@')[1]  # ALRB_USER_PLATFORM value
0587             v = v.split('@')[0]  # cmtconfig value
0588 
0589         return v
0590 
0591     def clean__jobparams(self, raw, value):
0592         """
0593         Verify and validate value for the jobparams key
0594         Extract value from jobparams not related to job options.
0595         The function will in particular extract and remove --overwriteQueueData, ZIP_MAP and --containerimage.
0596         It will remove the old Pilot 1 option --overwriteQueuedata which should be replaced with --overwriteQueueData.
0597 
0598         :param raw: (unused).
0599         :param value: job parameters (string).
0600         :return: updated job parameters (string).
0601         """
0602 
0603         #   value += ' --athenaopts "HITtoRDO:--nprocs=$ATHENA_CORE_NUMBER" someblah'
0604         logger.info('cleaning jobparams: %s' % value)
0605 
0606         # user specific pre-filtering
0607         # (return list of strings not to be filtered, which will be put back in the post-filtering below)
0608         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0609         try:
0610             user = __import__('pilot.user.%s.jobdata' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0611             exclusions, value = user.jobparams_prefiltering(value)
0612         except Exception as e:
0613             logger.warning('caught exception in user code: %s' % e)
0614             exclusions = {}
0615 
0616         ## clean job params from Pilot1 old-formatted options
0617         ret = re.sub(r"--overwriteQueuedata={.*?}", "", value)
0618 
0619         ## extract overwrite options
0620         options, ret = self.parse_args(ret, {'--overwriteQueueData': lambda x: ast.literal_eval(x) if x else {},
0621                                              '--overwriteStorageData': lambda x: ast.literal_eval(x) if x else {}}, remove=True)
0622         self.overwrite_queuedata = options.get('--overwriteQueueData', {})
0623         self.overwrite_storagedata = options.get('--overwriteStorageData', {})
0624 
0625         # extract zip map  ## TO BE FIXED? better to pass it via dedicated sub-option in jobParams from PanDA side: e.g. using --zipmap "content"
0626         # so that the zip_map can be handles more gracefully via parse_args
0627 
0628         pattern = r" \'?<ZIP_MAP>(.+)<\/ZIP_MAP>\'?"
0629         pattern = re.compile(pattern)
0630 
0631         result = re.findall(pattern, ret)
0632         if result:
0633             self.zipmap = result[0]
0634             # remove zip map from final jobparams
0635             ret = re.sub(pattern, '', ret)
0636 
0637         # extract and remove any present --containerimage XYZ options
0638         ret, imagename = self.extract_container_image(ret)
0639         if imagename != "":
0640             self.imagename = imagename
0641 
0642         try:
0643             ret = user.jobparams_postfiltering(ret, exclusions=exclusions)
0644         except Exception as e:
0645             logger.warning('caught exception in user code: %s' % e)
0646 
0647         logger.info('cleaned jobparams: %s' % ret)
0648 
0649 #        self.coprocess = {u'args': u'--coprocess -o output.json -j "" -p "bash%20./exec_in_container.sh"
0650 #        --inSampleFile input.json -a jobO.83699547-623a-4d8c-9b1f-4ff5332bdb77.tar --sourceURL https://aipanda048.cern.ch:25443
0651 #        --checkPointToSave aaa --writeInputToTxt IN_DATA:input_ds.json -i "[\'v04.trt_sharded_weighted_1M5K.tar.gz\']"
0652 #        --inMap "{\'IN_DATA\': [\'v04.trt_sharded_weighted_1M5K.tar.gz\']}" --outMetricsFile=23136708.metrics.000006.tgz^metrics.tgz',
0653 #        u'command': u'http://pandaserver.cern.ch:25080/trf/user/runHPO-00-00-01'}
0654 #        logger.debug('hardcoding coprocess: %s' % self.coprocess)
0655 
0656         return ret
0657 
0658     def extract_container_image(self, jobparams):
0659         """
0660         Extract the container image from the job parameters if present, and remove it.
0661 
0662         :param jobparams: job parameters (string).
0663         :return: updated job parameters (string), extracted image name (string).
0664         """
0665 
0666         imagename = ""
0667 
0668         # define regexp pattern for the full container image option
0669         _pattern = r'(\ \-\-containerImage\=?\s?[\S]+)'
0670         pattern = re.compile(_pattern)
0671         image_option = re.findall(pattern, jobparams)
0672 
0673         if image_option and image_option[0] != "":
0674 
0675             imagepattern = re.compile(r" \'?\-\-containerImage\=?\ ?([\S]+)\ ?\'?")
0676             # imagepattern = re.compile(r'(\ \-\-containerImage\=?\s?([\S]+))')
0677             image = re.findall(imagepattern, jobparams)
0678             if image and image[0] != "":
0679                 try:
0680                     imagename = image[0]  # removed [1]
0681                 except Exception as e:
0682                     logger.warning('failed to extract image name: %s' % e)
0683                 else:
0684                     logger.info("extracted image from jobparams: %s" % imagename)
0685             else:
0686                 logger.warning("image could not be extract from %s" % jobparams)
0687 
0688             # remove the option from the job parameters
0689             jobparams = re.sub(_pattern, "", jobparams)
0690             logger.info("removed the %s option from job parameters: %s" % (image_option[0], jobparams))
0691 
0692         return jobparams, imagename
0693 
0694     @classmethod
0695     def parse_args(self, data, options, remove=False):
0696         """
0697             Extract option/values from string containing command line options (arguments)
0698             :param data: input command line arguments (raw string)
0699             :param options: dict of option names to be considered: (name, type), type is a cast function to be applied with result value
0700             :param remove: boolean, if True then exclude specified options from returned raw string of command line arguments
0701             :return: tuple: (dict of extracted options, raw string of final command line options)
0702         """
0703 
0704         logger.debug('extract options=%s from data=%s' % (list(options.keys()), data))  # Python 2/3
0705 
0706         if not options:
0707             return {}, data
0708 
0709         opts, pargs = self.get_opts_pargs(data)
0710         if not opts:
0711             return {}, data
0712 
0713         ret = self.get_ret(options, opts)
0714 
0715         ## serialize parameters back to string
0716         rawdata = data
0717         if remove:
0718             final_args = []
0719             for arg in pargs:
0720                 if isinstance(arg, (tuple, list)):  ## parsed option
0721                     if arg[0] not in options:  # exclude considered options
0722                         if arg[1] is None:
0723                             arg.pop()
0724                         final_args.extend(arg)
0725                 else:
0726                     final_args.append(arg)
0727             rawdata = " ".join(pipes.quote(e) for e in final_args)
0728 
0729         return ret, rawdata
0730 
0731     @staticmethod
0732     def get_opts_pargs(data):
0733         """
0734         Get the opts and pargs variables.
0735 
0736         :param data: input command line arguments (raw string)
0737         :return: opts (dict), pargs (list)
0738         """
0739 
0740         try:
0741             args = shlex.split(data)
0742         except ValueError as e:
0743             logger.error('Failed to parse input arguments from data=%s, error=%s .. skipped.' % (data, e))
0744             return {}, data
0745 
0746         opts, curopt, pargs = {}, None, []
0747         for arg in args:
0748             if arg.startswith('-'):
0749                 if curopt is not None:
0750                     opts[curopt] = None
0751                     pargs.append([curopt, None])
0752                 curopt = arg
0753                 continue
0754             if curopt is None:  # no open option, ignore
0755                 pargs.append(arg)
0756             else:
0757                 opts[curopt] = arg
0758                 pargs.append([curopt, arg])
0759                 curopt = None
0760         if curopt:
0761             pargs.append([curopt, None])
0762 
0763         return opts, pargs
0764 
0765     @staticmethod
0766     def get_ret(options, opts):
0767         """
0768         Get the ret variable from the options.
0769 
0770         :param options:
0771         :param opts:
0772         :return: ret (dict).
0773         """
0774 
0775         ret = {}
0776         try:
0777             _items = list(options.items())  # Python 3
0778         except Exception:
0779             _items = options.iteritems()  # Python 2
0780         for opt, fcast in _items:
0781             val = opts.get(opt)
0782             try:
0783                 val = fcast(val) if callable(fcast) else val
0784             except Exception as e:
0785                 logger.error('Failed to extract value for option=%s from data=%s: cast function=%s failed, exception=%s .. skipped' % (opt, val, fcast, e))
0786                 continue
0787             ret[opt] = val
0788 
0789         return ret
0790 
0791     def add_workdir_size(self, workdir_size):
0792         """
0793         Add a measured workdir size to the workdirsizes field.
0794         The function will deduce any input and output file sizes from the workdir size.
0795 
0796         :param workdir_size: workdir size (int).
0797         :return:
0798         """
0799 
0800         # Convert to long if necessary
0801         try:
0802             if not isinstance(workdir_size, (int, long)):  # Python 2  # noqa: F821
0803                 try:
0804                     workdir_size = long(workdir_size)  # noqa: F821
0805                 except Exception as e:
0806                     logger.warning('failed to convert %s to long: %s' % (workdir_size, e))
0807                     return
0808         except Exception:
0809             if not isinstance(workdir_size, int):  # Python 3, note order
0810                 try:
0811                     workdir_size = int(workdir_size)  # Python 3
0812                 except Exception as e:
0813                     logger.warning('failed to convert %s to int: %s' % (workdir_size, e))
0814                     return
0815         try:  # Python 2
0816             total_size = long(0)  # B, note do not use 0L as it will generate a syntax error in Python 3  # noqa: F821
0817         except Exception:
0818             total_size = 0  # B, Python 3
0819 
0820         if os.path.exists(self.workdir):
0821             # Find out which input and output files have been transferred and add their sizes to the total size
0822             # (Note: output files should also be removed from the total size since outputfilesize is added in the
0823             # task def)
0824 
0825             # Then update the file list in case additional output files were produced
0826             # Note: don't do this deduction since it is not known by the task definition
0827             # out_files, dummy, dummy = discoverAdditionalOutputFiles(outFiles, job.workdir, job.destinationDblock,
0828             # job.scopeOut)
0829 
0830             for fspec in self.indata + self.outdata:
0831                 if fspec.filetype == 'input' and fspec.status != 'transferred':
0832                     continue
0833                 pfn = os.path.join(self.workdir, fspec.lfn)
0834                 if not os.path.isfile(pfn):
0835                     msg = "pfn file=%s does not exist (skip from workdir size calculation)" % pfn
0836                     logger.info(msg)
0837                 else:
0838                     total_size += os.path.getsize(pfn)
0839 
0840             logger.info("total size of present input+output files: %d B (workdir size: %d B)" %
0841                         (total_size, workdir_size))
0842             workdir_size -= total_size
0843 
0844         self.workdirsizes.append(workdir_size)
0845 
0846     def get_max_workdir_size(self):
0847         """
0848         Return the maximum disk space used by the payload.
0849 
0850         :return: workdir size (int).
0851         """
0852 
0853         try:
0854             maxdirsize = long(0)  # Python 2, note do not use 0L as it will generate a syntax error in Python 3  # noqa: F821
0855         except Exception:
0856             maxdirsize = 0  # Python 3
0857 
0858         if self.workdirsizes != []:
0859             # Get the maximum value from the list
0860             maxdirsize = max(self.workdirsizes)
0861         else:
0862             logger.warning("found no stored workdir sizes")
0863 
0864         return maxdirsize
0865 
0866     def get_lfns_and_guids(self):
0867         """
0868         Return ordered lists with the input file LFNs and GUIDs.
0869 
0870         :return: list of input files, list of corresponding GUIDs.
0871         """
0872 
0873         lfns = []
0874         guids = []
0875 
0876         for fspec in self.indata:
0877             lfns.append(fspec.lfn)
0878             guids.append(fspec.guid)
0879 
0880         return lfns, guids
0881 
0882     def get_status(self, key):
0883         """
0884 
0885         Return the value for the given key (e.g. LOG_TRANSFER) from the status dictionary.
0886         LOG_TRANSFER_NOT_DONE is returned if job object is not defined for key='LOG_TRANSFER'.
0887         If no key is found, None will be returned.
0888 
0889         :param key: key name (string).
0890         :return: corresponding key value in job.status dictionary (string).
0891         """
0892 
0893         log_transfer = self.status.get(key, None)
0894 
0895         if not log_transfer:
0896             if key == 'LOG_TRANSFER':
0897                 log_transfer = LOG_TRANSFER_NOT_DONE
0898 
0899         return log_transfer
0900 
0901     def get_job_option_for_input_name(self, input_name):
0902         """
0903         Expecting something like --inputHitsFile=@input_name in jobparams.
0904 
0905         :returns: job_option such as --inputHitsFile
0906         """
0907         job_options = self.jobparams.split(' ')
0908         input_name_option = '=@%s' % input_name
0909         for job_option in job_options:
0910             if input_name_option in job_option:
0911                 return job_option.split("=")[0]
0912         return None
0913 
0914     def process_writetofile(self):
0915         """
0916         Expecting writetofile from the job definition.
0917         The format is 'inputFor_file1:lfn1,lfn2^inputFor_file2:lfn3,lfn4'
0918 
0919         format writetofile_dictionary = {'inputFor_file1': [lfn1, lfn2], 'inputFor_file2': [lfn3, lfn4]}
0920         """
0921         writetofile_dictionary = {}
0922         if self.writetofile:
0923             fileinfos = self.writetofile.split("^")
0924             for fileinfo in fileinfos:
0925                 if ':' in fileinfo:
0926                     input_name, input_list = fileinfo.split(":")
0927                     writetofile_dictionary[input_name] = input_list.split(',')
0928                 else:
0929                     logger.error("writeToFile doesn't have the correct format, expecting a separator \':\' for %s" % fileinfo)
0930 
0931         if writetofile_dictionary:
0932             for input_name in writetofile_dictionary:
0933                 input_name_new = input_name + '.txt'
0934                 input_name_full = os.path.join(self.workdir, input_name_new)
0935                 f = open(input_name_full, 'w')
0936                 job_option = self.get_job_option_for_input_name(input_name)
0937                 if not job_option:
0938                     logger.error("Unknown job option format, expected job options such as \'--inputHitsFile\' for input file: %s" % input_name)
0939                 else:
0940                     f.write("%s\n" % job_option)
0941                 for input_file in writetofile_dictionary[input_name]:
0942                     f.write("%s\n" % input_file)
0943                 f.close()
0944                 logger.info("Wrote input file list to file %s: %s" % (input_name_full, writetofile_dictionary[input_name]))
0945 
0946                 self.jobparams = self.jobparams.replace(input_name, input_name_new)
0947                 if job_option:
0948                     self.jobparams = self.jobparams.replace('%s=' % job_option, '')
0949                 self.jobparams = self.jobparams.replace('--autoConfiguration=everything', '')
0950                 logger.info("jobparams after processing writeToFile: %s" % self.jobparams)
0951 
0952     def add_size(self, size):
0953         """
0954         Add a size measurement to the sizes field at the current time stamp.
0955         A size measurement is in Bytes.
0956 
0957         :param size: size of object in Bytes (int).
0958         :return:
0959         """
0960 
0961         # is t0 set? if not, set it
0962         if not self.t0:
0963             self.t0 = os.times()
0964 
0965         # get the current time stamp relative to t0
0966         time_stamp = get_elapsed_real_time(t0=self.t0)
0967 
0968         # add a data point to the sizes dictionary
0969         self.sizes[time_stamp] = size
0970 
0971     def get_size(self):
0972         """
0973         Determine the size (B) of the job object.
0974 
0975         :return: size (int).
0976         """
0977 
0978         # protect against the case where the object changes size during calculation (rare)
0979         try:
0980             self.currentsize = get_object_size(self)
0981         except Exception:
0982             pass
0983         return self.currentsize
0984 
0985     def collect_zombies(self, tn=None):
0986         """
0987         Collect zombie child processes, tn is the max number of loops, plus 1,
0988         to avoid infinite looping even if some child processes really get wedged;
0989         tn=None means it will keep going until all child zombies have been collected.
0990 
0991         :param tn: max depth (int).
0992         :return:
0993         """
0994 
0995         sleep(1)
0996 
0997         if self.zombies and tn > 1:
0998             logger.info("--- collectZombieJob: --- %d, %s" % (tn, str(self.zombies)))
0999             tn -= 1
1000             for x in self.zombies:
1001                 try:
1002                     logger.info("zombie collector trying to kill pid %s" % str(x))
1003                     _id, rc = os.waitpid(x, os.WNOHANG)
1004                 except OSError as e:
1005                     logger.info("harmless exception when collecting zombies: %s" % e)
1006                     self.zombies.remove(x)
1007                 else:
1008                     if _id:  # finished
1009                         self.zombies.remove(x)
1010                 self.collect_zombies(tn=tn)  # recursion
1011 
1012         if self.zombies and not tn:
1013             # for the infinite waiting case, we have to use blocked waiting, otherwise it throws
1014             # RuntimeError: maximum recursion depth exceeded
1015             for x in self.zombies:
1016                 try:
1017                     _id, rc = os.waitpid(x, 0)
1018                 except OSError as e:
1019                     logger.info("harmless exception when collecting zombie jobs, %s" % str(e))
1020                     self.zombies.remove(x)
1021                 else:
1022                     if _id:  # finished
1023                         self.zombies.remove(x)
1024                 self.collect_zombies(tn=tn)  # recursion
1025 
1026     def only_copy_to_scratch(self):  ## TO BE DEPRECATED, use `has_remoteio()` instead of
1027         """
1028         Determine if the payload only has copy-to-scratch input.
1029         In this case, there should be no --usePFCTurl or --directIn in the job parameters.
1030 
1031         :return: True if only copy-to-scratch. False if at least one file should use direct access mode
1032         """
1033 
1034         for fspec in self.indata:
1035             if fspec.status == 'remote_io':
1036                 return False
1037 
1038         return True
1039 
1040     def reset_errors(self):  # temporary fix, make sure all queues are empty before starting new job
1041         """
1042 
1043         :return:
1044         """
1045 
1046         self.piloterrorcode = 0
1047         self.piloterrorcodes = []
1048         self.piloterrordiag = ""
1049         self.piloterrordiags = []
1050         self.transexitcode = 0
1051         self.exeerrorcode = 0
1052         self.exeerrordiag = ""
1053         self.exitcode = 0
1054         self.exitmsg = ""
1055         self.corecounts = []
1056 
1057     def to_json(self):
1058         from json import dumps
1059         return dumps(self, default=lambda o: o.__dict__)