File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import logging
0013 import heapq
0014 import threading
0015 import traceback
0016 from concurrent import futures
0017
0018 from .timertask import TimerTask
0019
0020
class IDDSThreadPoolExecutor(futures.ThreadPoolExecutor):
    """Thread pool that keeps track of the futures it has handed out.

    Every future returned by :meth:`submit` is stored in ``self.futures``
    until :meth:`get_num_workers` observes that it has finished.  The
    "worker" counts reported by this class are therefore counts of
    *unfinished submitted calls* (queued or running), not of OS threads.
    Because :meth:`submit` never rejects work, that count may exceed
    ``max_workers`` and :meth:`get_num_free_workers` may be negative.
    """

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Create the pool.

        All arguments are forwarded unchanged to
        ``concurrent.futures.ThreadPoolExecutor``.
        """
        # Tracking state is created before the base initializer runs so it
        # exists as soon as the executor does.
        self.futures = []
        self._lock = threading.RLock()
        super().__init__(max_workers=max_workers,
                         thread_name_prefix=thread_name_prefix,
                         initializer=initializer,
                         initargs=initargs)

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and remember the resulting future.

        Returns:
            The ``concurrent.futures.Future`` for the scheduled call.
        """
        future = super().submit(fn, *args, **kwargs)
        with self._lock:
            self.futures.append(future)
        return future

    def get_max_workers(self):
        """Return the pool size configured on the underlying executor."""
        return self._max_workers

    def get_num_workers(self):
        """Prune finished futures and return how many are still unfinished.

        Exceptions raised by finished calls are logged (INFO, root logger)
        and otherwise discarded.

        Returns:
            int: number of tracked futures that are not done yet.
        """
        with self._lock:
            unfinished = []
            for future in self.futures:
                if not future.done():
                    unfinished.append(future)
                    continue
                try:
                    # Retrieve the outcome only to surface task exceptions.
                    future.result()
                except Exception as ex:
                    logging.info("Executor pool execution exception: %s", ex)
            # One in-place rebuild instead of list.remove() per finished
            # future (which was quadratic); keeps the list object identity.
            self.futures[:] = unfinished
            return len(self.futures)

    def has_free_workers(self):
        """Return True when fewer calls are unfinished than ``max_workers``."""
        return self.get_num_workers() < self._max_workers

    def get_num_free_workers(self):
        """Return ``max_workers`` minus the number of unfinished calls."""
        return self._max_workers - self.get_num_workers()
0056
0057
class IDDSProcessPoolExecutor(futures.ProcessPoolExecutor):
    """Process pool that keeps track of the futures it has handed out.

    Every future returned by :meth:`submit` is stored in ``self.futures``
    until :meth:`get_num_workers` observes that it has finished.  The
    "worker" counts reported by this class are counts of *unfinished
    submitted calls* (queued or running), not of OS processes, so they may
    exceed ``max_workers``.
    """

    def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
        """Create the pool.

        ``thread_name_prefix`` is accepted so the signature matches the
        thread-pool variant, but it is not forwarded to the base class.
        The remaining arguments go to
        ``concurrent.futures.ProcessPoolExecutor`` unchanged.
        """
        self.futures = []
        self._lock = threading.RLock()
        super().__init__(
            max_workers=max_workers,
            initializer=initializer,
            initargs=initargs,
        )

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and remember the resulting future.

        Returns:
            The ``concurrent.futures.Future`` for the scheduled call.
        """
        future = super().submit(fn, *args, **kwargs)
        with self._lock:
            self.futures.append(future)
        return future

    def get_max_workers(self):
        """Return the pool size configured on the underlying executor."""
        return self._max_workers

    def get_num_workers(self):
        """Prune finished futures and return how many are still unfinished.

        Exceptions raised by finished calls are logged (INFO, root logger)
        and otherwise discarded.

        Returns:
            int: number of tracked futures that are not done yet.
        """
        with self._lock:
            unfinished = []
            for future in self.futures:
                if not future.done():
                    unfinished.append(future)
                    continue
                try:
                    # Retrieve the outcome only to surface task exceptions.
                    future.result()
                except Exception as ex:
                    logging.info("Process pool execution exception: %s", ex)
            # One in-place rebuild instead of list.remove() per finished
            # future (which was quadratic); keeps the list object identity.
            self.futures[:] = unfinished
            return len(self.futures)

    def has_free_workers(self):
        """Return True when fewer calls are unfinished than ``max_workers``."""
        return self.get_num_workers() < self._max_workers

    def get_num_free_workers(self):
        """Return ``max_workers`` minus the number of unfinished calls."""
        return self._max_workers - self.get_num_workers()
0094
0095
class TimerScheduler(threading.Thread):
    """
    The base class to schedule Task which will be executed after some time

    Tasks are kept in a heap (``self._task_queue``).  A task is dispatched
    once it sits at the top of the heap and its ``is_ready()`` returns true;
    after ``task.execute()`` returns, the task is pushed back onto the heap so
    it can run again.  Dispatching happens on ``self.executors_timer`` (a
    single-thread pool); ``self.executors`` is a separate pool exposed through
    :meth:`submit` for general work.
    """

    _thread_executor = None
    _process_executor = None
    _singleton_lock = threading.Lock()

    # Sleep between polls of an idle/not-yet-ready queue in execute_local().
    _IDLE_WAIT_SECONDS = 0.0001

    def __init__(self, num_threads, name=None, logger=None, use_process_pool=False):
        """Set up the executors and the (empty) task queue.

        Args:
            num_threads: size of the main executor pool; values below 1 are
                raised to 1.
            name: thread name, also used as the executor name prefix.
            logger: logger used for diagnostics; may be None.
            use_process_pool: use a process pool instead of a thread pool for
                the main executor.
        """
        super().__init__(name=name)
        self.num_threads = max(int(num_threads), 1)
        self.graceful_stop = threading.Event()
        self.executor_name = name
        self.use_process_pool = use_process_pool

        self.executors = self.get_executor()

        # `name` defaults to None, and `None + "_Timer"` raises TypeError;
        # fall back to the name threading.Thread assigned to this object.
        timer_base = name if name is not None else self.name
        self.executors_timer = IDDSThreadPoolExecutor(max_workers=1,
                                                      thread_name_prefix=f"{timer_base}_Timer")
        self.timer_thread = None

        self._task_queue = []
        self._lock = threading.RLock()

        self.logger = logger

    def _active_logger(self):
        """Return ``self.logger``, or this module's logger when it is None."""
        if self.logger is not None:
            return self.logger
        return logging.getLogger(__name__)

    def get_executor(self):
        """Create a new main executor sized by ``self.num_threads``."""
        if self.use_process_pool:
            return IDDSProcessPoolExecutor(max_workers=self.num_threads, thread_name_prefix=self.executor_name)
        return IDDSThreadPoolExecutor(max_workers=self.num_threads, thread_name_prefix=self.executor_name)

    def get_executor_signleton(self):
        """Return the executor shared by all TimerScheduler instances.

        The shared executor is created on first use, so its size and name
        prefix come from whichever instance asks first.  (The method name is
        misspelled but kept for existing callers; see
        :meth:`get_executor_singleton`.)
        """
        with TimerScheduler._singleton_lock:
            if self.use_process_pool:
                if TimerScheduler._process_executor is None:
                    TimerScheduler._process_executor = IDDSProcessPoolExecutor(
                        max_workers=self.num_threads,
                        thread_name_prefix=self.executor_name)
                return TimerScheduler._process_executor
            if TimerScheduler._thread_executor is None:
                TimerScheduler._thread_executor = IDDSThreadPoolExecutor(
                    max_workers=self.num_threads,
                    thread_name_prefix=self.executor_name)
            return TimerScheduler._thread_executor

    def get_executor_singleton(self):
        """Correctly spelled alias of :meth:`get_executor_signleton`."""
        return self.get_executor_signleton()

    def set_logger(self, logger):
        """Replace the logger used by this scheduler."""
        self.logger = logger

    def stop(self, signum=None, frame=None):
        """Ask the polling loop to exit.

        The ``signum``/``frame`` parameters are unused; they let this method
        be registered as a signal handler.
        """
        self.graceful_stop.set()

    def create_executors(self, name, max_workers=1):
        """Create a new executor pool of the configured kind.

        Args:
            name: name prefix passed to the executor.
            max_workers: pool size.
        """
        if self.use_process_pool:
            return IDDSProcessPoolExecutor(max_workers=max_workers, thread_name_prefix=name)
        return IDDSThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=name)

    def get_max_workers(self):
        """Return the size of the main executor pool."""
        return self.executors.get_max_workers()

    def get_num_workers(self):
        """Return the number of unfinished calls on the main executor."""
        return self.executors.get_num_workers()

    def get_num_free_workers(self):
        """Return pool size minus unfinished calls on the main executor."""
        return self.executors.get_num_free_workers()

    def create_task(self, task_func, task_output_queue=None, task_args=tuple(), task_kwargs=None, delay_time=10, priority=1):
        """Build a TimerTask bound to this scheduler's logger.

        ``task_kwargs`` defaults to a fresh empty dict per call (a literal
        ``{}`` default would be shared by every task created without kwargs).
        The task is only created here; use :meth:`add_task` to schedule it.
        """
        if task_kwargs is None:
            task_kwargs = {}
        return TimerTask(task_func, task_output_queue, task_args, task_kwargs, delay_time, priority, self.logger)

    def add_task(self, task):
        """Push ``task`` onto the heap of scheduled tasks."""
        with self._lock:
            heapq.heappush(self._task_queue, task)

    def remove_task(self, task):
        """Remove ``task`` from the queue and restore the heap invariant.

        Raises:
            ValueError: if ``task`` is not queued.
        """
        with self._lock:
            self._task_queue.remove(task)
            heapq.heapify(self._task_queue)

    def remove_all(self):
        """Drop every scheduled task."""
        with self._lock:
            self._task_queue = []

    def get_ready_task(self):
        """Pop and return the top task if it is ready, else return None.

        Only the task at the top of the heap is examined.
        """
        with self._lock:
            if not self._task_queue:
                return None
            if self._task_queue[0].is_ready():
                return heapq.heappop(self._task_queue)
            return None

    def submit(self, fn, *args, **kwargs):
        """Submit ``fn(*args, **kwargs)`` to the main executor.

        Returns:
            The future for the call, or None if submission failed (the
            failure is logged, not raised).
        """
        log = self._active_logger()
        try:
            log.info("Executors submit: func: %s, args: %s, kwargs: %s", fn, args, kwargs)
            return self.executors.submit(fn, *args, **kwargs)
        except Exception as ex:
            log.error("Executors submit failed: %s", ex)
            return None

    def execute_task(self, task):
        """Run ``task`` and put it back on the queue for its next run.

        If ``task.execute()`` raises, the task is not re-queued.
        """
        task.execute()
        self.add_task(task)

    def _dispatch_ready_task(self):
        """Hand the next ready task, if any, to the timer executor.

        Returns:
            bool: True when a task was dispatched, False when none was ready.
        """
        task = self.get_ready_task()
        if not task:
            return False
        self.executors_timer.submit(self.execute_task, task)
        # The timer executor records every future it returns and only drops
        # them when polled; poll it here so that list cannot grow forever.
        self.executors_timer.get_num_workers()
        return True

    def _dispatch_once_logged(self):
        """Dispatch at most one ready task, logging instead of raising."""
        try:
            self._dispatch_ready_task()
        except Exception as error:
            self._active_logger().critical("Caught an exception: %s\n%s", error, traceback.format_exc())

    def execute_local(self):
        """Poll the queue and dispatch ready tasks until :meth:`stop` is called."""
        while not self.graceful_stop.is_set():
            try:
                if not self._dispatch_ready_task():
                    # Nothing ready: back off briefly (also wakes on stop()).
                    self.graceful_stop.wait(self._IDLE_WAIT_SECONDS)
            except Exception as error:
                self._active_logger().critical("Caught an exception: %s\n%s", error, traceback.format_exc())

    def execute(self):
        """Run the polling loop in the calling thread."""
        self.execute_local()

    def execute_once(self):
        """Dispatch at most one ready task; errors are logged, not raised."""
        self._dispatch_once_logged()

    def execute_timer_schedule(self):
        """Dispatch at most one ready task; errors are logged, not raised."""
        self._dispatch_once_logged()

    def execute_timer_schedule_thread(self):
        """Start the polling loop in a background thread (at most once)."""
        # Guard the check-and-start so concurrent callers cannot each start
        # their own polling thread.
        with self._lock:
            if self.timer_thread is None:
                self.timer_thread = threading.Thread(target=self.execute_local)
                self.timer_thread.start()
0239 self.timer_thread.start()