Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import os
0002 import socket
0003 import sys
0004 import traceback
0005 
0006 from pandaserver.taskbuffer.JediTaskSpec import JediTaskSpec
0007 
0008 
0009 # watchdog to take actions for jumbo jobs
class JumboWatchDog:
    """Watchdog taking periodic actions on JEDI tasks that use jumbo jobs.

    One call to ``run`` processes every task returned by
    ``taskBufferIF.getTaskWithJumbo_JEDI`` and, per task, may:

    * disable jumbo jobs when too few events remain to be processed,
    * raise the priority of tasks whose jumbo jobs are disabled,
    * boost the priority of tasks that are close to completion,
    * kick pending/running jumbo tasks that have no active jumbo jobs,
    * reset inputs to regenerate co-jumbo jobs for tasks at/above the boost priority.
    """

    # job statuses counted as active jumbo jobs when deciding whether to kick a task
    ACTIVE_JUMBO_JOB_STATUSES = frozenset(
        ["defined", "assigned", "activated", "sent", "starting", "running", "transferring", "holding"]
    )

    # priority given to tasks close to completion; tasks at or above it get co-jumbo regeneration
    PRIO_TO_BOOST = 900

    # constructor
    def __init__(self, taskBufferIF, ddmIF, log, vo, prodSourceLabel):
        """
        :param taskBufferIF: task buffer interface used for locking, config lookups and task updates
        :param ddmIF: DDM interface; stored but not used by this class
        :param log: logger providing debug/info/error
        :param vo: virtual organization passed to the lock, config and task queries
        :param prodSourceLabel: prodSourceLabel passed to the lock and task queries
        """
        self.taskBufferIF = taskBufferIF
        self.ddmIF = ddmIF
        # lock-owner identifier: <short hostname>-<pid>_<process group>-jumbo
        self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-jumbo"
        self.log = log
        self.vo = vo
        self.prodSourceLabel = prodSourceLabel
        self.component = "JumboWatchDog"
        # NOTE(review): set but not read anywhere in this class
        self.dryRun = True

    def _get_config(self, key, default):
        """Return the "jedi" config value ``key`` for this component and VO.

        :param key: config key name, e.g. "JUMBO_MIN_EVENTS_DISABLE"
        :param default: value returned when the lookup yields None
        """
        value = self.taskBufferIF.getConfigValue(self.component, key, "jedi", self.vo)
        return default if value is None else value

    # main
    def run(self):
        """Execute one watchdog cycle.

        Returns immediately when the process lock cannot be acquired. Any exception
        raised during the cycle is caught and logged as an error with its traceback.
        """
        try:
            # get process lock
            locked = self.taskBufferIF.lockProcess_JEDI(
                vo=self.vo,
                prodSourceLabel=self.prodSourceLabel,
                cloud=None,
                workqueue_id=None,
                resource_name=None,
                component=self.component,
                pid=self.pid,
                timeLimit=10,
            )
            if not locked:
                self.log.debug(f"component={self.component} skipped since locked by another")
                return
            self.log.debug(f"component={self.component} start")
            # thresholds from the jedi config, with built-in defaults when unset
            nEventsToDisable = self._get_config("JUMBO_MIN_EVENTS_DISABLE", 100000)
            progressToBoost = self._get_config("JUMBO_PROG_TO_BOOST", 95)
            maxFilesToBoost = self._get_config("JUMBO_MAX_FILES_TO_BOOST", 500)
            prioWhenDisabled = self._get_config("JUMBO_PRIO_DISABLED", 500)
            prioToBoost = self.PRIO_TO_BOOST
            useJumboDisabled = JediTaskSpec.enum_useJumbo["disabled"]
            useJumboKickable = (JediTaskSpec.enum_useJumbo["pending"], JediTaskSpec.enum_useJumbo["running"])
            # get current info
            tasksWithJumbo = self.taskBufferIF.getTaskWithJumbo_JEDI(self.vo, self.prodSourceLabel)
            totEvents = 0
            doneEvents = 0
            nTasks = 0
            for jediTaskID, taskData in tasksWithJumbo.items():
                # disable jumbo when too few events remain (only for tasks without a site set)
                if taskData["useJumbo"] != useJumboDisabled and taskData["site"] is None:
                    nEventsToProcess = taskData["nEvents"] - taskData["nEventsDone"]
                    if nEventsToProcess < nEventsToDisable:
                        self.log.info(
                            f"component={self.component} disable jumbo in jediTaskID={jediTaskID} "
                            f"due to n_events_to_process={nEventsToProcess} < {nEventsToDisable}"
                        )
                        self.taskBufferIF.enableJumboJobs(jediTaskID, 0, 0)
                    else:
                        # keep jumbo and count the task in the summary
                        nTasks += 1
                        totEvents += taskData["nEvents"]
                        doneEvents += taskData["nEventsDone"]
                        self.log.info(
                            f"component={self.component} keep jumbo in jediTaskID={jediTaskID} "
                            f"due to n_events_to_process={nEventsToProcess} >= {nEventsToDisable}"
                        )
                # increase priority for jumbo disabled
                if taskData["useJumbo"] == useJumboDisabled and taskData["currentPriority"] < prioWhenDisabled:
                    self.taskBufferIF.changeTaskPriorityPanda(jediTaskID, prioWhenDisabled)
                    self.log.info(f"component={self.component} priority boost to {prioWhenDisabled} after disabling jumbo in jediTaskID={jediTaskID}")
                # increase priority when close to completion: more than progressToBoost% of the
                # events are done, i.e. the remaining percentage is below 100 - progressToBoost
                if (
                    taskData["nEvents"] > 0
                    and (taskData["nEvents"] - taskData["nEventsDone"]) * 100 // taskData["nEvents"] < 100 - progressToBoost
                    and taskData["currentPriority"] < prioToBoost
                    and (taskData["nFiles"] - taskData["nFilesDone"]) < maxFilesToBoost
                ):
                    tmpStr = "component={0} priority boost to {5} for jediTaskID={1} due to n_events_done={2} > {3}*{4}% ".format(
                        self.component, jediTaskID, taskData["nEventsDone"], taskData["nEvents"], progressToBoost, prioToBoost
                    )
                    tmpStr += f"n_files_remaining={taskData['nFiles'] - taskData['nFilesDone']} < {maxFilesToBoost}"
                    self.log.info(tmpStr)
                    self.taskBufferIF.changeTaskPriorityPanda(jediTaskID, prioToBoost)
                # kick pending/running jumbo tasks that have no active jumbo jobs at any site
                if taskData["taskStatus"] in ("pending", "running") and taskData["useJumbo"] in useJumboKickable:
                    nActiveJumbo = sum(
                        nJobs
                        for jobStatusMap in taskData["jumboJobs"].values()
                        for jobStatus, nJobs in jobStatusMap.items()
                        if jobStatus in self.ACTIVE_JUMBO_JOB_STATUSES
                    )
                    if nActiveJumbo == 0:
                        self.log.info(f"component={self.component} kick jumbo in {taskData['taskStatus']} jediTaskID={jediTaskID}")
                        self.taskBufferIF.kickPendingTasksWithJumbo_JEDI(jediTaskID)
                # reset input to re-generate co-jumbo
                if taskData["currentPriority"] >= prioToBoost:
                    nReset = self.taskBufferIF.resetInputToReGenCoJumbo_JEDI(jediTaskID)
                    if nReset is not None and nReset > 0:
                        self.log.info(f"component={self.component} reset {nReset} inputs to regenerate co-jumbo for jediTaskID={jediTaskID}")
                    else:
                        self.log.debug(f"component={self.component} tried to reset inputs to regenerate co-jumbo with {nReset} for jediTaskID={jediTaskID}")
            self.log.info(
                f"component={self.component} total_events={totEvents} n_events_to_process={totEvents - doneEvents} n_tasks={nTasks} available for jumbo"
            )

            self.log.debug(f"component={self.component} done")
        except Exception as e:
            # log error type and message followed by the full traceback
            errStr = f": {type(e).__name__} {e}"
            errStr += traceback.format_exc()
            self.log.error(errStr)