File indexing completed on 2026-04-10 08:38:58
0001 import os
0002 import socket
0003 import sys
0004 import traceback
0005
0006 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0007
0008
0009
class JumboWatchDog:
    """Watchdog that periodically reviews tasks using jumbo jobs.

    One call to :meth:`run` takes a process lock, reads its thresholds from
    the task buffer configuration, and then walks over every task returned
    by ``getTaskWithJumbo_JEDI`` to

    * disable jumbo jobs for tasks with too few events left,
    * raise the priority of tasks whose jumbo jobs are disabled,
    * raise the priority of tasks that are close to completion,
    * kick pending/running tasks that have no active jumbo job, and
    * reset inputs to regenerate co-jumbo for boosted tasks.
    """

    # Job states that count as "active" when deciding whether to kick a task.
    _ACTIVE_JOB_STATUSES = frozenset(
        ("defined", "assigned", "activated", "sent", "starting", "running", "transferring", "holding")
    )

    # Priority assigned to tasks that are close to completion.
    _PRIO_TO_BOOST = 900

    def __init__(self, taskBufferIF, ddmIF, log, vo, prodSourceLabel):
        """Store the collaborators and build the lock identifier.

        Args:
            taskBufferIF: task buffer interface used for every query/update.
            ddmIF: DDM interface; stored, not used by the code in this class.
            log: logger exposing ``debug``, ``info`` and ``error``.
            vo: virtual organisation passed to the task buffer calls.
            prodSourceLabel: production source label passed to the task
                buffer calls.
        """
        self.taskBufferIF = taskBufferIF
        self.ddmIF = ddmIF
        # <short hostname>-<pid>_<process group>-jumbo, used as the lock owner
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-jumbo"
        self.log = log
        self.vo = vo
        self.prodSourceLabel = prodSourceLabel
        self.component = "JumboWatchDog"
        # not read anywhere in this class
        self.dryRun = True

    def _getConfig(self, key, default):
        """Return the "jedi" config value for ``key``, or ``default`` when it is unset (None)."""
        value = self.taskBufferIF.getConfigValue(self.component, key, "jedi", self.vo)
        return default if value is None else value

    def run(self):
        """Execute one watchdog cycle.

        Returns None. Any exception raised during the cycle is caught and
        written to ``self.log.error`` together with its traceback, so it
        never propagates to the caller.
        """
        try:
            # only one instance per vo/prodSourceLabel may run the cycle
            locked = self.taskBufferIF.lockProcess_JEDI(
                vo=self.vo,
                prodSourceLabel=self.prodSourceLabel,
                cloud=None,
                workqueue_id=None,
                resource_name=None,
                component=self.component,
                pid=self.pid,
                timeLimit=10,
            )
            if not locked:
                self.log.debug(f"component={self.component} skipped since locked by another")
                return

            self.log.debug(f"component={self.component} start")

            # thresholds, each overridable through the configuration
            nEventsToDisable = self._getConfig("JUMBO_MIN_EVENTS_DISABLE", 100000)
            progressToBoost = self._getConfig("JUMBO_PROG_TO_BOOST", 95)
            maxFilesToBoost = self._getConfig("JUMBO_MAX_FILES_TO_BOOST", 500)
            prioWhenDisabled = self._getConfig("JUMBO_PRIO_DISABLED", 500)
            prioToBoost = self._PRIO_TO_BOOST

            useJumbo = JediTaskSpec.enum_useJumbo
            tasksWithJumbo = self.taskBufferIF.getTaskWithJumbo_JEDI(self.vo, self.prodSourceLabel)

            # totals over the tasks that keep jumbo enabled
            totEvents = 0
            doneEvents = 0
            nTasks = 0

            for jediTaskID, taskData in tasksWithJumbo.items():
                nEventsLeft = taskData["nEvents"] - taskData["nEventsDone"]

                # disable jumbo when too few events are left to process
                if taskData["useJumbo"] != useJumbo["disabled"] and taskData["site"] is None:
                    if nEventsLeft < nEventsToDisable:
                        self.log.info(
                            f"component={self.component} disable jumbo in jediTaskID={jediTaskID} "
                            f"due to n_events_to_process={nEventsLeft} < {nEventsToDisable}"
                        )
                        self.taskBufferIF.enableJumboJobs(jediTaskID, 0, 0)
                    else:
                        nTasks += 1
                        totEvents += taskData["nEvents"]
                        doneEvents += taskData["nEventsDone"]
                        self.log.info(
                            f"component={self.component} keep jumbo in jediTaskID={jediTaskID} "
                            f"due to n_events_to_process={nEventsLeft} > {nEventsToDisable}"
                        )

                # raise the priority of tasks whose jumbo jobs are disabled
                if taskData["useJumbo"] == useJumbo["disabled"] and taskData["currentPriority"] < prioWhenDisabled:
                    self.taskBufferIF.changeTaskPriorityPanda(jediTaskID, prioWhenDisabled)
                    self.log.info(f"component={self.component} priority boost to {prioWhenDisabled} after disabing jumbo in in jediTaskID={jediTaskID}")

                # raise the priority once more than progressToBoost percent of
                # the events are done and only a few files remain; this is the
                # condition stated in the log message below
                nFilesLeft = taskData["nFiles"] - taskData["nFilesDone"]
                if (
                    taskData["nEvents"] > 0
                    and taskData["nEventsDone"] * 100 > taskData["nEvents"] * progressToBoost
                    and taskData["currentPriority"] < prioToBoost
                    and nFilesLeft < maxFilesToBoost
                ):
                    self.log.info(
                        f"component={self.component} priority boost to {prioToBoost} for jediTaskID={jediTaskID} "
                        f"due to n_events_done={taskData['nEventsDone']} > {taskData['nEvents']}*{progressToBoost}% "
                        f"n_files_remaining={nFilesLeft} < {maxFilesToBoost}"
                    )
                    self.taskBufferIF.changeTaskPriorityPanda(jediTaskID, prioToBoost)

                # kick pending/running tasks that have no active jumbo job
                if taskData["taskStatus"] in ("pending", "running") and taskData["useJumbo"] in (
                    useJumbo["pending"],
                    useJumbo["running"],
                ):
                    nActiveJumbo = sum(
                        nJobs
                        for jobStatusMap in taskData["jumboJobs"].values()
                        for jobStatus, nJobs in jobStatusMap.items()
                        if jobStatus in self._ACTIVE_JOB_STATUSES
                    )
                    if nActiveJumbo == 0:
                        self.log.info(f"component={self.component} kick jumbo in {taskData['taskStatus']} jediTaskID={jediTaskID}")
                        self.taskBufferIF.kickPendingTasksWithJumbo_JEDI(jediTaskID)

                # reset inputs to regenerate co-jumbo for boosted tasks; uses the
                # priority as read at the start of the cycle, so a boost applied
                # above is not taken into account here
                if taskData["currentPriority"] >= prioToBoost:
                    nReset = self.taskBufferIF.resetInputToReGenCoJumbo_JEDI(jediTaskID)
                    if nReset is not None and nReset > 0:
                        self.log.info(f"component={self.component} reset {nReset} inputs to regenerate co-jumbo for jediTaskID={jediTaskID}")
                    else:
                        self.log.debug(f"component={self.component} tried to reset inputs to regenerate co-jumbo with {nReset} for jediTaskID={jediTaskID}")

            self.log.info(
                f"component={self.component} total_events={totEvents} n_events_to_process={totEvents - doneEvents} n_tasks={nTasks} available for jumbo"
            )

            self.log.debug(f"component={self.component} done")
        except Exception as exc:
            # top-level boundary: report and swallow so the caller keeps running
            headline = f"component={self.component} failed with {type(exc).__name__} {exc}".strip()
            self.log.error(f"{headline}\n{traceback.format_exc()}")