File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 import os
0015 import time
0016 import traceback
0017
0018 try:
0019 import Queue as queue
0020 except Exception:
0021 import queue
0022
0023 from pilot.control.payloads import generic, eventservice, eventservicemerge
0024 from pilot.control.job import send_state
0025 from pilot.util.auxiliary import set_pilot_state
0026 from pilot.util.processes import get_cpu_consumption_time
0027 from pilot.util.config import config
0028 from pilot.util.filehandling import read_file, remove_core_dumps, get_guid
0029 from pilot.util.processes import threads_aborted
0030 from pilot.util.queuehandling import put_in_queue
0031 from pilot.common.errorcodes import ErrorCodes
0032 from pilot.common.exception import ExcThread
0033
0034 import logging
0035 logger = logging.getLogger(__name__)
0036
0037 errors = ErrorCodes()
0038
0039
def control(queues, traces, args):
    """
    Set up and monitor the payload handling threads.

    One thread is started per payload stage (validate_pre, execute_payloads, validate_post, failed_post).
    The main loop then polls each thread's exception bucket until graceful_stop is set, logging any
    exception that a thread has reported.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    targets = {'validate_pre': validate_pre, 'execute_payloads': execute_payloads, 'validate_post': validate_post,
               'failed_post': failed_post}
    threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'queues': queues, 'traces': traces, 'args': args},
                         name=name) for name, target in list(targets.items())]

    # start all payload threads (plain loop - a comprehension should not be used for side effects)
    for thread in threads:
        thread.start()

    # monitor the threads' exception buckets until the pilot is told to stop
    while not args.graceful_stop.is_set():
        for thread in threads:
            bucket = thread.get_bucket()
            try:
                exc = bucket.get(block=False)
            except queue.Empty:
                pass
            else:
                # only the exception object is reported; type and traceback are not used here
                _exc_type, exc_obj, _exc_trace = exc
                logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)

            thread.join(0.1)
            time.sleep(0.1)

        time.sleep(0.5)

    logger.debug('payload control ending since graceful_stop has been set')
    if args.abort_job.is_set():
        if traces.pilot['command'] == 'aborting':
            logger.warning('jobs are aborting')
        elif traces.pilot['command'] == 'abort':
            # fixed copy-paste from the data control module: this is payload control
            logger.warning('payload control detected a set abort_job (due to a kill signal)')
            traces.pilot['command'] = 'aborting'

    # only set job_aborted once all relevant threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.debug('[payload] control thread has finished')
0096
0097
def validate_pre(queues, traces, args):
    """
    Get a Job object from the "payloads" queue and validate it.

    If the payload is successfully validated (user defined), the Job object is placed in the "validated_payloads" queue,
    otherwise it is placed in the "failed_payloads" queue.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """
    while not args.graceful_stop.is_set():
        time.sleep(0.5)
        try:
            job = queues.payloads.get(block=True, timeout=1)
        except queue.Empty:
            continue

        # route the job according to the outcome of the user validation
        destination = queues.validated_payloads if _validate_payload(job) else queues.failed_payloads
        put_in_queue(job, destination)

    # only set job_aborted once all relevant threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.info('[payload] validate_pre thread has finished')
0132
0133
def _validate_payload(job):
    """
    Perform validation tests for the payload.

    The actual validation is delegated to the experiment-specific user code
    (selected via the PILOT_USER environment variable).

    :param job: job object.
    :return: boolean.
    """
    pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
    user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
    try:
        return user.validate(job)
    except Exception as error:
        logger.fatal('failed to execute user validate() function: %s', error)
        return False
0154
0155
def get_payload_executor(args, job, out, err, traces):
    """
    Get payload executor function for different payload.

    Select the executor class based on the job type (event service, event service merge
    or generic) and instantiate it.

    :param args: args object.
    :param job: job object.
    :param out: payload stdout file object.
    :param err: payload stderr file object.
    :param traces: traces object.
    :return: instance of a payload executor
    """
    if job.is_eventservice:
        executor_class = eventservice.Executor
    elif job.is_eventservicemerge:
        executor_class = eventservicemerge.Executor
    else:
        executor_class = generic.Executor
    return executor_class(args, job, out, err, traces)
0174
0175
def execute_payloads(queues, traces, args):
    """
    Execute queued payloads.

    Extract a Job object from the "validated_payloads" queue and put it in the "monitored_jobs" queue. The payload
    stdout/err streams are opened and the pilot state is changed to "starting". A payload executor is selected (for
    executing a normal job, an event service job or event service merge job). After the payload (or rather its executor)
    is started, the thread will wait for it to finish and then check for any failures. A successfully completed job is
    placed in the "finished_payloads" queue, and a failed job will be placed in the "failed_payloads" queue.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    job = None
    while not args.graceful_stop.is_set():
        time.sleep(0.5)
        try:
            job = queues.validated_payloads.get(block=True, timeout=1)

            # only start the payload if stage-in for this job has completed:
            # take a snapshot of the finished_data_in queue and look for this job id
            q_snapshot = list(queues.finished_data_in.queue)
            peek = [s_job for s_job in q_snapshot if job.jobid == s_job.jobid]
            if len(peek) == 0:
                # stage-in not done yet - put the job back and back off for up to ~10 s
                put_in_queue(job, queues.validated_payloads)
                for _ in range(10):
                    if args.graceful_stop.is_set():
                        break
                    time.sleep(1)
                continue

            # stage-in is done; hand the job over to the monitoring loop
            put_in_queue(job, queues.monitored_payloads)

            logger.info('job %s added to monitored payloads queue', job.jobid)

            # open the payload stdout/stderr files in the job work directory
            try:
                out = open(os.path.join(job.workdir, config.Payload.payloadstdout), 'wb')
                err = open(os.path.join(job.workdir, config.Payload.payloadstderr), 'wb')
            except Exception as error:
                # NOTE(review): on failure out/err stay None; out.close() below would then raise
                # AttributeError, which is handled by the outer except - confirm this is intended
                logger.warning('failed to open payload stdout/err: %s', error)
                out = None
                err = None
            send_state(job, args, 'starting')

            # the server response to send_state may have updated the job state
            if job.state == 'failed':
                logger.warning('job state is \'failed\' - abort execute_payloads()')
                break

            payload_executor = get_payload_executor(args, job, out, err, traces)
            logger.info("will use payload executor: %s", payload_executor)

            # run the payload and measure the CPU time from this reference point
            job.t0 = os.times()
            exit_code = payload_executor.run()

            set_cpu_consumption_time(job)
            # NOTE(review): % 255 maps exit code 255 to 0 - possibly % 256 was intended; confirm
            job.transexitcode = exit_code % 255

            out.close()
            err.close()

            pilot_user = os.environ.get('PILOT_USER', 'generic').lower()

            # HPO jobs get their output post-processed by user code; otherwise make sure
            # every output file has a guid
            if job.is_hpo:
                user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
                try:
                    user.update_output_for_hpo(job)
                except Exception as error:
                    logger.warning('exception caught by update_output_for_hpo(): %s', error)
            else:
                for dat in job.outdata:
                    if not dat.guid:
                        dat.guid = get_guid()
                        logger.warning('guid not set: generated guid=%s for lfn=%s', dat.guid, dat.lfn)

            # first pass: resolve transform/stderr errors and set pilot error codes on the job
            perform_initial_payload_error_analysis(job, exit_code)

            # second pass: experiment-specific diagnosis of the finished payload
            user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], 0)
            try:
                exit_code_interpret = user.interpret(job)
            except Exception as error:
                logger.warning('exception caught: %s', error)
                # setting an error code here also guarantees exit_code_interpret gets set below
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.INTERNALPILOTPROBLEM)

            if job.piloterrorcodes:
                exit_code_interpret = 1

            if exit_code_interpret == 0 and exit_code == 0:
                logger.info('main payload error analysis completed - did not find any errors')
                put_in_queue(job, queues.finished_payloads)
            else:
                logger.debug('main payload error analysis completed - adding job to failed_payloads queue')
                put_in_queue(job, queues.failed_payloads)

        except queue.Empty:
            continue
        except Exception as error:
            logger.fatal('execute payloads caught an exception (cannot recover): %s, %s', error, traceback.format_exc())
            if job:
                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PAYLOADEXECUTIONEXCEPTION)
                put_in_queue(job, queues.failed_payloads)
            # unrecoverable: keep this thread idle until the pilot is told to stop,
            # so the failed job can be handled by the other threads
            while not args.graceful_stop.is_set():
                time.sleep(5)

    # only set job_aborted once all relevant threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.info('[payload] execute_payloads thread has finished')
0317
0318
def set_cpu_consumption_time(job):
    """
    Set the CPU consumption time.

    Measure the CPU time consumed since job.t0 and store the rounded value,
    its unit and the conversion factor on the job object.

    :param job: job object.
    :return:
    """
    consumed_time = get_cpu_consumption_time(job.t0)
    job.cpuconsumptiontime = int(round(consumed_time))
    job.cpuconsumptionunit = "s"
    job.cpuconversionfactor = 1.0
    logger.info('CPU consumption time: %f %s (rounded to %d %s)', consumed_time, job.cpuconsumptionunit, job.cpuconsumptiontime, job.cpuconsumptionunit)
0331
0332
def perform_initial_payload_error_analysis(job, exit_code):
    """
    Perform an initial analysis of the payload.
    Singularity errors are caught here.

    For a non-zero exit code, the payload stderr is scanned for known error (or warning)
    messages and a pilot error code is set on the job. Independently of the exit code,
    any core dump found in the work directory is removed (unless the job is in debug
    mode) and reported via the COREDUMP error code.

    :param job: job object.
    :param exit_code: exit code from payload execution.
    :return:
    """

    if exit_code != 0:
        logger.warning('main payload execution returned non-zero exit code: %d', exit_code)

        # try to refine the transform exit code using the payload stderr
        stderr = read_file(os.path.join(job.workdir, config.Payload.payloadstderr))
        exit_code = errors.resolve_transform_error(exit_code, stderr)

        if exit_code != 0:
            msg = ""
            if stderr != "":
                msg = errors.extract_stderr_error(stderr)
                if msg == "":
                    # no error found; look for warning messages instead
                    msg = errors.extract_stderr_warning(stderr)

            if msg:
                msg = errors.format_diagnostics(exit_code, msg)

            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=msg)
    else:
        logger.info('main payload execution returned zero exit code')

    # check if core dumps exist, if so remove them and report the error
    # (in debug mode the dumps are kept for inspection)
    if not job.debug:
        if remove_core_dumps(job.workdir, pid=job.pid):
            logger.warning('setting COREDUMP error')
            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.COREDUMP)
0394
0395
def set_error_code_from_stderr(msg, fatal):
    """
    Identify specific errors in stderr and set the corresponding error code.
    The function returns 0 if no error is recognized.

    :param msg: stderr (string).
    :param fatal: boolean flag if fatal error among warning messages in stderr.
    :return: error code (int).
    """
    # known stderr signatures, checked in order
    known_patterns = (
        ("Failed invoking the NEWUSER namespace runtime", errors.SINGULARITYNEWUSERNAMESPACE),
        ("Failed to create user namespace", errors.SINGULARITYFAILEDUSERNAMESPACE),
        ("resource temporarily unavailable", errors.SINGULARITYRESOURCEUNAVAILABLE),
        ("Singularity is not installed", errors.SINGULARITYNOTINSTALLED),
        ("command not found", errors.TRANSFORMNOTFOUND),
        ("SL5 is unsupported", errors.UNSUPPORTEDSL5OS),
        ("unrecognized arguments", errors.UNRECOGNIZEDTRFARGUMENTS),
    )

    exit_code = next((code for pattern, code in known_patterns if pattern in msg), 0)

    if fatal and not exit_code:
        exit_code = errors.UNRECOGNIZEDTRFSTDERR

    return exit_code
0424
0425
def validate_post(queues, traces, args):
    """
    Validate finished payloads.
    If payload finished correctly, add the job to the data_out queue. If it failed, add it to the data_out queue as
    well but only for log stage-out (in failed_post() below).

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    while not args.graceful_stop.is_set():
        time.sleep(0.5)

        try:
            finished_job = queues.finished_payloads.get(block=True, timeout=1)
        except queue.Empty:
            time.sleep(0.1)
            continue

        # finished payloads get a full stage-out (output files and log)
        logger.debug('adding job to data_out queue')
        finished_job.stageout = 'all'
        set_pilot_state(job=finished_job, state='stageout')
        put_in_queue(finished_job, queues.data_out)

    # only set job_aborted once all relevant threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.info('[payload] validate_post thread has finished')
0462
0463
def failed_post(queues, traces, args):
    """
    Get a Job object from the "failed_payloads" queue. Set the pilot state to "stageout" and the stageout field to
    "log" (so that only the log is transferred), and add the Job object to the "data_out" queue.

    :param queues: internal queues for job handling.
    :param traces: tuple containing internal pilot states.
    :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
    :return:
    """

    while not args.graceful_stop.is_set():
        time.sleep(0.5)

        try:
            failed_job = queues.failed_payloads.get(block=True, timeout=1)
        except queue.Empty:
            time.sleep(0.1)
            continue

        # failed payloads only stage out their log file
        logger.debug('adding log for log stageout')
        failed_job.stageout = 'log'
        set_pilot_state(job=failed_job, state='stageout')
        put_in_queue(failed_job, queues.data_out)

    # only set job_aborted once all relevant threads have finished
    if threads_aborted():
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('will not set job_aborted yet')

    logger.info('[payload] failed_post thread has finished')