Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 import importlib
0002 import queue
0003 
0004 from pandaharvester.harvesterconfig import harvester_config
0005 
0006 from . import core_utils
0007 
0008 # logger
0009 _logger = core_utils.setup_logger("communicator_pool")
0010 
0011 
0012 # method wrapper
0013 class CommunicatorMethod(object):
0014     # constructor
0015     def __init__(self, method_name, pool):
0016         self.methodName = method_name
0017         self.pool = pool
0018 
0019     # method emulation
0020     def __call__(self, *args, **kwargs):
0021         tmpLog = core_utils.make_logger(_logger, f"method={self.methodName}", method_name="call")
0022         sw = core_utils.get_stopwatch()
0023         try:
0024             # get connection
0025             con = self.pool.get()
0026             tmpLog.debug(f"got lock. qsize={self.pool.qsize()} {sw.get_elapsed_time()}")
0027             sw.reset()
0028             # get function
0029             func = getattr(con, self.methodName)
0030             # exec
0031             return func(*args, **kwargs)
0032         finally:
0033             tmpLog.debug("release lock" + sw.get_elapsed_time())
0034             self.pool.put(con)
0035 
0036 
0037 # connection class
0038 class CommunicatorPool(object):
0039     # constructor
0040     def __init__(self):
0041         # install members
0042         object.__setattr__(self, "pool", None)
0043         # connection pool
0044         try:
0045             nConnections = harvester_config.communicator.nConnections
0046         except Exception:
0047             nConnections = harvester_config.pandacon.nConnections
0048         self.pool = queue.Queue(nConnections)
0049         try:
0050             Communicator = importlib.import_module(harvester_config.communicator.className, harvester_config.communicator.moduleName)
0051         except Exception:
0052             from pandaharvester.harvestercommunicator.panda_communicator import (
0053                 PandaCommunicator as Communicator,
0054             )
0055         for i in range(nConnections):
0056             con = Communicator()
0057             self.pool.put(con)
0058 
0059     # override __getattribute__
0060     def __getattribute__(self, name):
0061         try:
0062             return object.__getattribute__(self, name)
0063         except Exception:
0064             pass
0065         # method object
0066         tmpO = CommunicatorMethod(name, self.pool)
0067         object.__setattr__(self, name, tmpO)
0068         return tmpO
0069 
0070     # force credential renewal
0071     def force_credential_renewal(self):
0072         for con in list(self.pool.queue):
0073             con.force_credential_renewal()