File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import time
0013
0014 try:
0015 import Queue as queue
0016 except Exception:
0017 import queue
0018
0019 from pilot.common.exception import ExcThread
0020 from pilot.util.processes import threads_aborted
0021
0022 import logging
0023 logger = logging.getLogger(__name__)
0024
0025
0026 def run(args):
0027 """
0028 Main execution function for the interceptor communication layer.
0029
0030 :param args: pilot arguments.
0031 :returns:
0032 """
0033
0034 targets = {'receive': receive, 'send': send}
0035 threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'args': args},
0036 name=name) for name, target in list(targets.items())]
0037
0038 [thread.start() for thread in threads]
0039
0040
0041 while not args.graceful_stop.is_set():
0042 for thread in threads:
0043 bucket = thread.get_bucket()
0044 try:
0045 exc = bucket.get(block=False)
0046 except queue.Empty:
0047 pass
0048 else:
0049 exc_type, exc_obj, exc_trace = exc
0050 logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)
0051
0052
0053
0054
0055 thread.join(0.1)
0056 time.sleep(0.1)
0057
0058 time.sleep(0.5)
0059
0060
0061 if threads_aborted():
0062 logger.debug('will proceed to set job_aborted')
0063 args.job_aborted.set()
0064 else:
0065 logger.debug('will not set job_aborted yet')
0066
0067 logger.debug('[interceptor] run thread has finished')
0068
0069
0070 def receive(args):
0071 """
0072 Look for interceptor messages.
0073
0074 :param args: Pilot args object.
0075 :return:
0076 """
0077
0078 while not args.graceful_stop.is_set():
0079 time.sleep(0.5)
0080
0081
0082 if threads_aborted():
0083 logger.debug('will proceed to set job_aborted')
0084 args.job_aborted.set()
0085 else:
0086 logger.debug('will not set job_aborted yet')
0087
0088 logger.debug('[interceptor] receive thread has finished')
0089
0090
0091 def send(args):
0092 """
0093 Send message to interceptor.
0094
0095 :param args: Pilot args object.
0096 :return:
0097 """
0098
0099 while not args.graceful_stop.is_set():
0100 time.sleep(0.5)
0101
0102
0103 if threads_aborted():
0104 logger.debug('will proceed to set job_aborted')
0105 args.job_aborted.set()
0106 else:
0107 logger.debug('will not set job_aborted yet')
0108
0109 logger.debug('[interceptor] receive send has finished')