Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:05

0001 """
0002 ConBridge allows having database interactions in separate processes and killing them independently when interactions are stalled.
0003 This avoids clogged httpd processes due to stalled database accesses.
0004 """
0005 
0006 import os
0007 import pickle
0008 import random
0009 import signal
0010 import socket
0011 import sys
0012 import threading
0013 import time
0014 import traceback
0015 import types
0016 
0017 from pandacommon.pandalogger.PandaLogger import PandaLogger
0018 
0019 from pandaserver.config import panda_config
0020 from pandaserver.taskbuffer import OraDBProxy as DBProxy
0021 from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
0022 from pandaserver.taskbuffer.FileSpec import FileSpec
0023 from pandaserver.taskbuffer.JobSpec import JobSpec
0024 
# module-level logger shared by Terminator and ConBridge below
_logger = PandaLogger().getLogger("ConBridge")
0027 
0028 
class HarmlessEx(Exception):
    """Exception marking an expected termination rather than a failure.

    The child side of the bridge raises it when the master has closed the
    data socket, so the receiver logs it at debug level instead of as an
    error before the child shuts itself down.
    """
0032 
0033 
# terminate child process by itself when master has gone
class Terminator(threading.Thread):
    """Watcher thread, started in the child, that kills the child when the master goes away.

    It blocks on a one-byte read from the child's end of the control socket.
    The master never writes to that socket; it only shuts it down (see
    ConBridge.bridge_killChild). The read therefore returns, or fails, once
    the master's end is gone. The thread then sends SIGTERM, followed by
    SIGKILL as a fallback, to its own process.
    """

    # constructor
    def __init__(self, consock):
        """
        :param consock: the child's end of the control socket pair
        """
        threading.Thread.__init__(self)
        self.consock = consock

    # main
    def run(self):
        # block until the control socket yields a byte, EOF, or an error
        try:
            self.consock.recv(1)
        except Exception as e:
            # an error on the control socket is treated the same as the master closing it
            _logger.debug(f"child  {os.getpid()} control socket error : {str(e)}")
        # get PID
        pid = os.getpid()
        _logger.debug(f"child  {pid} received termination")
        # kill: SIGTERM first, SIGKILL in case SIGTERM was handled/ignored
        try:
            os.kill(pid, signal.SIGTERM)
        except Exception:
            pass
        try:
            os.kill(pid, signal.SIGKILL)
        except Exception:
            pass
0060 
0061 
# connection bridge with timeout
class ConBridge(object):
    """Forward DBProxy method calls to a forked child process over a socket pair.

    In the master, accessing any public DBProxy method on this object (see
    __getattribute__) returns a bridge_masterMethod wrapper. The wrapper
    pickles the method name and arguments, sends them to the child, and waits
    for the result. The master applies ``self.timeout`` seconds to its socket
    operations, so a stalled database access in the child surfaces as a
    socket error in the master. The master then kills the child and forks a
    new one.

    Each packet on the data socket is a space-padded ASCII length header of
    ``_HEADER_SIZE`` bytes followed by the protocol-0 pickle of the value.
    """

    # width of the ASCII length header preceding every pickled packet
    _HEADER_SIZE = 50

    # constructor
    def __init__(self):
        self.child_pid = 0  # PID of the forked child; 0 when there is none
        self.isMaster = False
        self.mysock = None  # this process's end of the data socket pair
        self.consock = None  # this process's end of the control socket pair
        self.proxy = None  # real DBProxy; only created in the child
        self.pid = os.getpid()
        # timeout (seconds) applied to the master's socket operations
        if hasattr(panda_config, "dbtimeout"):
            self.timeout = int(panda_config.dbtimeout)
        else:
            self.timeout = 600
        # log every method executed by the child
        if hasattr(panda_config, "dbbridgeverbose"):
            self.verbose = panda_config.dbbridgeverbose
        else:
            self.verbose = False

    # destructor
    def __del__(self):
        # kill old child process (no-op when there is none)
        self.bridge_killChild()

    # connect
    def connect(
        self,
        dbhost=panda_config.dbhost,
        dbpasswd=panda_config.dbpasswd,
        dbuser=panda_config.dbuser,
        dbname=panda_config.dbname,
        dbtimeout=None,
        reconnect=False,
    ):
        """Fork a new child that opens a database connection and serves calls.

        Any existing child is killed first. In the child this method never
        returns. The child connects, sends an acknowledgement, and runs
        bridge_run(). It always ends in bridge_childExit(), even if setup
        raises, so the forked process cannot continue into the caller's code.

        :param dbhost: database host forwarded to DBProxy.connect in the child
        :param dbpasswd: database password forwarded to DBProxy.connect
        :param dbuser: database user forwarded to DBProxy.connect
        :param dbname: database name forwarded to DBProxy.connect
        :param dbtimeout: not used; the child always connects with dbtimeout=60
        :param reconnect: not used; kept for interface compatibility
        :return: in the master, True when the child acknowledged a successful
            connection, False otherwise
        """
        # kill old child process
        self.bridge_killChild()
        _logger.debug(f"master {self.pid} connecting")
        # reset child PID and sockets
        self.child_pid = 0
        self.mysock = None
        self.consock = None
        # data channel (commands/responses) and control channel (watched by Terminator)
        datpair = socket.socketpair()
        conpair = socket.socketpair()
        # fork
        self.child_pid = os.fork()
        if self.child_pid == 0:
            # child: every path must end in bridge_childExit(), which calls os._exit()
            try:
                self.isMaster = False
                self.pid = os.getpid()
                # keep socket
                self.mysock = datpair[1]
                self.consock = conpair[1]
                datpair[0].close()
                conpair[0].close()
                # connect to database
                _logger.debug(f"child  {self.pid} connecting to database")
                self.proxy = DBProxy.DBProxy()
                if not self.proxy.connect(dbhost=dbhost, dbpasswd=dbpasswd, dbuser=dbuser, dbname=dbname, dbtimeout=60):
                    _logger.error(f"child  {self.pid} failed to connect")
                    # send error
                    self.bridge_sendError((RuntimeError, f"child  {self.pid} connection failed"))
                    # exit
                    self.bridge_childExit()
                # send OK just for ACK
                _logger.debug(f"child  {self.pid} connection is ready")
                self.bridge_sendResponse(None)
                # start terminator
                Terminator(self.consock).start()
                # go main loop
                _logger.debug(f"child  {self.pid} going into the main loop")
                self.bridge_run()
                # exit
                self.bridge_childExit(0)
            except Exception as e:
                _logger.error(f"child  {self.pid} unexpected error : {str(e)} {traceback.format_exc()}")
            finally:
                # never fall back into the caller's code from the forked process
                self.bridge_childExit()
        else:
            # master
            self.isMaster = True
            # keep socket
            self.mysock = datpair[0]
            self.consock = conpair[0]
            datpair[1].close()
            conpair[1].close()
            try:
                # get ACK
                _logger.debug(f"master {self.pid} waiting ack from child={self.child_pid}")
                self.bridge_getResponse()
                _logger.debug(f"master {self.pid} got ready from child={self.child_pid}")
                return True
            except Exception as e:
                _logger.error(f"master {self.pid} failed to setup child={self.child_pid} : {str(e)} {traceback.format_exc()}")
                # kill child
                self.bridge_killChild()
                return False

    #######################
    # communication methods

    # send packet
    def bridge_send(self, val):
        """Pickle ``val`` and send it as one length-prefixed packet on the data socket.

        In the master the send is bounded by ``self.timeout``. On failure the
        error is logged and re-raised. In the child, bridge_childExit() is
        called first, and it does not return.
        """
        try:
            # set timeout
            if self.isMaster:
                self.mysock.settimeout(self.timeout)
            # serialize
            tmpStr = pickle.dumps(val, protocol=0)
            # send size as a fixed-width, space-padded header
            sizeStr = str(len(tmpStr)).rjust(self._HEADER_SIZE)
            self.mysock.sendall(sizeStr.encode())
            # send body
            self.mysock.sendall(tmpStr)
            # set timeout back
            if self.isMaster:
                self.mysock.settimeout(None)
        except Exception as e:
            roleType = "master" if self.isMaster else "child "
            _logger.error(f"{roleType} {self.pid} send error : val={str(val)[:1024]} - {str(e)} {traceback.format_exc()}")
            # terminate child
            if not self.isMaster:
                self.bridge_childExit()
            raise e

    # receive exactly size bytes
    def _bridge_recvExact(self, size):
        """Read exactly ``size`` bytes from the data socket.

        At least one recv() is always issued, so a zero ``size`` is reported
        the same way as a closed peer.

        :raises socket.error: in the master, when the peer closed the socket
        :raises HarmlessEx: in the child, when the master closed the socket
        """
        buf = bytearray()
        while True:
            chunk = self.mysock.recv(size - len(buf))
            if not chunk:
                if self.isMaster:
                    raise socket.error("empty packet")
                # master closed socket
                raise HarmlessEx("empty packet")
            buf += chunk
            if len(buf) >= size:
                return bytes(buf)

    # receive packet
    def bridge_recv(self):
        """Receive one length-prefixed packet from the data socket and unpickle it.

        :return: tuple ``(True, value)``
        :raises: the underlying error after logging it. In the child,
            bridge_childExit() is called first, and it does not return.
        """
        try:
            # set timeout
            if self.isMaster:
                self.mysock.settimeout(self.timeout)
            # header: space-padded ASCII body length (int() ignores the padding)
            bodySize = int(self._bridge_recvExact(self._HEADER_SIZE).decode())
            # get body
            strBody = self._bridge_recvExact(bodySize)
            # set timeout back
            if self.isMaster:
                self.mysock.settimeout(None)
            # deserialize; the peer is this process's own fork over a socketpair
            retVal = pickle.loads(strBody)
            return True, retVal
        except Exception as e:
            roleType = "master" if self.isMaster else "child "
            if isinstance(e, HarmlessEx):
                _logger.debug(f"{roleType} {self.pid} recv harmless ex : {str(e)}")
            else:
                _logger.error(f"{roleType} {self.pid} recv error : {str(e)} {traceback.format_exc()}")
            # terminate child
            if not self.isMaster:
                self.bridge_childExit()
            raise e

    #######################
    # child's methods

    # send error
    def bridge_sendError(self, val):
        """Send an "NG" status followed by an ``(exception type, value)`` tuple.

        If the tuple cannot be pickled, ``(RuntimeError, str(value))`` is sent
        instead.
        """
        # send status
        self.bridge_send("NG")
        # check if pickle-able
        try:
            pickle.dumps(val, protocol=0)
        except Exception:
            # use RuntimeError
            val = (RuntimeError, str(val[-1]))
        # send exceptions
        self.bridge_send(val)

    # send response
    def bridge_sendResponse(self, val):
        """Send an "OK" status followed by the response value."""
        # send status
        self.bridge_send("OK")
        # send response
        self.bridge_send(val)

    # termination of child
    def bridge_childExit(self, exitCode=1):
        """Terminate the child process after closing its DB connection and sockets.

        Does nothing in the master. In the child it never returns: os._exit()
        runs in a ``finally`` block, so a failing cleanup cannot let the
        forked process continue.
        """
        if self.isMaster:
            return
        try:
            # close database connection
            _logger.debug(f"child  {self.pid} closing database connection")
            if self.proxy:
                try:
                    self.proxy.cleanup()
                except Exception:
                    _logger.error(f"child  {self.pid} failed to close database connection : {traceback.format_exc()}")
            # close sockets
            _logger.debug(f"child  {self.pid} closing sockets")
            for sock in (self.mysock, self.consock):
                try:
                    sock.shutdown(socket.SHUT_RDWR)
                except Exception:
                    pass
            # exit
            _logger.debug(f"child  {self.pid} going to exit")
        finally:
            os._exit(exitCode)

    # child main
    def bridge_run(self):
        """Serve requests from the master until bridge_childExit() ends the process.

        Each request is two packets: the DBProxy method name and an
        ``(args, kwargs)`` tuple. The result is sent back as
        ``(result, args, kwargs)`` so the master can copy in-place changes
        back into its own arguments. Exceptions raised by the method are
        reported through bridge_sendError(). Receive failures and socket
        errors terminate the child.
        """
        comStr = ""
        while True:
            try:
                # get command
                status, comStr = self.bridge_recv()
                if not status:
                    raise RuntimeError("invalid command")
                # get variables
                status, variables = self.bridge_recv()
                if not status:
                    raise RuntimeError("invalid variables")
            except Exception:
                errType, errValue = sys.exc_info()[:2]
                _logger.error(f"child  {self.pid} died : {errType} {errValue}")
                # exit
                self.bridge_childExit()
            if self.verbose:
                _logger.debug(f"child  {self.pid} method {comStr} executing")
            try:
                # execute
                method = getattr(self.proxy, comStr)
                res = method(*variables[0], **variables[1])
                # FIXME : modify response since oracledb types cannot be pickled
                if comStr in ["querySQLS"]:
                    res = [True] + list(res[1:])
                if self.verbose:
                    _logger.debug(f"child  {self.pid} method {comStr} completed")
                # return
                self.bridge_sendResponse((res, variables[0], variables[1]))
            except Exception:
                errType, errValue = sys.exc_info()[:2]
                _logger.error(f"child  {self.pid} method {comStr} failed : {errType} {errValue}")
                if errType in [socket.error, socket.timeout]:
                    _logger.error(f"child  {self.pid} died : {errType} {errValue}")
                    # exit
                    self.bridge_childExit()
                # send error
                self.bridge_sendError((errType, errValue))

    #######################
    # master's methods

    # kill child
    def bridge_killChild(self):
        """Close the sockets, then kill and reap the current child, if any.

        ``child_pid`` is reset to 0 afterwards. Calling this again, for
        example from connect() right after a failed call or from __del__,
        therefore does nothing instead of signalling a PID that was already
        reaped and may have been reused by an unrelated process.
        """
        # nothing to do without a child
        if self.child_pid == 0:
            return
        child_pid = self.child_pid
        # shut down and close sockets; the child's Terminator watches the control socket
        _logger.debug(f"master {self.pid} closing sockets for child={child_pid}")
        for sock in (self.mysock, self.consock):
            if sock is None:
                continue
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except Exception:
                pass
            try:
                sock.close()
            except Exception:
                pass
        self.mysock = None
        self.consock = None
        _logger.debug(f"master {self.pid} killing child={child_pid}")
        # send SIGTERM
        try:
            os.kill(child_pid, signal.SIGTERM)
        except Exception:
            pass
        time.sleep(2)
        # send SIGKILL
        try:
            os.kill(child_pid, signal.SIGKILL)
        except Exception:
            pass
        # wait for completion of child
        _logger.debug(f"master {self.pid} waiting child={child_pid}")
        try:
            os.waitpid(child_pid, 0)
        except Exception:
            pass
        # the child has been reaped; forget its PID
        self.child_pid = 0
        # sleep to avoid burst reconnection
        time.sleep(random.randint(5, 15))
        _logger.debug(f"master {self.pid} killed child={child_pid}")

    # get response
    def bridge_getResponse(self):
        """Read a status packet from the child, then its payload packet.

        :return: the payload when the status is "OK"
        :raises: for status "NG", the exception rebuilt from the payload
            ``(type, value)`` as ``type(value)``. RuntimeError for any other
            status or an invalid packet.
        """
        # get status
        status, strStatus = self.bridge_recv()
        if not status:
            raise RuntimeError(f"master {self.pid} got invalid status response from child={self.child_pid}")
        if strStatus == "OK":
            # return res
            status, ret = self.bridge_recv()
            if not status:
                raise RuntimeError(f"master {self.pid} got invalid response body from child={self.child_pid}")
            return ret
        elif strStatus == "NG":
            # raise error
            status, ret = self.bridge_recv()
            if not status:
                raise RuntimeError(f"master {self.pid} got invalid response value from child={self.child_pid}")
            raise ret[0](ret[1])
        else:
            raise RuntimeError(f"master {self.pid} got invalid response from child={self.child_pid} : {str(strStatus)}")

    # method wrapper class
    class bridge_masterMethod:
        """Callable stand-in, on the master, for one DBProxy method executed in the child."""

        # constructor
        def __init__(self, name, parent):
            self.name = name  # DBProxy method name
            self.parent = parent  # owning ConBridge
            self.pid = os.getpid()

        # copy changes in taskbuff objects to master
        def copyTbObjChanges(self, oldPar, newPar):
            """Copy the state of a JobSpec/FileSpec/DatasetSpec returned by the child into the master's object.

            :return: True if the state was copied, False otherwise
            """
            # check they have the same type
            if not isinstance(oldPar, type(newPar)):
                return False
            # copy some Specs since they are passed via ref's
            if isinstance(oldPar, (JobSpec, FileSpec, DatasetSpec)):
                # every object has __getstate__ on Python 3.11+, so also require __setstate__
                if hasattr(oldPar, "__getstate__") and hasattr(oldPar, "__setstate__"):
                    oldPar.__setstate__(newPar.__getstate__())
                else:
                    oldPar.pack(newPar.values())
                return True
            return False

        # copy changes in objects to master
        def copyChanges(self, oldPar, newPar):
            """Make the master's argument object mirror the child's modified copy, in place."""
            if isinstance(oldPar, list):
                # replace all elements so the caller's reference sees the child's list
                oldPar[:] = newPar
            elif isinstance(oldPar, dict):
                # mirror the child's dict, including keys the child removed
                oldPar.clear()
                oldPar.update(newPar)
            else:
                self.copyTbObjChanges(oldPar, newPar)

        # method emulation
        def __call__(self, *args, **keywords):
            """Execute the method in the child and return its result.

            In-place changes the child made to list, dict and Spec arguments
            are copied back into the caller's objects. On any error the child
            is killed, a new child is connected, and the call is retried. This
            loop has no retry limit.
            """
            while True:
                try:
                    # send command name
                    self.parent.bridge_send(self.name)
                    # send variables
                    self.parent.bridge_send((args, keywords))
                    # get response
                    retVal, newArgs, newKeywords = self.parent.bridge_getResponse()
                    # propagate child's changes in args to master
                    for idxArg, tmpArg in enumerate(args):
                        self.copyChanges(tmpArg, newArgs[idxArg])
                    # propagate child's changes in keywords to master
                    for tmpKey, tmpArg in keywords.items():
                        self.copyChanges(tmpArg, newKeywords[tmpKey])
                    # return
                    return retVal
                except Exception:
                    errType, errValue = sys.exc_info()[:2]
                    _logger.error(f"master {self.pid} method {self.name} failed : {errType} {errValue}")
                    # on socket errors the child is killed by connect() below instead
                    if errType not in [socket.error, socket.timeout]:
                        # kill old child process
                        self.parent.bridge_killChild()
                        _logger.error(f"master {self.pid} killed child")
                    # sleep
                    time.sleep(5)
                    # reconnect
                    try:
                        _logger.debug(f"master {self.pid} trying to reconnect")
                        is_ok = self.parent.connect()
                        if is_ok:
                            _logger.debug(f"master {self.pid} reconnect completed")
                        else:
                            _logger.debug(f"master {self.pid} reconnect failed. sleep")
                            time.sleep(120)
                    except Exception:
                        _logger.error(f"master {self.pid} connect failed. sleep")
                        time.sleep(120)

    # attribute lookup: expose DBProxy methods on the master as bridge wrappers
    def __getattribute__(self, name):
        """Resolve attributes normally, and in the master fall back to DBProxy methods.

        In the master, a public name that is not a regular attribute but is a
        function/method on DBProxy.DBProxy is wrapped in bridge_masterMethod.
        The wrapper is cached on the instance and returned. In the child,
        lookup is the default object lookup.
        """
        if object.__getattribute__(self, "isMaster"):
            try:
                # return original attribute
                return object.__getattribute__(self, name)
            except Exception:
                # append methods
                if (
                    not name.startswith("_")
                    and hasattr(DBProxy.DBProxy, name)
                    and isinstance(
                        getattr(DBProxy.DBProxy, name),
                        (types.MethodType, types.FunctionType),
                    )
                ):
                    # get DBProxy's method wrapper
                    method = ConBridge.bridge_masterMethod(name, self)
                    # set method
                    setattr(self, name, method)
                    # return
                    return method
        # return original attribute for child
        return object.__getattribute__(self, name)
0502                         getattr(DBProxy.DBProxy, name),
0503                         (types.MethodType, types.FunctionType),
0504                     )
0505                 ):
0506                     # get DBProxy's method wrapper
0507                     method = ConBridge.bridge_masterMethod(name, self)
0508                     # set method
0509                     setattr(self, name, method)
0510                     # return
0511                     return method
0512         # return original attribute for child
0513         return object.__getattribute__(self, name)