Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 import os
0002 import queue
0003 import threading
0004 
0005 from pandaharvester.harvesterconfig import harvester_config
0006 
0007 from . import core_utils
0008 from .db_proxy import DBProxy
0009 
# logger shared by everything in this module, set up under the name "db_proxy_pool"
_logger = core_utils.setup_logger("db_proxy_pool")
0012 
0013 
0014 # method wrapper
class DBProxyMethod(object):
    """Callable stand-in for one named method of a pooled connection.

    Every call borrows a connection object from ``pool``, invokes the method
    called ``method_name`` on it with the caller's arguments, and puts the
    connection back into the pool afterwards.
    """

    # constructor
    def __init__(self, method_name, pool):
        """Remember which method to forward to and which pool to borrow from.

        :param method_name: name of the attribute looked up on the borrowed connection.
        :param pool: object providing ``get()``, ``put()`` and ``qsize()``
                     (``DBProxyPool`` passes its ``queue.Queue``).
        """
        self.methodName = method_name
        self.pool = pool

    # method emulation
    def __call__(self, *args, **kwargs):
        """Run the wrapped method on a connection borrowed from the pool.

        :param args: positional arguments forwarded unchanged to the method.
        :param kwargs: keyword arguments forwarded unchanged to the method.
        :return: whatever the underlying method returns.
        :raises: any exception raised while borrowing the connection, looking
                 up the method, or executing it; a borrowed connection is
                 returned to the pool before the exception propagates.
        """
        tmpLog = core_utils.make_logger(_logger, f"method={self.methodName}", method_name="call")
        sw = core_utils.get_stopwatch()
        # get connection
        # Borrow *before* entering the try block: if get() fails there is
        # nothing to give back, so the cleanup below must not run.
        con = self.pool.get()
        try:
            tmpLog.debug(f"got lock. qsize={self.pool.qsize()} {sw.get_elapsed_time()}")
            # restart the stopwatch so the release message reports execution time only
            sw.reset()
            # get function
            func = getattr(con, self.methodName)
            # exec
            return func(*args, **kwargs)
        finally:
            tmpLog.debug("release lock" + sw.get_elapsed_time())
            self.pool.put(con)
0037 
0038 
0039 # connection class
class DBProxyPool(object):
    """Singleton that owns a fixed-size queue of ``DBProxy`` connections.

    Looking up any name that is not a real attribute of the pool yields a
    ``DBProxyMethod`` bound to the connection queue; the wrapper is cached on
    the instance under that name so later lookups return the same object.
    """

    # the shared instance, created by the first construction
    instance = None
    # serializes creation of the shared instance
    lock = threading.Lock()

    # constructor
    def __init__(self, read_only=False):
        # intentionally empty: __init__ runs on every DBProxyPool(...) call,
        # whereas the real setup must happen once and is driven by __new__
        pass

    # initialize
    def initialize(self, read_only=False):
        """Create the connection queue and fill it with ``DBProxy`` objects.

        The number of connections is ``harvester_config.db.nConnections``.
        Each connection is named ``"<pid>-<thread ident>-<index>"``.

        :param read_only: forwarded to every ``DBProxy`` constructor.
        """
        n_connections = harvester_config.db.nConnections
        # threading.current_thread() always returns a thread object, so no None check is needed
        name_prefix = f"{os.getpid()}-{threading.current_thread().ident}"
        # connection pool
        connections = queue.Queue(n_connections)
        for index in range(n_connections):
            connections.put(DBProxy(thr_name=f"{name_prefix}-{index}", read_only=read_only))
        # install members
        # set via object.__setattr__ and only once the queue is fully populated
        object.__setattr__(self, "pool", connections)

    # override __new__ to have a singleton
    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating and initializing it on first use.

        ``read_only`` may be given by keyword or as the first positional
        argument; it only has an effect on the call that creates the instance.

        :return: the single ``cls.instance`` object.
        """
        if cls.instance is None:
            with cls.lock:
                # re-check under the lock: another thread may have created it meanwhile
                if cls.instance is None:
                    if "read_only" in kwargs:
                        read_only = bool(kwargs["read_only"])
                    else:
                        read_only = bool(args[0]) if args else False
                    # object.__new__ rejects extra arguments when __new__ is
                    # overridden, so do not forward args/kwargs to it
                    new_instance = super(DBProxyPool, cls).__new__(cls)
                    new_instance.initialize(read_only=read_only)
                    # publish only after initialization succeeded, so no caller
                    # can ever obtain an instance without its connection queue
                    cls.instance = new_instance
        return cls.instance

    # override __getattribute__
    def __getattribute__(self, name):
        """Return a real attribute if present, otherwise a pooled method wrapper.

        :param name: attribute name being looked up.
        :return: the existing attribute, or a new ``DBProxyMethod`` for ``name``
                 that is also stored on the instance.
        :raises AttributeError: if ``name`` is missing and the connection queue
                                has not been installed yet.
        """
        try:
            return object.__getattribute__(self, name)
        except AttributeError:
            # unknown name: fall through and treat it as a DBProxy method
            pass
        # method object
        # read the queue through object.__getattribute__ so a missing "pool"
        # raises AttributeError instead of re-entering this hook forever
        proxy_method = DBProxyMethod(name, object.__getattribute__(self, "pool"))
        object.__setattr__(self, name, proxy_method)
        return proxy_method