File indexing completed on 2026-04-19 08:00:06
0001 import datetime
0002
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006
0007
0008 baseLogger = core_utils.setup_logger("simple_throttler")
0009
0010
0011
class SimpleThrottler(PluginBase):
    """Throttler plugin driven by counts of missed workers.

    Each entry of ``self.rulesForMissed`` is a mapping with the keys
    ``level`` (``"site"``, ``"pq"`` or ``"ce"``), ``timeWindow`` (minutes)
    and ``maxMissed``.  ``self.logicType`` selects how rule outcomes are
    combined: with ``"OR"`` one exceeded rule is enough to throttle, with
    ``"AND"`` one rule that is not exceeded is enough to not throttle.
    """

    def __init__(self, **kwarg):
        """Set the default logic type, run the base initializer, open a DB proxy."""
        # NOTE(review): the default is assigned before PluginBase.__init__,
        # presumably so a value supplied through kwarg can replace it --
        # confirm against PluginBase.
        self.logicType = "OR"
        PluginBase.__init__(self, **kwarg)
        self.dbProxy = DBProxy()

    def to_be_throttled(self, queue_config, queue_config_mapper=None):
        """Decide whether the given queue should be throttled.

        Args:
            queue_config: queue configuration; ``queueName``, ``siteName``
                and ``submitter`` are read from it.
            queue_config_mapper: accepted for interface compatibility and
                not used by this implementation.

        Returns:
            A ``(throttled, message)`` tuple where ``throttled`` is a bool
            and ``message`` describes the rule that decided the outcome
            (or the default outcome when no rule decided it).
        """
        log = self.make_logger(
            baseLogger,
            f"computingSite={queue_config.queueName}",
            method_name="to_be_throttled",
        )
        log.debug("start")

        # Outcome used when the loop below never hits a deciding rule.
        or_mode = self.logicType == "OR"
        and_mode = self.logicType == "AND"
        verdict = (False, "no rule was satisfied") if or_mode else (True, "all rules were satisfied")

        # First pass: expand every rule into (criteria, maxMissed) pairs.
        now = core_utils.naive_utcnow()
        checks = []
        for rule in self.rulesForMissed:
            level = rule["level"]
            if level == "site":
                selectors = [("siteName", queue_config.siteName)]
            elif level == "pq":
                selectors = [("computingSite", queue_config.queueName)]
            elif level == "ce":
                elm_name = "computingElements"
                if elm_name not in queue_config.submitter:
                    log.debug(f"skipped since {elm_name} is undefined in submitter config")
                    continue
                # One criteria per computing element listed in the submitter config.
                selectors = [("computingElement", ce) for ce in queue_config.submitter[elm_name]]
            else:
                # Unrecognised levels contribute no criteria.
                continue
            for field, value in selectors:
                cutoff = now - datetime.timedelta(minutes=rule["timeWindow"])
                checks.append(({field: value, "timeLimit": cutoff}, rule["maxMissed"]))

        # Second pass: query the DB per criteria and stop at the first
        # result that settles the outcome for the active logic type.
        for criteria, limit in checks:
            missed = self.dbProxy.get_num_missed_workers(queue_config.queueName, criteria)
            exceeded = missed > limit
            if exceeded and or_mode:
                verdict = (
                    True,
                    f"logic={self.logicType} and nMissed={missed} > maxMissed={limit} for {criteria!s}",
                )
                break
            if not exceeded and and_mode:
                verdict = (
                    False,
                    f"logic={self.logicType} and nMissed={missed} <= maxMissed={limit} for {criteria!s}",
                )
                break

        log.debug("ret={0} : {1}".format(*verdict))
        return verdict