File indexing completed on 2026-04-20 07:58:57
0001 import importlib
0002 import queue
0003
0004 from pandaharvester.harvesterconfig import harvester_config
0005
0006 from . import core_utils
0007
0008
0009 _logger = core_utils.setup_logger("communicator_pool")
0010
0011
0012
0013 class CommunicatorMethod(object):
0014
0015 def __init__(self, method_name, pool):
0016 self.methodName = method_name
0017 self.pool = pool
0018
0019
0020 def __call__(self, *args, **kwargs):
0021 tmpLog = core_utils.make_logger(_logger, f"method={self.methodName}", method_name="call")
0022 sw = core_utils.get_stopwatch()
0023 try:
0024
0025 con = self.pool.get()
0026 tmpLog.debug(f"got lock. qsize={self.pool.qsize()} {sw.get_elapsed_time()}")
0027 sw.reset()
0028
0029 func = getattr(con, self.methodName)
0030
0031 return func(*args, **kwargs)
0032 finally:
0033 tmpLog.debug("release lock" + sw.get_elapsed_time())
0034 self.pool.put(con)
0035
0036
0037
0038 class CommunicatorPool(object):
0039
0040 def __init__(self):
0041
0042 object.__setattr__(self, "pool", None)
0043
0044 try:
0045 nConnections = harvester_config.communicator.nConnections
0046 except Exception:
0047 nConnections = harvester_config.pandacon.nConnections
0048 self.pool = queue.Queue(nConnections)
0049 try:
0050 Communicator = importlib.import_module(harvester_config.communicator.className, harvester_config.communicator.moduleName)
0051 except Exception:
0052 from pandaharvester.harvestercommunicator.panda_communicator import (
0053 PandaCommunicator as Communicator,
0054 )
0055 for i in range(nConnections):
0056 con = Communicator()
0057 self.pool.put(con)
0058
0059
0060 def __getattribute__(self, name):
0061 try:
0062 return object.__getattribute__(self, name)
0063 except Exception:
0064 pass
0065
0066 tmpO = CommunicatorMethod(name, self.pool)
0067 object.__setattr__(self, name, tmpO)
0068 return tmpO
0069
0070
0071 def force_credential_renewal(self):
0072 for con in list(self.pool.queue):
0073 con.force_credential_renewal()