File indexing completed on 2026-04-19 08:00:06
0001 import datetime
0002
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006
0007
# Module-level logger for this plugin; TinyThrottler.to_be_throttled passes it
# to make_logger() to derive a logger tagged with the queue being checked.
0008 baseLogger = core_utils.setup_logger("tiny_throttler")
0009
0010
0011
class TinyThrottler(PluginBase):
    """Throttler plugin deciding whether submission for a queue should be held back.

    Two families of rules are evaluated by ``to_be_throttled``:

    * ``rulesForMissed`` -- each rule has ``level`` (``"site"``, ``"pq"`` or
      ``"ce"``), ``timeWindow`` (minutes) and ``maxMissed``.
    * ``rulesForSiteFamilySubmitted`` / ``rulesForSiteFamilyRunning`` /
      ``rulesForSiteFamilySubmittedRunning`` -- each rule has ``level``
      (``"cores"`` or ``"workers"``) and the matching ``maxCores`` or
      ``maxWorkers`` limit.  Totals are summed over every queue whose
      ``siteFamily`` equals the one of the queue being checked.

    ``logicType`` selects how rules combine: with ``"OR"`` the queue is
    throttled as soon as one rule is exceeded; with ``"AND"`` the verdict is
    turned to "not throttled" as soon as one rule is not exceeded.
    """

    def __init__(self, **kwarg):
        """Initialise the plugin and fill in defaults for unset rule lists.

        ``logicType`` is assigned before ``PluginBase.__init__`` runs, so a
        value supplied through ``kwarg`` presumably overrides the default
        (assumes PluginBase copies kwarg entries onto the instance -- TODO
        confirm).
        """
        self.logicType = "OR"
        PluginBase.__init__(self, **kwarg)
        self.dbProxy = DBProxy()

        # Every rule set is optional; an empty container disables it.
        if not hasattr(self, "rulesForMissed"):
            self.rulesForMissed = []
        if not hasattr(self, "rulesForSiteFamilySubmitted"):
            self.rulesForSiteFamilySubmitted = {}
        if not hasattr(self, "rulesForSiteFamilyRunning"):
            self.rulesForSiteFamilyRunning = {}
        if not hasattr(self, "rulesForSiteFamilySubmittedRunning"):
            self.rulesForSiteFamilySubmittedRunning = {}

    def get_num_workers(self, worker_stats, status):
        """Return the number of workers whose status is in ``status``.

        :param worker_stats: nested mapping
            ``{job_type: {resource_type: {status: n_workers}}}``.
        :param status: container of status names to count.
        :return: sum of the matching worker counts (0 if nothing matches).
        """
        return sum(
            n_workers
            for per_resource in worker_stats.values()
            for per_status in per_resource.values()
            for st, n_workers in per_status.items()
            if st in status
        )

    def get_core_factor(self, q_config, job_type, resource_type):
        """Return the cores-per-worker factor configured for a queue.

        ``nCoreFactor`` is read from ``q_config.submitter``.  It is either a
        single number or a mapping ``{job_type: {resource_type: factor}}``.

        :return: the factor as ``int``; 1 when it is not configured or cannot
            be read/converted.
        """
        try:
            factor = q_config.submitter.get("nCoreFactor", 1)
            if isinstance(factor, dict):
                factor = factor.get(job_type, {}).get(resource_type, 1)
            return int(factor)
        except Exception as exc:
            # Best effort: a broken nCoreFactor must not stop throttling, but
            # leave a trace instead of silently hiding the misconfiguration.
            baseLogger.warning(
                f"failed to get nCoreFactor for job_type={job_type} resource_type={resource_type}; "
                f"using 1 : {exc.__class__.__name__} {exc}"
            )
        return 1

    def get_num_cores(self, worker_stats, status, q_config):
        """Return the number of cores used by workers whose status is in ``status``.

        :param worker_stats: nested mapping
            ``{job_type: {resource_type: {status: n_workers}}}``.
        :param status: container of status names to count.
        :param q_config: queue configuration passed to ``get_core_factor``.
        :return: sum over (job_type, resource_type) of matching workers times
            the core factor of that pair.
        """
        num_cores = 0
        for job_type, per_resource in worker_stats.items():
            for resource_type, per_status in per_resource.items():
                n_workers = sum(n for st, n in per_status.items() if st in status)
                if n_workers:
                    # The factor only depends on (job_type, resource_type),
                    # so look it up once per pair.
                    num_cores += n_workers * self.get_core_factor(q_config, job_type, resource_type)
        return num_cores

    def evaluate_rule(self, rules, queues, retVal, status=None):
        """Evaluate site-family rules against the summed usage of ``queues``.

        :param rules: iterable of rule dicts with ``level`` (``"cores"`` or
            ``"workers"``) and the matching ``maxCores`` / ``maxWorkers``.
            Rules with any other level are ignored.
        :param queues: mapping ``{queue_name: {"queue_config": ..., "stats": ...}}``.
        :param retVal: current ``(throttled, message)`` verdict; returned
            unchanged when no rule alters it.
        :param status: worker statuses to count; defaults to ``["submitted"]``.
        :return: the updated ``(throttled, message)`` tuple.
        """
        if status is None:
            status = ["submitted"]

        for rule in rules:
            level = rule["level"]
            if level == "cores":
                total = sum(self.get_num_cores(q["stats"], status, q["queue_config"]) for q in queues.values())
                total_label, limit_label = "total_cores", "maxCores"
            elif level == "workers":
                total = sum(self.get_num_workers(q["stats"], status) for q in queues.values())
                total_label, limit_label = "total_workers", "maxWorkers"
            else:
                continue

            limit = rule[limit_label]
            if total > limit:
                # OR: one exceeded rule is enough to throttle.
                if self.logicType == "OR":
                    tmpMsg = f"logic={self.logicType} and "
                    tmpMsg += f"{total_label}={total} > {limit_label}={limit} for {rule!s}"
                    retVal = True, tmpMsg
                    break
            elif self.logicType == "AND":
                # AND: one rule within its limit is enough not to throttle.
                tmpMsg = f"logic={self.logicType} and "
                tmpMsg += f"{total_label}={total} <= {limit_label}={limit} for {rule!s}"
                retVal = False, tmpMsg
                break
        return retVal

    def _build_missed_criteria(self, queue_config, tmpLog):
        """Translate ``rulesForMissed`` into ``(criteria, maxMissed)`` pairs."""
        # Naive UTC timestamp, as in the original implementation.
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # switch only after confirming what the DB layer expects.
        timeNow = datetime.datetime.utcnow()
        pairs = []
        for rule in self.rulesForMissed:
            level = rule["level"]
            if level == "site":
                targets = [("siteName", queue_config.siteName)]
            elif level == "pq":
                targets = [("computingSite", queue_config.queueName)]
            elif level == "ce":
                elmName = "computingElements"
                if elmName not in queue_config.submitter:
                    tmpLog.debug(f"skipped since {elmName} is undefined in submitter config")
                    continue
                # One criteria per computing element of the queue.
                targets = [("computingElement", ce) for ce in queue_config.submitter[elmName]]
            else:
                continue
            for key, value in targets:
                criteria = {
                    key: value,
                    "timeLimit": timeNow - datetime.timedelta(minutes=rule["timeWindow"]),
                }
                pairs.append((criteria, rule["maxMissed"]))
        return pairs

    def _collect_site_family_queues(self, site_family, queue_config_mapper):
        """Return config and worker stats of every queue in ``site_family``."""
        job_stats = self.dbProxy.get_worker_stats_bulk(None)
        queues = {}
        for queue_name, q_config in queue_config_mapper.get_all_queues().items():
            if q_config.siteFamily == site_family:
                queues[queue_name] = {
                    "queue_config": q_config,
                    # Queues without recorded workers contribute nothing.
                    "stats": job_stats.get(queue_name, {}),
                }
        return queues

    def to_be_throttled(self, queue_config, queue_config_mapper=None):
        """Decide whether submission for ``queue_config`` must be throttled.

        :param queue_config: configuration of the queue being checked.
        :param queue_config_mapper: object providing ``get_all_queues()``;
            needed only for the site-family rules.  When it is ``None`` those
            rules are skipped with a warning.
        :return: tuple ``(throttled, message)``.
        """
        tmpLog = self.make_logger(baseLogger, f"computingSite={queue_config.queueName}", method_name="to_be_throttled")
        tmpLog.debug("start")

        # Starting verdict: OR needs one exceeded rule to throttle, AND stays
        # throttled unless one rule is within its limit.
        if self.logicType == "OR":
            retVal = False, "no rule was satisfied"
        else:
            retVal = True, "all rules were satisfied"

        # --- rules on the number of missed workers ---
        for criteria, maxMissed in self._build_missed_criteria(queue_config, tmpLog):
            nMissed = self.dbProxy.get_num_missed_workers(queue_config.queueName, criteria)
            if nMissed > maxMissed:
                if self.logicType == "OR":
                    tmpMsg = f"logic={self.logicType} and "
                    tmpMsg += f"nMissed={nMissed} > maxMissed={maxMissed} for {str(criteria)}"
                    retVal = True, tmpMsg
                    break
            elif self.logicType == "AND":
                tmpMsg = f"logic={self.logicType} and "
                tmpMsg += f"nMissed={nMissed} <= maxMissed={maxMissed} for {str(criteria)}"
                retVal = False, tmpMsg
                break
        tmpLog.debug(f"rulesForMissed ret={retVal[0]} : {retVal[1]}")
        # NOTE(review): with logicType == "AND" this returns before the
        # site-family rules whenever the verdict is still True, and those
        # rules can only set it to False afterwards -- so in AND mode they
        # never change the outcome.  Confirm this is intended.
        if retVal[0]:
            return retVal

        # --- rules on the usage summed over the site family ---
        family_rule_sets = (
            ("rulesForSiteFamilySubmitted", self.rulesForSiteFamilySubmitted, ["submitted"]),
            ("rulesForSiteFamilyRunning", self.rulesForSiteFamilyRunning, ["running"]),
            ("rulesForSiteFamilySubmittedRunning", self.rulesForSiteFamilySubmittedRunning, ["submitted", "running"]),
        )
        if not any(rules for _, rules, _ in family_rule_sets):
            return retVal

        if queue_config_mapper is None:
            # The parameter is optional, so callers may omit it; the family
            # cannot be resolved without it.
            tmpLog.warning("queue_config_mapper is not given; skipped site family rules")
            return retVal

        site_name = queue_config.siteFamily
        tmpLog.debug(f"site family name: {site_name}")
        queues = self._collect_site_family_queues(site_name, queue_config_mapper)
        tmpLog.debug(f"queues: {queues}")

        for label, rules, status in family_rule_sets:
            if not rules:
                continue
            retVal = self.evaluate_rule(rules, queues, retVal, status=status)
            tmpLog.debug(f"{label} ret={retVal[0]} : {retVal[1]}")
            if retVal[0]:
                return retVal
        return retVal