File indexing completed on 2026-04-10 08:39:07
0001 import multiprocessing
0002 import os
0003 import pickle
0004 import sys
0005 import threading
0006 import time
0007 from concurrent.futures import ThreadPoolExecutor
0008
0009 from pandacommon.pandalogger.LogWrapper import LogWrapper
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011 from pandaserver.taskbuffer import FileSpec, JobSpec
0012
0013 JobSpec.reserveChangedState = True
0014 FileSpec.reserveChangedState = True
0015
0016
0017 _logger = PandaLogger().getLogger("TaskBufferInterface")
0018
0019
0020
0021 class TaskBufferMethod:
0022 def __init__(self, methodName, commDict, childlock, comLock, resLock):
0023 self.methodName = methodName
0024 self.childlock = childlock
0025 self.commDict = commDict
0026 self.comLock = comLock
0027 self.resLock = resLock
0028
0029 def __call__(self, *args, **kwargs):
0030 log = LogWrapper(
0031 _logger,
0032 f"pid={os.getpid()} thr={threading.current_thread().ident} {self.methodName}",
0033 )
0034 log.debug("start")
0035
0036 i = self.childlock.get()
0037
0038 self.commDict[i].update(
0039 {
0040 "methodName": self.methodName,
0041 "args": pickle.dumps(args),
0042 "kwargs": pickle.dumps(kwargs),
0043 }
0044 )
0045
0046 self.comLock[i].release()
0047
0048 self.resLock[i].acquire()
0049 res = pickle.loads(self.commDict[i]["res"])
0050 statusCode = self.commDict[i]["stat"]
0051
0052 self.childlock.put(i)
0053 log.debug("end")
0054
0055 if statusCode == 0:
0056 return res
0057 else:
0058 errtype, errvalue = res
0059 raise RuntimeError(f"{self.methodName}: {errtype.__name__} {errvalue}")
0060
0061
0062
0063 class TaskBufferInterfaceChild:
0064
0065 def __init__(self, commDict, childlock, comLock, resLock):
0066 self.childlock = childlock
0067 self.commDict = commDict
0068 self.comLock = comLock
0069 self.resLock = resLock
0070
0071
0072 def __getattr__(self, attrName):
0073 return TaskBufferMethod(attrName, self.commDict, self.childlock, self.comLock, self.resLock)
0074
0075
0076
0077 class TaskBufferInterface:
0078
0079 def __init__(self):
0080
0081 self.manager = multiprocessing.Manager()
0082 self.taskBuffer = None
0083
0084
0085 def run(self, taskBuffer, commDict, comLock, resLock, to_stop):
0086 with ThreadPoolExecutor(max_workers=taskBuffer.get_num_connections()) as pool:
0087 [
0088 pool.submit(
0089 self.thread_run,
0090 taskBuffer,
0091 commDict[i],
0092 comLock[i],
0093 resLock[i],
0094 to_stop,
0095 )
0096 for i in commDict.keys()
0097 ]
0098
0099
0100 def thread_run(self, taskBuffer, commDict, comLock, resLock, to_stop):
0101
0102 while True:
0103
0104 if to_stop.value:
0105 break
0106
0107 if not comLock.acquire(timeout=0.25):
0108 continue
0109 try:
0110
0111 methodName = commDict["methodName"]
0112 args = pickle.loads(commDict["args"])
0113 kwargs = pickle.loads(commDict["kwargs"])
0114
0115 method = getattr(taskBuffer, methodName)
0116 res = method(*args, **kwargs)
0117 commDict["stat"] = 0
0118
0119 commDict["res"] = pickle.dumps(res)
0120 except Exception:
0121 res = sys.exc_info()[:2]
0122 commDict["stat"] = 1
0123 commDict["res"] = pickle.dumps(res)
0124
0125 resLock.release()
0126
0127
0128 def launch(self, taskBuffer):
0129
0130 self.childlock = multiprocessing.Queue()
0131 self.commDict = dict()
0132 self.comLock = dict()
0133 self.resLock = dict()
0134 self.taskBuffer = taskBuffer
0135 self.to_stop = multiprocessing.Value("i", 0)
0136 for i in range(taskBuffer.get_num_connections()):
0137 self.childlock.put(i)
0138 self.commDict[i] = self.manager.dict()
0139 self.comLock[i] = multiprocessing.Semaphore(0)
0140 self.resLock[i] = multiprocessing.Semaphore(0)
0141
0142
0143 self.process = multiprocessing.Process(
0144 target=self.run,
0145 args=(taskBuffer, self.commDict, self.comLock, self.resLock, self.to_stop),
0146 )
0147 self.process.start()
0148
0149
0150 def getInterface(self):
0151 return TaskBufferInterfaceChild(self.commDict, self.childlock, self.comLock, self.resLock)
0152
0153
0154 def stop(self, requester=None):
0155 with self.to_stop.get_lock():
0156 self.to_stop.value = 1
0157 while self.process.is_alive():
0158 time.sleep(1)
0159 self.taskBuffer.cleanup(requester=requester)
0160
0161
0162 def terminate(self):
0163 self.process.terminate()