Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-19 08:00:06

0001 import datetime
0002 
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 
# Module-level base logger named "tiny_throttler"; TinyThrottler.to_be_throttled
# passes it to make_logger() to obtain a per-call logger.
baseLogger = core_utils.setup_logger("tiny_throttler")
0009 
0010 
0011 # Tiny throttler
class TinyThrottler(PluginBase):
    """Throttler plugin combining missed-worker rules with site-family load rules.

    Two groups of rules are evaluated by ``to_be_throttled``:

    * ``rulesForMissed``: each rule has a ``level`` ("site", "pq" or "ce"),
      a ``timeWindow`` in minutes and a ``maxMissed`` threshold that is
      compared with the count returned by ``dbProxy.get_num_missed_workers``.
    * ``rulesForSiteFamilySubmitted`` / ``rulesForSiteFamilyRunning`` /
      ``rulesForSiteFamilySubmittedRunning``: each rule has a ``level``
      ("cores" or "workers") and a matching ``maxCores`` / ``maxWorkers``
      threshold, compared with totals summed over every queue that shares
      the ``siteFamily`` of the queue being checked.

    ``logicType`` selects how rules are combined: "OR" throttles as soon as
    one rule is satisfied, "AND" stops as soon as one rule is not satisfied.
    """

    # constructor
    def __init__(self, **kwarg):
        """Set defaults, run the base-class initializer and create the DB proxy.

        ``logicType`` is assigned before ``PluginBase.__init__`` runs and the
        rule sets are defaulted only when absent afterwards.
        NOTE(review): this presumably relies on PluginBase.__init__ copying
        the configured keyword arguments onto ``self`` -- confirm.
        """
        # logic type : AND: throttled if all rules are satisfied, OR: throttled if one rule is satisfied
        self.logicType = "OR"
        PluginBase.__init__(self, **kwarg)
        self.dbProxy = DBProxy()

        # Any rule set still missing at this point falls back to an empty
        # container, so to_be_throttled never hits an AttributeError.
        if not hasattr(self, "rulesForMissed"):
            self.rulesForMissed = []
        if not hasattr(self, "rulesForSiteFamilySubmitted"):
            self.rulesForSiteFamilySubmitted = {}
        if not hasattr(self, "rulesForSiteFamilyRunning"):
            self.rulesForSiteFamilyRunning = {}
        if not hasattr(self, "rulesForSiteFamilySubmittedRunning"):
            self.rulesForSiteFamilySubmittedRunning = {}

    def get_num_workers(self, worker_stats, status):
        """Return the number of workers whose status is listed in ``status``.

        :param worker_stats: nested mapping job_type -> resource_type -> status -> count
        :param status: container of status names to include
        :return: sum of the matching counts (0 when nothing matches)
        """
        num_workers = 0
        for per_resource in worker_stats.values():
            for per_status in per_resource.values():
                for st, n_workers in per_status.items():
                    if st in status:
                        num_workers += n_workers
        return num_workers

    def get_core_factor(self, q_config, job_type, resource_type):
        """Return the integer core factor for a job type / resource type pair.

        ``q_config.submitter["nCoreFactor"]`` may be a plain number, or a
        mapping job_type -> resource_type -> factor. Missing entries default
        to 1. The lookup is best-effort: any failure is logged and 1 is
        returned.

        :param q_config: queue configuration exposing a ``submitter`` mapping
        :param job_type: job type key used when nCoreFactor is a mapping
        :param resource_type: resource type key used when nCoreFactor is a mapping
        :return: core factor as int
        """
        try:
            nCoreFactor = q_config.submitter.get("nCoreFactor", 1)
            if isinstance(nCoreFactor, dict):
                nCoreFactor = nCoreFactor.get(job_type, {}).get(resource_type, 1)
            return int(nCoreFactor)
        except Exception as ex:
            # Deliberately best-effort: report the problem instead of hiding
            # it, then fall back to one core per worker.
            baseLogger.warning(f"Failed to get core factor: {ex}")
        return 1

    def get_num_cores(self, worker_stats, status, q_config):
        """Return the number of cores used by workers whose status is in ``status``.

        Each worker count is multiplied by the core factor returned by
        ``get_core_factor`` for its job type and resource type.

        :param worker_stats: nested mapping job_type -> resource_type -> status -> count
        :param status: container of status names to include
        :param q_config: queue configuration handed to ``get_core_factor``
        :return: sum of count * core factor over the matching entries
        """
        num_cores = 0
        for job_type, per_resource in worker_stats.items():
            for resource_type, per_status in per_resource.items():
                for st, n_workers in per_status.items():
                    if st in status:
                        n_core_factor = self.get_core_factor(q_config, job_type, resource_type)
                        num_cores += n_workers * n_core_factor
        return num_cores

    def evaluate_rule(self, rules, queues, retVal, status=None):
        """Evaluate site-family rules against totals summed over ``queues``.

        :param rules: iterable of rule dicts; ``level`` is "cores" (uses
            ``maxCores``) or "workers" (uses ``maxWorkers``); other levels
            are ignored
        :param queues: mapping queue name -> {"queue_config": ..., "stats": ...}
        :param retVal: current (throttled, message) tuple, returned unchanged
            when no rule decides the outcome
        :param status: worker statuses to count; defaults to ["submitted"]
        :return: (throttled, message) tuple
        """
        # None instead of a list literal avoids sharing one mutable default
        # object between calls.
        if status is None:
            status = ["submitted"]

        for rule in rules:
            level = rule["level"]
            if level == "cores":
                total = 0
                for queue in queues.values():
                    total += self.get_num_cores(queue["stats"], status, queue["queue_config"])
                total_label, limit_key = "total_cores", "maxCores"
            elif level == "workers":
                total = 0
                for queue in queues.values():
                    total += self.get_num_workers(queue["stats"], status)
                total_label, limit_key = "total_workers", "maxWorkers"
            else:
                continue

            limit = rule[limit_key]
            if total > limit:
                # rule satisfied: with OR a single satisfied rule throttles
                if self.logicType == "OR":
                    tmpMsg = f"logic={self.logicType} and "
                    tmpMsg += f"{total_label}={total} > {limit_key}={limit} for {str(rule)}"
                    retVal = True, tmpMsg
                    break
            else:
                # rule not satisfied: with AND a single unsatisfied rule vetoes
                if self.logicType == "AND":
                    tmpMsg = f"logic={self.logicType} and "
                    tmpMsg += f"{total_label}={total} <= {limit_key}={limit} for {str(rule)}"
                    retVal = False, tmpMsg
                    break
        return retVal

    def _build_missed_criteria(self, queue_config, tmpLog):
        """Translate ``rulesForMissed`` into (criteria, maxMissed) pairs."""
        # Naive UTC timestamp, same value datetime.utcnow() used to give,
        # without calling the API deprecated since Python 3.12.
        timeNow = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        checks = []
        for rule in self.rulesForMissed:
            level = rule["level"]
            if level == "site":
                targets = [("siteName", queue_config.siteName)]
            elif level == "pq":
                targets = [("computingSite", queue_config.queueName)]
            elif level == "ce":
                elmName = "computingElements"
                if elmName not in queue_config.submitter:
                    tmpLog.debug(f"skipped since {elmName} is undefined in submitter config")
                    continue
                # one criteria per computing element
                targets = [("computingElement", ce) for ce in queue_config.submitter[elmName]]
            else:
                continue
            for key, value in targets:
                criteria = {
                    key: value,
                    "timeLimit": timeNow - datetime.timedelta(minutes=rule["timeWindow"]),
                }
                checks.append((criteria, rule["maxMissed"]))
        return checks

    def _evaluate_missed_rules(self, queue_config, retVal, tmpLog):
        """Apply the missed-worker rules and return the resulting (throttled, message)."""
        for criteria, maxMissed in self._build_missed_criteria(queue_config, tmpLog):
            nMissed = self.dbProxy.get_num_missed_workers(queue_config.queueName, criteria)
            if nMissed > maxMissed:
                if self.logicType == "OR":
                    tmpMsg = f"logic={self.logicType} and "
                    tmpMsg += f"nMissed={nMissed} > maxMissed={maxMissed} for {str(criteria)}"
                    retVal = True, tmpMsg
                    break
            else:
                if self.logicType == "AND":
                    tmpMsg = f"logic={self.logicType} and "
                    tmpMsg += f"nMissed={nMissed} <= maxMissed={maxMissed} for {str(criteria)}"
                    retVal = False, tmpMsg
                    break
        return retVal

    def _collect_family_queues(self, queue_config, queue_config_mapper, tmpLog):
        """Return config and worker stats of every queue in the same site family."""
        site_name = queue_config.siteFamily
        job_stats = self.dbProxy.get_worker_stats_bulk(None)
        tmpLog.debug("site family name: %s" % site_name)

        queues = {}
        all_queue_config = queue_config_mapper.get_all_queues()
        for queue_name, q_config in all_queue_config.items():
            if q_config.siteFamily != site_name:
                continue
            # queues without recorded workers still take part, with empty stats
            stats = job_stats[queue_name] if queue_name in job_stats else {}
            queues[queue_name] = {"queue_config": q_config, "stats": stats}
        tmpLog.debug("queues: %s" % queues)
        return queues

    # check if to be throttled
    def to_be_throttled(self, queue_config, queue_config_mapper=None):
        """Decide whether submission for ``queue_config`` should be throttled.

        Missed-worker rules are evaluated first; a positive result is
        returned immediately. Otherwise the site-family rule sets are
        evaluated in the order submitted, running, submitted+running, and the
        first positive result is returned.

        :param queue_config: configuration of the queue being checked
        :param queue_config_mapper: object providing ``get_all_queues()``;
            when it is None the site-family rules are skipped with a warning
        :return: (throttled, message) tuple
        """
        tmpLog = self.make_logger(baseLogger, f"computingSite={queue_config.queueName}", method_name="to_be_throttled")
        tmpLog.debug("start")
        # set default return value
        if self.logicType == "OR":
            retVal = False, "no rule was satisfied"
        else:
            retVal = True, "all rules were satisfied"

        retVal = self._evaluate_missed_rules(queue_config, retVal, tmpLog)
        tmpLog.debug(f"rulesForMissed ret={retVal[0]} : {retVal[1]}")
        if retVal[0]:
            return retVal

        # NOTE(review): with logicType "AND" execution only gets here with
        # retVal[0] False, and evaluate_rule never sets True under "AND", so
        # the site-family rules cannot trigger throttling in that mode --
        # confirm this is intended.
        family_checks = (
            ("rulesForSiteFamilySubmitted", self.rulesForSiteFamilySubmitted, ["submitted"]),
            ("rulesForSiteFamilyRunning", self.rulesForSiteFamilyRunning, ["running"]),
            ("rulesForSiteFamilySubmittedRunning", self.rulesForSiteFamilySubmittedRunning, ["submitted", "running"]),
        )
        if not any(rules for _, rules, _ in family_checks):
            return retVal
        if queue_config_mapper is None:
            # The signature allows omitting the mapper; without it the
            # sibling queues cannot be listed, so report and skip.
            tmpLog.warning("site-family rules are defined but queue_config_mapper is not given; skipped")
            return retVal

        queues = self._collect_family_queues(queue_config, queue_config_mapper, tmpLog)
        for label, rules, status in family_checks:
            if not rules:
                continue
            retVal = self.evaluate_rule(rules, queues, retVal, status=status)
            tmpLog.debug(f"{label} ret={retVal[0]} : {retVal[1]}")
            if retVal[0]:
                return retVal
        return retVal