Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:06

0001 import datetime
0002 
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 
0007 # logger
0008 baseLogger = core_utils.setup_logger("simple_throttler")
0009 
0010 
0011 # simple throttler
class SimpleThrottler(PluginBase):
    """Throttler plugin deciding from the number of recently missed workers.

    Each entry of ``rulesForMissed`` is a dict with the keys ``level``
    ("site", "pq" or "ce"), ``timeWindow`` (minutes) and ``maxMissed``.
    Every rule is expanded into one or more DB criteria; a criterion is
    "satisfied" when the number of missed workers it matches exceeds
    ``maxMissed``.  ``logicType`` selects how the criteria are combined:
    "OR" throttles as soon as one is satisfied, "AND" only when all are.
    """

    # constructor
    def __init__(self, **kwarg):
        # logic type : AND: throttled if all rules are satisfied, OR: throttled if one rule is satisfied
        # assigned before PluginBase.__init__ so it acts as a default
        # (presumably PluginBase copies kwarg onto the instance -- confirm)
        self.logicType = "OR"
        PluginBase.__init__(self, **kwarg)
        self.dbProxy = DBProxy()

    # convert rules to a list of (criteria, maxMissed) pairs
    def _build_checks(self, queue_config, tmpLog):
        """Expand the configured rules into ``(criteria, maxMissed)`` pairs.

        :param queue_config: queue configuration providing ``siteName``,
            ``queueName`` and ``submitter``
        :param tmpLog: logger used to report skipped rules
        :return: list of ``(criteria dict, maxMissed)`` tuples in rule order
        """
        timeNow = core_utils.naive_utcnow()
        checks = []
        # rulesForMissed is not set by this class (presumably it comes from
        # the plugin configuration); treat a missing value as "no rules"
        for rule in getattr(self, "rulesForMissed", None) or []:
            level = rule["level"]
            if level == "site":
                targets = [("siteName", queue_config.siteName)]
            elif level == "pq":
                targets = [("computingSite", queue_config.queueName)]
            elif level == "ce":
                elmName = "computingElements"
                if elmName not in queue_config.submitter:
                    tmpLog.debug(f"skipped since {elmName} is undefined in submitter config")
                    continue
                # one criterion per computing element
                targets = [("computingElement", ce) for ce in queue_config.submitter[elmName]]
            else:
                # do not drop a misspelled level silently
                tmpLog.debug(f"skipped since level={level} is unknown")
                continue
            timeLimit = timeNow - datetime.timedelta(minutes=rule["timeWindow"])
            for key, value in targets:
                checks.append(({key: value, "timeLimit": timeLimit}, rule["maxMissed"]))
        return checks

    # check if to be throttled
    def to_be_throttled(self, queue_config, queue_config_mapper=None):
        """Tell whether worker submission for the queue should be throttled.

        :param queue_config: queue configuration of the queue to check
        :param queue_config_mapper: unused by this plugin
        :return: tuple ``(throttled, message)`` where ``throttled`` is a bool
            and ``message`` explains the decision
        """
        tmpLog = self.make_logger(baseLogger, f"computingSite={queue_config.queueName}", method_name="to_be_throttled")
        tmpLog.debug("start")
        checks = self._build_checks(queue_config, tmpLog)
        # set default return value
        if self.logicType == "OR":
            retVal = False, "no rule was satisfied"
        elif not checks:
            # nothing was evaluated, so "all rules were satisfied" would be
            # vacuously true and throttle the queue for no reason
            retVal = False, "no criteria to be checked"
        else:
            retVal = True, "all rules were satisfied"
        # loop over all criteria
        for criteria, maxMissed in checks:
            nMissed = self.dbProxy.get_num_missed_workers(queue_config.queueName, criteria)
            if nMissed > maxMissed:
                if self.logicType == "OR":
                    # one satisfied criterion is enough
                    tmpMsg = f"logic={self.logicType} and "
                    tmpMsg += f"nMissed={nMissed} > maxMissed={maxMissed} for {str(criteria)}"
                    retVal = True, tmpMsg
                    break
            elif self.logicType == "AND":
                # one unsatisfied criterion is enough to not throttle
                tmpMsg = f"logic={self.logicType} and "
                tmpMsg += f"nMissed={nMissed} <= maxMissed={maxMissed} for {str(criteria)}"
                retVal = False, tmpMsg
                break
        tmpLog.debug(f"ret={retVal[0]} : {retVal[1]}")
        return retVal