Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import os
0002 import socket
0003 
0004 from pandajedi.jediconfig import jedi_config
0005 from pandajedi.jedicore import Interaction
0006 from pandaserver.proxycache.token_cache import TokenCache
0007 from pandaserver.srvcore import CoreUtils
0008 
0009 from .WatchDogBase import WatchDogBase
0010 
0011 
0012 # base class for typical watchdog (for production and analysis, etc.)
class TypicalWatchDogBase(WatchDogBase):
    """Watchdog base class with the housekeeping shared by typical watchdogs.

    Provides ``pre_action``, which runs a fixed sequence of recovery steps
    through ``self.taskBufferIF`` for one vo/prodSourceLabel, and
    ``doActionToSetScoutJobData``.
    """

    # pre-action
    def pre_action(self, tmpLog, vo, prodSourceLabel, pid, *args, **kwargs):
        """Run the recovery/housekeeping steps for one vo and prodSourceLabel.

        Each step calls a task-buffer method and logs its outcome. A ``None``
        return is treated as a failure and logged as an error; the remaining
        steps still run.

        :param tmpLog: logger used for progress and error messages
        :param vo: virtual organization to process
        :param prodSourceLabel: source label to process
        :param pid: forwarded to ``rescueUnLockedTasksWithPicked_JEDI``
        :param args: unused
        :param kwargs: unused
        """
        # rescue picked files
        tmpLog.info(f"rescue tasks with picked files for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.rescuePickedFiles_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForPicked)
        if tmpRet is None:
            tmpLog.error("failed to rescue")
        else:
            tmpLog.info(f"rescued {tmpRet} tasks")

        # reactivate pending tasks
        tmpLog.info(f"reactivate pending tasks for vo={vo} label={prodSourceLabel}")
        timeoutForPending = self._get_pending_timeout(vo, prodSourceLabel)
        tmpRet, msg_driven_taskid_set = self.taskBufferIF.reactivatePendingTasks_JEDI(
            vo, prodSourceLabel, jedi_config.watchdog.waitForPending, timeoutForPending
        )
        if tmpRet is None:
            tmpLog.error("failed to reactivate")
        else:
            tmpLog.info(f"reactivated {tmpRet} tasks")
            # message-driven tasks need an explicit trigger to be picked up again
            self._push_task_trigger_messages(tmpLog, "jedi_job_generator", msg_driven_taskid_set)

        # unlock tasks locked by this host and process group
        # (hoisted: getfqdn may do a DNS lookup; compute once for both log and call)
        host_name = socket.getfqdn().split(".")[0]
        pgid = os.getpgrp()
        tmpLog.info(f"unlock tasks for vo={vo} label={prodSourceLabel} host={host_name} pgid={pgid}")
        # NOTE(review): 10 is presumably a wait time in minutes, like waitForLocked below — confirm in unlockTasks_JEDI
        tmpRet = self.taskBufferIF.unlockTasks_JEDI(vo, prodSourceLabel, 10, host_name, pgid)
        if tmpRet is None:
            tmpLog.error("failed to unlock")
        else:
            tmpLog.info(f"unlock {tmpRet} tasks")

        # unlock tasks regardless of owner, using the configured waitForLocked
        tmpLog.info(f"unlock tasks for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.unlockTasks_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForLocked)
        if tmpRet is None:
            tmpLog.error("failed to unlock")
        else:
            tmpLog.info(f"unlock {tmpRet} tasks")

        # restart contents update
        tmpLog.info(f"restart contents update for vo={vo} label={prodSourceLabel}")
        tmpRet, msg_driven_taskid_set = self.taskBufferIF.restartTasksForContentsUpdate_JEDI(vo, prodSourceLabel)
        if tmpRet is None:
            tmpLog.error("failed to restart")
        else:
            tmpLog.info(f"restarted {tmpRet} tasks")
            self._push_task_trigger_messages(tmpLog, "jedi_contents_feeder", msg_driven_taskid_set)

        # kick exhausted tasks
        tmpLog.info(f"kick exhausted tasks for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.kickExhaustedTasks_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForExhausted)
        if tmpRet is None:
            tmpLog.error("failed to kick")
        else:
            tmpLog.info(f"kicked {tmpRet} tasks")

        # finish tasks when goal is reached
        tmpLog.info(f"finish achieved tasks for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.getAchievedTasks_JEDI(vo, prodSourceLabel, jedi_config.watchdog.waitForAchieved)
        if tmpRet is None:
            tmpLog.error("failed to finish")
        else:
            # getAchievedTasks_JEDI returns task IDs (not a count), so materialize
            # them once and log the count, consistent with the other steps
            achieved_task_ids = list(tmpRet)
            for jediTaskID in achieved_task_ids:
                self.taskBufferIF.sendCommandTaskPanda(jediTaskID, "JEDI. Goal reached", True, "finish", comQualifier="soft")
                tmpLog.debug(f"requested soft finish for jediTaskID={jediTaskID}")
            tmpLog.info(f"finished {len(achieved_task_ids)} tasks")

        # rescue unlocked tasks with picked files
        tmpLog.info(f"rescue unlocked tasks with picked files for vo={vo} label={prodSourceLabel}")
        tmpRet = self.taskBufferIF.rescueUnLockedTasksWithPicked_JEDI(vo, prodSourceLabel, 60, pid)
        if tmpRet is None:
            tmpLog.error("failed to rescue unlocked tasks")
        else:
            tmpLog.info(f"rescue unlocked {tmpRet} tasks")

        # cache tokens; only one watchdog process should do this at a time
        # NOTE(review): the lock is not released here; presumably it expires after
        # timeLimit — confirm in WatchDogBase.get_process_lock
        got_lock = self.get_process_lock("TypicalWatchDogBase.cache_tokens", timeLimit=5)
        if not got_lock:
            tmpLog.debug("locked by another watchdog process. Skipped to cache tokens")
        else:
            tmpLog.info("cache tokens")
            TokenCache().run()

    def _get_pending_timeout(self, vo, prodSourceLabel):
        """Return the pending-task timeout passed to ``reactivatePendingTasks_JEDI``.

        The DB config value ``PENDING_TIMEOUT_<prodSourceLabel>`` is returned as-is.
        Otherwise the per-vo/label ``timeoutForPendingVoLabel`` setting (if
        defined) or the global ``timeoutForPending`` is used, converted to int and
        multiplied by 24 (presumably days -> hours; TODO confirm units).
        """
        timeout = self.taskBufferIF.getConfigValue("watchdog", f"PENDING_TIMEOUT_{prodSourceLabel}", "jedi", vo)
        if timeout is not None:
            return timeout
        if hasattr(jedi_config.watchdog, "timeoutForPendingVoLabel"):
            timeout = CoreUtils.getConfigParam(jedi_config.watchdog.timeoutForPendingVoLabel, vo, prodSourceLabel)
        if timeout is None:
            timeout = jedi_config.watchdog.timeoutForPending
        return int(timeout) * 24

    def _push_task_trigger_messages(self, tmpLog, channel, jedi_task_ids):
        """Push a trigger message to ``channel`` for each task ID, logging each outcome."""
        for jeditaskid in jedi_task_ids:
            if self.taskBufferIF.push_task_trigger_message(channel, jeditaskid):
                tmpLog.debug(f"pushed trigger message to {channel} for jeditaskid={jeditaskid}")
            else:
                tmpLog.warning(f"failed to push trigger message to {channel} for jeditaskid={jeditaskid}")

    # action to set scout job data w/o scouts
    def doActionToSetScoutJobData(self, gTmpLog):
        """Call ``setScoutJobDataToTasks_JEDI`` for this watchdog's vo/label and log the result.

        :param gTmpLog: logger used for the outcome message
        """
        tmpRet = self.taskBufferIF.setScoutJobDataToTasks_JEDI(self.vo, self.prodSourceLabel)
        if tmpRet is None:
            gTmpLog.error("failed to set scout job data")
        else:
            gTmpLog.info("set scout job data successfully")


# register the class with the JEDI Interaction framework
Interaction.installSC(TypicalWatchDogBase)