File indexing completed on 2026-04-10 08:39:05
0001 """
0002 ConBridge allows having database interactions in separate processes and killing them independently when interactions are stalled.
0003 This avoids clogged httpd processes due to stalled database accesses.
0004 """
0005
0006 import os
0007 import pickle
0008 import random
0009 import signal
0010 import socket
0011 import sys
0012 import threading
0013 import time
0014 import traceback
0015 import types
0016
0017 from pandacommon.pandalogger.PandaLogger import PandaLogger
0018
0019 from pandaserver.config import panda_config
0020 from pandaserver.taskbuffer import OraDBProxy as DBProxy
0021 from pandaserver.taskbuffer.DatasetSpec import DatasetSpec
0022 from pandaserver.taskbuffer.FileSpec import FileSpec
0023 from pandaserver.taskbuffer.JobSpec import JobSpec
0024
0025
# Module-level logger used by both the master and the child side of the bridge.
_logger = PandaLogger().getLogger("ConBridge")
0027
0028
0029
class HarmlessEx(Exception):
    """Raised by the child when the data socket yields an empty packet.

    ``ConBridge.bridge_recv`` logs this exception at debug level instead of
    error level.
    """
0032
0033
0034
class Terminator(threading.Thread):
    """Thread that kills the current process once the control socket wakes up.

    ``ConBridge.connect`` starts one of these in the child process with the
    child's end of the control socket pair.
    """

    def __init__(self, consock):
        """Remember the control socket to wait on."""
        super().__init__()
        self.consock = consock

    def run(self):
        """Block on the control socket, then signal this process to die."""
        # Any outcome of the read (data, shutdown, error) means "terminate";
        # the received byte itself carries no information.
        try:
            self.consock.recv(1)
        except Exception:
            pass

        own_pid = os.getpid()
        _logger.debug(f"child {own_pid} received termination")

        # Ask politely first, then force; failures are ignored on purpose.
        for signum in (signal.SIGTERM, signal.SIGKILL):
            try:
                os.kill(own_pid, signum)
            except Exception:
                pass
0060
0061
0062
class ConBridge(object):
    """Run ``DBProxy`` calls in a forked child process that can be killed.

    The master side forwards method calls to the child as pickled messages
    over a socket pair; the child owns the real ``DBProxy`` instance and
    executes them. A second socket pair is used only to tell the child to
    terminate. Unknown public attributes that are functions/methods of
    ``DBProxy.DBProxy`` are resolved on the master to ``bridge_masterMethod``
    objects (see ``__getattribute__``).
    """

    # Width in bytes of the ASCII length header sent before each pickled payload.
    _HEADER_SIZE = 50

    def __init__(self):
        """Create an unconnected bridge; ``connect`` forks the child."""
        self.child_pid = 0
        self.isMaster = False
        self.mysock = None  # data socket
        self.consock = None  # control socket
        self.proxy = None
        self.pid = os.getpid()
        # socket timeout in seconds applied by the master while sending/receiving
        self.timeout = int(panda_config.dbtimeout) if hasattr(panda_config, "dbtimeout") else 600
        # when true, the child logs the start/end of every executed method
        self.verbose = getattr(panda_config, "dbbridgeverbose", False)

    def __del__(self):
        """Kill the child process, if any, when the bridge is destroyed."""
        self.bridge_killChild()

    def connect(
        self,
        dbhost=panda_config.dbhost,
        dbpasswd=panda_config.dbpasswd,
        dbuser=panda_config.dbuser,
        dbname=panda_config.dbname,
        dbtimeout=None,
        reconnect=False,
    ):
        """Fork a new child, let it connect to the database, and wait for its ack.

        Only ``dbhost`` and ``dbpasswd`` are passed on to ``DBProxy.connect``,
        always together with ``dbtimeout=60``; ``dbuser``, ``dbname``,
        ``dbtimeout`` and ``reconnect`` are accepted but not used by this method.

        Returns:
            True if the child acknowledged a working connection, False if the
            handshake failed (the child is killed in that case). In the child
            process this method never returns: it ends with ``os._exit``.
        """
        # get rid of a previous child before creating a new one
        self.bridge_killChild()
        _logger.debug(f"master {self.pid} connecting")

        self.child_pid = 0
        self.mysock = None
        self.consock = None

        # one pair for data, one pair for the termination signal
        datpair = socket.socketpair()
        conpair = socket.socketpair()

        self.child_pid = os.fork()
        if self.child_pid == 0:
            # ---- child ----
            self.isMaster = False
            self.pid = os.getpid()

            # keep the child's ends, close the master's ends
            self.mysock = datpair[1]
            self.consock = conpair[1]
            datpair[0].close()
            conpair[0].close()

            _logger.debug(f"child {self.pid} connecting to database")
            self.proxy = DBProxy.DBProxy()
            if not self.proxy.connect(dbhost=dbhost, dbpasswd=dbpasswd, dbtimeout=60):
                _logger.error(f"child {self.pid} failed to connect")
                # tell the master, then exit (bridge_childExit does not return)
                self.bridge_sendError((RuntimeError, f"child {self.pid} connection failed"))
                self.bridge_childExit()

            _logger.debug(f"child {self.pid} connection is ready")
            self.bridge_sendResponse(None)

            # watchdog that kills this process when the control socket wakes up
            Terminator(self.consock).start()

            _logger.debug(f"child {self.pid} going into the main loop")
            self.bridge_run()

            self.bridge_childExit(0)
        else:
            # ---- master ----
            self.isMaster = True

            # keep the master's ends, close the child's ends
            self.mysock = datpair[0]
            self.consock = conpair[0]
            datpair[1].close()
            conpair[1].close()
            try:
                _logger.debug(f"master {self.pid} waiting ack from child={self.child_pid}")
                self.bridge_getResponse()
                _logger.debug(f"master {self.pid} got ready from child={self.child_pid}")
                return True
            except Exception as e:
                _logger.error(f"master {self.pid} failed to setup child={self.child_pid} : {str(e)} {traceback.format_exc()}")
                self.bridge_killChild()
                return False

    def bridge_send(self, val):
        """Pickle ``val`` and send it on the data socket behind a length header.

        The header is the payload length as text, right-justified to
        ``_HEADER_SIZE`` characters. On the master the socket timeout is set
        to ``self.timeout`` for the duration of the send.

        Raises:
            Exception: any failure is logged and re-raised on the master; the
            child exits the process instead.
        """
        try:
            if self.isMaster:
                self.mysock.settimeout(self.timeout)

            payload = pickle.dumps(val, protocol=0)
            header = str(len(payload)).rjust(self._HEADER_SIZE)
            self.mysock.sendall(header.encode())
            self.mysock.sendall(payload)

            if self.isMaster:
                self.mysock.settimeout(None)
        except Exception as e:
            roleType = "master" if self.isMaster else "child "
            _logger.error(f"{roleType} {self.pid} send error : val={str(val)[:1024]} - {str(e)} {traceback.format_exc()}")
            # a child that cannot talk to its master is useless
            if not self.isMaster:
                self.bridge_childExit()
            raise

    def _bridge_recvExact(self, size):
        """Read from the data socket until ``size`` bytes have been collected.

        At least one ``recv`` is always issued. An empty read raises
        ``socket.error`` on the master and ``HarmlessEx`` on the child.
        """
        collected = b""
        while True:
            chunk = self.mysock.recv(size - len(collected))
            if len(chunk) == 0:
                if self.isMaster:
                    raise socket.error("empty packet")
                raise HarmlessEx("empty packet")
            collected += chunk
            if len(collected) >= size:
                return collected

    def bridge_recv(self):
        """Receive one length-prefixed pickled message from the data socket.

        Returns:
            ``(True, value)`` where ``value`` is the unpickled payload.

        Raises:
            Exception: any failure is logged (``HarmlessEx`` at debug level,
            everything else at error level) and re-raised on the master; the
            child exits the process instead.
        """
        try:
            if self.isMaster:
                self.mysock.settimeout(self.timeout)

            header = self._bridge_recvExact(self._HEADER_SIZE)
            bodySize = int(header.decode())
            body = self._bridge_recvExact(bodySize)

            if self.isMaster:
                self.mysock.settimeout(None)

            # NOTE(review): pickle.loads trusts its input; the peer here is the
            # other end of a socketpair created in connect().
            retVal = pickle.loads(body)
            return True, retVal
        except Exception as e:
            roleType = "master" if self.isMaster else "child "
            if isinstance(e, HarmlessEx):
                _logger.debug(f"{roleType} {self.pid} recv harmless ex : {str(e)}")
            else:
                _logger.error(f"{roleType} {self.pid} recv error : {str(e)} {traceback.format_exc()}")
            if not self.isMaster:
                self.bridge_childExit()
            raise

    def bridge_sendError(self, val):
        """Send an ``"NG"`` status followed by ``val``.

        ``bridge_getResponse`` treats the value as ``(exception_class, argument)``.
        If ``val`` cannot be pickled it is replaced by
        ``(RuntimeError, str(val[-1]))``.
        """
        self.bridge_send("NG")
        try:
            pickle.dumps(val, protocol=0)
        except Exception:
            # fall back to something that is always picklable
            val = (RuntimeError, str(val[-1]))
        self.bridge_send(val)

    def bridge_sendResponse(self, val):
        """Send an ``"OK"`` status followed by ``val``."""
        self.bridge_send("OK")
        self.bridge_send(val)

    def bridge_childExit(self, exitCode=1):
        """In the child: clean up the proxy, shut the sockets down and ``os._exit``.

        Does nothing when called on the master.
        """
        if self.isMaster:
            return

        _logger.debug(f"child {self.pid} closing database connection")
        if self.proxy:
            self.proxy.cleanup()

        _logger.debug(f"child {self.pid} closing sockets")
        for sock in (self.mysock, self.consock):
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except Exception:
                pass

        _logger.debug(f"child {self.pid} going to exit")
        os._exit(exitCode)

    def bridge_run(self):
        """Child main loop: receive a command, run it on the proxy, reply.

        Each request is two messages: the method name and an
        ``(args, kwargs)`` pair. The reply to a successful call is
        ``(result, args, kwargs)`` so the master can copy back in-place
        changes. A failure while receiving ends the child process.
        """
        comStr = ""
        while True:
            try:
                status, comStr = self.bridge_recv()
                if not status:
                    raise RuntimeError("invalid command")
                status, variables = self.bridge_recv()
                if not status:
                    raise RuntimeError("invalid variables")
            except Exception as exc:
                errType, errValue = type(exc), exc
                _logger.error(f"child {self.pid} died : {errType} {errValue}")
                # does not return in the child
                self.bridge_childExit()

            if self.verbose:
                _logger.debug(f"child {self.pid} method {comStr} executing")
            try:
                method = getattr(self.proxy, comStr)
                res = method(*variables[0], **variables[1])

                # the first element of the querySQLS result is replaced by True
                # before the result is sent back
                if comStr == "querySQLS":
                    res = [True] + list(res[1:])
                if self.verbose:
                    _logger.debug(f"child {self.pid} method {comStr} completed")

                self.bridge_sendResponse((res, variables[0], variables[1]))
            except Exception as exc:
                errType, errValue = type(exc), exc
                _logger.error(f"child {self.pid} method {comStr} failed : {errType} {errValue}")
                # exact type comparison: subclasses are reported to the master instead
                if errType in (socket.error, socket.timeout):
                    _logger.error(f"child {self.pid} died : {errType} {errValue}")
                    self.bridge_childExit()
                self.bridge_sendError((errType, errValue))

    def bridge_killChild(self):
        """Shut the sockets down, kill and reap the child, then forget its pid.

        Does nothing when no child is recorded. The recorded pid is cleared
        after reaping so that a repeated call neither signals a pid the OS
        may have reused nor sleeps again.
        """
        child_pid = self.child_pid
        if child_pid == 0:
            return

        _logger.debug(f"master {self.pid} closing sockets for child={child_pid}")
        for sock in (self.mysock, self.consock):
            try:
                if sock is not None:
                    sock.shutdown(socket.SHUT_RDWR)
            except Exception:
                pass

        _logger.debug(f"master {self.pid} killing child={child_pid}")
        try:
            os.kill(child_pid, signal.SIGTERM)
        except Exception:
            pass
        # grace period between SIGTERM and SIGKILL
        time.sleep(2)
        try:
            os.kill(child_pid, signal.SIGKILL)
        except Exception:
            pass

        _logger.debug(f"master {self.pid} waiting child={child_pid}")
        try:
            os.waitpid(child_pid, 0)
        except Exception:
            pass

        # the pid is no longer ours once the child has been reaped
        self.child_pid = 0

        time.sleep(random.randint(5, 15))
        _logger.debug(f"master {self.pid} killed child={child_pid}")

    def bridge_getResponse(self):
        """Read a status message and its value from the child.

        Returns:
            The value that followed an ``"OK"`` status.

        Raises:
            Exception: for an ``"NG"`` status the received value is treated as
            ``(exception_class, argument)`` and ``exception_class(argument)``
            is raised.
            RuntimeError: for any other status or an invalid receive status.
        """
        status, strStatus = self.bridge_recv()
        if not status:
            raise RuntimeError(f"master {self.pid} got invalid status response from child={self.child_pid}")

        if strStatus == "OK":
            status, ret = self.bridge_recv()
            if not status:
                raise RuntimeError(f"master {self.pid} got invalid response body from child={self.child_pid}")
            return ret

        if strStatus == "NG":
            status, ret = self.bridge_recv()
            if not status:
                raise RuntimeError(f"master {self.pid} got invalid response value from child={self.child_pid}")
            raise ret[0](ret[1])

        raise RuntimeError(f"master {self.pid} got invalid response from child={self.child_pid} : {str(strStatus)}")

    class bridge_masterMethod:
        """Callable standing in for one ``DBProxy`` method on the master side."""

        def __init__(self, name, parent):
            """Bind the remote method ``name`` to the owning ``ConBridge`` ``parent``."""
            self.name = name
            self.parent = parent
            self.pid = os.getpid()

        def copyTbObjChanges(self, oldPar, newPar):
            """Copy the state of ``newPar`` into ``oldPar`` for taskbuffer specs.

            Returns:
                True if ``oldPar`` is a ``JobSpec``, ``FileSpec`` or
                ``DatasetSpec`` and was updated, False otherwise (including
                when ``oldPar`` is not an instance of ``type(newPar)``).
            """
            if not isinstance(oldPar, type(newPar)):
                return False
            if not isinstance(oldPar, (JobSpec, FileSpec, DatasetSpec)):
                return False

            if hasattr(oldPar, "__getstate__"):
                oldPar.__setstate__(newPar.__getstate__())
            else:
                oldPar.pack(newPar.values())
            return True

        def copyChanges(self, oldPar, newPar):
            """Propagate the child's version of an argument into the caller's object.

            Lists are emptied and refilled from ``newPar``; dicts get every key
            of ``newPar`` assigned (keys missing from ``newPar`` are kept);
            anything else is handed to ``copyTbObjChanges``.
            """
            if isinstance(oldPar, list):
                while len(oldPar) > 0:
                    oldPar.pop()
                for tmpItem in newPar:
                    oldPar.append(tmpItem)
            elif isinstance(oldPar, dict):
                for tmpKey in newPar:
                    oldPar[tmpKey] = newPar[tmpKey]
            else:
                self.copyTbObjChanges(oldPar, newPar)

        def __call__(self, *args, **keywords):
            """Run the remote method and return its result.

            On any failure the call is retried after reconnecting, in an
            endless loop: the child is killed first unless the exception type
            is exactly ``socket.error`` or ``socket.timeout``, and a failed
            reconnect is followed by a 120 second sleep.
            """
            while True:
                try:
                    # request: method name, then (args, kwargs)
                    self.parent.bridge_send(self.name)
                    self.parent.bridge_send((args, keywords))

                    retVal, newArgs, newKeywords = self.parent.bridge_getResponse()

                    # copy back in-place modifications made by the child
                    for idxArg, tmpArg in enumerate(args):
                        self.copyChanges(tmpArg, newArgs[idxArg])
                    for tmpKey in keywords:
                        self.copyChanges(keywords[tmpKey], newKeywords[tmpKey])

                    return retVal
                except Exception as exc:
                    errType, errValue = type(exc), exc
                    _logger.error(f"master {self.pid} method {self.name} failed : {errType} {errValue}")

                    if errType not in (socket.error, socket.timeout):
                        self.parent.bridge_killChild()
                        _logger.error(f"master {self.pid} killed child")

                    time.sleep(5)

                    try:
                        _logger.debug(f"master {self.pid} trying to reconnect")
                        is_ok = self.parent.connect()
                        if is_ok:
                            _logger.debug(f"master {self.pid} reconnect completed")
                        else:
                            _logger.debug(f"master {self.pid} reconnect failed. sleep")
                            time.sleep(120)
                    except Exception:
                        _logger.error(f"master {self.pid} connect failed. sleep")
                        time.sleep(120)

    def __getattribute__(self, name):
        """Resolve attributes, creating remote-method proxies on the master.

        On the master, a missing attribute whose name does not start with
        ``_`` and which is a function/method of ``DBProxy.DBProxy`` is turned
        into a ``bridge_masterMethod``, stored on the instance, and returned.
        Everything else follows normal attribute lookup.
        """
        if object.__getattribute__(self, "isMaster"):
            try:
                return object.__getattribute__(self, name)
            except AttributeError:
                if (
                    not name.startswith("_")
                    and hasattr(DBProxy.DBProxy, name)
                    and isinstance(
                        getattr(DBProxy.DBProxy, name),
                        (types.MethodType, types.FunctionType),
                    )
                ):
                    method = ConBridge.bridge_masterMethod(name, self)
                    # cache it so later lookups take the normal path
                    setattr(self, name, method)
                    return method
                raise

        return object.__getattribute__(self, name)
0513 return object.__getattribute__(self, name)