File indexing completed on 2026-04-20 07:58:58
0001 import os
0002 import queue
0003 import threading
0004
0005 from pandaharvester.harvesterconfig import harvester_config
0006
0007 from . import core_utils
0008 from .db_proxy import DBProxy
0009
0010
0011 _logger = core_utils.setup_logger("db_proxy_pool")
0012
0013
0014
0015 class DBProxyMethod(object):
0016
0017 def __init__(self, method_name, pool):
0018 self.methodName = method_name
0019 self.pool = pool
0020
0021
0022 def __call__(self, *args, **kwargs):
0023 tmpLog = core_utils.make_logger(_logger, f"method={self.methodName}", method_name="call")
0024 sw = core_utils.get_stopwatch()
0025 try:
0026
0027 con = self.pool.get()
0028 tmpLog.debug(f"got lock. qsize={self.pool.qsize()} {sw.get_elapsed_time()}")
0029 sw.reset()
0030
0031 func = getattr(con, self.methodName)
0032
0033 return func(*args, **kwargs)
0034 finally:
0035 tmpLog.debug("release lock" + sw.get_elapsed_time())
0036 self.pool.put(con)
0037
0038
0039
0040 class DBProxyPool(object):
0041 instance = None
0042 lock = threading.Lock()
0043
0044
0045 def __init__(self, read_only=False):
0046 pass
0047
0048
0049 def initialize(self, read_only=False):
0050
0051 object.__setattr__(self, "pool", None)
0052
0053 self.pool = queue.Queue(harvester_config.db.nConnections)
0054 currentThr = threading.current_thread()
0055 if currentThr is None:
0056 thrID = None
0057 else:
0058 thrID = currentThr.ident
0059 thrName = f"{os.getpid()}-{thrID}"
0060 for i in range(harvester_config.db.nConnections):
0061 con = DBProxy(thr_name=f"{thrName}-{i}", read_only=read_only)
0062 self.pool.put(con)
0063
0064
0065 def __new__(cls, *args, **kwargs):
0066 if cls.instance is None:
0067 with cls.lock:
0068 if cls.instance is None:
0069 if "read_only" in kwargs and kwargs["read_only"]:
0070 read_only = True
0071 else:
0072 read_only = False
0073 cls.instance = super(DBProxyPool, cls).__new__(cls, *args, **kwargs)
0074 cls.instance.initialize(read_only=read_only)
0075 return cls.instance
0076
0077
0078 def __getattribute__(self, name):
0079 try:
0080 return object.__getattribute__(self, name)
0081 except Exception:
0082 pass
0083
0084 tmpO = DBProxyMethod(name, self.pool)
0085 object.__setattr__(self, name, tmpO)
0086 return tmpO