Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:07

0001 import multiprocessing
0002 import os
0003 import pickle
0004 import sys
0005 import threading
0006 import time
0007 from concurrent.futures import ThreadPoolExecutor
0008 
0009 from pandacommon.pandalogger.LogWrapper import LogWrapper
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011 from pandaserver.taskbuffer import FileSpec, JobSpec
0012 
0013 JobSpec.reserveChangedState = True  # set once at import time; presumably keeps change tracking alive when specs are pickled - TODO confirm in JobSpec
0014 FileSpec.reserveChangedState = True  # same flag, applied to FileSpec
0015 
0016 
0017 _logger = PandaLogger().getLogger("TaskBufferInterface")  # base logger; TaskBufferMethod wraps it with a per-call prefix
0018 
0019 
0020 # method class
class TaskBufferMethod:
    """Callable stand-in for one named method that is executed on the master side.

    Calling the object claims a free communication slot, writes the pickled
    request into that slot's shared dict, notifies the master through the
    slot's command semaphore, waits on the slot's response semaphore and
    finally hands back whatever the master stored as the response.
    """

    def __init__(self, methodName, commDict, childlock, comLock, resLock):
        """Remember the method name and the shared communication objects.

        Args:
            methodName: name of the method to invoke on the master side.
            commDict: mapping of slot index to the shared dict that carries
                the request and the response.
            childlock: queue of free slot indices; ``get()`` claims a slot
                and ``put()`` returns it.
            comLock: mapping of slot index to the semaphore released to
                notify the master of a new request.
            resLock: mapping of slot index to the semaphore acquired to wait
                for the response.
        """
        self.methodName = methodName
        self.childlock = childlock
        self.commDict = commDict
        self.comLock = comLock
        self.resLock = resLock

    def __call__(self, *args, **kwargs):
        """Send the call to the master and return its result.

        Args:
            *args: positional arguments forwarded to the remote method.
            **kwargs: keyword arguments forwarded to the remote method.

        Returns:
            The unpickled result stored by the master when the status is 0.

        Raises:
            RuntimeError: when the master reports a non-zero status; the
                message carries the method name, the exception type name and
                the exception value.
            Exception: whatever ``pickle`` raises when the arguments cannot be
                serialized or the response cannot be deserialized.
        """
        log = LogWrapper(
            _logger,
            f"pid={os.getpid()} thr={threading.current_thread().ident} {self.methodName}",
        )
        log.debug("start")
        # serialize before claiming a slot: an unpicklable argument then fails
        # here and cannot leave a slot permanently checked out
        request = {
            "methodName": self.methodName,
            "args": pickle.dumps(args),
            "kwargs": pickle.dumps(kwargs),
        }
        # get lock among children
        i = self.childlock.get()
        # make dict to send it master
        self.commDict[i].update(request)
        # send notification to master
        self.comLock[i].release()
        # wait response
        self.resLock[i].acquire()
        # copy the raw response out of the shared dict while the slot is still ours
        rawRes = self.commDict[i]["res"]
        statusCode = self.commDict[i]["stat"]
        # release lock to children; done before unpickling so that a broken
        # response cannot leave the slot checked out either
        self.childlock.put(i)
        res = pickle.loads(rawRes)
        log.debug("end")
        # return
        if statusCode == 0:
            return res
        # on failure the master sends a (type, value) pair instead of a result
        errtype, errvalue = res
        raise RuntimeError(f"{self.methodName}: {errtype.__name__} {errvalue}")
0060 
0061 
0062 # child class
class TaskBufferInterfaceChild:
    """Child-side facade: any attribute access yields a proxy for that method name.

    The instance only stores the shared communication objects; every other
    public attribute name is turned into a TaskBufferMethod bound to those
    objects.
    """

    # attributes owned by the instance itself; they must never be proxied,
    # otherwise looking them up on a not-yet-populated instance (e.g. while
    # it is being copied or unpickled) recurses through __getattr__ forever
    _OWN_ATTRS = frozenset(("childlock", "commDict", "comLock", "resLock"))

    # constructor
    def __init__(self, commDict, childlock, comLock, resLock):
        """Store the shared objects that are handed to every method proxy.

        Args:
            commDict: mapping of slot index to the shared request/response dict.
            childlock: queue of free slot indices.
            comLock: mapping of slot index to the command semaphore.
            resLock: mapping of slot index to the response semaphore.
        """
        self.childlock = childlock
        self.commDict = commDict
        self.comLock = comLock
        self.resLock = resLock

    # method emulation
    def __getattr__(self, attrName):
        """Return a method proxy for ``attrName``.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: for dunder names, so protocol probes such as
                ``__setstate__`` or ``__deepcopy__`` see a missing attribute
                instead of a remote-call proxy, and for the instance's own
                attribute names when they are not set yet.
        """
        isDunder = attrName.startswith("__") and attrName.endswith("__")
        if isDunder or attrName in self._OWN_ATTRS:
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {attrName!r}")
        return TaskBufferMethod(attrName, self.commDict, self.childlock, self.comLock, self.resLock)
0074 
0075 
0076 # master class
class TaskBufferInterface:
    """Master side of the interface: serves method calls sent through shared slots.

    ``launch`` creates one communication slot per task buffer connection and
    starts a separate process; inside that process one thread per slot waits
    for requests and executes them on the task buffer.
    """

    # constructor
    def __init__(self):
        """Create the multiprocessing manager that provides the shared dicts."""
        # make manager to create shared objects
        self.manager = multiprocessing.Manager()
        self.taskBuffer = None

    # main loop
    def run(self, taskBuffer, commDict, comLock, resLock, to_stop):
        """Serve every slot from its own thread until the stop flag is set.

        Args:
            taskBuffer: object on which the requested methods are executed;
                its ``get_num_connections()`` sizes the thread pool.
            commDict: mapping of slot index to the shared request/response dict.
            comLock: mapping of slot index to the command semaphore.
            resLock: mapping of slot index to the response semaphore.
            to_stop: shared flag; a true ``value`` ends every serving thread.
        """
        # leaving the with-block waits for all serving threads to finish
        with ThreadPoolExecutor(max_workers=taskBuffer.get_num_connections()) as pool:
            for i in commDict:
                pool.submit(
                    self.thread_run,
                    taskBuffer,
                    commDict[i],
                    comLock[i],
                    resLock[i],
                    to_stop,
                )

    # serialization of a failure report
    @staticmethod
    def _dump_error(errtype, errvalue):
        """Pickle a (type, value) failure report, degrading to plain text if needed.

        The caller side only reads ``errtype.__name__`` and formats
        ``errvalue`` into a message, so when the original pair cannot be
        pickled a ``(RuntimeError, text)`` pair carries the same information.
        """
        try:
            return pickle.dumps((errtype, errvalue))
        except Exception:
            return pickle.dumps((RuntimeError, f"{errtype.__name__}: {errvalue}"))

    # main loop
    def thread_run(self, taskBuffer, commDict, comLock, resLock, to_stop):
        """Serve a single slot: wait for a request, execute it, publish the response.

        The response is written to ``commDict`` as ``"stat"`` (0 on success,
        1 on failure) and ``"res"`` (pickled result, or pickled
        ``(exception type, exception value)`` on failure), after which
        ``resLock`` is released to wake the caller.

        Args:
            taskBuffer: object on which the requested method is executed.
            commDict: this slot's shared request/response dict.
            comLock: semaphore the caller releases after writing a request.
            resLock: semaphore released here once the response is stored.
            to_stop: shared flag; a true ``value`` ends the loop.
        """
        # main loop
        while True:
            # stop sign
            if to_stop.value:
                break
            # wait for command; the timeout lets the stop flag be rechecked regularly
            if not comLock.acquire(timeout=0.25):
                continue
            try:
                # get command from child
                methodName = commDict["methodName"]
                args = pickle.loads(commDict["args"])
                kwargs = pickle.loads(commDict["kwargs"])
                # execute
                method = getattr(taskBuffer, methodName)
                res = method(*args, **kwargs)
                commDict["stat"] = 0
                # set response
                commDict["res"] = pickle.dumps(res)
            except Exception:
                commDict["stat"] = 1
                # an unpicklable exception must not kill this thread before the
                # response is sent, otherwise the caller would wait forever
                commDict["res"] = self._dump_error(*sys.exc_info()[:2])
            # send response
            resLock.release()

    # launcher
    def launch(self, taskBuffer):
        """Create the shared slots and start the serving process.

        Args:
            taskBuffer: object on which requested methods are executed; its
                ``get_num_connections()`` determines the number of slots.
        """
        # shared objects
        self.childlock = multiprocessing.Queue()
        self.commDict = dict()
        self.comLock = dict()
        self.resLock = dict()
        self.taskBuffer = taskBuffer
        self.to_stop = multiprocessing.Value("i", 0)
        for i in range(taskBuffer.get_num_connections()):
            # every slot index starts out free
            self.childlock.put(i)
            self.commDict[i] = self.manager.dict()
            # both semaphores start at 0 so that acquire blocks until the peer releases
            self.comLock[i] = multiprocessing.Semaphore(0)
            self.resLock[i] = multiprocessing.Semaphore(0)

        # run
        self.process = multiprocessing.Process(
            target=self.run,
            args=(taskBuffer, self.commDict, self.comLock, self.resLock, self.to_stop),
        )
        self.process.start()

    # get interface for child
    def getInterface(self):
        """Return a child-side facade bound to the shared slots made by ``launch``."""
        return TaskBufferInterfaceChild(self.commDict, self.childlock, self.comLock, self.resLock)

    # stop the loop
    def stop(self, requester=None):
        """Ask the serving process to stop, wait for it to exit, then clean up.

        Args:
            requester: passed through to ``taskBuffer.cleanup``.
        """
        with self.to_stop.get_lock():
            self.to_stop.value = 1
        # block until the serving process has exited
        self.process.join()
        self.taskBuffer.cleanup(requester=requester)

    # kill
    def terminate(self):
        """Terminate the serving process without waiting for its loop to end."""
        self.process.terminate()