Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import datetime
0002 import os
0003 import re
0004 import socket
0005 import sys
0006 import time
0007 import traceback
0008 
0009 # logger
0010 from pandacommon.pandalogger.PandaLogger import PandaLogger
0011 from pandacommon.pandautils.PandaUtils import naive_utcnow
0012 
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.ThreadUtils import ListWithLock, ThreadPool, WorkerThread
0015 from pandaserver.taskbuffer.DataCarousel import (
0016     DataCarouselInterface,
0017     DataCarouselRequestSpec,
0018     DataCarouselRequestStatus,
0019 )
0020 
0021 from .WatchDogBase import WatchDogBase
0022 
# module logger, named after the final component of this module's dotted path
logger = PandaLogger().getLogger(__name__.rsplit(".", 1)[-1])

# ==============================================================

# Time limits (in minutes) passed as ``timeLimit`` to ``get_process_lock``
# by the corresponding watchdog actions below
StageDCRequests_LOCK_TIME_MINUTES = 5
CheckDCRequests_LOCK_TIME_MINUTES = 5
KeepRulesAlive_LOCK_TIME_MINUTES = 60
CleanDCRequests_LOCK_TIME_MINUTES = 24 * 60  # one day (1440 minutes)
RescuePendingTasks_LOCK_TIME_MINUTES = 2 * 60  # two hours (120 minutes)

# ==============================================================
0035 
0036 
class AtlasDataCarouselWatchDog(WatchDogBase):
    """
    Data Carousel watchdog for ATLAS

    Each ``do*`` action guards its work with a named watchdog process lock
    (via ``_run_with_process_lock``) and delegates the actual work to a
    ``DataCarouselInterface``. ``doAction`` runs the actions in sequence.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        """
        :param taskBufferIF: task buffer interface; passed to WatchDogBase and to DataCarouselInterface
        :param ddmIF: DDM interface collection; the ATLAS-specific interface is taken via getInterface("atlas")
        """
        WatchDogBase.__init__(self, taskBufferIF, ddmIF)
        self.vo = "atlas"
        self.ddmIF = ddmIF.getInterface(self.vo)
        self.data_carousel_interface = DataCarouselInterface(taskBufferIF, self.ddmIF)

    def _run_with_process_lock(self, tmp_log, lock_name, time_limit, *actions):
        """
        Run ``actions`` in order while holding the named watchdog process lock.

        If the lock cannot be acquired, nothing is run. Any exception raised by
        lock acquisition or by an action is logged with its traceback and is not
        propagated; remaining actions are skipped in that case.

        NOTE(review): the lock is not explicitly released here; presumably it
        expires after ``time_limit`` minutes, so it also throttles how often the
        action runs — confirm against WatchDogBase.get_process_lock.

        :param tmp_log: MsgWrapper used for all messages of this action
        :param lock_name: lock name passed to get_process_lock
        :param time_limit: lock time limit in minutes (passed as ``timeLimit``)
        :param actions: zero-argument callables to execute under the lock
        """
        tmp_log.debug("start")
        try:
            got_lock = self.get_process_lock(lock_name, timeLimit=time_limit)
            if not got_lock:
                tmp_log.debug("locked by another watchdog process. Skipped")
                return
            tmp_log.debug("got watchdog lock")
            for action in actions:
                action()
            tmp_log.debug("done")
        except Exception as e:
            tmp_log.error(f"failed with {type(e)} {e} {traceback.format_exc()}")

    def _stage_queued_requests(self, tmp_log, t0):
        """
        Stage queued DC requests one by one until done or the time budget runs out.

        A failure on a single request is logged and does not stop the remaining
        requests from being staged (otherwise one bad request at the head of the
        queue could block staging on every cycle).

        :param tmp_log: MsgWrapper for messages
        :param t0: time.monotonic() value taken when the action started
        """
        lock_time_sec = StageDCRequests_LOCK_TIME_MINUTES * 60
        # stop with a safety margin of min(60 s, 20% of the lock time) before the lock time limit
        time_budget_sec = max(lock_time_sec - 60, lock_time_sec * 0.8)
        to_stage_list = self.data_carousel_interface.get_requests_to_stage()
        for dc_req_spec, extra_params in to_stage_list:
            try:
                self.data_carousel_interface.stage_request(dc_req_spec, extra_params=extra_params)
            except Exception as e:
                tmp_log.error(
                    f"failed to stage request_id={dc_req_spec.request_id} dataset={dc_req_spec.dataset} ; "
                    f"{type(e)} {e} {traceback.format_exc()}"
                )
            else:
                tmp_log.debug(f"stage request_id={dc_req_spec.request_id} dataset={dc_req_spec.dataset}")
            # check timer
            if time.monotonic() - t0 > time_budget_sec:
                tmp_log.debug("approaching lock time limit. Stop staging more requests")
                break

    def doStageDCRequests(self):
        """
        Action to get queued DC requests and start staging
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doStageDCRequests")
        # timer start (before lock acquisition, so the budget covers the whole action)
        t0 = time.monotonic()
        self._run_with_process_lock(
            tmpLog,
            "AtlasDataCarousDog.doStageDCReq",
            StageDCRequests_LOCK_TIME_MINUTES,
            lambda: self._stage_queued_requests(tmpLog, t0),
        )

    def doCheckDCRequests(self):
        """
        Action to check active DC requests, then resume tasks whose requests are in staging
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doCheckDCRequests")
        self._run_with_process_lock(
            tmpLog,
            "AtlasDataCarousDog.doCheckDCRequests",
            CheckDCRequests_LOCK_TIME_MINUTES,
            self.data_carousel_interface.check_staging_requests,
            self.data_carousel_interface.resume_tasks_from_staging,
        )

    def doKeepRulesAlive(self):
        """
        Action to keep DDM staging rules alive when tasks running
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doKeepRulesAlive")
        self._run_with_process_lock(
            tmpLog,
            "AtlasDataCarousDog.doKeepRulesAlive",
            KeepRulesAlive_LOCK_TIME_MINUTES,
            self.data_carousel_interface.keep_alive_ddm_rules,
        )

    def doCleanDCRequests(self):
        """
        Action to clean up old DC requests in DB table
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doCleanDCRequests")
        self._run_with_process_lock(
            tmpLog,
            "AtlasDataCarousDog.doCleanDCReq",
            CleanDCRequests_LOCK_TIME_MINUTES,
            self.data_carousel_interface.clean_up_requests,
        )

    def doRescuePendingTasks(self):
        """
        Action to rescue pending tasks stuck due to never updated staged files about previously done DC requests
        """
        tmpLog = MsgWrapper(logger, " #ATM #KV doRescuePendingTasks")
        self._run_with_process_lock(
            tmpLog,
            "AtlasDataCarousDog.doRescuePendingTasks",
            RescuePendingTasks_LOCK_TIME_MINUTES,
            self.data_carousel_interface.rescue_pending_tasks_with_done_requests,
        )

    # main
    def doAction(self):
        """
        Main entry point: run the Data Carousel sub-actions in sequence.

        Order: clean up old requests, keep staging rules alive, check staging
        requests, stage queued requests. Each sub-action handles its own errors;
        anything escaping is logged here.

        :return: self.SC_SUCCEEDED
        """
        # create the logger before the try block so the except clause can always use it
        origTmpLog = MsgWrapper(logger)
        origTmpLog.debug("start")
        try:
            # clean up old DC requests
            self.doCleanDCRequests()
            # keep staging rules alive
            self.doKeepRulesAlive()
            # check staging requests
            self.doCheckDCRequests()
            # stage queued requests
            self.doStageDCRequests()
            # NOTE: rescuing pending tasks is currently disabled
            # self.doRescuePendingTasks()
        except Exception as e:
            origTmpLog.error(f"failed with {type(e)} {e} {traceback.format_exc()}")
        origTmpLog.debug("done")
        return self.SC_SUCCEEDED