File indexing completed on 2026-04-11 08:41:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 from collections import defaultdict
0012 import fnmatch
0013 from glob import glob
0014 import logging
0015 import os
0016 import re
0017 from random import randint
0018 from signal import SIGTERM, SIGUSR1
0019
0020
0021
0022 try:
0023 from functools import reduce
0024 except ImportError:
0025 pass
0026
0027 from .container import create_root_container_command
0028 from .dbrelease import get_dbrelease_version, create_dbrelease
0029 from .setup import (
0030 should_pilot_prepare_setup,
0031 is_standard_atlas_job,
0032 get_asetup,
0033 set_inds,
0034 get_analysis_trf,
0035 get_payload_environment_variables,
0036 replace_lfns_with_turls,
0037 )
0038 from .utilities import (
0039 get_memory_monitor_setup,
0040 get_network_monitor_setup,
0041 post_memory_monitor_action,
0042 get_memory_monitor_summary_filename,
0043 get_prefetcher_setup,
0044 get_benchmark_setup,
0045 get_memory_monitor_output_filename,
0046 get_metadata_dict_from_txt,
0047 )
0048
0049 from pilot.util.auxiliary import (
0050 get_resource_name,
0051 show_memory_usage,
0052 is_python3,
0053 get_key_value,
0054 )
0055
0056 from pilot.common.errorcodes import ErrorCodes
0057 from pilot.common.exception import TrfDownloadFailure, PilotException
0058 from pilot.util.config import config
0059 from pilot.util.constants import (
0060 UTILITY_BEFORE_PAYLOAD,
0061 UTILITY_WITH_PAYLOAD,
0062 UTILITY_AFTER_PAYLOAD_STARTED,
0063 UTILITY_AFTER_PAYLOAD_FINISHED,
0064 UTILITY_AFTER_PAYLOAD_STARTED2,
0065 UTILITY_BEFORE_STAGEIN,
0066 UTILITY_AFTER_PAYLOAD_FINISHED2
0067 )
0068 from pilot.util.container import execute
0069 from pilot.util.filehandling import (
0070 copy, copy_pilot_source, calculate_checksum,
0071 get_guid, get_local_file_size,
0072 remove, remove_dir_tree, remove_core_dumps, read_file, read_json,
0073 update_extension,
0074 write_file,
0075 get_disk_usage
0076 )
0077 from pilot.util.processes import (
0078 convert_ps_to_dict,
0079 find_pid, find_cmd_pids,
0080 get_trimmed_dictionary,
0081 is_child
0082 )
0083
0084 from pilot.util.tracereport import TraceReport
0085
0086 logger = logging.getLogger(__name__)
0087
0088 errors = ErrorCodes()
0089
0090
def sanity_check():
    """
    Perform an initial sanity check before doing anything else in a given workflow.

    This hook can be used to verify importing of modules that are otherwise
    used much later; it is better to abort the pilot if a problem is
    discovered early.

    :return: exit code (0 if all is ok, otherwise non-zero exit code).
    """

    # currently nothing to verify - report success
    return 0
0113
0114
def validate(job):
    """
    Perform user specific payload/job validation.

    This function will produce a local DBRelease file if necessary
    (old releases), and verify that a locally referenced container image
    exists on disk.

    :param job: job object.
    :return: Boolean (True if validation is successful).
    """

    ok = True

    if 'DBRelease' in job.jobparams:
        logger.debug((
            'encountered DBRelease info in job parameters - '
            'will attempt to create a local DBRelease file'))
        version = get_dbrelease_version(job.jobparams)
        if version:
            ok = create_dbrelease(version, job.workdir)

    if not ok:
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.DBRELEASEFAILURE)

    # for locally referenced images (absolute path), verify their existence
    if ok and job.imagename and job.imagename.startswith('/'):
        if os.path.exists(job.imagename):
            logger.info('verified that image exists: %s', job.imagename)
        else:
            ok = False
            logger.warning('image does not exist: %s', job.imagename)
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.IMAGENOTFOUND)

    return ok
0161
0162
def open_remote_files(indata, workdir, nthreads):
    """
    Verify that direct i/o files can be opened.

    A helper script (open_remote_file.py, copied from the pilot source) is
    executed inside a ROOT container; it writes a JSON dictionary
    ({turl: opened (bool), ..}) that is read back here to determine which
    TURLs could not be opened.

    :param indata: list of FileSpec.
    :param workdir: working directory (string).
    :param nthreads: number of concurrent file open threads (int).
    :return: exit code (int), diagnostics (string), comma-separated list of
        unopened turls (string).
    """

    exitcode = 0
    diagnostics = ""
    not_opened = ""

    # only verify files destined for direct access (status == 'remote_io')
    turls = extract_turls(indata)
    if turls:
        # the helper script needs the pilot source to be present in workdir
        diagnostics = copy_pilot_source(workdir)
        if diagnostics:
            raise PilotException(diagnostics)

        script = 'open_remote_file.py'
        final_script_path = os.path.join(workdir, script)
        # make the copied pilot source importable by the helper script
        # NOTE(review): assumes PYTHONPATH is already set - if unset,
        # os.environ.get() returns None and the concatenation raises; confirm
        os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH') + ':' + workdir
        script_path = os.path.join('pilot/scripts', script)
        dir1 = os.path.join(os.path.join(os.environ['PILOT_HOME'], 'pilot2'), script_path)
        dir2 = os.path.join(workdir, script_path)
        # the script may live in the original pilot source or in the copy
        full_script_path = dir1 if os.path.exists(dir1) else dir2
        if not os.path.exists(full_script_path):
            # no pilot error code is set: the test is skipped, not failed
            diagnostics = (
                'cannot perform file open test - script path does '
                'not exist: %s' % full_script_path
            )
            logger.warning(diagnostics)
            logger.warning('tested both path=%s and path=%s (none exists)', dir1, dir2)
            return exitcode, diagnostics, not_opened
        try:
            copy(full_script_path, final_script_path)
        except PilotException as exc:
            # no pilot error code is set: the test is skipped, not failed
            diagnostics = 'cannot perform file open test - pilot source copy failed: %s' % exc
            logger.warning(diagnostics)
            return exitcode, diagnostics, not_opened
        else:
            # use a relative path since the command runs inside the container
            final_script_path = os.path.join('.', script)

        _cmd = get_file_open_command(final_script_path, turls, nthreads)
        cmd = create_root_container_command(workdir, _cmd)

        show_memory_usage()

        logger.info('*** executing file open verification script:\n\n\'%s\'\n\n', cmd)
        exit_code, stdout, stderr = execute(cmd, usecontainer=False)
        if config.Pilot.remotefileverification_log:
            # keep the script output for debugging
            fpath = os.path.join(workdir, config.Pilot.remotefileverification_log)
            write_file(fpath, stdout + stderr, mute=False)

        show_memory_usage()

        # a failing script is only logged; only unopened turls lead to a
        # non-zero exitcode being returned from this function
        if exit_code:
            logger.warning('script %s finished with ec=%d', script, exit_code)
        else:
            dictionary_path = os.path.join(
                workdir,
                config.Pilot.remotefileverification_dictionary
            )
            # NOTE(review): this checks that the path string is non-empty, not
            # that the file exists - os.path.exists() was probably intended; confirm
            if not dictionary_path:
                logger.warning('file does not exist: %s', dictionary_path)
            else:
                file_dictionary = read_json(dictionary_path)
                if not file_dictionary:
                    logger.warning('could not read dictionary from %s', dictionary_path)
                else:
                    # build a comma-separated list of turls that failed to open
                    not_opened = ""
                    for turl in file_dictionary:
                        opened = file_dictionary[turl]
                        if not opened:
                            logger.info('turl could not be opened: %s', turl)
                            not_opened += turl if not not_opened else ",%s" % turl
                        else:
                            logger.info('turl could be opened: %s', turl)

        if not_opened:
            exitcode = errors.REMOTEFILECOULDNOTBEOPENED
            diagnostics = "Remote file could not be opened: %s" % not_opened if "," not in not_opened else "turls not opened:%s" % not_opened
    else:
        logger.info('nothing to verify (for remote files)')

    return exitcode, diagnostics, not_opened
0258
0259
def get_file_open_command(script_path, turls, nthreads):
    """
    Build the execution command for the remote file open test script.

    :param script_path: path to script (string).
    :param turls: comma-separated turls (string).
    :param nthreads: number of concurrent file open threads (int).
    :return: full script execution command (string).
    """

    # the script's own directory is passed as the working directory
    script_dir = os.path.dirname(script_path)
    return "%s --turls=%s -w %s -t %s" % (script_path, turls, script_dir, nthreads)
0270
0271
def extract_turls(indata):
    """
    Extract TURLs from indata for direct i/o files.

    Only files with status 'remote_io' are considered.

    :param indata: list of FileSpec.
    :return: comma-separated list of turls (string).
    """

    turls = []
    for fspec in indata:
        if fspec.status == 'remote_io':
            turls.append(fspec.turl)

    return ",".join(turls)
0289
0290
def process_remote_file_traces(path, job, not_opened_turls):
    """
    Report traces for remote files.

    The function reads back the base trace report (common part of all traces)
    and updates it per direct access file before reporting it to the Rucio
    server.

    :param path: path to base trace report (string).
    :param job: job object.
    :param not_opened_turls: list of turls that could not be opened (list).
    :return:
    """

    try:
        base_trace_report = read_json(path)
    except PilotException as exc:
        logger.warning('failed to open base trace report (cannot send trace reports): %s', exc)
    else:
        if not base_trace_report:
            logger.warning('failed to read back base trace report (cannot send trace reports)')
        else:
            # update and send the trace info for each direct access file
            for fspec in job.indata:
                if fspec.status == 'remote_io':
                    base_trace_report.update(url=fspec.turl)
                    base_trace_report.update(remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
                    # guid is sent without dashes
                    base_trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
                    base_trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
                    # record whether the file open test succeeded for this turl
                    if fspec.turl in not_opened_turls:
                        base_trace_report.update(clientState='FAILED_REMOTE_OPEN')
                    else:
                        base_trace_report.update(clientState='FOUND_ROOT')

                    # copy the updated dictionary into a fresh report and send it
                    trace_report = TraceReport(**base_trace_report)
                    if trace_report:
                        trace_report.send()
                    else:
                        logger.warning('failed to create trace report for turl=%s', fspec.turl)
0329
0330
def get_nthreads(catchall):
    """
    Extract the number of concurrent file open threads from catchall.

    Return nthreads=1 if nopenfiles=.. is not present in catchall.

    :param catchall: queuedata catchall (string).
    :return: number of threads (int).
    """

    nthreads = get_key_value(catchall, key='nopenfiles')
    return nthreads or 1
0342
0343
def get_payload_command(job):
    """
    Return the full command for executing the payload, including the
    sourcing of all setup files and setting of environment variables.

    The command is assembled in stages: resource-specific setup, optional
    remote file open verification, the trf command itself (normal or generic),
    payload environment variables, TURL substitution for direct access
    production jobs and finally the ATHENA_PROC_NUMBER/ATHENA_CORE_NUMBER
    exports.

    :param job: job object.
    :raises PilotException: TrfDownloadFailure.
    :return: command (string).
    """

    show_memory_usage()

    # should the pilot do the setup, or does jobPars already contain it?
    preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams)

    # is it a user (analysis) job or a production job?
    userjob = job.is_analysis()
    logger.info('pilot is running a %s job', 'user analysis' if userjob else 'production')

    resource_name = get_resource_name()

    # import the resource-specific setup module, e.g. pilot.user.atlas.resource.grid
    modname = 'pilot.user.atlas.resource.%s' % resource_name
    resource = __import__(modname, globals(), locals(), [resource_name], 0)

    # get the general setup command and verify it if required
    cmd = resource.get_setup_command(job, preparesetup)
    if cmd:
        exitcode, diagnostics = resource.verify_setup_command(cmd)
        if exitcode != 0:
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode)
            raise PilotException(diagnostics, code=exitcode)

    # verify that remote files can be opened before executing the payload
    # (can be disabled per queue with remoteio_test=false in catchall)
    catchall = job.infosys.queuedata.catchall.lower() if job.infosys.queuedata.catchall else ''
    if config.Pilot.remotefileverification_log and 'remoteio_test=false' not in catchall:
        exitcode = 0
        diagnostics = ""
        not_opened_turls = ""
        try:
            logger.debug('executing open_remote_files()')
            exitcode, diagnostics, not_opened_turls = open_remote_files(job.indata, job.workdir, get_nthreads(catchall))
        except Exception as exc:
            logger.warning('caught std exception: %s', exc)
        else:
            # send traces for the remote files based on the base trace report
            path = os.path.join(job.workdir, config.Pilot.base_trace_report)
            if not os.path.exists(path):
                logger.warning((
                    'base trace report does not exist (%s) - input file '
                    'traces should already have been sent'), path)
            else:
                process_remote_file_traces(path, job, not_opened_turls)

            # fail the job if the remote files could not be opened
            if exitcode != 0:
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode, msg=diagnostics)
                raise PilotException(diagnostics, code=exitcode)
    else:
        logger.debug('no remote file open verification')

    if is_standard_atlas_job(job.swrelease):
        # normal setup (production and user analysis jobs with a swRelease)
        logger.info("preparing normal production/analysis job setup command")
        cmd = get_normal_payload_command(cmd, job, preparesetup, userjob)
    else:
        # generic job (non-ATLAS specific or with undefined swRelease)
        logger.info("generic job (non-ATLAS specific or with undefined swRelease)")
        cmd = get_generic_payload_command(cmd, job, preparesetup, userjob)

    # make sure the command is terminated before prepending/appending exports
    if not cmd.endswith(';'):
        cmd += '; '

    # only prepend payload environment variables when no user container is used
    if not job.imagename:
        site = os.environ.get('PILOT_SITENAME', '')
        variables = get_payload_environment_variables(
            cmd, job.jobid, job.taskid, job.attemptnr, job.processingtype, site, userjob)
        cmd = ''.join(variables) + cmd

    # guarantee that PandaID is exported for the payload
    if 'export PandaID' not in cmd:
        cmd = "export PandaID=%s;" % job.jobid + cmd

    cmd = cmd.replace(';;', ';')

    # for direct access in production jobs, substitute the input file names
    # in the command with the corresponding TURLs
    if not userjob and not job.is_build_job() and job.has_remoteio():
        lfns = job.get_lfns_and_guids()[0]
        cmd = replace_lfns_with_turls(
            cmd,
            job.workdir,
            "PoolFileCatalog.xml",
            lfns,
            writetofile=job.writetofile
        )

    # prepend ATHENA_PROC_NUMBER/ATHENA_CORE_NUMBER exports if known
    cmd = add_athena_proc_number(cmd)

    show_memory_usage()

    logger.info('payload run command: %s', cmd)

    return cmd
0466
0467
def get_normal_payload_command(cmd, job, preparesetup, userjob):
    """
    Return the payload command for a normal production/analysis job.

    :param cmd: any preliminary command setup (string).
    :param job: job object.
    :param preparesetup: True if the pilot should prepare the setup,
        False if already in the job parameters.
    :param userjob: True for user analysis jobs, False otherwise (bool).
    :return: normal payload command (string).
    """

    # set the INDS environment variable from the input dataset name(s)
    set_inds(job.datasetin)

    if userjob:
        # download the user analysis trf
        exitcode, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)
        if exitcode != 0:
            raise TrfDownloadFailure(diagnostics)

        logger.debug('user analysis trf: %s', trf_name)

        if preparesetup:
            _cmd = get_analysis_run_command(job, trf_name)
        else:
            _cmd = job.jobparams

        # correct for multi-core if necessary (especially important in
        # case coreCount=1 to limit parallel make)
        cmd += "; " + add_makeflags(job.corecount, "") + _cmd
    else:
        # add database commands if they are set by the local site
        cmd += os.environ.get('PILOT_DB_LOCAL_SETUP_CMD', '')

        if job.is_eventservice:
            # event service jobs get explicit proc/core number exports
            if job.corecount:
                cmd += '; export ATHENA_PROC_NUMBER=%s' % job.corecount
                cmd += '; export ATHENA_CORE_NUMBER=%s' % job.corecount
            else:
                cmd += '; export ATHENA_PROC_NUMBER=1'
                cmd += '; export ATHENA_CORE_NUMBER=1'

        # add the transform and the job parameters (production jobs)
        if preparesetup:
            cmd += "; %s %s" % (job.transformation, job.jobparams)
        else:
            cmd += "; " + job.jobparams

    return cmd
0519
0520
def get_generic_payload_command(cmd, job, preparesetup, userjob):
    """
    Return the payload command for a generic job (non-ATLAS specific, or
    with undefined swRelease).

    :param cmd: any preliminary command setup (string).
    :param job: job object.
    :param preparesetup: True if the pilot should prepare the setup,
        False if already in the job parameters.
    :param userjob: True for user analysis jobs, False otherwise (bool).
    :return: generic job command (string).
    """

    if userjob:
        # download the user analysis trf
        exitcode, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir)
        if exitcode != 0:
            raise TrfDownloadFailure(diagnostics)

        logger.debug('user analysis trf: %s', trf_name)

        if preparesetup:
            _cmd = get_analysis_run_command(job, trf_name)
        else:
            _cmd = job.jobparams

        # correct for multi-core if necessary (especially important in case
        # coreCount=1 to limit parallel make) - skipped when a user container is used
        if not job.imagename:
            cmd += "; " + add_makeflags(job.corecount, "") + _cmd
        else:
            cmd += _cmd

    elif verify_release_string(job.homepackage) != 'NULL' and job.homepackage != ' ':
        # production-style job with a set homepackage: run the trf from within it
        if preparesetup:
            cmd = "python %s/%s %s" % (job.homepackage, job.transformation, job.jobparams)
        else:
            cmd = job.jobparams
    else:
        # no homepackage: run the transformation directly
        if preparesetup:
            cmd = "python %s %s" % (job.transformation, job.jobparams)
        else:
            cmd = job.jobparams

    return cmd
0566
0567
def add_athena_proc_number(cmd):
    """
    Add the ATHENA_PROC_NUMBER and ATHENA_CORE_NUMBER exports to
    the payload command if necessary.

    A pilot-wide ATHENA_PROC_NUMBER environment variable takes precedence;
    otherwise the per-job ATHENA_PROC_NUMBER_JOB is used, but only when
    its value is greater than 1.

    :param cmd: payload execution command (string).
    :return: updated payload execution command (string).
    """

    # read back the candidate values from the environment, if they exist
    try:
        value1 = int(os.environ['ATHENA_PROC_NUMBER_JOB'])
    except (TypeError, KeyError, ValueError) as exc:
        logger.warning('failed to convert ATHENA_PROC_NUMBER_JOB to int: %s', exc)
        value1 = None
    try:
        value2 = int(os.environ['ATHENA_CORE_NUMBER'])
    except (TypeError, KeyError, ValueError) as exc:
        logger.warning('failed to convert ATHENA_CORE_NUMBER to int: %s', exc)
        value2 = None

    if "ATHENA_PROC_NUMBER" not in cmd:
        if "ATHENA_PROC_NUMBER" in os.environ:
            # pilot-wide value takes precedence, used verbatim
            cmd = 'export ATHENA_PROC_NUMBER=%s;' % os.environ['ATHENA_PROC_NUMBER'] + cmd
        elif "ATHENA_PROC_NUMBER_JOB" in os.environ and value1:
            # per-job value is only exported when > 1
            if value1 > 1:
                cmd = 'export ATHENA_PROC_NUMBER=%d;' % value1 + cmd
            else:
                logger.info((
                    "will not add ATHENA_PROC_NUMBER to cmd "
                    "since the value is %s"), str(value1))
        else:
            logger.warning((
                "don't know how to set ATHENA_PROC_NUMBER "
                "(could not find it in os.environ)"))
    else:
        logger.info("ATHENA_PROC_NUMBER already in job command")

    # ATHENA_CORE_NUMBER is only exported when set in env and > 1
    if 'ATHENA_CORE_NUMBER' in os.environ and value2:
        if value2 > 1:
            cmd = 'export ATHENA_CORE_NUMBER=%d;' % value2 + cmd
        else:
            logger.info("will not add ATHENA_CORE_NUMBER to cmd since the value is %s", str(value2))
    else:
        logger.warning((
            'there is no ATHENA_CORE_NUMBER in os.environ '
            '(cannot add it to payload command)'))

    return cmd
0617
0618
def verify_release_string(release):
    """
    Verify that the release (or homepackage) string is set.

    An unset value (None or empty string) is normalized to "NULL";
    otherwise the value is returned uppercased.

    :param release: release or homepackage string that might or might not be set.
    :return: release (set string).
    """

    normalized = "" if release is None else release
    normalized = normalized.upper()
    if normalized == "":
        normalized = "NULL"
    if normalized == "NULL":
        logger.info("detected unset (NULL) release/homepackage string")

    return normalized
0636
0637
def add_makeflags(job_core_count, cmd):
    """
    Correct for multi-core if necessary (especially important in case
    coreCount=1 to limit parallel make).

    If ATHENA_PROC_NUMBER is not usable from the environment, the job
    definition core count is used to build the MAKEFLAGS export; when no
    value yields a MAKEFLAGS, a serial (-j1) default is appended.

    :param job_core_count: core count from the job definition (int).
    :param cmd: payload execution command (string).
    :return: updated payload execution command (string).
    """

    # attempt to pick up the core count from the environment
    env_value = os.environ.get('ATHENA_PROC_NUMBER')
    try:
        core_count = int(env_value)
    except (TypeError, ValueError):
        core_count = -1

    if core_count == -1:
        # fall back to the job definition value
        try:
            core_count = int(job_core_count)
        except (TypeError, ValueError):
            pass
        else:
            if core_count >= 1:
                # note: the original request (AF) was to use j%d and
                # not -j%d, now using the latter
                cmd += "export MAKEFLAGS=\'-j%d QUICK=1 -l1\';" % (core_count)

    # make sure that MAKEFLAGS is always set
    if "MAKEFLAGS=" not in cmd:
        cmd += "export MAKEFLAGS=\'-j1 QUICK=1 -l1\';"

    return cmd
0670
0671
def get_analysis_run_command(job, trf_name):
    """
    Return the proper run command for the user job.

    Example output:
    export X509_USER_PROXY=<..>;./runAthena <job parameters> --usePFCTurl --directIn

    :param job: job object.
    :param trf_name: name of the transform that will run the job (string).
        Used when containers are not used.
    :return: command (string).
    """

    cmd = ""

    # add the user proxy (not when a user container image is used)
    if 'X509_USER_PROXY' in os.environ and not job.imagename:
        cmd += 'export X509_USER_PROXY=%s;' % os.environ.get('X509_USER_PROXY')

    # set up the trf invocation: downloaded trf is executed directly,
    # otherwise it is run via python
    if job.imagename == "":
        cmd += './%s %s' % (trf_name, job.jobparams)
    else:
        if job.is_analysis() and job.imagename:
            cmd += './%s %s' % (trf_name, job.jobparams)
        else:
            cmd += 'python %s %s' % (trf_name, job.jobparams)

        imagename = job.imagename
        # check if the image is already on disk, as defined by the
        # PAYLOAD_CONTAINER_LOCATION environment variable
        payload_container_location = os.environ.get('PAYLOAD_CONTAINER_LOCATION')
        if payload_container_location is not None:
            logger.debug("$PAYLOAD_CONTAINER_LOCATION = %s", payload_container_location)
            # e.g. docker://sregistry/atlas/athena:22.0 -> athena:22.0
            containername = imagename.rsplit('/')[-1]
            image_location = os.path.join(payload_container_location, containername)
            if os.path.exists(image_location):
                logger.debug("image exists at %s", image_location)
                imagename = image_location

        # restore the image name for runcontainer-style trfs if necessary
        if 'containerImage' not in cmd and 'runcontainer' in trf_name:
            cmd += ' --containerImage=%s' % imagename

    # add the direct access switches when remote i/o is used
    if job.has_remoteio():
        logger.debug((
            'direct access (remoteio) is used to access some input files: '
            '--usePFCTurl and --directIn will be added to payload command'))
        if '--usePFCTurl' not in cmd:
            cmd += ' --usePFCTurl'
        if '--directIn' not in cmd:
            cmd += ' --directIn'

    # only add the input guids for non-build jobs
    if not job.is_build_job():
        lfns, guids = job.get_lfns_and_guids()
        _guids = get_guids_from_jobparams(job.jobparams, lfns, guids)
        if _guids:
            cmd += ' --inputGUIDs \"%s\"' % (str(_guids))

    show_memory_usage()

    return cmd
0758
0759
0760
0761
0762
def update_forced_accessmode(log, cmd, transfertype, jobparams, trf_name):
    """
    Update the payload command for forced accessmode.

    accessmode is an option that comes from HammerCloud and is used to
    force a certain input file access mode; i.e. copy-to-scratch or direct
    access. A transfertype of 'direct' from the job definition has priority
    over the accessmode option.

    :param log: logging object.
        NOTE(review): unused - the module-level logger is used instead; confirm.
    :param cmd: payload command.
    :param transfertype: transfer type (.e.g 'direct') from the job
        definition with priority over accessmode (string).
    :param jobparams: job parameters (string).
    :param trf_name: transformation name (string).
    :return: updated payload command string.
    """

    if "accessmode" in cmd and transfertype != 'direct':
        accessmode_usect = None
        accessmode_directin = None
        _accessmode_dic = {"--accessmode=copy": ["copy-to-scratch mode", ""],
                           "--accessmode=direct": ["direct access mode", " --directIn"]}

        # decode any accessmode switch present in the job parameters
        for _mode in list(_accessmode_dic.keys()):
            if _mode in jobparams:
                # any accessmode set in jobPars should overrule the transfer type
                logger.info("enforcing %s", _accessmode_dic[_mode][0])
                if _mode == "--accessmode=copy":
                    # make sure direct access is not used
                    accessmode_usect = True
                    accessmode_directin = False
                elif _mode == "--accessmode=direct":
                    # make sure copy-to-scratch is not used
                    accessmode_usect = False
                    accessmode_directin = True
                else:
                    accessmode_usect = False
                    accessmode_directin = False

                # do not send the accessmode switch to the payload itself
                cmd += _accessmode_dic[_mode][1]
                if _mode in cmd:
                    cmd = cmd.replace(_mode, "")

        # apply the decoded mode to the command
        if accessmode_usect:
            logger.info('forced copy tool usage selected')
            # remove again the "--directIn"
            if "directIn" in cmd:
                cmd = cmd.replace(' --directIn', ' ')
        elif accessmode_directin:
            logger.info('forced direct access usage selected')
            if "directIn" not in cmd:
                cmd += ' --directIn'
        else:
            logger.warning('neither forced copy tool usage nor direct access was selected')

        if "directIn" in cmd and "usePFCTurl" not in cmd:
            cmd += ' --usePFCTurl'

        # need to add the proxy if not there already
        if "--directIn" in cmd and "export X509_USER_PROXY" not in cmd:
            if 'X509_USER_PROXY' in os.environ:
                cmd = cmd.replace("./%s" % trf_name, "export X509_USER_PROXY=%s;./%s" %
                                  (os.environ.get('X509_USER_PROXY'), trf_name))

        # if both direct access and the accessmode loop added a directIn
        # switch, remove the first one
        if cmd.count("directIn") > 1:
            cmd = cmd.replace(' --directIn', ' ', 1)

    return cmd
0834
0835
def get_guids_from_jobparams(jobparams, infiles, infilesguids):
    """
    Extract the correct guid from the input file list.

    The guids list is used for direct reading.
    1. extract input file list for direct reading from job parameters
    2. for each input file in this list, find the corresponding guid from
       the input file guid list.
    Since the job parameters string is entered by a human, the order of
    the input files might not be the same.

    :param jobparams: job parameters.
    :param infiles: input file list.
    :param infilesguids: input file guids list.
    :return: guids list.
    """

    guidlist = []
    # normalize the job parameters before pattern matching
    jobparams = jobparams.replace("'", "")
    jobparams = jobparams.replace(", ", ",")

    # explicit form: -i "[file1,file2,..]"
    pattern = re.compile(r'\-i \"\[([A-Za-z0-9.,_-]+)\]\"')
    directreadinginputfiles = re.findall(pattern, jobparams)
    _infiles = []
    if directreadinginputfiles != []:
        _infiles = directreadinginputfiles[0].split(",")
    else:
        # compact form, e.g. -i head[000,001]tail[attr1,attr2]
        match = re.search(r"-i ([A-Za-z0-9.\[\],_-]+) ", jobparams)
        if match is not None:
            compactinfiles = match.group(1)
            match = re.search(r'(.*)\[(.+)\](.*)\[(.+)\]', compactinfiles)
            if match is not None:
                infiles = []
                head = match.group(1)
                tail = match.group(3)
                body = match.group(2).split(',')
                attr = match.group(4).split(',')

                # expand head[body]tail[attr] into the full lfn list
                for idx, item in enumerate(body):
                    lfn = '%s%s%s%s' % (head, item, tail, attr[idx])
                    infiles.append(lfn)
            else:
                infiles = [compactinfiles]
    # NOTE(review): in the compact-form branch only `infiles` is rebuilt while
    # `_infiles` stays empty, so the loop below yields no guids in that case -
    # confirm this is intended

    for infile in _infiles:
        # get the corresponding index from infiles, which has the same
        # order as infilesguids
        try:
            index = infiles.index(infile)
        except ValueError as exc:
            logger.warning("exception caught: %s (direct reading will fail)", exc)
        else:
            # add the corresponding guid to the list
            guidlist.append(infilesguids[index])

    return guidlist
0891
0892
def get_file_transfer_info(job):
    """
    Return information about the desired file transfer mode.

    :param job: job object
    :return: use copy tool (boolean), use direct access (boolean),
        use PFC Turl (boolean).
    """

    # defaults: stage-in with a copy tool, no direct access
    use_copy_tool = True
    use_direct_access = False
    use_pfc_turl = False

    # check with the queuedata and the job definition whether direct
    # access is at all allowed
    queuedata = job.infosys.queuedata
    is_lan = queuedata.direct_access_lan
    is_wan = queuedata.direct_access_wan
    direct_allowed = is_lan or is_wan or job.transfertype == 'direct'
    if not job.is_build_job() and direct_allowed:
        # override if all input files are copy-to-scratch
        if job.only_copy_to_scratch():
            logger.info((
                'all input files are copy-to-scratch '
                '(--usePFCTurl and --directIn will not be set)'))
        else:
            logger.debug('--usePFCTurl and --directIn will be set')
            use_copy_tool = False
            use_direct_access = True
            use_pfc_turl = True

    return use_copy_tool, use_direct_access, use_pfc_turl
0922
0923
def update_job_data(job):
    """
    This function can be used to update/add data to the job object.

    E.g. user specific information can be extracted from other job object
    fields. In the case of ATLAS, information is extracted from the metadata
    field (the job report) and added to other job object fields.

    :param job: job object
    :return:
    """

    # decide what should be staged out (everything, or only the log)
    stageout = get_stageout_label(job)

    if 'exeErrorDiag' in job.metadata:
        job.exeerrordiag = job.metadata['exeErrorDiag']
        if job.exeerrordiag:
            logger.warning('payload failed: exeErrorDiag=%s', job.exeerrordiag)

    # determine what should be staged out
    job.stageout = stageout  # output and log file or only log file

    work_attributes = None
    try:
        work_attributes = parse_jobreport_data(job.metadata)
    except Exception as exc:
        logger.warning('failed to parse job report (cannot set job.nevents): %s', exc)
    else:
        # the number of events can be set at this point if the value was
        # extracted from the job report
        nevents = work_attributes.get('nEvents', 0)
        if nevents:
            job.nevents = nevents

    # extract output file guids from the job report (not for event service
    # jobs, which have no conventional job report output section)
    if job.metadata and not job.is_eventservice:
        extract_output_file_guids(job)
        try:
            verify_output_files(job)
        except Exception as exc:
            logger.warning('exception caught while trying verify output files: %s', exc)
    else:
        if not job.allownooutput:  # i.e. the allowNoOutput list is empty - do nothing
            logger.debug((
                "will not try to extract output files from jobReport "
                "for user job (and allowNoOut list is empty)"))
        else:
            # remove the files listed in allowNoOutput if they don't exist
            remove_no_output_files(job)

    # make sure there are no output files with unset guids
    # (any remaining unset guid gets a freshly generated one)
    for dat in job.outdata:
        if not dat.guid:
            dat.guid = get_guid()
            logger.warning(
                'guid not set: generated guid=%s for lfn=%s',
                dat.guid,
                dat.lfn
            )
0996
0997
def get_stageout_label(job):
    """
    Get a proper stage-out label.

    Event service payloads only stage out the log; otherwise the label
    depends on whether the payload reported a non-zero exeErrorCode.

    :param job: job object.
    :return: "all"/"log" depending on stage-out type (string).
    """

    if job.is_eventservice:
        logger.info('event service payload, will only stage-out log')
        return "log"

    # check if the payload failed (and then only stage out the log)
    if 'exeErrorCode' in job.metadata:
        job.exeerrorcode = job.metadata['exeErrorCode']
        if job.exeerrorcode != 0:
            logger.info('payload failed: exeErrorCode=%d', job.exeerrorcode)
            return "log"

    return "all"
1022
1023
def update_output_for_hpo(job):
    """
    Update the output (outdata) for HPO jobs.

    Replaces job.outdata with the discovered output files, if any were found.

    :param job: job object.
    :return:
    """

    try:
        discovered = discover_new_outdata(job)
    except Exception as exc:
        logger.warning('exception caught while discovering new outdata: %s', exc)
        return

    if discovered:
        logger.info((
            'replacing job outdata with discovered output '
            '(%d file(s))'), len(discovered))
        job.outdata = discovered
1042
1043
def discover_new_outdata(job):
    """
    Discover new outdata created by an HPO job.

    For each known output file name in job.outdata, look for produced files
    matching '<lfn>_N' in the job work directory and create corresponding
    FileSpec objects (type='output'). Guids are left empty here (assigned
    later from the job report or generated).

    :param job: job object.
    :return: new_outdata (list of FileSpec objects)
    """

    from pilot.info.filespec import FileSpec
    new_outdata = []

    for outdata_file in job.outdata:
        new_output = discover_new_output(outdata_file.lfn, job.workdir)
        if new_output:
            # create a new FileSpec per discovered file, inheriting the
            # scope/dataset/ddmendpoint of the original outdata entry
            for outfile in new_output:
                files = [{
                    'scope': outdata_file.scope,
                    'lfn': outfile,
                    'workdir': job.workdir,
                    'dataset': outdata_file.dataset,
                    'ddmendpoint': outdata_file.ddmendpoint,
                    'ddmendpoint_alt': None,
                    'filesize': new_output[outfile]['filesize'],
                    'checksum': new_output[outfile]['checksum'],
                    'guid': ''  # guid is assigned after this function is called
                }]

                _xfiles = [FileSpec(type='output', **f) for f in files]
                new_outdata += _xfiles

    return new_outdata
1080
1081
def discover_new_output(name_pattern, workdir):
    """
    Discover new output created by an HPO job in the given work dir.

    name_pattern for known 'filename' is 'filename_N' (N = 0, 1, 2, ..).
    Example: name_pattern = 23578835.metrics.000001.tgz
    should discover files with names 23578835.metrics.000001.tgz_N (N = 0, 1, ..)

    new_output = { lfn: {'path': path, 'filesize': size, 'checksum': checksum}, .. }

    :param name_pattern: assumed name pattern for file to discover (string).
    :param workdir: work directory (string).
    :return: new_output (dictionary).
    """

    new_output = {}
    # find all files matching '<workdir>/<name_pattern>_*'
    outputs = glob("%s/%s_*" % (workdir, name_pattern))
    if outputs:
        lfns = [os.path.basename(path) for path in outputs]
        for lfn, path in list(zip(lfns, outputs)):
            # get the file size
            filesize = get_local_file_size(path)
            # get the checksum
            checksum = calculate_checksum(path)
            # only keep entries with a valid size and checksum
            if filesize and checksum:
                new_output[lfn] = {'path': path, 'filesize': filesize, 'checksum': checksum}
            else:
                logger.warning(
                    'failed to create file info (filesize=%d, checksum=%s) for lfn=%s',
                    filesize,
                    checksum,
                    lfn
                )

    return new_output
1118
1119
def extract_output_file_guids(job):
    """
    Extract output file info from the job report and make sure all guids
    are assigned (use job report value if present).

    Note: guid generation for any remaining unset guid is done later, not
    in this function, since this function might not be called if metadata
    info is not found prior to the call.

    :param job: job object.
    :return:
    """

    # make sure there is a defined output file list in the job report -
    # unless it is allowed by task parameter allowNoOutput
    if not job.allownooutput:
        output = job.metadata.get('files', {}).get('output', [])
        if output:
            logger.info((
                'verified that job report contains metadata '
                'for %d file(s)'), len(output))
        else:
            logger.warning((
                'job report contains no output '
                'files and allowNoOutput is not set'))
            # NOTE(review): no pilot error code is set here (failure handling
            # appears disabled) - confirm intended
            return

    # extract info from the job report
    data = dict([e.lfn, e] for e in job.outdata)
    for dat in job.metadata.get('files', {}).get('output', []):
        for fdat in dat.get('subFiles', []):
            lfn = fdat['name']

            # only consider lfns that are known from the job definition
            if lfn in data:
                data[lfn].guid = fdat['file_guid']
                logger.info((
                    'set guid=%s for lfn=%s '
                    '(value taken from job report)'), data[lfn].guid, lfn)
            else:  # newly discovered file not present in the job definition
                logger.warning((
                    'pilot no longer considers output files not mentioned '
                    'in job definition (lfn=%s)'), lfn)
                continue

    # make sure the output list carries the guids from the job report
    for fspec in job.outdata:
        if fspec.guid != data[fspec.lfn].guid:
            fspec.guid = data[fspec.lfn].guid
            logger.debug('reset guid=%s for lfn=%s', fspec.guid, fspec.lfn)
        else:
            if fspec.guid:
                logger.debug('verified guid=%s for lfn=%s', fspec.guid, fspec.lfn)
            else:
                logger.warning('guid not set for lfn=%s', fspec.lfn)
1196
1197
def verify_output_files(job):
    """
    Verify that the known output files from the job definition are listed in
    the job report and that the number of processed events is greater than zero.
    If an output file is not listed in the job report, remove it from stage-out
    when it is listed in allowNoOutput, otherwise fail the job.

    Note from Rod: fail scenario: The output file is not in output:[] or is
    there with zero events. Then if allownooutput is not set - fail the job.
    If it is set, then do not store the output, and finish ok.

    :param job: job object.
    :return: Boolean (and potentially updated job.outdata list)
    """

    # lfns known from the job definition
    lfns_jobdef = [fspec.lfn for fspec in job.outdata]
    if not lfns_jobdef:
        logger.debug('empty output file list from job definition (nothing to verify)')
        return True

    failed = False
    output = job.metadata.get('files', {}).get('output', None)

    if output is None:
        # nothing in the job report to compare against
        logger.warning((
            'output file list could not be extracted from job report '
            '(nothing to verify)'))
    elif not output:
        # empty list in the job report: consult allowNoOutput per lfn
        logger.warning((
            'encountered an empty output file list in job report, '
            'consulting allowNoOutput list'))
        for lfn in lfns_jobdef:
            if lfn in job.allownooutput:
                logger.info('lfn %s listed in allowNoOutput - will be removed from stage-out', lfn)
                remove_from_stageout(lfn, job)
            elif job.is_analysis():
                logger.warning((
                    'lfn %s is not in allowNoOutput list - '
                    'ignore for user job'),
                    lfn
                )
            else:
                failed = True
                logger.warning(
                    'lfn %s is not in allowNoOutput list - job will fail',
                    lfn
                )
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
                break
    else:
        verified, nevents = verify_extracted_output_files(output, lfns_jobdef, job)
        failed = (not verified)
        if nevents > 0 and not failed and job.nevents == 0:
            job.nevents = nevents
            logger.info('number of events from summed up output files: %d', nevents)
        else:
            logger.info('number of events previously set to %d', job.nevents)

    status = (not failed)
    if status:
        logger.info('output file verification succeeded')
    else:
        logger.warning('output file verification failed')

    return status
1275
1276
def verify_extracted_output_files(output, lfns_jobdef, job):
    """
    Make sure all output files extracted from the job report are listed.
    Grab the number of events if possible.

    :param output: list of FileSpecs (list).
    :param lfns_jobdef: list of lfns strings from job definition (list).
    :param job: job object.
    :return: True if successful|False if failed, number of events (Boolean, int)
    """

    failed = False
    nevents = 0
    output_jobrep = {}  # {lfn: nentries, ..}
    logger.debug((
        'extracted output file list from job report - '
        'make sure all known output files are listed'))

    # collect the output files (lfn -> nentries) from the job report
    for dat in output:
        for fdat in dat.get('subFiles', []):
            name = fdat.get('name', None)
            output_jobrep[name] = fdat.get('nentries', None)

    # make sure the output files from the job definition are in the job report
    for lfn in lfns_jobdef:
        if lfn not in output_jobrep and lfn not in job.allownooutput:
            if job.is_analysis():
                logger.warning((
                    'output file %s from job definition is not present '
                    'in job report and is not listed in allowNoOutput'), lfn)
            else:
                logger.warning((
                    'output file %s from job definition is not present '
                    'in job report and is not listed in allowNoOutput - '
                    'job will fail'), lfn)
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.MISSINGOUTPUTFILE)
                failed = True
                break
        elif lfn not in output_jobrep and lfn in job.allownooutput:
            # note: 'elif' (not a second 'if') is essential here - with a plain
            # 'if', an analysis-job lfn missing from the job report would fall
            # through to the 'else' branch below and raise a KeyError on
            # output_jobrep[lfn]
            logger.warning((
                'output file %s from job definition is not present '
                'in job report but is listed in allowNoOutput - '
                'remove from stage-out'), lfn)
            remove_from_stageout(lfn, job)
        else:
            nentries = output_jobrep[lfn]
            if nentries == "UNDEFINED":
                logger.warning((
                    'encountered file with nentries=UNDEFINED - '
                    'will ignore %s'), lfn)
            elif nentries is None:
                if lfn not in job.allownooutput:
                    logger.warning((
                        'output file %s is listed in job report, '
                        'but has no events and is not listed in '
                        'allowNoOutput - will ignore'), lfn)
                else:
                    logger.warning((
                        'output file %s is listed in job report, '
                        'nentries is None and is listed in allowNoOutput - '
                        'remove from stage-out'), lfn)
                    remove_from_stageout(lfn, job)
            elif nentries == 0:
                if lfn not in job.allownooutput:
                    logger.warning((
                        'output file %s is listed in job report, '
                        'has zero events and is not listed in '
                        'allowNoOutput - will ignore'), lfn)
                else:
                    logger.warning((
                        'output file %s is listed in job report, '
                        'has zero events and is listed in allowNoOutput - '
                        'remove from stage-out'), lfn)
                    remove_from_stageout(lfn, job)
            elif type(nentries) is int and nentries:
                logger.info('output file %s has %d event(s)', lfn, nentries)
                nevents += nentries
            else:  # should not happen
                logger.warning((
                    'case not handled for output file %s with %s event(s) '
                    '(ignore)'), lfn, str(nentries))

    status = (not failed)
    return status, nevents
1371
1372
def remove_from_stageout(lfn, job):
    """
    Remove the given lfn from the job's stage-out list.

    :param lfn: local file name (string).
    :param job: job object
    :return: [updated job object]
    """

    kept = []
    for fspec in job.outdata:
        if fspec.lfn == lfn:
            logger.info('removing %s from stage-out list', lfn)
        else:
            kept.append(fspec)
    job.outdata = kept
1389
1390
def remove_no_output_files(job):
    """
    Remove files from the output file list if they are listed in
    allowNoOutput and do not exist on disk.

    :param job: job object.
    :return:
    """

    keep = []  # lfns that should remain in the stage-out list
    for fspec in job.outdata:
        filename = fspec.lfn
        exists = os.path.exists(os.path.join(job.workdir, filename))

        if filename in job.allownooutput:
            if exists:
                logger.info((
                    "file %s is listed in allowNoOutput but exists "
                    "(will not be removed from list of files to be "
                    "staged-out)"), filename)
                keep.append(filename)
            else:
                logger.info((
                    "file %s is listed in allowNoOutput and does not exist "
                    "(will be removed from list of files to be staged-out)"), filename)
        else:
            # files not in allowNoOutput always stay in the list
            if exists:
                logger.info("file %s is not listed in allowNoOutput (will be staged-out)", filename)
            else:
                logger.warning((
                    "file %s is not listed in allowNoOutput and "
                    "does not exist (job will fail)"), filename)
            keep.append(filename)

    # rebuild the stage-out list only if something was dropped
    if len(keep) != len(job.outdata):
        job.outdata = [fspec for fspec in job.outdata if fspec.lfn in keep]
1433
1434
def get_outfiles_records(subfiles):
    """
    Extract file info from job report JSON subfiles entry.

    :param subfiles: list of subfiles.
    :return: file info dictionary with format { <name>: {'guid': .., 'size': .., 'nentries': .. (optional)}, ..}
    """

    res = {}
    for subfile in subfiles:
        res[subfile['name']] = {
            'guid': subfile['file_guid'],
            'size': subfile['file_size']
        }

        nentries = subfile.get('nentries', 'UNDEFINED')
        # exclude bool explicitly to keep the original strict type check
        # (type(nentries) == int) semantics while using the isinstance idiom
        if isinstance(nentries, int) and not isinstance(nentries, bool):
            res[subfile['name']]['nentries'] = nentries
        else:
            logger.warning("nentries is undefined in job report")

    return res
1457
1458
class DictQuery(dict):
    """
    Helper class for parsing the job report.

    Adds a path-based getter: get("a/b/c", dst, key) copies self["a"]["b"]["c"]
    into dst[key] when the full path exists, and leaves dst untouched otherwise.
    """

    def get(self, path, dst_dict, dst_key):
        """
        Copy the value at the given slash-separated path into dst_dict[dst_key].

        :param path: slash-separated key path, e.g. "resource/machine/node" (string).
        :param dst_dict: destination dictionary, updated in place (dict).
        :param dst_key: key to set in dst_dict (string).
        :return:
        """
        # str.split always returns at least one element, so keys is never
        # empty here (the old len(keys) == 0 guard was unreachable)
        keys = path.split("/")
        last_key = keys.pop()
        current = self
        for key in keys:
            if not (key in current and isinstance(current[key], dict)):
                return
            current = current[key]

        if last_key in current:
            dst_dict[dst_key] = current[last_key]
1478
1479
def parse_jobreport_data(job_report):
    """
    Parse a job report and extract relevant fields.

    :param job_report:
    :return:
    """
    work_attributes = {}
    if job_report is None or not any(job_report):
        return work_attributes

    # defaults (dbTime/dbData are folded into jobMetrics and removed again below)
    core_count = ""
    work_attributes["nEvents"] = 0
    work_attributes["dbTime"] = ""
    work_attributes["dbData"] = ""
    work_attributes["inputfiles"] = []
    work_attributes["outputfiles"] = []

    if "ATHENA_PROC_NUMBER" in os.environ:
        logger.debug("ATHENA_PROC_NUMBER: %s", os.environ["ATHENA_PROC_NUMBER"])
        core_count = int(os.environ["ATHENA_PROC_NUMBER"])
        work_attributes['core_count'] = core_count

    # pull simple values out of the (nested) job report
    dictq = DictQuery(job_report)
    for path, key in (
            ("resource/transform/processedEvents", "nEvents"),
            ("resource/transform/cpuTimeTotal", "cpuConsumptionTime"),
            ("resource/machine/node", "node"),
            ("resource/machine/model_name", "cpuConsumptionUnit"),
            ("resource/dbTimeTotal", "dbTime"),
            ("resource/dbDataTotal", "dbData"),
            ("exitCode", "transExitCode"),
            ("exitMsg", "exeErrorDiag"),
            ("files/input", "inputfiles"),
            ("files/output", "outputfiles")):
        dictq.get(path, work_attributes, key)

    outputfiles_dict = {}
    for opf in work_attributes['outputfiles']:
        outputfiles_dict.update(get_outfiles_records(opf['subFiles']))
    work_attributes['outputfiles'] = outputfiles_dict

    if work_attributes['inputfiles']:
        if is_python3():
            work_attributes['nInputFiles'] = reduce(
                lambda a, b: a + b,
                [len(inpfiles['subFiles']) for inpfiles in work_attributes['inputfiles']])
        else:
            work_attributes['nInputFiles'] = reduce(
                lambda a, b: a + b,
                map(lambda inpfiles: len(inpfiles['subFiles']), work_attributes['inputfiles']))

    if 'resource' in job_report and 'executor' in job_report['resource']:
        # sum up the Avg/Max memory figures over all executor steps
        totals = defaultdict(int)
        for value in job_report['resource']['executor'].values():
            mem = value.get('memory', {})
            for key in ('Avg', 'Max'):
                for subk, subv in mem.get(key, {}).items():
                    totals[subk] += subv
        work_attributes.update(totals)

    workdir_size = get_disk_usage('.')
    work_attributes['jobMetrics'] = 'coreCount=%s nEvents=%s dbTime=%s dbData=%s workDirSize=%s' % \
                                    (core_count,
                                     work_attributes["nEvents"],
                                     work_attributes["dbTime"],
                                     work_attributes["dbData"],
                                     workdir_size)
    del work_attributes["dbData"]
    del work_attributes["dbTime"]

    return work_attributes
1552
1553
def get_executor_dictionary(jobreport_dictionary):
    """
    Extract the 'executor' dictionary from within a job report.

    :param jobreport_dictionary:
    :return: executor_dictionary
    """

    if jobreport_dictionary == {}:
        return {}

    if 'resource' not in jobreport_dictionary:
        logger.warning("no such key: resource")
        return {}

    resource_dictionary = jobreport_dictionary['resource']
    if 'executor' not in resource_dictionary:
        logger.warning("no such key: executor")
        return {}

    return resource_dictionary['executor']
1575
1576
def get_resimevents(jobreport_dictionary):
    """
    Extract the resimevents value from the job report (first entry that can be
    converted to int wins). This information is reported with the jobMetrics.

    :param jobreport_dictionary: job report dictionary.
    :return: resimevents (int or None)
    """

    resimevents = None

    executor_dictionary = get_executor_dictionary(jobreport_dictionary)
    for fmt in list(executor_dictionary.keys()):
        entry = executor_dictionary[fmt]
        if 'resimevents' not in entry:
            continue
        try:
            resimevents = int(entry['resimevents'])
        except (KeyError, ValueError, TypeError):
            pass
        else:
            break

    return resimevents
1600
1601
def get_db_info(jobreport_dictionary):
    """
    Extract and add up the DB info from the job report.
    This information is reported with the jobMetrics.
    Note: this function adds up the different dbData and dbTime's in
    the different executor steps. In modern job reports this might have
    been done already by the transform and stored in dbDataTotal and dbTimeTotal.

    :param jobreport_dictionary: job report dictionary.
    :return: db_time (int), db_data (long)
    """

    db_time = 0
    try:
        db_data = long(0)  # noqa: F821 (python 2 long)
    except NameError:
        db_data = 0

    executor_dictionary = get_executor_dictionary(jobreport_dictionary)
    for fmt in list(executor_dictionary.keys()):
        entry = executor_dictionary[fmt]
        if 'dbData' in entry:
            try:
                db_data += entry['dbData']
            except Exception:
                pass
        else:
            logger.warning("format %s has no such key: dbData", fmt)
        if 'dbTime' in entry:
            try:
                db_time += entry['dbTime']
            except Exception:
                pass
        else:
            logger.warning("format %s has no such key: dbTime", fmt)

    return db_time, db_data
1639
1640
def get_db_info_str(db_time, db_data):
    """
    Convert db_time and db_data to their string representations.
    E.g. dbData="105077960", dbTime="251.42".

    :param db_time: time (s)
    :param db_data: long integer
    :return: db_time_s, db_data_s (strings)
    """

    try:
        zero = long(0)  # noqa: F821 (python 2 long)
    except NameError:
        zero = 0

    # empty strings signal 'no data' to the caller
    db_data_s = "%s" % (db_data) if db_data != zero else ""
    db_time_s = "%.2f" % (db_time) if db_time != 0 else ""

    return db_time_s, db_data_s
1665
1666
def get_cpu_times(jobreport_dictionary):
    """
    Extract and add up the total CPU times from the job report.
    E.g. ('s', 5790L, 1.0).

    Note: this function is used with Event Service jobs

    :param jobreport_dictionary:
    :return: cpu_conversion_unit (unit), total_cpu_time,
        conversion_factor (output consistent with set_time_consumed())
    """

    try:
        total_cpu_time = long(0)  # noqa: F821 (python 2 long)
    except NameError:
        total_cpu_time = 0

    executor_dictionary = get_executor_dictionary(jobreport_dictionary)
    for fmt in list(executor_dictionary.keys()):
        try:
            total_cpu_time += executor_dictionary[fmt]['cpuTime']
        except KeyError:
            logger.warning("format %s has no such key: cpuTime", fmt)
        except Exception:
            pass

    # seconds, no conversion needed
    return "s", total_cpu_time, 1.0
1698
1699
def get_exit_info(jobreport_dictionary):
    """
    Return the exit code (exitCode) and exit message (exitMsg) from the job report.
    E.g. (0, 'OK').

    :param jobreport_dictionary:
    :return: exit_code, exit_message
    """

    exit_code = jobreport_dictionary['exitCode']
    exit_msg = jobreport_dictionary['exitMsg']
    return exit_code, exit_msg
1710
1711
def cleanup_looping_payload(workdir):
    """
    Run a special cleanup for looping payloads: remove any root files.

    :param workdir: working directory (string)
    :return:
    """

    for dirpath, _, filenames in os.walk(workdir):
        for name in filenames:
            if 'pool.root' in name:
                remove(os.path.abspath(os.path.join(dirpath, name)))
1727
1728
def cleanup_payload(workdir, outputfiles=None, removecores=True):
    """
    Cleanup of payload (specifically AthenaMP) sub directories prior to log file creation.
    Also remove core dumps.

    :param workdir: working directory (string).
    :param outputfiles: list of output files.
    :param removecores: remove core files if True (Boolean).
    :return:
    """

    outputfiles = outputfiles or []

    if removecores:
        remove_core_dumps(workdir)

    for ampdir in glob('%s/athenaMP-workers-*' % workdir):
        for (root, _, files) in os.walk(ampdir):
            for filename in files:
                path = os.path.abspath(os.path.join(root, filename))

                # remove cores (when allowed), root files and tmp files
                if ('core' in filename and removecores) or 'pool.root' in filename or 'tmp.' in filename:
                    remove(path)

                # also remove files that carry a known output file name
                for outfile in outputfiles:
                    if outfile in filename:
                        remove(path)
1761
1762
def get_redundant_path():
    """
    Return the path to the file containing the redundant files
    and directories to be removed prior to log file creation.

    :return: file path (string).
    """

    filename = config.Pilot.redundant

    # on sites that relocate CVMFS, rebase the path onto ATLAS_SW_BASE
    if filename.startswith('/cvmfs') and os.environ.get('ATLAS_SW_BASE', False):
        return filename.replace('/cvmfs', os.environ.get('ATLAS_SW_BASE'))

    return filename
1778
1779
def get_redundants():
    """
    Get list of redundant files and directories (to be removed).
    The list is normally read from an external file; when that cannot be
    done, the internal list defined here is returned instead.
    Any updates to the external file must be propagated to this function.

    :return: files and directories list
    """

    filename = get_redundant_path()

    logger.debug((
        'list of redundant files could not be read from external file: %s '
        '(will use internal list)'), filename)

    # internal fallback list of redundant file/directory patterns
    return [
        "AtlasProduction*", "AtlasPoint1", "AtlasTier0", "buildJob*",
        "CDRelease*", "csc*.log", "DBRelease*", "EvgenJobOptions",
        "external", "fort.*", "geant4", "geomDB", "geomDB_sqlite",
        "home", "o..pacman..o", "pacman-*", "python", "runAthena*",
        "share", "sources.*", "sqlite*", "sw", "tcf_*", "triggerDB",
        "trusted.caches", "workdir", "*.data*", "*.events", "*.py",
        "*.pyc", "*.root*", "JEM", "tmp*", "*.tmp", "*.TMP",
        "MC11JobOptions", "scratch", "*.writing", "pwg*", "pwhg*",
        "*PROC*", "madevent", "*proxy", "ckpt*", "*runcontainer*",
        "*job.log.tgz", "runGen-*", "runAthena-*", "pandawnutil/*",
        "src/*", "singularity_cachedir", "_joproxy15", "HAHM_*",
        "Process", "merged_lhef._0.events-new", "singularity/*",
        "/cores", "/work", "docs/", "/pilot2",
    ]
1867
1868
def remove_archives(workdir):
    """
    Explicitly remove any soft linked archives (.a files) since
    they will be dereferenced by the tar command
    (--dereference option).

    :param workdir: working directory (string)
    :return:
    """

    targets = []
    # .a files inside the workdir, premerge tarballs one level up
    for base, pattern in ((workdir, '*.a'),
                          (os.path.dirname(workdir), 'EventService_premerge_*.tar')):
        for root, _, filenames in os.walk(base):
            targets += [os.path.join(root, name) for name in fnmatch.filter(filenames, pattern)]

    for target in targets:
        remove(target)
1889
1890
def cleanup_broken_links(workdir):
    """
    Run a second pass to clean up any broken links prior to log file creation.

    :param workdir: working directory (string)
    :return:
    """

    dangling = []
    for root, _, files in os.walk(workdir):
        for filename in files:
            path = os.path.join(root, filename)
            if not os.path.islink(path):
                continue

            target = os.readlink(path)
            if not os.path.isabs(target):
                # resolve the link target relative to the link location
                target = os.path.join(os.path.dirname(path), target)
            if not os.path.exists(target):
                dangling.append(path)

    for path in dangling:
        remove(path)
1915
1916
def list_work_dir(workdir):
    """
    Execute ls -lF for the given directory and dump the listing to the log.

    :param workdir: directory name (string).
    """

    _, stdout, stderr = execute('ls -lF %s' % workdir)
    logger.debug('%s:\n' % stdout + stderr)
1927
1928
def remove_special_files(workdir, dir_list, outputfiles):
    """
    Remove list of special files from the workdir, protecting pilot
    bookkeeping files and the job's output files.

    :param workdir: work directory (string).
    :param dir_list: list of special files (list).
    :param outputfiles: output files (list).
    :return:
    """

    # files that must never be deleted
    exceptions_list = ["runargs", "runwrapper", "jobReport", "log."]

    to_delete = []
    for pattern in dir_list:
        matches = glob(os.path.join(workdir, pattern))
        if not matches:
            continue

        protected = [os.path.abspath(item)
                     for exc in exceptions_list
                     for item in matches if exc in item]
        to_delete += [os.path.abspath(item) for item in matches if item not in protected]

    exclude_files = [os.path.join(workdir, opf) for opf in outputfiles]

    for item in to_delete:
        if item in exclude_files:
            continue
        logger.debug('removing %s', item)
        if os.path.isfile(item):
            remove(item)
        else:
            remove_dir_tree(item)
1968
1969
def remove_redundant_files(workdir, outputfiles=None, islooping=False, debugmode=False):
    """
    Remove redundant files and directories prior to creating the log file.

    Note: in debug mode, any core files should not be removed before creating the log.

    :param workdir: working directory (string).
    :param outputfiles: list of protected output files (list).
    :param islooping: looping job variable to make sure workDir is not removed in case of looping (Boolean).
    :param debugmode: True if debug mode has been switched on (Boolean).
    :return:
    """

    if outputfiles is None:
        outputfiles = []

    logger.debug("removing redundant files prior to log creation")
    workdir = os.path.abspath(workdir)
    list_work_dir(workdir)

    # patterns of known redundant files/directories
    dir_list = get_redundants()

    logger.debug('cleaning up payload')
    try:
        # keep core files in debug mode
        cleanup_payload(workdir, outputfiles, removecores=not debugmode)
    except OSError as exc:
        logger.warning("failed to execute cleanup_payload(): %s", exc)

    logger.debug('removing archives')
    remove_archives(workdir)

    remove_special_files(workdir, dir_list, outputfiles)

    logger.debug('cleaning up broken links')
    cleanup_broken_links(workdir)

    # handle any nested 'workDir' (kept in place while the job is looping)
    nested = os.path.join(workdir, 'workDir')
    if os.path.exists(nested):
        cleanup_looping_payload(nested)
        if not islooping:
            logger.debug('removing \'workDir\' from workdir=%s', workdir)
            remove_dir_tree(nested)

    # remove additional known directories
    for additional in ('singularity', 'pilot', 'cores'):
        path = os.path.join(workdir, additional)
        if os.path.exists(path):
            logger.debug('removing \'%s\' from workdir=%s', additional, workdir)
            remove_dir_tree(path)

    list_work_dir(workdir)
2031
2032
def download_command(process, workdir):
    """
    Download the pre/postprocess command if necessary (i.e. when given as a URL).

    Process FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    :param process: pre/postprocess dictionary.
    :param workdir: job workdir (string).
    :return: updated pre/postprocess dictionary.
    """

    cmd = process.get('command', '')

    # nothing to download for a plain (non-URL) command
    if not cmd.startswith('http'):
        return process

    # download the command to the workdir and make it executable from cwd
    exitcode, _, cmd = get_analysis_trf(cmd, workdir)
    if exitcode != 0:
        logger.warning('cannot execute command due to previous error: %s', cmd)
        return {}

    process['command'] = './' + cmd

    return process
2058
2059
def get_utility_commands(order=None, job=None):
    """
    Return a dictionary of utility commands and arguments to be executed
    in parallel with the payload. This could e.g. be memory and network
    monitor commands. A separate function can be used to determine the
    corresponding command setups using the utility command name. If the
    optional order parameter is set, the function should return the list
    of corresponding commands.

    For example:

    If order=UTILITY_BEFORE_PAYLOAD, the function should return all
    commands that are to be executed before the payload.

    If order=UTILITY_WITH_PAYLOAD, the corresponding commands will be
    prepended to the payload execution string.

    If order=UTILITY_AFTER_PAYLOAD_STARTED, the commands that should be
    executed after the payload has been started should be returned.

    If order=UTILITY_WITH_STAGEIN, the commands that should be executed
    parallel with stage-in will be returned.

    FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>, 'ignore_failure': <Boolean>}

    :param order: optional sorting order (see pilot.util.constants).
    :param job: optional job object.
    :return: dictionary of utilities to be executed in parallel with the payload.
    """

    if order == UTILITY_BEFORE_PAYLOAD and job.preprocess:
        return get_precopostprocess_command(job.preprocess, job.workdir, 'preprocess')

    if order == UTILITY_WITH_PAYLOAD:
        return {'command': 'NetworkMonitor', 'args': '', 'label': 'networkmonitor', 'ignore_failure': True}

    if order == UTILITY_AFTER_PAYLOAD_STARTED:
        return get_utility_after_payload_started()

    if order == UTILITY_AFTER_PAYLOAD_STARTED2 and job.coprocess:
        return get_precopostprocess_command(job.coprocess, job.workdir, 'coprocess')

    # xcache is started before stage-in and stopped after the payload finished
    xcache_variants = {
        UTILITY_AFTER_PAYLOAD_FINISHED: ('xcache_kill', xcache_deactivation_command),
        UTILITY_BEFORE_STAGEIN: ('xcache_start', xcache_activation_command),
    }
    if order in xcache_variants:
        label, xcache_function = xcache_variants[order]
        return get_xcache_command(
            job.infosys.queuedata.catchall,
            job.workdir,
            job.jobid,
            label,
            xcache_function,
        )

    if order == UTILITY_AFTER_PAYLOAD_FINISHED2 and job.postprocess:
        return get_precopostprocess_command(job.postprocess, job.workdir, 'postprocess')

    return None
2124
2125
def get_precopostprocess_command(process, workdir, label):
    """
    Return the pre/co/post-process command dictionary.

    Command FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    The returned command has the structure: { 'command': <string>, }
    :param process: pre/co/post-process (dictionary).
    :param workdir: working directory (string).
    :param label: label (string).
    :return: command (dictionary).
    """

    if not process.get('command', ''):
        return {}

    command = download_command(process, workdir)
    command['label'] = label
    command['ignore_failure'] = False
    return command
2145
2146
def get_utility_after_payload_started():
    """
    Return the command dictionary for the utility to run once the payload has started.

    Command FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    :return: command (dictionary).
    """

    try:
        cmd = config.Pilot.utility_after_payload_started
    except Exception:
        # no such configuration entry
        cmd = None

    if not cmd:
        return {}

    return {'command': cmd, 'args': '', 'label': cmd.lower(), 'ignore_failure': True}
2165
2166
def get_xcache_command(catchall, workdir, jobid, label, xcache_function):
    """
    Return the proper xcache command for either activation or deactivation.

    Command FORMAT: {'command': <command>, 'args': <args>, 'label': <some name>}

    :param catchall: queuedata catchall field (string).
    :param workdir: job working directory (string).
    :param jobid: PanDA job id (string).
    :param label: label (string).
    :param xcache_function: activation/deactivation function name (function).
    :return: command (dictionary).
    """

    if 'pilotXcache' not in catchall:
        return {}

    command = xcache_function(jobid=jobid, workdir=workdir)
    command['label'] = label
    command['ignore_failure'] = True
    return command
2187
2188
def post_prestagein_utility_command(**kwargs):
    """
    Execute any post pre-stage-in utility commands.

    :param kwargs: kwargs (dictionary).
    :return:
    """

    label = kwargs.get('label', 'unknown_label')
    stdout = kwargs.get('output', None)

    if not stdout:
        logger.warning('no output for label=%s', label)
    else:
        logger.debug('processing stdout for label=%s', label)
        xcache_proxy(stdout)

    # dump the xcache settings file to the log, if present
    if os.environ.get('ALRB_XCACHE_FILES', ''):
        cmd = 'cat $ALRB_XCACHE_FILES/settings.sh'
        _, _stdout, _ = execute(cmd)
        logger.debug('cmd=%s:\n\n%s\n\n', cmd, _stdout)
2211
2212
def xcache_proxy(output):
    """
    Extract xcache-related environment variables from the given stdout and set them.

    :param output: command output (string).
    :return:
    """

    for line in output.split('\n'):
        if 'ALRB_XCACHE_PROXY' in line:
            # the remote variant carries a _REMOTE suffix
            suffix = '_REMOTE' if 'REMOTE' in line else ''
            set_xcache_var(
                line,
                name='ALRB_XCACHE_PROXY%s' % suffix,
                pattern=r'\ export\ ALRB_XCACHE_PROXY%s\=\"(.+)\"' % suffix
            )
        elif 'ALRB_XCACHE_MYPROCESS' in line:
            set_xcache_var(
                line,
                name='ALRB_XCACHE_MYPROCESS',
                pattern=r'\ ALRB_XCACHE_MYPROCESS\=(.+)'
            )
        elif 'Messages logged in' in line:
            set_xcache_var(
                line,
                name='ALRB_XCACHE_LOG',
                pattern=r'xcache\ started\ successfully.\ \ Messages\ logged\ in\ (.+)'
            )
        elif 'ALRB_XCACHE_FILES' in line:
            set_xcache_var(
                line,
                name='ALRB_XCACHE_FILES',
                pattern=r'\ ALRB_XCACHE_FILES\=(.+)'
            )
2248 )
2249
2250
def set_xcache_var(line, name='', pattern=''):
    """
    Extract the value of a given environment variable from a stdout line
    and export it into the pilot's environment.

    :param line: line from stdout to be investigated (string).
    :param name: name of env var (string).
    :param pattern: regex pattern (string).
    :return:
    """

    matches = re.findall(re.compile(pattern), line)
    if matches:
        os.environ[name] = matches[0]
2265
2266
def xcache_activation_command(workdir='', jobid=''):
    """
    Return the xcache service activation command.

    Note: the workdir is not used here, but the function prototype
    needs it in the called (xcache_activation_command needs it).

    :param workdir: unused work directory - do not remove (string).
    :param jobid: PanDA job id to guarantee that xcache process is unique (int).
    :return: xcache command (string).
    """

    # base setup (no asetup) followed by the xcache start command;
    # the job id makes the xcache work area unique per job
    setup_part = "%s " % get_asetup(asetup=False)
    xcache_part = (
        "lsetup xcache; xcache list; "
        "xcache start -d $PWD/%s/xcache -C centos7 --disklow 4g --diskhigh 5g -b 4" % jobid)

    return {'command': setup_part + xcache_part, 'args': ''}
2293
2294
def xcache_deactivation_command(workdir='', jobid=''):
    """
    Return the xcache service deactivation command.
    This service should be stopped after the payload has finished.
    Copy the messages log before shutting down.

    Note: the job id is not used here, but the function prototype
    needs it in the called (xcache_activation_command needs it).

    :param workdir: payload work directory (string).
    :param jobid: unused job id - do not remove (string).
    :return: xcache command (string).
    """

    # preserve the xcache messages log (best effort) before killing the service
    path = os.environ.get('ALRB_XCACHE_LOG', None)
    if not path:
        logger.warning('ALRB_XCACHE_LOG is not set')
    elif not os.path.exists(path):
        logger.warning('path does not exist: %s', path)
    else:
        logger.debug('copying xcache messages log file (%s) to work dir (%s)', path, workdir)
        dest = os.path.join(workdir, 'xcache-messages.log')
        try:
            copy(path, dest)
        except Exception as exc:
            logger.warning('exception caught copying xcache log: %s', exc)

    command = "%s " % get_asetup(asetup=False)
    command += "lsetup xcache; xcache kill"

    return {'command': command, 'args': '-p $ALRB_XCACHE_MYPROCESS'}
2326
2327
def get_utility_command_setup(name, job, setup=None):
    """
    Return the proper setup for the given utility command.
    If a payload setup is specified, then the utility command string should be prepended to it.

    :param name: name of utility (string).
    :param job: job object.
    :param setup: optional payload setup string.
    :return: utility command setup (string).
    """

    if name == 'MemoryMonitor':
        # the monitor may have to run inside the same container as the payload
        use_container = job.usecontainer or 'runcontainer' in job.transformation
        dump_ps = ("PRMON_DEBUG" in job.infosys.queuedata.catchall)

        setup, pid = get_memory_monitor_setup(
            job.pid,
            job.pgrp,
            job.jobid,
            job.workdir,
            job.command,
            use_container=use_container,
            transformation=job.transformation,
            outdata=job.outdata,
            dump_ps=dump_ps
        )

        # pick the monitor executable name out of the last command in the setup string
        matches = re.findall(re.compile(r"([\S]+)\ ."), setup.split(';')[-1])
        if matches:
            job.memorymonitor = matches[0]
        else:
            logger.warning('trf name could not be identified in setup string')

        # if the monitor reported a different pid, refresh the stored process group
        if pid not in (job.pid, -1):
            logger.debug('updating pgrp=%d for pid=%d', job.pgrp, pid)
            try:
                job.pgrp = os.getpgid(pid)
            except Exception as exc:
                logger.warning('os.getpgid(%d) failed with: %s', pid, exc)

        return setup

    if name == 'NetworkMonitor' and setup:
        return get_network_monitor_setup(setup, job)

    if name == 'Prefetcher':
        return get_prefetcher_setup(job)

    if name == 'Benchmark':
        return get_benchmark_setup(job)

    return ""
2384
2385
def get_utility_command_execution_order(name):
    """
    Should the given utility command be executed before or after the payload?

    :param name: utility name (string).
    :return: execution order constant.
    """

    # known utilities and when they should run relative to the payload
    order = {
        'NetworkMonitor': UTILITY_WITH_PAYLOAD,
        'MemoryMonitor': UTILITY_AFTER_PAYLOAD_STARTED,
    }
    if name not in order:
        logger.warning('unknown utility name: %s', name)
    # default for unknown utilities: start after the payload has started
    return order.get(name, UTILITY_AFTER_PAYLOAD_STARTED)
2403
2404
def post_utility_command_action(name, job):
    """
    Perform post action for given utility command.

    :param name: name of utility command (string).
    :param job: job object.
    :return:
    """

    # only the memory monitor needs a post action (e.g. summary handling);
    # the network monitor intentionally has none
    if name == 'MemoryMonitor':
        post_memory_monitor_action(job)
2418
2419
def get_utility_command_kill_signal(name):
    """
    Return the proper kill signal used to stop the utility command.

    :param name: name of utility command (string).
    :return: kill signal
    """

    # the memory monitor is stopped with SIGUSR1 (graceful, lets it write its
    # final summary); every other utility gets a plain SIGTERM
    if name == 'MemoryMonitor':
        return SIGUSR1
    return SIGTERM
2431
2432
def get_utility_command_output_filename(name, selector=None):
    """
    Return the filename to the output of the utility command.

    :param name: utility name (string).
    :param selector: optional special conditions flag (boolean).
    :return: filename (string).
    """

    # only the memory monitor produces a summary file; other utilities have none
    if name != 'MemoryMonitor':
        return ""
    return get_memory_monitor_summary_filename(selector=selector)
2448
2449
def verify_lfn_length(outdata):
    """
    Make sure that the LFNs are all within the allowed length.

    :param outdata: FileSpec object.
    :return: error code (int), diagnostics (string).
    """

    max_length = 255

    # stop at the first LFN that exceeds the limit
    for fspec in outdata:
        lfn_length = len(fspec.lfn)
        if lfn_length > max_length:
            diagnostics = "LFN too long (length: %d, must be less than %d characters): %s" % \
                          (lfn_length, max_length, fspec.lfn)
            return errors.LFNTOOLONG, diagnostics

    return 0, ""
2471
2472
def verify_ncores(corecount):
    """
    Verify that nCores settings are correct.

    Removes any pre-existing ATHENA_PROC_NUMBER_JOB, then sets
    ATHENA_CORE_NUMBER (and ATHENA_PROC_NUMBER_JOB, unless ATHENA_PROC_NUMBER
    is already set) from the given core count. ATHENA_PROC_NUMBER itself is
    never overwritten.

    :param corecount: number of cores (int).
    :return:
    """

    # remove any stale value left over from a previous job;
    # pop() avoids the previous del + broad except Exception
    if os.environ.pop('ATHENA_PROC_NUMBER_JOB', None) is not None:
        logger.debug("unset existing ATHENA_PROC_NUMBER_JOB")

    # an unset or non-numeric ATHENA_PROC_NUMBER is treated as "not set"
    # (previously relied on int(None) raising TypeError under a broad except)
    try:
        athena_proc_number = int(os.environ['ATHENA_PROC_NUMBER'])
    except (KeyError, ValueError):
        athena_proc_number = None

    if athena_proc_number:
        logger.info((
            "encountered a set ATHENA_PROC_NUMBER (%d), "
            "will not overwrite it"), athena_proc_number)
        logger.info('set ATHENA_CORE_NUMBER to same value as ATHENA_PROC_NUMBER')
        os.environ['ATHENA_CORE_NUMBER'] = str(athena_proc_number)
    else:
        os.environ['ATHENA_PROC_NUMBER_JOB'] = str(corecount)
        os.environ['ATHENA_CORE_NUMBER'] = str(corecount)
        logger.info((
            "set ATHENA_PROC_NUMBER_JOB and ATHENA_CORE_NUMBER to %s "
            "(ATHENA_PROC_NUMBER will not be overwritten)"), corecount)
2509
2510
def verify_job(job):
    """
    Verify job parameters for specific errors.
    Note:
      in case of problem, the function should set the corresponding pilot error code using:
      job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())

    :param job: job object
    :return: Boolean.
    """

    # currently the only fatal verification is the LFN length check
    exitcode, diagnostics = verify_lfn_length(job.outdata)
    status = (exitcode == 0)
    if not status:
        logger.fatal(diagnostics)
        job.piloterrordiag = diagnostics
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode)

    # make sure the nCores related env variables are consistent
    verify_ncores(job.corecount)

    return status
2537
2538
def update_stagein(job):
    """
    Skip DBRelease files during stage-in.

    :param job: job object.
    :return:
    """

    # DBRelease files are produced locally instead of being transferred
    for fspec in (spec for spec in job.indata if 'DBRelease' in spec.lfn):
        fspec.status = 'no_transfer'
2550
2551
def get_metadata(workdir):
    """
    Return the metadata from file.

    :param workdir: work directory (string)
    :return:
    """

    # the job report location is taken from the pilot config
    path = os.path.join(workdir, config.Payload.jobreport)
    if os.path.exists(path):
        metadata = read_file(path)
    else:
        metadata = None
    logger.debug('metadata=%s', str(metadata))

    return metadata
2565
2566
def should_update_logstash(frequency=10):
    """
    Should logstash be updated with prmon dictionary?

    :param frequency: inverse update probability (int).
    :return: return True once per 'frequency' times.
    """
    # roll a frequency-sided die; only a zero triggers an update
    return not randint(0, frequency - 1)
2575
2576
def update_server(job):
    """
    Perform any user specific server actions.

    E.g. this can be used to send special information to a logstash.

    :param job: job object.
    :return:
    """

    # only a fraction of the jobs report to logstash
    if not should_update_logstash():
        logger.debug('no need to update logstash for this job')
        return

    path = os.path.join(job.workdir, get_memory_monitor_output_filename())
    if not os.path.exists(path):
        logger.warning('path does not exist: %s', path)
        return

    # convert the prmon text output to JSON (stored next to the original file)
    metadata_dictionary = get_metadata_dict_from_txt(path, storejson=True, jobid=job.jobid)
    if not metadata_dictionary:
        logger.warning('no prmon json available - cannot send anything to logstash server')
        return

    new_path = update_extension(path=path, extension='json')
    url = 'https://pilot.atlas-ml.org'

    # upload the JSON dictionary to the logstash server
    cmd = (
        "curl --connect-timeout 20 --max-time 120 "
        "-H \"Content-Type: application/json\" "
        "-X POST "
        "--upload-file %s %s" % (new_path, url)
    )

    try:
        _, stdout, stderr = execute(cmd, usecontainer=False)
    except Exception as exc:
        logger.warning('exception caught: %s', exc)
    else:
        logger.debug('sent prmon JSON dictionary to logstash server')
        logger.debug('stdout: %s', stdout)
        logger.debug('stderr: %s', stderr)
2639
2640
def preprocess_debug_command(job):
    """
    Pre-process the debug command in debug mode.

    :param job: Job object.
    :return:
    """

    # should the pilot-prepared setup be used for the debug command?
    preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams)
    resource_name = get_resource_name()

    # load the resource specific module to obtain the proper setup command
    modname = 'pilot.user.atlas.resource.%s' % resource_name
    resource = __import__(modname, globals(), locals(), [resource_name], 0)

    setup_cmd = resource.get_setup_command(job, preparesetup)
    if not setup_cmd.endswith(';'):
        setup_cmd += '; '
    # only prepend the setup if it is not already part of the debug command
    if setup_cmd not in job.debug_command:
        job.debug_command = setup_cmd + job.debug_command
2663
2664
def process_debug_command(debug_command, pandaid):
    """
    In debug mode, the server can send a special debug command to the pilot
    via the updateJob backchannel. This function can be used to process that
    command, i.e. to identify a proper pid to debug (which is unknown
    to the server).

    For gdb, the server might send a command with gdb option --pid %.
    The pilot need to replace the % with the proper pid. The default
    (hardcoded) process will be that of athena.py. The pilot will find the
    corresponding pid.

    :param debug_command: debug command (string).
    :param pandaid: PanDA id (string).
    :return: updated debug command (string).
    """

    if '--pid %' not in debug_command:
        return debug_command

    pandaid_pid = None
    # bug fix: initialize pids so the final check below cannot raise a
    # NameError when the ps command returns no output
    pids = []

    # list all processes so that the trf and athena.py pids can be located
    cmd = 'ps axo pid,ppid,pgid,args'
    _, stdout, _ = execute(cmd)
    if stdout:
        # convert the ps output to a dictionary for easier searching
        dictionary = convert_ps_to_dict(stdout)

        # keep only the PID/PPID fields needed for the ancestry check
        trimmed_dictionary = get_trimmed_dictionary(['PID', 'PPID'], dictionary)

        # the process corresponding to the PanDA id, i.e. the trf process
        pandaid_pid = find_pid(pandaid, dictionary)

        # all athena.py candidate processes
        pids = find_cmd_pids('athena.py', dictionary)

        # which athena.py process belongs to this job's trf?
        for pid in pids:
            try:
                child = is_child(pid, pandaid_pid, trimmed_dictionary)
            except RuntimeError as rte:
                logger.warning((
                    'too many recursions: %s '
                    '(cannot identify athena process)'), rte)
            else:
                if child:
                    logger.info('pid=%d is a child process of the trf of this job', pid)
                    debug_command = debug_command.replace('--pid %', '--pid %d' % pid)
                    logger.info('updated debug command: %s', debug_command)
                    break
                logger.info('pid=%d is not a child process of the trf of this job', pid)

    # no athena pid found (yet) - do not send any debug command to the server
    if not pids or '--pid %' in debug_command:
        logger.debug('athena is not yet running (no corresponding pid)')
        debug_command = ''

    return debug_command
2733
2734
def allow_timefloor(submitmode):
    """
    Should the timefloor mechanism (multi-jobs) be allowed for the given submit mode?

    :param submitmode: submit mode (string).
    """

    # timefloor is currently allowed for every submit mode
    return True