Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
0009 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
0010 # - Tobias Wegner, tobias.wegner@cern.ch, 2017
0011 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0012 # - Wen Guan, wen.guan@cern.ch, 2017-2018
0013 
0014 import os
0015 import time
0016 import traceback
0017 
0018 try:
0019     import Queue as queue  # noqa: N813
0020 except Exception:
0021     import queue  # Python 3
0022 
0023 from pilot.control.payloads import generic, eventservice, eventservicemerge
0024 from pilot.control.job import send_state
0025 from pilot.util.auxiliary import set_pilot_state
0026 from pilot.util.processes import get_cpu_consumption_time
0027 from pilot.util.config import config
0028 from pilot.util.filehandling import read_file, remove_core_dumps, get_guid
0029 from pilot.util.processes import threads_aborted
0030 from pilot.util.queuehandling import put_in_queue
0031 from pilot.common.errorcodes import ErrorCodes
0032 from pilot.common.exception import ExcThread
0033 
0034 import logging
0035 logger = logging.getLogger(__name__)
0036 
0037 errors = ErrorCodes()
0038 
0039 
0040 def control(queues, traces, args):
0041     """
0042     (add description)
0043 
0044     :param queues:
0045     :param traces:
0046     :param args:
0047     :return:
0048     """
0049 
0050     targets = {'validate_pre': validate_pre, 'execute_payloads': execute_payloads, 'validate_post': validate_post,
0051                'failed_post': failed_post}
0052     threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'queues': queues, 'traces': traces, 'args': args},
0053                          name=name) for name, target in list(targets.items())]  # Python 3
0054 
0055     [thread.start() for thread in threads]
0056 
0057     # if an exception is thrown, the graceful_stop will be set by the ExcThread class run() function
0058     while not args.graceful_stop.is_set():
0059         for thread in threads:
0060             bucket = thread.get_bucket()
0061             try:
0062                 exc = bucket.get(block=False)
0063             except queue.Empty:
0064                 pass
0065             else:
0066                 exc_type, exc_obj, exc_trace = exc
0067                 logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)
0068 
0069                 # deal with the exception
0070                 # ..
0071 
0072             thread.join(0.1)
0073             time.sleep(0.1)
0074 
0075         time.sleep(0.5)
0076 
0077     logger.debug('payload control ending since graceful_stop has been set')
0078     if args.abort_job.is_set():
0079         if traces.pilot['command'] == 'aborting':
0080             logger.warning('jobs are aborting')
0081         elif traces.pilot['command'] == 'abort':
0082             logger.warning('data control detected a set abort_job (due to a kill signal)')
0083             traces.pilot['command'] = 'aborting'
0084 
0085             # find all running jobs and stop them, find all jobs in queues relevant to this module
0086             #abort_jobs_in_queues(queues, args.signal)
0087 
0088     # proceed to set the job_aborted flag?
0089     if threads_aborted():
0090         logger.debug('will proceed to set job_aborted')
0091         args.job_aborted.set()
0092     else:
0093         logger.debug('will not set job_aborted yet')
0094 
0095     logger.debug('[payload] control thread has finished')
0096 
0097 
0098 def validate_pre(queues, traces, args):
0099     """
0100     Get a Job object from the "payloads" queue and validate it.
0101 
0102     If the payload is successfully validated (user defined), the Job object is placed in the "validated_payloads" queue,
0103     otherwise it is placed in the "failed_payloads" queue.
0104 
0105     :param queues: internal queues for job handling.
0106     :param traces: tuple containing internal pilot states.
0107     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0108     :return:
0109     """
0110     while not args.graceful_stop.is_set():
0111         time.sleep(0.5)
0112         try:
0113             job = queues.payloads.get(block=True, timeout=1)
0114         except queue.Empty:
0115             continue
0116 
0117         if _validate_payload(job):
0118             #queues.validated_payloads.put(job)
0119             put_in_queue(job, queues.validated_payloads)
0120         else:
0121             #queues.failed_payloads.put(job)
0122             put_in_queue(job, queues.failed_payloads)
0123 
0124     # proceed to set the job_aborted flag?
0125     if threads_aborted():
0126         logger.debug('will proceed to set job_aborted')
0127         args.job_aborted.set()
0128     else:
0129         logger.debug('will not set job_aborted yet')
0130 
0131     logger.info('[payload] validate_pre thread has finished')
0132 
0133 
0134 def _validate_payload(job):
0135     """
0136     Perform validation tests for the payload.
0137 
0138     :param job: job object.
0139     :return: boolean.
0140     """
0141 
0142     status = True
0143 
0144     # perform user specific validation
0145     pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0146     user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0147     try:
0148         status = user.validate(job)
0149     except Exception as error:
0150         logger.fatal('failed to execute user validate() function: %s', error)
0151         status = False
0152 
0153     return status
0154 
0155 
0156 def get_payload_executor(args, job, out, err, traces):
0157     """
0158     Get payload executor function for different payload.
0159 
0160     :param args: args object.
0161     :param job: job object.
0162     :param out:
0163     :param err:
0164     :param traces: traces object.
0165     :return: instance of a payload executor
0166     """
0167     if job.is_eventservice:  # True for native HPO workflow as well
0168         payload_executor = eventservice.Executor(args, job, out, err, traces)
0169     elif job.is_eventservicemerge:
0170         payload_executor = eventservicemerge.Executor(args, job, out, err, traces)
0171     else:
0172         payload_executor = generic.Executor(args, job, out, err, traces)
0173     return payload_executor
0174 
0175 
0176 def execute_payloads(queues, traces, args):  # noqa: C901
0177     """
0178     Execute queued payloads.
0179 
0180     Extract a Job object from the "validated_payloads" queue and put it in the "monitored_jobs" queue. The payload
0181     stdout/err streams are opened and the pilot state is changed to "starting". A payload executor is selected (for
0182     executing a normal job, an event service job or event service merge job). After the payload (or rather its executor)
0183     is started, the thread will wait for it to finish and then check for any failures. A successfully completed job is
0184     placed in the "finished_payloads" queue, and a failed job will be placed in the "failed_payloads" queue.
0185 
0186     :param queues: internal queues for job handling.
0187     :param traces: tuple containing internal pilot states.
0188     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0189     :return:
0190     """
0191 
0192     job = None
0193     while not args.graceful_stop.is_set():
0194         time.sleep(0.5)
0195         try:
0196             job = queues.validated_payloads.get(block=True, timeout=1)
0197 
0198             #q_snapshot = list(queues.finished_data_in.queue) if queues.finished_data_in else []
0199             #peek = [s_job for s_job in q_snapshot if job.jobid == s_job.jobid]
0200             #if job.jobid not in q_snapshot:
0201 
0202             q_snapshot = list(queues.finished_data_in.queue)
0203             peek = [s_job for s_job in q_snapshot if job.jobid == s_job.jobid]
0204             if len(peek) == 0:
0205                 put_in_queue(job, queues.validated_payloads)
0206                 for _ in range(10):  # Python 3
0207                     if args.graceful_stop.is_set():
0208                         break
0209                     time.sleep(1)
0210                 continue
0211 
0212             # this job is now to be monitored, so add it to the monitored_payloads queue
0213             #queues.monitored_payloads.put(job)
0214             put_in_queue(job, queues.monitored_payloads)
0215 
0216             logger.info('job %s added to monitored payloads queue', job.jobid)
0217 
0218             try:
0219                 out = open(os.path.join(job.workdir, config.Payload.payloadstdout), 'wb')
0220                 err = open(os.path.join(job.workdir, config.Payload.payloadstderr), 'wb')
0221             except Exception as error:
0222                 logger.warning('failed to open payload stdout/err: %s', error)
0223                 out = None
0224                 err = None
0225             send_state(job, args, 'starting')
0226 
0227             # note: when sending a state change to the server, the server might respond with 'tobekilled'
0228             if job.state == 'failed':
0229                 logger.warning('job state is \'failed\' - abort execute_payloads()')
0230                 break
0231 
0232             payload_executor = get_payload_executor(args, job, out, err, traces)
0233             logger.info("will use payload executor: %s", payload_executor)
0234 
0235             # run the payload and measure the execution time
0236             job.t0 = os.times()
0237             exit_code = payload_executor.run()
0238 
0239             set_cpu_consumption_time(job)
0240             job.transexitcode = exit_code % 255
0241 
0242             out.close()
0243             err.close()
0244 
0245             pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
0246 
0247             # some HPO jobs will produce new output files (following lfn name pattern), discover those and replace the job.outdata list
0248             if job.is_hpo:
0249                 user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user],
0250                                   0)  # Python 2/3
0251                 try:
0252                     user.update_output_for_hpo(job)
0253                 except Exception as error:
0254                     logger.warning('exception caught by update_output_for_hpo(): %s', error)
0255                 else:
0256                     for dat in job.outdata:
0257                         if not dat.guid:
0258                             dat.guid = get_guid()
0259                             logger.warning('guid not set: generated guid=%s for lfn=%s', dat.guid, dat.lfn)
0260 
0261             #if traces.pilot['nr_jobs'] == 1:
0262             #    logger.debug('faking job failure in first multi-job')
0263             #    job.transexitcode = 1
0264             #    exit_code = 1
0265 
0266             # analyze and interpret the payload execution output
0267             perform_initial_payload_error_analysis(job, exit_code)
0268 
0269             # was an error already found?
0270             #if job.piloterrorcodes:
0271             #    exit_code_interpret = 1
0272             #else:
0273             user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], 0)  # Python 2/3
0274             try:
0275                 exit_code_interpret = user.interpret(job)
0276             except Exception as error:
0277                 logger.warning('exception caught: %s', error)
0278                 #exit_code_interpret = -1
0279                 job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.INTERNALPILOTPROBLEM)
0280 
0281             if job.piloterrorcodes:
0282                 exit_code_interpret = 1
0283 
0284             if exit_code_interpret == 0 and exit_code == 0:
0285                 logger.info('main payload error analysis completed - did not find any errors')
0286 
0287                 # update output lists if zipmaps were used
0288                 #job.add_archives_to_output_lists()
0289 
0290                 # queues.finished_payloads.put(job)
0291                 put_in_queue(job, queues.finished_payloads)
0292             else:
0293                 logger.debug('main payload error analysis completed - adding job to failed_payloads queue')
0294                 #queues.failed_payloads.put(job)
0295                 put_in_queue(job, queues.failed_payloads)
0296 
0297         except queue.Empty:
0298             continue
0299         except Exception as error:
0300             logger.fatal('execute payloads caught an exception (cannot recover): %s, %s', error, traceback.format_exc())
0301             if job:
0302                 job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PAYLOADEXECUTIONEXCEPTION)
0303                 #queues.failed_payloads.put(job)
0304                 put_in_queue(job, queues.failed_payloads)
0305             while not args.graceful_stop.is_set():
0306                 # let stage-out of log finish, but stop running payloads as there should be a problem with the pilot
0307                 time.sleep(5)
0308 
0309     # proceed to set the job_aborted flag?
0310     if threads_aborted():
0311         logger.debug('will proceed to set job_aborted')
0312         args.job_aborted.set()
0313     else:
0314         logger.debug('will not set job_aborted yet')
0315 
0316     logger.info('[payload] execute_payloads thread has finished')
0317 
0318 
0319 def set_cpu_consumption_time(job):
0320     """
0321     Set the CPU consumption time.
0322     :param job: job object.
0323     :return:
0324     """
0325 
0326     cpuconsumptiontime = get_cpu_consumption_time(job.t0)
0327     job.cpuconsumptiontime = int(round(cpuconsumptiontime))
0328     job.cpuconsumptionunit = "s"
0329     job.cpuconversionfactor = 1.0
0330     logger.info('CPU consumption time: %f %s (rounded to %d %s)', cpuconsumptiontime, job.cpuconsumptionunit, job.cpuconsumptiontime, job.cpuconsumptionunit)
0331 
0332 
0333 def perform_initial_payload_error_analysis(job, exit_code):
0334     """
0335     Perform an initial analysis of the payload.
0336     Singularity errors are caught here.
0337 
0338     :param job: job object.
0339     :param exit_code: exit code from payload execution.
0340     :return:
0341     """
0342 
0343     if exit_code != 0:
0344         logger.warning('main payload execution returned non-zero exit code: %d', exit_code)
0345 
0346     # look for singularity errors (the exit code can be zero in this case)
0347     stderr = read_file(os.path.join(job.workdir, config.Payload.payloadstderr))
0348     exit_code = errors.resolve_transform_error(exit_code, stderr)
0349 
0350     if exit_code != 0:
0351         msg = ""
0352         if stderr != "":
0353             msg = errors.extract_stderr_error(stderr)
0354             if msg == "":
0355                 # look for warning messages instead (might not be fatal so do not set UNRECOGNIZEDTRFSTDERR)
0356                 msg = errors.extract_stderr_warning(stderr)
0357             #    fatal = False
0358             #else:
0359             #    fatal = True
0360             #if msg != "":  # redundant since resolve_transform_error is used above
0361             #    logger.warning("extracted message from stderr:\n%s", msg)
0362             #    exit_code = set_error_code_from_stderr(msg, fatal)
0363 
0364         if msg:
0365             msg = errors.format_diagnostics(exit_code, msg)
0366 
0367         job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=msg)
0368 
0369         '''
0370         if exit_code != 0:
0371             if msg:
0372                 msg = errors.format_diagnostics(exit_code, msg)
0373             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=msg)
0374         else:
0375             if job.piloterrorcodes:
0376                 logger.warning('error code(s) already set: %s', str(job.piloterrorcodes))
0377             else:
0378                 # check if core dumps exist, if so remove them and return True
0379                 if remove_core_dumps(job.workdir) and not job.debug:
0380                     job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.COREDUMP)
0381                 else:
0382                     logger.warning('initial error analysis did not resolve the issue (and core dumps were not found)')
0383         '''
0384     else:
0385         logger.info('main payload execution returned zero exit code')
0386 
0387     # check if core dumps exist, if so remove them and return True
0388     if not job.debug:  # do not shorten these if-statements
0389         # only return True if found core dump belongs to payload
0390         if remove_core_dumps(job.workdir, pid=job.pid):
0391             # COREDUMP error will only be set if the core dump belongs to the payload (ie 'core.<payload pid>')
0392             logger.warning('setting COREDUMP error')
0393             job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.COREDUMP)
0394 
0395 
0396 def set_error_code_from_stderr(msg, fatal):
0397     """
0398     Identify specific errors in stderr and set the corresponding error code.
0399     The function returns 0 if no error is recognized.
0400 
0401     :param msg: stderr (string).
0402     :param fatal: boolean flag if fatal error among warning messages in stderr.
0403     :return: error code (int).
0404     """
0405 
0406     exit_code = 0
0407     error_map = {errors.SINGULARITYNEWUSERNAMESPACE: "Failed invoking the NEWUSER namespace runtime",
0408                  errors.SINGULARITYFAILEDUSERNAMESPACE: "Failed to create user namespace",
0409                  errors.SINGULARITYRESOURCEUNAVAILABLE: "resource temporarily unavailable",
0410                  errors.SINGULARITYNOTINSTALLED: "Singularity is not installed",
0411                  errors.TRANSFORMNOTFOUND: "command not found",
0412                  errors.UNSUPPORTEDSL5OS: "SL5 is unsupported",
0413                  errors.UNRECOGNIZEDTRFARGUMENTS: "unrecognized arguments"}
0414 
0415     for key, value in error_map.items():
0416         if value in msg:
0417             exit_code = key
0418             break
0419 
0420     if fatal and not exit_code:
0421         exit_code = errors.UNRECOGNIZEDTRFSTDERR
0422 
0423     return exit_code
0424 
0425 
0426 def validate_post(queues, traces, args):
0427     """
0428     Validate finished payloads.
0429     If payload finished correctly, add the job to the data_out queue. If it failed, add it to the data_out queue as
0430     well but only for log stage-out (in failed_post() below).
0431 
0432     :param queues: internal queues for job handling.
0433     :param traces: tuple containing internal pilot states.
0434     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0435     :return:
0436     """
0437 
0438     while not args.graceful_stop.is_set():
0439         time.sleep(0.5)
0440         # finished payloads
0441         try:
0442             job = queues.finished_payloads.get(block=True, timeout=1)
0443         except queue.Empty:
0444             time.sleep(0.1)
0445             continue
0446 
0447         # by default, both output and log should be staged out
0448         job.stageout = 'all'
0449         logger.debug('adding job to data_out queue')
0450         #queues.data_out.put(job)
0451         set_pilot_state(job=job, state='stageout')
0452         put_in_queue(job, queues.data_out)
0453 
0454     # proceed to set the job_aborted flag?
0455     if threads_aborted():
0456         logger.debug('will proceed to set job_aborted')
0457         args.job_aborted.set()
0458     else:
0459         logger.debug('will not set job_aborted yet')
0460 
0461     logger.info('[payload] validate_post thread has finished')
0462 
0463 
0464 def failed_post(queues, traces, args):
0465     """
0466     Get a Job object from the "failed_payloads" queue. Set the pilot state to "stakeout" and the stageout field to
0467     "log", and add the Job object to the "data_out" queue.
0468 
0469     :param queues: internal queues for job handling.
0470     :param traces: tuple containing internal pilot states.
0471     :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
0472     :return:
0473     """
0474 
0475     while not args.graceful_stop.is_set():
0476         time.sleep(0.5)
0477         # finished payloads
0478         try:
0479             job = queues.failed_payloads.get(block=True, timeout=1)
0480         except queue.Empty:
0481             time.sleep(0.1)
0482             continue
0483 
0484         logger.debug('adding log for log stageout')
0485 
0486         job.stageout = 'log'  # only stage-out log file
0487         #queues.data_out.put(job)
0488         set_pilot_state(job=job, state='stageout')
0489         put_in_queue(job, queues.data_out)
0490 
0491     # proceed to set the job_aborted flag?
0492     if threads_aborted():
0493         logger.debug('will proceed to set job_aborted')
0494         args.job_aborted.set()
0495     else:
0496         logger.debug('will not set job_aborted yet')
0497 
0498     logger.info('[payload] failed_post thread has finished')