Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 from pandajedi.jedicore import Interaction
0002 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0003 
0004 # throttle level
# throttle level; mergeThrottled() treats levels above this as throttled
THR_LEVEL5 = 5

# specificity levels of a configured limit (higher = more specific)
LEVEL_None = 0  # There is no configuration defined
LEVEL_GS = 1  # There is a configuration defined at global share level
LEVEL_MS = 2  # There is a configuration defined at MCORE/SCORE level
LEVEL_RT = 3  # There is a configuration defined at resource type level

# tags of the throttling limits read from the central configuration
NQUEUELIMIT = "NQUEUELIMIT"
NRUNNINGCAP = "NRUNNINGCAP"
NQUEUECAP = "NQUEUECAP"

# workqueues that do not work at resource type level.
# E.g. event service is a special case, since MCORE tasks generate SCORE jobs. Therefore we can't work at
# resource type level and need to go to the global level, in order to avoid over-generating jobs
non_rt_wqs = ["eventservice"]
0020 
0021 
0022 # base class for job throttle
# base class for job throttle
class JobThrottlerBase(object):
    """Base class for JEDI job throttlers.

    Decides whether job generation for a (work queue, resource type) pair
    should be throttled, by comparing current job statistics against the
    centrally configured limits NQUEUELIMIT, NRUNNINGCAP and NQUEUECAP.
    NOTE(review): SC_SUCCEEDED / SC_FAILED are presumably injected by
    Interaction.installSC(JobThrottlerBase) at module bottom - they are not
    defined in this class.
    """

    def __init__(self, taskBufferIF):
        """Initialize the throttler.

        :param taskBufferIF: task buffer interface; all DB access goes through it
        """
        self.taskBufferIF = taskBufferIF
        # canned return tuples: (status code, throttle verdict)
        self.retTmpError = self.SC_FAILED, True
        self.retThrottled = self.SC_SUCCEEDED, True
        self.retUnThrottled = self.SC_SUCCEEDED, False
        # leveled verdict: throttled in general but merge jobs may still pass
        # (interpreted by mergeThrottled)
        self.retMergeUnThr = self.SC_SUCCEEDED, THR_LEVEL5
        # reset per-cycle limits
        self.refresh()
        self.msgType = "jobthrottler"
        self.comp_name = "base_job_throttler"
        self.app = "jedi"

    # refresh
    def refresh(self):
        """Reset per-cycle state and reload the site mapper."""
        self.maxNumJobs = None
        self.minPriority = None
        self.underNqLimit = False
        self.siteMapper = self.taskBufferIF.get_site_mapper()

    # set maximum number of jobs to be submitted
    def setMaxNumJobs(self, maxNumJobs):
        self.maxNumJobs = maxNumJobs

    # set min priority of jobs to be submitted
    def setMinPriority(self, minPriority):
        self.minPriority = minPriority

    # check throttle level
    def mergeThrottled(self, thrLevel):
        """Return whether merge job generation is throttled for thrLevel.

        thrLevel is either a plain boolean verdict (returned unchanged) or an
        integer level; integer levels strictly above THR_LEVEL5 count as
        throttled, so retMergeUnThr (== THR_LEVEL5) evaluates as un-throttled.
        NOTE(review): bool is a subclass of int in Python, so integer levels
        0 and 1 would also match ``in [True, False]`` - confirm callers only
        pass bools or levels >= THR_LEVEL5.
        """
        # un-leveled flag
        if thrLevel in [True, False]:
            return thrLevel
        return thrLevel > THR_LEVEL5

    # check if lack of jobs
    def lackOfJobs(self):
        """Return True if notEnoughJobsQueued() was flagged this cycle."""
        return self.underNqLimit

    # not enough jobs are queued
    def notEnoughJobsQueued(self):
        """Flag that fewer jobs are queued than the configured limit allows."""
        self.underNqLimit = True

    def __getConfiguration(self, vo, queue_name, resource_name):
        """Read the throttling limits for a work queue from the DB.

        For each of NQUEUELIMIT, NRUNNINGCAP and NQUEUECAP the most specific
        configured value wins: resource type level (LEVEL_RT), then
        MCORE/SCORE level (LEVEL_MS), then work queue level (LEVEL_GS).

        :param vo: virtual organization name
        :param queue_name: work queue name
        :param resource_name: resource type name, e.g. SCORE, MCORE_HIMEM
        :return: dict keyed by tag, each value {"value": limit or None,
                 "level": LEVEL_* constant, "key": DB key used or None}
        """
        # component name
        comp_name = self.comp_name
        app = self.app

        # reduce the resource type to its MCORE/SCORE family
        resource_ms = None
        if resource_name.startswith("MCORE"):
            resource_ms = "MCORE"
        elif resource_name.startswith("SCORE"):
            resource_ms = "SCORE"

        # Read the WQ config values from the DB
        config_map = {
            NQUEUELIMIT: {"value": None, "level": LEVEL_None, "key": None},
            NRUNNINGCAP: {"value": None, "level": LEVEL_None, "key": None},
            NQUEUECAP: {"value": None, "level": LEVEL_None, "key": None},
        }

        for tag in (NQUEUELIMIT, NRUNNINGCAP, NQUEUECAP):
            # 1. try to get a wq + resource_type specific limit
            key_name = f"{tag}_{queue_name}_{resource_name}"
            value = self.taskBufferIF.getConfigValue(comp_name, key_name, app, vo)
            if value:
                config_map[tag] = {"value": value, "level": LEVEL_RT, "key": key_name}
                continue

            # 2. try to get a wq + MCORE/SCORE specific limit
            key_name = f"{tag}_{queue_name}_{resource_ms}*"
            value = self.taskBufferIF.getConfigValue(comp_name, key_name, app, vo)
            if value:
                config_map[tag] = {"value": value, "level": LEVEL_MS, "key": key_name}
                continue

            # 3. try to get a wq specific limit
            key_name = f"{tag}_{queue_name}"
            value = self.taskBufferIF.getConfigValue(comp_name, key_name, app, vo)
            if value:
                config_map[tag] = {"value": value, "level": LEVEL_GS, "key": key_name}

        return config_map

    def __prepareJobStats(self, work_queue, resource_name, config_map):
        """
        Calculates the jobs at resource type level, MCORE/SCORE level and in total.

        :param work_queue: work_queue object
        :param resource_name: resource name, e.g. SCORE, MCORE, SCORE_HIMEM, MCORE_HIMEM
        :param config_map: configuration map produced by __getConfiguration; its
            "level" entries select which counts are compared against each limit
        :return: dict with counts of running/queued/defined/waiting jobs at
            resource type level (``*_rt``), global share level (``*_gs``) and
            at the level matching each configured limit
            (``nRunning_runningcap``, ``*_queuelimit``, ``*_queuecap``)
        :raises RuntimeError: if the job statistics could not be retrieved
        """
        # SCORE vs MCORE
        if resource_name.startswith("MCORE"):
            ms = "MCORE"
        else:
            ms = "SCORE"

        # get job statistics
        status, wq_stats = self.taskBufferIF.getJobStatisticsByResourceType(work_queue)
        if not status:
            raise RuntimeError("failed to get job statistics")

        # get the number of standby jobs which is used as the number of running jobs
        standby_num_static, standby_num_static_dynamic = self.taskBufferIF.getNumMapForStandbyJobs_JEDI(work_queue)

        # add running if the original stat doesn't have running and standby jobs are required
        if "running" not in wq_stats and (len(standby_num_static) > 0 or len(standby_num_static_dynamic) > 0):
            wq_stats["running"] = {}

        # add dummy to subtract # of starting for dynamic number of standby jobs
        if len(standby_num_static_dynamic) > 0:
            wq_stats["dummy"] = standby_num_static_dynamic

        # Count number of jobs in each status
        # We want to generate one value for the total, one value for the relevant MCORE/SCORE level
        # and one value for the full global share
        nRunning_rt, nRunning_ms, nRunning_gs = 0, 0, 0
        nNotRun_rt, nNotRun_ms, nNotRun_gs = 0, 0, 0
        nDefine_rt, nDefine_ms, nDefine_gs = 0, 0, 0
        nWaiting_rt, nWaiting_gs = 0, 0

        # NOTE(review): this loop variable shadows the DB call status above
        for status in wq_stats:
            nJobs_rt, nJobs_ms, nJobs_gs = 0, 0, 0
            stats_list = list(wq_stats[status].items())
            # take into account the number of standby jobs
            if status == "running":
                stats_list += list(standby_num_static.items())
                stats_list += list(standby_num_static_dynamic.items())
            for resource_type, count in stats_list:
                if resource_type == resource_name:
                    nJobs_rt = count
                if resource_type.startswith(ms):
                    nJobs_ms += count
                nJobs_gs += count

            if status == "running":
                nRunning_rt = nJobs_rt
                nRunning_ms = nJobs_ms
                nRunning_gs = nJobs_gs
            elif status == "defined":
                nDefine_rt = nJobs_rt
                nDefine_ms = nJobs_ms
                nDefine_gs = nJobs_gs
            elif status == "waiting":
                nWaiting_rt = nJobs_rt
                nWaiting_gs = nJobs_gs
            elif status in ["assigned", "activated", "starting"]:
                nNotRun_rt += nJobs_rt
                nNotRun_ms += nJobs_ms
                nNotRun_gs += nJobs_gs
            elif status == "dummy":
                # subtract the dynamic-standby dummies added above
                nNotRun_rt -= nJobs_rt
                nNotRun_ms -= nJobs_ms
                nNotRun_gs -= nJobs_gs

        # Get the job stats at the same level as the configured parameters
        # nRunning is compared with the nRunningCap
        if config_map[NRUNNINGCAP]["level"] == LEVEL_GS:
            nRunning_runningcap = nRunning_gs
        elif config_map[NRUNNINGCAP]["level"] == LEVEL_MS:
            nRunning_runningcap = nRunning_ms
        else:
            nRunning_runningcap = nRunning_rt

        # nNotRun and nDefine are compared with the nQueueLimit
        if config_map[NQUEUELIMIT]["level"] == LEVEL_GS:
            nNotRun_queuelimit = nNotRun_gs
            nDefine_queuelimit = nDefine_gs
        elif config_map[NQUEUELIMIT]["level"] == LEVEL_MS:
            nNotRun_queuelimit = nNotRun_ms
            nDefine_queuelimit = nDefine_ms
        else:
            nNotRun_queuelimit = nNotRun_rt
            nDefine_queuelimit = nDefine_rt

        # nNotRun and nDefine are compared with the nQueueCap
        if config_map[NQUEUECAP]["level"] == LEVEL_GS:
            nNotRun_queuecap = nNotRun_gs
            nDefine_queuecap = nDefine_gs
        elif config_map[NQUEUECAP]["level"] == LEVEL_MS:
            nNotRun_queuecap = nNotRun_ms
            nDefine_queuecap = nDefine_ms
        else:
            nNotRun_queuecap = nNotRun_rt
            nDefine_queuecap = nDefine_rt

        return_map = {
            "nRunning_rt": nRunning_rt,
            "nRunning_gs": nRunning_gs,
            "nRunning_runningcap": nRunning_runningcap,
            "nNotRun_rt": nNotRun_rt,
            "nNotRun_gs": nNotRun_gs,
            "nNotRun_queuelimit": nNotRun_queuelimit,
            "nNotRun_queuecap": nNotRun_queuecap,
            "nDefine_rt": nDefine_rt,
            "nDefine_gs": nDefine_gs,
            "nDefine_queuelimit": nDefine_queuelimit,
            "nDefine_queuecap": nDefine_queuecap,
            "nWaiting_rt": nWaiting_rt,
            "nWaiting_gs": nWaiting_gs,
        }

        return return_map

    # check if throttled
    def toBeThrottledBase(self, vo, prodSourceLabel, cloud_name, workQueue, resource_name, logger):
        """Decide whether job generation should be throttled for the work queue.

        Walks a cascade of checks: queue un-throttled flag, queue limits,
        running/queue caps and the number of waiting jobs, taking waiting
        high-priority tasks into account. As side effects it may call
        setMaxNumJobs, setMinPriority and notEnoughJobsQueued.

        :param vo: virtual organization name
        :param prodSourceLabel: production source label, e.g. "user"
        :param cloud_name: cloud name
        :param workQueue: work queue object
        :param resource_name: resource type name, e.g. SCORE, MCORE_HIMEM
        :param logger: logger wrapped into MsgWrapper for messaging
        :return: (status code, verdict) - one of retUnThrottled, retMergeUnThr
            or retTmpError
        """
        workqueue_id = workQueue.getID()
        workqueue_name = "_".join(workQueue.queue_name.split(" "))

        # params
        nBunch = 4
        # threshold on queued/running ratio; a separate value applies when the
        # system is flagged as short of work
        work_shortage = self.taskBufferIF.getConfigValue("core", "WORK_SHORTAGE", self.app, vo)
        if work_shortage is True:
            threshold = self.taskBufferIF.getConfigValue(self.comp_name, "THROTTLE_THRESHOLD_FOR_WORK_SHORTAGE", self.app, vo)
        else:
            threshold = self.taskBufferIF.getConfigValue(self.comp_name, "THROTTLE_THRESHOLD", self.app, vo)
        if threshold is None:
            threshold = 2.0
        nJobsInBunchMax = 600
        nJobsInBunchMin = 500
        minTotalWalltime = 50 * 1000 * 1000
        nWaitingLimit = 4
        nWaitingBunchLimit = 2
        nParallel = 2
        nParallelCap = 5

        # make logger
        tmp_log = MsgWrapper(logger)
        msg_header = f"{vo}:{prodSourceLabel} cloud={cloud_name} queue={workqueue_name} resource_type={resource_name}:"
        tmp_log.debug(f"{msg_header} start workqueue_id={workqueue_id} threshold={threshold}")

        # check if unthrottled
        if not workQueue.throttled:
            msg_body = "PASS unthrottled since GS_throttled is False"
            tmp_log.info(msg_header + " " + msg_body)
            return self.retUnThrottled

        # get central configuration values
        config_map = self.__getConfiguration(vo, workQueue.queue_name, resource_name)
        configQueueLimit = config_map[NQUEUELIMIT]["value"]
        configQueueLimitKey = config_map[NQUEUELIMIT]["key"]
        configQueueCap = config_map[NQUEUECAP]["value"]
        configQueueCapKey = config_map[NQUEUECAP]["key"]
        configRunningCap = config_map[NRUNNINGCAP]["value"]
        configRunningCapKey = config_map[NRUNNINGCAP]["key"]

        tmp_log.debug(
            msg_header + " got configuration configQueueLimit={} ({}), configQueueCap={} ({}),"
            " configRunningCap={} ({})".format(configQueueLimit, configQueueLimitKey, configQueueCap, configQueueCapKey, configRunningCap, configRunningCapKey)
        )

        # get the jobs statistics for our wq/gs and expand the stats map
        jobstats_map = self.__prepareJobStats(workQueue, resource_name, config_map)

        tmp_log.debug(f"jobstats_map: {jobstats_map}")
        nRunning_rt = jobstats_map["nRunning_rt"]
        nRunning_gs = jobstats_map["nRunning_gs"]
        nRunning_runningcap = jobstats_map["nRunning_runningcap"]
        nNotRun_rt = jobstats_map["nNotRun_rt"]
        nNotRun_gs = jobstats_map["nNotRun_gs"]
        nNotRun_queuelimit = jobstats_map["nNotRun_queuelimit"]
        nNotRun_queuecap = jobstats_map["nNotRun_queuecap"]
        nDefine_rt = jobstats_map["nDefine_rt"]
        nDefine_gs = jobstats_map["nDefine_gs"]
        nDefine_queuelimit = jobstats_map["nDefine_queuelimit"]
        nDefine_queuecap = jobstats_map["nDefine_queuecap"]
        nWaiting_rt = jobstats_map["nWaiting_rt"]
        nWaiting_gs = jobstats_map["nWaiting_gs"]

        # check if higher prio tasks are waiting
        if workQueue.queue_name in non_rt_wqs:
            # find highest priority of currently defined jobs
            tmpStat, highestPrioJobStat = self.taskBufferIF.getHighestPrioJobStat_JEDI(prodSourceLabel, cloud_name, workQueue)
            # the highest priority of waiting tasks
            highestPrioWaiting = self.taskBufferIF.checkWaitingTaskPrio_JEDI(vo, workQueue, prodSourceLabel, cloud_name)
        else:
            # find highest priority of currently defined jobs
            tmpStat, highestPrioJobStat = self.taskBufferIF.getHighestPrioJobStat_JEDI(prodSourceLabel, cloud_name, workQueue, resource_name)
            # the highest priority of waiting tasks
            highestPrioWaiting = self.taskBufferIF.checkWaitingTaskPrio_JEDI(vo, workQueue, prodSourceLabel, cloud_name, resource_name)

        highestPrioInPandaDB = highestPrioJobStat["highestPrio"]
        nNotRunHighestPrio = highestPrioJobStat["nNotRun"]
        if highestPrioWaiting is None:
            msg_body = "failed to get the highest priority of waiting tasks"
            tmp_log.error(f"{msg_header} {msg_body}")
            return self.retTmpError

        # high priority tasks are waiting
        highPrioQueued = False
        if prodSourceLabel != "user":
            if highestPrioWaiting > highestPrioInPandaDB or (highestPrioWaiting == highestPrioInPandaDB and nNotRunHighestPrio < nJobsInBunchMin):
                highPrioQueued = True
        tmp_log.debug(
            "{0} highestPrio waiting:{1} inPanda:{2} numNotRun:{3} -> highPrioQueued={4}".format(
                msg_header, highestPrioWaiting, highestPrioInPandaDB, nNotRunHighestPrio, highPrioQueued
            )
        )
        # set maximum number of jobs to be submitted
        if workQueue.queue_name in non_rt_wqs:
            tmpRemainingSlot = int(nRunning_gs * threshold - nNotRun_gs)
        else:
            tmpRemainingSlot = int(nRunning_rt * threshold - nNotRun_rt)
        # use the lower limit to avoid creating too many _sub/_dis datasets
        nJobsInBunch = min(max(nJobsInBunchMin, tmpRemainingSlot), nJobsInBunchMax)

        if configQueueLimit is not None:
            nQueueLimit = configQueueLimit
        else:
            nQueueLimit = nJobsInBunch * nBunch

        # use nPrestage for reprocessing
        if workQueue.queue_name in ["Heavy Ion", "Reprocessing default"]:
            # reset nJobsInBunch
            if nQueueLimit > (nNotRun_queuelimit + nDefine_queuelimit):
                tmpRemainingSlot = nQueueLimit - (nNotRun_queuelimit + nDefine_queuelimit)
                if tmpRemainingSlot > nJobsInBunch:
                    nJobsInBunch = min(tmpRemainingSlot, nJobsInBunchMax)

        # get cap
        # set number of jobs to be submitted
        if configQueueCap is None:
            self.setMaxNumJobs(nJobsInBunch // nParallel)
        else:
            self.setMaxNumJobs(configQueueCap // nParallelCap)

        # get total walltime
        totWalltime = self.taskBufferIF.getTotalWallTime_JEDI(vo, prodSourceLabel, workQueue, resource_name)

        # log the current situation and limits
        tmp_log.info(f"{msg_header} nQueueLimit={nQueueLimit} nRunCap={configRunningCap} nQueueCap={configQueueCap}")
        tmp_log.info(f"{msg_header} at global share level: nQueued={nNotRun_gs + nDefine_gs} nDefine={nDefine_gs} nRunning={nRunning_gs}")
        tmpMsg = ""
        if config_map[NQUEUECAP]["level"] == LEVEL_MS:
            tmpMsg = f"{msg_header} at MCORE/SCORE level: "
            tmpMsg += f"nQueued_ms={nNotRun_queuecap} "
        if config_map[NRUNNINGCAP]["level"] == LEVEL_MS:
            if not tmpMsg:
                tmpMsg = f"{msg_header} at MCORE/SCORE level: "
            tmpMsg += f"nRunning_ms={nRunning_runningcap} "
        if tmpMsg:
            tmp_log.info(tmpMsg)
        tmp_log.info(
            "{0} at resource type level: nQueued_rt={1} nDefine_rt={2} nRunning_rt={3} totWalltime={4}".format(
                msg_header, nNotRun_rt + nDefine_rt, nDefine_rt, nRunning_rt, totWalltime
            )
        )

        # check number of jobs when high priority jobs are not waiting. test jobs are sent without throttling
        limitPriority = False
        if (
            workQueue.queue_name not in non_rt_wqs
            and nRunning_rt == 0
            and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit
            and (totWalltime is None or totWalltime > minTotalWalltime)
        ):
            limitPriority = True
            if not highPrioQueued:
                # pilot is not running or DDM has a problem
                msg_body = "SKIP no running and enough nQueued_queuelimit={}>({}={}) & totWalltime({})>{} ".format(
                    nNotRun_queuelimit + nDefine_queuelimit, configQueueLimitKey, nQueueLimit, totWalltime, minTotalWalltime
                )
                tmp_log.info(f"{msg_header} {msg_body}")
                tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
                return self.retMergeUnThr

        elif workQueue.queue_name in non_rt_wqs and nRunning_gs == 0 and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit:
            limitPriority = True
            if not highPrioQueued:
                # pilot is not running or DDM has a problem
                msg_body = f"SKIP no running and enough nQueued_queuelimit={nNotRun_queuelimit + nDefine_queuelimit}>({configQueueLimitKey}={nQueueLimit})"
                tmp_log.info(f"{msg_header} {msg_body}")
                tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
                return self.retMergeUnThr

        elif (
            workQueue.queue_name not in non_rt_wqs
            and nRunning_rt != 0
            and float(nNotRun_rt + nDefine_rt) / float(nRunning_rt) > threshold
            and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit
            and (totWalltime is None or totWalltime > minTotalWalltime)
        ):
            limitPriority = True
            if not highPrioQueued:
                # enough jobs in Panda
                msg_body = "SKIP nQueued_rt({})/nRunning_rt({})>{} & nQueued_queuelimit={}>({}={}) & totWalltime({})>{}".format(
                    nNotRun_rt + nDefine_rt,
                    nRunning_rt,
                    threshold,
                    nNotRun_queuelimit + nDefine_queuelimit,
                    configQueueLimitKey,
                    nQueueLimit,
                    totWalltime,
                    minTotalWalltime,
                )
                tmp_log.info(f"{msg_header} {msg_body}")
                tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
                return self.retMergeUnThr

        elif (
            workQueue.queue_name in non_rt_wqs
            and nRunning_gs != 0
            and float(nNotRun_gs + nDefine_gs) / float(nRunning_gs) > threshold
            and (nNotRun_queuelimit + nDefine_queuelimit) > nQueueLimit
        ):
            limitPriority = True
            if not highPrioQueued:
                # enough jobs in Panda
                msg_body = "SKIP nQueued_gs({})/nRunning_gs({})>{} & nQueued_queuelimit={}>({}={})".format(
                    nNotRun_gs + nDefine_gs, nRunning_gs, threshold, nNotRun_queuelimit + nDefine_queuelimit, configQueueLimitKey, nQueueLimit
                )
                tmp_log.info(f"{msg_header} {msg_body}")
                tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
                return self.retMergeUnThr

        elif nDefine_queuelimit > nQueueLimit:
            limitPriority = True
            if not highPrioQueued:
                # brokerage is stuck
                msg_body = f"SKIP too many nDefined_queuelimit={nDefine_queuelimit}>({configQueueLimitKey}={nQueueLimit})"
                tmp_log.info(f"{msg_header} {msg_body}")
                tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
                return self.retMergeUnThr

        elif nWaiting_rt > max(nRunning_rt * nWaitingLimit, nJobsInBunch * nWaitingBunchLimit):
            limitPriority = True
            if not highPrioQueued:
                # too many waiting
                msg_body = f"SKIP too many nWaiting_rt({nWaiting_rt})>max(nRunning_rt({nRunning_rt})x{nWaitingLimit},{nJobsInBunch}x{nWaitingBunchLimit})"
                tmp_log.info(f"{msg_header} {msg_body}")
                tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
                return self.retMergeUnThr

        elif configRunningCap and nRunning_runningcap > configRunningCap:
            # cap on running
            msg_body = f"SKIP nRunning_runningcap({nRunning_runningcap})>nRunningCap({configRunningCapKey}={configRunningCap})"
            tmp_log.info(f"{msg_header} {msg_body}")
            tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
            return self.retMergeUnThr

        elif configQueueCap and nNotRun_queuecap + nDefine_queuecap > configQueueCap:
            limitPriority = True
            if not highPrioQueued:
                # cap on queued
                msg_body = f"SKIP nQueued_queuecap({nNotRun_queuecap + nDefine_queuecap})>nQueueCap({configQueueCapKey}={configQueueCap})"
                tmp_log.info(f"{msg_header} {msg_body}")
                tmp_log.sendMsg(f"{msg_header} {msg_body}", self.msgType, msgLevel="info", escapeChar=True)
                return self.retMergeUnThr

        # get jobs from prodDB
        limitPriorityValue = None
        if limitPriority:
            # only generate jobs above the priority of the waiting tasks
            limitPriorityValue = highestPrioWaiting
            self.setMinPriority(limitPriorityValue)
        else:
            # not enough jobs are queued
            if (
                (nNotRun_queuelimit + nDefine_queuelimit < nQueueLimit * 0.9)
                or (workQueue.queue_name in non_rt_wqs and nNotRun_gs + nDefine_gs < nRunning_gs)
                or (workQueue.queue_name not in non_rt_wqs and nNotRun_rt + nDefine_rt < nRunning_rt)
            ):
                tmp_log.debug(msg_header + " not enough jobs queued")
                self.notEnoughJobsQueued()
                # NOTE(review): nQueueLimit / 20 is a float here, unlike the
                # integer divisions used for the other maxNumJobs settings
                self.setMaxNumJobs(max(self.maxNumJobs, nQueueLimit / 20))

        msg_body = f"PASS - not throttled since priority limit={limitPriorityValue} maxNumJobs={self.maxNumJobs}"
        tmp_log.info(msg_header + " " + msg_body)
        return self.retUnThrottled
0494 
0495 
# install status codes (SC_SUCCEEDED / SC_FAILED referenced via self) on the class
Interaction.installSC(JobThrottlerBase)