File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 from __future__ import print_function
0013
0014 import functools
0015 import signal
0016 import threading
0017 import traceback
0018
0019 from time import time, sleep
0020 from sys import stderr
0021 from os import getpid
0022 from shutil import rmtree
0023
0024 try:
0025 import Queue as queue
0026 except Exception:
0027 import queue
0028
0029 from collections import namedtuple
0030
0031 from pilot.common.exception import ExcThread
0032 from pilot.control import job, payload, data, monitor
0033 from pilot.util.constants import SUCCESS, PILOT_KILL_SIGNAL, MAX_KILL_WAIT_TIME
0034 from pilot.util.processes import kill_processes, threads_aborted
0035 from pilot.util.timing import add_to_pilot_timing
0036
0037 import logging
0038 logger = logging.getLogger(__name__)
0039
0040
def interrupt(args, signum, frame):
    """
    Interrupt function on the receiving end of kill signals.

    This function is forwarded any incoming signals (SIGINT, SIGTERM, etc) and will set abort_job which instructs
    the threads to abort the job. If the maximum waiting time after the first kill signal has passed, the pilot
    removes its source directory and kills its own process tree.

    :param args: pilot arguments.
    :param signum: signal number.
    :param frame: stack/execution frame pointing to the frame that was interrupted by the signal.
    :return:
    """

    # resolve the human readable signal name; signal.__dict__ maps names to numbers
    # (iteritems() only exists on Python 2, items() on Python 3)
    try:
        sig_items = signal.__dict__.iteritems()  # Python 2
    except AttributeError:
        sig_items = list(signal.__dict__.items())  # Python 3
    # fall back to the raw signal number rather than crashing with an IndexError
    # inside the signal handler if signum has no matching name
    sig = next((name for name, value in sig_items if value == signum), str(signum))

    args.signal_counter += 1

    # keep track of when the first kill signal arrived; any stuck loops should
    # abort at a defined cut-off time
    if args.kill_time == 0:
        args.kill_time = int(time())

    # add an extra minute of grace on top of the defined maximum waiting time
    max_kill_wait_time = MAX_KILL_WAIT_TIME + 60
    current_time = int(time())
    if args.kill_time and current_time - args.kill_time > max_kill_wait_time:
        logger.warning('passed maximum waiting time after first kill signal - will commit suicide - farewell')
        # best-effort clean-up of the pilot source directory before the hard kill
        try:
            rmtree(args.sourcedir)
        except Exception as e:
            logger.warning(e)
        logging.shutdown()
        kill_processes(getpid())

    # record the kill signal in the pilot timing (for both job '0' and '1' slots)
    add_to_pilot_timing('0', PILOT_KILL_SIGNAL, time(), args)
    add_to_pilot_timing('1', PILOT_KILL_SIGNAL, time(), args)
    logger.warning('caught signal: %s in FRAME=\n%s', sig, '\n'.join(traceback.format_stack(frame)))

    # instruct the threads to abort, then wait for them to confirm
    args.signal = sig
    logger.warning('will instruct threads to abort and update the server')
    args.abort_job.set()
    logger.warning('setting graceful stop (in case it was not set already)')
    args.graceful_stop.set()
    logger.warning('waiting for threads to finish')
    args.job_aborted.wait()
0086
0087
def register_signals(signals, args):
    """
    Register the interrupt function as the handler for the given kill signals.

    :param signals: list of signals.
    :param args: pilot args.
    :return:
    """

    # bind the pilot args to the interrupt function once, then install the
    # resulting handler for every requested signal
    handler = functools.partial(interrupt, args)
    for signum in signals:
        signal.signal(signum, handler)
0099
0100
def _create_queues():
    """
    Create the internal queues that connect the control threads.

    Note: the queues are set as attributes on the namedtuple *class* itself
    (not on an instance), matching how the rest of the pilot accesses them.

    :return: queues object with one queue.Queue per field.
    """

    queues = namedtuple('queues', ['jobs', 'payloads', 'data_in', 'data_out', 'current_data_in',
                                   'validated_jobs', 'validated_payloads', 'monitored_payloads',
                                   'finished_jobs', 'finished_payloads', 'finished_data_in', 'finished_data_out',
                                   'failed_jobs', 'failed_payloads', 'failed_data_in', 'failed_data_out',
                                   'completed_jobs', 'completed_jobids'])

    # give every field its own FIFO queue
    for field in queues._fields:
        setattr(queues, field, queue.Queue())

    return queues


def run(args):
    """
    Main execution function for the generic workflow.

    The function sets up the internal queues which handle the flow of jobs, performs an optional
    user-specific sanity check, starts the job/payload/data/monitor control threads and then waits
    for them to finish (or for an abort condition).

    :param args: pilot arguments.
    :returns: traces object (namedtuple whose 'pilot' dict carries state, nr_jobs, error_code, command).
    """

    logger.info('setting up signal handling')
    register_signals([signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGSEGV, signal.SIGXCPU, signal.SIGUSR1, signal.SIGBUS], args)

    logger.info('setting up queues')
    queues = _create_queues()

    logger.info('setting up tracing')
    traces = namedtuple('traces', ['pilot'])
    traces.pilot = {'state': SUCCESS,
                    'nr_jobs': 0,
                    'error_code': 0,
                    'command': None}

    # run the user-specific sanity check, if the user module provides one;
    # a missing module/function is not fatal, but a failed check aborts the workflow
    try:
        user = __import__('pilot.user.%s.common' % args.pilot_user.lower(), globals(), locals(),
                          [args.pilot_user.lower()], 0)
        exit_code = user.sanity_check()
    except Exception as e:
        logger.info('skipping sanity check since: %s', e)
    else:
        if exit_code != 0:
            logger.info('aborting workflow since sanity check failed')
            traces.pilot['error_code'] = exit_code
            return traces
        logger.info('passed sanity check')

    # define one control thread per subsystem; each gets a private bucket queue
    # through which it can propagate exceptions back to this thread
    targets = {'job': job.control, 'payload': payload.control, 'data': data.control, 'monitor': monitor.control}
    threads = [ExcThread(bucket=queue.Queue(), target=target,
                         kwargs={'queues': queues, 'traces': traces, 'args': args},
                         name=name) for name, target in list(targets.items())]

    logger.info('starting threads')
    for thread in threads:
        thread.start()

    logger.info('waiting for interrupts')

    # the initial thread count is taken once and later compared against the live
    # count to detect that threads have started to finish
    thread_count = threading.activeCount()
    while threading.activeCount() > 1:
        for thread in threads:
            bucket = thread.get_bucket()
            try:
                exc = bucket.get(block=False)
            except queue.Empty:
                pass
            else:
                # a control thread has propagated an exception; report it on stderr
                _, exc_obj, _ = exc
                print('received exception from bucket queue in generic workflow: %s' % exc_obj, file=stderr)

            thread.join(0.1)

        # if the thread count has changed, check whether it is time to abort the waiting loop
        if thread_count != threading.activeCount():
            if threads_aborted(abort_at=1):
                break

        sleep(1)

    logger.info('end of generic workflow (traces error code: %d)', traces.pilot['error_code'])

    return traces