Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import datetime
0002 import multiprocessing
0003 import multiprocessing.reduction
0004 import os
0005 import signal
0006 import sys
0007 import time
0008 
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010 
0011 try:
0012     from multiprocessing.connection import reduce_connection
0013 except ImportError:
0014     from multiprocessing.reduction import reduce_connection
0015 
0016 # import multiprocessing
0017 # logger = multiprocessing.log_to_stderr()
0018 # logger.setLevel(multiprocessing.SUBDEBUG)
0019 
0020 ###########################################################
0021 #
0022 # status codes for IPC
0023 #
0024 
0025 
0026 # class for status code
class StatusCode(object):
    """Value object wrapping a status code exchanged between processes.

    Two instances compare equal when their ``value`` attributes are equal.
    Comparing with an object that has no usable ``value`` attribute yields
    "not equal" instead of raising.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return f"{self.value}"

    def __repr__(self):
        return f"{type(self).__name__}({self.value!r})"

    # comparator
    def __eq__(self, other):
        try:
            return self.value == other.value
        except Exception:
            return False

    def __ne__(self, other):
        try:
            return self.value != other.value
        except Exception:
            return True

    # a class that defines __eq__ without __hash__ is unhashable in Python 3;
    # hash on the same attribute that __eq__ compares so that status codes can
    # be used in sets and as dict keys
    def __hash__(self):
        return hash(self.value)
0046 
0047 
0048 # mapping to accessors
# accessor name -> status code object; the names are installed as attributes by installSC()
statusCodeMap = dict(
    SC_SUCCEEDED=StatusCode(0),
    SC_FAILED=StatusCode(1),
    SC_FATAL=StatusCode(2),
)
0054 
0055 
0056 # install the list of status codes to a class
def installSC(cls):
    """Set every entry of ``statusCodeMap`` as an attribute of *cls*.

    *cls* may be any object that accepts ``setattr`` (the file passes both a
    class and a module).
    """
    for codeName in statusCodeMap:
        setattr(cls, codeName, statusCodeMap[codeName])
0060 
0061 
# install SCs in this module so that SC_SUCCEEDED, SC_FAILED and SC_FATAL
# become module-level names (MethodClass.__call__ compares against them)
installSC(sys.modules[__name__])
0064 
0065 
0066 ###########################################################
0067 #
0068 # classes for IPC
0069 #
0070 
0071 
0072 # log message with timestamp
def dumpStdOut(sender, message):
    """Print *message* to stdout, prefixed with the current timestamp and *sender*."""
    stamp = str(naive_utcnow())
    print(f"{stamp} {sender}: INFO    {message}")
0076 
0077 
0078 # object class for command
class CommandObject(object):
    """Describes one method call to send through a pipe: name, positional and keyword arguments."""

    def __init__(self, methodName, argList, argMap):
        self.methodName, self.argList, self.argMap = methodName, argList, argMap
0085 
0086 
0087 # object class for response
class ReturnObject(object):
    """Describes the outcome of one command: status code, error value and return value."""

    def __init__(self):
        # all fields start empty and are filled in by whoever executes the command
        self.statusCode = self.errorValue = self.returnValue = None
0094 
0095 
0096 # process class
class ProcessClass(object):
    """Bookkeeping record for one child process.

    Holds the child's pid, a usage counter, the last memory reading and the
    parent-side pipe in reduced (picklable) form.
    """

    # constructor
    def __init__(self, pid, connection):
        self.pid = pid
        # usage counter; incremented by the code that sends commands to the child
        self.nused = 0
        # last memory reading in MB (see getMemUsage)
        self.usedMemory = 0
        # memory is re-read only when nused is a multiple of this value
        self.nMemLookup = 20
        # reduce connection to make it picklable
        self.reduced_pipe = reduce_connection(connection)

    # get connection
    def connection(self):
        """Rebuild and return the connection from its reduced form."""
        return self.reduced_pipe[0](*self.reduced_pipe[1])

    # reduce connection
    def reduceConnection(self, connection):
        """Replace the stored reduced form with the reduction of *connection*."""
        self.reduced_pipe = reduce_connection(connection)

    # get memory usage
    def getMemUsage(self):
        """Return the child's VmRSS in MB, or None when no lookup is due.

        A lookup is done only when ``nused`` is a multiple of ``nMemLookup``.
        Reading ``/proc/<pid>/status`` is best effort: on any failure the
        previously stored reading is returned unchanged.
        """
        if self.nused % self.nMemLookup != 0:
            return None
        try:
            value = 0
            # the context manager closes the file even when reading fails
            with open(f"/proc/{self.pid}/status") as statusFile:
                for line in statusFile:
                    if line.startswith("VmRSS"):
                        items = line.split()
                        value = int(items[1])
                        # convert kB to MB; any other unit is taken as is
                        if items[2] in ["kB", "KB"]:
                            value /= 1024
                        break
            self.usedMemory = value
        except Exception:
            # keep the previous reading, e.g. when the process is gone
            pass
        return self.usedMemory
0141 
0142 
0143 # method class
class MethodClass(object):
    """Callable that forwards one named method call to a child process.

    Calling an instance takes a child process object from ``connectionQueue``,
    sends a ``CommandObject`` through its pipe, waits for the response and
    either returns the value or raises an exception that reflects the status
    code in the response.
    """

    # constructor
    def __init__(self, className, methodName, vo, connectionQueue, voIF):
        self.className = className
        self.methodName = methodName
        self.vo = vo
        self.connectionQueue = connectionQueue
        # object providing launchChild() to replace a killed child
        self.voIF = voIF
        self.pipeList = []

    # method emulation
    def __call__(self, *args, **kwargs):
        """Execute the method in a child process, trying up to three times.

        Returns:
            ``returnValue`` of the response when its status code is SC_SUCCEEDED.

        Raises:
            JEDITemporaryError: the last attempt ended with SC_FAILED.
            JEDIFatalError: an attempt ended with SC_FATAL (not retried), or
                the response carried an unrecognised status code.
            Exception: any other exception type hit while talking to the child
                (including JEDITimeoutError) is re-raised as that type after
                the last attempt, with a message describing the failing step.
        """
        commandObj = CommandObject(self.methodName, args, kwargs)
        nTry = 3
        ret = None
        retException = None
        strException = None
        for iTry in range(nTry):
            # reset per-attempt state so that a failure early in this attempt
            # never acts on the child, pipe or response of a previous attempt
            ret = None
            retException = None
            strException = None
            child_process = None
            pipe = None
            stepIdx = 0
            try:
                # get child process
                child_process = self.connectionQueue.get()
                # get pipe
                stepIdx = 1
                pipe = child_process.connection()
                # get ack
                stepIdx = 2
                timeoutPeriodACK = 30
                if not pipe.poll(timeoutPeriodACK):
                    raise JEDITimeoutError(f"did not get ACK for {timeoutPeriodACK}sec")
                # the ACK payload itself is not used
                pipe.recv()
                # send command
                stepIdx = 3
                pipe.send(commandObj)
                # wait response
                stepIdx = 4
                timeoutPeriod = 600
                timeNow = naive_utcnow()
                if not pipe.poll(timeoutPeriod):
                    raise JEDITimeoutError(f"did not get response for {timeoutPeriod}sec")
                regTime = naive_utcnow() - timeNow
                if regTime > datetime.timedelta(seconds=60):
                    dumpStdOut(
                        self.className,
                        f"methodName={self.methodName} took {regTime.seconds}.{int(regTime.microseconds / 1000):03d} sec in pid={child_process.pid}",
                    )
                # get response
                stepIdx = 5
                ret = pipe.recv()
                # set exception type based on error
                stepIdx = 6
                if ret.statusCode == SC_FAILED:
                    retException = JEDITemporaryError
                elif ret.statusCode == SC_FATAL:
                    retException = JEDIFatalError
            except Exception as exc:
                retException = type(exc)
                argStr = f"args={str(args)} kargs={str(kwargs)}"
                strException = f"VO={self.vo} type={retException.__name__} stepIdx={stepIdx} : {self.className}.{self.methodName} {exc} {argStr[:200]}"
            # recycle or replace the child; nothing to do when none was obtained
            if child_process is not None:
                self._releaseChild(child_process, pipe, retException, strException)
            # success, fatal error, or maximally attempted
            if retException in [None, JEDIFatalError] or (iTry + 1 == nTry):
                break
            # sleep
            time.sleep(1)
        # raise exception
        if retException is not None:
            if strException is None:
                strException = f"VO={self.vo} {ret.errorValue}"
            raise retException(strException)
        # return
        if ret.statusCode == SC_SUCCEEDED:
            return ret.returnValue
        # the status code is none of SC_SUCCEEDED, SC_FAILED or SC_FATAL
        raise JEDIFatalError(f"VO={self.vo} unexpected status code {ret.statusCode} : {ret.errorValue}")

    # put the child back to the queue, or kill it and launch a replacement
    def _releaseChild(self, child_process, pipe, retException, strException):
        # increment nused
        child_process.nused += 1
        # memory check
        largeMemory = False
        memUsed = child_process.getMemUsage()
        if memUsed is not None:
            memStr = f"pid={child_process.pid} memory={memUsed}MB"
            if memUsed > 1.5 * 1024:
                largeMemory = True
                memStr += " exceeds memory limit"
                dumpStdOut(self.className, memStr)
        # a child is reusable only if its pipe was rebuilt and the attempt ended
        # without an exception other than the two status-derived ones
        reusable = pipe is not None and retException in [None, JEDITemporaryError, JEDIFatalError]
        if reusable and child_process.nused <= 1000 and not largeMemory:
            # reduce process object to avoid deadlock due to rebuilding of connection
            child_process.reduceConnection(pipe)
            self.connectionQueue.put(child_process)
            return
        # kill old or problematic process
        dumpStdOut(
            self.className,
            f"methodName={self.methodName} ret={retException} nused={child_process.nused} {strException} in pid={child_process.pid}",
        )
        # close connection
        if pipe is not None:
            try:
                pipe.close()
            except Exception:
                pass
        # terminate child process
        try:
            dumpStdOut(self.className, f"killing pid={child_process.pid}")
            os.kill(child_process.pid, signal.SIGKILL)
            dumpStdOut(self.className, f"waiting pid={child_process.pid}")
            os.waitpid(child_process.pid, 0)
            dumpStdOut(self.className, f"terminated pid={child_process.pid}")
        except Exception as exc:
            # a child that is already gone is not worth reporting
            if "No child processes" not in str(exc):
                dumpStdOut(self.className, f"failed to terminate {child_process.pid} with {type(exc)}:{exc}")
        # make new child process
        self.voIF.launchChild()
0258 
0259 
0260 # interface class to send command
class CommandSendInterface(object):
    """Parent-side interface that turns attribute access into calls on child processes.

    Any attribute that is not found by normal lookup is returned as a
    ``MethodClass`` bound to the shared queue of child processes.
    """

    # constructor
    def __init__(self, vo, maxChild, moduleName, className):
        self.vo = vo
        self.maxChild = maxChild
        # queue of ProcessClass objects, bounded by the number of children
        self.connectionQueue = multiprocessing.Queue(maxChild)
        self.moduleName = moduleName
        self.className = className

    # factory method
    def __getattr__(self, attrName):
        """Return a ``MethodClass`` for *attrName*.

        Raises:
            AttributeError: for dunder names and for the instance's own
                attributes when they are not set yet.
        """
        # __getattr__ is only reached when normal lookup fails. Protocol probes
        # such as __setstate__ or __deepcopy__ must report "missing" rather than
        # become remote calls, and a lookup of the instance's own attributes on a
        # half-built object must not recurse into this method again.
        if attrName.startswith("__") and attrName.endswith("__"):
            raise AttributeError(attrName)
        if attrName in ("vo", "maxChild", "connectionQueue", "moduleName", "className"):
            raise AttributeError(attrName)
        return MethodClass(self.className, attrName, self.vo, self.connectionQueue, self)

    # launcher for child process
    def launcher(self, channel):
        """Entry point of a child process: instantiate the configured class on *channel* and run it."""
        # import module
        mod = __import__(self.moduleName)
        # __import__ returns the top-level package, so walk down to the submodule
        for subModuleName in self.moduleName.split(".")[1:]:
            mod = getattr(mod, subModuleName)
        # get class
        cls = getattr(mod, self.className)
        # start child process
        msg = f"start {self.className} with pid={os.getpid()}"
        dumpStdOut(self.moduleName, msg)
        try:
            cls(channel).start()
        except Exception as exc:
            dumpStdOut(self.className, f"launcher crashed with {type(exc)}:{exc}")

    # launch child processes to interact with DDM
    def launchChild(self):
        """Start one daemon child process and put its ProcessClass object in the queue."""
        # make pipe
        parent_conn, child_conn = multiprocessing.Pipe()
        # make child process
        child_process = multiprocessing.Process(target=self.launcher, args=(child_conn,))
        # start child process
        child_process.daemon = True
        child_process.start()
        # keep process in queue
        processObj = ProcessClass(child_process.pid, parent_conn)
        # sync to wait until object in the child process is instantiated
        # NOTE(review): recv() has no timeout here; if the child never sends,
        # this call presumably blocks indefinitely - confirm
        pipe = processObj.connection()
        pipe.recv()
        processObj.reduceConnection(pipe)
        # ready
        self.connectionQueue.put(processObj)

    # initialize
    def initialize(self):
        """Launch ``maxChild`` child processes."""
        for _ in range(self.maxChild):
            self.launchChild()
0314 
0315 
0316 # interface class to receive command
class CommandReceiveInterface(object):
    """Child-side interface that receives commands from a connection and executes them.

    Commands are executed by calling the method of the same name on this
    instance. The code expects ``SC_SUCCEEDED`` and ``SC_FATAL`` to be
    available as attributes of the instance.
    """

    # constructor
    def __init__(self, con):
        self.con = con
        # cache key -> {"utime": time of caching, "value": cached result}
        self.cacheMap = {}

    # make key for cache
    def makeKey(self, className, methodName, argList, argMap):
        """Return a string key identifying a call, or None if it cannot be built.

        Keyword arguments are ordered by name so that the same call written
        with a different keyword order maps to the same key.
        """
        try:
            keyParts = [f"{className}", f"{methodName}"]
            keyParts += [str(argItem) for argItem in argList]
            keyParts += [f"{argKey}={str(argVal)}" for argKey, argVal in sorted(argMap.items(), key=lambda item: str(item[0]))]
            return ":".join(keyParts)
        except Exception:
            return None

    # main loop
    def start(self):
        """Serve commands forever.

        Sends "ready" once, then repeats: send "ack", receive a command
        object, execute it and send back a ``ReturnObject``. The loop only
        ends when the connection raises.
        """
        # sync
        self.con.send("ready")
        # get class name
        className = self.__class__.__name__
        # main loop
        while True:
            # send ACK
            self.con.send("ack")
            # get command
            commandObj = self.con.recv()
            # make return
            retObj = ReturnObject()
            # check method name
            if not hasattr(self, commandObj.methodName):
                # method not found
                retObj.statusCode = self.SC_FATAL
                retObj.errorValue = f"type=AttributeError : {className} instance has no attribute {commandObj.methodName}"
            else:
                useCache = False
                doExec = True
                tmpCacheKey = None
                tmpRet = None
                try:
                    # use cache
                    if "useResultCache" in commandObj.argMap:
                        # get time range and remove it from the args map, since
                        # it is a directive for this loop and not an argument
                        timeRange = commandObj.argMap.pop("useResultCache")
                        # make key for cache
                        tmpCacheKey = self.makeKey(className, commandObj.methodName, commandObj.argList, commandObj.argMap)
                        if tmpCacheKey is not None:
                            useCache = True
                            # cache is fresh
                            if tmpCacheKey in self.cacheMap and self.cacheMap[tmpCacheKey]["utime"] + datetime.timedelta(seconds=timeRange) > naive_utcnow():
                                tmpRet = self.cacheMap[tmpCacheKey]["value"]
                                doExec = False
                    # exec
                    if doExec:
                        functionObj = getattr(self, commandObj.methodName)
                        tmpRet = functionObj(*commandObj.argList, **commandObj.argMap)
                    if isinstance(tmpRet, StatusCode):
                        # only status code was returned
                        retObj.statusCode = tmpRet
                    elif isinstance(tmpRet, (tuple, list)) and len(tmpRet) > 0 and isinstance(tmpRet[0], StatusCode):
                        # status code + return values
                        retObj.statusCode = tmpRet[0]
                        if len(tmpRet) > 1:
                            # a single extra item is passed as is, several as a sequence
                            payload = tmpRet[1] if len(tmpRet) == 2 else tmpRet[1:]
                            if retObj.statusCode == self.SC_SUCCEEDED:
                                retObj.returnValue = payload
                            else:
                                retObj.errorValue = payload
                    else:
                        # a plain value counts as success
                        retObj.statusCode = self.SC_SUCCEEDED
                        retObj.returnValue = tmpRet
                except Exception as exc:
                    # failed
                    retObj.statusCode = self.SC_FATAL
                    retObj.errorValue = f"type={type(exc).__name__} : {className}.{commandObj.methodName} : {exc}"
                # cache only freshly computed, successful results
                if useCache and doExec and retObj.statusCode == self.SC_SUCCEEDED:
                    self.cacheMap[tmpCacheKey] = {"utime": naive_utcnow(), "value": tmpRet}
            # return
            self.con.send(retObj)
0409 
0410 
# install SCs as class attributes so that the methods of CommandReceiveInterface
# can refer to them as self.SC_SUCCEEDED and self.SC_FATAL
installSC(CommandReceiveInterface)
0413 
0414 
0415 ###########################################################
0416 #
0417 # exceptions for IPC
0418 #
0419 
0420 
0421 # exception for temporary error
class JEDITemporaryError(Exception):
    """Error type that MethodClass selects when a response carries SC_FAILED."""
0424 
0425 
0426 # exception for fatal error
class JEDIFatalError(Exception):
    """Error type that MethodClass selects when a response carries SC_FATAL."""
0429 
0430 
0431 # exception for timeout error
class JEDITimeoutError(Exception):
    """Error type raised by MethodClass when polling a pipe for the ACK or the response times out."""