File indexing completed on 2026-04-10 08:38:58
0001 import multiprocessing
0002 import sys
0003 import threading
0004 import time
0005
0006
0007
class ListWithLock:
    """Wrap a list with a mutex and a read cursor.

    ``dataList`` is the wrapped list (stored by reference, not copied) and
    ``dataIndex`` is a cursor into it that is advanced by ``get()`` and by
    iteration. All methods except the iterator protocol hold ``lock`` while
    they touch the list; the lock is taken with ``with`` so that it is
    released even when the guarded operation raises.
    """

    def __init__(self, dataList):
        """Wrap *dataList* and start the cursor at index 0."""
        self.lock = threading.Lock()
        self.dataList = dataList
        self.dataIndex = 0

    def __iter__(self):
        """Return the object itself; iteration shares the ``dataIndex`` cursor."""
        return self

    def __contains__(self, item):
        """Return whether *item* is in the wrapped list."""
        with self.lock:
            return item in self.dataList

    def __next__(self):
        """Return the element under the cursor and advance the cursor.

        When the cursor has reached the end of the list it is reset to 0
        before ``StopIteration`` is raised.

        NOTE: this method does not take the lock.
        """
        if self.dataIndex >= len(self.dataList):
            self.dataIndex = 0
            raise StopIteration
        val = self.dataList[self.dataIndex]
        self.dataIndex += 1
        return val

    def next(self):
        """Alias for ``__next__()``."""
        return self.__next__()

    def append(self, item):
        """Append *item* unless an equal element is already present.

        Returns:
            True if the item was appended, False if it was already present.
        """
        with self.lock:
            if item in self.dataList:
                return False
            self.dataList.append(item)
            return True

    def get(self, num):
        """Return up to *num* elements starting at the cursor.

        The cursor is advanced by the number of elements actually returned,
        which is smaller than *num* near the end of the list.
        """
        with self.lock:
            retList = self.dataList[self.dataIndex : self.dataIndex + num]
            self.dataIndex += len(retList)
            return retList

    def stat(self):
        """Return ``(length of the list, current cursor position)``."""
        with self.lock:
            return len(self.dataList), self.dataIndex

    def __len__(self):
        """Return the length of the wrapped list."""
        with self.lock:
            return len(self.dataList)

    def dump(self):
        """Return the elements from the cursor onwards as a comma-joined string.

        Returns the string ``"None"`` when there is nothing left after the
        cursor.
        """
        with self.lock:
            if len(self.dataList) > self.dataIndex:
                return ",".join(map(str, self.dataList[self.dataIndex :]))
            return "None"
0071
0072
0073
class MapWithLock:
    """Pair a mapping with a lock that the caller operates explicitly.

    None of the accessors below take the lock on their own; code that needs
    mutual exclusion brackets its accesses with ``acquire()`` and
    ``release()``.
    """

    def __init__(self, dataMap=None):
        """Wrap *dataMap* (stored by reference), or a new empty dict if None."""
        self.lock = threading.Lock()
        self.dataMap = {} if dataMap is None else dataMap

    def __getitem__(self, item):
        """Return the value stored under *item*."""
        return self.dataMap[item]

    def __setitem__(self, item, value):
        """Store *value* under *item*."""
        self.dataMap[item] = value

    def __contains__(self, item):
        """Return whether *item* is a key of the wrapped mapping."""
        return item in self.dataMap

    def acquire(self):
        """Acquire the lock."""
        self.lock.acquire()

    def release(self):
        """Release the lock."""
        self.lock.release()

    def add(self, item, value):
        """Add *value* to the entry for *item*, starting from 0 if absent."""
        self.dataMap.setdefault(item, 0)
        self.dataMap[item] += value

    def get(self, item):
        """Return the value for *item*, or 0 when the key is absent."""
        if item in self.dataMap:
            return self.dataMap[item]
        return 0

    def items(self):
        """Return the ``items()`` view of the wrapped mapping."""
        return self.dataMap.items()

    def iteritems(self):
        """Alias for ``items()``."""
        return self.items()
0113
0114
0115
class ThreadPool:
    """Registry of thread objects guarded by a lock.

    ``list`` holds the registered objects. Every access to it goes through
    ``lock``; iteration is done over a snapshot so the lock is never held
    while a thread is being joined or removed.
    """

    def __init__(self):
        """Create an empty pool."""
        self.lock = threading.Lock()
        self.list = []

    def add(self, obj):
        """Register *obj* in the pool."""
        with self.lock:
            self.list.append(obj)

    def remove(self, obj):
        """Unregister *obj*; do nothing if it is not registered."""
        with self.lock:
            try:
                self.list.remove(obj)
            except ValueError:
                # list.remove raises ValueError for a missing element;
                # removing something that is already gone is not an error.
                pass

    def _snapshot(self):
        """Return the registered objects as a tuple, copied under the lock."""
        with self.lock:
            return tuple(self.list)

    def join(self, timeOut=None):
        """Join the registered threads one after another.

        *timeOut* is passed to each ``join()`` call individually. The loop
        stops at the first thread that is still alive after its join
        returned, i.e. as soon as one wait timed out.
        """
        for thr in self._snapshot():
            try:
                thr.join(timeOut)
            except RuntimeError:
                # Thread.join raises RuntimeError for a thread that has not
                # been started yet (or for the calling thread itself); such
                # entries are skipped.
                continue
            if thr.is_alive():
                break

    def clean(self):
        """Unregister every thread that is not alive.

        A thread that was registered but not started yet also reports
        ``is_alive() == False`` and is therefore dropped as well.
        """
        for thr in self._snapshot():
            if not thr.is_alive():
                self.remove(thr)

    def dump(self):
        """Return ``"nActive=<count>"`` with the number of live threads."""
        nActv = sum(1 for thr in self._snapshot() if thr.is_alive())
        return f"nActive={nActv}"
0162
0163
0164
class WorkerThread(threading.Thread):
    """Thread base class that runs ``runImpl()`` under an optional semaphore.

    The thread registers itself with *threadPool* on construction and
    unregisters when ``run()`` finishes. ``runImpl()`` is not defined here;
    it is expected to be supplied by a subclass.
    """

    def __init__(self, workerSemaphore, threadPool, logger):
        """Store the collaborators and register with the pool.

        Args:
            workerSemaphore: object with ``acquire()`` / ``release()``, held
                for the duration of ``run()``; may be None.
            threadPool: object with ``add()`` / ``remove()``; may be None.
            logger: object with an ``error(msg)`` method, used to report an
                exception escaping ``runImpl()``.
        """
        super().__init__()
        self.workerSemaphore = workerSemaphore
        self.threadPool = threadPool
        if self.threadPool is not None:
            self.threadPool.add(self)
        self.logger = logger

    def run(self):
        """Run ``runImpl()`` and always give back the pool slot and semaphore.

        Exceptions derived from ``Exception`` are logged and swallowed.
        Anything else (e.g. ``SystemExit``) propagates, but only after the
        thread has been removed from the pool and the semaphore released.
        """
        if self.workerSemaphore is not None:
            self.workerSemaphore.acquire()

        try:
            self.runImpl()
        except Exception as exc:
            self.logger.error(
                f"{self.__class__.__name__} crashed in WorkerThread.run() with {type(exc).__name__}:{exc}"
            )
        finally:
            # Cleanup must also happen when runImpl raises a BaseException or
            # when the logger call above fails; otherwise the semaphore slot
            # would be lost for good.
            try:
                if self.threadPool is not None:
                    self.threadPool.remove(self)
            finally:
                if self.workerSemaphore is not None:
                    self.workerSemaphore.release()
0192
0193
0194
class ZombieCleaner(threading.Thread):
    """Thread that polls ``multiprocessing.active_children()`` forever.

    The return value of the call is discarded; per the multiprocessing
    documentation the call itself joins child processes that have already
    finished.
    """

    def __init__(self, interval=20):
        """Remember *interval*, the number of seconds to sleep between polls."""
        super().__init__()
        self.interval = interval

    def run(self):
        """Poll, sleep ``self.interval`` seconds, repeat; never returns."""
        while True:
            multiprocessing.active_children()
            time.sleep(self.interval)
0205 time.sleep(self.interval)