File indexing completed on 2026-04-10 08:38:58
0001 import os
0002 import socket
0003
0004 from pandajedi.jediconfig import jedi_config
0005 from pandajedi.jedicore import Interaction
0006 from pandaserver.proxycache.token_cache import TokenCache
0007 from pandaserver.srvcore import CoreUtils
0008
0009 from .WatchDogBase import WatchDogBase
0010
0011
0012
class TypicalWatchDogBase(WatchDogBase):
    """Watchdog base class bundling routine task-maintenance steps.

    ``pre_action`` runs a fixed sequence of maintenance calls against
    ``self.taskBufferIF`` for one vo/prodSourceLabel pair.
    ``doActionToSetScoutJobData`` asks the task buffer to set scout job data on
    tasks for ``self.vo`` / ``self.prodSourceLabel``.
    """

    def pre_action(self, tmpLog, vo, prodSourceLabel, pid, *args, **kwargs):
        """Run the common maintenance steps for one vo/prodSourceLabel pair.

        Steps, in order:
          1. rescue tasks with picked files;
          2. reactivate pending tasks, then push trigger messages to
             ``jedi_job_generator`` for the returned message-driven task IDs;
          3. unlock tasks associated with this host name and process group;
          4. unlock tasks using the ``waitForLocked`` time limit;
          5. restart tasks for contents update, then push trigger messages to
             ``jedi_contents_feeder`` for the returned message-driven task IDs;
          6. kick exhausted tasks;
          7. finish achieved tasks (send a soft ``finish`` command to each);
          8. rescue unlocked tasks with picked files;
          9. cache tokens, if the ``TypicalWatchDogBase.cache_tokens`` process lock
             can be obtained.

        A step whose task-buffer call returns ``None`` is logged as an error.
        The remaining steps still run.

        :param tmpLog: logger exposing ``info``/``debug``/``warning``/``error``
        :param vo: VO passed to every task-buffer call
        :param prodSourceLabel: production source label passed to every task-buffer call
        :param pid: identifier forwarded to ``rescueUnLockedTasksWithPicked_JEDI``
        :param args: unused
        :param kwargs: unused
        """
        # 1. rescue tasks with picked files
        tmpLog.info(f"rescue tasks with picked files for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.rescuePickedFiles_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForPicked)
        if tmpRet is None:
            tmpLog.error("failed to rescue")
        else:
            tmpLog.info(f"rescued {tmpRet} tasks")

        # 2. reactivate pending tasks
        tmpLog.info(f"reactivate pending tasks for vo={vo} label={prodSourceLabel}")
        timeoutForPending = self._get_pending_timeout(vo, prodSourceLabel)
        tmpRet, msg_driven_taskid_set = self.taskBufferIF.reactivatePendingTasks_JEDI(
            vo, prodSourceLabel, jedi_config.watchdog.waitForPending, timeoutForPending
        )
        if tmpRet is None:
            tmpLog.error("failed to reactivate")
        else:
            tmpLog.info(f"reactivated {tmpRet} tasks")
            self._push_trigger_messages(tmpLog, "jedi_job_generator", msg_driven_taskid_set)

        # 3. unlock tasks associated with this host / process group.
        # Compute both identifiers once so the log line and the call agree.
        host_name = socket.getfqdn().split(".")[0]
        pgid = os.getpgrp()
        tmpLog.info(f"unlock tasks for vo={vo} label={prodSourceLabel} host={host_name} pgid={pgid}")
        # NOTE(review): the literal 10 occupies the same positional slot as
        # waitForLocked in step 4; its unit is defined by unlockTasks_JEDI.
        tmpRet = self.taskBufferIF.unlockTasks_JEDI(vo, prodSourceLabel, 10, host_name, pgid)
        if tmpRet is None:
            tmpLog.error("failed to unlock")
        else:
            tmpLog.info(f"unlock {tmpRet} tasks")

        # 4. unlock tasks using the configured waitForLocked limit
        tmpLog.info(f"unlock tasks for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.unlockTasks_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForLocked)
        if tmpRet is None:
            tmpLog.error("failed to unlock")
        else:
            tmpLog.info(f"unlock {tmpRet} tasks")

        # 5. restart tasks for contents update
        tmpLog.info(f"restart contents update for vo={vo} label={prodSourceLabel}")
        tmpRet, msg_driven_taskid_set = self.taskBufferIF.restartTasksForContentsUpdate_JEDI(vo, prodSourceLabel)
        if tmpRet is None:
            tmpLog.error("failed to restart")
        else:
            tmpLog.info(f"restarted {tmpRet} tasks")
            self._push_trigger_messages(tmpLog, "jedi_contents_feeder", msg_driven_taskid_set)

        # 6. kick exhausted tasks
        tmpLog.info(f"kick exhausted tasks for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.kickExhaustedTasks_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForExhausted)
        if tmpRet is None:
            tmpLog.error("failed to kick")
        else:
            tmpLog.info(f"kicked {tmpRet} tasks")

        # 7. finish achieved tasks
        tmpLog.info(f"finish achieved tasks for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.getAchievedTasks_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForAchieved)
        if tmpRet is None:
            tmpLog.error("failed to finish")
        else:
            n_finished = 0
            for jediTaskID in tmpRet:
                self.taskBufferIF.sendCommandTaskPanda(jediTaskID, "JEDI. Goal reached", True, "finish", comQualifier="soft")
                n_finished += 1
            # Log a count, consistent with the other steps. The raw task-ID
            # collection was previously interpolated here.
            tmpLog.info(f"finished {n_finished} tasks")

        # 8. rescue unlocked tasks with picked files
        tmpLog.info(f"rescue unlocked tasks with picked files for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.rescueUnLockedTasksWithPicked_JEDI(vo, prodSourceLabel, 60, pid)
        if tmpRet is None:
            tmpLog.error("failed to rescue unlocked tasks")
        else:
            tmpLog.info(f"rescue unlocked {tmpRet} tasks")

        # 9. cache tokens, only if no other watchdog process holds the lock.
        # NOTE(review): the lock is not released here; presumably timeLimit=5
        # throttles how often tokens are cached — confirm get_process_lock semantics.
        got_lock = self.get_process_lock("TypicalWatchDogBase.cache_tokens", timeLimit=5)
        if not got_lock:
            tmpLog.debug("locked by another watchdog process. Skipped to cache tokens")
        else:
            tmpLog.info("cache tokens")
            cacher = TokenCache()
            cacher.run()

    def _get_pending_timeout(self, vo, prodSourceLabel):
        """Return the pending-task timeout passed to ``reactivatePendingTasks_JEDI``.

        Lookup order:
          1. ``PENDING_TIMEOUT_<prodSourceLabel>`` in the ``watchdog`` section of
             the ``jedi`` config via ``taskBufferIF.getConfigValue`` (used as-is);
          2. ``jedi_config.watchdog.timeoutForPendingVoLabel`` resolved for
             vo/prodSourceLabel, when that attribute exists;
          3. ``jedi_config.watchdog.timeoutForPending``.
        A value from step 2 or 3 is converted with ``int(value) * 24``.
        """
        timeout = self.taskBufferIF.getConfigValue("watchdog", f"PENDING_TIMEOUT_{prodSourceLabel}", "jedi", vo)
        if timeout is not None:
            # NOTE(review): only the jedi_config fallbacks are scaled by 24
            # (presumably days -> hours). Confirm the config-table value is already
            # in the unit reactivatePendingTasks_JEDI expects.
            return timeout
        if hasattr(jedi_config.watchdog, "timeoutForPendingVoLabel"):
            timeout = CoreUtils.getConfigParam(jedi_config.watchdog.timeoutForPendingVoLabel, vo, prodSourceLabel)
        if timeout is None:
            timeout = jedi_config.watchdog.timeoutForPending
        return int(timeout) * 24

    def _push_trigger_messages(self, tmpLog, target, jeditaskid_set):
        """Push a trigger message to ``target`` for each task ID, logging each outcome."""
        for jeditaskid in jeditaskid_set:
            if self.taskBufferIF.push_task_trigger_message(target, jeditaskid):
                tmpLog.debug(f"pushed trigger message to {target} for jeditaskid={jeditaskid}")
            else:
                tmpLog.warning(f"failed to push trigger message to {target} for jeditaskid={jeditaskid}")

    def doActionToSetScoutJobData(self, gTmpLog):
        """Set scout job data on tasks for ``self.vo`` / ``self.prodSourceLabel``.

        :param gTmpLog: logger used to report success or failure
        """
        tmpRet = self.taskBufferIF.setScoutJobDataToTasks_JEDI(self.vo, self.prodSourceLabel)
        if tmpRet is None:
            gTmpLog.error("failed to set scout job data")
        else:
            gTmpLog.info("set scout job data successfully")
0120
0121
0122 Interaction.installSC(TypicalWatchDogBase)