Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import multiprocessing
0002 import sys
0003 import threading
0004 import time
0005 
0006 
0007 # list with lock
class ListWithLock:
    """List wrapper that guards most operations with a ``threading.Lock``.

    The wrapper keeps a cursor (``dataIndex``) into the wrapped list.  The
    cursor is advanced by ``get()`` and by iteration, and is what ``stat()``
    and ``dump()`` report against.

    The wrapped list is used as-is (not copied), so changes made through this
    object are visible to whoever else holds ``dataList``.
    """

    def __init__(self, dataList):
        """Wrap ``dataList`` and start the cursor at position 0."""
        self.lock = threading.Lock()
        self.dataList = dataList
        self.dataIndex = 0

    def __iter__(self):
        """Return the object itself; iteration shares the ``dataIndex`` cursor."""
        return self

    def __contains__(self, item):
        """Return whether ``item`` is in the wrapped list (checked under the lock)."""
        with self.lock:
            return item in self.dataList

    def __next__(self):
        """Return the element at the cursor and advance it.

        When the cursor reaches the end, it is reset to 0 before
        ``StopIteration`` is raised, so a later loop starts from the beginning.

        NOTE: unlike the other methods, this one does not take the lock.
        """
        if self.dataIndex >= len(self.dataList):
            self.dataIndex = 0
            raise StopIteration
        val = self.dataList[self.dataIndex]
        self.dataIndex += 1
        return val

    def next(self):
        """Alias of ``__next__``."""
        return self.__next__()

    def append(self, item):
        """Append ``item`` unless it is already present.

        Returns:
            True if the item was appended, False if it was already in the list.
        """
        # ``with`` guarantees the lock is released even if the membership
        # test raises (e.g. from an element's ``__eq__``).
        with self.lock:
            if item in self.dataList:
                return False
            self.dataList.append(item)
            return True

    def get(self, num):
        """Return up to ``num`` elements starting at the cursor and advance it.

        The cursor moves by the number of elements actually returned, which
        may be fewer than ``num`` near the end of the list.
        """
        with self.lock:
            retList = self.dataList[self.dataIndex : self.dataIndex + num]
            self.dataIndex += len(retList)
            return retList

    def stat(self):
        """Return ``(total number of elements, current cursor position)``."""
        with self.lock:
            return len(self.dataList), self.dataIndex

    def __len__(self):
        """Return the length of the wrapped list."""
        with self.lock:
            return len(self.dataList)

    def dump(self):
        """Return the elements from the cursor onwards as a comma-joined string.

        Returns the literal string ``"None"`` when nothing is left past the
        cursor.
        """
        # str() of an element may raise; the context manager keeps the lock
        # from being leaked in that case.
        with self.lock:
            if len(self.dataList) > self.dataIndex:
                return ",".join(map(str, self.dataList[self.dataIndex :]))
            return "None"
0071 
0072 
0073 # map with lock
class MapWithLock:
    """Mapping wrapper paired with a lock that the caller drives explicitly.

    None of the accessors below take the lock themselves; code that needs
    mutual exclusion brackets its accesses with ``acquire()`` / ``release()``.
    """

    def __init__(self, dataMap=None):
        """Wrap ``dataMap`` (used as-is, not copied), or a fresh dict if None."""
        self.lock = threading.Lock()
        self.dataMap = {} if dataMap is None else dataMap

    def __getitem__(self, item):
        """Return the value stored under ``item``."""
        return self.dataMap[item]

    def __setitem__(self, item, value):
        """Store ``value`` under ``item``."""
        self.dataMap[item] = value

    def __contains__(self, item):
        """Return whether ``item`` is a key of the wrapped mapping."""
        return item in self.dataMap

    def acquire(self):
        """Acquire the lock associated with this map."""
        self.lock.acquire()

    def release(self):
        """Release the lock associated with this map."""
        self.lock.release()

    def add(self, item, value):
        """Add ``value`` to the entry for ``item``, starting from 0 if absent."""
        self.dataMap.setdefault(item, 0)
        self.dataMap[item] += value

    def get(self, item):
        """Return the value for ``item``, or 0 when the key is absent."""
        return self.dataMap.get(item, 0)

    def items(self):
        """Return the wrapped mapping's ``items()`` view."""
        return self.dataMap.items()

    def iteritems(self):
        """Alias of ``items()``."""
        return self.items()
0113 
0114 
0115 # thread pool
class ThreadPool:
    """Lock-protected registry of thread objects.

    The pool only tracks threads; it does not create or start them.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.list = []

    def _snapshot(self):
        """Return an immutable copy of the registered threads, taken under the lock."""
        with self.lock:
            return tuple(self.list)

    # add thread
    def add(self, obj):
        """Register ``obj`` in the pool."""
        with self.lock:
            self.list.append(obj)

    # remove thread
    def remove(self, obj):
        """Unregister ``obj``; removing an object that is not registered is a no-op."""
        # ``with`` releases the lock on every exit path, including
        # non-Exception interrupts raised while comparing elements.
        with self.lock:
            try:
                self.list.remove(obj)
            except ValueError:
                # list.remove() raises ValueError when obj is absent
                pass

    # join
    def join(self, timeOut=None):
        """Join the registered threads one after another.

        ``timeOut`` is passed to each ``join()`` call.  The loop stops at the
        first thread that is still alive after its join returned, i.e. whose
        join timed out.  Threads that cannot be joined are skipped.
        """
        for thr in self._snapshot():
            try:
                thr.join(timeOut)
            except RuntimeError:
                # Thread.join() raises RuntimeError for a thread that has not
                # been started or for the current thread; skip such entries.
                continue
            if thr.is_alive():
                break

    # remove inactive threads
    def clean(self):
        """Unregister every thread that is no longer alive."""
        for thr in self._snapshot():
            if not thr.is_alive():
                self.remove(thr)

    # dump contents
    def dump(self):
        """Return a short status string with the number of live threads."""
        nActv = sum(1 for thr in self._snapshot() if thr.is_alive())
        return f"nActive={nActv}"
0162 
0163 
0164 # thread class working with semaphore and thread pool
class WorkerThread(threading.Thread):
    """Thread that runs ``runImpl()`` while holding a semaphore slot.

    The thread registers itself with ``threadPool`` on construction and
    removes itself when ``run()`` finishes.  ``runImpl`` is not defined in
    this class; it is looked up on the instance at run time.
    """

    # constructor
    def __init__(self, workerSemaphore, threadPool, logger):
        """
        Args:
            workerSemaphore: object with ``acquire()``/``release()`` limiting
                concurrent workers, or None for no limit.
            threadPool: object with ``add()``/``remove()`` tracking this
                thread, or None for no tracking.
            logger: object whose ``error(msg)`` is called when ``runImpl``
                raises.
        """
        super().__init__()
        self.workerSemaphore = workerSemaphore
        self.threadPool = threadPool
        if self.threadPool is not None:
            self.threadPool.add(self)
        self.logger = logger

    # main loop
    def run(self):
        """Acquire a slot, run ``runImpl()``, then always give the slot back.

        Exceptions derived from ``Exception`` raised by ``runImpl`` are logged
        and swallowed.  Anything else (or a failure while logging) propagates,
        but only after the thread has been removed from the pool and the
        semaphore has been released.
        """
        # get slot
        if self.workerSemaphore is not None:
            self.workerSemaphore.acquire()
        try:
            # execute real work
            self.runImpl()
        except Exception as exc:
            self.logger.error(f"{self.__class__.__name__} crashed in WorkerThread.run() with {type(exc).__name__}:{exc}")
        finally:
            # Cleanup must run on every exit path, otherwise a failing worker
            # permanently consumes a semaphore slot and stays in the pool.
            try:
                # remove self from thread pool
                if self.threadPool is not None:
                    self.threadPool.remove(self)
            finally:
                # release slot
                if self.workerSemaphore is not None:
                    self.workerSemaphore.release()
0192 
0193 
0194 # thread class to clean up zombie processes
class ZombieCleaner(threading.Thread):
    """Background thread that periodically polls the child-process list.

    Each cycle calls ``multiprocessing.active_children()`` and discards the
    result.  Per the Python documentation that call has the side effect of
    joining child processes that have already finished, which is presumably
    the point of this class.
    """

    # constructor
    def __init__(self, interval=20):
        """Store ``interval``, the pause in seconds between two polls."""
        super().__init__()
        self.interval = interval

    # main loop
    def run(self):
        """Poll forever; the loop has no exit condition of its own."""
        while True:
            # the returned list is not needed, only the call itself
            multiprocessing.active_children()
            time.sleep(self.interval)