Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:18

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
0009 # - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2019
0011 
0012 from __future__ import print_function  # Python 2, 2to3 complains about this
0013 
0014 import functools
0015 import signal
0016 import threading
0017 import traceback
0018 
0019 from time import time, sleep
0020 from sys import stderr
0021 from os import getpid
0022 from shutil import rmtree
0023 
0024 try:
0025     import Queue as queue  # noqa: N813
0026 except Exception:
0027     import queue  # Python 3
0028 
0029 from collections import namedtuple
0030 
0031 from pilot.common.exception import ExcThread
0032 from pilot.control import job, payload, data, monitor
0033 from pilot.util.constants import SUCCESS, PILOT_KILL_SIGNAL, MAX_KILL_WAIT_TIME
0034 from pilot.util.processes import kill_processes, threads_aborted
0035 from pilot.util.timing import add_to_pilot_timing
0036 
import logging
# module-level logger, named after this module via __name__; used by the functions below
logger = logging.getLogger(__name__)
0039 
0040 
def get_signal_name(signum):
    """
    Return the symbolic name (e.g. 'SIGTERM') of the given signal number.

    Only module attributes whose names start with 'SIG' but not 'SIG_' are considered. The signal module also
    exports other integer constants (SIG_BLOCK/SIG_UNBLOCK/SIG_SETMASK, ITIMER_*) whose values can collide with real
    signal numbers (on Linux SIG_SETMASK == ITIMER_PROF == SIGINT == 2), so a plain value lookup may pick the wrong
    name.

    :param signum: signal number.
    :return: signal name (string), or str(signum) if no matching signal name exists.
    """

    for name, value in list(signal.__dict__.items()):  # Python 2/3
        # check the name first so that only genuine signal constants are compared against signum
        if name.startswith('SIG') and not name.startswith('SIG_') and value == signum:
            return name
    return str(signum)


def interrupt(args, signum, frame):
    """
    Interrupt function on the receiving end of kill signals.

    This function is forwarded any incoming signals (SIGINT, SIGTERM, etc) and will set abort_job which instructs
    the threads to abort the job. It also sets graceful_stop and then blocks until args.job_aborted is set.
    If more than MAX_KILL_WAIT_TIME + 60 seconds have passed since the first kill signal, args.sourcedir is removed,
    logging is shut down and kill_processes() is called on the current process id.

    :param args: pilot arguments (updates signal_counter, kill_time and signal; uses sourcedir and the abort_job,
                 graceful_stop and job_aborted events).
    :param signum: signal number.
    :param frame: stack/execution frame pointing to the frame that was interrupted by the signal.
    :return:
    """

    sig = get_signal_name(signum)

    args.signal_counter += 1

    # keep track of when first kill signal arrived, any stuck loops should abort at a defined cut off time
    if args.kill_time == 0:
        args.kill_time = int(time())

    max_kill_wait_time = MAX_KILL_WAIT_TIME + 60  # add another minute of grace to let threads finish
    current_time = int(time())
    if args.kill_time and current_time - args.kill_time > max_kill_wait_time:
        logger.warning('passed maximum waiting time after first kill signal - will commit suicide - farewell')
        try:
            rmtree(args.sourcedir)
        except Exception as e:
            # best-effort cleanup: a failed removal must not prevent the kill below
            logger.warning(e)
        logging.shutdown()
        kill_processes(getpid())

    add_to_pilot_timing('0', PILOT_KILL_SIGNAL, time(), args)
    add_to_pilot_timing('1', PILOT_KILL_SIGNAL, time(), args)
    logger.warning('caught signal: %s in FRAME=\n%s', sig, '\n'.join(traceback.format_stack(frame)))

    args.signal = sig
    logger.warning('will instruct threads to abort and update the server')
    args.abort_job.set()
    logger.warning('setting graceful stop (in case it was not set already)')
    args.graceful_stop.set()
    logger.warning('waiting for threads to finish')
    # blocks inside the signal handler until the job threads report that the job has been aborted
    args.job_aborted.wait()
0087 
def register_signals(signals, args):
    """
    Install interrupt() as the handler for each of the given signals.

    Every signal gets its own functools.partial object with the pilot args bound as the first argument, so the
    handler is invoked as interrupt(args, signum, frame).

    :param signals: iterable of signal numbers.
    :param args: pilot args.
    :return:
    """

    for signum in signals:
        handler = functools.partial(interrupt, args)
        signal.signal(signum, handler)
0100 
def _create_queues():
    """
    Create the namespace of internal queues that handle the flow of jobs between the workflow threads.

    The namedtuple *class* itself is used as the namespace: each field name is set as a class attribute holding a
    fresh queue.Queue (no namedtuple instance is created).

    :return: queues namespace (namedtuple class with one queue.Queue per field).
    """

    queues = namedtuple('queues', ['jobs', 'payloads', 'data_in', 'data_out', 'current_data_in',
                                   'validated_jobs', 'validated_payloads', 'monitored_payloads',
                                   'finished_jobs', 'finished_payloads', 'finished_data_in', 'finished_data_out',
                                   'failed_jobs', 'failed_payloads', 'failed_data_in', 'failed_data_out',
                                   'completed_jobs', 'completed_jobids'])  # 'interceptor_messages' currently disabled
    for name in queues._fields:
        setattr(queues, name, queue.Queue())
    return queues


def run(args):
    """
    Main execution function for the generic workflow.

    The function registers the signal handlers and sets up the internal queues which handle the flow of jobs. It
    then runs the pilot user's sanity check (if it can be imported), starts the job, payload, data and monitor
    control threads, and waits for them to finish. Exceptions that threads put into their buckets are printed
    to stderr.

    :param args: pilot arguments.
    :returns: traces (traces.pilot['error_code'] holds the sanity check exit code if the sanity check failed).
    """

    logger.info('setting up signal handling')
    register_signals([signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGSEGV, signal.SIGXCPU, signal.SIGUSR1,
                      signal.SIGBUS], args)

    logger.info('setting up queues')
    queues = _create_queues()

    logger.info('setting up tracing')
    traces = namedtuple('traces', ['pilot'])
    traces.pilot = {'state': SUCCESS,
                    'nr_jobs': 0,
                    'error_code': 0,
                    'command': None}

    # initial sanity check defined by pilot user
    try:
        user = __import__('pilot.user.%s.common' % args.pilot_user.lower(), globals(), locals(),
                          [args.pilot_user.lower()], 0)  # Python 2/3
        exit_code = user.sanity_check()
    except Exception as e:
        logger.info('skipping sanity check since: %s' % e)
    else:
        if exit_code != 0:
            logger.info('aborting workflow since sanity check failed')
            traces.pilot['error_code'] = exit_code
            return traces
        logger.info('passed sanity check')

    # define the threads
    targets = {'job': job.control, 'payload': payload.control, 'data': data.control, 'monitor': monitor.control}
    threads = [ExcThread(bucket=queue.Queue(), target=target,
                         kwargs={'queues': queues, 'traces': traces, 'args': args},
                         name=name) for name, target in list(targets.items())]  # Python 2/3

    logger.info('starting threads')
    for thread in threads:
        thread.start()

    logger.info('waiting for interrupts')

    # threading.activeCount() is a deprecated alias; active_count() is available since Python 2.6
    thread_count = threading.active_count()
    while threading.active_count() > 1:
        for thread in threads:
            bucket = thread.get_bucket()
            try:
                exc = bucket.get(block=False)
            except queue.Empty:
                pass
            else:
                # bucket items are (exc_type, exc_obj, exc_trace) tuples; only the exception object is reported
                _, exc_obj, _ = exc
                print('received exception from bucket queue in generic workflow: %s' % exc_obj, file=stderr)

            thread.join(0.1)

        # when the number of active threads has changed, ask threads_aborted() whether the workflow threads are done
        # (presumably true once only abort_at threads remain - see pilot.util.processes)
        if thread_count != threading.active_count() and threads_aborted(abort_at=1):
            break

        sleep(1)

    logger.info('end of generic workflow (traces error code: %d)' % traces.pilot['error_code'])

    return traces