File indexing completed on 2026-04-10 08:39:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 """
0012 Standalone implementation of time-out check on function call.
0013 Timer stops execution of wrapped function if it reaches the limit of provided time. Supports decorator feature.
0014
0015 :author: Alexey Anisenkov
0016 :contact: anisyonk@cern.ch
0017 :date: March 2018
0018 """
0019
0020 from __future__ import print_function
0021
0022 import os
0023 import signal
0024 import sys
0025
0026 import traceback
0027 import threading
0028 import multiprocessing
0029
0030 try:
0031 from queue import Empty
0032 except Exception:
0033 from Queue import Empty
0034 from functools import wraps
0035
0036
0037 class TimeoutException(Exception):
0038
0039 def __init__(self, message, timeout=None, *args):
0040 self.timeout = timeout
0041 self.message = message
0042 self._errorCode = 1334
0043 super(TimeoutException, self).__init__(*args)
0044
0045 def __str__(self):
0046 return "%s: %s, timeout=%s seconds%s" % (self.__class__.__name__, self.message, self.timeout, ' : %s' % repr(self.args) if self.args else '')
0047
0048
0049 class TimedThread(object):
0050 """
0051 Thread-based Timer implementation (`threading` module)
0052 (shared memory space, GIL limitations, no way to kill thread, Windows compatible)
0053 """
0054
0055 def __init__(self, timeout):
0056 """
0057 :param timeout: timeout value for operation in seconds.
0058 """
0059
0060 self.timeout = timeout
0061 self.is_timeout = False
0062
0063 def execute(self, func, args, kwargs):
0064
0065 try:
0066 ret = (True, func(*args, **kwargs))
0067 except Exception:
0068 ret = (False, sys.exc_info())
0069
0070 self.result = ret
0071
0072 return ret
0073
0074 def run(self, func, args, kwargs, timeout=None):
0075 """
0076 :raise: TimeoutException if timeout value is reached before function finished
0077 """
0078
0079 thread = threading.Thread(target=self.execute, args=(func, args, kwargs))
0080 thread.daemon = True
0081
0082 thread.start()
0083
0084 timeout = timeout if timeout is not None else self.timeout
0085
0086 thread.join(timeout)
0087
0088 if thread.is_alive():
0089 self.is_timeout = True
0090 raise TimeoutException("Timeout reached", timeout=timeout)
0091
0092 ret = self.result
0093
0094 if ret[0]:
0095 return ret[1]
0096 else:
0097 try:
0098 _r = ret[1][0](ret[1][1]).with_traceback(ret[1][2])
0099 except AttributeError:
0100 exec("raise ret[1][0], ret[1][1], ret[1][2]")
0101 raise _r
0102
0103
0104 class TimedProcess(object):
0105 """
0106 Process-based Timer implementation (`multiprocessing` module). Uses shared Queue to keep result.
0107 (completely isolated memory space)
0108 In default python implementation multiprocessing considers (c)pickle as serialization backend
0109 which is not able properly (requires a hack) to pickle local and decorated functions (affects Windows only)
0110 Traceback data is printed to stderr
0111 """
0112
0113 def __init__(self, timeout):
0114 """
0115 :param timeout: timeout value for operation in seconds.
0116 """
0117
0118 self.timeout = timeout
0119 self.is_timeout = False
0120
0121 def run(self, func, args, kwargs, timeout=None):
0122
0123 def _execute(func, args, kwargs, queue):
0124 try:
0125 ret = func(*args, **kwargs)
0126 queue.put((True, ret))
0127 except Exception as e:
0128 print('Exception occurred while executing %s' % func, file=sys.stderr)
0129 traceback.print_exc(file=sys.stderr)
0130 queue.put((False, e))
0131
0132 queue = multiprocessing.Queue(1)
0133 process = multiprocessing.Process(target=_execute, args=(func, args, kwargs, queue))
0134 process.daemon = True
0135 process.start()
0136
0137 timeout = timeout if timeout is not None else self.timeout
0138
0139 try:
0140 ret = queue.get(block=True, timeout=timeout)
0141 except Empty:
0142 self.is_timeout = True
0143 process.terminate()
0144 raise TimeoutException("Timeout reached", timeout=timeout)
0145 finally:
0146 while process.is_alive():
0147 process.join(1)
0148 if process.is_alive():
0149 process.terminate()
0150 process.join(1)
0151 if process.is_alive() and process.pid:
0152 os.kill(process.pid, signal.SIGKILL)
0153
0154 multiprocessing.active_children()
0155
0156 if ret[0]:
0157 return ret[1]
0158 else:
0159 raise ret[1]
0160
0161
0162 Timer = TimedProcess
0163
0164
0165 def timeout(seconds, timer=None):
0166 """
0167 Decorator for a function which causes it to timeout (stop execution) once passed given number of seconds.
0168 :param timer: timer class (by default is Timer)
0169 :raise: TimeoutException in case of timeout interrupt
0170 """
0171
0172 timer = timer or Timer
0173
0174 def decorate(function):
0175
0176 @wraps(function)
0177 def wrapper(*args, **kwargs):
0178 return timer(seconds).run(function, args, kwargs)
0179
0180 return wrapper
0181
0182 return decorate