File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 from __future__ import print_function
0014
0015 import os
0016 import time
0017 import hashlib
0018 import random
0019 import socket
0020 import logging
0021
0022 try:
0023 import Queue as queue
0024
0025 except Exception:
0026 import queue
0027
0028 from json import dumps
0029 from re import findall
0030 from glob import glob
0031
0032 from pilot.common.errorcodes import ErrorCodes
0033 from pilot.common.exception import ExcThread, PilotException
0034 from pilot.info import infosys, JobData, InfoService, JobInfoProvider
0035 from pilot.util import https
0036 from pilot.util.auxiliary import get_batchsystem_jobid, get_job_scheduler_id, get_pilot_id, \
0037 set_pilot_state, get_pilot_state, check_for_final_server_update, pilot_version_banner, is_virtual_machine, \
0038 is_python3, show_memory_usage, has_instruction_sets, locate_core_file, get_display_info
0039 from pilot.util.config import config
0040 from pilot.util.common import should_abort, was_pilot_killed
0041 from pilot.util.constants import PILOT_MULTIJOB_START_TIME, PILOT_PRE_GETJOB, PILOT_POST_GETJOB, PILOT_KILL_SIGNAL, LOG_TRANSFER_NOT_DONE, \
0042 LOG_TRANSFER_IN_PROGRESS, LOG_TRANSFER_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_TROUBLE, SERVER_UPDATE_FINAL, \
0043 SERVER_UPDATE_UPDATING, SERVER_UPDATE_NOT_DONE
0044 from pilot.util.container import execute
0045 from pilot.util.filehandling import find_text_files, tail, is_json, copy, remove, write_json, establish_logging, write_file, \
0046 create_symlink
0047 from pilot.util.harvester import request_new_jobs, remove_job_request_file, parse_job_definition_file, \
0048 is_harvester_mode, get_worker_attributes_file, publish_job_report, publish_work_report, get_event_status_file, \
0049 publish_stageout_files
0050 from pilot.util.jobmetrics import get_job_metrics
0051 from pilot.util.math import mean
0052 from pilot.util.middleware import containerise_general_command
0053 from pilot.util.monitoring import job_monitor_tasks, check_local_space
0054 from pilot.util.monitoringtime import MonitoringTime
0055 from pilot.util.processes import cleanup, threads_aborted, kill_process, kill_processes
0056 from pilot.util.proxy import get_distinguished_name
0057 from pilot.util.queuehandling import scan_for_jobs, put_in_queue, queue_report, purge_queue
0058 from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
0059 from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model
0060
0061 logger = logging.getLogger(__name__)
0062 errors = ErrorCodes()
0063
0064
def control(queues, traces, args):
    """
    Main function of job control.

    Starts the job control threads (validate, retrieve, create_data_payload,
    queue_monitor, job_monitor and fast_job_monitor) and babysits them until
    graceful_stop is set, draining any exceptions the threads report via
    their buckets.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    targets = {'validate': validate, 'retrieve': retrieve, 'create_data_payload': create_data_payload,
               'queue_monitor': queue_monitor, 'job_monitor': job_monitor, 'fast_job_monitor': fast_job_monitor}
    threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'queues': queues, 'traces': traces, 'args': args},
                         name=name) for name, target in list(targets.items())]

    [thread.start() for thread in threads]

    # monitor the threads; any exception raised inside a thread is put into its
    # bucket by ExcThread and is reported (but not re-raised) here
    while not args.graceful_stop.is_set():
        for thread in threads:
            bucket = thread.get_bucket()
            try:
                exc = bucket.get(block=False)
            except queue.Empty:
                pass
            else:
                exc_type, exc_obj, exc_trace = exc
                logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)

            # short join so the loop keeps cycling over all threads
            thread.join(0.1)
            time.sleep(0.1)

        time.sleep(0.5)

    logger.debug('job control ending since graceful_stop has been set')
    if args.abort_job.is_set():
        if traces.pilot['command'] == 'aborting':
            logger.warning('jobs are aborting')
        elif traces.pilot['command'] == 'abort':
            logger.warning('job control detected a set abort_job (due to a kill signal)')
            traces.pilot['command'] = 'aborting'

    # only flag job_aborted once all pilot threads have actually finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] control thread has finished')
0121
0122
0123
0124
0125
def _validate_job(job):
    """
    Verify job parameters for specific problems.

    Imports the user-specific common and container modules (selected via the
    PILOT_USER environment variable), decides whether a container should be
    used for the payload, and delegates the actual verification to the user
    code.

    :param job: job object.
    :return: True if the job is verified by the user code, False otherwise (Boolean).
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
    # fix: the fromlist argument must contain strings, not the previously
    # imported module object (the original passed [user] here)
    container = __import__('pilot.user.%s.container' % pilot_user, globals(), locals(), [pilot_user], 0)

    # should a container be used for the payload? (best effort - a failure only means usecontainer stays unset)
    try:
        kwargs = {'job': job}
        job.usecontainer = container.do_use_container(**kwargs)
    except Exception as error:
        logger.warning('exception caught: %s', error)

    return bool(user.verify_job(job))
0146
0147
def verify_error_code(job):
    """
    Make sure an error code is properly set.

    Guarantees that job.piloterrorcode is set whenever the job has entries in
    job.piloterrorcodes (the first list entry takes precedence). For failed
    user (analysis) jobs with a recoverable error, the sign of the error code
    is negated.

    :param job: job object.
    :return:
    """

    # promote the primary list entry to the scalar error code if needed
    if job.piloterrorcode == 0 and len(job.piloterrorcodes) > 0:
        logger.warning('piloterrorcode set to first piloterrorcodes list entry: %s', str(job.piloterrorcodes))
        job.piloterrorcode = job.piloterrorcodes[0]

    if not (job.piloterrorcode != 0 and job.is_analysis()):
        logger.info('verified error code')
        return

    if errors.is_recoverable(code=job.piloterrorcode):
        # negate the code to signal recoverability to the server
        job.piloterrorcode = -abs(job.piloterrorcode)
        job.state = 'failed'
        logger.info('failed user job is recoverable (error code=%s)', job.piloterrorcode)
    else:
        logger.info('failed user job is not recoverable')
0173
0174
def get_proper_state(job, state):
    """
    Return a proper job state to send to server.

    Only 'starting', 'running', 'finished', 'holding' or 'failed' are
    returned. An empty job.serverstate means this is the first server update,
    in which case 'starting' is reported (unless the job is already done).
    Once 'finished' or 'failed' has been reported, it is never overwritten.

    :param job: job object.
    :param state: internal pilot state (string).
    :return: valid server state (string).
    """

    if job.serverstate in ('finished', 'failed'):
        # a final server state is sticky
        return job.serverstate

    if job.serverstate == "" and state not in ('finished', 'failed'):
        job.serverstate = 'starting'
    elif state in ('finished', 'failed', 'holding'):
        job.serverstate = state
    else:
        job.serverstate = 'running'

    return job.serverstate
0197
0198
def publish_harvester_reports(state, args, data, job, final):
    """
    Publish all reports needed by Harvester.

    Writes the worker attributes file on every update. On the final update it
    additionally writes the event status (stage-out) file and, if a payload
    job report exists in the job work directory, publishes that as well.

    :param state: job state (string).
    :param args: pilot args object.
    :param data: data structure for server update (dictionary).
    :param job: job object.
    :param final: is this the final update? (Boolean).
    :return: True if successful, False otherwise (Boolean).
    """

    # worker attributes file is required by Harvester for every status update
    path = get_worker_attributes_file(args)

    # add the job status to the data structure before publishing
    data['jobStatus'] = state

    if not publish_work_report(data, path):
        logger.debug('failed to write to workerAttributesFile %s', path)
        return False

    # on the final update, also publish stage-out info and the job report
    if final:
        # write the event status (output/log file info) for Harvester
        event_status_file = get_event_status_file(args)
        if publish_stageout_files(job, event_status_file):
            logger.debug('wrote log and output files to file %s', event_status_file)
        else:
            logger.warning('could not write log and output files to file %s', event_status_file)
            return False

        # publish the payload job report if it was produced
        _path = os.path.join(job.workdir, config.Payload.jobreport)
        if os.path.exists(_path):
            if publish_job_report(job, args, config.Payload.jobreport):
                logger.debug('wrote job report file')
                return True
            else:
                logger.warning('failed to write job report file')
                return False
        else:
            logger.info('finished writing various report files in Harvester mode')

    return True
0245
0246
def write_heartbeat_to_file(data):
    """
    Write the heartbeat dictionary to file.

    Used when the pilot is not allowed to update the server directly; the
    heartbeat JSON is left in the pilot home directory instead.

    :param data: server data (dictionary).
    :return: True if successful, False otherwise (Boolean).
    """

    path = os.path.join(os.environ.get('PILOT_HOME'), config.Pilot.heartbeat_message)
    if not write_json(path, data):
        return False

    logger.debug('heartbeat dictionary: %s', data)
    logger.debug('wrote heartbeat to file %s', path)
    return True
0263
0264
def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False):
    """
    Update the server (send heartbeat message).
    Interpret and handle any server instructions arriving with the updateJob back channel.

    :param job: job object.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :param state: job state (string).
    :param xml: optional metadata xml (string).
    :param metadata: job report metadata read as a string.
    :param test_tobekilled: emulate a tobekilled command (boolean).
    :return: boolean (True if successful, False otherwise).
    """

    # map the internal pilot state to a state the server accepts
    state = get_proper_state(job, state)

    if not args.update_server:
        logger.info('pilot will not update the server (heartbeat message will be written to file)')
    tag = 'sending' if args.update_server else 'writing'

    if state == 'finished' or state == 'failed' or state == 'holding':
        final = True
        os.environ['SERVER_UPDATE'] = SERVER_UPDATE_UPDATING
        logger.info('job %s has %s - %s final server update', job.jobid, state, tag)

        # make sure that job.state is 'failed' when a pilot error code is set
        if job.piloterrorcode or job.piloterrorcodes:
            logger.warning('making sure that job.state is set to failed since a pilot error code is set')
            state = 'failed'
            job.state = state
        # make sure an error code is properly set
        elif state != 'finished':
            verify_error_code(job)
    else:
        final = False
        logger.info('job %s has state \'%s\' - %s heartbeat', job.jobid, state, tag)

    # build the data structure needed for updateJob
    data = get_data_structure(job, state, args, xml=xml, metadata=metadata)

    # write the heartbeat message to file if the server should not be updated by the pilot
    if not args.update_server:
        logger.debug('is_harvester_mode(args) : {0}'.format(is_harvester_mode(args)))
        if is_harvester_mode(args):
            return publish_harvester_reports(state, args, data, job, final)
        else:
            return write_heartbeat_to_file(data)

    try:
        if config.Pilot.pandajob == 'real':
            time_before = int(time.time())
            # retry the server update a number of times in case of failure
            max_attempts = 10
            attempt = 0
            done = False
            while attempt < max_attempts and not done:
                logger.info('job update attempt %d/%d', attempt + 1, max_attempts)

                # get the URL for the PanDA server from pilot options or from config
                pandaserver = get_panda_server(args.url, args.port)
                res = https.request('{pandaserver}/server/panda/updateJob'.format(pandaserver=pandaserver), data=data)
                if res is not None:
                    done = True
                attempt += 1

            time_after = int(time.time())
            logger.info('server updateJob request completed in %ds for job %s', time_after - time_before, job.jobid)
            logger.info("server responded with: res = %s", str(res))

            show_memory_usage()

            if res is not None:
                # does the server update contain any backchannel commands? if so, handle them
                handle_backchannel_command(res, job, args, test_tobekilled=test_tobekilled)

                if final:
                    os.environ['SERVER_UPDATE'] = SERVER_UPDATE_FINAL
                    logger.debug('set SERVER_UPDATE=SERVER_UPDATE_FINAL')
                return True
        else:
            logger.info('skipping job update for fake test job')
            return True

    except Exception as error:
        logger.warning('exception caught while sending https request: %s', error)
        logger.warning('possibly offending data: %s', data)

    if final:
        os.environ['SERVER_UPDATE'] = SERVER_UPDATE_TROUBLE
        logger.debug('set SERVER_UPDATE=SERVER_UPDATE_TROUBLE')

    return False
0360
0361
def get_job_status_from_server(job_id, url, port):
    """
    Return the current status of job <jobId> from the dispatcher.
    typical dispatcher response: 'status=finished&StatusCode=0'
    StatusCode 0: succeeded
               10: time-out
               20: general error
               30: failed
    In the case of time-out, the dispatcher will be asked one more time after 10 s.

    :param job_id: PanDA job id (int).
    :param url: PanDA server URL (string).
    :param port: PanDA server port (int).
    :return: status (string; e.g. holding), attempt_nr (int), status_code (int)
    """

    status = 'unknown'
    attempt_nr = 0
    status_code = 0
    # fake test jobs are never queried
    if config.Pilot.pandajob == 'fake':
        return status, attempt_nr, status_code

    data = {}
    data['ids'] = job_id

    # get the URL for the PanDA server from pilot options or from config
    pandaserver = get_panda_server(url, port)

    # ask dispatcher about lost job status
    trial = 1
    max_trials = 2

    while trial <= max_trials:
        try:
            # open connection
            ret = https.request('{pandaserver}/server/panda/getStatus'.format(pandaserver=pandaserver), data=data)
            response = ret[1]
            logger.info("response: %s", str(response))
            if response:
                try:
                    # extract the status and attempt number from the dispatcher response
                    status = response['status']
                    attempt_nr = int(response['attemptNr'])
                    status_code = int(response['StatusCode'])
                except Exception as error:
                    logger.warning(
                        "exception: dispatcher did not return allowed values: %s, %s", str(ret), error)
                    status = "unknown"
                    attempt_nr = -1
                    status_code = 20
                else:
                    logger.debug('server job status=%s, attempt_nr=%d, status_code=%d', status, attempt_nr, status_code)
            else:
                logger.warning("dispatcher did not return allowed values: %s", str(ret))
                status = "unknown"
                attempt_nr = -1
                status_code = 20
        except Exception as error:
            logger.warning("could not interpret job status from dispatcher: %s", error)
            status = 'unknown'
            attempt_nr = -1
            status_code = -1
            break
        else:
            if status_code == 0:  # success
                break
            elif status_code == 10:  # time-out, ask the dispatcher one more time after a delay
                trial += 1
                time.sleep(10)
                continue
            elif status_code == 20:  # other error
                if ret[0] == 13056 or ret[0] == '13056':
                    logger.warning("wrong certificate used with curl operation? (encountered error 13056)")
                break
            else:  # general error
                break

    return status, attempt_nr, status_code
0443
0444
def get_panda_server(url, port):
    """
    Get the URL for the PanDA server.

    The URL is assembled from the pilot options when both url and port are
    set; otherwise the value from the config file is used. When the default
    server name is encountered, it is replaced with a randomly chosen DNS
    alias for load balancing.

    :param url: URL string, if set in pilot option (port not included).
    :param port: port number, if set in pilot option (int).
    :return: full URL (either from pilot options or from config file)
    """

    if url.startswith('https://'):
        url = url.replace('https://', '')

    # fall back on the config value unless both url and port were supplied
    if url == '' or port == 0:
        pandaserver = config.Pilot.pandaserver
    else:
        pandaserver = url if ':' in url else '%s:%s' % (url, port)

    if not pandaserver.startswith('http'):
        pandaserver = 'https://' + pandaserver

    # randomization for load balancing of the default server
    default = 'pandaserver.cern.ch'
    if default in pandaserver:
        addresses = {entry[-1][0] for entry in socket.getaddrinfo(default, 25443, socket.AF_INET)}
        rnd = random.choice([socket.getfqdn(address) for address in addresses])
        pandaserver = pandaserver.replace(default, rnd)
        logger.debug('updated %s to %s', default, pandaserver)

    return pandaserver
0473
0474
def get_debug_command(cmd):
    """
    Identify and filter the given debug command.

    Note: only a single command will be allowed from a predefined list: tail, ls, gdb, ps, du.

    :param cmd: raw debug command from job definition (string).
    :return: debug_mode (Boolean, True if command is deemed ok), debug_command (string).
    """

    debug_mode = False
    debug_command = ""

    allowed_commands = ['tail', 'ls', 'ps', 'gdb', 'du']
    forbidden_commands = ['rm']

    # the command must arrive as a comma-separated string containing 'debug'
    if ',' in cmd and 'debug' in cmd:
        # strip the 'debug' keyword, leaving the actual command
        cmd = cmd.replace('debug,', '').replace(',debug', '')
        try:
            tmp = cmd.split(' ')
            com = tmp[0]
        except Exception as error:
            logger.warning('failed to identify debug command: %s', error)
        else:
            if com not in allowed_commands:
                logger.warning('command=%s is not in the list of allowed commands: %s', com, str(allowed_commands))
            elif ';' in cmd:  # fix: condition was duplicated (';' in cmd or ';' in cmd)
                logger.warning('debug command cannot contain \';\': \'%s\'', cmd)
            elif com in forbidden_commands:
                logger.warning('command=%s is not allowed', com)
            else:
                debug_mode = True
                debug_command = cmd
    return debug_mode, debug_command
0510
0511
def handle_backchannel_command(res, job, args, test_tobekilled=False):
    """
    Does the server update contain any backchannel information? if so, update the job object.

    :param res: server response (dictionary).
    :param job: job object.
    :param args: pilot args object.
    :param test_tobekilled: emulate a tobekilled command (boolean).
    :return:
    """

    if test_tobekilled:
        logger.info('faking a \'tobekilled\' command')
        res['command'] = 'tobekilled'

    if 'command' in res and res.get('command') != 'NULL':
        cmd = res.get('command')

        # a multi-word command that is not a kill instruction is a debug command to be filtered
        if ' ' in cmd and 'tobekilled' not in cmd:
            try:
                job.debug, job.debug_command = get_debug_command(cmd)
            except Exception as error:
                logger.debug('exception caught in get_debug_command(): %s', error)
        elif 'tobekilled' in cmd:
            logger.info('pilot received a panda server signal to kill job %s at %s', job.jobid, time_stamp())
            set_pilot_state(job=job, state="failed")
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL)
            if job.pid:
                logger.debug('killing payload process')
                kill_process(job.pid)
            else:
                logger.debug('no pid to kill')
            args.abort_job.set()
        elif 'softkill' in cmd:
            logger.info('pilot received a panda server signal to softkill job %s at %s', job.jobid, time_stamp())
            # event service job: job monitor will take care of the rest
            job.debug_command = 'softkill'
        elif 'debugoff' in cmd:
            # fix: 'debugoff' must be tested BEFORE 'debug' since 'debug' is a
            # substring of 'debugoff' (the debugoff branch was unreachable)
            logger.info('pilot received a command to turn off debug mode from the server')
            job.debug = False
            job.debug_command = 'debugoff'
        elif 'debug' in cmd:
            logger.info('pilot received a command to turn on standard debug mode from the server')
            job.debug = True
            job.debug_command = 'debug'
        else:
            logger.warning('received unknown server command via backchannel: %s', cmd)
0560
0561
0562
0563
0564
0565
0566
0567
0568
0569
0570
0571
def add_data_structure_ids(data, version_tag):
    """
    Add pilot, batch and scheduler ids to the data structure for getJob, updateJob.

    :param data: data structure (dict).
    :param version_tag: pilot version tag, e.g. 'PR' (string).
    :return: updated data structure (dict).
    """

    schedulerid = get_job_scheduler_id()
    if schedulerid:
        data['schedulerID'] = schedulerid

    # the pilotID is a composite string: pilot id | [batch system type |] version tag | pilot version
    pilotid = get_pilot_id()
    if pilotid:
        pilotversion = os.environ.get('PILOT_VERSION')

        # report the batch system type and id, if available
        batchsystem_type, batchsystem_id = get_batchsystem_jobid()

        if batchsystem_type:
            data['pilotID'] = "%s|%s|%s|%s" % (pilotid, batchsystem_type, version_tag, pilotversion)
            data['batchID'] = batchsystem_id
        else:
            data['pilotID'] = "%s|%s|%s" % (pilotid, version_tag, pilotversion)

    return data
0598
0599
def get_data_structure(job, state, args, xml=None, metadata=None):
    """
    Build the data structure needed for getJob, updateJob.

    :param job: job object.
    :param state: state of the job (string).
    :param args: Pilot arguments object.
    :param xml: optional XML string.
    :param metadata: job report metadata read as a string.
    :return: data structure (dictionary).
    """

    data = {'jobId': job.jobid,
            'state': state,
            'timestamp': time_stamp(),
            'siteName': os.environ.get('PILOT_SITENAME'),
            'node': get_node_name(),
            'attemptNr': job.attemptnr}

    # add pilot, batch and scheduler ids to the data structure
    data = add_data_structure_ids(data, args.version_tag)

    starttime = get_postgetjob_time(job.jobid, args)
    if starttime:
        data['startTime'] = starttime

    job_metrics = get_job_metrics(job)
    if job_metrics:
        data['jobMetrics'] = job_metrics

    if xml is not None:
        data['xml'] = xml
    if metadata is not None:
        data['metaData'] = metadata

    # in debug mode, the pilot should send a tail of the requested log file
    if job.debug:
        data['stdout'] = process_debug_mode(job)

    # add the core count (skip the string placeholders used for an unset value)
    if job.corecount and job.corecount != 'null' and job.corecount != 'NULL':
        data['coreCount'] = job.corecount

    if job.corecounts:
        _mean = mean(job.corecounts)
        logger.info('mean actualcorecount: %f', _mean)
        data['meanCoreCount'] = _mean

    # report the number of events processed by the payload, if any
    if job.nevents != 0:
        data['nEvents'] = job.nevents
        logger.info("total number of processed events: %d (read)", job.nevents)
    else:
        logger.info("payload/TRF did not report the number of read events")

    # add the CPU consumption time and related quantities
    constime = get_cpu_consumption_time(job.cpuconsumptiontime)
    if constime and constime != -1:
        data['cpuConsumptionTime'] = constime
        data['cpuConversionFactor'] = job.cpuconversionfactor
        data['cpuConsumptionUnit'] = job.cpuconsumptionunit + "+" + get_cpu_model()

    # append supported instruction sets (e.g. AVX2) to the CPU consumption unit
    instruction_sets = has_instruction_sets(['AVX2'])
    product, vendor = get_display_info()
    if instruction_sets:
        if 'cpuConsumptionUnit' in data:
            data['cpuConsumptionUnit'] += '+' + instruction_sets
        else:
            data['cpuConsumptionUnit'] = instruction_sets
        if product and vendor:
            logger.debug('cpuConsumptionUnit: could have added: product=%s, vendor=%s', product, vendor)

    # add memory information if available; timing, log extracts and error
    # codes are only added for a completed (finished/failed) job
    add_memory_info(data, job.workdir, name=job.memorymonitor)
    if state == 'finished' or state == 'failed':
        add_timing_and_extracts(data, job, state, args)
        add_error_codes(data, job)

    return data
0679
0680
def process_debug_mode(job):
    """
    Handle debug mode - preprocess debug command, get the output and kill the payload in case of gdb.

    :param job: job object.
    :return: stdout from debug command (string).
    """

    # the gdb command may need to be converted by the user code before execution
    if job.debug_command.startswith('gdb '):
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
        user.preprocess_debug_command(job)

    stdout = get_debug_stdout(job)
    if not stdout:
        return stdout

    # once gdb output has been collected, the payload can be killed
    if job.debug_command.startswith('gdb ') and job.pid:
        job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PANDAKILL,
                                                                         msg='payload was killed after gdb produced requested core file')
        logger.debug('will proceed to kill payload processes')
        kill_processes(job.pid)

    return stdout
0705
0706
def get_debug_stdout(job):
    """
    Return the requested output from a given debug command.

    Dispatches on the debug command: the plain 'debug' keyword returns a tail
    of the latest payload log, while tail/ls/ps/gdb commands are delegated to
    dedicated helpers. Anything else is executed as-is.

    :param job: job object.
    :return: output (string).
    """

    command = job.debug_command
    if command == 'debug':
        return get_payload_log_tail(job.workdir)
    if 'tail ' in command:
        return get_requested_log_tail(command, job.workdir)
    if 'ls ' in command:
        return get_ls(command, job.workdir)
    if 'ps ' in command or 'gdb ' in command:
        return get_general_command_stdout(job)

    # general command, execute it and return the output
    _, stdout, _ = execute(command)
    logger.info('debug_command: %s:\n\n%s\n', command, stdout)
    return stdout
0728
0729
def get_general_command_stdout(job):
    """
    Return the output from the requested debug command.

    For gdb commands with a '--pid %' placeholder, the user code is first
    asked to resolve the actual process id. The command is then executed
    (optionally containerised) and, for gdb, any produced core file is copied
    to the job work directory so it ends up in the log.

    :param job: job object.
    :return: output (string).
    """

    stdout = ''

    # for gdb, the payload pid placeholder must be resolved by the user code
    if 'gdb ' in job.debug_command and '--pid %' in job.debug_command:
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
        job.debug_command = user.process_debug_command(job.debug_command, job.jobid)

    if job.debug_command:
        # containerised execution is currently disabled
        _containerisation = False
        if _containerisation:
            try:
                containerise_general_command(job, job.infosys.queuedata.container_options,
                                             label='general',
                                             container_type='container')
            except PilotException as error:
                logger.warning('general containerisation threw a pilot exception: %s', error)
            except Exception as error:
                logger.warning('general containerisation threw an exception: %s', error)
        else:
            _, stdout, stderr = execute(job.debug_command)
            logger.debug("%s (stdout):\n\n%s\n\n", job.debug_command, stdout)
            logger.debug("%s (stderr):\n\n%s\n\n", job.debug_command, stderr)

        # in case a core file was produced, locate it
        path = locate_core_file(cmd=job.debug_command) if 'gdb ' in job.debug_command else ''
        if path:
            # copy it to the working directory (so it will be saved in the log);
            # best effort - failure to copy is not fatal
            try:
                copy(path, job.workdir)
            except Exception:
                pass

    return stdout
0772
0773
def get_ls(debug_command, workdir):
    """
    Execute the requested ls debug command and return its output.

    The last option of the command is interpreted as a (relative) path and is
    anchored in the job work directory before execution.

    :param debug_command: full debug command (string).
    :param workdir: job work directory (string).
    :return: output (string).
    """

    parts = debug_command.split(' ')
    options = ' '.join(parts[1:])
    path = options.split(' ')[-1] if ' ' in options else options
    if path.startswith('-'):
        # only options were given, no path - list the work directory itself
        path = '.'

    # NOTE(review): the textual replace assumes the path string occurs only once in the command
    full_path = os.path.join(workdir, path)
    debug_command = debug_command.replace(path, full_path)

    _, stdout, _ = execute(debug_command)
    logger.debug("%s:\n\n%s\n\n", debug_command, stdout)

    return stdout
0796
0797
def get_requested_log_tail(debug_command, workdir):
    """
    Return the tail of the requested debug log.

    Examples
      tail workdir/tmp.stdout* <- pilot finds the requested log file in the specified relative path
      tail log.RAWtoALL <- pilot finds the requested log file

    :param debug_command: full debug command (string).
    :param workdir: job work directory (string).
    :return: output (string).
    """

    _tail = ""
    parts = debug_command.split(' ')
    cmd = parts[0]
    options = ' '.join(parts[1:])
    logger.debug('debug command: %s', cmd)
    logger.debug('debug options: %s', options)

    # the last option is interpreted as the requested path, anchored in the work directory
    path = options.split(' ')[-1] if ' ' in options else options
    fullpath = os.path.join(workdir, path)

    # the path may contain a wildcard, so expand it
    files = glob(fullpath)
    if not files:
        logger.warning('did not find \'%s\' in path %s', path, fullpath)
    else:
        logger.info('files found: %s', str(files))
        _tail = get_latest_log_tail(files)

    if _tail:
        logger.debug('tail =\n\n%s\n\n', _tail)

    return _tail
0834
0835
def add_error_codes(data, job):
    """
    Add error codes to data structure.

    The first entry in the error code/diag lists takes precedence over the
    corresponding scalar job fields.

    :param data: data dictionary.
    :param job: job object.
    :return:
    """

    # report the primary (first) pilot error code, if the list is populated
    if job.piloterrorcodes != []:
        logger.warning('pilotErrorCodes = %s (will report primary/first error code)', str(job.piloterrorcodes))
        data['pilotErrorCode'] = job.piloterrorcodes[0]
    else:
        data['pilotErrorCode'] = job.piloterrorcode

    # same logic for the error diagnostics
    if job.piloterrordiags != []:
        logger.warning('pilotErrorDiags = %s (will report primary/first error diag)', str(job.piloterrordiags))
        data['pilotErrorDiag'] = job.piloterrordiags[0]
    else:
        data['pilotErrorDiag'] = job.piloterrordiag

    data['transExitCode'] = job.transexitcode
    data['exeErrorCode'] = job.exeerrorcode
    data['exeErrorDiag'] = job.exeerrordiag
0865
0866
def get_cpu_consumption_time(cpuconsumptiontime):
    """
    Get the CPU consumption time.
    The function makes sure that the value exists and is within allowed limits (< 10^9).

    :param cpuconsumptiontime: CPU consumption time (int/None).
    :return: properly set CPU consumption time (int/None).
    """

    try:
        constime = int(cpuconsumptiontime)
    except Exception:
        # the value was unset or not convertible to an integer
        return None

    if constime and constime > 10 ** 9:
        logger.warning("unrealistic cpuconsumptiontime: %d (reset to -1)", constime)
        constime = -1

    return constime
0887
0888
def add_timing_and_extracts(data, job, state, args):
    """
    Add timing info and log extracts to data structure for a completed job (finished or failed) to be sent to server.
    Note: this function updates the data dictionary.

    :param data: data structure (dictionary).
    :param job: job object.
    :param state: state of the job (string).
    :param args: pilot args.
    :return:
    """

    # pilotTiming is a pipe-separated string: getjob|stagein|payload|stageout|total setup
    time_getjob, time_stagein, time_payload, time_stageout, time_total_setup = timing_report(job.jobid, args)
    data['pilotTiming'] = "%s|%s|%s|%s|%s" % \
                          (time_getjob, time_stagein, time_payload, time_stageout, time_total_setup)

    # add log extracts (for failed/holding jobs or for jobs with outbound connections)
    extracts = ""
    if state == 'failed' or state == 'holding':
        pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
        user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], 0)
        extracts = user.get_log_extracts(job, state)
        if extracts != "":
            logger.warning('\nXXXXXXXXXXXXXXXXXXXXX[begin log extracts]\n%s\nXXXXXXXXXXXXXXXXXXXXX[end log extracts]', extracts)
    # the server limits the pilot log field size
    data['pilotLog'] = extracts[:1024]
    data['endTime'] = time.time()
0915
0916
def add_memory_info(data, workdir, name=""):
    """
    Add memory information (if available) to the data structure that will be sent to the server with job updates
    Note: this function updates the data dictionary.

    :param data: data structure (dictionary).
    :param workdir: working directory of the job (string).
    :param name: name of memory monitor (string).
    :return:
    """

    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    utilities = __import__('pilot.user.%s.utilities' % pilot_user, globals(), locals(), [pilot_user], 0)
    # merge the memory monitor info into the update; best effort only
    try:
        data.update(utilities.get_memory_monitor_info(workdir, name=name))
    except Exception as error:
        logger.info('memory information not available: %s', error)
0935
0936
def remove_pilot_logs_from_list(list_of_files):
    """
    Remove any pilot logs from the list of last updated files.

    :param list_of_files: list of last updated files (list).
    :return: list of files (list).
    """

    # the config object may not be fully populated; fall back on an empty exclusion list
    try:
        exclusions = [config.Pilot.pilotlog, config.Pilot.stageinlog, config.Pilot.stageoutlog,
                      config.Pilot.timing_file, config.Pilot.remotefileverification_dictionary,
                      config.Pilot.remotefileverification_log, config.Pilot.base_trace_report,
                      config.Container.container_script, config.Container.release_setup,
                      config.Container.stagein_status_dictionary, config.Container.stagein_replica_dictionary,
                      'eventLoopHeartBeat.txt', 'memory_monitor_output.txt', 'memory_monitor_summary.json_snapshot']
    except Exception as error:
        logger.warning('exception caught: %s', error)
        exclusions = []

    # also skip anything inside a pilot directory and all prmon files
    return [filename for filename in list_of_files
            if os.path.basename(filename) not in exclusions and '/pilot/' not in filename and 'prmon' not in filename]
0965
0966
def get_payload_log_tail(workdir):
    """
    Return the tail of the payload stdout or its latest updated log file.

    :param workdir: job work directory (string).
    :return: tail of stdout (string).
    """

    # collect recently updated text files and drop pilot-owned logs
    candidates = remove_pilot_logs_from_list(find_text_files())

    if not candidates:
        logger.info('no log files were found (will use default %s)', config.Payload.payloadstdout)
        candidates = [os.path.join(workdir, config.Payload.payloadstdout)]

    return get_latest_log_tail(candidates)
0986
0987
def get_latest_log_tail(files):
    """
    Get the tail of the latest updated file from the given file list.

    :param files: list of file paths (list).
    :return: tail of the most recently modified file, prefixed by its path; empty string on error (string).
    """

    stdout_tail = ""

    try:
        # max() raises ValueError on an empty list; getmtime raises OSError on a vanished file
        latest_file = max(files, key=os.path.getmtime)
        logger.info('tail of file %s will be added to heartbeat', latest_file)

        # cap the heartbeat payload at the last 2 kB
        stdout_tail = latest_file + "\n" + tail(latest_file)
        stdout_tail = stdout_tail[-2048:]
    except (OSError, ValueError) as exc:
        logger.warning('failed to get payload stdout tail: %s', exc)

    return stdout_tail
1008
1009
def validate(queues, traces, args):
    """
    Perform validation of job.

    Thread loop: pulls jobs from the "jobs" queue, validates each one, creates its
    working directory and pre-job symlinks, then forwards it via delayed_space_check()
    to the "validated_jobs" (or "failed_jobs") queue.

    :param queues: queues object.
    :param traces: traces object.
    :param args: args object.
    :return:
    """

    while not args.graceful_stop.is_set():
        time.sleep(0.5)
        try:
            job = queues.jobs.get(block=True, timeout=1)
        except queue.Empty:
            continue

        traces.pilot['nr_jobs'] += 1

        # export the task id so that later getJob calls can be restricted to the same task
        os.environ['PanDA_TaskID'] = str(job.taskid)
        logger.info('processing PanDA job %s from task %s', job.jobid, job.taskid)

        if _validate_job(job):

            # make the pilot the process group leader so payload children can be signalled together
            os.setpgrp()

            job_dir = os.path.join(args.mainworkdir, 'PanDA_Pilot-%s' % job.jobid)
            logger.debug('creating job working directory: %s', job_dir)
            try:
                os.mkdir(job_dir)
                os.chmod(job_dir, 0o770)
                job.workdir = job_dir
            except Exception as error:
                logger.debug('cannot create working directory: %s', error)
                traces.pilot['error_code'] = errors.MKDIR
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(traces.pilot['error_code'])
                # NOTE(review): assigns the exception object itself; str(error) may be intended
                job.piloterrordiag = error
                put_in_queue(job, queues.failed_jobs)
                break
            else:
                create_k8_link(job_dir)

            # make the pilot log visible from inside the job directory
            create_symlink(from_path='../%s' % config.Pilot.pilotlog, to_path=os.path.join(job_dir, config.Pilot.pilotlog))

            # give the user code a chance to clean up leftovers before the job starts
            pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
            utilities = __import__('pilot.user.%s.utilities' % pilot_user, globals(), locals(), [pilot_user], 0)
            try:
                utilities.precleanup()
            except Exception as error:
                logger.warning('exception caught: %s', error)

            # record the job id where the wrapper can find it
            store_jobid(job.jobid, args.sourcedir)

            # performs the space check and forwards the job to validated_jobs/failed_jobs
            delayed_space_check(queues, traces, args, job)

            # ensure subprocess reaping is set up via ctypes/prctl
            verify_ctypes(queues, job)
        else:
            logger.debug('Failed to validate job=%s', job.jobid)
            put_in_queue(job, queues.failed_jobs)

    # proceed to set the job_aborted flag?
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] validate thread has finished')
1098
1099
def verify_ctypes(queues, job):
    """
    Verify ctypes and make sure all subprocess are parented.

    :param queues: queues object.
    :param job: job object.
    :return:
    """

    try:
        import ctypes
    except Exception as error:
        diagnostics = 'ctypes python module could not be imported: %s' % error
        logger.warning(diagnostics)
    else:
        logger.debug('ctypes python module imported')

        # ask the kernel to re-parent orphaned child processes to the pilot
        # (36 == PR_SET_CHILD_SUBREAPER)
        libc = ctypes.CDLL('libc.so.6')
        libc.prctl(36, 1)
        logger.debug('all child subprocesses will be parented')
1127 logger.debug('all child subprocesses will be parented')
1128
1129
def delayed_space_check(queues, traces, args, job):
    """
    Run the delayed space check if necessary.

    The local space check is delayed until after the job definition has been read
    when running in Harvester push mode with server updates enabled (see
    proceed_with_getjob(), which skips the early check in that case). On success
    (or when no check is needed) the job is forwarded to the validated_jobs queue,
    otherwise to failed_jobs.

    :param queues: queues object.
    :param traces: traces object.
    :param args: args object.
    :param job: job object.
    :return:
    """

    # plain boolean expression instead of the redundant `True if ... else False`
    proceed_with_local_space_check = args.harvester_submitmode.lower() == 'push' and args.update_server
    if proceed_with_local_space_check:
        logger.debug('pilot will now perform delayed space check')
        exit_code, diagnostics = check_local_space()
        if exit_code != 0:
            traces.pilot['error_code'] = errors.NOLOCALSPACE
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.NOLOCALSPACE, msg=diagnostics)
            logger.debug('Failed to validate job=%s', job.jobid)
            put_in_queue(job, queues.failed_jobs)
        else:
            put_in_queue(job, queues.validated_jobs)
    else:
        put_in_queue(job, queues.validated_jobs)
1155
1156
def create_k8_link(job_dir):
    """
    Create a soft link to the payload workdir on Kubernetes if SHARED_DIR exists.

    :param job_dir: payload workdir (string).
    """

    shared_dir = os.environ.get('SHARED_DIR', None)
    if not shared_dir:
        logger.debug('will not create symlink in SHARED_DIR')
        return

    # expose the payload workdir at a fixed location inside the shared volume
    create_symlink(from_path=job_dir, to_path=os.path.join(shared_dir, 'payload_workdir'))
1170
1171
def store_jobid(jobid, init_dir):
    """
    Store the PanDA job id in a file that can be picked up by the wrapper for other reporting.

    :param jobid: job id (int).
    :param init_dir: pilot init dir (string).
    :return:
    """

    try:
        jobid_path = os.path.join(init_dir, 'pilot2', config.Pilot.jobid_file)
        # guard against a doubled 'pilot2' path component
        jobid_path = jobid_path.replace('pilot2/pilot2', 'pilot2')
        # append when the file already exists so earlier job ids are kept
        write_mode = 'a' if os.path.exists(jobid_path) else 'w'
        logger.debug('path=%s mode=%s', jobid_path, write_mode)
        write_file(jobid_path, "%s\n" % str(jobid), mode=write_mode, mute=False)
    except Exception as error:
        logger.warning('exception caught while trying to store job id: %s', error)
1189
1190
def create_data_payload(queues, traces, args):
    """
    Get a Job object from the "validated_jobs" queue.

    If the job has defined input files, move the Job object to the "data_in" queue and put the internal pilot state to
    "stagein". In case there are no input files, place the Job object in the "finished_data_in" queue. For either case,
    the thread also places the Job object in the "payloads" queue (another thread will retrieve it and wait for any
    stage-in to finish).

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    while not args.graceful_stop.is_set():
        time.sleep(0.5)
        try:
            job = queues.validated_jobs.get(block=True, timeout=1)
        except queue.Empty:
            continue

        has_input = bool(job.indata)
        if has_input:
            # stage-in required: flag the state before handing over to the data thread
            set_pilot_state(job=job, state='stagein')
        # with input: send to data_in for stage-in; without: declare stage-in done immediately
        put_in_queue(job, queues.data_in if has_input else queues.finished_data_in)
        # in both cases the payload thread gets the job and waits for stage-in
        put_in_queue(job, queues.payloads)

    # proceed to set the job_aborted flag?
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] create_data_payload thread has finished')
1233
1234
def get_task_id():
    """
    Return the task id for the current job.
    Note: currently the implementation uses an environmental variable to store this number (PanDA_TaskID).

    :return: task id (string). Returns empty string in case of error.
    """

    # single environment lookup instead of the membership-test-then-index pattern
    taskid = os.environ.get("PanDA_TaskID")
    if taskid is None:
        logger.warning('PanDA_TaskID not set in environment')
        taskid = ""

    return taskid
1250
1251
def get_job_label(args):
    """
    Return a proper job label.
    The function returns a job label that corresponds to the actual pilot version, ie if the pilot is a development
    version (ptest or rc_test2) or production version (managed or user).
    Example: -i RC -> job_label = rc_test2.
    NOTE: it should be enough to only use the job label, -j rc_test2 (and not specify -i RC at all).

    :param args: pilot args object.
    :return: job_label (string).
    """

    # PQ status is needed to detect test queues
    status = infosys.queuedata.status

    version_tag = args.version_tag
    label = args.job_label

    if version_tag == 'RC' and label == 'rc_test2':
        return 'rc_test2'
    if version_tag == 'RC' and label == 'ptest':
        return label
    if version_tag == 'RCM' and label == 'ptest':
        return 'rcm_test2'
    if version_tag == 'ALRB':
        return 'rc_alrb'
    if status == 'test' and label != 'ptest':
        logger.warning('PQ status set to test - will use job label / prodSourceLabel test')
        return 'test'

    return label
1282
1283
def get_dispatcher_dictionary(args):
    """
    Return a dictionary with required fields for the dispatcher getJob operation.

    The dictionary should contain the following fields: siteName, computingElement (queue name),
    prodSourceLabel (e.g. user, test, ptest), diskSpace (available disk space for a job in MB),
    workingGroup, countryGroup, cpu (float), mem (float) and node (worker node name).

    workingGroup, countryGroup and allowOtherCountry:
    we add a new pilot setting allowOtherCountry=True to be used in conjunction with countryGroup=us for
    US pilots. With these settings, the Panda server will produce the desired behaviour of dedicated X% of
    the resource exclusively (so long as jobs are available) to countryGroup=us jobs. When allowOtherCountry=false
    this maintains the behavior relied on by current users of the countryGroup mechanism -- to NOT allow
    the resource to be used outside the privileged group under any circumstances.

    :param args: arguments (e.g. containing queue name, queuedata dictionary, etc).
    :returns: dictionary prepared for the dispatcher getJob operation.
    """

    # gather worker node characteristics (same call order as the server expects logging-wise)
    _diskspace = get_disk_space(infosys.queuedata)
    _mem, _cpu, _disk = collect_workernode_info()
    _nodename = get_node_name()

    data = {
        'siteName': infosys.queuedata.resource,
        'computingElement': args.queue,
        'prodSourceLabel': get_job_label(args),
        'diskSpace': _diskspace,
        'workingGroup': args.working_group,
        'cpu': _cpu,
        'mem': _mem,
        'node': _nodename
    }

    # only add the optional fields if the corresponding pilot options were set
    if args.jobtype != "":
        data['jobType'] = args.jobtype
    if args.allow_other_country != "":
        data['allowOtherCountry'] = args.allow_other_country
    if args.country_group != "":
        data['countryGroup'] = args.country_group

    if args.job_label == 'self':
        data['prodUserID'] = get_distinguished_name()

    taskid = get_task_id()
    if taskid != "" and args.allow_same_user:
        data['taskID'] = taskid
        logger.info("will download a new job belonging to task id: %s", data['taskID'])

    if args.resource_type != "":
        data['resourceType'] = args.resource_type

    # pass along the Harvester identifiers when running under Harvester
    if 'HARVESTER_ID' in os.environ:
        data['harvester_id'] = os.environ.get('HARVESTER_ID')
    if 'HARVESTER_WORKER_ID' in os.environ:
        data['worker_id'] = os.environ.get('HARVESTER_WORKER_ID')

    return data
1355
1356
def proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, max_getjob_requests, update_server, submitmode, harvester, verify_proxy, traces):
    """
    Can we proceed with getJob?
    We may not proceed if we have run out of time (timefloor limit), if the proxy is too short, if disk space is too
    small or if we have already proceed enough jobs.

    :param timefloor: timefloor limit (s) (int).
    :param starttime: start time of retrieve() (s) (int).
    :param jobnumber: number of downloaded jobs (int).
    :param getjob_requests: number of getjob requests (int).
    :param max_getjob_requests: maximum number of getjob requests (int).
    :param update_server: should pilot update server? (Boolean).
    :param submitmode: Harvester submit mode, PULL or PUSH (string).
    :param harvester: True if Harvester is used, False otherwise. Affects the max number of getjob reads (from file) (Boolean).
    :param verify_proxy: True if the proxy should be verified. False otherwise (Boolean).
    :param traces: traces object (to be able to propagate a proxy error all the way back to the wrapper).
    :return: True if pilot should proceed with getJob (Boolean).
    """

    currenttime = time.time()

    # the user code may disable the timefloor mechanism for this submit mode
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    common = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
    if not common.allow_timefloor(submitmode):
        timefloor = 0

    # abort early if the proxy is missing or too short-lived
    if verify_proxy:
        userproxy = __import__('pilot.user.%s.proxy' % pilot_user, globals(), locals(), [pilot_user], 0)

        # only record the proxy error code if no other error has been set yet
        exit_code, diagnostics = userproxy.verify_proxy()
        if traces.pilot['error_code'] == 0:
            traces.pilot['error_code'] = exit_code
        if exit_code == errors.NOPROXY or exit_code == errors.NOVOMSPROXY:
            logger.warning(diagnostics)
            return False

    # in Harvester push mode with server updates, the space check is delayed
    # until after the job definition has been read (see delayed_space_check())
    proceed_with_local_space_check = False if (submitmode.lower() == 'push' and update_server) else True
    if proceed_with_local_space_check:
        exit_code, diagnostics = check_local_space()
        if exit_code != 0:
            traces.pilot['error_code'] = errors.NOLOCALSPACE
            return False
    else:
        logger.debug('pilot will delay local space check until after job definition has been read from file')

    maximum_getjob_requests = 60 if harvester else max_getjob_requests
    if getjob_requests > int(maximum_getjob_requests):
        logger.warning('reached maximum number of getjob requests (%s) -- will abort pilot', maximum_getjob_requests)

        # let the monitoring know the pilot is shutting down
        os.environ['PILOT_WRAP_UP'] = 'QUICKLY'
        return False

    if timefloor == 0 and jobnumber > 0:
        logger.warning("since timefloor is set to 0, pilot was only allowed to run one job")

        os.environ['PILOT_WRAP_UP'] = 'QUICKLY'
        return False

    if (currenttime - starttime > timefloor) and jobnumber > 0:
        logger.warning("the pilot has run out of time (timefloor=%d has been passed)", timefloor)

        os.environ['PILOT_WRAP_UP'] = 'QUICKLY'
        return False

    # timefloor not relevant for the first job
    if jobnumber > 0:
        logger.info('since timefloor=%d s and only %d s has passed since launch, pilot can run another job', timefloor, currenttime - starttime)

    if harvester and jobnumber > 0:

        # ask Harvester to place another job definition file
        logger.info('asking Harvester for another job')
        request_new_jobs()

    # don't fetch a new job while the final update for the previous one is still in flight
    if os.environ.get('SERVER_UPDATE', '') == SERVER_UPDATE_UPDATING:
        logger.info('still updating previous job, will not ask for a new job yet')
        return False

    os.environ['SERVER_UPDATE'] = SERVER_UPDATE_NOT_DONE
    return True
1449
1450
def getjob_server_command(url, port):
    """
    Prepare the getJob server command.

    :param url: PanDA server URL (string)
    :param port: PanDA server port
    :return: full server command (URL string)
    """

    if url != "":
        # append the port unless the URL already carries one
        if findall('.:([0-9]+)', url):
            logger.debug('URL already contains port: %s', url)
        else:
            url += ':%s' % port
    else:
        # fall back to the server URL from the config file
        url = config.Pilot.pandaserver
        if url == "":
            logger.fatal('PanDA server url not set (either as pilot option or in config file)')
        elif not url.startswith("http"):
            url = 'https://' + url
            logger.warning('detected missing protocol in server url (added)')

    return '{pandaserver}/server/panda/getJob'.format(pandaserver=url)
1475
1476
def get_job_definition_from_file(path, harvester):
    """
    Get a job definition from a pre-placed file.
    In Harvester mode, also remove any existing job request files since it is no longer needed/wanted.

    :param path: path to job definition file.
    :param harvester: True if Harvester is being used (determined from args.harvester), otherwise False
    :return: job definition dictionary.
    """

    # remove any existing Harvester job request files (silent in non-Harvester mode)
    if harvester:
        remove_job_request_file()
        if is_json(path):
            job_definition_list = parse_job_definition_file(path)
            if not job_definition_list:
                logger.warning('no jobs were found in Harvester job definitions file: %s', path)
                return {}
            else:
                # keep a copy of the definition next to PILOT_HOME, then consume the original
                # NOTE(review): os.environ.get('PILOT_HOME') may be None here — join would fail; confirm PILOT_HOME is always set
                new_path = os.path.join(os.environ.get('PILOT_HOME'), 'job_definition.json')
                copy(path, new_path)
                remove(path)

                # only the first job in the file is used
                return job_definition_list[0]

    # non-JSON file: parse it as a URL-encoded query string (legacy dispatcher format)
    res = {}
    with open(path, 'r') as jobdatafile:
        response = jobdatafile.read()
        if len(response) == 0:
            logger.fatal('encountered empty job definition file: %s', path)
            res = None
        else:
            # python 2 fallback first, python 3 location second
            try:
                from urlparse import parse_qsl
            except Exception:
                from urllib.parse import parse_qsl
            datalist = parse_qsl(response, keep_blank_values=True)

            # convert the (key, value) tuple list into a dictionary
            for data in datalist:
                res[data[0]] = data[1]

    # the file has been consumed - remove it so it is not read again
    if os.path.exists(path):
        remove(path)

    return res
1529
1530
def get_job_definition_from_server(args):
    """
    Get a job definition from a server.

    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return: job definition dictionary.
    """

    # build the getJob request payload and the target URL
    data = get_dispatcher_dictionary(args)
    cmd = getjob_server_command(args.url, args.port)

    res = {}
    if cmd != "":
        logger.info('executing server command: %s', cmd)
        res = https.request(cmd, data=data)

    return res
1550
1551
def locate_job_definition(args):
    """
    Locate the job definition file among standard locations.

    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return: path (string).
    """

    if args.harvester_datadir:
        candidates = [os.path.join(args.harvester_datadir, config.Pilot.pandajobdata)]
    else:
        candidates = [os.path.join("%s/.." % args.sourcedir, config.Pilot.pandajobdata),
                      os.path.join(args.sourcedir, config.Pilot.pandajobdata),
                      os.path.join(os.environ['PILOT_WORK_DIR'], config.Pilot.pandajobdata)]

    # Harvester locations are appended last so the standard paths win
    if args.harvester_workdir:
        candidates.append(os.path.join(args.harvester_workdir, config.Harvester.pandajob_file))
    if 'HARVESTER_WORKDIR' in os.environ:
        candidates.append(os.path.join(os.environ['HARVESTER_WORKDIR'], config.Harvester.pandajob_file))

    # first existing candidate wins; empty string when none exists
    path = next((candidate for candidate in candidates if os.path.exists(candidate)), "")

    if path == "":
        logger.info('did not find any local job definition file')

    return path
1582
1583
def get_job_definition(args):
    """
    Get a job definition from a source (server or pre-placed local file).

    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return: job definition dictionary.
    """

    res = {}
    path = locate_job_definition(args)

    if config.Pilot.pandajob == 'fake':
        logger.info('will use a fake PanDA job')
        res = get_fake_job()
    elif os.path.exists(path):
        logger.info('will read job definition from file %s', path)
        res = get_job_definition_from_file(path, args.harvester)
    elif args.harvester and args.harvester_submitmode.lower() == 'push':
        # Harvester push mode: wait for a pre-placed file instead of contacting the server
        pass
    else:
        logger.info('will download job definition from server')
        res = get_job_definition_from_server(args)

    return res
1610
1611
def now():
    """
    Return the current epoch as a UTF-8 encoded string.
    :return: current time as encoded string
    """
    current_epoch = time.time()
    return str(current_epoch).encode('utf-8')
1618
1619
def get_fake_job(input=True):
    """
    Return a job definition for internal pilot testing.
    Note: this function is only used for testing purposes. The job definitions below are ATLAS specific.

    :param input: Boolean, set to False if no input files are wanted
    :return: job definition (dictionary).
    """

    res = None

    # generate pseudo-random identifiers for the log, the input file and the job name
    hash = hashlib.md5()
    hash.update(now())
    log_guid = hash.hexdigest()
    hash.update(now())
    guid = hash.hexdigest()
    hash.update(now())
    job_name = hash.hexdigest()

    if config.Pilot.testjobtype == 'production':
        logger.info('creating fake test production job definition')
        res = {'jobsetID': 'NULL',
               'logGUID': log_guid,
               'cmtConfig': 'x86_64-slc6-gcc48-opt',
               'prodDBlocks': 'user.mlassnig:user.mlassnig.pilot.test.single.hits',
               'dispatchDBlockTokenForOut': 'NULL,NULL',
               'destinationDBlockToken': 'NULL,NULL',
               'destinationSE': 'AGLT2_TEST',
               'realDatasets': job_name,
               'prodUserID': 'no_one',
               'GUID': guid,
               'realDatasetsIn': 'user.mlassnig:user.mlassnig.pilot.test.single.hits',
               'nSent': 0,
               'cloud': 'US',
               'StatusCode': 0,
               'homepackage': 'AtlasProduction/20.1.4.14',
               'inFiles': 'HITS.06828093._000096.pool.root.1',
               'processingType': 'pilot-ptest',
               'ddmEndPointOut': 'UTA_SWT2_DATADISK,UTA_SWT2_DATADISK',
               'fsize': '94834717',
               'fileDestinationSE': 'AGLT2_TEST,AGLT2_TEST',
               'scopeOut': 'panda',
               'minRamCount': 0,
               'jobDefinitionID': 7932,
               'maxWalltime': 'NULL',
               'scopeLog': 'panda',
               'transformation': 'Reco_tf.py',
               'maxDiskCount': 0,
               'coreCount': 1,
               'prodDBlockToken': 'NULL',
               'transferType': 'NULL',
               'destinationDblock': job_name,
               'dispatchDBlockToken': 'NULL',
               'jobPars': '--maxEvents=1 --inputHITSFile HITS.06828093._000096.pool.root.1 --outputRDOFile RDO_%s.root' % job_name,
               'attemptNr': 0,
               'swRelease': 'Atlas-20.1.4',
               'nucleus': 'NULL',
               'maxCpuCount': 0,
               'outFiles': 'RDO_%s.root,%s.job.log.tgz' % (job_name, job_name),
               'currentPriority': 1000,
               'scopeIn': 'mc15_13TeV',
               'PandaID': '0',
               'sourceSite': 'NULL',
               'dispatchDblock': 'NULL',
               'prodSourceLabel': 'ptest',
               'checksum': 'ad:5d000974',
               'jobName': job_name,
               'ddmEndPointIn': 'UTA_SWT2_DATADISK',
               'taskID': 'NULL',
               'logFile': '%s.job.log.tgz' % job_name}
    elif config.Pilot.testjobtype == 'user':
        logger.info('creating fake test user job definition')
        res = {'jobsetID': 'NULL',
               'logGUID': log_guid,
               'cmtConfig': 'x86_64-slc6-gcc49-opt',
               'prodDBlocks': 'data15_13TeV:data15_13TeV.00276336.physics_Main.merge.AOD.r7562_p2521_tid07709524_00',
               'dispatchDBlockTokenForOut': 'NULL,NULL',
               'destinationDBlockToken': 'NULL,NULL',
               'destinationSE': 'ANALY_SWT2_CPB',
               'realDatasets': job_name,
               'prodUserID': 'None',
               'GUID': guid,
               'realDatasetsIn': 'data15_13TeV:data15_13TeV.00276336.physics_Main.merge.AOD.r7562_p2521_tid07709524_00',
               'nSent': '0',
               'cloud': 'US',
               'StatusCode': 0,
               'homepackage': 'AnalysisTransforms-AtlasDerivation_20.7.6.4',
               'inFiles': 'AOD.07709524._000050.pool.root.1',
               'processingType': 'pilot-ptest',
               'ddmEndPointOut': 'SWT2_CPB_SCRATCHDISK,SWT2_CPB_SCRATCHDISK',
               'fsize': '1564780952',
               'fileDestinationSE': 'ANALY_SWT2_CPB,ANALY_SWT2_CPB',
               'scopeOut': 'user.gangarbt',
               'minRamCount': '0',
               'jobDefinitionID': '9445',
               'maxWalltime': 'NULL',
               'scopeLog': 'user.gangarbt',
               'transformation': 'http://pandaserver.cern.ch:25080/trf/user/runAthena-00-00-11',
               'maxDiskCount': '0',
               'coreCount': '1',
               'prodDBlockToken': 'NULL',
               'transferType': 'NULL',
               'destinationDblock': job_name,
               'dispatchDBlockToken': 'NULL',
               'jobPars': '-a sources.20115461.derivation.tgz -r ./ -j "Reco_tf.py '
                          '--inputAODFile AOD.07709524._000050.pool.root.1 --outputDAODFile test.pool.root '
                          '--reductionConf HIGG3D1" -i "[\'AOD.07709524._000050.pool.root.1\']" -m "[]" -n "[]" --trf'
                          ' --useLocalIO --accessmode=copy -o '
                          '"{\'IROOT\': [(\'DAOD_HIGG3D1.test.pool.root\', \'%s.root\')]}" '
                          '--sourceURL https://aipanda012.cern.ch:25443' % (job_name),
               'attemptNr': '0',
               'swRelease': 'Atlas-20.7.6',
               'nucleus': 'NULL',
               'maxCpuCount': '0',
               'outFiles': '%s.root,%s.job.log.tgz' % (job_name, job_name),
               'currentPriority': '1000',
               'scopeIn': 'data15_13TeV',
               'PandaID': '0',
               'sourceSite': 'NULL',
               'dispatchDblock': 'data15_13TeV:data15_13TeV.00276336.physics_Main.merge.AOD.r7562_p2521_tid07709524_00',
               'prodSourceLabel': 'ptest',
               'checksum': 'ad:b11f45a7',
               'jobName': job_name,
               'ddmEndPointIn': 'SWT2_CPB_SCRATCHDISK',
               'taskID': 'NULL',
               'logFile': '%s.job.log.tgz' % job_name}
    else:
        logger.warning('unknown test job type: %s', config.Pilot.testjobtype)

    if res:
        # strip all input file fields when the caller asked for a job without input
        if not input:
            res['inFiles'] = 'NULL'
            res['GUID'] = 'NULL'
            res['scopeIn'] = 'NULL'
            res['fsize'] = 'NULL'
            res['realDatasetsIn'] = 'NULL'
            res['checksum'] = 'NULL'

        # override the transfer type if the config requests a supported one
        if config.Pilot.testtransfertype == "NULL" or config.Pilot.testtransfertype == 'direct':
            res['transferType'] = config.Pilot.testtransfertype
        else:
            logger.warning('unknown test transfer type: %s (ignored)', config.Pilot.testtransfertype)

        # replace the payload with a trivial sleep command if requested
        if config.Pilot.testjobcommand == 'sleep':
            res['transformation'] = 'sleep'
            res['jobPars'] = '1'
            res['inFiles'] = ''
            res['outFiles'] = ''

    # on python 2, convert all string values to unicode (as the real dispatcher would deliver)
    try:
        if not is_python3():
            _res = {}
            for entry in res:
                if type(res[entry]) is str:
                    _res[u'%s' % entry] = u'%s' % res[entry]
                else:
                    _res[u'%s' % entry] = res[entry]
            res = _res
    except Exception:
        pass
    return res
1783
1784
def get_job_retrieval_delay(harvester):
    """
    Return the proper delay between job retrieval attempts.
    In Harvester mode, the pilot will look once per second for a job definition file.

    :param harvester: True if Harvester is being used (determined from args.harvester), otherwise False
    :return: sleep (s)
    """

    if harvester:
        return 1
    return 60
1795
1796
def retrieve(queues, traces, args):
    """
    Retrieve all jobs from a source.

    The job definition is a json dictionary that is either present in the launch
    directory (preplaced) or downloaded from a server specified by `args.url`.

    The function retrieves the job definition from the proper source and places
    it in the `queues.jobs` queue.

    WARNING: this function is nearly too complex. Be careful with adding more lines as flake8 will fail it.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :raises PilotException: if create_job fails (e.g. because queuedata could not be downloaded).
    :return:
    """

    timefloor = infosys.queuedata.timefloor
    starttime = time.time()

    jobnumber = 0
    getjob_requests = 0
    getjob_failures = 0
    print_node_info()

    while not args.graceful_stop.is_set():

        time.sleep(0.5)
        getjob_requests += 1

        # preconditions: proxy, disk space, request counters, timefloor
        if not proceed_with_getjob(timefloor, starttime, jobnumber, getjob_requests, args.getjob_requests,
                                   args.update_server, args.harvester_submitmode, args.harvester, args.verify_proxy, traces):

            # do not set graceful stop if pilot has not finished sending the final job update
            check_for_final_server_update(args.update_server)
            args.graceful_stop.set()
            break

        # timestamp taken before the download so the full getjob duration is recorded
        time_pre_getjob = time.time()

        # get a job definition from a source (file or server)
        res = get_job_definition(args)
        logger.info('job definition = %s', str(res))

        # None signals an unrecoverable error (e.g. empty definition file)
        if res is None:
            logger.fatal('fatal error in job download loop - cannot continue')

            # do not set graceful stop if pilot has not finished sending the final job update
            check_for_final_server_update(args.update_server)
            args.graceful_stop.set()
            break

        if not res:
            # empty dictionary: no job was available this round
            getjob_failures += 1
            if getjob_failures >= args.getjob_failures:
                logger.warning('did not get a job -- max number of job request failures reached: %d', getjob_failures)
                args.graceful_stop.set()
                break

            # sleep in 1 s steps so a graceful stop can interrupt the wait
            delay = get_job_retrieval_delay(args.harvester)
            if not args.harvester:
                logger.warning('did not get a job -- sleep %d s and repeat', delay)
            for _ in range(delay):
                if args.graceful_stop.is_set():
                    break
                time.sleep(1)
        else:

            # a non-zero StatusCode means the dispatcher refused the request
            if 'StatusCode' in res and res['StatusCode'] != '0' and res['StatusCode'] != 0:
                getjob_failures += 1
                if getjob_failures >= args.getjob_failures:
                    logger.warning('did not get a job -- max number of job request failures reached: %d',
                                   getjob_failures)
                    args.graceful_stop.set()
                    break

                logger.warning('did not get a job -- sleep 60s and repeat -- status: %s', res['StatusCode'])
                for i in range(60):
                    if args.graceful_stop.is_set():
                        break
                    time.sleep(1)
            else:
                # create the job object out of the raw dispatcher dictionary
                try:
                    job = create_job(res, args.queue)
                except PilotException as error:
                    raise error

                # record the getjob timing for this job id
                add_to_pilot_timing(job.jobid, PILOT_PRE_GETJOB, time_pre_getjob, args)
                add_to_pilot_timing(job.jobid, PILOT_POST_GETJOB, time.time(), args)

                # hand the job over to the validate thread
                put_in_queue(job, queues.jobs)

                jobnumber += 1
                # wait here until the job has completed before fetching the next one
                while not args.graceful_stop.is_set():
                    if has_job_completed(queues, args):

                        purge_queue(queues.finished_data_in)

                        # reset the per-job abort flags for the next job
                        args.job_aborted.clear()
                        args.abort_job.clear()
                        logger.info('ready for new job')

                        # re-establish logging since the per-job log handlers were shut down
                        logging.info('pilot has finished for previous job - re-establishing logging')
                        logging.handlers = []
                        logging.shutdown()
                        establish_logging(debug=args.debug, nopilotlog=args.nopilotlog)
                        pilot_version_banner()
                        getjob_requests = 0
                        add_to_pilot_timing('1', PILOT_MULTIJOB_START_TIME, time.time(), args)
                        break
                    time.sleep(0.5)

    # proceed to set the job_aborted flag?
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] retrieve thread has finished')
1936
1937
def print_node_info():
    """
    Print information about the local node to the log.

    :return:
    """

    message = "pilot is running in a virtual machine" if is_virtual_machine() else "pilot is not running in a virtual machine"
    logger.info(message)
1949
1950
def create_job(dispatcher_response, queue):
    """
    Create a job object out of the dispatcher response.

    :param dispatcher_response: raw job dictionary from the dispatcher.
    :param queue: queue name (string).
    :return: job object
    """

    job = JobData(dispatcher_response)

    # set up a job-specific info service before initializing the job object
    job_info_service = InfoService()
    job_info_service.init(queue, infosys.confinfo, infosys.extinfo, JobInfoProvider(job))
    job.init(infosys)

    logger.info('received job: %s (sleep until the job has finished)', job.jobid)
    logger.info('job details: \n%s', job)

    # export the job id so that subprocesses can pick it up
    os.environ['PANDAID'] = job.jobid

    return job
1977
1978
def has_job_completed(queues, args):
    """
    Has the current job completed (finished or failed)?
    Note: the job object was extracted from monitored_payloads queue before this function was called.

    :param queues: Pilot queues object.
    :param args: Pilot arguments object.
    :return: True is the payload has finished or failed
    """

    try:
        job = queues.completed_jobs.get(block=True, timeout=1)
    except queue.Empty:
        # no completed job yet
        return False

    make_job_report(job)

    # log the contents of the pilot home directory for debugging
    cmd = 'ls -lF %s' % os.environ.get('PILOT_HOME')
    logger.debug('%s:\n', cmd)
    _, stdout, _ = execute(cmd)
    logger.debug(stdout)

    queue_report(queues)
    job.reset_errors()
    logger.info("job %s has completed (purged errors)", job.jobid)

    # make sure the payload process is accounted for before cleanup
    if job.pid:
        job.zombies.append(job.pid)
    cleanup(job, args)

    return True
2027
2028
def get_job_from_queue(queues, state):
    """
    Check if the job has finished or failed and if so return it.

    :param queues: pilot queues.
    :param state: job state (e.g. finished/failed) (string).
    :return: job object.
    """

    # map the requested state onto the corresponding pilot queue
    source = {"finished": queues.finished_jobs, "failed": queues.failed_jobs}.get(state)
    try:
        job = source.get(block=True, timeout=1) if source is not None else None
    except queue.Empty:
        job = None
    else:
        set_pilot_state(job=job, state=state)
        logger.info("job %s has state=%s", job.jobid, job.state)

    return job
2053
2054
def is_queue_empty(queues, queue):
    """
    Check if the given queue is empty (peek only, without pulling).

    :param queues: pilot queues object.
    :param queue: queue name (string).
    :return: True if queue is empty, False otherwise
    """

    if queue not in queues._fields:
        logger.warning('queue %s not present in %s', queue, queues._fields)
        return False

    # peek at the queue content without dequeueing anything
    jobs = list(getattr(queues, queue).queue)
    if jobs:
        logger.info('queue %s not empty: found %d job(s)', queue, len(jobs))
        return False

    logger.info('queue %s is empty', queue)
    return True
2077
2078
def order_log_transfer(queues, job):
    """
    Order a log transfer for a failed job, then wait for it to reach a final state.

    :param queues: pilot queues object.
    :param job: job object.
    :return:
    """

    # instruct the data thread to stage out the log only
    job.stageout = 'log'
    put_in_queue(job, queues.data_out)
    logger.debug('job added to data_out queue')

    # poll the log transfer status for at most nmax iterations (2 s apart)
    nmax = 60
    counter = 0
    while counter < nmax:
        log_transfer = job.get_status('LOG_TRANSFER')
        logger.info('waiting for log transfer to finish (#%d/#%d): %s', counter + 1, nmax, log_transfer)
        if is_queue_empty(queues, 'data_out') and log_transfer in (LOG_TRANSFER_DONE, LOG_TRANSFER_FAILED):
            logger.info('stage-out of log has completed')
            break
        if log_transfer == LOG_TRANSFER_IN_PROGRESS:
            logger.info('log transfer is in progress')
        time.sleep(2)
        counter += 1

    logger.info('proceeding with server update (n=%d)', counter)
2113
2114
def wait_for_aborted_job_stageout(args, queues, job):
    """
    Wait for stage-out to finish for an aborted job.

    :param args: pilot args object.
    :param queues: pilot queues object.
    :param job: job object.
    :return:
    """

    try:
        time_since_kill = get_time_since('1', PILOT_KILL_SIGNAL, args)
        if was_pilot_killed(args.timing):
            logger.info('%d s passed since kill signal was intercepted - make sure that stage-out has finished', time_since_kill)
    except Exception as error:
        logger.warning('exception caught: %s', error)
        time_since_kill = 60
    else:
        # clamp the measured time to the allowed [0, 60] s window
        if not 0 <= time_since_kill <= 60:
            logger.warning('reset time_since_kill to 60 since value is out of allowed limits')
            time_since_kill = 60

    # wait at most ~2 minutes in total, counting the time already elapsed since the kill signal
    max_wait_time = 2 * 60 - time_since_kill - 5
    logger.debug('using max_wait_time = %d s', max_wait_time)
    start = time.time()
    while time.time() - start < max_wait_time:
        if job in queues.finished_data_out.queue or job in queues.failed_data_out.queue:
            logger.info('stage-out has finished, proceed with final server update')
            break
        time.sleep(0.5)

    logger.info('proceeding with final server update')
2152
2153
def get_job_status(job, key):
    """
    Wrapper function around job.get_status().
    If key = 'LOG_TRANSFER' but the job object is not defined, the function returns LOG_TRANSFER_NOT_DONE.

    :param job: job object.
    :param key: key name (string).
    :return: value (string).
    """

    if job:
        return job.get_status(key)

    return LOG_TRANSFER_NOT_DONE if key == 'LOG_TRANSFER' else ""
2172
2173
def queue_monitor(queues, traces, args):
    """
    Monitoring of queues.
    This function monitors queue activity, specifically if a job has finished or failed and then reports to the server.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    # wait until at least one job has appeared in the queues (best effort - monitoring starts regardless)
    if not scan_for_jobs(queues):
        logger.warning('queues are still empty of jobs - will begin queue monitoring anyway')

    job = None
    # the loop only exits via the abort_thread break at the bottom
    while True:
        time.sleep(1)

        # an 'abort' command from the pilot traces triggers a graceful stop
        if traces.pilot['command'] == 'abort':
            logger.warning('job queue monitor received an abort instruction')
            args.graceful_stop.set()

        # give the log transfer extra time when the pilot wraps up normally
        abort_thread = should_abort(args, label='job:queue_monitor')
        if abort_thread and os.environ.get('PILOT_WRAP_UP', '') == 'NORMAL':
            pause_queue_monitor(20)

        # poll up to imax times for a finished/failed job while in stage-out
        imax = 20
        i = 0
        while i < imax and os.environ.get('PILOT_WRAP_UP', '') == 'NORMAL':
            job = get_finished_or_failed_job(args, queues)
            if job:
                logger.debug('returned job has state=%s', job.state)
                break
            i += 1
            state = get_pilot_state()
            if state != 'stage-out':
                # no point polling further when the pilot is not in stage-out
                break
            # poll slower when an abort has been requested
            pause_queue_monitor(1) if not abort_thread else pause_queue_monitor(10)

        # nothing to report and no abort requested - keep monitoring
        if not job and not abort_thread:
            continue

        # only report jobs that have not already been reported as completed
        completed_jobids = queues.completed_jobids.queue if queues.completed_jobids else []
        if job and job.jobid not in completed_jobids:
            logger.info("preparing for final server update for job %s in state=\'%s\'", job.jobid, job.state)

            if args.job_aborted.is_set():
                # wait for the aborted job's stage-out to finish before the final update
                wait_for_aborted_job_stageout(args, queues, job)

            # send the final server update
            update_server(job, args)

            # the job is no longer monitored - remove it from the monitored payloads queue
            try:
                _job = queues.monitored_payloads.get(block=True, timeout=1)
            except queue.Empty:
                logger.warning('failed to dequeue job: queue is empty (did job fail before job monitor started?)')
                make_job_report(job)
            else:
                logger.debug('job %s was dequeued from the monitored payloads queue', _job.jobid)
                # mark the job as completed so it is not reported again
                put_in_queue(job.jobid, queues.completed_jobids)
                put_in_queue(job, queues.completed_jobs)
                del _job
                logger.debug('tmp job object deleted')

        if abort_thread:
            break

    # proceed to set the job_aborted flag only when all other threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] queue monitor thread has finished')
2262
2263
def update_server(job, args):
    """
    Update the server (wrapper for send_state() that also prepares the metadata).

    :param job: job object.
    :param args: pilot args object.
    :return:
    """

    # load the user-specific common module and extract the job metadata
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
    metadata = user.get_metadata(job.workdir)

    # give the user code a chance to perform its own server update (best effort)
    try:
        user.update_server(job)
    except Exception as error:
        logger.warning('exception caught in update_server(): %s', error)

    # include the file info as xml when available
    if job.fileinfo:
        send_state(job, args, job.state, xml=dumps(job.fileinfo), metadata=metadata)
    else:
        send_state(job, args, job.state, metadata=metadata)
2285
2286
def pause_queue_monitor(delay):
    """
    Pause the queue monitor to let the log transfer complete.

    Note: this should ideally use a globally available synchronization object; a plain sleep is used for now.

    :param delay: sleep time in seconds (int).
    :return:
    """

    logger.warning('since job:queue_monitor is responsible for sending job updates, we sleep for %d s', delay)
    time.sleep(delay)
2297
2298
def get_finished_or_failed_job(args, queues):
    """
    Check if the job has either finished or failed and if so return it.
    If failed, order a log transfer. If the job is in state 'failed' and abort_job is set, set job_aborted.

    :param args: pilot args object.
    :param queues: pilot queues object.
    :return: job object.
    """

    job = get_job_from_queue(queues, "finished")
    if not job:
        # no finished job - look for a failed one instead
        job = get_job_from_queue(queues, "failed")
        if job:
            logger.debug('get_finished_or_failed_job: job has failed')
            job.state = 'failed'
            args.job_aborted.set()

        # a failed (or missing) job should have its log transferred
        if get_job_status(job, 'LOG_TRANSFER') == LOG_TRANSFER_NOT_DONE:
            order_log_transfer(queues, job)

    # in case of a kill signal, report the detected abort_job request
    if job and job.state == 'failed' and args.abort_job.is_set():
        logger.warning('queue monitor detected a set abort_job (due to a kill signal)')

    return job
2338
2339
def get_heartbeat_period(debug=False):
    """
    Return the proper heartbeat period, as determined by normal or debug mode.
    In normal mode, the heartbeat period is 30*60 s, while in debug mode it is 5*60 s. Both values are defined in the
    config file.

    :param debug: Boolean, True for debug mode. False otherwise.
    :return: heartbeat period (int).
    """

    try:
        # the attribute lookup itself may raise, so keep it inside the try block
        period = config.Pilot.debug_heartbeat if debug else config.Pilot.heartbeat
        return int(period)
    except Exception as error:
        logger.warning('bad config data for heartbeat period: %s (will use default 1800 s)', error)
        return 1800
2355
2356
def check_for_abort_job(args, caller=''):
    """
    Check if args.abort_job.is_set().

    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :param caller: function name of caller (string).
    :return: Boolean, True if args_job.is_set()
    """

    if not args.abort_job.is_set():
        return False

    logger.warning('%s detected an abort_job request (signal=%s)', caller, args.signal)
    logger.warning('in case pilot is running more than one job, all jobs will be aborted')
    return True
2372
2373
def interceptor(queues, traces, args):
    """
    MOVE THIS TO INTERCEPTOR.PY; TEMPLATE FOR THREADS

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    iteration = 0
    while not args.graceful_stop.is_set():
        time.sleep(0.1)

        # check the two abort conditions
        abort = should_abort(args, label='job:interceptor')
        abort_job = check_for_abort_job(args, caller='interceptor')

        if not abort_job:
            # peek at the monitored payloads (no dequeueing)
            jobs = queues.monitored_payloads.queue
            if jobs:
                for _ in range(len(jobs)):
                    logger.info('interceptor loop %d: looking for communication file', iteration)
                    time.sleep(30)

        iteration += 1

        if abort or abort_job:
            break

    # proceed to set the job_aborted flag only when all other threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] interceptor thread has finished')
2416
2417
def fast_monitor_tasks(job):
    """
    Perform user specific fast monitoring tasks.

    :param job: job object.
    :return: exit code (int).
    """

    # delegate to the user-specific monitoring module
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.monitoring' % pilot_user, globals(), locals(), [pilot_user], 0)

    try:
        return user.fast_monitor_tasks(job)
    except Exception as exc:
        logger.warning('caught exception: %s', exc)
        return 0
2436
2437
def fast_job_monitor(queues, traces, args):
    """
    Fast monitoring of job parameters.

    This function can be used for monitoring processes below the one minute threshold of the normal job_monitor thread.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    # this thread is only needed when real-time logging has been requested
    if not args.use_realtime_logging:
        logger.warning('fast monitoring not required - ending thread')
        return

    while not args.graceful_stop.is_set():
        time.sleep(10)

        # abort in case graceful stop has been requested
        abort = should_abort(args, label='job:fast_job_monitor')
        if abort:
            break

        # an 'abort' command from the pilot traces also ends this thread
        if traces.pilot.get('command') == 'abort':
            logger.warning('fast job monitor received an abort command')
            break

        # check for any abort_job requests
        abort_job = check_for_abort_job(args, caller='fast job monitor')
        if abort_job:
            break
        else:
            # peek at the jobs in the monitored_payloads queue (no dequeueing)
            jobs = queues.monitored_payloads.queue
            if jobs:
                for i in range(len(jobs)):
                    # stop monitoring once the job has reached a final state
                    if jobs[i].state == 'finished' or jobs[i].state == 'failed':
                        logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state)
                        break

                    # run the user-specific fast monitoring tasks for this job
                    exit_code = fast_monitor_tasks(jobs[i])
                    if exit_code:
                        logger.debug('fast monitoring reported an error: %d', exit_code)

    # proceed to set the job_aborted flag only when all other threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] fast job monitor thread has finished')
2500
2501
def job_monitor(queues, traces, args):
    """
    Monitoring of job parameters.
    This function monitors certain job parameters, such as job looping, at various time intervals. The main loop
    is executed once a minute, while individual verifications may be executed at any time interval (>= 1 minute). E.g.
    looping jobs are checked once per ten minutes (default) and the heartbeat is send once per 30 minutes. Memory
    usage is checked once a minute.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    # monitoring time object - keeps track of when individual checks were last performed
    mt = MonitoringTime()

    # peeking_time is refreshed whenever jobs are actually being monitored;
    # update_time tracks when the last heartbeat was sent
    peeking_time = int(time.time())
    update_time = peeking_time

    n = 0
    while not args.graceful_stop.is_set():
        time.sleep(0.5)

        # abort in case graceful stop has been requested
        abort = should_abort(args, label='job:job_monitor')

        if traces.pilot.get('command') == 'abort':
            logger.warning('job monitor received an abort command')

        # check for any abort_job requests
        abort_job = check_for_abort_job(args, caller='job monitor')
        if not abort_job:
            if not queues.current_data_in.empty():
                # stage-in has not finished yet - keep sending heartbeats while waiting
                jobs = queues.current_data_in.queue
                if jobs:
                    for i in range(len(jobs)):
                        update_time = send_heartbeat_if_time(jobs[i], args, update_time)

                        # the jobs[i] reference may have gone stale while waiting - verify before use
                        try:
                            jobs[i]
                        except Exception as error:
                            logger.warning('detected stale jobs[i] object in job_monitor: %s', error)
                        else:
                            # a failure during stage-in means the log should be transferred
                            if jobs[i].state == 'failed':
                                logger.warning('job state is \'failed\' - order log transfer and abort job_monitor() (1)')
                                jobs[i].stageout = 'log'
                                put_in_queue(jobs[i], queues.data_out)

                time.sleep(1)
                continue
            elif queues.finished_data_in.empty():
                # no jobs have passed stage-in yet - wait
                time.sleep(1)
                continue

        time.sleep(60)

        # peek at the jobs in the monitored_payloads queue (no dequeueing)
        jobs = queues.monitored_payloads.queue
        if jobs:
            # update the peeking time since jobs are being monitored
            peeking_time = int(time.time())
            for i in range(len(jobs)):
                current_id = jobs[i].jobid
                logger.info('monitor loop #%d: job %d:%s is in state \'%s\'', n, i, current_id, jobs[i].state)
                if jobs[i].state == 'finished' or jobs[i].state == 'failed':
                    logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state)
                    break

                # perform the monitoring tasks and handle any reported error
                exit_code, diagnostics = job_monitor_tasks(jobs[i], mt, args)
                if exit_code != 0:
                    if exit_code == errors.NOVOMSPROXY:
                        logger.warning('VOMS proxy has expired - keep monitoring job')
                    elif exit_code == errors.KILLPAYLOAD:
                        jobs[i].piloterrorcodes, jobs[i].piloterrordiags = errors.add_error_code(exit_code)
                        logger.debug('killing payload process')
                        kill_process(jobs[i].pid)
                        break
                    else:
                        # any other error fails the job
                        try:
                            fail_monitored_job(jobs[i], exit_code, diagnostics, queues, traces)
                        except Exception as error:
                            logger.warning('(1) exception caught: %s (job id=%s)', error, current_id)
                        break

                # the job object may have expired while the monitoring tasks were running
                try:
                    _job = jobs[i]
                except Exception:
                    logger.info('aborting job monitoring since job object (job id=%s) has expired', current_id)
                    break

                # send a heartbeat if enough time has passed
                try:
                    update_time = send_heartbeat_if_time(_job, args, update_time)
                except Exception as error:
                    logger.warning('(2) exception caught: %s (job id=%s)', error, current_id)
                    break
                else:
                    # a failed job means the log should be transferred and monitoring aborted
                    if _job.state == 'failed':
                        logger.warning('job state is \'failed\' - order log transfer and abort job_monitor() (2)')
                        _job.stageout = 'log'
                        put_in_queue(_job, queues.data_out)
                        abort = True
                        break

        elif os.environ.get('PILOT_JOB_STATE') == 'stagein':
            logger.info('job monitoring is waiting for stage-in to finish')
        else:
            # no jobs are being monitored - check how long we have been waiting
            check_job_monitor_waiting_time(args, peeking_time, abort_override=abort_job)

        n += 1

        if abort or abort_job:
            break

    # proceed to set the job_aborted flag only when all other threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[job] job monitor thread has finished')
2641
2642
def send_heartbeat_if_time(job, args, update_time):
    """
    Send a heartbeat to the server if it is time to do so.

    :param job: job object.
    :param args: args object.
    :param update_time: last update time (from time.time()).
    :return: possibly updated update_time (from time.time()).
    """

    elapsed = int(time.time()) - update_time
    if elapsed < get_heartbeat_period(job.debug):
        # not yet time for a heartbeat
        return update_time

    # only send heartbeats for jobs that have not reached a final server state
    if job.serverstate not in ('finished', 'failed'):
        send_state(job, args, 'running')
        update_time = int(time.time())

    return update_time
2659
2660
def check_job_monitor_waiting_time(args, peeking_time, abort_override=False):
    """
    Check the waiting time in the job monitor.
    Set global graceful_stop if necessary.

    :param args: args object.
    :param peeking_time: time when monitored_payloads queue was peeked into (int).
    :param abort_override: force the abort handling even if the waiting time limit has not been reached (Boolean).
    :return:
    """

    waiting_time = int(time.time()) - peeking_time
    # abort after one hour without any jobs in the monitored_payloads queue
    abort = waiting_time > 60 * 60
    msg = 'no jobs in monitored_payloads queue (waited for %d s)' % waiting_time
    if abort:
        msg += ' - aborting'

    # fall back to print in case logging has not been established
    if logger:
        logger.warning(msg)
    else:
        print(msg)

    if abort or abort_override:
        # make sure the server update is finalized before stopping all threads
        check_for_final_server_update(args.update_server)
        args.graceful_stop.set()
2687
2688
def fail_monitored_job(job, exit_code, diagnostics, queues, traces):
    """
    Fail a monitored job.

    :param job: job object
    :param exit_code: exit code from job_monitor_tasks (int).
    :param diagnostics: pilot error diagnostics (string).
    :param queues: queues object.
    :param traces: traces object.
    :return:
    """

    # mark the job as failed and attach the error code and diagnostics
    set_pilot_state(job=job, state="failed")
    job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=diagnostics)
    job.piloterrordiag = diagnostics
    # record the error code in the traces object as well
    traces.pilot['error_code'] = exit_code
    # hand the job over to the failed_payloads queue for further processing
    put_in_queue(job, queues.failed_payloads)
    logger.info('aborting job monitoring since job state=%s', job.state)
2707
2708
def make_job_report(job):
    """
    Make a summary report for the given job.
    This function is called when the job has completed.

    :param job: job object.
    :return:
    """

    logger.info('')
    logger.info('job summary report')
    logger.info('--------------------------------------------------')
    logger.info('PanDA job id: %s', job.jobid)
    logger.info('task id: %s', job.taskid)

    # list each pilot error, or report that there were none
    nerrors = len(job.piloterrorcodes)
    if nerrors > 0:
        for i in range(nerrors):
            logger.info('error %d/%d: %s: %s', i + 1, nerrors, job.piloterrorcodes[i], job.piloterrordiags[i])
    else:
        logger.info('errors: (none)')

    if job.piloterrorcode != 0:
        logger.info('pilot error code: %d', job.piloterrorcode)
        logger.info('pilot error diag: %s', job.piloterrordiag)

    # report the job status dictionary as a single "key = value" string
    info = "".join(key + " = " + job.status[key] + " " for key in job.status)
    logger.info('status: %s', info)

    # for unfinished user jobs, report whether the error is recoverable
    recoverable_note = ""
    if job.is_analysis() and job.state != 'finished':
        recoverable_note = '(user job is recoverable)' if errors.is_recoverable(code=job.piloterrorcode) else '(user job is not recoverable)'
    logger.info('pilot state: %s %s', job.state, recoverable_note)

    logger.info('transexitcode: %d', job.transexitcode)
    logger.info('exeerrorcode: %d', job.exeerrorcode)
    logger.info('exeerrordiag: %s', job.exeerrordiag)
    logger.info('exitcode: %d', job.exitcode)
    logger.info('exitmsg: %s', job.exitmsg)
    logger.info('cpuconsumptiontime: %d %s', job.cpuconsumptiontime, job.cpuconsumptionunit)
    logger.info('nevents: %d', job.nevents)
    logger.info('neventsw: %d', job.neventsw)
    logger.info('pid: %s', job.pid)
    logger.info('pgrp: %s', str(job.pgrp))
    logger.info('corecount: %d', job.corecount)
    logger.info('event service: %s', str(job.is_eventservice))
    logger.info('sizes: %s', str(job.sizes))
    logger.info('--------------------------------------------------')
    logger.info('')