Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
0009 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0011 # - Wen Guan, wen.guan@cern.ch, 2018
0012 
0013 from __future__ import print_function  # Python 2
0014 
0015 import os
0016 import time
0017 import hashlib
0018 import random
0019 import socket
0020 import logging
0021 
0022 try:
0023     import Queue as queue  # noqa: N813
0024 #except ModuleNotFoundError:  # Python 3
0025 except Exception:
0026     import queue  # Python 3
0027 
0028 from json import dumps  #, loads
0029 from re import findall
0030 from glob import glob
0031 
0032 from pilot.common.errorcodes import ErrorCodes
0033 from pilot.common.exception import ExcThread, PilotException  #, JobAlreadyRunning
0034 from pilot.info import infosys, JobData, InfoService, JobInfoProvider
0035 from pilot.util import https
0036 from pilot.util.auxiliary import get_batchsystem_jobid, get_job_scheduler_id, get_pilot_id, \
0037     set_pilot_state, get_pilot_state, check_for_final_server_update, pilot_version_banner, is_virtual_machine, \
0038     is_python3, show_memory_usage, has_instruction_sets, locate_core_file, get_display_info
0039 from pilot.util.config import config
0040 from pilot.util.common import should_abort, was_pilot_killed
0041 from pilot.util.constants import PILOT_MULTIJOB_START_TIME, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_KILL_SIGNAL, LOG_TRANSFER_NOT_DONE, \
0042     LOG_TRANSFER_IN_PROGRESS, LOG_TRANSFER_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_TROUBLE, SERVER_UPDATE_FINAL, \
0043     SERVER_UPDATE_UPDATING, SERVER_UPDATE_NOT_DONE
0044 from pilot.util.container import execute
0045 from pilot.util.filehandling import find_text_files, tail, is_json, copy, remove, write_json, establish_logging, write_file, \
0046     create_symlink
0047 from pilot.util.harvester import request_new_jobs, remove_job_request_file, parse_job_definition_file, \
0048     is_harvester_mode, get_worker_attributes_file, publish_job_report, publish_work_report, get_event_status_file, \
0049     publish_stageout_files
0050 from pilot.util.jobmetrics import get_job_metrics
0051 from pilot.util.math import mean
0052 from pilot.util.middleware import containerise_general_command
0053 from pilot.util.monitoring import job_monitor_tasks, check_local_space
0054 from pilot.util.monitoringtime import MonitoringTime
0055 from pilot.util.processes import cleanup, threads_aborted, kill_process, kill_processes
0056 from pilot.util.proxy import get_distinguished_name
0057 from pilot.util.queuehandling import scan_for_jobs, put_in_queue, queue_report, purge_queue
0058 from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
0059 from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model
0060 
0061 logger = logging.getLogger(__name__)
0062 errors = ErrorCodes()
0063 
0064 
0065 def control(queues, traces, args):
0066     """
0067     Main function of job control.
0068 
0069     :param queues: internal queues for job handling.
0070     :param traces: tuple containing internal pilot states.
0071     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0072     :return:
0073     """
0074 
0075     targets = {'validate': validate, 'retrieve': retrieve, 'create_data_payload': create_data_payload,
0076                'queue_monitor': queue_monitor, 'job_monitor': job_monitor, 'fast_job_monitor': fast_job_monitor}
0077     threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'queues': queues, 'traces': traces, 'args': args},
0078                          name=name) for name, target in list(targets.items())]  # Python 2/3
0079 
0080     [thread.start() for thread in threads]
0081 
0082     # if an exception is thrown, the graceful_stop will be set by the ExcThread class run() function
0083     while not args.graceful_stop.is_set():
0084         for thread in threads:
0085             bucket = thread.get_bucket()
0086             try:
0087                 exc = bucket.get(block=False)
0088             except queue.Empty:
0089                 pass
0090             else:
0091                 exc_type, exc_obj, exc_trace = exc
0092                 logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)
0093 
0094                 # deal with the exception
0095                 # ..
0096 
0097             thread.join(0.1)
0098             time.sleep(0.1)
0099 
0100         time.sleep(0.5)
0101 
0102     logger.debug('job control ending since graceful_stop has been set')
0103     if args.abort_job.is_set():
0104         if traces.pilot['command'] == 'aborting':
0105             logger.warning('jobs are aborting')
0106         elif traces.pilot['command'] == 'abort':
0107             logger.warning('job control detected a set abort_job (due to a kill signal)')
0108             traces.pilot['command'] = 'aborting'
0109 
0110             # find all running jobs and stop them, find all jobs in queues relevant to this module
0111             #abort_jobs_in_queues(queues, args.signal)
0112 
0113     # proceed to set the job_aborted flag?
0114     if threads_aborted():
0115         logger.debug('will proceed to set job_aborted')
0116         args.job_aborted.set()
0117     else:
0118         logger.debug('will not set job_aborted yet')
0119 
0120     logger.debug('[job] control thread has finished')
0121     # test kill signal during end of generic workflow
0122     #import signal
0123     #os.kill(os.getpid(), signal.SIGBUS)
0124 
0125 
0126 def _validate_job(job):
0127     """
0128     Verify job parameters for specific problems.
0129 
0130     :param job: job object.
0131     :return: Boolean.
0132     """
0133 
0134     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0135     user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0136     container = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [user], 0)  # Python 2/3
0137 
0138     # should a container be used for the payload?
0139     try:
0140         kwargs = {'job': job}
0141         job.usecontainer = container.do_use_container(**kwargs)
0142     except Exception as error:
0143         logger.warning('exception caught: %s', error)
0144 
0145     return True if user.verify_job(job) else False
0146 
0147 
0148 def verify_error_code(job):
0149     """
0150     Make sure an error code is properly set.
0151     This makes sure that job.piloterrorcode is always set for a failed/holding job, that not only
0152     job.piloterrorcodes are set but not job.piloterrorcode. This function also negates the sign of the error code
0153     and sets job state 'holding' (instead of 'failed') if the error is found to be recoverable by a later job (user
0154     jobs only).
0155 
0156     :param job: job object.
0157     :return:
0158     """
0159 
0160     if job.piloterrorcode == 0 and len(job.piloterrorcodes) > 0:
0161         logger.warning('piloterrorcode set to first piloterrorcodes list entry: %s', str(job.piloterrorcodes))
0162         job.piloterrorcode = job.piloterrorcodes[0]
0163 
0164     if job.piloterrorcode != 0 and job.is_analysis():
0165         if errors.is_recoverable(code=job.piloterrorcode):
0166             job.piloterrorcode = -abs(job.piloterrorcode)
0167             job.state = 'failed'
0168             logger.info('failed user job is recoverable (error code=%s)', job.piloterrorcode)
0169         else:
0170             logger.info('failed user job is not recoverable')
0171     else:
0172         logger.info('verified error code')
0173 
0174 
0175 def get_proper_state(job, state):
0176     """
0177     Return a proper job state to send to server.
0178     This function should only return 'starting', 'running', 'finished', 'holding' or 'failed'.
0179     If the internal job.serverstate is not yet set, it means it is the first server update, ie 'starting' should be
0180     sent.
0181 
0182     :param job: job object.
0183     :param state: internal pilot state (string).
0184     :return: valid server state (string).
0185     """
0186 
0187     if job.serverstate == "finished" or job.serverstate == "failed":
0188         pass
0189     elif job.serverstate == "" and state != "finished" and state != "failed":
0190         job.serverstate = 'starting'
0191     elif state == "finished" or state == "failed" or state == "holding":
0192         job.serverstate = state
0193     else:
0194         job.serverstate = 'running'
0195 
0196     return job.serverstate
0197 
0198 
0199 def publish_harvester_reports(state, args, data, job, final):
0200     """
0201     Publish all reports needed by Harvester.
0202 
0203     :param state: job state (string).
0204     :param args: pilot args object.
0205     :param data: data structure for server update (dictionary).
0206     :param job: job object.
0207     :param final: is this the final update? (Boolean).
0208     :return: True if successful, False otherwise (Boolean).
0209     """
0210 
0211     # write part of the heartbeat message to worker attributes files needed by Harvester
0212     path = get_worker_attributes_file(args)
0213 
0214     # add jobStatus (state) for Harvester
0215     data['jobStatus'] = state
0216 
0217     # publish work report
0218     if not publish_work_report(data, path):
0219         logger.debug('failed to write to workerAttributesFile %s', path)
0220         return False
0221 
0222     # check if we are in final state then write out information for output files
0223     if final:
0224         # Use the job information to write Harvester event_status.dump file
0225         event_status_file = get_event_status_file(args)
0226         if publish_stageout_files(job, event_status_file):
0227             logger.debug('wrote log and output files to file %s', event_status_file)
0228         else:
0229             logger.warning('could not write log and output files to file %s', event_status_file)
0230             return False
0231 
0232         # publish job report
0233         _path = os.path.join(job.workdir, config.Payload.jobreport)
0234         if os.path.exists(_path):
0235             if publish_job_report(job, args, config.Payload.jobreport):
0236                 logger.debug('wrote job report file')
0237                 return True
0238             else:
0239                 logger.warning('failed to write job report file')
0240                 return False
0241     else:
0242         logger.info('finished writing various report files in Harvester mode')
0243 
0244     return True
0245 
0246 
0247 def write_heartbeat_to_file(data):
0248     """
0249     Write heartbeat dictionary to file.
0250     This is only done when server updates are not wanted.
0251 
0252     :param data: server data (dictionary).
0253     :return: True if successful, False otherwise (Boolean).
0254     """
0255 
0256     path = os.path.join(os.environ.get('PILOT_HOME'), config.Pilot.heartbeat_message)
0257     if write_json(path, data):
0258         logger.debug('heartbeat dictionary: %s', data)
0259         logger.debug('wrote heartbeat to file %s', path)
0260         return True
0261     else:
0262         return False
0263 
0264 
0265 def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False):
0266     """
0267     Update the server (send heartbeat message).
0268     Interpret and handle any server instructions arriving with the updateJob back channel.
0269 
0270     :param job: job object.
0271     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0272     :param state: job state (string).
0273     :param xml: optional metadata xml (string).
0274     :param metadata: job report metadata read as a string.
0275     :param test_tobekilled: emulate a tobekilled command (boolean).
0276     :return: boolean (True if successful, False otherwise).
0277     """
0278 
0279     state = get_proper_state(job, state)
0280 
0281     # should the pilot make any server updates?
0282     if not args.update_server:
0283         logger.info('pilot will not update the server (heartbeat message will be written to file)')
0284     tag = 'sending' if args.update_server else 'writing'
0285 
0286     if state == 'finished' or state == 'failed' or state == 'holding':
0287         final = True
0288         os.environ['SERVER_UPDATE'] = SERVER_UPDATE_UPDATING
0289         logger.info('job %s has %s - %s final server update', job.jobid, state, tag)
0290 
0291         # make sure that job.state is 'failed' if there's a set error code
0292         if job.piloterrorcode or job.piloterrorcodes:
0293             logger.warning('making sure that job.state is set to failed since a pilot error code is set')
0294             state = 'failed'
0295             job.state = state
0296         # make sure an error code is properly set
0297         elif state != 'finished':
0298             verify_error_code(job)
0299     else:
0300         final = False
0301         logger.info('job %s has state \'%s\' - %s heartbeat', job.jobid, state, tag)
0302 
0303     # build the data structure needed for getJob, updateJob
0304     data = get_data_structure(job, state, args, xml=xml, metadata=metadata)
0305 
0306     # write the heartbeat message to file if the server is not to be updated by the pilot (Nordugrid mode)
0307     if not args.update_server:
0308         logger.debug('is_harvester_mode(args) : {0}'.format(is_harvester_mode(args)))
0309         # if in harvester mode write to files required by harvester
0310         if is_harvester_mode(args):
0311             return publish_harvester_reports(state, args, data, job, final)
0312         else:
0313             # store the file in the main workdir
0314             return write_heartbeat_to_file(data)
0315 
0316     try:
0317         if config.Pilot.pandajob == 'real':
0318             time_before = int(time.time())
0319             max_attempts = 10
0320             attempt = 0
0321             done = False
0322             while attempt < max_attempts and not done:
0323                 logger.info('job update attempt %d/%d', attempt + 1, max_attempts)
0324 
0325                 # get the URL for the PanDA server from pilot options or from config
0326                 pandaserver = get_panda_server(args.url, args.port)
0327 
0328                 res = https.request('{pandaserver}/server/panda/updateJob'.format(pandaserver=pandaserver), data=data)
0329                 if res is not None:
0330                     done = True
0331                 attempt += 1
0332 
0333             time_after = int(time.time())
0334             logger.info('server updateJob request completed in %ds for job %s', time_after - time_before, job.jobid)
0335             logger.info("server responded with: res = %s", str(res))
0336 
0337             show_memory_usage()
0338 
0339             if res is not None:
0340                 # does the server update contain any backchannel information? if so, update the job object
0341                 handle_backchannel_command(res, job, args, test_tobekilled=test_tobekilled)
0342 
0343                 if final:
0344                     os.environ['SERVER_UPDATE'] = SERVER_UPDATE_FINAL
0345                     logger.debug('set SERVER_UPDATE=SERVER_UPDATE_FINAL')
0346                 return True
0347         else:
0348             logger.info('skipping job update for fake test job')
0349             return True
0350 
0351     except Exception as error:
0352         logger.warning('exception caught while sending https request: %s', error)
0353         logger.warning('possibly offending data: %s', data)
0354 
0355     if final:
0356         os.environ['SERVER_UPDATE'] = SERVER_UPDATE_TROUBLE
0357         logger.debug('set SERVER_UPDATE=SERVER_UPDATE_TROUBLE')
0358 
0359     return False
0360 
0361 
0362 def get_job_status_from_server(job_id, url, port):
0363     """
0364     Return the current status of job <jobId> from the dispatcher.
0365     typical dispatcher response: 'status=finished&StatusCode=0'
0366     StatusCode  0: succeeded
0367                10: time-out
0368                20: general error
0369                30: failed
0370     In the case of time-out, the dispatcher will be asked one more time after 10 s.
0371 
0372     :param job_id: PanDA job id (int).
0373     :param url: PanDA server URL (string).
0374     :param port: PanDA server port (int).
0375     :return: status (string; e.g. holding), attempt_nr (int), status_code (int)
0376     """
0377 
0378     status = 'unknown'
0379     attempt_nr = 0
0380     status_code = 0
0381     if config.Pilot.pandajob == 'fake':
0382         return status, attempt_nr, status_code
0383 
0384     data = {}
0385     data['ids'] = job_id
0386 
0387     # get the URL for the PanDA server from pilot options or from config
0388     pandaserver = get_panda_server(url, port)
0389 
0390     # ask dispatcher about lost job status
0391     trial = 1
0392     max_trials = 2
0393 
0394     while trial <= max_trials:
0395         try:
0396             # open connection
0397             ret = https.request('{pandaserver}/server/panda/getStatus'.format(pandaserver=pandaserver), data=data)
0398             response = ret[1]
0399             logger.info("response: %s", str(response))
0400             if response:
0401                 try:
0402                     # decode the response
0403                     # eg. var = ['status=notfound', 'attemptNr=0', 'StatusCode=0']
0404                     # = response
0405 
0406                     status = response['status']  # e.g. 'holding'
0407                     attempt_nr = int(response['attemptNr'])  # e.g. '0'
0408                     status_code = int(response['StatusCode'])  # e.g. '0'
0409                 except Exception as error:
0410                     logger.warning(
0411                         "exception: dispatcher did not return allowed values: %s, %s", str(ret), error)
0412                     status = "unknown"
0413                     attempt_nr = -1
0414                     status_code = 20
0415                 else:
0416                     logger.debug('server job status=%s, attempt_nr=%d, status_code=%d', status, attempt_nr, status_code)
0417             else:
0418                 logger.warning("dispatcher did not return allowed values: %s", str(ret))
0419                 status = "unknown"
0420                 attempt_nr = -1
0421                 status_code = 20
0422         except Exception as error:
0423             logger.warning("could not interpret job status from dispatcher: %s", error)
0424             status = 'unknown'
0425             attempt_nr = -1
0426             status_code = -1
0427             break
0428         else:
0429             if status_code == 0:  # success
0430                 break
0431             elif status_code == 10:  # time-out
0432                 trial += 1
0433                 time.sleep(10)
0434                 continue
0435             elif status_code == 20:  # other error
0436                 if ret[0] == 13056 or ret[0] == '13056':
0437                     logger.warning("wrong certificate used with curl operation? (encountered error 13056)")
0438                 break
0439             else:  # general error
0440                 break
0441 
0442     return status, attempt_nr, status_code
0443 
0444 
0445 def get_panda_server(url, port):
0446     """
0447     Get the URL for the PanDA server.
0448 
0449     :param url: URL string, if set in pilot option (port not included).
0450     :param port: port number, if set in pilot option (int).
0451     :return: full URL (either from pilot options or from config file)
0452     """
0453 
0454     if url.startswith('https://'):
0455         url = url.replace('https://', '')
0456 
0457     if url != '' and port != 0:
0458         pandaserver = '%s:%s' % (url, port) if ":" not in url else url
0459     else:
0460         pandaserver = config.Pilot.pandaserver
0461 
0462     if not pandaserver.startswith('http'):
0463         pandaserver = 'https://' + pandaserver
0464 
0465     # add randomization for PanDA server
0466     default = 'pandaserver.cern.ch'
0467     if default in pandaserver:
0468         rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])])
0469         pandaserver = pandaserver.replace(default, rnd)
0470         logger.debug('updated %s to %s', default, pandaserver)
0471 
0472     return pandaserver
0473 
0474 
0475 def get_debug_command(cmd):
0476     """
0477     Identify and filter the given debug command.
0478 
0479     Note: only a single command will be allowed from a predefined list: tail, ls, gdb, ps, du.
0480 
0481     :param cmd: raw debug command from job definition (string).
0482     :return: debug_mode (Boolean, True if command is deemed ok), debug_command (string).
0483     """
0484 
0485     debug_mode = False
0486     debug_command = ""
0487 
0488     allowed_commands = ['tail', 'ls', 'ps', 'gdb', 'du']
0489     forbidden_commands = ['rm']
0490 
0491     # remove any 'debug,' command that the server might send redundantly
0492     if ',' in cmd and 'debug' in cmd:
0493         cmd = cmd.replace('debug,', '').replace(',debug', '')
0494     try:
0495         tmp = cmd.split(' ')
0496         com = tmp[0]
0497     except Exception as error:
0498         logger.warning('failed to identify debug command: %s', error)
0499     else:
0500         if com not in allowed_commands:
0501             logger.warning('command=%s is not in the list of allowed commands: %s', com, str(allowed_commands))
0502         elif ';' in cmd or '&#59' in cmd:
0503             logger.warning('debug command cannot contain \';\': \'%s\'', cmd)
0504         elif com in forbidden_commands:
0505             logger.warning('command=%s is not allowed', com)
0506         else:
0507             debug_mode = True
0508             debug_command = cmd
0509     return debug_mode, debug_command
0510 
0511 
0512 def handle_backchannel_command(res, job, args, test_tobekilled=False):
0513     """
0514     Does the server update contain any backchannel information? if so, update the job object.
0515 
0516     :param res: server response (dictionary).
0517     :param job: job object.
0518     :param args: pilot args object.
0519     :param test_tobekilled: emulate a tobekilled command (boolean).
0520     :return:
0521     """
0522 
0523     if test_tobekilled:
0524         logger.info('faking a \'tobekilled\' command')
0525         res['command'] = 'tobekilled'
0526 
0527     if 'command' in res and res.get('command') != 'NULL':
0528         # warning: server might return comma-separated string, 'debug,tobekilled'
0529         cmd = res.get('command')
0530         # is it a 'command options'-type? debug_command=tail .., ls .., gdb .., ps .., du ..
0531         if ' ' in cmd and 'tobekilled' not in cmd:
0532             try:
0533                 job.debug, job.debug_command = get_debug_command(cmd)
0534             except Exception as error:
0535                 logger.debug('exception caught in get_debug_command(): %s', error)
0536         elif 'tobekilled' in cmd:
0537             logger.info('pilot received a panda server signal to kill job %s at %s', job.jobid, time_stamp())
0538             set_pilot_state(job=job, state="failed")
0539             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL)
0540             if job.pid:
0541                 logger.debug('killing payload process')
0542                 kill_process(job.pid)
0543             else:
0544                 logger.debug('no pid to kill')
0545             args.abort_job.set()
0546         elif 'softkill' in cmd:
0547             logger.info('pilot received a panda server signal to softkill job %s at %s', job.jobid, time_stamp())
0548             # event service kill instruction
0549             job.debug_command = 'softkill'
0550         elif 'debug' in cmd:
0551             logger.info('pilot received a command to turn on standard debug mode from the server')
0552             job.debug = True
0553             job.debug_command = 'debug'
0554         elif 'debugoff' in cmd:
0555             logger.info('pilot received a command to turn off debug mode from the server')
0556             job.debug = False
0557             job.debug_command = 'debugoff'
0558         else:
0559             logger.warning('received unknown server command via backchannel: %s', cmd)
0560 
0561     # for testing debug mode
0562     # job.debug = True
0563     # job.debug_command = 'du -sk'
0564     # job.debug_command = 'tail -30 payload.stdout'
0565     # job.debug_command = 'ls -ltr workDir'  # not really tested
0566     # job.debug_command = 'ls -ltr %s' % job.workdir
0567     # job.debug_command = 'ps -ef'
0568     # job.debug_command = 'ps axo pid,ppid,pgid,args'
0569     # job.debug_command = 'gdb --pid % -ex \'generate-core-file\''
0570 
0571 
0572 def add_data_structure_ids(data, version_tag):
0573     """
0574     Add pilot, batch and scheduler ids to the data structure for getJob, updateJob.
0575 
0576     :param data: data structure (dict).
0577     :return: updated data structure (dict).
0578     """
0579 
0580     schedulerid = get_job_scheduler_id()
0581     if schedulerid:
0582         data['schedulerID'] = schedulerid
0583 
0584     pilotid = get_pilot_id()
0585     if pilotid:
0586         pilotversion = os.environ.get('PILOT_VERSION')
0587 
0588         # report the batch system job id, if available
0589         batchsystem_type, batchsystem_id = get_batchsystem_jobid()
0590 
0591         if batchsystem_type:
0592             data['pilotID'] = "%s|%s|%s|%s" % (pilotid, batchsystem_type, version_tag, pilotversion)
0593             data['batchID'] = batchsystem_id
0594         else:
0595             data['pilotID'] = "%s|%s|%s" % (pilotid, version_tag, pilotversion)
0596 
0597     return data
0598 
0599 
0600 def get_data_structure(job, state, args, xml=None, metadata=None):
0601     """
0602     Build the data structure needed for getJob, updateJob.
0603 
0604     :param job: job object.
0605     :param state: state of the job (string).
0606     :param args:
0607     :param xml: optional XML string.
0608     :param metadata: job report metadata read as a string.
0609     :return: data structure (dictionary).
0610     """
0611 
0612     data = {'jobId': job.jobid,
0613             'state': state,
0614             'timestamp': time_stamp(),
0615             'siteName': os.environ.get('PILOT_SITENAME'),  # args.site,
0616             'node': get_node_name(),
0617             'attemptNr': job.attemptnr}
0618 
0619     # add pilot, batch and scheduler ids to the data structure
0620     data = add_data_structure_ids(data, args.version_tag)
0621 
0622     starttime = get_postgetjob_time(job.jobid, args)
0623     if starttime:
0624         data['startTime'] = starttime
0625 
0626     job_metrics = get_job_metrics(job)
0627     if job_metrics:
0628         data['jobMetrics'] = job_metrics
0629 
0630     if xml is not None:
0631         data['xml'] = xml
0632     if metadata is not None:
0633         data['metaData'] = metadata
0634 
0635     # in debug mode, also send a tail of the latest log file touched by the payload
0636     if job.debug:
0637         data['stdout'] = process_debug_mode(job)
0638 
0639     # add the core count
0640     if job.corecount and job.corecount != 'null' and job.corecount != 'NULL':
0641         data['coreCount'] = job.corecount
0642         #data['coreCount'] = mean(job.corecounts) if job.corecounts else job.corecount
0643     if job.corecounts:
0644         _mean = mean(job.corecounts)
0645         logger.info('mean actualcorecount: %f', _mean)
0646         data['meanCoreCount'] = _mean
0647 
0648     # get the number of events, should report in heartbeat in case of preempted.
0649     if job.nevents != 0:
0650         data['nEvents'] = job.nevents
0651         logger.info("total number of processed events: %d (read)", job.nevents)
0652     else:
0653         logger.info("payload/TRF did not report the number of read events")
0654 
0655     # get the CU consumption time
0656     constime = get_cpu_consumption_time(job.cpuconsumptiontime)
0657     if constime and constime != -1:
0658         data['cpuConsumptionTime'] = constime
0659         data['cpuConversionFactor'] = job.cpuconversionfactor
0660     data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + get_cpu_model()
0661 
0662     instruction_sets = has_instruction_sets(['AVX2'])
0663     product, vendor = get_display_info()
0664     if instruction_sets:
0665         if 'cpuConsumptionUnit' in data:
0666             data['cpuConsumptionUnit'] += '+' + instruction_sets
0667         else:
0668             data['cpuConsumptionUnit'] = instruction_sets
0669         if product and vendor:
0670             logger.debug('cpuConsumptionUnit: could have added: product=%s, vendor=%s', product, vendor)
0671 
0672     # add memory information if available
0673     add_memory_info(data, job.workdir, name=job.memorymonitor)
0674     if state == 'finished' or state == 'failed':
0675         add_timing_and_extracts(data, job, state, args)
0676         add_error_codes(data, job)
0677 
0678     return data
0679 
0680 
0681 def process_debug_mode(job):
0682     """
0683     Handle debug mode - preprocess debug command, get the output and kill the payload in case of gdb.
0684 
0685     :param job: job object.
0686     :return: stdout from debug command (string).
0687     """
0688 
0689     # for gdb commands, use the proper gdb version (the system one may be too old)
0690     if job.debug_command.startswith('gdb '):
0691         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0692         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0693         user.preprocess_debug_command(job)
0694 
0695     stdout = get_debug_stdout(job)
0696     if stdout:
0697         # in case gdb was successfully used, the payload can now be killed
0698         if job.debug_command.startswith('gdb ') and job.pid:
0699             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL,
0700                                                                              msg='payload was killed after gdb produced requested core file')
0701             logger.debug('will proceed to kill payload processes')
0702             kill_processes(job.pid)
0703 
0704     return stdout
0705 
0706 
0707 def get_debug_stdout(job):
0708     """
0709     Return the requested output from a given debug command.
0710 
0711     :param job: job object.
0712     :return: output (string).
0713     """
0714 
0715     if job.debug_command == 'debug':
0716         return get_payload_log_tail(job.workdir)
0717     elif 'tail ' in job.debug_command:
0718         return get_requested_log_tail(job.debug_command, job.workdir)
0719     elif 'ls ' in job.debug_command:
0720         return get_ls(job.debug_command, job.workdir)
0721     elif 'ps ' in job.debug_command or 'gdb ' in job.debug_command:
0722         return get_general_command_stdout(job)
0723     else:
0724         # general command, execute and return output
0725         _, stdout, _ = execute(job.debug_command)
0726         logger.info('debug_command: %s:\n\n%s\n', job.debug_command, stdout)
0727         return stdout
0728 
0729 
0730 def get_general_command_stdout(job):
0731     """
0732     Return the output from the requested debug command.
0733 
0734     :param job: job object.
0735     :return: output (string).
0736     """
0737 
0738     stdout = ''
0739 
0740     # for gdb, we might have to process the debug command (e.g. to identify the proper pid to debug)
0741     if 'gdb ' in job.debug_command and '--pid %' in job.debug_command:
0742         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0743         user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0744         job.debug_command = user.process_debug_command(job.debug_command, job.jobid)
0745 
0746     if job.debug_command:
0747         _containerisation = False  # set this with some logic instead - not used for now
0748         if _containerisation:
0749             try:
0750                 containerise_general_command(job, job.infosys.queuedata.container_options,
0751                                              label='general',
0752                                              container_type='container')
0753             except PilotException as error:
0754                 logger.warning('general containerisation threw a pilot exception: %s', error)
0755             except Exception as error:
0756                 logger.warning('general containerisation threw an exception: %s', error)
0757         else:
0758             _, stdout, stderr = execute(job.debug_command)
0759             logger.debug("%s (stdout):\n\n%s\n\n", job.debug_command, stdout)
0760             logger.debug("%s (stderr):\n\n%s\n\n", job.debug_command, stderr)
0761 
0762         # in case a core file was produced, locate it
0763         path = locate_core_file(cmd=job.debug_command) if 'gdb ' in job.debug_command else ''
0764         if path:
0765             # copy it to the working directory (so it will be saved in the log)
0766             try:
0767                 copy(path, job.workdir)
0768             except Exception:
0769                 pass
0770 
0771     return stdout
0772 
0773 
0774 def get_ls(debug_command, workdir):
0775     """
0776     Return the requested ls debug command.
0777 
0778     :param debug_command: full debug command (string).
0779     :param workdir: job work directory (string).
0780     :return: output (string).
0781     """
0782 
0783     items = debug_command.split(' ')
0784     # cmd = items[0]
0785     options = ' '.join(items[1:])
0786     path = options.split(' ')[-1] if ' ' in options else options
0787     if path.startswith('-'):
0788         path = '.'
0789     finalpath = os.path.join(workdir, path)
0790     debug_command = debug_command.replace(path, finalpath)
0791 
0792     _, stdout, _ = execute(debug_command)
0793     logger.debug("%s:\n\n%s\n\n", debug_command, stdout)
0794 
0795     return stdout
0796 
0797 
0798 def get_requested_log_tail(debug_command, workdir):
0799     """
0800     Return the tail of the requested debug log.
0801 
0802     Examples
0803       tail workdir/tmp.stdout* <- pilot finds the requested log file in the specified relative path
0804       tail log.RAWtoALL <- pilot finds the requested log file
0805 
0806     :param debug_command: full debug command (string).
0807     :param workdir: job work directory (string).
0808     :return: output (string).
0809     """
0810 
0811     _tail = ""
0812     items = debug_command.split(' ')
0813     cmd = items[0]
0814     options = ' '.join(items[1:])
0815     logger.debug('debug command: %s', cmd)
0816     logger.debug('debug options: %s', options)
0817 
0818     # assume that the path is the last of the options; <some option> <some path>
0819     path = options.split(' ')[-1] if ' ' in options else options
0820     fullpath = os.path.join(workdir, path)
0821 
0822     # find all files with the given pattern and pick the latest updated file (if several)
0823     files = glob(fullpath)
0824     if files:
0825         logger.info('files found: %s', str(files))
0826         _tail = get_latest_log_tail(files)
0827     else:
0828         logger.warning('did not find \'%s\' in path %s', path, fullpath)
0829 
0830     if _tail:
0831         logger.debug('tail =\n\n%s\n\n', _tail)
0832 
0833     return _tail
0834 
0835 
0836 def add_error_codes(data, job):
0837     """
0838     Add error codes to data structure.
0839 
0840     :param data: data dictionary.
0841     :param job: job object.
0842     :return:
0843     """
0844 
0845     # error codes
0846     pilot_error_code = job.piloterrorcode
0847     pilot_error_codes = job.piloterrorcodes
0848     if pilot_error_codes != []:
0849         logger.warning('pilotErrorCodes = %s (will report primary/first error code)', str(pilot_error_codes))
0850         data['pilotErrorCode'] = pilot_error_codes[0]
0851     else:
0852         data['pilotErrorCode'] = pilot_error_code
0853 
0854     # add error info
0855     pilot_error_diag = job.piloterrordiag
0856     pilot_error_diags = job.piloterrordiags
0857     if pilot_error_diags != []:
0858         logger.warning('pilotErrorDiags = %s (will report primary/first error diag)', str(pilot_error_diags))
0859         data['pilotErrorDiag'] = pilot_error_diags[0]
0860     else:
0861         data['pilotErrorDiag'] = pilot_error_diag
0862     data['transExitCode'] = job.transexitcode
0863     data['exeErrorCode'] = job.exeerrorcode
0864     data['exeErrorDiag'] = job.exeerrordiag
0865 
0866 
0867 def get_cpu_consumption_time(cpuconsumptiontime):
0868     """
0869     Get the CPU consumption time.
0870     The function makes sure that the value exists and is within allowed limits (< 10^9).
0871 
0872     :param cpuconsumptiontime: CPU consumption time (int/None).
0873     :return: properly set CPU consumption time (int/None).
0874     """
0875 
0876     constime = None
0877 
0878     try:
0879         constime = int(cpuconsumptiontime)
0880     except Exception:
0881         constime = None
0882     if constime and constime > 10 ** 9:
0883         logger.warning("unrealistic cpuconsumptiontime: %d (reset to -1)", constime)
0884         constime = -1
0885 
0886     return constime
0887 
0888 
0889 def add_timing_and_extracts(data, job, state, args):
0890     """
0891     Add timing info and log extracts to data structure for a completed job (finished or failed) to be sent to server.
0892     Note: this function updates the data dictionary.
0893 
0894     :param data: data structure (dictionary).
0895     :param job: job object.
0896     :param state: state of the job (string).
0897     :param args: pilot args.
0898     :return:
0899     """
0900 
0901     time_getjob, time_stagein, time_payload, time_stageout, time_total_setup = timing_report(job.jobid, args)
0902     data['pilotTiming'] = "%s|%s|%s|%s|%s" % \
0903                           (time_getjob, time_stagein, time_payload, time_stageout, time_total_setup)
0904 
0905     # add log extracts (for failed/holding jobs or for jobs with outbound connections)
0906     extracts = ""
0907     if state == 'failed' or state == 'holding':
0908         pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0909         user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0910         extracts = user.get_log_extracts(job, state)
0911         if extracts != "":
0912             logger.warning('\nXXXXXXXXXXXXXXXXXXXXX[begin log extracts]\n%s\nXXXXXXXXXXXXXXXXXXXXX[end log extracts]', extracts)
0913     data['pilotLog'] = extracts[:1024]
0914     data['endTime'] = time.time()
0915 
0916 
0917 def add_memory_info(data, workdir, name=""):
0918     """
0919     Add memory information (if available) to the data structure that will be sent to the server with job updates
0920     Note: this function updates the data dictionary.
0921 
0922     :param data: data structure (dictionary).
0923     :param workdir: working directory of the job (string).
0924     :param name: name of memory monitor (string).
0925     :return:
0926     """
0927 
0928     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0929     utilities = __import__('pilot.user.%s.utilities' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0930     try:
0931         utility_node = utilities.get_memory_monitor_info(workdir, name=name)
0932         data.update(utility_node)
0933     except Exception as error:
0934         logger.info('memory information not available: %s', error)
0935 
0936 
0937 def remove_pilot_logs_from_list(list_of_files):
0938     """
0939     Remove any pilot logs from the list of last updated files.
0940 
0941     :param list_of_files: list of last updated files (list).
0942     :return: list of files (list).
0943     """
0944 
0945     # note: better to move experiment specific files to user area
0946 
0947     # ignore the pilot log files
0948     try:
0949         to_be_removed = [config.Pilot.pilotlog, config.Pilot.stageinlog, config.Pilot.stageoutlog,
0950                          config.Pilot.timing_file, config.Pilot.remotefileverification_dictionary,
0951                          config.Pilot.remotefileverification_log, config.Pilot.base_trace_report,
0952                          config.Container.container_script, config.Container.release_setup,
0953                          config.Container.stagein_status_dictionary, config.Container.stagein_replica_dictionary,
0954                          'eventLoopHeartBeat.txt', 'memory_monitor_output.txt', 'memory_monitor_summary.json_snapshot']
0955     except Exception as error:
0956         logger.warning('exception caught: %s', error)
0957         to_be_removed = []
0958 
0959     new_list_of_files = []
0960     for filename in list_of_files:
0961         if os.path.basename(filename) not in to_be_removed and '/pilot/' not in filename and 'prmon' not in filename:
0962             new_list_of_files.append(filename)
0963 
0964     return new_list_of_files
0965 
0966 
0967 def get_payload_log_tail(workdir):
0968     """
0969     Return the tail of the payload stdout or its latest updated log file.
0970 
0971     :param workdir: job work directory (string).
0972     :return: tail of stdout (string).
0973     """
0974 
0975     # find the latest updated log file
0976     # list_of_files = get_list_of_log_files()
0977     # find the latest updated text file
0978     list_of_files = find_text_files()
0979     list_of_files = remove_pilot_logs_from_list(list_of_files)
0980 
0981     if not list_of_files:
0982         logger.info('no log files were found (will use default %s)', config.Payload.payloadstdout)
0983         list_of_files = [os.path.join(workdir, config.Payload.payloadstdout)]
0984 
0985     return get_latest_log_tail(list_of_files)
0986 
0987 
0988 def get_latest_log_tail(files):
0989     """
0990     Get the tail of the latest updated file from the given file list.
0991 
0992     :param files: files (list).
0993     """
0994 
0995     stdout_tail = ""
0996 
0997     try:
0998         latest_file = max(files, key=os.path.getmtime)
0999         logger.info('tail of file %s will be added to heartbeat', latest_file)
1000 
1001         # now get the tail of the found log file and protect against potentially large tails
1002         stdout_tail = latest_file + "\n" + tail(latest_file)
1003         stdout_tail = stdout_tail[-2048:]
1004     except OSError as exc:
1005         logger.warning('failed to get payload stdout tail: %s', exc)
1006 
1007     return stdout_tail
1008 
1009 
1010 def validate(queues, traces, args):
1011     """
1012     Perform validation of job.
1013 
1014     :param queues: queues object.
1015     :param traces: traces object.
1016     :param args: args object.
1017     :return:
1018     """
1019 
1020     while not args.graceful_stop.is_set():
1021         time.sleep(0.5)
1022         try:
1023             job = queues.jobs.get(block=True, timeout=1)
1024         except queue.Empty:
1025             continue
1026 
1027         traces.pilot['nr_jobs'] += 1
1028 
1029         # set the environmental variable for the task id
1030         os.environ['PanDA_TaskID'] = str(job.taskid)
1031         logger.info('processing PanDA job %s from task %s', job.jobid, job.taskid)
1032 
1033         if _validate_job(job):
1034 
1035             # Define a new parent group
1036             os.setpgrp()
1037 
1038             job_dir = os.path.join(args.mainworkdir, 'PanDA_Pilot-%s' % job.jobid)
1039             logger.debug('creating job working directory: %s', job_dir)
1040             try:
1041                 os.mkdir(job_dir)
1042                 os.chmod(job_dir, 0o770)
1043                 job.workdir = job_dir
1044             except Exception as error:
1045                 logger.debug('cannot create working directory: %s', error)
1046                 traces.pilot['error_code'] = errors.MKDIR
1047                 job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(traces.pilot['error_code'])
1048                 job.piloterrordiag = error
1049                 put_in_queue(job, queues.failed_jobs)
1050                 break
1051             else:
1052                 create_k8_link(job_dir)
1053 
1054 #            try:
1055 #                # stream the job object to file
1056 #                job_dict = job.to_json()
1057 #                write_json(os.path.join(job.workdir, 'job.json'), job_dict)
1058 #            except Exception as error:
1059 #                logger.debug('exception caught: %s', error)
1060 #            else:
1061 #                try:
1062 #                    _job_dict = read_json(os.path.join(job.workdir, 'job.json'))
1063 #                    job_dict = loads(_job_dict)
1064 #                    _job = JobData(job_dict, use_kmap=False)
1065 #                except Exception as error:
1066 #                    logger.warning('exception caught: %s', error)
1067 
1068             create_symlink(from_path='../%s' % config.Pilot.pilotlog, to_path=os.path.join(job_dir, config.Pilot.pilotlog))
1069 
1070             # pre-cleanup
1071             pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
1072             utilities = __import__('pilot.user.%s.utilities' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
1073             try:
1074                 utilities.precleanup()
1075             except Exception as error:
1076                 logger.warning('exception caught: %s', error)
1077 
1078             # store the PanDA job id for the wrapper to pick up
1079             store_jobid(job.jobid, args.sourcedir)
1080 
1081             # run the delayed space check now
1082             delayed_space_check(queues, traces, args, job)
1083 
1084             # make sure that ctypes is available (needed at the end by orphan killer)
1085             verify_ctypes(queues, job)
1086         else:
1087             logger.debug('Failed to validate job=%s', job.jobid)
1088             put_in_queue(job, queues.failed_jobs)
1089 
1090     # proceed to set the job_aborted flag?
1091     if threads_aborted():
1092         logger.debug('will proceed to set job_aborted')
1093         args.job_aborted.set()
1094     else:
1095         logger.debug('will not set job_aborted yet')
1096 
1097     logger.debug('[job] validate thread has finished')
1098 
1099 
1100 def verify_ctypes(queues, job):
1101     """
1102     Verify ctypes and make sure all subprocess are parented.
1103 
1104     :param queues: queues object.
1105     :param job: job object.
1106     :return:
1107     """
1108 
1109     try:
1110         import ctypes
1111     # except ModuleNotFoundError as error:  # Python 3
1112     except Exception as error:
1113         diagnostics = 'ctypes python module could not be imported: %s' % error
1114         logger.warning(diagnostics)
1115         #job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.NOCTYPES, msg=diagnostics)
1116         #logger.debug('Failed to validate job=%s', job.jobid)
1117         #put_in_queue(job, queues.failed_jobs)
1118     else:
1119         logger.debug('ctypes python module imported')
1120 
1121         # make sure all children are parented by the pilot
1122         # specifically, this will include any 'orphans', i.e. if the pilot kills all subprocesses at the end,
1123         # 'orphans' will be included (orphans seem like the wrong name)
1124         libc = ctypes.CDLL('libc.so.6')
1125         pr_set_child_subreaper = 36
1126         libc.prctl(pr_set_child_subreaper, 1)
1127         logger.debug('all child subprocesses will be parented')
1128 
1129 
1130 def delayed_space_check(queues, traces, args, job):
1131     """
1132     Run the delayed space check if necessary.
1133 
1134     :param queues: queues object.
1135     :param traces: traces object.
1136     :param args: args object.
1137     :param job: job object.
1138     :return:
1139     """
1140 
1141     proceed_with_local_space_check = True if (args.harvester_submitmode.lower() == 'push' and args.update_server) else False
1142     if proceed_with_local_space_check:
1143         logger.debug('pilot will now perform delayed space check')
1144         exit_code, diagnostics = check_local_space()
1145         if exit_code != 0:
1146             traces.pilot['error_code'] = errors.NOLOCALSPACE
1147             # set the corresponding error code
1148             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.NOLOCALSPACE, msg=diagnostics)
1149             logger.debug('Failed to validate job=%s', job.jobid)
1150             put_in_queue(job, queues.failed_jobs)
1151         else:
1152             put_in_queue(job, queues.validated_jobs)
1153     else:
1154         put_in_queue(job, queues.validated_jobs)
1155 
1156 
1157 def create_k8_link(job_dir):
1158     """
1159     Create a soft link to the payload workdir on Kubernetes if SHARED_DIR exists.
1160 
1161     :param job_dir: payload workdir (string).
1162     """
1163 
1164     shared_dir = os.environ.get('SHARED_DIR', None)
1165     if shared_dir:
1166         #create_symlink(from_path=os.path.join(shared_dir, 'payload_workdir'), to_path=job_dir)
1167         create_symlink(from_path=job_dir, to_path=os.path.join(shared_dir, 'payload_workdir'))
1168     else:
1169         logger.debug('will not create symlink in SHARED_DIR')
1170 
1171 
1172 def store_jobid(jobid, init_dir):
1173     """
1174     Store the PanDA job id in a file that can be picked up by the wrapper for other reporting.
1175 
1176     :param jobid: job id (int).
1177     :param init_dir: pilot init dir (string).
1178     :return:
1179     """
1180 
1181     try:
1182         path = os.path.join(os.path.join(init_dir, 'pilot2'), config.Pilot.jobid_file)
1183         path = path.replace('pilot2/pilot2', 'pilot2')  # dirty fix for bad paths
1184         mode = 'a' if os.path.exists(path) else 'w'
1185         logger.debug('path=%s  mode=%s', path, mode)
1186         write_file(path, "%s\n" % str(jobid), mode=mode, mute=False)
1187     except Exception as error:
1188         logger.warning('exception caught while trying to store job id: %s', error)
1189 
1190 
1191 def create_data_payload(queues, traces, args):
1192     """
1193     Get a Job object from the "validated_jobs" queue.
1194 
1195     If the job has defined input files, move the Job object to the "data_in" queue and put the internal pilot state to
1196     "stagein". In case there are no input files, place the Job object in the "finished_data_in" queue. For either case,
1197     the thread also places the Job object in the "payloads" queue (another thread will retrieve it and wait for any
1198     stage-in to finish).
1199 
1200     :param queues: internal queues for job handling.
1201     :param traces: tuple containing internal pilot states.
1202     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
1203     :return:
1204     """
1205 
1206     while not args.graceful_stop.is_set():
1207         time.sleep(0.5)
1208         try:
1209             job = queues.validated_jobs.get(block=True, timeout=1)
1210         except queue.Empty:
1211             continue
1212 
1213         if job.indata:
1214             # if the job has input data, put the job object in the data_in queue which will trigger stage-in
1215             set_pilot_state(job=job, state='stagein')
1216             put_in_queue(job, queues.data_in)
1217 
1218         else:
1219             # if the job does not have any input data, then pretend that stage-in has finished and put the job
1220             # in the finished_data_in queue
1221             put_in_queue(job, queues.finished_data_in)
1222 
1223         put_in_queue(job, queues.payloads)
1224 
1225     # proceed to set the job_aborted flag?
1226     if threads_aborted():
1227         logger.debug('will proceed to set job_aborted')
1228         args.job_aborted.set()
1229     else:
1230         logger.debug('will not set job_aborted yet')
1231 
1232     logger.debug('[job] create_data_payload thread has finished')
1233 
1234 
1235 def get_task_id():
1236     """
1237     Return the task id for the current job.
1238     Note: currently the implementation uses an environmental variable to store this number (PanDA_TaskID).
1239 
1240     :return: task id (string). Returns empty string in case of error.
1241     """
1242 
1243     if "PanDA_TaskID" in os.environ:
1244         taskid = os.environ["PanDA_TaskID"]
1245     else:
1246         logger.warning('PanDA_TaskID not set in environment')
1247         taskid = ""
1248 
1249     return taskid
1250 
1251 
1252 def get_job_label(args):
1253     """
1254     Return a proper job label.
1255     The function returns a job label that corresponds to the actual pilot version, ie if the pilot is a development
1256     version (ptest or rc_test2) or production version (managed or user).
1257     Example: -i RC -> job_label = rc_test2.
1258     NOTE: it should be enough to only use the job label, -j rc_test2 (and not specify -i RC at all).
1259 
1260     :param args: pilot args object.
1261     :return: job_label (string).
1262     """
1263 
1264     # PQ status
1265     status = infosys.queuedata.status
1266 
1267     if args.version_tag == 'RC' and args.job_label == 'rc_test2':
1268         job_label = 'rc_test2'
1269     elif args.version_tag == 'RC' and args.job_label == 'ptest':
1270         job_label = args.job_label
1271     elif args.version_tag == 'RCM' and args.job_label == 'ptest':
1272         job_label = 'rcm_test2'
1273     elif args.version_tag == 'ALRB':
1274         job_label = 'rc_alrb'
1275     elif status == 'test' and args.job_label != 'ptest':
1276         logger.warning('PQ status set to test - will use job label / prodSourceLabel test')
1277         job_label = 'test'
1278     else:
1279         job_label = args.job_label
1280 
1281     return job_label
1282 
1283 
1284 def get_dispatcher_dictionary(args):
1285     """
1286     Return a dictionary with required fields for the dispatcher getJob operation.
1287 
1288     The dictionary should contain the following fields: siteName, computingElement (queue name),
1289     prodSourceLabel (e.g. user, test, ptest), diskSpace (available disk space for a job in MB),
1290     workingGroup, countryGroup, cpu (float), mem (float) and node (worker node name).
1291 
1292     workingGroup, countryGroup and allowOtherCountry
1293     we add a new pilot setting allowOtherCountry=True to be used in conjunction with countryGroup=us for
1294     US pilots. With these settings, the Panda server will produce the desired behaviour of dedicated X% of
1295     the resource exclusively (so long as jobs are available) to countryGroup=us jobs. When allowOtherCountry=false
1296     this maintains the behavior relied on by current users of the countryGroup mechanism -- to NOT allow
1297     the resource to be used outside the privileged group under any circumstances.
1298 
1299     :param args: arguments (e.g. containing queue name, queuedata dictionary, etc).
1300     :returns: dictionary prepared for the dispatcher getJob operation.
1301     """
1302 
1303     _diskspace = get_disk_space(infosys.queuedata)
1304 
1305     _mem, _cpu, _disk = collect_workernode_info()
1306 
1307     _nodename = get_node_name()
1308 
1309     # override for RC dev pilots
1310     job_label = get_job_label(args)
1311 
1312     data = {
1313         'siteName': infosys.queuedata.resource,  # next: remove redundant '-r' option of pilot.py
1314         'computingElement': args.queue,
1315         'prodSourceLabel': job_label,
1316         'diskSpace': _diskspace,
1317         'workingGroup': args.working_group,
1318         'cpu': _cpu,
1319         'mem': _mem,
1320         'node': _nodename
1321     }
1322 
1323     if args.jobtype != "":
1324         data['jobType'] = args.jobtype
1325 
1326     if args.allow_other_country != "":
1327         data['allowOtherCountry'] = args.allow_other_country
1328 
1329     if args.country_group != "":
1330         data['countryGroup'] = args.country_group
1331 
1332     if args.job_label == 'self':
1333         dn = get_distinguished_name()
1334         data['prodUserID'] = dn
1335 
1336     taskid = get_task_id()
1337     if taskid != "" and args.allow_same_user:
1338         data['taskID'] = taskid
1339         logger.info("will download a new job belonging to task id: %s", data['taskID'])
1340 
1341     if args.resource_type != "":
1342         data['resourceType'] = args.resource_type
1343 
1344     # add harvester fields
1345     if 'HARVESTER_ID' in os.environ:
1346         data['harvester_id'] = os.environ.get('HARVESTER_ID')
1347     if 'HARVESTER_WORKER_ID' in os.environ:
1348         data['worker_id'] = os.environ.get('HARVESTER_WORKER_ID')
1349 
1350 #    instruction_sets = has_instruction_sets(['AVX', 'AVX2'])
1351 #    if instruction_sets:
1352 #        data['cpuConsumptionUnit'] = instruction_sets
1353 
1354     return data
1355 
1356 
1357 def proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, max_getjob_requests, update_server, submitmode, harvester, verify_proxy, traces):
1358     """
1359     Can we proceed with getJob?
1360     We may not proceed if we have run out of time (timefloor limit), if the proxy is too short, if disk space is too
1361     small or if we have already proceed enough jobs.
1362 
1363     :param timefloor: timefloor limit (s) (int).
1364     :param starttime: start time of retrieve() (s) (int).
1365     :param jobnumber: number of downloaded jobs (int).
1366     :param getjob_requests: number of getjob requests (int).
1367     :param update_server: should pilot update server? (Boolean).
1368     :param submitmode: Harvester submit mode, PULL or PUSH (string).
1369     :param harvester: True if Harvester is used, False otherwise. Affects the max number of getjob reads (from file) (Boolean).
1370     :param verify_proxy: True if the proxy should be verified. False otherwise (Boolean).
1371     :param traces: traces object (to be able to propagate a proxy error all the way back to the wrapper).
1372     :return: True if pilot should proceed with getJob (Boolean).
1373     """
1374 
1375     # use for testing thread exceptions. the exception will be picked up by ExcThread run() and caught in job.control()
1376     # raise NoLocalSpace('testing exception from proceed_with_getjob')
1377 
1378     #timefloor = 600
1379     currenttime = time.time()
1380 
1381     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
1382     common = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
1383     if not common.allow_timefloor(submitmode):
1384         timefloor = 0
1385 
1386     # should the proxy be verified?
1387     if verify_proxy:
1388         userproxy = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
1389 
1390         # is the proxy still valid?
1391         exit_code, diagnostics = userproxy.verify_proxy()
1392         if traces.pilot['error_code'] == 0:  # careful so we don't overwrite another error code
1393             traces.pilot['error_code'] = exit_code
1394         if exit_code == errors.NOPROXY or exit_code == errors.NOVOMSPROXY:
1395             logger.warning(diagnostics)
1396             return False
1397 
1398     # is there enough local space to run a job?
1399     # note: do not run this test at this point if submit mode=PUSH and we are in truePilot mode on ARC
1400     # (available local space will in this case be checked after the job definition has been read from file, so the
1401     # pilot can report the error with a server update)
1402     proceed_with_local_space_check = False if (submitmode.lower() == 'push' and update_server) else True
1403     if proceed_with_local_space_check:
1404         exit_code, diagnostics = check_local_space()
1405         if exit_code != 0:
1406             traces.pilot['error_code'] = errors.NOLOCALSPACE
1407             return False
1408     else:
1409         logger.debug('pilot will delay local space check until after job definition has been read from file')
1410 
1411     maximum_getjob_requests = 60 if harvester else max_getjob_requests  # 1 s apart (if harvester)
1412     if getjob_requests > int(maximum_getjob_requests):
1413         logger.warning('reached maximum number of getjob requests (%s) -- will abort pilot', maximum_getjob_requests)
1414         # use singleton:
1415         # instruct the pilot to wrap up quickly
1416         os.environ['PILOT_WRAP_UP'] = 'QUICKLY'
1417         return False
1418 
1419     if timefloor == 0 and jobnumber > 0:
1420         logger.warning("since timefloor is set to 0, pilot was only allowed to run one job")
1421         # use singleton:
1422         # instruct the pilot to wrap up quickly
1423         os.environ['PILOT_WRAP_UP'] = 'QUICKLY'
1424         return False
1425 
1426     if (currenttime - starttime > timefloor) and jobnumber > 0:
1427         logger.warning("the pilot has run out of time (timefloor=%d has been passed)", timefloor)
1428         # use singleton:
1429         # instruct the pilot to wrap up quickly
1430         os.environ['PILOT_WRAP_UP'] = 'QUICKLY'
1431         return False
1432 
1433     # timefloor not relevant for the first job
1434     if jobnumber > 0:
1435         logger.info('since timefloor=%d s and only %d s has passed since launch, pilot can run another job', timefloor, currenttime - starttime)
1436 
1437     if harvester and jobnumber > 0:
1438         # unless it's the first job (which is preplaced in the init dir), instruct Harvester to place another job
1439         # in the init dir
1440         logger.info('asking Harvester for another job')
1441         request_new_jobs()
1442 
1443     if os.environ.get('SERVER_UPDATE', '') == SERVER_UPDATE_UPDATING:
1444         logger.info('still updating previous job, will not ask for a new job yet')
1445         return False
1446 
1447     os.environ['SERVER_UPDATE'] = SERVER_UPDATE_NOT_DONE
1448     return True
1449 
1450 
1451 def getjob_server_command(url, port):
1452     """
1453     Prepare the getJob server command.
1454 
1455     :param url: PanDA server URL (string)
1456     :param port: PanDA server port
1457     :return: full server command (URL string)
1458     """
1459 
1460     if url != "":
1461         port_pattern = '.:([0-9]+)'
1462         if not findall(port_pattern, url):
1463             url = url + ':%s' % port
1464         else:
1465             logger.debug('URL already contains port: %s', url)
1466     else:
1467         url = config.Pilot.pandaserver
1468     if url == "":
1469         logger.fatal('PanDA server url not set (either as pilot option or in config file)')
1470     elif not url.startswith("http"):
1471         url = 'https://' + url
1472         logger.warning('detected missing protocol in server url (added)')
1473 
1474     return '{pandaserver}/server/panda/getJob'.format(pandaserver=url)
1475 
1476 
1477 def get_job_definition_from_file(path, harvester):
1478     """
1479     Get a job definition from a pre-placed file.
1480     In Harvester mode, also remove any existing job request files since it is no longer needed/wanted.
1481 
1482     :param path: path to job definition file.
1483     :param harvester: True if Harvester is being used (determined from args.harvester), otherwise False
1484     :return: job definition dictionary.
1485     """
1486 
1487     # remove any existing Harvester job request files (silent in non-Harvester mode) and read the JSON
1488     if harvester:
1489         remove_job_request_file()
1490         if is_json(path):
1491             job_definition_list = parse_job_definition_file(path)
1492             if not job_definition_list:
1493                 logger.warning('no jobs were found in Harvester job definitions file: %s', path)
1494                 return {}
1495             else:
1496                 # remove the job definition file from the original location, place a renamed copy in the pilot dir
1497                 new_path = os.path.join(os.environ.get('PILOT_HOME'), 'job_definition.json')
1498                 copy(path, new_path)
1499                 remove(path)
1500 
1501                 # note: the pilot can only handle one job at the time from Harvester
1502                 return job_definition_list[0]
1503 
1504     # old style
1505     res = {}
1506     with open(path, 'r') as jobdatafile:
1507         response = jobdatafile.read()
1508         if len(response) == 0:
1509             logger.fatal('encountered empty job definition file: %s', path)
1510             res = None  # this is a fatal error, no point in continuing as the file will not be replaced
1511         else:
1512             # parse response message
1513             # logger.debug('%s:\n\n%s\n\n', path, response)
1514             try:
1515                 from urlparse import parse_qsl  # Python 2
1516             # except ModuleNotFoundError:  # Python 3
1517             except Exception:
1518                 from urllib.parse import parse_qsl  # Python 3
1519             datalist = parse_qsl(response, keep_blank_values=True)
1520 
1521             # convert to dictionary
1522             for data in datalist:
1523                 res[data[0]] = data[1]
1524 
1525     if os.path.exists(path):
1526         remove(path)
1527 
1528     return res
1529 
1530 
1531 def get_job_definition_from_server(args):
1532     """
1533     Get a job definition from a server.
1534 
1535     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
1536     :return: job definition dictionary.
1537     """
1538 
1539     res = {}
1540 
1541     # get the job dispatcher dictionary
1542     data = get_dispatcher_dictionary(args)
1543 
1544     cmd = getjob_server_command(args.url, args.port)
1545     if cmd != "":
1546         logger.info('executing server command: %s', cmd)
1547         res = https.request(cmd, data=data)
1548 
1549     return res
1550 
1551 
1552 def locate_job_definition(args):
1553     """
1554     Locate the job definition file among standard locations.
1555 
1556     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
1557     :return: path (string).
1558     """
1559 
1560     if args.harvester_datadir:
1561         paths = [os.path.join(args.harvester_datadir, config.Pilot.pandajobdata)]
1562     else:
1563         paths = [os.path.join("%s/.." % args.sourcedir, config.Pilot.pandajobdata),
1564                  os.path.join(args.sourcedir, config.Pilot.pandajobdata),
1565                  os.path.join(os.environ['PILOT_WORK_DIR'], config.Pilot.pandajobdata)]
1566 
1567     if args.harvester_workdir:
1568         paths.append(os.path.join(args.harvester_workdir, config.Harvester.pandajob_file))
1569     if 'HARVESTER_WORKDIR' in os.environ:
1570         paths.append(os.path.join(os.environ['HARVESTER_WORKDIR'], config.Harvester.pandajob_file))
1571 
1572     path = ""
1573     for _path in paths:
1574         if os.path.exists(_path):
1575             path = _path
1576             break
1577 
1578     if path == "":
1579         logger.info('did not find any local job definition file')
1580 
1581     return path
1582 
1583 
1584 def get_job_definition(args):
1585     """
1586     Get a job definition from a source (server or pre-placed local file).
1587 
1588     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
1589     :return: job definition dictionary.
1590     """
1591 
1592     res = {}
1593     path = locate_job_definition(args)
1594 
1595     # should we run a normal 'real' job or a 'fake' job?
1596     if config.Pilot.pandajob == 'fake':
1597         logger.info('will use a fake PanDA job')
1598         res = get_fake_job()
1599     elif os.path.exists(path):
1600         logger.info('will read job definition from file %s', path)
1601         res = get_job_definition_from_file(path, args.harvester)
1602     else:
1603         if args.harvester and args.harvester_submitmode.lower() == 'push':
1604             pass  # local job definition file not found (go to sleep)
1605         else:
1606             logger.info('will download job definition from server')
1607             res = get_job_definition_from_server(args)
1608 
1609     return res
1610 
1611 
1612 def now():
1613     """
1614     Return the current epoch as a UTF-8 encoded string.
1615     :return: current time as encoded string
1616     """
1617     return str(time.time()).encode('utf-8')
1618 
1619 
1620 def get_fake_job(input=True):
1621     """
1622     Return a job definition for internal pilot testing.
1623     Note: this function is only used for testing purposes. The job definitions below are ATLAS specific.
1624 
1625     :param input: Boolean, set to False if no input files are wanted
1626     :return: job definition (dictionary).
1627     """
1628 
1629     res = None
1630 
1631     # create hashes
1632     hash = hashlib.md5()
1633     hash.update(now())
1634     log_guid = hash.hexdigest()
1635     hash.update(now())
1636     guid = hash.hexdigest()
1637     hash.update(now())
1638     job_name = hash.hexdigest()
1639 
1640     if config.Pilot.testjobtype == 'production':
1641         logger.info('creating fake test production job definition')
1642         res = {'jobsetID': 'NULL',
1643                'logGUID': log_guid,
1644                'cmtConfig': 'x86_64-slc6-gcc48-opt',
1645                'prodDBlocks': 'user.mlassnig:user.mlassnig.pilot.test.single.hits',
1646                'dispatchDBlockTokenForOut': 'NULL,NULL',
1647                'destinationDBlockToken': 'NULL,NULL',
1648                'destinationSE': 'AGLT2_TEST',
1649                'realDatasets': job_name,
1650                'prodUserID': 'no_one',
1651                'GUID': guid,
1652                'realDatasetsIn': 'user.mlassnig:user.mlassnig.pilot.test.single.hits',
1653                'nSent': 0,
1654                'cloud': 'US',
1655                'StatusCode': 0,
1656                'homepackage': 'AtlasProduction/20.1.4.14',
1657                'inFiles': 'HITS.06828093._000096.pool.root.1',
1658                'processingType': 'pilot-ptest',
1659                'ddmEndPointOut': 'UTA_SWT2_DATADISK,UTA_SWT2_DATADISK',
1660                'fsize': '94834717',
1661                'fileDestinationSE': 'AGLT2_TEST,AGLT2_TEST',
1662                'scopeOut': 'panda',
1663                'minRamCount': 0,
1664                'jobDefinitionID': 7932,
1665                'maxWalltime': 'NULL',
1666                'scopeLog': 'panda',
1667                'transformation': 'Reco_tf.py',
1668                'maxDiskCount': 0,
1669                'coreCount': 1,
1670                'prodDBlockToken': 'NULL',
1671                'transferType': 'NULL',
1672                'destinationDblock': job_name,
1673                'dispatchDBlockToken': 'NULL',
1674                'jobPars': '--maxEvents=1 --inputHITSFile HITS.06828093._000096.pool.root.1 --outputRDOFile RDO_%s.root' % job_name,
1675                'attemptNr': 0,
1676                'swRelease': 'Atlas-20.1.4',
1677                'nucleus': 'NULL',
1678                'maxCpuCount': 0,
1679                'outFiles': 'RDO_%s.root,%s.job.log.tgz' % (job_name, job_name),
1680                'currentPriority': 1000,
1681                'scopeIn': 'mc15_13TeV',
1682                'PandaID': '0',
1683                'sourceSite': 'NULL',
1684                'dispatchDblock': 'NULL',
1685                'prodSourceLabel': 'ptest',
1686                'checksum': 'ad:5d000974',
1687                'jobName': job_name,
1688                'ddmEndPointIn': 'UTA_SWT2_DATADISK',
1689                'taskID': 'NULL',
1690                'logFile': '%s.job.log.tgz' % job_name}
1691     elif config.Pilot.testjobtype == 'user':
1692         logger.info('creating fake test user job definition')
1693         res = {'jobsetID': 'NULL',
1694                'logGUID': log_guid,
1695                'cmtConfig': 'x86_64-slc6-gcc49-opt',
1696                'prodDBlocks': 'data15_13TeV:data15_13TeV.00276336.physics_Main.merge.AOD.r7562_p2521_tid07709524_00',
1697                'dispatchDBlockTokenForOut': 'NULL,NULL',
1698                'destinationDBlockToken': 'NULL,NULL',
1699                'destinationSE': 'ANALY_SWT2_CPB',
1700                'realDatasets': job_name,
1701                'prodUserID': 'None',
1702                'GUID': guid,
1703                'realDatasetsIn': 'data15_13TeV:data15_13TeV.00276336.physics_Main.merge.AOD.r7562_p2521_tid07709524_00',
1704                'nSent': '0',
1705                'cloud': 'US',
1706                'StatusCode': 0,
1707                'homepackage': 'AnalysisTransforms-AtlasDerivation_20.7.6.4',
1708                'inFiles': 'AOD.07709524._000050.pool.root.1',
1709                'processingType': 'pilot-ptest',
1710                'ddmEndPointOut': 'SWT2_CPB_SCRATCHDISK,SWT2_CPB_SCRATCHDISK',
1711                'fsize': '1564780952',
1712                'fileDestinationSE': 'ANALY_SWT2_CPB,ANALY_SWT2_CPB',
1713                'scopeOut': 'user.gangarbt',
1714                'minRamCount': '0',
1715                'jobDefinitionID': '9445',
1716                'maxWalltime': 'NULL',
1717                'scopeLog': 'user.gangarbt',
1718                'transformation': 'http://pandaserver.cern.ch:25080/trf/user/runAthena-00-00-11',
1719                'maxDiskCount': '0',
1720                'coreCount': '1',
1721                'prodDBlockToken': 'NULL',
1722                'transferType': 'NULL',
1723                'destinationDblock': job_name,
1724                'dispatchDBlockToken': 'NULL',
1725                'jobPars': '-a sources.20115461.derivation.tgz -r ./ -j "Reco_tf.py '
1726                           '--inputAODFile AOD.07709524._000050.pool.root.1 --outputDAODFile test.pool.root '
1727                           '--reductionConf HIGG3D1" -i "[\'AOD.07709524._000050.pool.root.1\']" -m "[]" -n "[]" --trf'
1728                           ' --useLocalIO --accessmode=copy -o '
1729                           '"{\'IROOT\': [(\'DAOD_HIGG3D1.test.pool.root\', \'%s.root\')]}" '
1730                           '--sourceURL https://aipanda012.cern.ch:25443' % (job_name),
1731                'attemptNr': '0',
1732                'swRelease': 'Atlas-20.7.6',
1733                'nucleus': 'NULL',
1734                'maxCpuCount': '0',
1735                'outFiles': '%s.root,%s.job.log.tgz' % (job_name, job_name),
1736                'currentPriority': '1000',
1737                'scopeIn': 'data15_13TeV',
1738                'PandaID': '0',
1739                'sourceSite': 'NULL',
1740                'dispatchDblock': 'data15_13TeV:data15_13TeV.00276336.physics_Main.merge.AOD.r7562_p2521_tid07709524_00',
1741                'prodSourceLabel': 'ptest',
1742                'checksum': 'ad:b11f45a7',
1743                'jobName': job_name,
1744                'ddmEndPointIn': 'SWT2_CPB_SCRATCHDISK',
1745                'taskID': 'NULL',
1746                'logFile': '%s.job.log.tgz' % job_name}
1747     else:
1748         logger.warning('unknown test job type: %s', config.Pilot.testjobtype)
1749 
1750     if res:
1751         if not input:
1752             res['inFiles'] = 'NULL'
1753             res['GUID'] = 'NULL'
1754             res['scopeIn'] = 'NULL'
1755             res['fsize'] = 'NULL'
1756             res['realDatasetsIn'] = 'NULL'
1757             res['checksum'] = 'NULL'
1758 
1759         if config.Pilot.testtransfertype == "NULL" or config.Pilot.testtransfertype == 'direct':
1760             res['transferType'] = config.Pilot.testtransfertype
1761         else:
1762             logger.warning('unknown test transfer type: %s (ignored)', config.Pilot.testtransfertype)
1763 
1764         if config.Pilot.testjobcommand == 'sleep':
1765             res['transformation'] = 'sleep'
1766             res['jobPars'] = '1'
1767             res['inFiles'] = ''
1768             res['outFiles'] = ''
1769 
1770         # convert to unicode for Python 2
1771         try:  # in case some later version of Python 3 has problems using u'' (seems ok with 3.7 at least)
1772             if not is_python3():
1773                 _res = {}
1774                 for entry in res:
1775                     if type(res[entry]) is str:
1776                         _res[u'%s' % entry] = u'%s' % res[entry]
1777                     else:
1778                         _res[u'%s' % entry] = res[entry]
1779                 res = _res
1780         except Exception:
1781             pass
1782     return res
1783 
1784 
1785 def get_job_retrieval_delay(harvester):
1786     """
1787     Return the proper delay between job retrieval attempts.
1788     In Harvester mode, the pilot will look once per second for a job definition file.
1789 
1790     :param harvester: True if Harvester is being used (determined from args.harvester), otherwise False
1791     :return: sleep (s)
1792     """
1793 
1794     return 1 if harvester else 60
1795 
1796 
1797 def retrieve(queues, traces, args):  # noqa: C901
1798     """
1799     Retrieve all jobs from a source.
1800 
1801     The job definition is a json dictionary that is either present in the launch
1802     directory (preplaced) or downloaded from a server specified by `args.url`.
1803 
1804     The function retrieves the job definition from the proper source and places
1805     it in the `queues.jobs` queue.
1806 
1807     WARNING: this function is nearly too complex. Be careful with adding more lines as flake8 will fail it.
1808 
1809     :param queues: internal queues for job handling.
1810     :param traces: tuple containing internal pilot states.
1811     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
1812     :raises PilotException: if create_job fails (e.g. because queuedata could not be downloaded).
1813     :return:
1814     """
1815 
1816     timefloor = infosys.queuedata.timefloor
1817     starttime = time.time()
1818 
1819     jobnumber = 0  # number of downloaded jobs
1820     getjob_requests = 0
1821     getjob_failures = 0
1822     print_node_info()
1823 
1824     while not args.graceful_stop.is_set():
1825 
1826         time.sleep(0.5)
1827         getjob_requests += 1
1828 
1829         if not proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, args.getjob_requests,
1830                                    args.update_server, args.harvester_submitmode, args.harvester, args.verify_proxy, traces):
1831             # do not set graceful stop if pilot has not finished sending the final job update
1832             # i.e. wait until SERVER_UPDATE is DONE_FINAL
1833             check_for_final_server_update(args.update_server)
1834             args.graceful_stop.set()
1835             break
1836 
1837         # store time stamp
1838         time_pre_getjob = time.time()
1839 
1840         # get a job definition from a source (file or server)
1841         res = get_job_definition(args)
1842         logger.info('job definition = %s', str(res))
1843 
1844         if res is None:
1845             logger.fatal('fatal error in job download loop - cannot continue')
1846             # do not set graceful stop if pilot has not finished sending the final job update
1847             # i.e. wait until SERVER_UPDATE is DONE_FINAL
1848             check_for_final_server_update(args.update_server)
1849             args.graceful_stop.set()
1850             break
1851 
1852         if not res:
1853             getjob_failures += 1
1854             if getjob_failures >= args.getjob_failures:
1855                 logger.warning('did not get a job -- max number of job request failures reached: %d', getjob_failures)
1856                 args.graceful_stop.set()
1857                 break
1858 
1859             delay = get_job_retrieval_delay(args.harvester)
1860             if not args.harvester:
1861                 logger.warning('did not get a job -- sleep %d s and repeat', delay)
1862             for _ in range(delay):
1863                 if args.graceful_stop.is_set():
1864                     break
1865                 time.sleep(1)
1866         else:
1867             # it seems the PanDA server returns StatusCode as an int, but the aCT returns it as a string
1868             # note: StatusCode keyword is not available in job definition files from Harvester (not needed)
1869             if 'StatusCode' in res and res['StatusCode'] != '0' and res['StatusCode'] != 0:
1870                 getjob_failures += 1
1871                 if getjob_failures >= args.getjob_failures:
1872                     logger.warning('did not get a job -- max number of job request failures reached: %d',
1873                                    getjob_failures)
1874                     args.graceful_stop.set()
1875                     break
1876 
1877                 logger.warning('did not get a job -- sleep 60s and repeat -- status: %s', res['StatusCode'])
1878                 for i in range(60):
1879                     if args.graceful_stop.is_set():
1880                         break
1881                     time.sleep(1)
1882             else:
1883                 # create the job object out of the raw dispatcher job dictionary
1884                 try:
1885                     job = create_job(res, args.queue)
1886                 except PilotException as error:
1887                     raise error
1888                 #else:
1889                     # verify the job status on the server
1890                     #try:
1891                     #    job_status, job_attempt_nr, job_status_code = get_job_status_from_server(job.jobid, args.url, args.port)
1892                     #    if job_status == "running":
1893                     #        pilot_error_diag = "job %s is already running elsewhere - aborting" % job.jobid
1894                     #        logger.warning(pilot_error_diag)
1895                     #        raise JobAlreadyRunning(pilot_error_diag)
1896                     #except Exception as error:
1897                     #    logger.warning("%s", error)
1898                 # write time stamps to pilot timing file
1899                 # note: PILOT_POST_GETJOB corresponds to START_TIME in Pilot 1
1900                 add_to_pilot_timing(job.jobid, PILOT_PRE_GETJOB, time_pre_getjob, args)
1901                 add_to_pilot_timing(job.jobid, PILOT_POST_GETJOB, time.time(), args)
1902 
1903                 # add the job definition to the jobs queue and increase the job counter,
1904                 # and wait until the job has finished
1905                 put_in_queue(job, queues.jobs)
1906 
1907                 jobnumber += 1
1908                 while not args.graceful_stop.is_set():
1909                     if has_job_completed(queues, args):
1910                         # purge queue(s) that retains job object
1911                         purge_queue(queues.finished_data_in)
1912 
1913                         args.job_aborted.clear()
1914                         args.abort_job.clear()
1915                         logger.info('ready for new job')
1916 
1917                         # re-establish logging
1918                         logging.info('pilot has finished for previous job - re-establishing logging')
1919                         logging.handlers = []
1920                         logging.shutdown()
1921                         establish_logging(debug=args.debug, nopilotlog=args.nopilotlog)
1922                         pilot_version_banner()
1923                         getjob_requests = 0
1924                         add_to_pilot_timing('1', PILOT_MULTIJOB_START_TIME, time.time(), args)
1925                         break
1926                     time.sleep(0.5)
1927 
1928     # proceed to set the job_aborted flag?
1929     if threads_aborted():
1930         logger.debug('will proceed to set job_aborted')
1931         args.job_aborted.set()
1932     else:
1933         logger.debug('will not set job_aborted yet')
1934 
1935     logger.debug('[job] retrieve thread has finished')
1936 
1937 
1938 def print_node_info():
1939     """
1940     Print information about the local node to the log.
1941 
1942     :return:
1943     """
1944 
1945     if is_virtual_machine():
1946         logger.info("pilot is running in a virtual machine")
1947     else:
1948         logger.info("pilot is not running in a virtual machine")
1949 
1950 
1951 def create_job(dispatcher_response, queue):
1952     """
1953     Create a job object out of the dispatcher response.
1954 
1955     :param dispatcher_response: raw job dictionary from the dispatcher.
1956     :param queue: queue name (string).
1957     :return: job object
1958     """
1959 
1960     # initialize (job specific) InfoService instance
1961 
1962     job = JobData(dispatcher_response)
1963 
1964     jobinfosys = InfoService()
1965     jobinfosys.init(queue, infosys.confinfo, infosys.extinfo, JobInfoProvider(job))
1966     job.init(infosys)
1967 
1968     #job.workdir = os.getcwd()
1969 
1970     logger.info('received job: %s (sleep until the job has finished)', job.jobid)
1971     logger.info('job details: \n%s', job)
1972 
1973     # payload environment wants the PANDAID to be set, also used below
1974     os.environ['PANDAID'] = job.jobid
1975 
1976     return job
1977 
1978 
1979 def has_job_completed(queues, args):
1980     """
1981     Has the current job completed (finished or failed)?
1982     Note: the job object was extracted from monitored_payloads queue before this function was called.
1983 
1984     :param queues: Pilot queues object.
1985     :return: True is the payload has finished or failed
1986     """
1987 
1988     # check if the job has finished
1989     try:
1990         job = queues.completed_jobs.get(block=True, timeout=1)
1991     except queue.Empty:
1992         # logger.info("(job still running)")
1993         pass
1994     else:
1995         make_job_report(job)
1996         cmd = 'ls -lF %s' % os.environ.get('PILOT_HOME')
1997         logger.debug('%s:\n', cmd)
1998         _, stdout, _ = execute(cmd)
1999         logger.debug(stdout)
2000 
2001         queue_report(queues)
2002         job.reset_errors()
2003         logger.info("job %s has completed (purged errors)", job.jobid)
2004 
2005         # cleanup of any remaining processes
2006         if job.pid:
2007             job.zombies.append(job.pid)
2008         cleanup(job, args)
2009 
2010         return True
2011 
2012     # is there anything in the finished_jobs queue?
2013     #finished_queue_snapshot = list(queues.finished_jobs.queue)
2014     #peek = [obj for obj in finished_queue_snapshot if jobid == obj.jobid]
2015     #if peek:
2016     #    logger.info("job %s has completed (finished)", jobid)
2017     #    return True
2018 
2019     # is there anything in the failed_jobs queue?
2020     #failed_queue_snapshot = list(queues.failed_jobs.queue)
2021     #peek = [obj for obj in failed_queue_snapshot if jobid == obj.jobid]
2022     #if peek:
2023     #    logger.info("job %s has completed (failed)", jobid)
2024     #    return True
2025 
2026     return False
2027 
2028 
2029 def get_job_from_queue(queues, state):
2030     """
2031     Check if the job has finished or failed and if so return it.
2032 
2033     :param queues: pilot queues.
2034     :param state: job state (e.g. finished/failed) (string).
2035     :return: job object.
2036     """
2037     try:
2038         if state == "finished":
2039             job = queues.finished_jobs.get(block=True, timeout=1)
2040         elif state == "failed":
2041             job = queues.failed_jobs.get(block=True, timeout=1)
2042         else:
2043             job = None
2044     except queue.Empty:
2045         # logger.info("(job still running)")
2046         job = None
2047     else:
2048         # make sure that state=failed
2049         set_pilot_state(job=job, state=state)
2050         logger.info("job %s has state=%s", job.jobid, job.state)
2051 
2052     return job
2053 
2054 
2055 def is_queue_empty(queues, queue):
2056     """
2057     Check if the given queue is empty (without pulling).
2058 
2059     :param queues: pilot queues object.
2060     :param queue: queue name (string).
2061     :return: True if queue is empty, False otherwise
2062     """
2063 
2064     status = False
2065     if queue in queues._fields:
2066         _queue = getattr(queues, queue)
2067         jobs = list(_queue.queue)
2068         if len(jobs) > 0:
2069             logger.info('queue %s not empty: found %d job(s)', queue, len(jobs))
2070         else:
2071             logger.info('queue %s is empty', queue)
2072             status = True
2073     else:
2074         logger.warning('queue %s not present in %s', queue, queues._fields)
2075 
2076     return status
2077 
2078 
2079 def order_log_transfer(queues, job):
2080     """
2081     Order a log transfer for a failed job.
2082 
2083     :param queues: pilot queues object.
2084     :param job: job object.
2085     :return:
2086     """
2087 
2088     # add the job object to the data_out queue to have it staged out
2089     job.stageout = 'log'  # only stage-out log file
2090     #set_pilot_state(job=job, state='stageout')
2091     put_in_queue(job, queues.data_out)
2092 
2093     logger.debug('job added to data_out queue')
2094 
2095     # wait for the log transfer to finish
2096     n = 0
2097     nmax = 60
2098     while n < nmax:
2099         # refresh the log_transfer since it might have changed
2100         log_transfer = job.get_status('LOG_TRANSFER')
2101         logger.info('waiting for log transfer to finish (#%d/#%d): %s', n + 1, nmax, log_transfer)
2102         if is_queue_empty(queues, 'data_out') and \
2103                 (log_transfer == LOG_TRANSFER_DONE or log_transfer == LOG_TRANSFER_FAILED):  # set in data component
2104             logger.info('stage-out of log has completed')
2105             break
2106         else:
2107             if log_transfer == LOG_TRANSFER_IN_PROGRESS:  # set in data component, job object is singleton
2108                 logger.info('log transfer is in progress')
2109             time.sleep(2)
2110             n += 1
2111 
2112     logger.info('proceeding with server update (n=%d)', n)
2113 
2114 
2115 def wait_for_aborted_job_stageout(args, queues, job):
2116     """
2117     Wait for stage-out to finish for aborted job.
2118 
2119     :param args: pilot args object.
2120     :param queues: pilot queues object.
2121     :param job: job object.
2122     :return:
2123     """
2124 
2125     # if the pilot received a kill signal, how much time has passed since the signal was intercepted?
2126     try:
2127         time_since_kill = get_time_since('1', PILOT_KILL_SIGNAL, args)
2128         was_killed = was_pilot_killed(args.timing)
2129         if was_killed:
2130             logger.info('%d s passed since kill signal was intercepted - make sure that stage-out has finished', time_since_kill)
2131     except Exception as error:
2132         logger.warning('exception caught: %s', error)
2133         time_since_kill = 60
2134     else:
2135         if time_since_kill > 60 or time_since_kill < 0:  # fail-safe
2136             logger.warning('reset time_since_kill to 60 since value is out of allowed limits')
2137             time_since_kill = 60
2138 
2139     # if stage-out has not finished, we need to wait (less than two minutes or the batch system will issue
2140     # a hard SIGKILL)
2141     max_wait_time = 2 * 60 - time_since_kill - 5
2142     logger.debug('using max_wait_time = %d s', max_wait_time)
2143     t0 = time.time()
2144     while time.time() - t0 < max_wait_time:
2145         if job in queues.finished_data_out.queue or job in queues.failed_data_out.queue:
2146             logger.info('stage-out has finished, proceed with final server update')
2147             break
2148         else:
2149             time.sleep(0.5)
2150 
2151     logger.info('proceeding with final server update')
2152 
2153 
2154 def get_job_status(job, key):
2155     """
2156     Wrapper function around job.get_status().
2157     If key = 'LOG_TRANSFER' but job object is not defined, the function will return value = LOG_TRANSFER_NOT_DONE.
2158 
2159     :param job: job object.
2160     :param key: key name (string).
2161     :return: value (string).
2162     """
2163 
2164     value = ""
2165     if job:
2166         value = job.get_status(key)
2167     else:
2168         if key == 'LOG_TRANSFER':
2169             value = LOG_TRANSFER_NOT_DONE
2170 
2171     return value
2172 
2173 
2174 def queue_monitor(queues, traces, args):  # noqa: C901
2175     """
2176     Monitoring of queues.
2177     This function monitors queue activity, specifically if a job has finished or failed and then reports to the server.
2178 
2179     :param queues: internal queues for job handling.
2180     :param traces: tuple containing internal pilot states.
2181     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
2182     :return:
2183     """
2184 
2185     # scan queues until at least one queue has a job object. abort if it takes too long time
2186     if not scan_for_jobs(queues):
2187         logger.warning('queues are still empty of jobs - will begin queue monitoring anyway')
2188 
2189     job = None
2190     while True:  # will abort when graceful_stop has been set or if enough time has passed after kill signal
2191         time.sleep(1)
2192 
2193         if traces.pilot['command'] == 'abort':
2194             logger.warning('job queue monitor received an abort instruction')
2195             args.graceful_stop.set()
2196 
2197         # abort in case graceful_stop has been set, and less than 30 s has passed since MAXTIME was reached (if set)
2198         # (abort at the end of the loop)
2199         abort_thread = should_abort(args, label='job:queue_monitor')
2200         if abort_thread and os.environ.get('PILOT_WRAP_UP', '') == 'NORMAL':
2201             pause_queue_monitor(20)
2202 
2203         # check if the job has finished
2204         imax = 20
2205         i = 0
2206         while i < imax and os.environ.get('PILOT_WRAP_UP', '') == 'NORMAL':
2207             job = get_finished_or_failed_job(args, queues)
2208             if job:
2209                 logger.debug('returned job has state=%s', job.state)
2210                 #if job.state == 'failed':
2211                 #    logger.warning('will abort failed job (should prepare for final server update)')
2212                 break
2213             i += 1
2214             state = get_pilot_state()  # the job object is not available, but the state is also kept in PILOT_JOB_STATE
2215             if state != 'stage-out':
2216                 # logger.info("no need to wait since job state=\'%s\'", state)
2217                 break
2218             pause_queue_monitor(1) if not abort_thread else pause_queue_monitor(10)
2219 
2220         # job has not been defined if it's still running
2221         if not job and not abort_thread:
2222             continue
2223 
2224         completed_jobids = queues.completed_jobids.queue if queues.completed_jobids else []
2225         if job and job.jobid not in completed_jobids:
2226             logger.info("preparing for final server update for job %s in state=\'%s\'", job.jobid, job.state)
2227 
2228             if args.job_aborted.is_set():
2229                 # wait for stage-out to finish for aborted job
2230                 wait_for_aborted_job_stageout(args, queues, job)
2231 
2232             # send final server update
2233             update_server(job, args)
2234 
2235             # we can now stop monitoring this job, so remove it from the monitored_payloads queue and add it to the
2236             # completed_jobs queue which will tell retrieve() that it can download another job
2237             try:
2238                 _job = queues.monitored_payloads.get(block=True, timeout=1)
2239             except queue.Empty:
2240                 logger.warning('failed to dequeue job: queue is empty (did job fail before job monitor started?)')
2241                 make_job_report(job)
2242             else:
2243                 logger.debug('job %s was dequeued from the monitored payloads queue', _job.jobid)
2244                 # now ready for the next job (or quit)
2245                 put_in_queue(job.jobid, queues.completed_jobids)
2246 
2247                 put_in_queue(job, queues.completed_jobs)
2248                 del _job
2249                 logger.debug('tmp job object deleted')
2250 
2251         if abort_thread:
2252             break
2253 
2254     # proceed to set the job_aborted flag?
2255     if threads_aborted():
2256         logger.debug('will proceed to set job_aborted')
2257         args.job_aborted.set()
2258     else:
2259         logger.debug('will not set job_aborted yet')
2260 
2261     logger.debug('[job] queue monitor thread has finished')
2262 
2263 
2264 def update_server(job, args):
2265     """
2266     Update the server (wrapper for send_state() that also prepares the metadata).
2267 
2268     :param job: job object.
2269     :param args: pilot args object.
2270     :return:
2271     """
2272 
2273     # user specific actions
2274     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
2275     user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
2276     metadata = user.get_metadata(job.workdir)
2277     try:
2278         user.update_server(job)
2279     except Exception as error:
2280         logger.warning('exception caught in update_server(): %s', error)
2281     if job.fileinfo:
2282         send_state(job, args, job.state, xml=dumps(job.fileinfo), metadata=metadata)
2283     else:
2284         send_state(job, args, job.state, metadata=metadata)
2285 
2286 
2287 def pause_queue_monitor(delay):
2288     """
2289     Pause the queue monitor to let log transfer complete.
2290     Note: this function should use globally available object. Use sleep for now.
2291     :param delay: sleep time in seconds (int).
2292     :return:
2293     """
2294 
2295     logger.warning('since job:queue_monitor is responsible for sending job updates, we sleep for %d s', delay)
2296     time.sleep(delay)
2297 
2298 
2299 def get_finished_or_failed_job(args, queues):
2300     """
2301     Check if the job has either finished or failed and if so return it.
2302     If failed, order a log transfer. If the job is in state 'failed' and abort_job is set, set job_aborted.
2303 
2304     :param args: pilot args object.
2305     :param queues: pilot queues object.
2306     :return: job object.
2307     """
2308 
2309     job = get_job_from_queue(queues, "finished")
2310     if job:
2311         # logger.debug('get_finished_or_failed_job: job has finished')
2312         pass
2313     else:
2314         # logger.debug('check_job: job has not finished')
2315         job = get_job_from_queue(queues, "failed")
2316         if job:
2317             logger.debug('get_finished_or_failed_job: job has failed')
2318             job.state = 'failed'
2319             args.job_aborted.set()
2320 
2321             # get the current log transfer status
2322             log_transfer = get_job_status(job, 'LOG_TRANSFER')
2323             if log_transfer == LOG_TRANSFER_NOT_DONE:
2324                 # order a log transfer for a failed job
2325                 order_log_transfer(queues, job)
2326 
2327     # check if the job has failed
2328     if job and job.state == 'failed':
2329         # set job_aborted in case of kill signals
2330         if args.abort_job.is_set():
2331             logger.warning('queue monitor detected a set abort_job (due to a kill signal)')
2332             # do not set graceful stop if pilot has not finished sending the final job update
2333             # i.e. wait until SERVER_UPDATE is DONE_FINAL
2334             #check_for_final_server_update(args.update_server)
2335             #args.job_aborted.set()
2336 
2337     return job
2338 
2339 
2340 def get_heartbeat_period(debug=False):
2341     """
2342     Return the proper heartbeat period, as determined by normal or debug mode.
2343     In normal mode, the heartbeat period is 30*60 s, while in debug mode it is 5*60 s. Both values are defined in the
2344     config file.
2345 
2346     :param debug: Boolean, True for debug mode. False otherwise.
2347     :return: heartbeat period (int).
2348     """
2349 
2350     try:
2351         return int(config.Pilot.heartbeat if not debug else config.Pilot.debug_heartbeat)
2352     except Exception as error:
2353         logger.warning('bad config data for heartbeat period: %s (will use default 1800 s)', error)
2354         return 1800
2355 
2356 
2357 def check_for_abort_job(args, caller=''):
2358     """
2359     Check if args.abort_job.is_set().
2360 
2361     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
2362     :param caller: function name of caller (string).
2363     :return: Boolean, True if args_job.is_set()
2364     """
2365     abort_job = False
2366     if args.abort_job.is_set():
2367         logger.warning('%s detected an abort_job request (signal=%s)', caller, args.signal)
2368         logger.warning('in case pilot is running more than one job, all jobs will be aborted')
2369         abort_job = True
2370 
2371     return abort_job
2372 
2373 
2374 def interceptor(queues, traces, args):
2375     """
2376     MOVE THIS TO INTERCEPTOR.PY; TEMPLATE FOR THREADS
2377 
2378     :param queues: internal queues for job handling.
2379     :param traces: tuple containing internal pilot states.
2380     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
2381     :return:
2382     """
2383 
2384     # overall loop counter (ignoring the fact that more than one job may be running)
2385     n = 0
2386     while not args.graceful_stop.is_set():
2387         time.sleep(0.1)
2388 
2389         # abort in case graceful_stop has been set, and less than 30 s has passed since MAXTIME was reached (if set)
2390         # (abort at the end of the loop)
2391         abort = should_abort(args, label='job:interceptor')
2392 
2393         # check for any abort_job requests
2394         abort_job = check_for_abort_job(args, caller='interceptor')
2395         if not abort_job:
2396             # peek at the jobs in the validated_jobs queue and send the running ones to the heartbeat function
2397             jobs = queues.monitored_payloads.queue
2398             if jobs:
2399                 for _ in range(len(jobs)):
2400                     logger.info('interceptor loop %d: looking for communication file', n)
2401             time.sleep(30)
2402 
2403         n += 1
2404 
2405         if abort or abort_job:
2406             break
2407 
2408     # proceed to set the job_aborted flag?
2409     if threads_aborted():
2410         logger.debug('will proceed to set job_aborted')
2411         args.job_aborted.set()
2412     else:
2413         logger.debug('will not set job_aborted yet')
2414 
2415     logger.debug('[job] interceptor thread has finished')
2416 
2417 
2418 def fast_monitor_tasks(job):
2419     """
2420     Perform user specific fast monitoring tasks.
2421 
2422     :param job: job object.
2423     :return: exit code (int).
2424     """
2425 
2426     exit_code = 0
2427 
2428     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
2429     user = __import__('pilot.user.%s.monitoring' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
2430     try:
2431         exit_code = user.fast_monitor_tasks(job)
2432     except Exception as exc:
2433         logger.warning('caught exception: %s', exc)
2434 
2435     return exit_code
2436 
2437 
2438 def fast_job_monitor(queues, traces, args):
2439     """
2440     Fast monitoring of job parameters.
2441 
2442     This function can be used for monitoring processes below the one minute threshold of the normal job_monitor thread.
2443 
2444     :param queues: internal queues for job handling.
2445     :param traces: tuple containing internal pilot states.
2446     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
2447     :return:
2448     """
2449 
2450     # peeking and current time; peeking_time gets updated if and when jobs are being monitored, update_time is only
2451     # used for sending the heartbeat and is updated after a server update
2452     #peeking_time = int(time.time())
2453     #update_time = peeking_time
2454 
2455     # end thread immediately, unless fast monitoring is required
2456     if not args.use_realtime_logging:
2457         logger.warning('fast monitoring not required - ending thread')
2458         return
2459 
2460     while not args.graceful_stop.is_set():
2461         time.sleep(10)
2462 
2463         # abort in case graceful_stop has been set, and less than 30 s has passed since MAXTIME was reached (if set)
2464         # (abort at the end of the loop)
2465         abort = should_abort(args, label='job:fast_job_monitor')
2466         if abort:
2467             break
2468 
2469         if traces.pilot.get('command') == 'abort':
2470             logger.warning('fast job monitor received an abort command')
2471             break
2472 
2473         # check for any abort_job requests
2474         abort_job = check_for_abort_job(args, caller='fast job monitor')
2475         if abort_job:
2476             break
2477         else:
2478             # peek at the jobs in the validated_jobs queue and send the running ones to the heartbeat function
2479             jobs = queues.monitored_payloads.queue
2480             if jobs:
2481                 for i in range(len(jobs)):
2482                     #current_id = jobs[i].jobid
2483                     if jobs[i].state == 'finished' or jobs[i].state == 'failed':
2484                         logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state)
2485                         break
2486 
2487                 # perform the monitoring tasks
2488                 exit_code = fast_monitor_tasks(jobs[i])
2489                 if exit_code:
2490                     logger.debug('fast monitoring reported an error: %d', exit_code)
2491 
2492     # proceed to set the job_aborted flag?
2493     if threads_aborted():
2494         logger.debug('will proceed to set job_aborted')
2495         args.job_aborted.set()
2496     else:
2497         logger.debug('will not set job_aborted yet')
2498 
2499     logger.debug('[job] fast job monitor thread has finished')
2500 
2501 
2502 def job_monitor(queues, traces, args):  # noqa: C901
2503     """
2504     Monitoring of job parameters.
2505     This function monitors certain job parameters, such as job looping, at various time intervals. The main loop
2506     is executed once a minute, while individual verifications may be executed at any time interval (>= 1 minute). E.g.
2507     looping jobs are checked once per ten minutes (default) and the heartbeat is send once per 30 minutes. Memory
2508     usage is checked once a minute.
2509 
2510     :param queues: internal queues for job handling.
2511     :param traces: tuple containing internal pilot states.
2512     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
2513     :return:
2514     """
2515 
2516     # initialize the monitoring time object
2517     mt = MonitoringTime()
2518 
2519     # peeking and current time; peeking_time gets updated if and when jobs are being monitored, update_time is only
2520     # used for sending the heartbeat and is updated after a server update
2521     peeking_time = int(time.time())
2522     update_time = peeking_time
2523 
2524     # overall loop counter (ignoring the fact that more than one job may be running)
2525     n = 0
2526     while not args.graceful_stop.is_set():
2527         time.sleep(0.5)
2528 
2529         # abort in case graceful_stop has been set, and less than 30 s has passed since MAXTIME was reached (if set)
2530         # (abort at the end of the loop)
2531         abort = should_abort(args, label='job:job_monitor')
2532 
2533         if traces.pilot.get('command') == 'abort':
2534             logger.warning('job monitor received an abort command')
2535 
2536         # check for any abort_job requests
2537         abort_job = check_for_abort_job(args, caller='job monitor')
2538         if not abort_job:
2539             if not queues.current_data_in.empty():
2540                 # make sure to send heartbeat regularly if stage-in takes a long time
2541                 jobs = queues.current_data_in.queue
2542                 if jobs:
2543                     for i in range(len(jobs)):
2544                         # send heartbeat if it is time (note that the heartbeat function might update the job object, e.g.
2545                         # by turning on debug mode, ie we need to get the heartbeat period in case it has changed)
2546                         update_time = send_heartbeat_if_time(jobs[i], args, update_time)
2547 
2548                         # note: when sending a state change to the server, the server might respond with 'tobekilled'
2549                         try:
2550                             jobs[i]
2551                         except Exception as error:
2552                             logger.warning('detected stale jobs[i] object in job_monitor: %s', error)
2553                         else:
2554                             if jobs[i].state == 'failed':
2555                                 logger.warning('job state is \'failed\' - order log transfer and abort job_monitor() (1)')
2556                                 jobs[i].stageout = 'log'  # only stage-out log file
2557                                 put_in_queue(jobs[i], queues.data_out)
2558 
2559                     # sleep for a while if stage-in has not completed
2560                     time.sleep(1)
2561                     continue
2562             elif queues.finished_data_in.empty():
2563                 # sleep for a while if stage-in has not completed
2564                 time.sleep(1)
2565                 continue
2566 
2567             time.sleep(60)
2568 
2569         # peek at the jobs in the validated_jobs queue and send the running ones to the heartbeat function
2570         jobs = queues.monitored_payloads.queue
2571         if jobs:
2572             # update the peeking time
2573             peeking_time = int(time.time())
2574             for i in range(len(jobs)):
2575                 current_id = jobs[i].jobid
2576                 logger.info('monitor loop #%d: job %d:%s is in state \'%s\'', n, i, current_id, jobs[i].state)
2577                 if jobs[i].state == 'finished' or jobs[i].state == 'failed':
2578                     logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state)
2579                     break
2580 
2581                 # perform the monitoring tasks
2582                 exit_code, diagnostics = job_monitor_tasks(jobs[i], mt, args)
2583                 if exit_code != 0:
2584                     if exit_code == errors.NOVOMSPROXY:
2585                         logger.warning('VOMS proxy has expired - keep monitoring job')
2586                     elif exit_code == errors.KILLPAYLOAD:
2587                         jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(exit_code)
2588                         logger.debug('killing payload process')
2589                         kill_process(jobs[i].pid)
2590                         break
2591                     else:
2592                         try:
2593                             fail_monitored_job(jobs[i], exit_code, diagnostics, queues, traces)
2594                         except Exception as error:
2595                             logger.warning('(1) exception caught: %s (job id=%s)', error, current_id)
2596                         break
2597 
2598                 # run this check again in case job_monitor_tasks() takes a long time to finish (and the job object
2599                 # has expired in the mean time)
2600                 try:
2601                     _job = jobs[i]
2602                 except Exception:
2603                     logger.info('aborting job monitoring since job object (job id=%s) has expired', current_id)
2604                     break
2605 
2606                 # send heartbeat if it is time (note that the heartbeat function might update the job object, e.g.
2607                 # by turning on debug mode, ie we need to get the heartbeat period in case it has changed)
2608                 try:
2609                     update_time = send_heartbeat_if_time(_job, args, update_time)
2610                 except Exception as error:
2611                     logger.warning('(2) exception caught: %s (job id=%s)', error, current_id)
2612                     break
2613                 else:
2614                     # note: when sending a state change to the server, the server might respond with 'tobekilled'
2615                     if _job.state == 'failed':
2616                         logger.warning('job state is \'failed\' - order log transfer and abort job_monitor() (2)')
2617                         _job.stageout = 'log'  # only stage-out log file
2618                         put_in_queue(_job, queues.data_out)
2619                         abort = True
2620                         break
2621 
2622         elif os.environ.get('PILOT_JOB_STATE') == 'stagein':
2623             logger.info('job monitoring is waiting for stage-in to finish')
2624         else:
2625             # check the waiting time in the job monitor. set global graceful_stop if necessary
2626             check_job_monitor_waiting_time(args, peeking_time, abort_override=abort_job)
2627 
2628         n += 1
2629 
2630         if abort or abort_job:
2631             break
2632 
2633     # proceed to set the job_aborted flag?
2634     if threads_aborted():
2635         logger.debug('will proceed to set job_aborted')
2636         args.job_aborted.set()
2637     else:
2638         logger.debug('will not set job_aborted yet')
2639 
2640     logger.debug('[job] job monitor thread has finished')
2641 
2642 
2643 def send_heartbeat_if_time(job, args, update_time):
2644     """
2645     Send a heartbeat to the server if it is time to do so.
2646 
2647     :param job: job object.
2648     :param args: args object.
2649     :param update_time: last update time (from time.time()).
2650     :return: possibly updated update_time (from time.time()).
2651     """
2652 
2653     if int(time.time()) - update_time >= get_heartbeat_period(job.debug):
2654         if job.serverstate != 'finished' and job.serverstate != 'failed':
2655             send_state(job, args, 'running')
2656             update_time = int(time.time())
2657 
2658     return update_time
2659 
2660 
2661 def check_job_monitor_waiting_time(args, peeking_time, abort_override=False):
2662     """
2663     Check the waiting time in the job monitor.
2664     Set global graceful_stop if necessary.
2665 
2666     :param args: args object.
2667     :param peeking_time: time when monitored_payloads queue was peeked into (int).
2668     :return:
2669     """
2670 
2671     waiting_time = int(time.time()) - peeking_time
2672     msg = 'no jobs in monitored_payloads queue (waited for %d s)' % waiting_time
2673     if waiting_time > 60 * 60:
2674         abort = True
2675         msg += ' - aborting'
2676     else:
2677         abort = False
2678     if logger:
2679         logger.warning(msg)
2680     else:
2681         print(msg)
2682     if abort or abort_override:
2683         # do not set graceful stop if pilot has not finished sending the final job update
2684         # i.e. wait until SERVER_UPDATE is DONE_FINAL
2685         check_for_final_server_update(args.update_server)
2686         args.graceful_stop.set()
2687 
2688 
2689 def fail_monitored_job(job, exit_code, diagnostics, queues, traces):
2690     """
2691     Fail a monitored job.
2692 
2693     :param job: job object
2694     :param exit_code: exit code from job_monitor_tasks (int).
2695     :param diagnostics: pilot error diagnostics (string).
2696     :param queues: queues object.
2697     :param traces: traces object.
2698     :return:
2699     """
2700 
2701     set_pilot_state(job=job, state="failed")
2702     job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=diagnostics)
2703     job.piloterrordiag = diagnostics
2704     traces.pilot['error_code'] = exit_code
2705     put_in_queue(job, queues.failed_payloads)
2706     logger.info('aborting job monitoring since job state=%s', job.state)
2707 
2708 
2709 def make_job_report(job):
2710     """
2711     Make a summary report for the given job.
2712     This function is called when the job has completed.
2713 
2714     :param job: job object.
2715     :return:
2716     """
2717 
2718     logger.info('')
2719     logger.info('job summary report')
2720     logger.info('--------------------------------------------------')
2721     logger.info('PanDA job id: %s', job.jobid)
2722     logger.info('task id: %s', job.taskid)
2723     n = len(job.piloterrorcodes)
2724     if n > 0:
2725         for i in range(n):
2726             logger.info('error %d/%d: %s: %s', i + 1, n, job.piloterrorcodes[i], job.piloterrordiags[i])
2727     else:
2728         logger.info('errors: (none)')
2729     if job.piloterrorcode != 0:
2730         logger.info('pilot error code: %d', job.piloterrorcode)
2731         logger.info('pilot error diag: %s', job.piloterrordiag)
2732     info = ""
2733     for key in job.status:
2734         info += key + " = " + job.status[key] + " "
2735     logger.info('status: %s', info)
2736     s = ""
2737     if job.is_analysis() and job.state != 'finished':
2738         s = '(user job is recoverable)' if errors.is_recoverable(code=job.piloterrorcode) else '(user job is not recoverable)'
2739     logger.info('pilot state: %s %s', job.state, s)
2740     logger.info('transexitcode: %d', job.transexitcode)
2741     logger.info('exeerrorcode: %d', job.exeerrorcode)
2742     logger.info('exeerrordiag: %s', job.exeerrordiag)
2743     logger.info('exitcode: %d', job.exitcode)
2744     logger.info('exitmsg: %s', job.exitmsg)
2745     logger.info('cpuconsumptiontime: %d %s', job.cpuconsumptiontime, job.cpuconsumptionunit)
2746     logger.info('nevents: %d', job.nevents)
2747     logger.info('neventsw: %d', job.neventsw)
2748     logger.info('pid: %s', job.pid)
2749     logger.info('pgrp: %s', str(job.pgrp))
2750     logger.info('corecount: %d', job.corecount)
2751     logger.info('event service: %s', str(job.is_eventservice))
2752     logger.info('sizes: %s', str(job.sizes))
2753     logger.info('--------------------------------------------------')
2754     logger.info('')