Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2020-2021
0009 
0010 # Note: leave this module for now - the code might be useful for reuse
0011 
0012 import time
0013 
0014 try:
0015     import Queue as queue  # noqa: N813
0016 except Exception:
0017     import queue  # Python 3
0018 
0019 from pilot.common.exception import ExcThread
0020 from pilot.util.processes import threads_aborted
0021 
0022 import logging
0023 logger = logging.getLogger(__name__)
0024 
0025 
0026 def run(args):
0027     """
0028     Main execution function for the interceptor communication layer.
0029 
0030     :param args: pilot arguments.
0031     :returns:
0032     """
0033 
0034     targets = {'receive': receive, 'send': send}
0035     threads = [ExcThread(bucket=queue.Queue(), target=target, kwargs={'args': args},
0036                          name=name) for name, target in list(targets.items())]  # Python 2/3
0037 
0038     [thread.start() for thread in threads]
0039 
0040     # if an exception is thrown, the graceful_stop will be set by the ExcThread class run() function
0041     while not args.graceful_stop.is_set():
0042         for thread in threads:
0043             bucket = thread.get_bucket()
0044             try:
0045                 exc = bucket.get(block=False)
0046             except queue.Empty:
0047                 pass
0048             else:
0049                 exc_type, exc_obj, exc_trace = exc
0050                 logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)
0051 
0052                 # deal with the exception
0053                 # ..
0054 
0055             thread.join(0.1)
0056             time.sleep(0.1)
0057 
0058         time.sleep(0.5)
0059 
0060     # proceed to set the job_aborted flag?
0061     if threads_aborted():
0062         logger.debug('will proceed to set job_aborted')
0063         args.job_aborted.set()
0064     else:
0065         logger.debug('will not set job_aborted yet')
0066 
0067     logger.debug('[interceptor] run thread has finished')
0068 
0069 
0070 def receive(args):
0071     """
0072     Look for interceptor messages.
0073 
0074     :param args: Pilot args object.
0075     :return:
0076     """
0077 
0078     while not args.graceful_stop.is_set():
0079         time.sleep(0.5)
0080 
0081     # proceed to set the job_aborted flag?
0082     if threads_aborted():
0083         logger.debug('will proceed to set job_aborted')
0084         args.job_aborted.set()
0085     else:
0086         logger.debug('will not set job_aborted yet')
0087 
0088     logger.debug('[interceptor] receive thread has finished')
0089 
0090 
0091 def send(args):
0092     """
0093     Send message to interceptor.
0094 
0095     :param args: Pilot args object.
0096     :return:
0097     """
0098 
0099     while not args.graceful_stop.is_set():
0100         time.sleep(0.5)
0101 
0102     # proceed to set the job_aborted flag?
0103     if threads_aborted():
0104         logger.debug('will proceed to set job_aborted')
0105         args.job_aborted.set()
0106     else:
0107         logger.debug('will not set job_aborted yet')
0108 
0109     logger.debug('[interceptor] receive send has finished')