Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import copy
0002 import os
0003 import socket
0004 import sys
0005 import traceback
0006 
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008 
0009 from pandajedi.jedibrokerage import AtlasBrokerUtils
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0012 from pandaserver.dataservice import DataServiceUtils
0013 from pandaserver.srvcore import CoreUtils
0014 from pandaserver.taskbuffer import JediTaskSpec, JobUtils
0015 
0016 from .JumboWatchDog import JumboWatchDog
0017 from .TypicalWatchDogBase import TypicalWatchDogBase
0018 
0019 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0020 
0021 
0022 # watchdog for ATLAS production
0023 class AtlasProdWatchDog(TypicalWatchDogBase):
0024     # constructor
0025     def __init__(self, taskBufferIF, ddmIF):
0026         TypicalWatchDogBase.__init__(self, taskBufferIF, ddmIF)
0027         self.pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}-dog"
0028 
0029     # main
0030     def doAction(self):
0031         try:
0032             # get logger
0033             tmpLog = MsgWrapper(logger)
0034             tmpLog.debug("start")
0035 
0036             # actions based on task progress
0037             self.do_task_progress_based_actions(tmpLog)
0038 
0039             # action for reassign
0040             self.doActionForReassign(tmpLog)
0041 
0042             # action for throttled
0043             self.doActionForThrottled(tmpLog)
0044 
0045             # action for high prio pending
0046             for minPriority, timeoutVal in [
0047                 (950, 10),
0048                 (900, 30),
0049             ]:
0050                 self.doActionForHighPrioPending(tmpLog, minPriority, timeoutVal)
0051 
0052             # action to set scout job data w/o scouts
0053             self.doActionToSetScoutJobData(tmpLog)
0054 
0055             # action to throttle jobs in paused tasks
0056             self.doActionToThrottleJobInPausedTasks(tmpLog)
0057 
0058             # action for jumbo
0059             jumbo = JumboWatchDog(self.taskBufferIF, self.ddmIF, tmpLog, "atlas", "managed")
0060             jumbo.run()
0061 
0062             # action to provoke (mark files ready) data carousel tasks to start if DDM rules of input DS are done
0063             self.doActionToProvokeDCTasks(tmpLog)
0064 
0065         except Exception:
0066             errtype, errvalue = sys.exc_info()[:2]
0067             tmpLog.error(f"failed with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
0068         # return
0069         tmpLog.debug("done")
0070         return self.SC_SUCCEEDED
0071 
0072     # actions based on task progress
0073     def do_task_progress_based_actions(self, g_tmp_log: MsgWrapper) -> None:
0074         """
0075         Take actions based on task progress
0076         1) boost priority of tasks which have finished >95% of input files and have <=100 remaining jobs to prio 900
0077         2) reassign such tasks to Express global share if not already
0078         3) pause tasks with high failure rate
0079 
0080         :param g_tmp_log: logger
0081         :return: None
0082         """
0083         # get work queue mapper
0084         workQueueMapper = self.taskBufferIF.getWorkQueueMap()
0085         # get list of work queues
0086         workQueueList = workQueueMapper.getAlignedQueueList(self.vo, self.prodSourceLabel)
0087         resource_types = self.taskBufferIF.load_resource_types()
0088 
0089         # get config values
0090         failure_checker = "watchdog"
0091         max_job_failure_rate_key = f"MAX_JOB_FAILURE_RATE_TO_PAUSE_{self.prodSourceLabel}"
0092         max_job_failure_rate_base = self.taskBufferIF.getConfigValue(failure_checker, max_job_failure_rate_key, "jedi")
0093         max_hep_score_failure_rate_key = f"MAX_HEP_SCORE_FAILURE_RATE_TO_PAUSE_{self.prodSourceLabel}"
0094         max_hep_score_failure_rate_base = self.taskBufferIF.getConfigValue(failure_checker, max_hep_score_failure_rate_key, "jedi")
0095         min_jobs_to_pause_key = f"MIN_JOBS_TO_PAUSE_{self.prodSourceLabel}"
0096         min_jobs_to_pause = self.taskBufferIF.getConfigValue(failure_checker, min_jobs_to_pause_key, "jedi")
0097         if min_jobs_to_pause is None:
0098             min_jobs_to_pause = 1000
0099         min_remaining_jobs_to_pause_key = f"MIN_REMAINING_JOBS_TO_PAUSE_{self.prodSourceLabel}"
0100         min_remaining_jobs_to_pause = self.taskBufferIF.getConfigValue(failure_checker, min_remaining_jobs_to_pause_key, "jedi")
0101         if min_remaining_jobs_to_pause is None:
0102             min_remaining_jobs_to_pause = 100
0103 
0104         # loop over all work queues
0105         g_tmp_log.debug("start do_task_progress_based_actions")
0106         for workQueue in workQueueList:
0107             break_loop = False  # for special workqueues we only need to iterate once
0108             for resource_type in resource_types:
0109                 g_tmp_log.debug(f"start workQueue={workQueue.queue_name} resource_type={resource_type.resource_name}")
0110                 # get tasks to be boosted
0111                 if workQueue.is_global_share:
0112                     task_criteria = {"gshare": workQueue.queue_name, "resource_type": resource_type.resource_name}
0113                 else:
0114                     break_loop = True
0115                     task_criteria = {"workQueue_ID": workQueue.queue_id}
0116 
0117                 # define max failure rate
0118                 max_job_failure_rate = self.taskBufferIF.getConfigValue(failure_checker, f"{max_job_failure_rate_key}_{workQueue.queue_name}", "jedi")
0119                 if max_job_failure_rate is None:
0120                     max_job_failure_rate = max_job_failure_rate_base
0121                 max_hep_score_failure_rate = self.taskBufferIF.getConfigValue(
0122                     failure_checker, f"{max_hep_score_failure_rate_key}_{workQueue.queue_name}", "jedi"
0123                 )
0124                 if max_hep_score_failure_rate is None:
0125                     max_hep_score_failure_rate = max_hep_score_failure_rate_base
0126 
0127                 dataset_criteria = {"masterID": None, "type": ["input", "pseudo_input"]}
0128                 task_param_list = ["jediTaskID", "currentPriority", "parent_tid", "gshare", "requestType", "splitRule"]
0129                 dataset_param_list = ["nFiles", "nFilesUsed", "nFilesTobeUsed", "nFilesFinished", "nFilesFailed"]
0130                 taskVarList = self.taskBufferIF.getTasksWithCriteria_JEDI(
0131                     self.vo,
0132                     self.prodSourceLabel,
0133                     ["running"],
0134                     taskCriteria=task_criteria,
0135                     datasetCriteria=dataset_criteria,
0136                     taskParamList=task_param_list,
0137                     datasetParamList=dataset_param_list,
0138                     taskLockColumn="throttledTime",
0139                     taskLockInterval=20,
0140                 )
0141                 boostedPrio = 900
0142                 toBoostRatio = 0.95
0143                 for taskParam, datasetParam in taskVarList:
0144                     jediTaskID = taskParam["jediTaskID"]
0145                     currentPriority = taskParam["currentPriority"]
0146                     parent_tid = taskParam["parent_tid"]
0147                     gshare = taskParam["gshare"]
0148                     request_type = taskParam["requestType"]
0149                     split_rule = taskParam["splitRule"]
0150                     # check parent
0151                     parentState = None
0152                     if parent_tid not in [None, jediTaskID]:
0153                         parentState = self.taskBufferIF.checkParentTask_JEDI(parent_tid, jediTaskID)
0154                         if parentState != "completed":
0155                             g_tmp_log.info(
0156                                 f"#ATM #KV label={self.prodSourceLabel} jediTaskID={jediTaskID} skip prio boost since parent_id={parent_tid} has parent_status={parentState}"
0157                             )
0158                             continue
0159                     nFiles = datasetParam["nFiles"]
0160                     nFilesFinished = datasetParam["nFilesFinished"]
0161                     nFilesFailed = datasetParam["nFilesFailed"]
0162                     # get num jobs
0163                     nJobs = self.taskBufferIF.getNumJobsForTask_JEDI(jediTaskID)
0164                     nRemJobs = None
0165                     if nJobs is not None:
0166                         try:
0167                             nRemJobs = int(float(nFiles - nFilesFinished - nFilesFailed) * float(nJobs) / float(nFiles))
0168                         except Exception:
0169                             pass
0170                     failure_metrics = self.taskBufferIF.get_task_failure_metrics(jediTaskID)
0171 
0172                     tmpStr = f"jediTaskID={jediTaskID} nFiles={nFiles} nFilesFinishedFailed={nFilesFinished + nFilesFailed} "
0173                     tmpStr += f"nJobs={nJobs} nRemJobs={nRemJobs} parent_tid={parent_tid} parentStatus={parentState} failure_metrics={failure_metrics}"
0174                     g_tmp_log.debug(tmpStr)
0175 
0176                     try:
0177                         if nRemJobs is not None and float(nFilesFinished + nFilesFailed) / float(nFiles) >= toBoostRatio and nRemJobs <= 100:
0178                             # skip high enough
0179                             if currentPriority < boostedPrio:
0180                                 g_tmp_log.info(
0181                                     f" >>> action=priority_boosting of jediTaskID={jediTaskID} to priority={boostedPrio} #ATM #KV label={self.prodSourceLabel} "
0182                                 )
0183                                 self.taskBufferIF.changeTaskPriorityPanda(jediTaskID, boostedPrio)
0184 
0185                             # skip express or non global share
0186                             newShare = "Express"
0187                             newShareType = self.prodSourceLabel
0188                             if gshare != newShare and workQueue.is_global_share and workQueue.queue_type == newShareType:
0189                                 g_tmp_log.info(
0190                                     f" >>> action=gshare_reassignment jediTaskID={jediTaskID} from gshare_old={gshare} to gshare_new={newShare} #ATM #KV label={self.prodSourceLabel}"
0191                                 )
0192                                 self.taskBufferIF.reassignShare([jediTaskID], newShare, True)
0193                             g_tmp_log.debug(f"reassigned jediTaskID={jediTaskID}")
0194                     except Exception:
0195                         pass
0196 
0197                     # check failure rate
0198                     failure_key = None
0199                     bad_value = None
0200                     if JediTaskSpec.is_auto_pause_disabled(split_rule):
0201                         # auto pause disabled
0202                         pass
0203                     elif not failure_metrics:
0204                         # failure metrics is unavailable
0205                         pass
0206                     elif nJobs is None or nJobs < min_jobs_to_pause:
0207                         # total num jobs is small
0208                         pass
0209                     elif nRemJobs is None or nRemJobs < min_remaining_jobs_to_pause:
0210                         # remaining num jobs is small
0211                         pass
0212                     elif (
0213                         max_job_failure_rate is not None
0214                         and failure_metrics["single_failure_rate"] is not None
0215                         and failure_metrics["single_failure_rate"] >= max_job_failure_rate > 0
0216                     ):
0217                         # job failure rate is high
0218                         failure_key = "single_failure_rate"
0219                         bad_value = failure_metrics[failure_key]
0220                         g_tmp_log.info(
0221                             f" >>> action=pausing_high_failure_rate jediTaskID={jediTaskID} job_failure_rate={bad_value:.2f} "
0222                             f"(max_allowed={max_job_failure_rate}) #ATM #KV label={self.prodSourceLabel}"
0223                         )
0224                         self.taskBufferIF.sendCommandTaskPanda(
0225                             jediTaskID, f"{failure_checker} due to high job failure rate {bad_value:.2f} > {max_job_failure_rate}", True, "pause"
0226                         )
0227                     elif (
0228                         max_hep_score_failure_rate is not None
0229                         and failure_metrics["hep_score_failure_rate"] is not None
0230                         and failure_metrics["hep_score_failure_rate"] >= max_hep_score_failure_rate > 0
0231                     ):
0232                         # failed HEP score rate is high
0233                         failure_key = "hep_score_failure_rate"
0234                         bad_value = failure_metrics[failure_key]
0235                         g_tmp_log.info(
0236                             f" >>> action=pausing_high_failure_rate jediTaskID={jediTaskID} hep_score_failure_rate={bad_value:.2f} "
0237                             f"(max_allowed={max_hep_score_failure_rate}) #ATM #KV label={self.prodSourceLabel}"
0238                         )
0239                         self.taskBufferIF.sendCommandTaskPanda(
0240                             jediTaskID, f"{failure_checker} due to high HEP score failure rate {bad_value:.2f} > {max_hep_score_failure_rate}", True, "pause"
0241                         )
0242                     if failure_key is not None:
0243                         # get tasks with the same requestType to be paused
0244                         if request_type:
0245                             task_criteria = copy.copy(task_criteria)
0246                             task_criteria["requestType"] = request_type
0247                             tasks_with_same_request_type = self.taskBufferIF.getTasksWithCriteria_JEDI(
0248                                 self.vo,
0249                                 self.prodSourceLabel,
0250                                 ["running"],
0251                                 taskCriteria=task_criteria,
0252                                 taskParamList=["jediTaskID", "splitRule"],
0253                             )
0254                             for other_task_param, _ in tasks_with_same_request_type:
0255                                 other_jedi_task_id = other_task_param["jediTaskID"]
0256                                 # auto pause disabled
0257                                 if JediTaskSpec.is_auto_pause_disabled(other_jedi_task_id):
0258                                     continue
0259                                 if other_jedi_task_id != jediTaskID:
0260                                     g_tmp_log.info(
0261                                         f" >>> action=pausing_high_failure_rate jediTaskID={other_jedi_task_id} due to requestType={request_type} shared by {jediTaskID} "
0262                                         f"#ATM #KV label={self.prodSourceLabel}"
0263                                     )
0264                                     if failure_key == "single_failure_rate":
0265                                         msg = f"{failure_checker} due to high job failure rate in another task {jediTaskID} ({bad_value:.2f} > {max_job_failure_rate})"
0266                                     else:
0267                                         msg = f"{failure_checker} due to high HEP score failure rate in another task {jediTaskID} ({bad_value:.2f} > {max_hep_score_failure_rate})"
0268                                     self.taskBufferIF.sendCommandTaskPanda(
0269                                         other_jedi_task_id,
0270                                         msg,
0271                                         True,
0272                                         "pause",
0273                                     )
0274                         g_tmp_log.debug(f"paused jediTaskID={jediTaskID}")
0275 
0276                 if break_loop:
0277                     break
0278 
0279     # action for reassignment
0280     def doActionForReassign(self, gTmpLog):
0281         # get DDM I/F
0282         ddmIF = self.ddmIF.getInterface(self.vo)
0283         # get site mapper
0284         siteMapper = self.taskBufferIF.get_site_mapper()
0285         # get tasks to get reassigned
0286         taskList = self.taskBufferIF.getTasksToReassign_JEDI(self.vo, self.prodSourceLabel)
0287 
0288         gTmpLog.debug(f"got {len(taskList)} tasks to reassign")
0289         for taskSpec in taskList:
0290             tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} >")
0291             tmpLog.debug("start to reassign")
0292             # DDM backend
0293             ddmBackEnd = taskSpec.getDdmBackEnd()
0294             # get datasets
0295             tmpStat, datasetSpecList = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, ["output", "log"])
0296             if tmpStat is not True:
0297                 tmpLog.error("failed to get datasets")
0298                 continue
0299 
0300             # re-run task brokerage
0301             if taskSpec.nucleus in [None, ""]:
0302                 taskSpec.status = "assigning"
0303                 taskSpec.oldStatus = None
0304                 taskSpec.setToRegisterDatasets()
0305                 self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, setOldModTime=True)
0306                 tmpLog.debug(f"#ATM #KV label=managed action=trigger_new_brokerage by setting task_status={taskSpec.status}")
0307                 continue
0308 
0309             # get nucleus
0310             nucleusSpec = siteMapper.getNucleus(taskSpec.nucleus)
0311             if nucleusSpec is None:
0312                 tmpLog.error(f"nucleus={taskSpec.nucleus} doesn't exist")
0313                 continue
0314 
0315             # set nucleus
0316             retMap = {taskSpec.jediTaskID: AtlasBrokerUtils.getDictToSetNucleus(nucleusSpec, datasetSpecList)}
0317             self.taskBufferIF.setCloudToTasks_JEDI(retMap)
0318 
0319             # get nucleus
0320             t1SiteName = nucleusSpec.getOnePandaSite()
0321             t1Site = siteMapper.getSite(t1SiteName)
0322 
0323             # loop over all datasets
0324             isOK = True
0325             for datasetSpec in datasetSpecList:
0326                 tmpLog.debug(f"dataset={datasetSpec.datasetName}")
0327                 if DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None:
0328                     tmpLog.debug(f"skip {datasetSpec.datasetName} is distributed")
0329                     continue
0330                 # get location
0331                 location = siteMapper.getDdmEndpoint(
0332                     t1Site.sitename, datasetSpec.storageToken, taskSpec.prodSourceLabel, JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType)
0333                 )
0334                 # make subscription
0335                 try:
0336                     tmpLog.debug(f"registering subscription to {location} with backend={ddmBackEnd}")
0337                     tmpStat = ddmIF.registerDatasetSubscription(datasetSpec.datasetName, location, "Production Output", asynchronous=True)
0338                     if tmpStat is not True:
0339                         tmpLog.error("failed to make subscription")
0340                         isOK = False
0341                         break
0342                 except Exception:
0343                     errtype, errvalue = sys.exc_info()[:2]
0344                     tmpLog.warning(f"failed to make subscription with {errtype.__name__}:{errvalue}")
0345                     isOK = False
0346                     break
0347             # succeeded
0348             if isOK:
0349                 # activate task
0350                 if taskSpec.oldStatus in ["assigning", "exhausted", None]:
0351                     taskSpec.status = "ready"
0352                 else:
0353                     taskSpec.status = taskSpec.oldStatus
0354                 taskSpec.oldStatus = None
0355                 self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, setOldModTime=True)
0356                 tmpLog.debug("finished to reassign")
0357 
0358     # action for throttled tasks
0359     def doActionForThrottled(self, gTmpLog):
0360         # release tasks
0361         nTasks = self.taskBufferIF.releaseThrottledTasks_JEDI(self.vo, self.prodSourceLabel)
0362         gTmpLog.debug(f"released {nTasks} tasks")
0363 
0364         # throttle tasks
0365         nTasks = self.taskBufferIF.throttleTasks_JEDI(self.vo, self.prodSourceLabel, jedi_config.watchdog.waitForThrottled)
0366         gTmpLog.debug(f"throttled {nTasks} tasks")
0367 
0368     # action for high priority pending tasks
0369     def doActionForHighPrioPending(self, gTmpLog, minPriority, timeoutVal):
0370         timeoutForPending = None
0371         # try to get the timeout from the config files
0372         if hasattr(jedi_config.watchdog, "timeoutForPendingVoLabel"):
0373             timeoutForPending = CoreUtils.getConfigParam(jedi_config.watchdog.timeoutForPendingVoLabel, self.vo, self.prodSourceLabel)
0374         if timeoutForPending is None:
0375             timeoutForPending = jedi_config.watchdog.timeoutForPending
0376         timeoutForPending = int(timeoutForPending) * 24
0377         tmpRet, _ = self.taskBufferIF.reactivatePendingTasks_JEDI(self.vo, self.prodSourceLabel, timeoutVal, timeoutForPending, minPriority=minPriority)
0378         if tmpRet is None:
0379             # failed
0380             gTmpLog.error(f"failed to reactivate high priority (>{minPriority}) tasks")
0381         else:
0382             gTmpLog.info(f"reactivated high priority (>{minPriority}) {tmpRet} tasks")
0383 
0384     # action to throttle jobs in paused tasks
0385     def doActionToThrottleJobInPausedTasks(self, gTmpLog):
0386         tmpRet = self.taskBufferIF.throttleJobsInPausedTasks_JEDI(self.vo, self.prodSourceLabel)
0387         if tmpRet is None:
0388             # failed
0389             gTmpLog.error("failed to thottle jobs in paused tasks")
0390         else:
0391             for jediTaskID, pandaIDs in tmpRet.items():
0392                 gTmpLog.info(f"throttled jobs in paused jediTaskID={jediTaskID} successfully")
0393                 tmpRet = self.taskBufferIF.killJobs(pandaIDs, "reassign", "51", True)
0394                 gTmpLog.info(f"reassigned {len(pandaIDs)} jobs in paused jediTaskID={jediTaskID} with {tmpRet}")
0395 
0396     # action to provoke (mark files ready) data carousel tasks to start if DDM rules of input DS are done
0397     def doActionToProvokeDCTasks(self, gTmpLog):
0398         # lock
0399         got_lock = self.taskBufferIF.lockProcess_JEDI(
0400             vo=self.vo,
0401             prodSourceLabel=self.prodSourceLabel,
0402             cloud=None,
0403             workqueue_id=None,
0404             resource_name=None,
0405             component="AtlasProdWatchDog.doActionToProvokeDCT",
0406             pid=self.pid,
0407             timeLimit=30,
0408         )
0409         if not got_lock:
0410             gTmpLog.debug("doActionToProvokeDCTasks locked by another process. Skipped")
0411             return
0412         # run
0413         res_dict = self.taskBufferIF.get_pending_dc_tasks_JEDI(task_type="prod", time_limit_minutes=60)
0414         if res_dict is None:
0415             # failed
0416             gTmpLog.error("failed to get pending DC tasks")
0417         elif not res_dict:
0418             # empty
0419             gTmpLog.debug("no pending DC task; skipped")
0420         else:
0421             gTmpLog.debug(f"got {len(res_dict)} DC tasks to provoke")
0422             ddm_if = self.ddmIF.getInterface(self.vo)
0423             # loop over pending DC tasks
0424             for task_id, ds_name_list in res_dict.items():
0425                 if not ds_name_list:
0426                     continue
0427                 total_all_ok = True
0428                 for ds_name in ds_name_list:
0429                     ret_data = ddm_if.get_rules_state(ds_name)
0430                     try:
0431                         all_ok, rule_dict = ret_data
0432                         total_all_ok = total_all_ok and all_ok
0433                     except ValueError:
0434                         gTmpLog.error(f"failed to get rule info for task={task_id}. data={ret_data}")
0435                         total_all_ok = False
0436                         break
0437                 if total_all_ok:
0438                     # all rules ok; provoke the task
0439                     gTmpLog.info(f"provoking task {task_id}")
0440                     self.taskBufferIF.updateInputDatasetsStaged_JEDI(task_id, None, None, by="AtlasProdWatchDog")
0441                     gTmpLog.info(f"all staging rules of task {task_id} are OK; provoked")
0442                 else:
0443                     gTmpLog.debug(f"not all staging rules of task {task_id} are OK; skipped ")