File indexing completed on 2026-04-10 08:38:58
0001 import datetime
0002 import multiprocessing
0003 import multiprocessing.reduction
0004 import os
0005 import signal
0006 import sys
0007 import time
0008
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010
0011 try:
0012 from multiprocessing.connection import reduce_connection
0013 except ImportError:
0014 from multiprocessing.reduction import reduce_connection
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
class StatusCode(object):
    """Status value compared by its ``value`` attribute.

    Two instances are equal when their ``value`` attributes are equal.
    Comparing against an object that has no ``value`` attribute yields
    "not equal" instead of raising.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return f"{self.value}"

    def __repr__(self):
        return f"{type(self).__name__}({self.value!r})"

    def __eq__(self, other):
        try:
            return self.value == other.value
        except Exception:
            # anything without a comparable ``value`` is simply different
            return False

    def __ne__(self, other):
        try:
            return self.value != other.value
        except Exception:
            return True

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None in Python 3; hash on the
        # same attribute equality uses so equal codes share a hash.
        return hash(self.value)
0046
0047
0048
# Symbolic status names mapped to the StatusCode instance each one stands for.
statusCodeMap = dict(
    SC_SUCCEEDED=StatusCode(0),
    SC_FAILED=StatusCode(1),
    SC_FATAL=StatusCode(2),
)
0054
0055
0056
def installSC(cls):
    """Set every entry of ``statusCodeMap`` as an attribute on ``cls``.

    ``cls`` can be any object that accepts ``setattr`` (this file passes both
    a module and a class).
    """
    for name in statusCodeMap:
        setattr(cls, name, statusCodeMap[name])
0060
0061
0062
# Make SC_SUCCEEDED, SC_FAILED and SC_FATAL available as names of this module.
installSC(sys.modules[__name__])
0064
0065
0066
0067
0068
0069
0070
0071
0072
def dumpStdOut(sender, message):
    """Print ``message`` to stdout as an INFO line.

    The line is prefixed with the value returned by ``naive_utcnow()`` and
    with ``sender``.
    """
    timestamp = str(naive_utcnow())
    print(f"{timestamp} {sender}: INFO {message}")
0076
0077
0078
class CommandObject(object):
    """Container for one method invocation: method name plus its arguments."""

    def __init__(self, methodName, argList, argMap):
        # name of the method to call, positional arguments, keyword arguments
        self.methodName, self.argList, self.argMap = methodName, argList, argMap
0085
0086
0087
class ReturnObject(object):
    """Container for the outcome of one command; all fields start as ``None``."""

    def __init__(self):
        # status code, error description and return value of the command
        self.statusCode = self.errorValue = self.returnValue = None
0094
0095
0096
class ProcessClass(object):
    """Bookkeeping record for one child process and the pipe that reaches it.

    The connection is stored in reduced form (the pair returned by
    ``reduce_connection``: a rebuild callable and its arguments) instead of as
    a live object; ``connection()`` rebuilds it on demand.
    """

    def __init__(self, pid, connection):
        self.pid = pid
        # usage counter; getMemUsage() samples memory when it is a multiple
        # of nMemLookup
        self.nused = 0
        # last memory reading obtained by getMemUsage()
        self.usedMemory = 0
        self.nMemLookup = 20
        # presumably kept reduced so this object can be pickled -- TODO confirm
        self.reduced_pipe = reduce_connection(connection)

    def connection(self):
        """Rebuild and return the connection from its reduced form."""
        rebuild, rebuildArgs = self.reduced_pipe[0], self.reduced_pipe[1]
        return rebuild(*rebuildArgs)

    def reduceConnection(self, connection):
        """Replace the stored reduced form with that of ``connection``."""
        self.reduced_pipe = reduce_connection(connection)

    def getMemUsage(self):
        """Return the child's VmRSS, sampled once every ``nMemLookup`` uses.

        Returns:
            ``None`` when ``nused`` is not a multiple of ``nMemLookup``.
            Otherwise the VmRSS figure from ``/proc/<pid>/status``, divided by
            1024 when its unit is kB (i.e. expressed in MB), or 0 when the
            file has no VmRSS line. If the file cannot be read or parsed the
            previously stored value is returned unchanged.
        """
        if self.nused % self.nMemLookup != 0:
            return None
        try:
            rss = 0
            # the context manager closes the file even when reading fails
            with open(f"/proc/{self.pid}/status") as statusFile:
                for line in statusFile:
                    if line.startswith("VmRSS"):
                        items = line.split()
                        rss = int(items[1])
                        if items[2] in ("kB", "KB"):
                            rss /= 1024
                        # any other unit (e.g. MB) is used as is
                        break
            self.usedMemory = rss
        except Exception:
            # deliberate best effort: the process may be gone or /proc may be
            # unavailable; keep the last known value
            pass
        return self.usedMemory
0141
0142
0143
class MethodClass(object):
    """Callable proxy that runs one named method in a child process.

    Calling an instance takes a process object from ``connectionQueue``,
    sends it a ``CommandObject`` over its pipe and turns the returned status
    into either a return value or an exception.
    """

    def __init__(self, className, methodName, vo, connectionQueue, voIF):
        self.className = className
        self.methodName = methodName
        self.vo = vo
        # queue of available child-process objects
        self.connectionQueue = connectionQueue
        # object providing launchChild(), used to replace killed children
        self.voIF = voIF
        self.pipeList = []

    def __call__(self, *args, **kwargs):
        """Execute the method in a child process, retrying up to three times.

        Returns:
            ``returnValue`` of the child's reply when its status is
            ``SC_SUCCEEDED``.

        Raises:
            JEDITemporaryError: the child replied ``SC_FAILED`` on every attempt.
            JEDIFatalError: the child replied ``SC_FATAL`` (not retried) or a
                status code that is none of the three known ones.
            Exception: the type of whatever was raised while talking to the
                child on the last attempt (e.g. ``JEDITimeoutError``),
                re-raised with a descriptive message.
        """
        commandObj = CommandObject(self.methodName, args, kwargs)
        nTry = 3
        for iTry in range(nTry):
            retException = None
            strException = None
            # Reset per attempt: if the attempt fails before these are
            # assigned, the code below must not touch the objects of a
            # previous attempt or hit an unbound local.
            child_process = None
            pipe = None
            try:
                # stepIdx records how far the exchange got, for error messages
                stepIdx = 0
                child_process = self.connectionQueue.get()

                stepIdx = 1
                pipe = child_process.connection()

                # wait for the message the child sends before each command
                stepIdx = 2
                timeoutPeriodACK = 30
                if not pipe.poll(timeoutPeriodACK):
                    raise JEDITimeoutError(f"did not get ACK for {timeoutPeriodACK}sec")
                pipe.recv()

                stepIdx = 3
                pipe.send(commandObj)

                stepIdx = 4
                timeoutPeriod = 600
                timeNow = naive_utcnow()
                if not pipe.poll(timeoutPeriod):
                    raise JEDITimeoutError(f"did not get response for {timeoutPeriod}sec")
                regTime = naive_utcnow() - timeNow
                # report slow calls
                if regTime > datetime.timedelta(seconds=60):
                    dumpStdOut(
                        self.className,
                        f"methodName={self.methodName} took {regTime.seconds}.{int(regTime.microseconds / 1000):03d} sec in pid={child_process.pid}",
                    )

                stepIdx = 5
                ret = pipe.recv()

                # map the returned status to the exception to raise, if any
                stepIdx = 6
                if ret.statusCode == SC_FAILED:
                    retException = JEDITemporaryError
                elif ret.statusCode == SC_FATAL:
                    retException = JEDIFatalError
            except Exception:
                errtype, errvalue = sys.exc_info()[:2]
                retException = errtype
                argStr = f"args={str(args)} kargs={str(kwargs)}"
                strException = f"VO={self.vo} type={errtype.__name__} stepIdx={stepIdx} : {self.className}.{self.methodName} {errvalue} {argStr[:200]}"

            if child_process is None:
                # nothing was taken from the queue, so there is no child to
                # recycle or replace; just record the failure
                dumpStdOut(self.className, f"methodName={self.methodName} ret={retException} {strException}")
            else:
                child_process.nused += 1

                largeMemory = False
                memUsed = child_process.getMemUsage()
                if memUsed is not None:
                    memStr = f"pid={child_process.pid} memory={memUsed}MB"
                    if memUsed > 1.5 * 1024:
                        largeMemory = True
                        memStr += " exceeds memory limit"
                        # NOTE(review): the original indentation is not
                        # recoverable here; confirm this log call belongs
                        # inside the limit check.
                        dumpStdOut(self.className, memStr)

                # Replace the child when it is heavily used, uses too much
                # memory, failed with anything other than a reported
                # temporary/fatal status, or has no usable pipe.
                if (
                    child_process.nused > 1000
                    or retException not in [None, JEDITemporaryError, JEDIFatalError]
                    or largeMemory
                    or pipe is None
                ):
                    dumpStdOut(
                        self.className,
                        f"methodName={self.methodName} ret={retException} nused={child_process.nused} {strException} in pid={child_process.pid}",
                    )

                    if pipe is not None:
                        try:
                            pipe.close()
                        except Exception:
                            # the pipe may already be broken; closing is best effort
                            pass

                    try:
                        dumpStdOut(self.className, f"killing pid={child_process.pid}")
                        os.kill(child_process.pid, signal.SIGKILL)
                        dumpStdOut(self.className, f"waiting pid={child_process.pid}")
                        os.waitpid(child_process.pid, 0)
                        dumpStdOut(self.className, f"terminated pid={child_process.pid}")
                    except Exception:
                        errtype, errvalue = sys.exc_info()[:2]
                        # an already-reaped child is not worth reporting
                        if "No child processes" not in str(errvalue):
                            dumpStdOut(self.className, f"failed to terminate {child_process.pid} with {errtype}:{errvalue}")

                    self.voIF.launchChild()
                else:
                    # store the pipe in reduced form again and hand the child
                    # back to the pool
                    child_process.reduceConnection(pipe)
                    self.connectionQueue.put(child_process)

            # stop on success, on a fatal status, or after the last attempt
            if retException in [None, JEDIFatalError] or (iTry + 1 == nTry):
                break

            time.sleep(1)

        if retException is not None:
            if strException is None:
                strException = f"VO={self.vo} {ret.errorValue}"
            raise retException(strException)

        if ret.statusCode == SC_SUCCEEDED:
            return ret.returnValue
        # retException is None here, so the reply carried a status code that is
        # none of SC_SUCCEEDED / SC_FAILED / SC_FATAL; report it explicitly
        # instead of failing on "raise None(...)".
        raise JEDIFatalError(f"VO={self.vo} unknown status code {ret.statusCode} : {ret.errorValue}")
0258
0259
0260
class CommandSendInterface(object):
    """Parent-side facade over a pool of child processes.

    Any attribute that is not found normally is returned as a ``MethodClass``
    proxy, so ``iface.someMethod(...)`` runs ``someMethod`` in a child process
    that hosts an instance of ``moduleName.className``.
    """

    def __init__(self, vo, maxChild, moduleName, className):
        self.vo = vo
        self.maxChild = maxChild
        # pool of available child-process objects
        self.connectionQueue = multiprocessing.Queue(maxChild)
        self.moduleName = moduleName
        self.className = className

    def __getattr__(self, attrName):
        """Return a remote-call proxy for ``attrName``.

        Raises:
            AttributeError: for dunder names. Protocol probes such as
                ``__setstate__`` or ``__deepcopy__`` (made by pickle and copy,
                possibly on an instance that has no attributes yet) must see a
                missing attribute rather than a proxy; answering them would
                also recurse endlessly while ``className`` is still unset.
        """
        if attrName.startswith("__") and attrName.endswith("__"):
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{attrName}'")
        return MethodClass(self.className, attrName, self.vo, self.connectionQueue, self)

    def launcher(self, channel):
        """Entry point of a child process: build the target object and run it.

        Imports ``moduleName``, instantiates ``className`` with ``channel`` and
        calls its ``start()``. An exception escaping ``start()`` is logged and
        the function returns.
        """
        # __import__ returns the top-level package; walk down to the leaf module
        mod = __import__(self.moduleName)
        for subModuleName in self.moduleName.split(".")[1:]:
            mod = getattr(mod, subModuleName)

        cls = getattr(mod, self.className)

        msg = f"start {self.className} with pid={os.getpid()}"
        dumpStdOut(self.moduleName, msg)
        try:
            cls(channel).start()
        except Exception:
            errtype, errvalue = sys.exc_info()[:2]
            dumpStdOut(self.className, f"launcher crashed with {errtype}:{errvalue}")

    def launchChild(self):
        """Start one daemon child process and add it to the pool."""
        parent_conn, child_conn = multiprocessing.Pipe()

        child_process = multiprocessing.Process(target=self.launcher, args=(child_conn,))
        child_process.daemon = True
        child_process.start()

        processObj = ProcessClass(child_process.pid, parent_conn)

        # block until the child sends its first message (presumably the
        # "ready" sent by CommandReceiveInterface.start -- confirm), then store
        # the pipe in reduced form again before queueing the object
        pipe = processObj.connection()
        pipe.recv()
        processObj.reduceConnection(pipe)

        self.connectionQueue.put(processObj)

    def initialize(self):
        """Launch ``maxChild`` child processes."""
        for _ in range(self.maxChild):
            self.launchChild()
0314
0315
0316
class CommandReceiveInterface(object):
    """Child-side command loop: receives commands on a connection, runs them
    as methods of this object and sends back a ``ReturnObject``.
    """

    def __init__(self, con):
        self.con = con
        # cache key -> {"utime": time stored, "value": raw method result}
        self.cacheMap = {}

    def makeKey(self, className, methodName, argList, argMap):
        """Build the result-cache key for a call, or ``None`` if that fails.

        The key is the class name, method name, ``str()`` of each positional
        argument and ``name=str(value)`` of each keyword argument, joined
        with ``:``.
        """
        try:
            pieces = [f"{className}", f"{methodName}"]
            pieces.extend(str(item) for item in argList)
            pieces.extend(f"{name}={str(val)}" for name, val in argMap.items())
            return ":".join(pieces)
        except Exception:
            return None

    def start(self):
        """Serve commands forever.

        Sends ``"ready"`` once, then for every command sends ``"ack"``,
        receives a command object, executes it and sends the ``ReturnObject``.
        """
        self.con.send("ready")

        while True:
            # tell the sender this process is free to take a command
            self.con.send("ack")

            commandObj = self.con.recv()

            retObj = ReturnObject()

            className = self.__class__.__name__

            if hasattr(self, commandObj.methodName):
                try:
                    useCache = False
                    doExec = True
                    if "useResultCache" in commandObj.argMap:
                        # the special keyword carries the cache lifetime in
                        # seconds and is not passed on to the method
                        timeRange = commandObj.argMap.pop("useResultCache")

                        tmpCacheKey = self.makeKey(className, commandObj.methodName, commandObj.argList, commandObj.argMap)
                        if tmpCacheKey is not None:
                            useCache = True
                            # reuse the stored result while it is younger than timeRange
                            if (
                                tmpCacheKey in self.cacheMap
                                and self.cacheMap[tmpCacheKey]["utime"] + datetime.timedelta(seconds=timeRange) > naive_utcnow()
                            ):
                                tmpRet = self.cacheMap[tmpCacheKey]["value"]
                                doExec = False

                    if doExec:
                        functionObj = getattr(self, commandObj.methodName)
                        tmpRet = functionObj(*commandObj.argList, **commandObj.argMap)

                    if isinstance(tmpRet, StatusCode):
                        # bare status code
                        retObj.statusCode = tmpRet
                    elif isinstance(tmpRet, (tuple, list)) and len(tmpRet) > 0 and isinstance(tmpRet[0], StatusCode):
                        # status code followed by payload
                        retObj.statusCode = tmpRet[0]
                        if len(tmpRet) > 1:
                            # a single trailing item is unwrapped, several are kept as a sequence
                            payload = tmpRet[1] if len(tmpRet) == 2 else tmpRet[1:]
                            if retObj.statusCode == self.SC_SUCCEEDED:
                                retObj.returnValue = payload
                            else:
                                retObj.errorValue = payload
                    else:
                        # no status code in the result: treat it as a successful value
                        retObj.statusCode = self.SC_SUCCEEDED
                        retObj.returnValue = tmpRet
                except Exception as exc:
                    retObj.statusCode = self.SC_FATAL
                    retObj.errorValue = f"type={type(exc).__name__} : {className}.{commandObj.methodName} : {exc}"

                # store freshly computed successful results
                if useCache and doExec and retObj.statusCode == self.SC_SUCCEEDED:
                    self.cacheMap[tmpCacheKey] = {"utime": naive_utcnow(), "value": tmpRet}
            else:
                # unknown method
                retObj.statusCode = self.SC_FATAL
                retObj.errorValue = f"type=AttributeError : {className} instance has no attribute {commandObj.methodName}"

            self.con.send(retObj)
0409
0410
0411
# Give CommandReceiveInterface the SC_* class attributes (self.SC_SUCCEEDED,
# self.SC_FATAL) that its start() method refers to.
installSC(CommandReceiveInterface)
0413
0414
0415
0416
0417
0418
0419
0420
0421
class JEDITemporaryError(Exception):
    """Error type used in this module for a call that ended with SC_FAILED."""
0424
0425
0426
class JEDIFatalError(Exception):
    """Error type used in this module for a call that ended with SC_FATAL."""
0429
0430
0431
class JEDITimeoutError(Exception):
    """Error type raised in this module when a pipe poll times out."""