# File indexing completed on 2026-04-10 08:39:00
0001 from pandajedi.jedicore import Interaction
0002 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0003
0004
0005 THR_LEVEL5 = 5
0006
0007 LEVEL_None = 0
0008 LEVEL_GS = 1
0009 LEVEL_MS = 2
0010 LEVEL_RT = 3
0011
0012 NQUEUELIMIT = "NQUEUELIMIT"
0013 NRUNNINGCAP = "NRUNNINGCAP"
0014 NQUEUECAP = "NQUEUECAP"
0015
0016
0017
0018
0019 non_rt_wqs = ["eventservice"]
0020
0021
0022
0023 class JobThrottlerBase(object):
0024 def __init__(self, taskBufferIF):
0025 self.taskBufferIF = taskBufferIF
0026
0027 self.retTmpError = self.SC_FAILED, True
0028 self.retThrottled = self.SC_SUCCEEDED, True
0029 self.retUnThrottled = self.SC_SUCCEEDED, False
0030 self.retMergeUnThr = self.SC_SUCCEEDED, THR_LEVEL5
0031
0032 self.refresh()
0033 self.msgType = "jobthrottler"
0034 self.comp_name = "base_job_throttler"
0035 self.app = "jedi"
0036
0037
0038 def refresh(self):
0039 self.maxNumJobs = None
0040 self.minPriority = None
0041 self.underNqLimit = False
0042 self.siteMapper = self.taskBufferIF.get_site_mapper()
0043
0044
0045 def setMaxNumJobs(self, maxNumJobs):
0046 self.maxNumJobs = maxNumJobs
0047
0048
0049 def setMinPriority(self, minPriority):
0050 self.minPriority = minPriority
0051
0052
0053 def mergeThrottled(self, thrLevel):
0054
0055 if thrLevel in [True, False]:
0056 return thrLevel
0057 return thrLevel > THR_LEVEL5
0058
0059
0060 def lackOfJobs(self):
0061 return self.underNqLimit
0062
0063
0064 def notEnoughJobsQueued(self):
0065 self.underNqLimit = True
0066
0067 def __getConfiguration(self, vo, queue_name, resource_name):
0068
0069 comp_name = self.comp_name
0070 app = self.app
0071
0072
0073 resource_ms = None
0074 if resource_name.startswith("MCORE"):
0075 resource_ms = "MCORE"
0076 elif resource_name.startswith("SCORE"):
0077 resource_ms = "SCORE"
0078
0079
0080 config_map = {
0081 NQUEUELIMIT: {"value": None, "level": LEVEL_None, "key": None},
0082 NRUNNINGCAP: {"value": None, "level": LEVEL_None, "key": None},
0083 NQUEUECAP: {"value": None, "level": LEVEL_None, "key": None},
0084 }
0085
0086 for tag in (NQUEUELIMIT, NRUNNINGCAP, NQUEUECAP):
0087
0088 key_name = f"{tag}_{queue_name}_{resource_name}"
0089 value = self.taskBufferIF.getConfigValue(comp_name, key_name, app, vo)
0090 if value:
0091 config_map[tag] = {"value": value, "level": LEVEL_RT, "key": key_name}
0092 continue
0093
0094
0095 key_name = f"{tag}_{queue_name}_{resource_ms}*"
0096 value = self.taskBufferIF.getConfigValue(comp_name, key_name, app, vo)
0097 if value:
0098 config_map[tag] = {"value": value, "level": LEVEL_MS, "key": key_name}
0099 continue
0100
0101
0102 key_name = f"{tag}_{queue_name}"
0103 value = self.taskBufferIF.getConfigValue(comp_name, key_name, app, vo)
0104 if value:
0105 config_map[tag] = {"value": value, "level": LEVEL_GS, "key": key_name}
0106
0107 return config_map
0108
0109 def __prepareJobStats(self, work_queue, resource_name, config_map):
0110 """
0111 Calculates the jobs at resource level (SCORE or MCORE) and in total.
0112
0113 :param work_queue: work_queue object
0114 :param resource_name: resource name, e.g. SCORE, MCORE, SCORE_HIMEM, MCORE_HIMEM
0115 :return: resource_level, nRunning, nRunning_level, nNotRun, nNotRun_level, nDefine, nDefine_level, nWaiting, nWaiting_level
0116 """
0117
0118 if resource_name.startswith("MCORE"):
0119 ms = "MCORE"
0120 else:
0121 ms = "SCORE"
0122
0123
0124 status, wq_stats = self.taskBufferIF.getJobStatisticsByResourceType(work_queue)
0125 if not status:
0126 raise RuntimeError("failed to get job statistics")
0127
0128
0129 standby_num_static, standby_num_static_dynamic = self.taskBufferIF.getNumMapForStandbyJobs_JEDI(work_queue)
0130
0131
0132 if "running" not in wq_stats and (len(standby_num_static) > 0 or len(standby_num_static_dynamic) > 0):
0133 wq_stats["running"] = {}
0134
0135
0136 if len(standby_num_static_dynamic) > 0:
0137 wq_stats["dummy"] = standby_num_static_dynamic
0138
0139
0140
0141
0142 nRunning_rt, nRunning_ms, nRunning_gs = 0, 0, 0
0143 nNotRun_rt, nNotRun_ms, nNotRun_gs = 0, 0, 0
0144 nDefine_rt, nDefine_ms, nDefine_gs = 0, 0, 0
0145 nWaiting_rt, nWaiting_gs = 0, 0
0146
0147 for status in wq_stats:
0148 nJobs_rt, nJobs_ms, nJobs_gs = 0, 0, 0
0149 stats_list = list(wq_stats[status].items())
0150
0151 if status == "running":
0152 stats_list += list(standby_num_static.items())
0153 stats_list += list(standby_num_static_dynamic.items())
0154 for resource_type, count in stats_list:
0155 if resource_type == resource_name:
0156 nJobs_rt = count
0157 if resource_type.startswith(ms):
0158 nJobs_ms += count
0159 nJobs_gs += count
0160
0161 if status == "running":
0162 nRunning_rt = nJobs_rt
0163 nRunning_ms = nJobs_ms
0164 nRunning_gs = nJobs_gs
0165 elif status == "defined":
0166 nDefine_rt = nJobs_rt
0167 nDefine_ms = nJobs_ms
0168 nDefine_gs = nJobs_gs
0169 elif status == "waiting":
0170 nWaiting_rt = nJobs_rt
0171 nWaiting_gs = nJobs_gs
0172 elif status in ["assigned", "activated", "starting"]:
0173 nNotRun_rt += nJobs_rt
0174 nNotRun_ms += nJobs_ms
0175 nNotRun_gs += nJobs_gs
0176 elif status == "dummy":
0177 nNotRun_rt -= nJobs_rt
0178 nNotRun_ms -= nJobs_ms
0179 nNotRun_gs -= nJobs_gs
0180
0181
0182
0183 if config_map[NRUNNINGCAP]["level"] == LEVEL_GS:
0184 nRunning_runningcap = nRunning_gs
0185 elif config_map[NRUNNINGCAP]["level"] == LEVEL_MS:
0186 nRunning_runningcap = nRunning_ms
0187 else:
0188 nRunning_runningcap = nRunning_rt
0189
0190
0191 if config_map[NQUEUELIMIT]["level"] == LEVEL_GS:
0192 nNotRun_queuelimit = nNotRun_gs
0193 nDefine_queuelimit = nDefine_gs
0194 elif config_map[NQUEUELIMIT]["level"] == LEVEL_MS:
0195 nNotRun_queuelimit = nNotRun_ms
0196 nDefine_queuelimit = nDefine_ms
0197 else:
0198 nNotRun_queuelimit = nNotRun_rt
0199 nDefine_queuelimit = nDefine_rt
0200
0201
0202 if config_map[NQUEUECAP]["level"] == LEVEL_GS:
0203 nNotRun_queuecap = nNotRun_gs
0204 nDefine_queuecap = nDefine_gs
0205 elif config_map[NQUEUECAP]["level"] == LEVEL_MS:
0206 nNotRun_queuecap = nNotRun_ms
0207 nDefine_queuecap = nDefine_ms
0208 else:
0209 nNotRun_queuecap = nNotRun_rt
0210 nDefine_queuecap = nDefine_rt
0211
0212 return_map = {
0213 "nRunning_rt": nRunning_rt,
0214 "nRunning_gs": nRunning_gs,
0215 "nRunning_runningcap": nRunning_runningcap,
0216 "nNotRun_rt": nNotRun_rt,
0217 "nNotRun_gs": nNotRun_gs,
0218 "nNotRun_queuelimit": nNotRun_queuelimit,
0219 "nNotRun_queuecap": nNotRun_queuecap,
0220 "nDefine_rt": nDefine_rt,
0221 "nDefine_gs": nDefine_gs,
0222 "nDefine_queuelimit": nDefine_queuelimit,
0223 "nDefine_queuecap": nDefine_queuecap,
0224 "nWaiting_rt": nWaiting_rt,
0225 "nWaiting_gs": nWaiting_gs,
0226 }
0227
0228 return return_map
0229
0230
0231 def toBeThrottledBase(self, vo, prodSourceLabel, cloud_name, workQueue, resource_name, logger):
0232 workqueue_id = workQueue.getID()
0233 workqueue_name = "_".join(workQueue.queue_name.split(" "))
0234
0235
0236 nBunch = 4
0237 work_shortage = self.taskBufferIF.getConfigValue("core", "WORK_SHORTAGE", self.app, vo)
0238 if work_shortage is True:
0239 threshold = self.taskBufferIF.getConfigValue(self.comp_name, "THROTTLE_THRESHOLD_FOR_WORK_SHORTAGE", self.app, vo)
0240 else:
0241 threshold = self.taskBufferIF.getConfigValue(self.comp_name, "THROTTLE_THRESHOLD", self.app, vo)
0242 if threshold is None:
0243 threshold = 2.0
0244 nJobsInBunchMax = 600
0245 nJobsInBunchMin = 500
0246 minTotalWalltime = 50 * 1000 * 1000
0247 nWaitingLimit = 4
0248 nWaitingBunchLimit = 2
0249 nParallel = 2
0250 nParallelCap = 5
0251
0252
0253 tmp_log = MsgWrapper(logger)
0254 msg_header = f"{vo}:{prodSourceLabel} cloud={cloud_name} queue={workqueue_name} resource_type={resource_name}:"
0255 tmp_log.debug(f"{msg_header} start workqueue_id={workqueue_id} threshold={threshold}")
0256
0257
0258 if not workQueue.throttled:
0259 msg_body = "PASS unthrottled since GS_throttled is False"
0260 tmp_log.info(msg_header + " " + msg_body)
0261 return self.retUnThrottled
0262
0263
0264 config_map = self.__getConfiguration(vo, workQueue.queue_name, resource_name)
0265 configQueueLimit = config_map[NQUEUELIMIT]["value"]
0266 configQueueLimitKey = config_map[NQUEUELIMIT]["key"]
0267 configQueueCap = config_map[NQUEUECAP]["value"]
0268 configQueueCapKey = config_map[NQUEUECAP]["key"]
0269 configRunningCap = config_map[NRUNNINGCAP]["value"]
0270 configRunningCapKey = config_map[NRUNNINGCAP]["key"]
0271
0272 tmp_log.debug(
0273 msg_header + " got configuration configQueueLimit={} ({}), configQueueCap={} ({}),"
0274 " configRunningCap={} ({})".format(configQueueLimit, configQueueLimitKey, configQueueCap, configQueueCapKey, configRunningCap, configRunningCapKey)
0275 )
0276
0277
0278 jobstats_map = self.__prepareJobStats(workQueue, resource_name, config_map)
0279
0280 tmp_log.debug(f"jobstats_map: {jobstats_map}")
0281 nRunning_rt = jobstats_map["nRunning_rt"]
0282 nRunning_gs = jobstats_map["nRunning_gs"]
0283 nRunning_runningcap = jobstats_map["nRunning_runningcap"]
0284 nNotRun_rt = jobstats_map["nNotRun_rt"]
0285 nNotRun_gs = jobstats_map["nNotRun_gs"]
0286 nNotRun_queuelimit = jobstats_map["nNotRun_queuelimit"]
0287 nNotRun_queuecap = jobstats_map["nNotRun_queuecap"]
0288 nDefine_rt = jobstats_map["nDefine_rt"]
0289 nDefine_gs = jobstats_map["nDefine_gs"]
0290 nDefine_queuelimit = jobstats_map["nDefine_queuelimit"]
0291 nDefine_queuecap = jobstats_map["nDefine_queuecap"]
0292 nWaiting_rt = jobstats_map["nWaiting_rt"]
0293 nWaiting_gs = jobstats_map["nWaiting_gs"]
0294
0295
0296 if workQueue.queue_name in non_rt_wqs:
0297
0298 tmpStat, highestPrioJobStat = self.taskBufferIF.getHighestPrioJobStat_JEDI(prodSourceLabel, cloud_name, workQueue)
0299
0300 highestPrioWaiting = self.taskBufferIF.checkWaitingTaskPrio_JEDI(vo, workQueue, prodSourceLabel, cloud_name)
0301 else:
0302
0303 tmpStat, highestPrioJobStat = self.taskBufferIF.getHighestPrioJobStat_JEDI(prodSourceLabel, cloud_name, workQueue, resource_name)
0304
0305 highestPrioWaiting = self.taskBufferIF.checkWaitingTaskPrio_JEDI(vo, workQueue, prodSourceLabel, cloud_name, resource_name)
0306
0307 highestPrioInPandaDB = highestPrioJobStat["highestPrio"]
0308 nNotRunHighestPrio = highestPrioJobStat["nNotRun"]
0309 if highestPrioWaiting is None:
0310 msg_body = "failed to get the highest priority of waiting tasks"
0311 tmp_log.error(f"{msg_header} {msg_body}")
0312 return self.retTmpError
0313
0314
0315 highPrioQueued = False
0316 if prodSourceLabel != "user":
0317 if highestPrioWaiting > highestPrioInPandaDB or (highestPrioWaiting == highestPrioInPandaDB and nNotRunHighestPrio < nJobsInBunchMin):
0318 highPrioQueued = True
0319 tmp_log.debug(
0320 "{0} highestPrio waiting:{1} inPanda:{2} numNotRun:{3} -> highPrioQueued={4}".format(
0321 msg_header, highestPrioWaiting, highestPrioInPandaDB, nNotRunHighestPrio, highPrioQueued
0322 )
0323 )
0324
0325 if workQueue.queue_name in non_rt_wqs:
0326 tmpRemainingSlot = int(nRunning_gs * threshold - nNotRun_gs)
0327 else:
0328 tmpRemainingSlot = int(nRunning_rt * threshold - nNotRun_rt)
0329
0330 nJobsInBunch = min(max(nJobsInBunchMin, tmpRemainingSlot), nJobsInBunchMax)
0331
0332 if configQueueLimit is not None:
0333 nQueueLimit = configQueueLimit
0334 else:
0335 nQueueLimit = nJobsInBunch * nBunch
0336
0337
0338 if workQueue.queue_name in ["Heavy Ion", "Reprocessing default"]:
0339
0340 if nQueueLimit > (nNotRun_queuelimit + nDefine_queuelimit):
0341 tmpRemainingSlot = nQueueLimit - (nNotRun_queuelimit + nDefine_queuelimit)
0342 if tmpRemainingSlot > nJobsInBunch:
0343 nJobsInBunch = min(tmpRemainingSlot, nJobsInBunchMax)
0344
0345
0346
0347 if configQueueCap is None:
0348 self.setMaxNumJobs(nJobsInBunch // nParallel)
0349 else:
0350 self.setMaxNumJobs(configQueueCap // nParallelCap)
0351
0352
0353 totWalltime = self.taskBufferIF.getTotalWallTime_JEDI(vo, prodSourceLabel, workQueue, resource_name)
0354
0355
0356 tmp_log.info(f"{msg_header} nQueueLimit={nQueueLimit} nRunCap={configRunningCap} nQueueCap={configQueueCap}")
0357 tmp_log.info(f"{msg_header} at global share level: nQueued={nNotRun_gs + nDefine_gs} nDefine={nDefine_gs} nRunning={nRunning_gs}")
0358 tmpMsg = ""
0359 if config_map[NQUEUECAP]["level"] == LEVEL_MS:
0360 tmpMsg = f"{msg_header} at MCORE/SCORE level: "
0361 tmpMsg += f"nQueued_ms={nNotRun_queuecap} "
0362 if config_map[NRUNNINGCAP]["level"] == LEVEL_MS:
0363 if not tmpMsg:
0364 tmpMsg = f"{msg_header} at MCORE/SCORE level: "
0365 tmpMsg += f"nRunning_ms={nRunning_runningcap} "
0366 if tmpMsg:
0367 tmp_log.info(tmpMsg)
0368 tmp_log.info(
0369 "{0} at resource type level: nQueued_rt={1} nDefine_rt={2} nRunning_rt={3} totWalltime={4}".format(
0370 msg_header, nNotRun_rt + nDefine_rt, nDefine_rt, nRunning_rt, totWalltime
0371 )
0372 )
0373
0374
0375 limitPriority = False
0376 if (
0377 workQueue.queue_name not in non_rt_wqs
0378 and nRunning_rt == 0
0379 and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit
0380 and (totWalltime is None or totWalltime > minTotalWalltime)
0381 ):
0382 limitPriority = True
0383 if not highPrioQueued:
0384
0385 msg_body = "SKIP no running and enough nQueued_queuelimit={}>({}={}) & totWalltime({})>{} ".format(
0386 nNotRun_queuelimit + nDefine_queuelimit, configQueueLimitKey, nQueueLimit, totWalltime, minTotalWalltime
0387 )
0388 tmp_log.info(f"{msg_header} {msg_body}")
0389 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0390 return self.retMergeUnThr
0391
0392 elif workQueue.queue_name in non_rt_wqs and nRunning_gs == 0 and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit:
0393 limitPriority = True
0394 if not highPrioQueued:
0395
0396 msg_body = f"SKIP no running and enough nQueued_queuelimit={nNotRun_queuelimit + nDefine_queuelimit}>({configQueueLimitKey}={nQueueLimit})"
0397 tmp_log.info(f"{msg_header} {msg_body}")
0398 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0399 return self.retMergeUnThr
0400
0401 elif (
0402 workQueue.queue_name not in non_rt_wqs
0403 and nRunning_rt != 0
0404 and float(nNotRun_rt + nDefine_rt) / float(nRunning_rt) > threshold
0405 and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit
0406 and (totWalltime is None or totWalltime > minTotalWalltime)
0407 ):
0408 limitPriority = True
0409 if not highPrioQueued:
0410
0411 msg_body = "SKIP nQueued_rt({})/nRunning_rt({})>{} & nQueued_queuelimit={}>({}={}) & totWalltime({})>{}".format(
0412 nNotRun_rt + nDefine_rt,
0413 nRunning_rt,
0414 threshold,
0415 nNotRun_queuelimit + nDefine_queuelimit,
0416 configQueueLimitKey,
0417 nQueueLimit,
0418 totWalltime,
0419 minTotalWalltime,
0420 )
0421 tmp_log.info(f"{msg_header} {msg_body}")
0422 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0423 return self.retMergeUnThr
0424
0425 elif (
0426 workQueue.queue_name in non_rt_wqs
0427 and nRunning_gs != 0
0428 and float(nNotRun_gs + nDefine_gs) / float(nRunning_gs) > threshold
0429 and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit
0430 ):
0431 limitPriority = True
0432 if not highPrioQueued:
0433
0434 msg_body = "SKIP nQueued_gs({})/nRunning_gs({})>{} & nQueued_queuelimit={}>({}={})".format(
0435 nNotRun_gs + nDefine_gs, nRunning_gs, threshold, nNotRun_queuelimit + nDefine_queuelimit, configQueueLimitKey, nQueueLimit
0436 )
0437 tmp_log.info(f"{msg_header} {msg_body}")
0438 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0439 return self.retMergeUnThr
0440
0441 elif nDefine_queuelimit > nQueueLimit:
0442 limitPriority = True
0443 if not highPrioQueued:
0444
0445 msg_body = f"SKIP too many nDefined_queuelimit={nDefine_queuelimit}>({configQueueLimitKey}={nQueueLimit})"
0446 tmp_log.info(f"{msg_header} {msg_body}")
0447 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0448 return self.retMergeUnThr
0449
0450 elif nWaiting_rt > max(nRunning_rt * nWaitingLimit, nJobsInBunch * nWaitingBunchLimit):
0451 limitPriority = True
0452 if not highPrioQueued:
0453
0454 msg_body = f"SKIP too many nWaiting_rt({nWaiting_rt})>max(nRunning_rt({nRunning_rt})x{nWaitingLimit},{nJobsInBunch}x{nWaitingBunchLimit})"
0455 tmp_log.info(f"{msg_header} {msg_body}")
0456 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0457 return self.retMergeUnThr
0458
0459 elif configRunningCap and nRunning_runningcap > configRunningCap:
0460
0461 msg_body = f"SKIP nRunning_runningcap({nRunning_runningcap})>nRunningCap({configRunningCapKey}={configRunningCap})"
0462 tmp_log.info(f"{msg_header} {msg_body}")
0463 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0464 return self.retMergeUnThr
0465
0466 elif configQueueCap and nNotRun_queuecap + nDefine_queuecap > configQueueCap:
0467 limitPriority = True
0468 if not highPrioQueued:
0469
0470 msg_body = f"SKIP nQueued_queuecap({nNotRun_queuecap + nDefine_queuecap})>nQueueCap({configQueueCapKey}={configQueueCap})"
0471 tmp_log.info(f"{msg_header} {msg_body}")
0472 tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
0473 return self.retMergeUnThr
0474
0475
0476 limitPriorityValue = None
0477 if limitPriority:
0478 limitPriorityValue = highestPrioWaiting
0479 self.setMinPriority(limitPriorityValue)
0480 else:
0481
0482 if (
0483 (nNotRun_queuelimit + nDefine_queuelimit < nQueueLimit * 0.9)
0484 or (workQueue.queue_name in non_rt_wqs and nNotRun_gs + nDefine_gs < nRunning_gs)
0485 or (workQueue.queue_name not in non_rt_wqs and nNotRun_rt + nDefine_rt < nRunning_rt)
0486 ):
0487 tmp_log.debug(msg_header + " not enough jobs queued")
0488 self.notEnoughJobsQueued()
0489 self.setMaxNumJobs(max(self.maxNumJobs, nQueueLimit / 20))
0490
0491 msg_body = f"PASS - not throttled since priority limit={limitPriorityValue} maxNumJobs={self.maxNumJobs}"
0492 tmp_log.info(msg_header + " " + msg_body)
0493 return self.retUnThrottled
0494
0495
0496 Interaction.installSC(JobThrottlerBase)