File indexing completed on 2026-04-10 08:38:58
0001 import datetime
0002 import os
0003 import re
0004 import socket
0005 import sys
0006 import time
0007 import traceback
0008
0009
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011 from pandacommon.pandautils.PandaUtils import naive_utcnow
0012
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015 from pandaserver.taskbuffer.DataCarousel import (
0016 DataCarouselInterface,
0017 DataCarouselRequestSpec,
0018 DataCarouselRequestStatus,
0019 )
0020
0021 from .WatchDogBase import WatchDogBase
0022
# Module-level logger, named after the last component of this module's dotted path.
logger = PandaLogger().getLogger(__name__.rpartition(".")[2])
0024
0025
0026
0027
# Process-lock durations, in minutes, one per watchdog action. Each value is
# passed as ``timeLimit`` to ``get_process_lock`` by the action of the same name.
StageDCRequests_LOCK_TIME_MINUTES = 5
CheckDCRequests_LOCK_TIME_MINUTES = 5
KeepRulesAlive_LOCK_TIME_MINUTES = 60  # 1 hour
CleanDCRequests_LOCK_TIME_MINUTES = 24 * 60  # 1 day
RescuePendingTasks_LOCK_TIME_MINUTES = 2 * 60  # 2 hours
0033
0034
0035
0036
class AtlasDataCarouselWatchDog(WatchDogBase):
    """
    Data Carousel watchdog for ATLAS

    Every ``do*`` action follows the same pattern: take a named process lock
    through ``get_process_lock``, run one or more calls on the
    ``DataCarouselInterface``, and log (rather than propagate) any exception.
    ``doAction`` is the entry point that runs the enabled actions in order.
    """

    def __init__(self, taskBufferIF, ddmIF):
        """
        Set up the watchdog for the "atlas" VO

        :param taskBufferIF: task buffer interface; forwarded to WatchDogBase and DataCarouselInterface
        :param ddmIF: DDM interface provider; ``ddmIF.getInterface("atlas")`` is stored as ``self.ddmIF``
        """
        WatchDogBase.__init__(self, taskBufferIF, ddmIF)
        self.vo = "atlas"
        self.ddmIF = ddmIF.getInterface(self.vo)
        self.data_carousel_interface = DataCarouselInterface(taskBufferIF, self.ddmIF)

    def _run_locked_action(self, tmp_log, lock_name, lock_minutes, action):
        """
        Run one watchdog action under its process lock, logging the outcome

        Shared skeleton of all ``do*`` actions: log "start", try to take the
        lock, skip if it is held elsewhere, otherwise call ``action`` and log
        "done". Any exception raised while locking or running is logged with
        its traceback and swallowed, so one failing action does not stop the
        others.

        :param tmp_log: MsgWrapper used for all messages of this action
        :param lock_name: name passed to ``get_process_lock``
        :param lock_minutes: value passed as ``timeLimit`` to ``get_process_lock``
        :param action: zero-argument callable doing the real work once the lock is held
        """
        tmp_log.debug("start")
        try:
            if not self.get_process_lock(lock_name, timeLimit=lock_minutes):
                tmp_log.debug("locked by another watchdog process. Skipped")
                return
            tmp_log.debug("got watchdog lock")
            action()
            tmp_log.debug("done")
        except Exception as exc:
            tmp_log.error(f"failed with {type(exc)} {exc} {traceback.format_exc()}")

    def doStageDCRequests(self):
        """
        Action to get queued DC requests and start staging
        """
        tmp_log = MsgWrapper(logger, " #ATM #KV doStageDCRequests")
        started_at = time.monotonic()
        lock_time_sec = StageDCRequests_LOCK_TIME_MINUTES * 60
        # stop early so the loop finishes before the lock period is over:
        # leave a margin of 60 seconds or 20% of the lock time, whichever is smaller
        time_budget_sec = max(lock_time_sec - 60, lock_time_sec * 0.8)

        def stage_queued_requests():
            for dc_req_spec, extra_params in self.data_carousel_interface.get_requests_to_stage():
                self.data_carousel_interface.stage_request(dc_req_spec, extra_params=extra_params)
                tmp_log.debug(f"stage request_id={dc_req_spec.request_id} dataset={dc_req_spec.dataset}")
                # the budget is checked after each request, so at least one request is always staged
                if time.monotonic() - started_at > time_budget_sec:
                    tmp_log.debug("approaching lock time limit. Stop staging more requests")
                    break

        # NOTE(review): lock names are abbreviated differently across actions,
        # presumably to fit a length limit; they are kept byte-for-byte
        self._run_locked_action(
            tmp_log,
            "AtlasDataCarousDog.doStageDCReq",
            StageDCRequests_LOCK_TIME_MINUTES,
            stage_queued_requests,
        )

    def doCheckDCRequests(self):
        """
        Action to check active DC requests
        """
        tmp_log = MsgWrapper(logger, " #ATM #KV doCheckDCRequests")

        def check_and_resume():
            # check the staging requests first, then resume tasks from staging
            self.data_carousel_interface.check_staging_requests()
            self.data_carousel_interface.resume_tasks_from_staging()

        self._run_locked_action(
            tmp_log,
            "AtlasDataCarousDog.doCheckDCRequests",
            CheckDCRequests_LOCK_TIME_MINUTES,
            check_and_resume,
        )

    def doKeepRulesAlive(self):
        """
        Action to keep DDM staging rules alive when tasks running
        """
        tmp_log = MsgWrapper(logger, " #ATM #KV doKeepRulesAlive")
        # the lambda defers attribute lookup until the lock is held and errors are being logged
        self._run_locked_action(
            tmp_log,
            "AtlasDataCarousDog.doKeepRulesAlive",
            KeepRulesAlive_LOCK_TIME_MINUTES,
            lambda: self.data_carousel_interface.keep_alive_ddm_rules(),
        )

    def doCleanDCRequests(self):
        """
        Action to clean up old DC requests in DB table
        """
        tmp_log = MsgWrapper(logger, " #ATM #KV doCleanDCRequests")
        self._run_locked_action(
            tmp_log,
            "AtlasDataCarousDog.doCleanDCReq",
            CleanDCRequests_LOCK_TIME_MINUTES,
            lambda: self.data_carousel_interface.clean_up_requests(),
        )

    def doRescuePendingTasks(self):
        """
        Action to rescue pending tasks stuck due to never updated staged files about previously done DC requests
        """
        tmp_log = MsgWrapper(logger, " #ATM #KV doRescuePendingTasks")
        self._run_locked_action(
            tmp_log,
            "AtlasDataCarousDog.doRescuePendingTasks",
            RescuePendingTasks_LOCK_TIME_MINUTES,
            lambda: self.data_carousel_interface.rescue_pending_tasks_with_done_requests(),
        )

    def doAction(self):
        """
        Main action of the watchdog; run the Data Carousel actions in order

        Order: clean up old requests, keep DDM rules alive, check active
        requests, stage queued requests.

        :return: ``self.SC_SUCCEEDED`` in all cases; failures are only logged
        """
        # create the logger outside the try block so that the except handler
        # and the final message can always refer to it
        main_log = MsgWrapper(logger)
        try:
            main_log.debug("start")
            self.doCleanDCRequests()
            self.doKeepRulesAlive()
            self.doCheckDCRequests()
            self.doStageDCRequests()
            # NOTE(review): doRescuePendingTasks is defined above but not
            # invoked here; confirm whether that is intentional
        except Exception as exc:
            main_log.error(f"failed with {type(exc)} {exc}")
        main_log.debug("done")
        return self.SC_SUCCEEDED