File indexing completed on 2026-04-10 08:38:58
0001 import copy
0002 import os
0003 import socket
0004 import sys
0005 import traceback
0006
0007 from pandacommon.pandalogger.PandaLogger import PandaLogger
0008
0009 from pandajedi.jedibrokerage import AtlasBrokerUtils
0010 from pandajedi.jediconfig import jedi_config
0011 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0012 from pandaserver.dataservice import DataServiceUtils
0013 from pandaserver.srvcore import CoreUtils
0014 from pandaserver.taskbuffer import JediTaskSpec, JobUtils
0015
0016 from .JumboWatchDog import JumboWatchDog
0017 from .TypicalWatchDogBase import TypicalWatchDogBase
0018
0019 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0020
0021
0022
class AtlasProdWatchDog(TypicalWatchDogBase):
    """Watchdog for ATLAS production (managed) tasks.

    Periodically applies progress-based actions to tasks: priority boosting,
    global-share reassignment, auto-pausing, throttling, and data-carousel
    provocation (see the individual doAction* methods).
    """

    def __init__(self, taskBufferIF, ddmIF):
        """Initialize with the task buffer and DDM interfaces.

        :param taskBufferIF: task buffer interface
        :param ddmIF: DDM interface factory
        """
        TypicalWatchDogBase.__init__(self, taskBufferIF, ddmIF)
        # unique process identifier: <short hostname>-<pid>-dog, used for DB locks
        host_name = socket.getfqdn().split(".")[0]
        self.pid = f"{host_name}-{os.getpid()}-dog"
0028
0029
0030 def doAction(self):
0031 try:
0032
0033 tmpLog = MsgWrapper(logger)
0034 tmpLog.debug("start")
0035
0036
0037 self.do_task_progress_based_actions(tmpLog)
0038
0039
0040 self.doActionForReassign(tmpLog)
0041
0042
0043 self.doActionForThrottled(tmpLog)
0044
0045
0046 for minPriority, timeoutVal in [
0047 (950, 10),
0048 (900, 30),
0049 ]:
0050 self.doActionForHighPrioPending(tmpLog, minPriority, timeoutVal)
0051
0052
0053 self.doActionToSetScoutJobData(tmpLog)
0054
0055
0056 self.doActionToThrottleJobInPausedTasks(tmpLog)
0057
0058
0059 jumbo = JumboWatchDog(self.taskBufferIF, self.ddmIF, tmpLog, "atlas", "managed")
0060 jumbo.run()
0061
0062
0063 self.doActionToProvokeDCTasks(tmpLog)
0064
0065 except Exception:
0066 errtype, errvalue = sys.exc_info()[:2]
0067 tmpLog.error(f"failed with {errtype.__name__}:{errvalue} {traceback.format_exc()}")
0068
0069 tmpLog.debug("done")
0070 return self.SC_SUCCEEDED
0071
0072
0073 def do_task_progress_based_actions(self, g_tmp_log: MsgWrapper) -> None:
0074 """
0075 Take actions based on task progress
0076 1) boost priority of tasks which have finished >95% of input files and have <=100 remaining jobs to prio 900
0077 2) reassign such tasks to Express global share if not already
0078 3) pause tasks with high failure rate
0079
0080 :param g_tmp_log: logger
0081 :return: None
0082 """
0083
0084 workQueueMapper = self.taskBufferIF.getWorkQueueMap()
0085
0086 workQueueList = workQueueMapper.getAlignedQueueList(self.vo, self.prodSourceLabel)
0087 resource_types = self.taskBufferIF.load_resource_types()
0088
0089
0090 failure_checker = "watchdog"
0091 max_job_failure_rate_key = f"MAX_JOB_FAILURE_RATE_TO_PAUSE_{self.prodSourceLabel}"
0092 max_job_failure_rate_base = self.taskBufferIF.getConfigValue(failure_checker, max_job_failure_rate_key, "jedi")
0093 max_hep_score_failure_rate_key = f"MAX_HEP_SCORE_FAILURE_RATE_TO_PAUSE_{self.prodSourceLabel}"
0094 max_hep_score_failure_rate_base = self.taskBufferIF.getConfigValue(failure_checker, max_hep_score_failure_rate_key, "jedi")
0095 min_jobs_to_pause_key = f"MIN_JOBS_TO_PAUSE_{self.prodSourceLabel}"
0096 min_jobs_to_pause = self.taskBufferIF.getConfigValue(failure_checker, min_jobs_to_pause_key, "jedi")
0097 if min_jobs_to_pause is None:
0098 min_jobs_to_pause = 1000
0099 min_remaining_jobs_to_pause_key = f"MIN_REMAINING_JOBS_TO_PAUSE_{self.prodSourceLabel}"
0100 min_remaining_jobs_to_pause = self.taskBufferIF.getConfigValue(failure_checker, min_remaining_jobs_to_pause_key, "jedi")
0101 if min_remaining_jobs_to_pause is None:
0102 min_remaining_jobs_to_pause = 100
0103
0104
0105 g_tmp_log.debug("start do_task_progress_based_actions")
0106 for workQueue in workQueueList:
0107 break_loop = False
0108 for resource_type in resource_types:
0109 g_tmp_log.debug(f"start workQueue={workQueue.queue_name} resource_type={resource_type.resource_name}")
0110
0111 if workQueue.is_global_share:
0112 task_criteria = {"gshare": workQueue.queue_name, "resource_type": resource_type.resource_name}
0113 else:
0114 break_loop = True
0115 task_criteria = {"workQueue_ID": workQueue.queue_id}
0116
0117
0118 max_job_failure_rate = self.taskBufferIF.getConfigValue(failure_checker, f"{max_job_failure_rate_key}_{workQueue.queue_name}", "jedi")
0119 if max_job_failure_rate is None:
0120 max_job_failure_rate = max_job_failure_rate_base
0121 max_hep_score_failure_rate = self.taskBufferIF.getConfigValue(
0122 failure_checker, f"{max_hep_score_failure_rate_key}_{workQueue.queue_name}", "jedi"
0123 )
0124 if max_hep_score_failure_rate is None:
0125 max_hep_score_failure_rate = max_hep_score_failure_rate_base
0126
0127 dataset_criteria = {"masterID": None, "type": ["input", "pseudo_input"]}
0128 task_param_list = ["jediTaskID", "currentPriority", "parent_tid", "gshare", "requestType", "splitRule"]
0129 dataset_param_list = ["nFiles", "nFilesUsed", "nFilesTobeUsed", "nFilesFinished", "nFilesFailed"]
0130 taskVarList = self.taskBufferIF.getTasksWithCriteria_JEDI(
0131 self.vo,
0132 self.prodSourceLabel,
0133 ["running"],
0134 taskCriteria=task_criteria,
0135 datasetCriteria=dataset_criteria,
0136 taskParamList=task_param_list,
0137 datasetParamList=dataset_param_list,
0138 taskLockColumn="throttledTime",
0139 taskLockInterval=20,
0140 )
0141 boostedPrio = 900
0142 toBoostRatio = 0.95
0143 for taskParam, datasetParam in taskVarList:
0144 jediTaskID = taskParam["jediTaskID"]
0145 currentPriority = taskParam["currentPriority"]
0146 parent_tid = taskParam["parent_tid"]
0147 gshare = taskParam["gshare"]
0148 request_type = taskParam["requestType"]
0149 split_rule = taskParam["splitRule"]
0150
0151 parentState = None
0152 if parent_tid not in [None, jediTaskID]:
0153 parentState = self.taskBufferIF.checkParentTask_JEDI(parent_tid, jediTaskID)
0154 if parentState != "completed":
0155 g_tmp_log.info(
0156 f"#ATM #KV label={self.prodSourceLabel} jediTaskID={jediTaskID} skip prio boost since parent_id={parent_tid} has parent_status={parentState}"
0157 )
0158 continue
0159 nFiles = datasetParam["nFiles"]
0160 nFilesFinished = datasetParam["nFilesFinished"]
0161 nFilesFailed = datasetParam["nFilesFailed"]
0162
0163 nJobs = self.taskBufferIF.getNumJobsForTask_JEDI(jediTaskID)
0164 nRemJobs = None
0165 if nJobs is not None:
0166 try:
0167 nRemJobs = int(float(nFiles - nFilesFinished - nFilesFailed) * float(nJobs) / float(nFiles))
0168 except Exception:
0169 pass
0170 failure_metrics = self.taskBufferIF.get_task_failure_metrics(jediTaskID)
0171
0172 tmpStr = f"jediTaskID={jediTaskID} nFiles={nFiles} nFilesFinishedFailed={nFilesFinished + nFilesFailed} "
0173 tmpStr += f"nJobs={nJobs} nRemJobs={nRemJobs} parent_tid={parent_tid} parentStatus={parentState} failure_metrics={failure_metrics}"
0174 g_tmp_log.debug(tmpStr)
0175
0176 try:
0177 if nRemJobs is not None and float(nFilesFinished + nFilesFailed) / float(nFiles) >= toBoostRatio and nRemJobs <= 100:
0178
0179 if currentPriority < boostedPrio:
0180 g_tmp_log.info(
0181 f" >>> action=priority_boosting of jediTaskID={jediTaskID} to priority={boostedPrio} #ATM #KV label={self.prodSourceLabel} "
0182 )
0183 self.taskBufferIF.changeTaskPriorityPanda(jediTaskID, boostedPrio)
0184
0185
0186 newShare = "Express"
0187 newShareType = self.prodSourceLabel
0188 if gshare != newShare and workQueue.is_global_share and workQueue.queue_type == newShareType:
0189 g_tmp_log.info(
0190 f" >>> action=gshare_reassignment jediTaskID={jediTaskID} from gshare_old={gshare} to gshare_new={newShare} #ATM #KV label={self.prodSourceLabel}"
0191 )
0192 self.taskBufferIF.reassignShare([jediTaskID], newShare, True)
0193 g_tmp_log.debug(f"reassigned jediTaskID={jediTaskID}")
0194 except Exception:
0195 pass
0196
0197
0198 failure_key = None
0199 bad_value = None
0200 if JediTaskSpec.is_auto_pause_disabled(split_rule):
0201
0202 pass
0203 elif not failure_metrics:
0204
0205 pass
0206 elif nJobs is None or nJobs < min_jobs_to_pause:
0207
0208 pass
0209 elif nRemJobs is None or nRemJobs < min_remaining_jobs_to_pause:
0210
0211 pass
0212 elif (
0213 max_job_failure_rate is not None
0214 and failure_metrics["single_failure_rate"] is not None
0215 and failure_metrics["single_failure_rate"] >= max_job_failure_rate > 0
0216 ):
0217
0218 failure_key = "single_failure_rate"
0219 bad_value = failure_metrics[failure_key]
0220 g_tmp_log.info(
0221 f" >>> action=pausing_high_failure_rate jediTaskID={jediTaskID} job_failure_rate={bad_value:.2f} "
0222 f"(max_allowed={max_job_failure_rate}) #ATM #KV label={self.prodSourceLabel}"
0223 )
0224 self.taskBufferIF.sendCommandTaskPanda(
0225 jediTaskID, f"{failure_checker} due to high job failure rate {bad_value:.2f} > {max_job_failure_rate}", True, "pause"
0226 )
0227 elif (
0228 max_hep_score_failure_rate is not None
0229 and failure_metrics["hep_score_failure_rate"] is not None
0230 and failure_metrics["hep_score_failure_rate"] >= max_hep_score_failure_rate > 0
0231 ):
0232
0233 failure_key = "hep_score_failure_rate"
0234 bad_value = failure_metrics[failure_key]
0235 g_tmp_log.info(
0236 f" >>> action=pausing_high_failure_rate jediTaskID={jediTaskID} hep_score_failure_rate={bad_value:.2f} "
0237 f"(max_allowed={max_hep_score_failure_rate}) #ATM #KV label={self.prodSourceLabel}"
0238 )
0239 self.taskBufferIF.sendCommandTaskPanda(
0240 jediTaskID, f"{failure_checker} due to high HEP score failure rate {bad_value:.2f} > {max_hep_score_failure_rate}", True, "pause"
0241 )
0242 if failure_key is not None:
0243
0244 if request_type:
0245 task_criteria = copy.copy(task_criteria)
0246 task_criteria["requestType"] = request_type
0247 tasks_with_same_request_type = self.taskBufferIF.getTasksWithCriteria_JEDI(
0248 self.vo,
0249 self.prodSourceLabel,
0250 ["running"],
0251 taskCriteria=task_criteria,
0252 taskParamList=["jediTaskID", "splitRule"],
0253 )
0254 for other_task_param, _ in tasks_with_same_request_type:
0255 other_jedi_task_id = other_task_param["jediTaskID"]
0256
0257 if JediTaskSpec.is_auto_pause_disabled(other_jedi_task_id):
0258 continue
0259 if other_jedi_task_id != jediTaskID:
0260 g_tmp_log.info(
0261 f" >>> action=pausing_high_failure_rate jediTaskID={other_jedi_task_id} due to requestType={request_type} shared by {jediTaskID} "
0262 f"#ATM #KV label={self.prodSourceLabel}"
0263 )
0264 if failure_key == "single_failure_rate":
0265 msg = f"{failure_checker} due to high job failure rate in another task {jediTaskID} ({bad_value:.2f} > {max_job_failure_rate})"
0266 else:
0267 msg = f"{failure_checker} due to high HEP score failure rate in another task {jediTaskID} ({bad_value:.2f} > {max_hep_score_failure_rate})"
0268 self.taskBufferIF.sendCommandTaskPanda(
0269 other_jedi_task_id,
0270 msg,
0271 True,
0272 "pause",
0273 )
0274 g_tmp_log.debug(f"paused jediTaskID={jediTaskID}")
0275
0276 if break_loop:
0277 break
0278
0279
    def doActionForReassign(self, gTmpLog):
        """Process tasks flagged for reassignment.

        For each task returned by getTasksToReassign_JEDI:
          * if the task has no nucleus, it is sent back to brokerage by setting
            its status to "assigning" and flagging dataset re-registration
          * otherwise its output/log datasets are pointed at the nucleus and a
            DDM subscription is registered for each non-distributed dataset
          * on success the task is restored to its previous status ("ready"
            when the previous status was assigning/exhausted/None)

        :param gTmpLog: logger wrapper
        """
        # get the DDM interface for this VO
        ddmIF = self.ddmIF.getInterface(self.vo)
        # site mapper for nucleus/site/endpoint lookups
        siteMapper = self.taskBufferIF.get_site_mapper()
        # tasks flagged for reassignment
        taskList = self.taskBufferIF.getTasksToReassign_JEDI(self.vo, self.prodSourceLabel)

        gTmpLog.debug(f"got {len(taskList)} tasks to reassign")
        for taskSpec in taskList:
            # per-task logger with the task ID in the prefix
            tmpLog = MsgWrapper(logger, f"< jediTaskID={taskSpec.jediTaskID} >")
            tmpLog.debug("start to reassign")

            # DDM backend of the task
            ddmBackEnd = taskSpec.getDdmBackEnd()

            # get the task's output and log datasets
            tmpStat, datasetSpecList = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, ["output", "log"])
            if tmpStat is not True:
                tmpLog.error("failed to get datasets")
                continue

            # no nucleus assigned: trigger new brokerage instead of reassigning datasets
            if taskSpec.nucleus in [None, ""]:
                taskSpec.status = "assigning"
                taskSpec.oldStatus = None
                taskSpec.setToRegisterDatasets()
                self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, setOldModTime=True)
                tmpLog.debug(f"#ATM #KV label=managed action=trigger_new_brokerage by setting task_status={taskSpec.status}")
                continue

            # resolve the nucleus spec; skip the task if it is unknown
            nucleusSpec = siteMapper.getNucleus(taskSpec.nucleus)
            if nucleusSpec is None:
                tmpLog.error(f"nucleus={taskSpec.nucleus} doesn't exist")
                continue

            # set the nucleus destination on the task's datasets
            retMap = {taskSpec.jediTaskID: AtlasBrokerUtils.getDictToSetNucleus(nucleusSpec, datasetSpecList)}
            self.taskBufferIF.setCloudToTasks_JEDI(retMap)

            # pick one PanDA site of the nucleus as the destination
            t1SiteName = nucleusSpec.getOnePandaSite()
            t1Site = siteMapper.getSite(t1SiteName)

            # register a subscription for each non-distributed output/log dataset;
            # any failure aborts the reassignment of this task
            isOK = True
            for datasetSpec in datasetSpecList:
                tmpLog.debug(f"dataset={datasetSpec.datasetName}")
                if DataServiceUtils.getDistributedDestination(datasetSpec.storageToken) is not None:
                    # distributed datasets do not get a subscription
                    tmpLog.debug(f"skip {datasetSpec.datasetName} is distributed")
                    continue
                # resolve the DDM endpoint matching the dataset's storage token
                location = siteMapper.getDdmEndpoint(
                    t1Site.sitename, datasetSpec.storageToken, taskSpec.prodSourceLabel, JobUtils.translate_tasktype_to_jobtype(taskSpec.taskType)
                )
                # make the subscription (asynchronous on the DDM side)
                try:
                    tmpLog.debug(f"registering subscription to {location} with backend={ddmBackEnd}")
                    tmpStat = ddmIF.registerDatasetSubscription(datasetSpec.datasetName, location, "Production Output", asynchronous=True)
                    if tmpStat is not True:
                        tmpLog.error("failed to make subscription")
                        isOK = False
                        break
                except Exception:
                    errtype, errvalue = sys.exc_info()[:2]
                    tmpLog.warning(f"failed to make subscription with {errtype.__name__}:{errvalue}")
                    isOK = False
                    break

            # all subscriptions in place: restore the task status
            if isOK:
                # assigning/exhausted/None go to "ready"; otherwise back to the previous status
                if taskSpec.oldStatus in ["assigning", "exhausted", None]:
                    taskSpec.status = "ready"
                else:
                    taskSpec.status = taskSpec.oldStatus
                taskSpec.oldStatus = None
                self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, setOldModTime=True)
            tmpLog.debug("finished to reassign")
0357
0358
0359 def doActionForThrottled(self, gTmpLog):
0360
0361 nTasks = self.taskBufferIF.releaseThrottledTasks_JEDI(self.vo, self.prodSourceLabel)
0362 gTmpLog.debug(f"released {nTasks} tasks")
0363
0364
0365 nTasks = self.taskBufferIF.throttleTasks_JEDI(self.vo, self.prodSourceLabel, jedi_config.watchdog.waitForThrottled)
0366 gTmpLog.debug(f"throttled {nTasks} tasks")
0367
0368
0369 def doActionForHighPrioPending(self, gTmpLog, minPriority, timeoutVal):
0370 timeoutForPending = None
0371
0372 if hasattr(jedi_config.watchdog, "timeoutForPendingVoLabel"):
0373 timeoutForPending = CoreUtils.getConfigParam(jedi_config.watchdog.timeoutForPendingVoLabel, self.vo, self.prodSourceLabel)
0374 if timeoutForPending is None:
0375 timeoutForPending = jedi_config.watchdog.timeoutForPending
0376 timeoutForPending = int(timeoutForPending) * 24
0377 tmpRet, _ = self.taskBufferIF.reactivatePendingTasks_JEDI(self.vo, self.prodSourceLabel, timeoutVal, timeoutForPending, minPriority=minPriority)
0378 if tmpRet is None:
0379
0380 gTmpLog.error(f"failed to reactivate high priority (>{minPriority}) tasks")
0381 else:
0382 gTmpLog.info(f"reactivated high priority (>{minPriority}) {tmpRet} tasks")
0383
0384
0385 def doActionToThrottleJobInPausedTasks(self, gTmpLog):
0386 tmpRet = self.taskBufferIF.throttleJobsInPausedTasks_JEDI(self.vo, self.prodSourceLabel)
0387 if tmpRet is None:
0388
0389 gTmpLog.error("failed to thottle jobs in paused tasks")
0390 else:
0391 for jediTaskID, pandaIDs in tmpRet.items():
0392 gTmpLog.info(f"throttled jobs in paused jediTaskID={jediTaskID} successfully")
0393 tmpRet = self.taskBufferIF.killJobs(pandaIDs, "reassign", "51", True)
0394 gTmpLog.info(f"reassigned {len(pandaIDs)} jobs in paused jediTaskID={jediTaskID} with {tmpRet}")
0395
0396
0397 def doActionToProvokeDCTasks(self, gTmpLog):
0398
0399 got_lock = self.taskBufferIF.lockProcess_JEDI(
0400 vo=self.vo,
0401 prodSourceLabel=self.prodSourceLabel,
0402 cloud=None,
0403 workqueue_id=None,
0404 resource_name=None,
0405 component="AtlasProdWatchDog.doActionToProvokeDCT",
0406 pid=self.pid,
0407 timeLimit=30,
0408 )
0409 if not got_lock:
0410 gTmpLog.debug("doActionToProvokeDCTasks locked by another process. Skipped")
0411 return
0412
0413 res_dict = self.taskBufferIF.get_pending_dc_tasks_JEDI(task_type="prod", time_limit_minutes=60)
0414 if res_dict is None:
0415
0416 gTmpLog.error("failed to get pending DC tasks")
0417 elif not res_dict:
0418
0419 gTmpLog.debug("no pending DC task; skipped")
0420 else:
0421 gTmpLog.debug(f"got {len(res_dict)} DC tasks to provoke")
0422 ddm_if = self.ddmIF.getInterface(self.vo)
0423
0424 for task_id, ds_name_list in res_dict.items():
0425 if not ds_name_list:
0426 continue
0427 total_all_ok = True
0428 for ds_name in ds_name_list:
0429 ret_data = ddm_if.get_rules_state(ds_name)
0430 try:
0431 all_ok, rule_dict = ret_data
0432 total_all_ok = total_all_ok and all_ok
0433 except ValueError:
0434 gTmpLog.error(f"failed to get rule info for task={task_id}. data={ret_data}")
0435 total_all_ok = False
0436 break
0437 if total_all_ok:
0438
0439 gTmpLog.info(f"provoking task {task_id}")
0440 self.taskBufferIF.updateInputDatasetsStaged_JEDI(task_id, None, None, by="AtlasProdWatchDog")
0441 gTmpLog.info(f"all staging rules of task {task_id} are OK; provoked")
0442 else:
0443 gTmpLog.debug(f"not all staging rules of task {task_id} are OK; skipped ")