Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-11 08:41:05

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0009 # - Wen Guan, wen.guan@cern.ch, 2018
0010 
0011 from collections import defaultdict
0012 import fnmatch
0013 from glob import glob
0014 import logging
0015 import os
0016 import re
0017 from random import randint
0018 from signal import SIGTERM, SIGUSR1
0019 
0020 # from tarfile import ExFileObject
0021 
0022 try:
0023     from functools import reduce  # Python 3
0024 except ImportError:
0025     pass
0026 
0027 from .container import create_root_container_command
0028 from .dbrelease import get_dbrelease_version, create_dbrelease
0029 from .setup import (
0030     should_pilot_prepare_setup,
0031     is_standard_atlas_job,
0032     get_asetup,
0033     set_inds,
0034     get_analysis_trf,
0035     get_payload_environment_variables,
0036     replace_lfns_with_turls,
0037 )
0038 from .utilities import (
0039     get_memory_monitor_setup,
0040     get_network_monitor_setup,
0041     post_memory_monitor_action,
0042     get_memory_monitor_summary_filename,
0043     get_prefetcher_setup,
0044     get_benchmark_setup,
0045     get_memory_monitor_output_filename,
0046     get_metadata_dict_from_txt,
0047 )
0048 
0049 from pilot.util.auxiliary import (
0050     get_resource_name,
0051     show_memory_usage,
0052     is_python3,
0053     get_key_value,
0054 )
0055 
0056 from pilot.common.errorcodes import ErrorCodes
0057 from pilot.common.exception import TrfDownloadFailure, PilotException
0058 from pilot.util.config import config
0059 from pilot.util.constants import (
0060     UTILITY_BEFORE_PAYLOAD,
0061     UTILITY_WITH_PAYLOAD,
0062     UTILITY_AFTER_PAYLOAD_STARTED,
0063     UTILITY_AFTER_PAYLOAD_FINISHED,
0064     UTILITY_AFTER_PAYLOAD_STARTED2,
0065     UTILITY_BEFORE_STAGEIN,
0066     UTILITY_AFTER_PAYLOAD_FINISHED2
0067 )
0068 from pilot.util.container import execute
0069 from pilot.util.filehandling import (
0070     copy, copy_pilot_source, calculate_checksum,
0071     get_guid, get_local_file_size,
0072     remove, remove_dir_tree, remove_core_dumps, read_file, read_json,
0073     update_extension,
0074     write_file,
0075     get_disk_usage
0076 )
0077 from pilot.util.processes import (
0078     convert_ps_to_dict,
0079     find_pid, find_cmd_pids,
0080     get_trimmed_dictionary,
0081     is_child
0082 )
0083 
0084 from pilot.util.tracereport import TraceReport
0085 
0086 logger = logging.getLogger(__name__)
0087 
0088 errors = ErrorCodes()
0089 
0090 
def sanity_check():
    """
    Run an early sanity check before anything else in a given workflow.

    This hook exists to verify imports of modules that are otherwise
    only used much later; aborting the pilot early is preferable to
    failing late. The rucio download/upload client import check that
    used to live here (returning errors.MIDDLEWAREIMPORTFAILURE on
    failure) is currently disabled, so the function always succeeds.

    :return: exit code (0 if all is ok, otherwise non-zero exit code).
    """

    return 0
0113 
0114 
def validate(job):
    """
    Perform user specific payload/job validation.

    Produces a local DBRelease file if necessary (old releases) and
    verifies that a locally specified container image actually exists.

    :param job: job object.
    :return: Boolean (True if validation is successful).
    """

    status = True

    # old releases may request a locally generated DBRelease file
    if 'DBRelease' in job.jobparams:
        logger.debug((
            'encountered DBRelease info in job parameters - '
            'will attempt to create a local DBRelease file'))
        version = get_dbrelease_version(job.jobparams)
        if version:
            status = create_dbrelease(version, job.workdir)

    if not status:
        # assign error in case of DBRelease handling failure
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.DBRELEASEFAILURE)
    elif job.imagename and job.imagename.startswith('/'):
        # make sure that any given images actually exist
        # (only absolute paths can be checked locally)
        if os.path.exists(job.imagename):
            logger.info('verified that image exists: %s', job.imagename)
        else:
            logger.warning('image does not exist: %s', job.imagename)
            status = False
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.IMAGENOTFOUND)

    return status
0161 
0162 
def open_remote_files(indata, workdir, nthreads):
    """
    Verify that direct i/o files can be opened.

    Copies the pilot file-open helper script into the work directory,
    runs it inside a ROOT container for all 'remote_io' TURLs, then
    parses the JSON dictionary it produces to find files that could
    not be opened.

    :param indata: list of FileSpec.
    :param workdir: working directory (string).
    :param nthreads: number of concurrent file open threads (int).
    :raises PilotException: if the pilot source copy fails.
    :return: exit code (int), diagnostics (string), not opened turls (string).
    """

    exitcode = 0
    diagnostics = ""
    not_opened = ""

    # extract direct i/o files from indata (string of comma-separated turls)
    turls = extract_turls(indata)
    if not turls:
        logger.info('nothing to verify (for remote files)')
        return exitcode, diagnostics, not_opened

    # copy pilot source into container directory, unless it is already there
    diagnostics = copy_pilot_source(workdir)
    if diagnostics:
        raise PilotException(diagnostics)

    script = 'open_remote_file.py'
    final_script_path = os.path.join(workdir, script)
    # bug fix: PYTHONPATH may be unset, in which case os.environ.get()
    # returned None and the string concatenation raised TypeError
    os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + ':' + workdir
    script_path = os.path.join('pilot/scripts', script)
    dir1 = os.path.join(os.path.join(os.environ['PILOT_HOME'], 'pilot2'), script_path)
    dir2 = os.path.join(workdir, script_path)
    full_script_path = dir1 if os.path.exists(dir1) else dir2
    if not os.path.exists(full_script_path):
        # do not set ec since this will be a pilot issue rather than site issue
        diagnostics = (
            'cannot perform file open test - script path does '
            'not exist: %s' % full_script_path
        )
        logger.warning(diagnostics)
        logger.warning('tested both path=%s and path=%s (none exists)', dir1, dir2)
        return exitcode, diagnostics, not_opened

    try:
        copy(full_script_path, final_script_path)
    except PilotException as exc:
        # do not set ec since this will be a pilot issue rather than site issue
        diagnostics = 'cannot perform file open test - pilot source copy failed: %s' % exc
        logger.warning(diagnostics)
        return exitcode, diagnostics, not_opened

    # correct the path when containers have been used
    final_script_path = os.path.join('.', script)

    _cmd = get_file_open_command(final_script_path, turls, nthreads)
    cmd = create_root_container_command(workdir, _cmd)

    show_memory_usage()

    logger.info('*** executing file open verification script:\n\n\'%s\'\n\n', cmd)
    exit_code, stdout, stderr = execute(cmd, usecontainer=False)
    if config.Pilot.remotefileverification_log:
        fpath = os.path.join(workdir, config.Pilot.remotefileverification_log)
        write_file(fpath, stdout + stderr, mute=False)

    show_memory_usage()

    # error handling
    if exit_code:
        logger.warning('script %s finished with ec=%d', script, exit_code)
        return exitcode, diagnostics, not_opened

    dictionary_path = os.path.join(
        workdir,
        config.Pilot.remotefileverification_dictionary
    )
    # bug fix: the original tested the truthiness of the path string
    # (always non-empty) rather than the actual file existence, so the
    # 'file does not exist' branch was unreachable
    if not os.path.exists(dictionary_path):
        logger.warning('file does not exist: %s', dictionary_path)
        return exitcode, diagnostics, not_opened

    file_dictionary = read_json(dictionary_path)
    if not file_dictionary:
        logger.warning('could not read dictionary from %s', dictionary_path)
        return exitcode, diagnostics, not_opened

    # collect all turls that the helper script failed to open
    for turl in file_dictionary:
        opened = file_dictionary[turl]
        if not opened:
            logger.info('turl could not be opened: %s', turl)
            not_opened += turl if not not_opened else ",%s" % turl
        else:
            logger.info('turl could be opened: %s', turl)

    if not_opened:
        exitcode = errors.REMOTEFILECOULDNOTBEOPENED
        diagnostics = "Remote file could not be opened: %s" % not_opened if "," not in not_opened else "turls not opened:%s" % not_opened

    return exitcode, diagnostics, not_opened
0258 
0259 
def get_file_open_command(script_path, turls, nthreads):
    """
    Build the command line for the remote file open test script.

    :param script_path: path to script (string).
    :param turls: comma-separated turls (string).
    :param nthreads: number of concurrent file open threads (int).
    :return: file open command (string).
    """

    options = [
        '--turls=%s' % turls,
        '-w %s' % os.path.dirname(script_path),
        '-t %s' % str(nthreads),
    ]
    return '%s %s' % (script_path, ' '.join(options))
0270 
0271 
def extract_turls(indata):
    """
    Extract TURLs from indata for direct i/o files.

    Only files whose status is 'remote_io' are included.

    :param indata: list of FileSpec.
    :return: comma-separated list of turls (string).
    """

    remote_turls = [fspec.turl for fspec in indata if fspec.status == 'remote_io']
    return ",".join(remote_turls)
0289 
0290 
def process_remote_file_traces(path, job, not_opened_turls):
    """
    Report traces for remote files.

    The function reads back the base trace report (common part of all
    traces) and updates it per file before reporting it to the Rucio
    server.

    :param path: path to base trace report (string).
    :param job: job object.
    :param not_opened_turls: list of turls that could not be opened (list).
    :return:
    """

    try:
        base_trace_report = read_json(path)
    except PilotException as exc:
        logger.warning('failed to open base trace report (cannot send trace reports): %s', exc)
        return

    if not base_trace_report:
        logger.warning('failed to read back base trace report (cannot send trace reports)')
        return

    # update and send the trace info for every direct access file
    for fspec in job.indata:
        if fspec.status != 'remote_io':
            continue

        client_state = 'FAILED_REMOTE_OPEN' if fspec.turl in not_opened_turls else 'FOUND_ROOT'
        base_trace_report.update(url=fspec.turl)
        base_trace_report.update(remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        base_trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        base_trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        base_trace_report.update(clientState=client_state)

        # copy the base trace report (only a dictionary) into a real trace report object
        trace_report = TraceReport(**base_trace_report)
        if trace_report:
            trace_report.send()
        else:
            logger.warning('failed to create trace report for turl=%s', fspec.turl)
0329 
0330 
def get_nthreads(catchall):
    """
    Extract number of concurrent file open threads from catchall.

    Return nthreads=1 if nopenfiles=.. is not present in catchall, or
    if the extracted value cannot be converted to an integer.

    :param catchall: queuedata catchall (string).
    :return: number of threads (int).
    """

    _nthreads = get_key_value(catchall, key='nopenfiles')
    if not _nthreads:
        return 1

    try:
        # get_key_value() extracts the raw value from the catchall
        # string; make sure the documented int type is returned
        return int(_nthreads)
    except (TypeError, ValueError):
        logger.warning('failed to convert nopenfiles value to int: %s', _nthreads)
        return 1
0342 
0343 
def get_payload_command(job):
    """
    Return the full command for executing the payload, including the
    sourcing of all setup files and setting of environment variables.

    The command is assembled in stages: resource-specific setup,
    optional remote file open verification, the payload command itself
    (normal ATLAS or generic), payload environment variables, the
    PandaID export and finally the ATHENA_PROC_NUMBER export.

    :param job: job object.
    :raises PilotException: TrfDownloadFailure.
    :return: command (string).
    """

    show_memory_usage()

    # Should the pilot do the setup or does jobPars already contain the information?
    preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams)

    # Get the platform value
    # platform = job.infosys.queuedata.platform

    # Is it a user job or not?
    userjob = job.is_analysis()
    logger.info('pilot is running a %s job', 'user analysis' if userjob else 'production')

    resource_name = get_resource_name()  # 'grid' if no hpc_resource is set

    # dynamically load the resource-specific module,
    # e.g. pilot.user.atlas.resource.grid
    # Python 3, level -1 -> 0
    modname = 'pilot.user.atlas.resource.%s' % resource_name
    resource = __import__(modname, globals(), locals(), [resource_name], 0)

    # get the general setup command and then verify it if required
    cmd = resource.get_setup_command(job, preparesetup)
    if cmd:
        exitcode, diagnostics = resource.verify_setup_command(cmd)
        if exitcode != 0:
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode)
            raise PilotException(diagnostics, code=exitcode)

    # make sure that remote file can be opened before executing payload
    # (sites can opt out with remoteio_test=false in the queuedata catchall)
    catchall = job.infosys.queuedata.catchall.lower() if job.infosys.queuedata.catchall else ''
    if config.Pilot.remotefileverification_log and 'remoteio_test=false' not in catchall:
        exitcode = 0
        diagnostics = ""
        not_opened_turls = ""
        try:
            logger.debug('executing open_remote_files()')
            exitcode, diagnostics, not_opened_turls = open_remote_files(job.indata, job.workdir, get_nthreads(catchall))
        except Exception as exc:
            logger.warning('caught std exception: %s', exc)
        else:
            # read back the base trace report
            path = os.path.join(job.workdir, config.Pilot.base_trace_report)
            if not os.path.exists(path):
                logger.warning((
                    'base trace report does not exist (%s) - input file '
                    'traces should already have been sent'), path)
            else:
                process_remote_file_traces(path, job, not_opened_turls)

            # fail the job if the remote files could not be verified
            if exitcode != 0:
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode, msg=diagnostics)
                raise PilotException(diagnostics, code=exitcode)
    else:
        logger.debug('no remote file open verification')

    if is_standard_atlas_job(job.swrelease):
        # Normal setup (production and user jobs)
        logger.info("preparing normal production/analysis job setup command")
        cmd = get_normal_payload_command(cmd, job, preparesetup, userjob)
    else:
        # Generic, non-ATLAS specific jobs, or at least a job with undefined swRelease
        logger.info("generic job (non-ATLAS specific or with undefined swRelease)")
        cmd = get_generic_payload_command(cmd, job, preparesetup, userjob)

    # add any missing trailing ;
    if not cmd.endswith(';'):
        cmd += '; '

    # prepend the payload environment variable exports,
    # only if not using a user container
    if not job.imagename:
        site = os.environ.get('PILOT_SITENAME', '')
        variables = get_payload_environment_variables(
            cmd, job.jobid, job.taskid, job.attemptnr, job.processingtype, site, userjob)
        cmd = ''.join(variables) + cmd

    # prepend PanDA job id in case it is not there already (e.g. runcontainer jobs)
    if 'export PandaID' not in cmd:
        cmd = "export PandaID=%s;" % job.jobid + cmd

    # collapse any accidental double semicolons from the concatenations above
    cmd = cmd.replace(';;', ';')

    # For direct access in prod jobs, we need to substitute the input file names
    # with the corresponding TURLs
    # get relevant file transfer info
    #use_copy_tool, use_direct_access, use_pfc_turl = get_file_transfer_info(job)
    #if not userjob and use_direct_access and job.transfertype == 'direct':

    ## ported from old logic
    if not userjob and not job.is_build_job() and job.has_remoteio():
        ## ported from old logic but still it looks strange (anisyonk)
        ## the "PoolFileCatalog.xml" should already contains proper TURLs
        ## values as it created by create_input_file_metadata() if the case
        ## is just to patch `writetofile` file, than logic should be cleaned
        ## and decoupled anyway, instead of parsing the file, it's much easier
        ## to generate properly `writetofile` content from the beginning
        ##  with TURL data
        lfns = job.get_lfns_and_guids()[0]
        cmd = replace_lfns_with_turls(
            cmd,
            job.workdir,
            "PoolFileCatalog.xml",
            lfns,
            writetofile=job.writetofile
        )

    # Explicitly add the ATHENA_PROC_NUMBER (or JOB value)
    cmd = add_athena_proc_number(cmd)

    show_memory_usage()

    logger.info('payload run command: %s', cmd)

    return cmd
0466 
0467 
def get_normal_payload_command(cmd, job, preparesetup, userjob):
    """
    Return the payload command for a normal production/analysis job.

    :param cmd: any preliminary command setup (string).
    :param job: job object.
    :param preparesetup: True if the pilot should prepare the setup,
    False if already in the job parameters.
    :param userjob: True for user analysis jobs, False otherwise (bool).
    :return: normal payload command (string).
    """

    # set the INDS env variable
    # (used by runAthena but also for EventIndex production jobs)
    set_inds(job.datasetin)  # realDatasetsIn

    if not userjob:
        # Add Database commands if they are set by the local site
        cmd += os.environ.get('PILOT_DB_LOCAL_SETUP_CMD', '')

        if job.is_eventservice:
            # tell athena how many cores it may use (default to one)
            cores = job.corecount if job.corecount else 1
            cmd += '; export ATHENA_PROC_NUMBER=%s' % cores
            cmd += '; export ATHENA_CORE_NUMBER=%s' % cores

        # Add the transform and the job parameters (production jobs)
        if preparesetup:
            cmd += "; %s %s" % (job.transformation, job.jobparams)
        else:
            cmd += "; " + job.jobparams

        return cmd

    # user analysis job: try to download the trf
    # (skip when user container is to be used)
    exitcode, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)
    if exitcode != 0:
        raise TrfDownloadFailure(diagnostics)

    logger.debug('user analysis trf: %s', trf_name)

    _cmd = get_analysis_run_command(job, trf_name) if preparesetup else job.jobparams

    # Correct for multi-core if necessary (especially important in
    # case coreCount=1 to limit parallel make)
    return cmd + "; " + add_makeflags(job.corecount, "") + _cmd
0519 
0520 
def get_generic_payload_command(cmd, job, preparesetup, userjob):
    """
    Return the payload command for a generic job
    (non-ATLAS specific or with undefined swRelease).

    :param cmd: any preliminary command setup (string).
    :param job: job object.
    :param preparesetup: True if the pilot should prepare the setup (bool).
    :param userjob: True for user analysis jobs, False otherwise (bool).
    :return: generic job command (string).
    """

    if userjob:
        # try to download the trf
        exitcode, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)
        if exitcode != 0:
            raise TrfDownloadFailure(diagnostics)

        logger.debug('user analysis trf: %s', trf_name)

        _cmd = get_analysis_run_command(job, trf_name) if preparesetup else job.jobparams

        # correct for multi-core if necessary (especially important in case
        # coreCount=1 to limit parallel make), only if not using a user container
        if job.imagename:
            return cmd + _cmd
        return cmd + "; " + add_makeflags(job.corecount, "") + _cmd

    if verify_release_string(job.homepackage) != 'NULL' and job.homepackage != ' ':
        # run the transform relative to the home package
        if preparesetup:
            return "python %s/%s %s" % (job.homepackage, job.transformation, job.jobparams)
        return job.jobparams

    if preparesetup:
        return "python %s %s" % (job.transformation, job.jobparams)
    return job.jobparams
0566 
0567 
def add_athena_proc_number(cmd):
    """
    Add the ATHENA_PROC_NUMBER and ATHENA_CORE_NUMBER exports to
    the payload command if necessary.

    :param cmd: payload execution command (string).
    :return: updated payload execution command (string).
    """

    # read back the integer values if they exist in the environment
    try:
        value1 = int(os.environ['ATHENA_PROC_NUMBER_JOB'])
    except (TypeError, KeyError, ValueError) as exc:
        logger.warning('failed to convert ATHENA_PROC_NUMBER_JOB to int: %s', exc)
        value1 = None
    try:
        value2 = int(os.environ['ATHENA_CORE_NUMBER'])
    except (TypeError, KeyError, ValueError) as exc:
        logger.warning('failed to convert ATHENA_CORE_NUMBER to int: %s', exc)
        value2 = None

    if "ATHENA_PROC_NUMBER" in cmd:
        logger.info("ATHENA_PROC_NUMBER already in job command")
    elif "ATHENA_PROC_NUMBER" in os.environ:
        # prefer the direct environment value
        cmd = 'export ATHENA_PROC_NUMBER=%s;' % os.environ['ATHENA_PROC_NUMBER'] + cmd
    elif "ATHENA_PROC_NUMBER_JOB" in os.environ and value1:
        # fall back to the per-job value, but only for multi-core
        if value1 > 1:
            cmd = 'export ATHENA_PROC_NUMBER=%d;' % value1 + cmd
        else:
            logger.info((
                "will not add ATHENA_PROC_NUMBER to cmd "
                "since the value is %s"), str(value1))
    else:
        logger.warning((
            "don't know how to set ATHENA_PROC_NUMBER "
            "(could not find it in os.environ)"))

    if 'ATHENA_CORE_NUMBER' in os.environ and value2:
        if value2 > 1:
            cmd = 'export ATHENA_CORE_NUMBER=%d;' % value2 + cmd
        else:
            logger.info("will not add ATHENA_CORE_NUMBER to cmd since the value is %s", str(value2))
    else:
        logger.warning((
            'there is no ATHENA_CORE_NUMBER in os.environ '
            '(cannot add it to payload command)'))

    return cmd
0617 
0618 
def verify_release_string(release):
    """
    Verify that the release (or homepackage) string is set.

    An unset value (None or empty string) is normalised to "NULL";
    any other value is returned in upper case.

    :param release: release or homepackage string that might or might not be set.
    :return: release (set string).
    """

    release = (release or "").upper()
    if not release:
        release = "NULL"
    if release == "NULL":
        logger.info("detected unset (NULL) release/homepackage string")

    return release
0636 
0637 
def add_makeflags(job_core_count, cmd):
    """
    Correct for multi-core if necessary (especially important in
    case coreCount=1 to limit parallel make).

    :param job_core_count: core count from the job definition (int).
    :param cmd: payload execution command (string).
    :return: updated payload execution command (string).
    """

    # ATHENA_PROC_NUMBER is set in Node.py using the schedconfig value
    try:
        env_cores = int(os.environ.get('ATHENA_PROC_NUMBER'))
    except (TypeError, KeyError, ValueError):
        env_cores = -1

    if env_cores == -1:
        # no usable environment value - fall back to the job definition
        try:
            cores = int(job_core_count)
        except (TypeError, ValueError):
            pass
        else:
            if cores >= 1:
                # Note: the original request (AF) was to use j%d
                # and not -j%d, now using the latter
                cmd += "export MAKEFLAGS='-j%d QUICK=1 -l1';" % cores

    # make sure that MAKEFLAGS is always set
    if "MAKEFLAGS=" not in cmd:
        cmd += "export MAKEFLAGS='-j1 QUICK=1 -l1';"

    return cmd
0670 
0671 
def get_analysis_run_command(job, trf_name):
    """
    Return the proper run command for the user job.

    Assembles the proxy export, the trf invocation (local trf or
    container variant), direct access options and the input GUID list.

    Example output:
    export X509_USER_PROXY=<..>;./runAthena <job parameters> --usePFCTurl --directIn

    :param job: job object.
    :param trf_name: name of the transform that will run the job (string).
    Used when containers are not used.
    :return: command (string).
    """

    cmd = ""

    # get relevant file transfer info
    #use_copy_tool, use_direct_access, use_pfc_turl = get_file_transfer_info(job)
    # check if the input files are to be accessed locally (ie if prodDBlockToken is set to local)
    ## useless since stage-in phase has already passed (DEPRECATE ME, anisyonk)
    #if job.is_local():
    #    logger.debug('switched off direct access for local prodDBlockToken')
    #    use_direct_access = False
    #    use_pfc_turl = False

    # add the user proxy (not needed when running in a user container)
    if 'X509_USER_PROXY' in os.environ and not job.imagename:
        cmd += 'export X509_USER_PROXY=%s;' % os.environ.get('X509_USER_PROXY')

    # set up trfs
    if job.imagename == "":  # user jobs with no imagename defined
        cmd += './%s %s' % (trf_name, job.jobparams)
    else:
        # an image name is defined: analysis jobs still run the trf
        # directly, anything else goes through python
        if job.is_analysis() and job.imagename:
            cmd += './%s %s' % (trf_name, job.jobparams)
        else:
            cmd += 'python %s %s' % (trf_name, job.jobparams)

        imagename = job.imagename
        # check if image is on disk as defined by envar PAYLOAD_CONTAINER_LOCATION
        payload_container_location = os.environ.get('PAYLOAD_CONTAINER_LOCATION')
        if payload_container_location is not None:
            logger.debug("$PAYLOAD_CONTAINER_LOCATION = %s", payload_container_location)
            # get container name (last path component of the image)
            containername = imagename.rsplit('/')[-1]
            image_location = os.path.join(payload_container_location, containername)
            if os.path.exists(image_location):
                # prefer the local copy of the image
                logger.debug("image exists at %s", image_location)
                imagename = image_location

        # restore the image name if necessary
        if 'containerImage' not in cmd and 'runcontainer' in trf_name:
            cmd += ' --containerImage=%s' % imagename

    # add control options for PFC turl and direct access
    #if job.indata:   ## DEPRECATE ME (anisyonk)
    #    if use_pfc_turl and '--usePFCTurl' not in cmd:
    #        cmd += ' --usePFCTurl'
    #    if use_direct_access and '--directIn' not in cmd:
    #        cmd += ' --directIn'

    if job.has_remoteio():
        logger.debug((
            'direct access (remoteio) is used to access some input files: '
            '--usePFCTurl and --directIn will be added to payload command'))
        if '--usePFCTurl' not in cmd:
            cmd += ' --usePFCTurl'
        if '--directIn' not in cmd:
            cmd += ' --directIn'

    # update the payload command for forced accessmode
    ## -- REDUNDANT logic, since it should be done from the beginning at
    ## the step of FileSpec initialization (anisyonk)
    #cmd = update_forced_accessmode(log, cmd, job.transfertype,
    # job.jobparams, trf_name)  ## DEPRECATE ME (anisyonk)

    # add guids when needed
    # get the correct guids list (with only the direct access files)
    if not job.is_build_job():
        lfns, guids = job.get_lfns_and_guids()
        _guids = get_guids_from_jobparams(job.jobparams, lfns, guids)
        if _guids:
            cmd += ' --inputGUIDs \"%s\"' % (str(_guids))

    show_memory_usage()

    return cmd
0758 
0759 
0760 ## SHOULD NOT BE USED since payload cmd should be properly generated
0761 ## from the beginning (consider final directio settings) (anisyonk)
0762 ## DEPRECATE ME (anisyonk)
def update_forced_accessmode(log, cmd, transfertype, jobparams, trf_name):
    """
    Update the payload command for forced accessmode.

    accessmode is an option that comes from HammerCloud and is used to
    force a certain input file access mode, i.e. copy-to-scratch or
    direct access. Any accessmode found in the job parameters overrules
    the scheduler configuration, unless the job definition already
    requested direct transfer (transfertype == 'direct').

    :param log: logging object (unused, kept for interface compatibility).
    :param cmd: payload command (string).
    :param transfertype: transfer type, e.g. 'direct', from the job
        definition; has priority over accessmode (string).
    :param jobparams: job parameters (string).
    :param trf_name: transformation name (string).
    :return: updated payload command (string).
    """

    if "accessmode" in cmd and transfertype != 'direct':
        force_copytool = None
        force_directin = None
        # (switch, description, extra command option, copytool?, directin?)
        modes = (
            ("--accessmode=copy", "copy-to-scratch mode", "", True, False),
            ("--accessmode=direct", "direct access mode", " --directIn", False, True),
        )

        # let any accessmode found in jobPars overrule schedconfig
        for switch, description, extra, copy_flag, direct_flag in modes:
            if switch not in jobparams:
                continue
            logger.info("enforcing %s", description)
            force_copytool = copy_flag
            force_directin = direct_flag
            # update the command, but do not send the accessmode
            # switch itself to runAthena
            cmd += extra
            if switch in cmd:
                cmd = cmd.replace(switch, "")

        if force_copytool:
            logger.info('forced copy tool usage selected')
            # remove again the "--directIn"
            if "directIn" in cmd:
                cmd = cmd.replace(' --directIn', ' ')
        elif force_directin:
            logger.info('forced direct access usage selected')
            if "directIn" not in cmd:
                cmd += ' --directIn'
        else:
            logger.warning('neither forced copy tool usage nor direct access was selected')

        if "directIn" in cmd and "usePFCTurl" not in cmd:
            cmd += ' --usePFCTurl'

        # direct access needs the proxy exported before the trf is launched
        if "--directIn" in cmd and "export X509_USER_PROXY" not in cmd:
            if 'X509_USER_PROXY' in os.environ:
                cmd = cmd.replace("./%s" % trf_name, "export X509_USER_PROXY=%s;./%s" %
                                  (os.environ.get('X509_USER_PROXY'), trf_name))

    # if both direct access and the accessmode loop added a
    # directIn switch, remove the first one from the string
    if cmd.count("directIn") > 1:
        cmd = cmd.replace(' --directIn', ' ', 1)

    return cmd
0834 
0835 
def get_guids_from_jobparams(jobparams, infiles, infilesguids):
    """
    Extract the correct guid from the input file list.
    The guids list is used for direct reading.
    1. extract input file list for direct reading from job parameters
    2. for each input file in this list, find the corresponding guid from
    the input file guid list.
    Since the job parameters string is entered by a human, the order of
    the input files might not be the same.

    :param jobparams: job parameters (string).
    :param infiles: input file lfns (list).
    :param infilesguids: guids for the input files, same order as infiles (list).
    :return: guids list.
    """

    guidlist = []
    # normalise the human-entered job parameters before regex matching
    jobparams = jobparams.replace("'", "")
    jobparams = jobparams.replace(", ", ",")

    # quoted list form: -i "[file1,file2,..]"
    pattern = re.compile(r'\-i \"\[([A-Za-z0-9.,_-]+)\]\"')
    directreadinginputfiles = re.findall(pattern, jobparams)
    _infiles = []
    if directreadinginputfiles != []:
        _infiles = directreadinginputfiles[0].split(",")
    else:
        # unquoted (possibly compact) form: -i head[a,b]tail[x,y]
        match = re.search(r"-i ([A-Za-z0-9.\[\],_-]+) ", jobparams)  # Python 3 (added r)
        if match is not None:
            compactinfiles = match.group(1)
            match = re.search(r'(.*)\[(.+)\](.*)\[(.+)\]', compactinfiles)  # Python 3 (added r)
            if match is not None:
                # NOTE(review): this branch rebinds the 'infiles' parameter
                # with the expanded compact list but never fills '_infiles',
                # so the loop below does nothing and no guids are returned
                # for the compact form - confirm whether '_infiles' should
                # also be populated here
                infiles = []
                head = match.group(1)
                tail = match.group(3)
                body = match.group(2).split(',')
                attr = match.group(4).split(',')

                for idx, item in enumerate(body):
                    lfn = '%s%s%s%s' % (head, item, tail, attr[idx])
                    infiles.append(lfn)
            else:
                infiles = [compactinfiles]

    for infile in _infiles:
        # get the corresponding index from the inputFiles list,
        # which has the same order as infilesguids
        try:
            index = infiles.index(infile)
        except ValueError as exc:
            logger.warning("exception caught: %s (direct reading will fail)", exc)
        else:
            # add the corresponding guid to the list
            guidlist.append(infilesguids[index])

    return guidlist
0891 
0892 
def get_file_transfer_info(job):   ## TO BE DEPRECATED, NOT USED (anisyonk)
    """
    Return information about desired file transfer.

    :param job: job object
    :return: use copy tool (boolean), use direct access (boolean),
    use PFC Turl (boolean).
    """

    # defaults: stage-in with a copy tool, no direct access
    use_copy_tool = True
    use_direct_access = False
    use_pfc_turl = False

    # check with schedconfig whether direct access is allowed on LAN/WAN
    queuedata = job.infosys.queuedata
    is_lan = queuedata.direct_access_lan
    is_wan = queuedata.direct_access_wan

    if not job.is_build_job() and (is_lan or is_wan or job.transfertype == 'direct'):
        if job.only_copy_to_scratch():
            # override if all input files are copy-to-scratch
            logger.info((
                'all input files are copy-to-scratch '
                '(--usePFCTurl and --directIn will not be set)'))
        else:
            logger.debug('--usePFCTurl and --directIn will be set')
            use_copy_tool = False
            use_direct_access = True
            use_pfc_turl = True

    return use_copy_tool, use_direct_access, use_pfc_turl
0922 
0923 
def update_job_data(job):
    """
    This function can be used to update/add data to the job object.
    E.g. user specific information can be extracted from other job object fields.
    In the case of ATLAS, information is extracted from the metadata field and
    added to other job object fields.

    :param job: job object
    :return:
    """

    ## comment from Alexey:
    ## it would be better to reallocate this logic (as well as parse
    ## metadata values)directly to Job object since in general it's Job
    ## related part. Later on once we introduce VO specific Job class
    ## (inherited from JobData) this can be easily customized

    # get label "all" or "log"
    stageout = get_stageout_label(job)

    # propagate any payload error diagnostics found in the job report
    if 'exeErrorDiag' in job.metadata:
        job.exeerrordiag = job.metadata['exeErrorDiag']
        if job.exeerrordiag:
            logger.warning('payload failed: exeErrorDiag=%s', job.exeerrordiag)

    # determine what should be staged out
    job.stageout = stageout  # output and log file or only log file

    work_attributes = None
    try:
        work_attributes = parse_jobreport_data(job.metadata)
    except Exception as exc:
        # non-fatal: nevents is searched for again later in processing
        logger.warning('failed to parse job report (cannot set job.nevents): %s', exc)
    else:
        # note: the number of events can be set already at this point
        # if the value was extracted from the job report (a more thorough
        # search for this value is done later unless it was set here)
        nevents = work_attributes.get('nEvents', 0)
        if nevents:
            job.nevents = nevents

    # extract output files from the job report if required, in case the trf
    # has created additional (overflow) files. Also make sure all guids are
    # assigned (use job report value if present, otherwise generate the guid)
    if job.metadata and not job.is_eventservice:
        # keep this for now, complicated to merge with verify_output_files?
        extract_output_file_guids(job)
        try:
            verify_output_files(job)
        except Exception as exc:
            logger.warning('exception caught while trying verify output files: %s', exc)
    else:
        if not job.allownooutput:  # i.e. if it's an empty list/string, do nothing
            logger.debug((
                "will not try to extract output files from jobReport "
                "for user job (and allowNoOut list is empty)"))
        else:
            # remove the files listed in allowNoOutput if they don't exist
            remove_no_output_files(job)

    ## validate output data (to be moved into the JobData)
    ## warning: do no execute this code unless guid lookup in job report
    # has failed - pilot should only generate guids
    ## if they are not present in job report
    for dat in job.outdata:
        if not dat.guid:
            dat.guid = get_guid()
            logger.warning(
                'guid not set: generated guid=%s for lfn=%s',
                dat.guid,
                dat.lfn
            )
0996 
0997 
def get_stageout_label(job):
    """
    Get a proper stage-out label.

    :param job: job object.
    :return: "all"/"log" depending on stage-out type (string).
    """

    # event service payloads only ever stage out the log
    if job.is_eventservice:
        logger.info('event service payload, will only stage-out log')
        return "log"

    # handle any error code reported by the payload: a non-zero
    # exeErrorCode means only the log should be staged out
    if 'exeErrorCode' in job.metadata:
        job.exeerrorcode = job.metadata['exeErrorCode']
        if job.exeerrorcode != 0:
            logger.info('payload failed: exeErrorCode=%d', job.exeerrorcode)
            return "log"

    return "all"
1022 
1023 
def update_output_for_hpo(job):
    """
    Update the output (outdata) for HPO jobs.

    Replaces job.outdata with the output files discovered in the work
    directory; leaves the job untouched if discovery fails or finds nothing.

    :param job: job object.
    :return:
    """

    try:
        discovered = discover_new_outdata(job)
    except Exception as exc:
        logger.warning('exception caught while discovering new outdata: %s', exc)
        return

    if discovered:
        logger.info((
            'replacing job outdata with discovered output '
            '(%d file(s))'), len(discovered))
        job.outdata = discovered
1042 
1043 
def discover_new_outdata(job):
    """
    Discover new outdata created by HPO job.

    :param job: job object.
    :return: new_outdata (list of FileSpec objects)
    """

    from pilot.info.filespec import FileSpec

    discovered = []
    for spec in job.outdata:
        new_output = discover_new_output(spec.lfn, job.workdir)
        # build one FileSpec per discovered file; scope/dataset/endpoint
        # are inherited from the original output file, the guid is taken
        # from the job report later on
        for lfn in new_output:
            fdat = {
                'scope': spec.scope,
                'lfn': lfn,
                'workdir': job.workdir,
                'dataset': spec.dataset,
                'ddmendpoint': spec.ddmendpoint,
                'ddmendpoint_alt': None,
                'filesize': new_output[lfn]['filesize'],
                'checksum': new_output[lfn]['checksum'],
                'guid': ''
            }
            discovered.append(FileSpec(type='output', **fdat))

    return discovered
1080 
1081 
def discover_new_output(name_pattern, workdir):
    """
    Discover new output created by HPO job in the given work dir.

    name_pattern for known 'filename' is 'filename_N' (N = 0, 1, 2, ..).
    Example: name_pattern = 23578835.metrics.000001.tgz
             should discover files with names 23578835.metrics.000001.tgz_N (N = 0, 1, ..)

    new_output = { lfn: {'path': path, 'filesize': size, 'checksum': checksum}, .. }

    :param name_pattern: assumed name pattern for file to discover (string).
    :param workdir: work directory (string).
    :return: new_output (dictionary).
    """

    new_output = {}
    # note: name_pattern is used verbatim in the glob, so glob
    # metacharacters in the file name would widen the match
    outputs = glob("%s/%s_*" % (workdir, name_pattern))
    for path in outputs:
        lfn = os.path.basename(path)
        filesize = get_local_file_size(path)
        checksum = calculate_checksum(path)

        if filesize and checksum:
            new_output[lfn] = {'path': path, 'filesize': filesize, 'checksum': checksum}
        else:
            # use %s (not %d) for filesize: it can be None/0 here, and a
            # %d would make the logging call itself fail to format
            logger.warning(
                'failed to create file info (filesize=%s, checksum=%s) for lfn=%s',
                filesize,
                checksum,
                lfn
            )

    return new_output
1118 
1119 
def extract_output_file_guids(job):
    """
    Extract output file info from the job report and make sure all guids\
    are assigned (use job report value if present, otherwise generate the guid.\
    Note: guid generation is done later, not in this function since
    this function might not be called if metadata info is not found prior
    to the call).

    :param job: job object.
    :return:
    """

    # make sure there is a defined output file list in the job report -
    # unless it is allowed by task parameter allowNoOutput
    if not job.allownooutput:
        output = job.metadata.get('files', {}).get('output', [])
        if output:
            logger.info((
                'verified that job report contains metadata '
                'for %d file(s)'), len(output))
        else:
            #- will fail job since allowNoOutput is not set')
            logger.warning((
                'job report contains no output '
                'files and allowNoOutput is not set'))
            #job.piloterrorcodes, job.piloterrordiags =
            # errors.add_error_code(errors.NOOUTPUTINJOBREPORT)
            return

    # extract info from metadata (job report JSON)
    # map lfn -> FileSpec; the values are the same objects held in
    # job.outdata (not copies), so assigning .guid below updates outdata
    data = dict([e.lfn, e] for e in job.outdata)
    #extra = []
    for dat in job.metadata.get('files', {}).get('output', []):
        for fdat in dat.get('subFiles', []):
            lfn = fdat['name']

            # verify the guid if the lfn is known
            # only extra guid if the file is known by the
            # job definition (March 18 change, v 2.5.2)
            if lfn in data:
                data[lfn].guid = fdat['file_guid']
                logger.info((
                    'set guid=%s for lfn=%s '
                    '(value taken from job report)'), data[lfn].guid, lfn)
            else:  # found new entry
                logger.warning((
                    'pilot no longer considers output files not mentioned '
                    'in job definition (lfn=%s)'), lfn)
                continue

                #if job.outdata:
                #    kw = {'lfn': lfn,
                # .         # take value from 1st output file?
                #          'scope': job.outdata[0].scope,
                #          'guid': fdat['file_guid'],
                #          'filesize': fdat['file_size'],
                #           # take value from 1st output file?
                #          'dataset': dat.get('dataset') or job.outdata[0].dataset
                #          }
                #    spec = FileSpec(filetype='output', **kw)
                #    extra.append(spec)

    # make sure the output list has set guids from job report
    # NOTE(review): data[fspec.lfn] is fspec itself (same object), so the
    # inequality below can never be true and the 'reset' branch is dead
    # code - confirm whether 'data' was meant to hold copies
    for fspec in job.outdata:
        if fspec.guid != data[fspec.lfn].guid:
            fspec.guid = data[fspec.lfn].guid
            logger.debug('reset guid=%s for lfn=%s', fspec.guid, fspec.lfn)
        else:
            if fspec.guid:
                logger.debug('verified guid=%s for lfn=%s', fspec.guid, fspec.lfn)
            else:
                logger.warning('guid not set for lfn=%s', fspec.lfn)
    #if extra:
        #logger.info('found extra output files in job report,
        # will overwrite output file list: extra=%s' % extra)
        #job.outdata = extra
1197 
def verify_output_files(job):
    """
    Make sure that the known output files from the job definition are listed
    in the job report and number of processed events is greater than zero.
    If the output file is not listed in the job report, then if the file is
    listed in allowNoOutput remove it from stage-out, otherwise fail the job.

    Note from Rod: fail scenario: The output file is not in output:[] or is
    there with zero events. Then if allownooutput is not set - fail the job.
    If it is set, then do not store the output, and finish ok.

    :param job: job object.
    :return: Boolean (and potentially updated job.outdata list)
    """

    # output files known from the job definition
    lfns_jobdef = [fspec.lfn for fspec in job.outdata]
    if not lfns_jobdef:
        logger.debug('empty output file list from job definition (nothing to verify)')
        return True

    failed = False

    # None means the job report is from an old release and does not
    # contain an output list at all
    output = job.metadata.get('files', {}).get('output', None)

    if output is None:
        # ie job report is ancient / output could not be extracted
        logger.warning((
            'output file list could not be extracted from job report '
            '(nothing to verify)'))
    elif not output:
        # ie empty list, output=[] - are all known output files in allowNoOutput?
        logger.warning((
            'encountered an empty output file list in job report, '
            'consulting allowNoOutput list'))
        for lfn in lfns_jobdef:
            if lfn in job.allownooutput:
                logger.info('lfn %s listed in allowNoOutput - will be removed from stage-out', lfn)
                remove_from_stageout(lfn, job)
            elif job.is_analysis():
                # user jobs are not failed for missing outputs
                logger.warning((
                    'lfn %s is not in allowNoOutput list - '
                    'ignore for user job'),
                    lfn
                )
            else:
                failed = True
                logger.warning(
                    'lfn %s is not in allowNoOutput list - job will fail',
                    lfn
                )
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
                break
    else:
        verified, nevents = verify_extracted_output_files(output, lfns_jobdef, job)
        failed = (not verified)
        if nevents > 0 and not failed and job.nevents == 0:
            job.nevents = nevents
            logger.info('number of events from summed up output files: %d', nevents)
        else:
            logger.info('number of events previously set to %d', job.nevents)

    status = (not failed)

    if status:
        logger.info('output file verification succeeded')
    else:
        logger.warning('output file verification failed')

    return status
1275 
1276 
def verify_extracted_output_files(output, lfns_jobdef, job):
    """
    Make sure all output files extracted from the job report are listed.
    Grab the number of events if possible.

    Bug fix: previously, a file missing from the job report and not listed
    in allowNoOutput fell through to the nentries lookup for user (analysis)
    jobs, raising a KeyError. Files absent from the job report are now fully
    handled before the nentries checks (continue).

    :param output: list of output file dictionaries from the job report (list).
    :param lfns_jobdef: list of lfns strings from job definition (list).
    :param job: job object.
    :return: True if successful|False if failed, number of events (Boolean, int)
    """

    failed = False
    nevents = 0
    output_jobrep = {}  # {lfn: nentries, ..}
    logger.debug((
        'extracted output file list from job report - '
        'make sure all known output files are listed'))

    # first collect the output files from the job report
    for dat in output:
        for fdat in dat.get('subFiles', []):
            # get the lfn
            name = fdat.get('name', None)

            # get the number of processed events and add the output file info to the dictionary
            output_jobrep[name] = fdat.get('nentries', None)

    # now make sure that the known output files are in the job report dictionary
    for lfn in lfns_jobdef:
        if lfn not in output_jobrep:
            # file missing from the job report: outcome depends on
            # allowNoOutput and whether this is a user job
            if lfn in job.allownooutput:
                logger.warning((
                    'output file %s from job definition is not present '
                    'in job report but is listed in allowNoOutput - '
                    'remove from stage-out'), lfn)
                remove_from_stageout(lfn, job)
            elif job.is_analysis():
                logger.warning((
                    'output file %s from job definition is not present '
                    'in job report and is not listed in allowNoOutput'), lfn)
            else:
                logger.warning((
                    'output file %s from job definition is not present '
                    'in job report and is not listed in allowNoOutput - '
                    'job will fail'), lfn)
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
                failed = True
                break
            # no nentries to inspect for a file absent from the job report
            continue

        nentries = output_jobrep[lfn]
        if nentries == "UNDEFINED":
            logger.warning((
                'encountered file with nentries=UNDEFINED - '
                'will ignore %s'), lfn)

        elif nentries is None:
            if lfn not in job.allownooutput:
                logger.warning((
                    'output file %s is listed in job report, '
                    'but has no events and is not listed in '
                    'allowNoOutput - will ignore'), lfn)
            else:
                logger.warning((
                    'output file %s is listed in job report, '
                    'nentries is None and is listed in allowNoOutput - '
                    'remove from stage-out'), lfn)
                remove_from_stageout(lfn, job)

        elif nentries == 0:
            if lfn not in job.allownooutput:
                logger.warning((
                    'output file %s is listed in job report, '
                    'has zero events and is not listed in '
                    'allowNoOutput - will ignore'), lfn)
            else:
                logger.warning((
                    'output file %s is listed in job report, '
                    'has zero events and is listed in allowNoOutput - '
                    'remove from stage-out'), lfn)
                remove_from_stageout(lfn, job)

        elif type(nentries) is int and nentries:
            # deliberately an exact type check (not isinstance) so that
            # booleans are not silently counted as event numbers
            logger.info('output file %s has %d event(s)', lfn, nentries)
            nevents += nentries
        else:  # should not reach this step
            logger.warning((
                'case not handled for output file %s with %s event(s) '
                '(ignore)'), lfn, str(nentries))

    status = (not failed)
    return status, nevents
1371 
1372 
def remove_from_stageout(lfn, job):
    """
    Remove the given lfn from the job's stage-out list.

    :param lfn: local file name (string).
    :param job: job object
    :return: [updated job object]
    """

    kept = []
    for fspec in job.outdata:
        if fspec.lfn == lfn:
            logger.info('removing %s from stage-out list', lfn)
            continue
        kept.append(fspec)
    job.outdata = kept
1389 
1390 
def remove_no_output_files(job):
    """
    Remove files from output file list if they are listed in
    allowNoOutput and do not exist.

    :param job: job object.
    :return:
    """

    # first identify the lfns to keep
    keep = []
    for fspec in job.outdata:
        lfn = fspec.lfn
        exists = os.path.exists(os.path.join(job.workdir, lfn))

        if lfn in job.allownooutput:
            if exists:
                logger.info((
                    "file %s is listed in allowNoOutput but exists "
                    "(will not be removed from list of files to be "
                    "staged-out)"), lfn)
                keep.append(lfn)
            else:
                logger.info((
                    "file %s is listed in allowNoOutput and does not exist "
                    "(will be removed from list of files to be staged-out)"), lfn)
        else:
            if exists:
                logger.info("file %s is not listed in allowNoOutput (will be staged-out)", lfn)
            else:
                logger.warning((
                    "file %s is not listed in allowNoOutput and "
                    "does not exist (job will fail)"), lfn)
            keep.append(lfn)

    # now drop the unwanted fspecs, if any
    if len(keep) != len(job.outdata):
        job.outdata = [fspec for fspec in job.outdata if fspec.lfn in keep]
1433 
1434 
def get_outfiles_records(subfiles):
    """
    Extract file info from job report JSON subfiles entry.

    :param subfiles: list of subfile dictionaries from the job report.
    :return: file info dictionary with format
        { lfn: { 'guid': .., 'size': .., 'nentries': .. (optional) }, .. }
    """

    records = {}
    for entry in subfiles:
        name = entry['name']
        records[name] = {
            'guid': entry['file_guid'],
            'size': entry['file_size']
        }

        # nentries is optional and only stored when it is a proper integer
        nentries = entry.get('nentries', 'UNDEFINED')
        if type(nentries) == int:
            records[name]['nentries'] = nentries
        else:
            logger.warning("nentries is undefined in job report")

    return records
1457 
1458 
class DictQuery(dict):
    """
    Dictionary with slash-separated path lookup, used for parsing the job report.
    """

    def get(self, path, dst_dict, dst_key):
        """
        Copy the value found at 'path' (keys separated by '/') into
        dst_dict[dst_key]; do nothing if any key along the path is
        missing or is not a dictionary.
        """
        keys = path.split("/")
        if not keys:
            return
        last_key = keys.pop()

        current = self
        for key in keys:
            if key not in current or not isinstance(current[key], dict):
                return
            current = current[key]

        if last_key in current:
            dst_dict[dst_key] = current[last_key]
1478 
1479 
def parse_jobreport_data(job_report):  # noqa: C901
    """
    Parse a job report and extract relevant fields.

    Builds the work_attributes dictionary (nEvents, cpuConsumptionTime, node,
    output file records, summed memory figures, jobMetrics string, ..) from a
    transform job report.

    :param job_report: job report (dictionary or None).
    :return: work_attributes (dictionary; empty for a missing/empty report).
    """
    work_attributes = {}
    if job_report is None or not any(job_report):
        return work_attributes

    # these are default values for job metrics
    core_count = ""
    work_attributes["nEvents"] = 0
    work_attributes["dbTime"] = ""
    work_attributes["dbData"] = ""
    work_attributes["inputfiles"] = []
    work_attributes["outputfiles"] = []

    if "ATHENA_PROC_NUMBER" in os.environ:
        logger.debug("ATHENA_PROC_NUMBER: %s", os.environ["ATHENA_PROC_NUMBER"])
        work_attributes['core_count'] = int(os.environ["ATHENA_PROC_NUMBER"])
        core_count = int(os.environ["ATHENA_PROC_NUMBER"])

    # pull the simple (possibly nested) fields out of the job report
    dictq = DictQuery(job_report)
    dictq.get("resource/transform/processedEvents", work_attributes, "nEvents")
    dictq.get("resource/transform/cpuTimeTotal", work_attributes, "cpuConsumptionTime")
    dictq.get("resource/machine/node", work_attributes, "node")
    dictq.get("resource/machine/model_name", work_attributes, "cpuConsumptionUnit")
    dictq.get("resource/dbTimeTotal", work_attributes, "dbTime")
    dictq.get("resource/dbDataTotal", work_attributes, "dbData")
    dictq.get("exitCode", work_attributes, "transExitCode")
    dictq.get("exitMsg", work_attributes, "exeErrorDiag")
    dictq.get("files/input", work_attributes, "inputfiles")
    dictq.get("files/output", work_attributes, "outputfiles")

    # convert the list of output file entries into a single records dictionary
    outputfiles_dict = {}
    for opf in work_attributes['outputfiles']:
        outputfiles_dict.update(get_outfiles_records(opf['subFiles']))
    work_attributes['outputfiles'] = outputfiles_dict

    if work_attributes['inputfiles']:
        # note: this previously branched on is_python3() with two reduce()
        # expressions that computed the identical value - a plain sum() is
        # equivalent on both Python 2 and Python 3
        work_attributes['nInputFiles'] = sum(len(inpfiles['subFiles']) for inpfiles in work_attributes['inputfiles'])

    if 'resource' in job_report and 'executor' in job_report['resource']:
        j = job_report['resource']['executor']

        # add up the Avg/Max memory figures over all executor steps
        fin_report = defaultdict(int)
        for value in j.values():
            mem = value.get('memory', {})
            for key in ('Avg', 'Max'):
                for subk, subv in mem.get(key, {}).items():
                    fin_report[subk] += subv

        work_attributes.update(fin_report)

    workdir_size = get_disk_usage('.')
    work_attributes['jobMetrics'] = 'coreCount=%s nEvents=%s dbTime=%s dbData=%s workDirSize=%s' % \
                                    (core_count,
                                        work_attributes["nEvents"],
                                        work_attributes["dbTime"],
                                        work_attributes["dbData"],
                                        workdir_size)
    # dbTime/dbData are only reported via the jobMetrics string
    del work_attributes["dbData"]
    del work_attributes["dbTime"]

    return work_attributes
1552 
1553 
def get_executor_dictionary(jobreport_dictionary):
    """
    Extract the 'executor' dictionary from within a job report.

    :param jobreport_dictionary: job report (dictionary).
    :return: executor dictionary (empty when not present).
    """

    if jobreport_dictionary == {}:
        return {}

    if 'resource' not in jobreport_dictionary:
        logger.warning("no such key: resource")
        return {}

    resource_dictionary = jobreport_dictionary['resource']
    if 'executor' not in resource_dictionary:
        logger.warning("no such key: executor")
        return {}

    return resource_dictionary['executor']
1575 
1576 
def get_resimevents(jobreport_dictionary):
    """
    Extract the resimevents value from the job report.
    The first executor step (e.g. "ReSim") carrying a usable 'resimevents'
    field wins. This information is reported with the jobMetrics.

    :param jobreport_dictionary: job report dictionary.
    :return: resimevents (int or None).
    """

    executor_dictionary = get_executor_dictionary(jobreport_dictionary)
    for step in list(executor_dictionary.keys()):  # "ReSim", Python 2/3
        entry = executor_dictionary[step]
        if 'resimevents' not in entry:
            continue
        try:
            return int(entry['resimevents'])
        except (KeyError, ValueError, TypeError):
            # malformed value - keep looking in the remaining steps
            pass

    return None
1600 
1601 
def get_db_info(jobreport_dictionary):
    """
    Extract and add up the DB info from the job report.
    This information is reported with the jobMetrics.
    Note: this function adds up the different dbData and dbTime's in
    the different executor steps. In modern job reports this might have
    been done already by the transform and stored in dbDataTotal and dbTimeTotal.

    :param jobreport_dictionary: job report dictionary.
    :return: db_time (int), db_data (long on Python 2, int on Python 3).
    """

    db_time = 0
    try:
        db_data = long(0)  # Python 2  # noqa: F821
    except NameError:
        db_data = 0  # Python 3

    executor_dictionary = get_executor_dictionary(jobreport_dictionary)
    for step in list(executor_dictionary.keys()):  # "RAWtoESD", .., Python 2/3
        entry = executor_dictionary[step]

        if 'dbData' in entry:
            try:
                db_data += entry['dbData']
            except Exception:
                # non-numeric value - skip this step's contribution
                pass
        else:
            logger.warning("format %s has no such key: dbData", step)

        if 'dbTime' in entry:
            try:
                db_time += entry['dbTime']
            except Exception:
                pass
        else:
            logger.warning("format %s has no such key: dbTime", step)

    return db_time, db_data
1639 
1640 
def get_db_info_str(db_time, db_data):
    """
    Convert db_time, db_data to strings.
    E.g. dbData="105077960", dbTime="251.42".
    Zero values are mapped to empty strings so that they can be
    omitted from the job metrics.

    :param db_time: time (s)
    :param db_data: long integer
    :return: db_time_s, db_data_s (strings)
    """

    try:
        zero = long(0)  # Python 2  # noqa: F821
    except NameError:
        zero = 0  # Python 3

    db_data_s = "%s" % (db_data) if db_data != zero else ""
    db_time_s = "%.2f" % (db_time) if db_time != 0 else ""

    return db_time_s, db_data_s
1665 
1666 
def get_cpu_times(jobreport_dictionary):
    """
    Extract and add up the total CPU times from the job report.
    E.g. ('s', 5790L, 1.0).

    Note: this function is used with Event Service jobs

    :param jobreport_dictionary:
    :return: cpu_conversion_unit (unit), total_cpu_time,
    conversion_factor (output consistent with set_time_consumed())
    """

    try:
        total_cpu_time = long(0)  # Python 2 # noqa: F821
    except NameError:
        total_cpu_time = 0  # Python 3

    executor_dictionary = get_executor_dictionary(jobreport_dictionary)
    for step in list(executor_dictionary.keys()):  # "RAWtoESD", .., Python 2/3
        try:
            total_cpu_time += executor_dictionary[step]['cpuTime']
        except KeyError:
            logger.warning("format %s has no such key: cpuTime", step)
        except Exception:
            # e.g. non-numeric cpuTime - ignore this step
            pass

    # the report stores CPU time in seconds, so no conversion is needed
    return "s", total_cpu_time, 1.0
1698 
1699 
def get_exit_info(jobreport_dictionary):
    """
    Return the exit code (exitCode) and exit message (exitMsg).
    E.g. (0, 'OK').
    Note: raises KeyError if either field is missing from the report.

    :param jobreport_dictionary: job report (dictionary).
    :return: exit_code, exit_message (tuple).
    """

    exit_code = jobreport_dictionary['exitCode']
    exit_message = jobreport_dictionary['exitMsg']

    return exit_code, exit_message
1710 
1711 
def cleanup_looping_payload(workdir):
    """
    Run a special cleanup for looping payloads:
    remove any pool.root files found anywhere below the given directory.

    :param workdir: working directory (string)
    :return:
    """

    targets = []
    for (root, _, files) in os.walk(workdir):
        targets += [os.path.abspath(os.path.join(root, name)) for name in files if 'pool.root' in name]

    for path in targets:
        remove(path)
1727 
1728 
def cleanup_payload(workdir, outputfiles=None, removecores=True):
    """
    Cleanup of payload (specifically AthenaMP) sub directories prior to log file creation.
    Also remove core dumps.

    :param workdir: working directory (string).
    :param outputfiles: list of output files whose worker-dir copies should be removed.
    :param removecores: remove core files if True (Boolean).
    :return:
    """

    if outputfiles is None:
        outputfiles = []

    if removecores:
        remove_core_dumps(workdir)

    for ampdir in glob('%s/athenaMP-workers-*' % workdir):
        for (root, _, files) in os.walk(ampdir):
            for filename in files:
                path = os.path.abspath(os.path.join(root, filename))

                core_file = ('core' in filename and removecores)
                pool_root_file = 'pool.root' in filename
                tmp_file = 'tmp.' in filename
                # worker-dir copies of the declared output files are redundant too
                output_copy = any(outfile in filename for outfile in outputfiles)

                # remove the file at most once, even when it matches several
                # criteria (the previous version could call remove() repeatedly
                # on the very same, already-deleted path)
                if core_file or pool_root_file or tmp_file or output_copy:
                    remove(path)
1762 
def get_redundant_path():
    """
    Return the path to the file containing the redundant files
    and directories to be removed prior to log file creation.

    :return: file path (string).
    """

    filename = config.Pilot.redundant

    # relocate /cvmfs if an alternative software base has been defined
    alt_base = os.environ.get('ATLAS_SW_BASE', False)
    if alt_base and filename.startswith('/cvmfs'):
        filename = filename.replace('/cvmfs', alt_base)

    return filename
1778 
1779 
def get_redundants():
    """
    Get list of redundant files and directories (to be removed).

    Ideally the list would come from an external file (see get_redundant_path()),
    but that file is not being kept up to date, so the internal list below is
    used instead. Any updates to the external file must be propagated here.

    :return: files and directories list
    """

    # the external file is currently only used for the log message below
    filename = get_redundant_path()

    # do not use the cvmfs file since it is not being updated
    # If you uncomment this block, need to also uncomment the read_list import
    # if os.path.exists(filename) and False:
    #    dir_list = read_list(filename)
    #    if dir_list:
    #        return dir_list

    logger.debug((
        'list of redundant files could not be read from external file: %s '
        '(will use internal list)'), filename)

    # internal list of file/directory patterns to be cleaned out
    redundants = ["AtlasProduction*",
                  "AtlasPoint1",
                  "AtlasTier0",
                  "buildJob*",
                  "CDRelease*",
                  "csc*.log",
                  "DBRelease*",
                  "EvgenJobOptions",
                  "external",
                  "fort.*",
                  "geant4",
                  "geomDB",
                  "geomDB_sqlite",
                  "home",
                  "o..pacman..o",
                  "pacman-*",
                  "python",
                  "runAthena*",
                  "share",
                  "sources.*",
                  "sqlite*",
                  "sw",
                  "tcf_*",
                  "triggerDB",
                  "trusted.caches",
                  "workdir",
                  "*.data*",
                  "*.events",
                  "*.py",
                  "*.pyc",
                  "*.root*",
                  "JEM",
                  "tmp*",
                  "*.tmp",
                  "*.TMP",
                  "MC11JobOptions",
                  "scratch",
                  "*.writing",
                  "pwg*",
                  "pwhg*",
                  "*PROC*",
                  "madevent",
                  "*proxy",
                  "ckpt*",
                  "*runcontainer*",
                  "*job.log.tgz",
                  "runGen-*",
                  "runAthena-*",
                  "pandawnutil/*",
                  "src/*",
                  "singularity_cachedir",
                  "_joproxy15",
                  "HAHM_*",
                  "Process",
                  "merged_lhef._0.events-new",
                  "singularity/*",  # new
                  "/cores",  # new
                  "/work",  # new
                  "docs/",  # new
                  "/pilot2"]  # new

    return redundants
1867 
1868 
def remove_archives(workdir):
    """
    Explicitly remove any soft linked archives (.a files) since
    they will be dereferenced by the tar command (--dereference option).
    Event service premerge tarballs next to the workdir are removed as well.

    :param workdir: working directory (string)
    :return:
    """

    # collect matches per (directory, pattern) pair, in the same order as before
    search_spec = ((workdir, '*.a'),
                   (os.path.dirname(workdir), 'EventService_premerge_*.tar'))

    matches = []
    for directory, pattern in search_spec:
        for root, _, filenames in os.walk(directory):
            matches += [os.path.join(root, name) for name in fnmatch.filter(filenames, pattern)]

    for match in matches:
        remove(match)
1889 
1890 
def cleanup_broken_links(workdir):
    """
    Run a second pass to clean up any broken links prior to log file creation.

    :param workdir: working directory (string)
    :return:
    """

    dead_links = []
    for root, _, files in os.walk(workdir):
        for filename in files:
            link = os.path.join(root, filename)
            if not os.path.islink(link):
                continue

            target = os.readlink(link)
            if not os.path.isabs(target):
                # resolve relative targets against the link's own directory
                target = os.path.join(os.path.dirname(link), target)
            if not os.path.exists(target):
                dead_links.append(link)

    for link in dead_links:
        remove(link)
1915 
1916 
def list_work_dir(workdir):
    """
    Execute ls -lF for the given directory and dump the output to the log.

    :param workdir: directory name (string).
    """

    cmd = 'ls -lF %s' % workdir
    _, stdout, stderr = execute(cmd)
    # lazy %-style logging instead of eager '%' + string concatenation;
    # the emitted message ('<stdout>:\n<stderr>') is unchanged
    logger.debug('%s:\n%s', stdout, stderr)
1927 
1928 
def remove_special_files(workdir, dir_list, outputfiles):
    """
    Remove list of special files from the workdir.

    Files matching one of the glob patterns in dir_list are deleted, except
    those matching an entry in the internal exceptions list or one of the
    protected output files.

    :param workdir: work directory (string).
    :param dir_list: list of glob patterns for files/dirs to delete (list).
    :param outputfiles: protected output files (list).
    :return:
    """

    # note: these should be partial file/dir names, not containing any wildcards
    exceptions_list = ["runargs", "runwrapper", "jobReport", "log."]

    to_delete = []
    for _dir in dir_list:
        files = glob(os.path.join(workdir, _dir))
        if not files:
            continue

        exclude = []
        for exc in exceptions_list:
            for item in files:
                if exc in item:
                    exclude.append(os.path.abspath(item))

        # compare normalized paths on both sides - previously the raw glob
        # result was compared against the abspath()-normalized exclude
        # entries, which could let a protected file slip through whenever
        # the two spellings of the same path differed
        _files = [os.path.abspath(item) for item in files if os.path.abspath(item) not in exclude]
        to_delete += _files

    exclude_files = [os.path.join(workdir, opf) for opf in outputfiles]

    for item in to_delete:
        if item not in exclude_files:
            logger.debug('removing %s', item)
            if os.path.isfile(item):
                remove(item)
            else:
                remove_dir_tree(item)
1968 
1969 
def remove_redundant_files(workdir, outputfiles=None, islooping=False, debugmode=False):
    """
    Remove redundant files and directories prior to creating the log file.

    Note: in debug mode, any core files should not be removed before creating the log.

    :param workdir: working directory (string).
    :param outputfiles: list of protected output files (list).
    :param islooping: looping job variable to make sure workDir is not removed in case of looping (Boolean).
    :param debugmode: True if debug mode has been switched on (Boolean).
    :return:
    """

    if outputfiles is None:
        outputfiles = []

    logger.debug("removing redundant files prior to log creation")

    workdir = os.path.abspath(workdir)
    list_work_dir(workdir)

    # patterns of files and directories considered redundant
    dir_list = get_redundants()

    # scrub core and pool.root files from the AthenaMP worker sub directories
    logger.debug('cleaning up payload')
    try:
        cleanup_payload(workdir, outputfiles, removecores=not debugmode)
    except OSError as exc:
        logger.warning("failed to execute cleanup_payload(): %s", exc)

    # soft linked archives (.a files) would be dereferenced by the
    # tar command (--dereference option), so drop them explicitly
    logger.debug('removing archives')
    remove_archives(workdir)

    remove_special_files(workdir, dir_list, outputfiles)

    # a second pass catches links whose targets were just deleted
    logger.debug('cleaning up broken links')
    cleanup_broken_links(workdir)

    # handle any user workDir: always scrub its root files, and delete it
    # entirely unless the job is looping
    user_workdir = os.path.join(workdir, 'workDir')
    if os.path.exists(user_workdir):
        cleanup_looping_payload(user_workdir)
        if not islooping:
            logger.debug('removing \'workDir\' from workdir=%s', workdir)
            remove_dir_tree(user_workdir)

    # remove additional dirs
    for extra in ('singularity', 'pilot', 'cores'):
        extra_path = os.path.join(workdir, extra)
        if os.path.exists(extra_path):
            logger.debug('removing \'%s\' from workdir=%s', extra, workdir)
            remove_dir_tree(extra_path)

    list_work_dir(workdir)
2031 
2032 
def download_command(process, workdir):
    """
    Download the pre/postprocess command if it is given as a URL.

    Process FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    :param process: pre/postprocess dictionary.
    :param workdir: job workdir (string).
    :return: updated pre/postprocess dictionary ({} if the download failed).
    """

    cmd = process.get('command', '')
    if not cmd.startswith('http'):
        # plain command - nothing to download
        return process

    # Try to download the trf (skip when user container is to be used)
    exitcode, _, cmd = get_analysis_trf(cmd, workdir)
    if exitcode != 0:
        logger.warning('cannot execute command due to previous error: %s', cmd)
        return {}

    # update the preprocess command (the URL should be stripped)
    process['command'] = './' + cmd

    return process
2058 
2059 
def get_utility_commands(order=None, job=None):
    """
    Return a dictionary of utility commands and arguments to be executed
    in parallel with the payload. This could e.g. be memory and network
    monitor commands. A separate function can be used to determine the
    corresponding command setups using the utility command name. If the
    optional order parameter is set, the function should return the list
    of corresponding commands.

    For example:

    If order=UTILITY_BEFORE_PAYLOAD, the function should return all
    commands that are to be executed before the payload.

    If order=UTILITY_WITH_PAYLOAD, the corresponding commands will be
    prepended to the payload execution string.

    If order=UTILITY_AFTER_PAYLOAD_STARTED, the commands that should be
    executed after the payload has been started should be returned.

    If order=UTILITY_WITH_STAGEIN, the commands that should be executed
    parallel with stage-in will be returned.

    FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>, 'ignore_failure': <Boolean>}

    :param order: optional sorting order (see pilot.util.constants).
    :param job: optional job object.
    :return: dictionary of utilities to be executed in parallel with the payload (None if nothing applies).
    """

    # note: the job object is documented as optional, so every branch that
    # dereferences it must guard against job being None (previously only the
    # pre/co/postprocess branches would not crash outright without a job)
    if order == UTILITY_BEFORE_PAYLOAD and job and job.preprocess:
        return get_precopostprocess_command(job.preprocess, job.workdir, 'preprocess')

    if order == UTILITY_WITH_PAYLOAD:
        return {'command': 'NetworkMonitor', 'args': '', 'label': 'networkmonitor', 'ignore_failure': True}

    if order == UTILITY_AFTER_PAYLOAD_STARTED:
        return get_utility_after_payload_started()

    if order == UTILITY_AFTER_PAYLOAD_STARTED2 and job and job.coprocess:
        return get_precopostprocess_command(job.coprocess, job.workdir, 'coprocess')

    if order == UTILITY_AFTER_PAYLOAD_FINISHED and job:
        return get_xcache_command(
            job.infosys.queuedata.catchall,
            job.workdir,
            job.jobid,
            'xcache_kill',
            xcache_deactivation_command,
        )

    if order == UTILITY_AFTER_PAYLOAD_FINISHED2 and job and job.postprocess:
        return get_precopostprocess_command(job.postprocess, job.workdir, 'postprocess')

    if order == UTILITY_BEFORE_STAGEIN and job:
        return get_xcache_command(
            job.infosys.queuedata.catchall,
            job.workdir,
            job.jobid,
            'xcache_start',
            xcache_activation_command,
        )

    return None
2124 
2125 
def get_precopostprocess_command(process, workdir, label):
    """
    Return the pre/co/post-process command dictionary.

    Command FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    :param process: pre/co/post-process (dictionary).
    :param workdir: working directory (string).
    :param label: label (string).
    :return: command (dictionary; empty when no command is defined).
    """

    if not process.get('command', ''):
        return {}

    # download the command first if it is given as a URL
    com = download_command(process, workdir)
    com['label'] = label
    com['ignore_failure'] = False
    return com
2145 
2146 
def get_utility_after_payload_started():
    """
    Return the command dictionary for the utility after the payload has started.

    Command FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    :return: command (dictionary; empty when no utility is configured).
    """

    try:
        cmd = config.Pilot.utility_after_payload_started
    except Exception:
        # config option (or config itself) not available
        return {}

    if not cmd:
        return {}

    return {'command': cmd, 'args': '', 'label': cmd.lower(), 'ignore_failure': True}
2165 
2166 
def get_xcache_command(catchall, workdir, jobid, label, xcache_function):
    """
    Return the proper xcache command for either activation or deactivation.

    Command FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    :param catchall: queuedata catchall field (string).
    :param workdir: job working directory (string).
    :param jobid: PanDA job id (string).
    :param label: label (string).
    :param xcache_function: activation/deactivation function (function).
    :return: command (dictionary; empty unless 'pilotXcache' appears in catchall).
    """

    if 'pilotXcache' not in catchall:
        return {}

    com = xcache_function(jobid=jobid, workdir=workdir)
    com['label'] = label
    com['ignore_failure'] = True
    return com
2187 
2188 
def post_prestagein_utility_command(**kwargs):
    """
    Execute any post pre-stage-in utility commands.

    Parses the xcache startup output (kwargs['output']) for environment
    variables and dumps the xcache settings file to the log if present.

    :param kwargs: kwargs (dictionary).
    :return:
    """

    label = kwargs.get('label', 'unknown_label')
    stdout = kwargs.get('output', None)

    if not stdout:
        logger.warning('no output for label=%s', label)
    else:
        logger.debug('processing stdout for label=%s', label)
        xcache_proxy(stdout)

    # dump the xcache settings file to the log, if one was produced
    if os.environ.get('ALRB_XCACHE_FILES', ''):
        cmd = 'cat $ALRB_XCACHE_FILES/settings.sh'
        _, _stdout, _ = execute(cmd)
        logger.debug('cmd=%s:\n\n%s\n\n', cmd, _stdout)
2211 
2212 
def xcache_proxy(output):
    """
    Extract env vars from xcache stdout and set them.

    :param output: command output (string).
    :return:
    """

    # fixed-name variables: (line marker, env var name, extraction pattern),
    # checked in the same order as before
    simple_vars = (
        ('ALRB_XCACHE_MYPROCESS', 'ALRB_XCACHE_MYPROCESS', r'\ ALRB_XCACHE_MYPROCESS\=(.+)'),
        ('Messages logged in', 'ALRB_XCACHE_LOG', r'xcache\ started\ successfully.\ \ Messages\ logged\ in\ (.+)'),
        ('ALRB_XCACHE_FILES', 'ALRB_XCACHE_FILES', r'\ ALRB_XCACHE_FILES\=(.+)'),
    )

    # loop over each line in the xcache stdout and identify the needed environmental variables
    for line in output.split('\n'):
        if 'ALRB_XCACHE_PROXY' in line:
            # the proxy variable comes in a local and a _REMOTE flavour
            suffix = '_REMOTE' if 'REMOTE' in line else ''
            name = 'ALRB_XCACHE_PROXY%s' % suffix
            pattern = r'\ export\ ALRB_XCACHE_PROXY%s\=\"(.+)\"' % suffix
            set_xcache_var(line, name=name, pattern=pattern)
            continue

        for marker, name, pattern in simple_vars:
            if marker in line:
                set_xcache_var(line, name=name, pattern=pattern)
                break
2249 
2250 
def set_xcache_var(line, name='', pattern=''):
    """
    Extract the value of a given environmental variable from a given stdout line
    and export it via os.environ when the pattern matches.

    :param line: line from stdout to be investigated (string).
    :param name: name of env var (string).
    :param pattern: regex pattern (string).
    :return:
    """

    matches = re.findall(re.compile(pattern), line)
    if matches:
        os.environ[name] = matches[0]
2265 
2266 
def xcache_activation_command(workdir='', jobid=''):
    """
    Return the xcache service activation command.

    Note: the workdir is not used here, but the function prototype
    needs it since the function is invoked generically alongside
    xcache_deactivation_command (which does use it).

    :param workdir: unused work directory - do not remove (string).
    :param jobid: PanDA job id to guarantee that xcache process is unique (int).
    :return: xcache command (dictionary).
    """

    # a successful startup will set ALRB_XCACHE_PROXY and ALRB_XCACHE_PROXY_REMOTE
    # so any file access with root://...  should be replaced with one of
    # the above (depending on whether you are on the same machine or not)
    # example:
    # ${ALRB_XCACHE_PROXY}root://atlasxrootd-kit.gridka.de:1094//pnfs/gridka.de/../DAOD_FTAG4.24348858._000020.pool.root.1
    setup = get_asetup(asetup=False)

    # 'xcache list' also kills any orphaned processes lingering in the system
    start = "xcache start -d $PWD/%s/xcache -C centos7 --disklow 4g --diskhigh 5g -b 4" % jobid
    command = "%s lsetup xcache; xcache list; %s" % (setup, start)

    return {'command': command, 'args': ''}
2293 
2294 
def xcache_deactivation_command(workdir='', jobid=''):
    """
    Return the xcache service deactivation command.
    This service should be stopped after the payload has finished.
    Copy the messages log before shutting down.

    Note: the job id is not used here, but the function prototype
    needs it since the function is invoked generically alongside
    xcache_activation_command (which does use it).

    :param workdir: payload work directory (string).
    :param jobid: unused job id - do not remove (string).
    :return: xcache command (dictionary).
    """

    path = os.environ.get('ALRB_XCACHE_LOG', None)
    if not path:
        logger.warning('ALRB_XCACHE_LOG is not set')
    elif not os.path.exists(path):
        logger.warning('path does not exist: %s', path)
    else:
        # preserve the xcache messages log before the service goes away
        logger.debug('copying xcache messages log file (%s) to work dir (%s)', path, workdir)
        dest = os.path.join(workdir, 'xcache-messages.log')
        try:
            copy(path, dest)
        except Exception as exc:
            logger.warning('exception caught copying xcache log: %s', exc)

    command = "%s lsetup xcache; xcache kill" % get_asetup(asetup=False)  # -C centos7

    return {'command': command, 'args': '-p $ALRB_XCACHE_MYPROCESS'}
2326 
2327 
def get_utility_command_setup(name, job, setup=None):
    """
    Return the proper setup for the given utility command.
    If a payload setup is specified, then the utility command string should be prepended to it.

    Side effects for 'MemoryMonitor': sets job.memorymonitor (the executable
    name parsed from the returned setup string) and may update job.pgrp when
    the pid to be monitored differs from job.pid.

    :param name: name of utility (string): 'MemoryMonitor', 'NetworkMonitor', 'Prefetcher' or 'Benchmark'.
    :param job: job object.
    :param setup: optional payload setup string.
    :return: utility command setup (string; empty string for any other name).
    """

    if name == 'MemoryMonitor':
        # must know if payload is running in a container or not
        # (enables search for pid in ps output)
        use_container = job.usecontainer or 'runcontainer' in job.transformation
        # a PRMON_DEBUG flag in the queuedata catchall switches on a ps dump
        dump_ps = ("PRMON_DEBUG" in job.infosys.queuedata.catchall)

        # note: 'setup' is intentionally reassigned here - the returned string
        # is the complete memory monitor setup, not the payload setup
        setup, pid = get_memory_monitor_setup(
            job.pid,
            job.pgrp,
            job.jobid,
            job.workdir,
            job.command,
            use_container=use_container,
            transformation=job.transformation,
            outdata=job.outdata,
            dump_ps=dump_ps
        )

        # extract the monitor executable name from the last command in the setup string
        _pattern = r"([\S]+)\ ."
        pattern = re.compile(_pattern)
        _name = re.findall(pattern, setup.split(';')[-1])
        if _name:
            job.memorymonitor = _name[0]
        else:
            logger.warning('trf name could not be identified in setup string')

        # update the pgrp if the pid changed
        if pid not in (job.pid, -1):
            logger.debug('updating pgrp=%d for pid=%d', job.pgrp, pid)
            try:
                job.pgrp = os.getpgid(pid)
            except Exception as exc:
                # the process may already have finished
                logger.warning('os.getpgid(%d) failed with: %s', pid, exc)
        return setup

    if name == 'NetworkMonitor' and setup:
        # the network monitor command is prepended to the given payload setup
        return get_network_monitor_setup(setup, job)

    if name == 'Prefetcher':
        return get_prefetcher_setup(job)

    if name == 'Benchmark':
        return get_benchmark_setup(job)

    return ""
2384 
2385 
def get_utility_command_execution_order(name):
    """
    Should the given utility command be executed before or after the payload?

    :param name: utility name (string).
    :return: execution order constant.
    """

    # known utilities and their execution order
    order_map = {
        'NetworkMonitor': UTILITY_WITH_PAYLOAD,
        'MemoryMonitor': UTILITY_AFTER_PAYLOAD_STARTED,
    }

    try:
        return order_map[name]
    except KeyError:
        # fall back to after-payload for anything unrecognized
        logger.warning('unknown utility name: %s', name)
        return UTILITY_AFTER_PAYLOAD_STARTED
2403 
2404 
def post_utility_command_action(name, job):
    """
    Perform post action for given utility command.

    Currently only the memory monitor requires a post action; the
    NetworkMonitor does not.

    :param name: name of utility command (string).
    :param job: job object.
    :return:
    """

    if name == 'MemoryMonitor':
        post_memory_monitor_action(job)
2418 
2419 
def get_utility_command_kill_signal(name):
    """
    Return the proper kill signal used to stop the utility command.

    Note: the NetworkMonitor does not require killing (to be confirmed).

    :param name: name of utility command (string).
    :return: kill signal.
    """

    if name == 'MemoryMonitor':
        return SIGUSR1
    return SIGTERM
2431 
2432 
def get_utility_command_output_filename(name, selector=None):
    """
    Return the filename to the output of the utility command.

    :param name: utility name (string).
    :param selector: optional special conditions flag (boolean).
    :return: filename (string), empty for utilities without an output file.
    """

    # only the memory monitor produces a summary file
    if name != 'MemoryMonitor':
        return ""

    return get_memory_monitor_summary_filename(selector=selector)
2448 
2449 
def verify_lfn_length(outdata):
    """
    Make sure that the LFNs are all within the allowed length.

    Stops at the first offending output file.

    :param outdata: list of FileSpec objects.
    :return: error code (int, 0 when all LFNs are fine), diagnostics (string).
    """

    max_length = 255

    # report the first output file whose LFN exceeds the limit, if any
    for fspec in outdata:
        lfn_length = len(fspec.lfn)
        if lfn_length > max_length:
            diagnostics = "LFN too long (length: %d, must be less than %d characters): %s" % \
                          (lfn_length, max_length, fspec.lfn)
            return errors.LFNTOOLONG, diagnostics

    return 0, ""
2471 
2472 
def verify_ncores(corecount):
    """
    Verify that nCores settings are correct.

    Removes any stale ATHENA_PROC_NUMBER_JOB from a previous job, then sets
    ATHENA_CORE_NUMBER (and possibly ATHENA_PROC_NUMBER_JOB) from either an
    already-set ATHENA_PROC_NUMBER or the given core count.

    :param corecount: number of cores (int).
    :return:
    """

    # remove any leftover value from a previous job (pop avoids a broad
    # try/except around del)
    if os.environ.pop('ATHENA_PROC_NUMBER_JOB', None) is not None:
        logger.debug("unset existing ATHENA_PROC_NUMBER_JOB")

    # read ATHENA_PROC_NUMBER if set to a valid integer; avoid relying on
    # int(None) raising TypeError into a catch-all
    value = os.environ.get('ATHENA_PROC_NUMBER')
    try:
        athena_proc_number = int(value) if value is not None else None
    except ValueError:
        logger.warning('failed to convert ATHENA_PROC_NUMBER=%s to int', value)
        athena_proc_number = None

    # Note: if ATHENA_PROC_NUMBER is set (by the wrapper), then do not
    # overwrite it. Otherwise, set it to the value of job.coreCount
    # (actually set ATHENA_PROC_NUMBER_JOB and use it if it exists,
    # otherwise use ATHENA_PROC_NUMBER directly; ATHENA_PROC_NUMBER_JOB
    # will always be the value from the job definition)
    if athena_proc_number:
        logger.info((
            "encountered a set ATHENA_PROC_NUMBER (%d), "
            "will not overwrite it"), athena_proc_number)
        logger.info('set ATHENA_CORE_NUMBER to same value as ATHENA_PROC_NUMBER')
        os.environ['ATHENA_CORE_NUMBER'] = str(athena_proc_number)
    else:
        os.environ['ATHENA_PROC_NUMBER_JOB'] = str(corecount)
        os.environ['ATHENA_CORE_NUMBER'] = str(corecount)
        logger.info((
            "set ATHENA_PROC_NUMBER_JOB and ATHENA_CORE_NUMBER to %s "
            "(ATHENA_PROC_NUMBER will not be overwritten)"), corecount)
2509 
2510 
def verify_job(job):
    """
    Verify job parameters for specific errors.

    Note:
      in case of problem, the function should set the corresponding pilot
      error code using:
      job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())

    :param job: job object.
    :return: Boolean (True when all checks pass).
    """

    # are LFNs of correct lengths?
    exitcode, diagnostics = verify_lfn_length(job.outdata)
    status = (exitcode == 0)
    if not status:
        logger.fatal(diagnostics)
        job.piloterrordiag = diagnostics
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode)

    # check the ATHENA_PROC_NUMBER settings
    verify_ncores(job.corecount)

    return status
2537 
2538 
def update_stagein(job):
    """
    Skip DBRelease files during stage-in.

    :param job: job object.
    :return:
    """

    # mark every DBRelease input file as not to be transferred
    for fspec in (f for f in job.indata if 'DBRelease' in f.lfn):
        fspec.status = 'no_transfer'
2550 
2551 
def get_metadata(workdir):
    """
    Return the metadata from file.

    :param workdir: work directory (string).
    :return: metadata contents (string), or None if the job report is missing.
    """

    path = os.path.join(workdir, config.Payload.jobreport)
    metadata = None
    if os.path.exists(path):
        metadata = read_file(path)
    logger.debug('metadata=%s', str(metadata))

    return metadata
2565 
2566 
def should_update_logstash(frequency=10):
    """
    Should logstash be updated with prmon dictionary?

    :param frequency: inverse probability of an update (int).
    :return: True once per 'frequency' times on average (Boolean).
    """
    roll = randint(0, frequency - 1)
    return roll == 0
2575 
2576 
def update_server(job):
    """
    Perform any user specific server actions.

    E.g. this can be used to send special information to a logstash.

    :param job: job object.
    :return:
    """

    # only attempt to send the prmon info for a fraction of the jobs
    if not should_update_logstash():
        logger.debug('no need to update logstash for this job')
        return

    path = os.path.join(job.workdir, get_memory_monitor_output_filename())
    if not os.path.exists(path):
        logger.warning('path does not exist: %s', path)
        return

    # convert memory monitor text output to json and return the selection
    # (don't store it, log has already been created)
    metadata_dictionary = get_metadata_dict_from_txt(path, storejson=True, jobid=job.jobid)
    if not metadata_dictionary:
        logger.warning('no prmon json available - cannot send anything to logstash server')
        return

    # the json output was previously written to file; point curl at it
    new_path = update_extension(path=path, extension='json')
    url = 'https://pilot.atlas-ml.org'  # 'http://collector.atlas-ml.org:80'

    cmd = (
        "curl --connect-timeout 20 --max-time 120 "
        "-H \"Content-Type: application/json\" "
        "-X POST "
        "--upload-file %s %s" % (new_path, url)
    )

    # send metadata to logstash
    try:
        _, stdout, stderr = execute(cmd, usecontainer=False)
    except Exception as exc:
        logger.warning('exception caught: %s', exc)
    else:
        logger.debug('sent prmon JSON dictionary to logstash server')
        logger.debug('stdout: %s', stdout)
        logger.debug('stderr: %s', stderr)
2639 
2640 
def preprocess_debug_command(job):
    """
    Pre-process the debug command in debug mode.

    Prepends the resource-specific setup command to job.debug_command unless
    it is already contained in it.

    :param job: Job object.
    :return:
    """

    # Should the pilot do the setup or does jobPars already contain the information?
    preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams)

    # 'grid' if no hpc_resource is set
    resource_name = get_resource_name()

    # import the resource specific module (Python 3, level: -1 -> 0)
    modname = 'pilot.user.atlas.resource.%s' % resource_name
    resource = __import__(modname, globals(), locals(), [resource_name], 0)

    # get the general setup command and then verify it if required
    setup_cmd = resource.get_setup_command(job, preparesetup)
    if not setup_cmd.endswith(';'):
        setup_cmd += '; '
    if setup_cmd not in job.debug_command:
        job.debug_command = setup_cmd + job.debug_command
2663 
2664 
def process_debug_command(debug_command, pandaid):
    """
    In debug mode, the server can send a special debug command to the pilot
    via the updateJob backchannel. This function can be used to process that
    command, i.e. to identify a proper pid to debug (which is unknown
    to the server).

    For gdb, the server might send a command with gdb option --pid %.
    The pilot need to replace the % with the proper pid. The default
    (hardcoded) process will be that of athena.py. The pilot will find the
    corresponding pid.

    :param debug_command: debug command (string).
    :param pandaid: PanDA id (string).
    :return: updated debug command (string).
    """

    # nothing to substitute - return the command unchanged
    if '--pid %' not in debug_command:
        return debug_command

    pandaid_pid = None

    # replace the % with the pid for athena.py
    # note: if athena.py is not yet running, the --pid % will remain.
    # Otherwise the % will be replaced by the pid first find the pid
    # (if athena.py is running)
    cmd = 'ps axo pid,ppid,pgid,args'
    _, stdout, _ = execute(cmd)
    if stdout:
        #logger.debug('ps=\n\n%s\n' % stdout)
        # convert the ps output to a dictionary
        dictionary = convert_ps_to_dict(stdout)

        # trim this dictionary to reduce the size
        # (only keep the PID and PPID lists)
        trimmed_dictionary = get_trimmed_dictionary(['PID', 'PPID'], dictionary)

        # what is the pid of the trf?
        # (the trf has an export PandaID=.. attached to its command line)
        pandaid_pid = find_pid(pandaid, dictionary)

        # find all athena processes
        pids = find_cmd_pids('athena.py', dictionary)

        # which of the found pids are children of the trf?
        # (which has an export PandaID=.. attached to it)
        for pid in pids:
            try:
                # is_child() walks the PPID chain recursively
                child = is_child(pid, pandaid_pid, trimmed_dictionary)
            except RuntimeError as rte:
                logger.warning((
                    'too many recursions: %s '
                    '(cannot identify athena process)'), rte)
            else:
                if child:
                    logger.info('pid=%d is a child process of the trf of this job', pid)
                    debug_command = debug_command.replace('--pid %', '--pid %d' % pid)
                    logger.info('updated debug command: %s', debug_command)
                    break
                logger.info('pid=%d is not a child process of the trf of this job', pid)

        # no substitution happened: athena was either not found at all or
        # none of the candidates belonged to this job's trf
        if not pids or '--pid %' in debug_command:
            logger.debug('athena is not yet running (no corresponding pid)')

            # reset the command to prevent the payload from being killed
            # (will be killed when gdb has run)
            debug_command = ''

    return debug_command
2733 
2734 
def allow_timefloor(submitmode):
    """
    Should the timefloor mechanism (multi-jobs) be allowed for the given submit mode?

    :param submitmode: submit mode (string).
    :return: Boolean (currently always True, regardless of submit mode).
    """

    # the timefloor mechanism is allowed for every submit mode
    return True