Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2020 - 2023
0010 
0011 
0012 import logging
0013 import heapq
0014 import threading
0015 import traceback
0016 from concurrent import futures
0017 
0018 from .timertask import TimerTask
0019 
0020 
class IDDSThreadPoolExecutor(futures.ThreadPoolExecutor):
    """
    Thread pool executor that keeps track of the futures it has handed out.

    Every future returned by :meth:`submit` is remembered in ``self.futures``
    until it is finished; finished futures are dropped the next time
    :meth:`get_num_workers` (or one of the helpers built on it) is called.
    """

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """
        :param max_workers: forwarded to ``ThreadPoolExecutor``.
        :param thread_name_prefix: forwarded to ``ThreadPoolExecutor``.
        :param initializer: forwarded to ``ThreadPoolExecutor``.
        :param initargs: forwarded to ``ThreadPoolExecutor``.
        """
        # Tracking state is created before the base initializer runs so that
        # it exists as soon as the executor itself does.
        self.futures = []
        self._lock = threading.RLock()
        super(IDDSThreadPoolExecutor, self).__init__(max_workers=max_workers,
                                                     thread_name_prefix=thread_name_prefix,
                                                     initializer=initializer,
                                                     initargs=initargs)

    @staticmethod
    def _get_failure(future):
        """
        Return the exception a *finished* future ended with, or None.

        Nothing is raised here, so a task that ended with a ``BaseException``
        (for example ``SystemExit``) cannot blow up in the thread that is
        merely polling the pool.
        """
        if future.cancelled():
            return futures.CancelledError()
        return future.exception()

    def submit(self, fn, *args, **kwargs):
        """
        Submit ``fn(*args, **kwargs)`` to the pool and remember its future.

        :returns: the future created by the base class.
        """
        future = super(IDDSThreadPoolExecutor, self).submit(fn, *args, **kwargs)
        with self._lock:
            self.futures.append(future)
        return future

    def get_max_workers(self):
        """
        :returns: the pool size recorded by the base class.
        """
        return self._max_workers

    def get_num_workers(self):
        """
        Drop finished futures and count the remaining ones.

        Failures of finished futures are logged, never raised.

        :returns: number of tracked futures that are not done yet. This
                  includes futures that are still queued, so it can be larger
                  than the pool size.
        """
        with self._lock:
            unfinished = []
            for future in self.futures:
                if not future.done():
                    unfinished.append(future)
                    continue
                ex = self._get_failure(future)
                if ex is not None:
                    logging.info(f"Executor pool execution exception: {ex}")
            # Single-pass rebuild; slice assignment keeps the same list object
            # for anyone holding a reference to ``self.futures``.
            self.futures[:] = unfinished
            return len(self.futures)

    def has_free_workers(self):
        """
        :returns: True when fewer futures are pending than the pool size.
        """
        return self.get_num_workers() < self._max_workers

    def get_num_free_workers(self):
        """
        :returns: pool size minus pending futures, never less than zero.
        """
        # More futures can be queued than there are workers; report zero free
        # workers in that case rather than a negative count.
        return max(0, self._max_workers - self.get_num_workers())
0056 
0057 
class IDDSProcessPoolExecutor(futures.ProcessPoolExecutor):
    """
    Process pool executor that keeps track of the futures it has handed out.

    Every future returned by :meth:`submit` is remembered in ``self.futures``
    until it is finished; finished futures are dropped the next time
    :meth:`get_num_workers` (or one of the helpers built on it) is called.
    """

    def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
        """
        :param max_workers: forwarded to ``ProcessPoolExecutor``.
        :param thread_name_prefix: accepted so the signature matches the
                                   thread based executor; it is not forwarded
                                   to the base class.
        :param initializer: forwarded to ``ProcessPoolExecutor``.
        :param initargs: forwarded to ``ProcessPoolExecutor``.
        """
        self.futures = []
        # The bookkeeping list is only touched from threads of the submitting
        # process, so a threading lock is what protects it.
        self._lock = threading.RLock()
        super(IDDSProcessPoolExecutor, self).__init__(
            max_workers=max_workers,
            initializer=initializer,
            initargs=initargs
        )

    @staticmethod
    def _get_failure(future):
        """
        Return the exception a *finished* future ended with, or None.

        Nothing is raised here, so a task that ended with a ``BaseException``
        cannot blow up in the thread that is merely polling the pool.
        """
        if future.cancelled():
            return futures.CancelledError()
        return future.exception()

    def submit(self, fn, *args, **kwargs):
        """
        Submit ``fn(*args, **kwargs)`` to the pool and remember its future.

        :returns: the future created by the base class.
        """
        future = super(IDDSProcessPoolExecutor, self).submit(fn, *args, **kwargs)
        with self._lock:
            self.futures.append(future)
        return future

    def get_max_workers(self):
        """
        :returns: the pool size recorded by the base class.
        """
        return self._max_workers

    def get_num_workers(self):
        """
        Drop finished futures and count the remaining ones.

        Failures of finished futures are logged, never raised.

        :returns: number of tracked futures that are not done yet. This
                  includes futures that are still queued, so it can be larger
                  than the pool size.
        """
        with self._lock:
            unfinished = []
            for future in self.futures:
                if not future.done():
                    unfinished.append(future)
                    continue
                ex = self._get_failure(future)
                if ex is not None:
                    logging.info(f"Process pool execution exception: {ex}")
            # Single-pass rebuild; slice assignment keeps the same list object
            # for anyone holding a reference to ``self.futures``.
            self.futures[:] = unfinished
            return len(self.futures)

    def has_free_workers(self):
        """
        :returns: True when fewer futures are pending than the pool size.
        """
        return self.get_num_workers() < self._max_workers

    def get_num_free_workers(self):
        """
        :returns: pool size minus pending futures, never less than zero.
        """
        # More futures can be queued than there are workers; report zero free
        # workers in that case rather than a negative count.
        return max(0, self._max_workers - self.get_num_workers())
0094 
0095 
class TimerScheduler(threading.Thread):
    """
    The base class to schedule Task which will be executed after some time

    Tasks are kept in a heap (``self._task_queue``). A task at the head of the
    heap is popped once ``task.is_ready()`` is true, executed on the single
    threaded ``self.executors_timer`` pool, and pushed back onto the heap when
    its ``execute()`` returns. ``self.executors`` is a separate thread or
    process pool exposed through :meth:`submit`.
    """

    _thread_executor = None
    _process_executor = None
    _singleton_lock = threading.Lock()

    def __init__(self, num_threads, name=None, logger=None, use_process_pool=False):
        """
        :param num_threads: size of the worker pool; values below 1 become 1.
        :param name: thread name, also used as prefix for the pool threads.
        :param logger: logger to use; may be set later with ``set_logger``.
        :param use_process_pool: use a process pool instead of a thread pool
                                 for ``self.executors``.
        """
        super(TimerScheduler, self).__init__(name=name)
        self.num_threads = max(1, int(num_threads))
        self.graceful_stop = threading.Event()
        self.executor_name = name
        self.use_process_pool = use_process_pool

        # self.executors = self.get_executor_signleton()
        self.executors = self.get_executor()

        # ``name`` defaults to None, which cannot be concatenated with a
        # string; fall back to the name the Thread base class picked.
        timer_prefix = (self.name if name is None else name) + "_Timer"
        self.executors_timer = IDDSThreadPoolExecutor(max_workers=1,
                                                      thread_name_prefix=timer_prefix)
        self.timer_thread = None

        self._task_queue = []
        self._lock = threading.RLock()

        self.logger = logger

    def _get_logger(self):
        """
        Return the configured logger, or a module logger when none was set.
        """
        if self.logger is not None:
            return self.logger
        return logging.getLogger(__name__)

    def _new_executor(self, name, max_workers):
        """
        Build a fresh process or thread pool according to ``use_process_pool``.
        """
        pool_class = IDDSProcessPoolExecutor if self.use_process_pool else IDDSThreadPoolExecutor
        return pool_class(max_workers=max_workers, thread_name_prefix=name)

    def get_executor(self):
        """
        :returns: a new pool with ``self.num_threads`` workers.
        """
        return self._new_executor(self.executor_name, self.num_threads)

    def get_executor_signleton(self):
        """
        Return the pool shared by all schedulers of the same kind.

        The shared pool is created on first use, so the worker count and name
        of the first caller are the ones that apply.
        """
        with TimerScheduler._singleton_lock:
            if self.use_process_pool:
                if TimerScheduler._process_executor is None:
                    TimerScheduler._process_executor = IDDSProcessPoolExecutor(max_workers=self.num_threads,
                                                                               thread_name_prefix=self.executor_name)
                return TimerScheduler._process_executor
            if TimerScheduler._thread_executor is None:
                TimerScheduler._thread_executor = IDDSThreadPoolExecutor(max_workers=self.num_threads,
                                                                         thread_name_prefix=self.executor_name)
            return TimerScheduler._thread_executor

    def set_logger(self, logger):
        """
        Replace the logger used by this scheduler.
        """
        self.logger = logger

    def stop(self, signum=None, frame=None):
        """
        Ask the scheduling loop to finish.

        The unused ``signum`` and ``frame`` parameters match the arguments a
        signal handler is called with.
        """
        self.graceful_stop.set()

    def create_executors(self, name, max_workers=1):
        """
        :returns: a new pool named ``name`` with ``max_workers`` workers.
        """
        return self._new_executor(name, max_workers)

    def get_max_workers(self):
        """
        :returns: the size of ``self.executors``.
        """
        return self.executors.get_max_workers()

    def get_num_workers(self):
        """
        :returns: the number of unfinished futures in ``self.executors``.
        """
        return self.executors.get_num_workers()

    def get_num_free_workers(self):
        """
        :returns: the number of free workers reported by ``self.executors``.
        """
        return self.executors.get_num_free_workers()

    def create_task(self, task_func, task_output_queue=None, task_args=tuple(), task_kwargs=None, delay_time=10, priority=1):
        """
        Build a ``TimerTask``; the task is not queued, use :meth:`add_task`.

        :param task_kwargs: keyword arguments for ``task_func``; omitted or
                            None means a new empty dict for this task.
        :returns: the new ``TimerTask``.
        """
        # A fresh dict per call: a ``{}`` default would be one object shared
        # by every task created without explicit keyword arguments.
        if task_kwargs is None:
            task_kwargs = {}
        return TimerTask(task_func, task_output_queue, task_args, task_kwargs, delay_time, priority, self.logger)

    def add_task(self, task):
        """
        Push ``task`` onto the heap of scheduled tasks.
        """
        with self._lock:
            heapq.heappush(self._task_queue, task)

    def remove_task(self, task):
        """
        Remove ``task`` from the heap.

        :raises ValueError: if the task is not queued.
        """
        with self._lock:
            self._task_queue.remove(task)
            # Removing from the middle breaks the heap invariant.
            heapq.heapify(self._task_queue)

    def remove_all(self):
        """
        Forget every queued task.
        """
        with self._lock:
            self._task_queue = []

    def get_ready_task(self):
        """
        Pop the task at the head of the heap if it is ready.

        :returns: the popped task, or None when the heap is empty or its head
                  is not ready yet.
        """
        with self._lock:
            if self._task_queue and self._task_queue[0].is_ready():
                return heapq.heappop(self._task_queue)
        return None

    def submit(self, fn, *args, **kwargs):
        """
        Run ``fn(*args, **kwargs)`` on ``self.executors``.

        :returns: the future, or None when the submission failed (the failure
                  is logged, not raised).
        """
        log = self._get_logger()
        try:
            log.info(f"Executors submit: func: {fn}, args: {args}, kwargs: {kwargs}")
            return self.executors.submit(fn, *args, **kwargs)
        except Exception as ex:
            log.error(f"Executors submit failed: {ex}")
            return None

    def execute_task(self, task):
        """
        Run ``task`` and queue it again once ``execute()`` has returned.
        """
        # self.logger.info('execute task: %s' % task)
        task.execute()
        self.add_task(task)

    def _dispatch_ready_task(self):
        """
        Hand the next ready task, if any, to the timer pool.

        :returns: True when a task was dispatched, False otherwise.
        """
        task = self.get_ready_task()
        if task is None:
            return False
        # The timer pool records every future it creates; drop the finished
        # ones here, otherwise that list grows with every executed task.
        self.executors_timer.get_num_workers()
        self.executors_timer.submit(self.execute_task, task)
        return True

    def _log_failure(self, error):
        """
        Log an exception caught by one of the scheduling entry points.
        """
        self._get_logger().critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))

    def execute_local(self):
        """
        Dispatch ready tasks until :meth:`stop` is called.
        """
        while not self.graceful_stop.is_set():
            try:
                if not self._dispatch_ready_task():
                    # Nothing is ready: pause briefly instead of spinning.
                    self.graceful_stop.wait(0.0001)
            except Exception as error:
                self._log_failure(error)

    def execute(self):
        """
        Run the scheduling loop in the calling thread.
        """
        self.execute_local()

    def execute_once(self):
        """
        Dispatch at most one ready task; errors are logged, not raised.
        """
        try:
            self._dispatch_ready_task()
        except Exception as error:
            self._log_failure(error)

    def execute_timer_schedule(self):
        """
        Dispatch at most one ready task; errors are logged, not raised.
        """
        try:
            self._dispatch_ready_task()
        except Exception as error:
            self._log_failure(error)

    def execute_timer_schedule_thread(self):
        """
        Start the scheduling loop in a background thread, at most once.
        """
        if self.timer_thread is None:
            self.timer_thread = threading.Thread(target=self.execute_local)
            self.timer_thread.start()