Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:02

"""
watch job

Defines Watcher, a thread that peeks a single job from the task buffer and
fails it when its last modification time (or its end time, when set) is older
than a configurable number of minutes.
"""
0005 
0006 import datetime
0007 import threading
0008 import time
0009 import traceback
0010 
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandalogger.PandaLogger import PandaLogger
0013 from pandacommon.pandautils.PandaUtils import naive_utcnow
0014 
0015 from pandaserver.dataservice.closer import Closer
0016 from pandaserver.jobdispatcher import ErrorCode
0017 from pandaserver.taskbuffer import EventServiceUtils, retryModule
0018 from pandaserver.taskbuffer.JobSpec import JobSpec
0019 from pandaserver.taskbuffer.SupErrors import SupErrors
0020 
# logger
# module-level logger; each Watcher instance wraps it as LogWrapper(_logger, str(pandaID))
_logger = PandaLogger().getLogger("Watcher")
0023 
0024 
class Watcher(threading.Thread):
    """Thread that watches one job and fails it when it has gone stale.

    The job is peeked from the task buffer (with ``fromDefined``, ``fromArchived``
    and ``fromWaiting`` all False). While the job is in a watched state, it is
    failed once its ``modificationTime`` -- or its ``endTime``, when set -- is
    older than ``sleepTime`` minutes. Failing a job means:
      * error diagnostics are filled in;
      * output/log files are marked failed;
      * the job is written back with ``updateJobs``;
      * retry post-processing is invoked;
      * a ``Closer`` is run on the affected destination datasets.
    """

    # states in which a job is always watched; "transferring" is handled
    # conditionally in _is_watched()
    _WATCHED_STATUSES = ("running", "sent", "starting", "holding", "stagein", "stageout")

    # constructor
    def __init__(self, taskBuffer, pandaID, single=False, sleepTime=360, sitemapper=None):
        """
        :param taskBuffer: task buffer used to peek/update the job and to look up its workers and event stats
        :param pandaID: PandaID of the job to watch
        :param single: when True, check the job once instead of polling in a loop
        :param sleepTime: staleness threshold and polling interval, in minutes
        :param sitemapper: stored as ``self.siteMapper``; not used by this class
        """
        threading.Thread.__init__(self)
        self.pandaID = pandaID
        self.taskBuffer = taskBuffer
        self.sleepTime = sleepTime
        self.single = single
        self.siteMapper = sitemapper
        self.logger = LogWrapper(_logger, str(pandaID))

    # main
    def run(self):
        """Poll the job until it leaves a watched state or is failed as stale.

        Returns when any of the following happens:
          * the job is not found;
          * the job is no longer in a watched state;
          * the job has just been failed;
          * the first check has completed and ``single`` is set.

        Any exception is logged and not re-raised.
        """
        try:
            while True:
                self.logger.debug("start")
                job = self.taskBuffer.peekJobs(
                    [self.pandaID],
                    fromDefined=False,
                    fromArchived=False,
                    fromWaiting=False,
                )[0]
                if job is None:
                    self.logger.debug("escape : not found")
                    return
                self.logger.debug(f"in {job.jobStatus}")
                if not self._is_watched(job):
                    self.logger.debug(f"escape : wrong status {job.jobStatus}")
                    return
                # stale = not modified, or ended, within the last sleepTime minutes
                timeLimit = naive_utcnow() - datetime.timedelta(minutes=self.sleepTime)
                if job.modificationTime < timeLimit or (job.endTime != "NULL" and job.endTime < timeLimit):
                    self._fail_stale_job(job)
                    self.logger.debug("done")
                    return
                # single action
                if self.single:
                    return
                # sleepTime is in minutes
                time.sleep(60 * self.sleepTime)
        except Exception as e:
            self.logger.error(f"run() : {str(e)} {traceback.format_exc()}")

    def _is_watched(self, job):
        """Return True if the job is in a state this watcher acts on."""
        if job.jobStatus in self._WATCHED_STATUSES:
            return True
        # transferring jobs are watched only for user/panda jobs or when a sub-status is set
        return job.jobStatus == "transferring" and (job.prodSourceLabel in ["user", "panda"] or job.jobSubStatus not in [None, "NULL", ""])

    def _fail_stale_job(self, job):
        """Fail a stale job, write it back, then run retry post-processing and the Closer."""
        self.logger.debug(f"{job.jobStatus} lastmod:{str(job.modificationTime)} endtime:{str(job.endTime)}")
        # must run before endTime is overwritten below: the diagnostics depend on endTime == "NULL"
        self._set_error_diagnostics(job)
        job.jobStatus = "failed"
        # a job without an end time is considered ended at its last modification
        if job.endTime == "NULL":
            job.endTime = job.modificationTime
        # fail output/log files and collect their destination datasets (unique, in order)
        destDBList = []
        for file_spec in job.Files:
            if file_spec.type in ("output", "log"):
                file_spec.status = "failed"
                if file_spec.destinationDBlock not in destDBList:
                    destDBList.append(file_spec.destinationDBlock)
        # event service: set a sub-status when no event has finished
        if EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job):
            eventStat = self.taskBuffer.getEventStat(job.jediTaskID, job.PandaID)
            if EventServiceUtils.ST_finished not in eventStat:
                job.jobSubStatus = "es_heartbeat"
        # update job
        self.taskBuffer.updateJobs([job], False)
        # NOTE(review): jobStatus was set to "failed" above; this presumably guards against updateJobs changing it — confirm
        if job.jobStatus == "failed":
            self._run_failure_postprocessing(job)
            Closer(self.taskBuffer, destDBList, job).run()

    def _set_error_diagnostics(self, job):
        """Fill in the dispatcher error code/diag (and possibly a supervisor error) for a stale job."""
        if job.jobStatus == "sent":
            # NOTE(review): the text says 30 min while the threshold is sleepTime minutes — confirm callers use 30 for sent jobs
            job.jobDispatcherErrorCode = ErrorCode.EC_SendError
            job.jobDispatcherErrorDiag = "Sent job didn't receive reply from pilot within 30 min"
            return
        lost_heartbeat = job.exeErrorDiag == "NULL" and job.pilotErrorDiag == "NULL"
        if not lost_heartbeat:
            # the job reported an error but was not recovered in time
            job.jobDispatcherErrorCode = ErrorCode.EC_Recovery
            job.jobDispatcherErrorDiag = f"job recovery failed for {self.sleepTime / 60} hours"
            return
        if job.jobDispatcherErrorDiag != "NULL":
            # keep the dispatcher diagnostic that is already set
            return
        if job.endTime == "NULL":
            # normal lost heartbeat
            job.jobDispatcherErrorCode = ErrorCode.EC_Watcher
            job.jobDispatcherErrorDiag = f"lost heartbeat : {str(job.modificationTime)}"
        else:
            if job.jobStatus == "holding":
                job.jobDispatcherErrorCode = ErrorCode.EC_Holding
            elif job.jobStatus == "transferring":
                job.jobDispatcherErrorCode = ErrorCode.EC_Transferring
            else:
                job.jobDispatcherErrorCode = ErrorCode.EC_Timeout
            job.jobDispatcherErrorDiag = f"timeout in {job.jobStatus} : last heartbeat at {str(job.endTime)}"
        self._set_worker_diagnostics(job)

    def _set_worker_diagnostics(self, job):
        """Set supErrorCode/supErrorDiag when the job's first worker is finished, failed, cancelled or missed."""
        workerSpecs = self.taskBuffer.getWorkersForJob(job.PandaID)
        if not workerSpecs:
            return
        workerSpec = workerSpecs[0]
        if workerSpec.status in ["finished", "failed", "cancelled", "missed"]:
            job.supErrorCode = SupErrors.error_codes["WORKER_ALREADY_DONE"]
            job.supErrorDiag = f"worker already {workerSpec.status} at {str(workerSpec.endTime)} with {workerSpec.diagMessage}"
            job.supErrorDiag = JobSpec.truncateStringAttr("supErrorDiag", job.supErrorDiag)

    def _run_failure_postprocessing(self, job):
        """Invoke retry post-processing for the dispatcher error and, if present, the task buffer error."""
        errors = [
            {
                "source": "jobDispatcherErrorCode",
                "error_code": job.jobDispatcherErrorCode,
                "error_diag": job.jobDispatcherErrorDiag,
            }
        ]
        try:
            self.logger.debug("Watcher will call job_failure_postprocessing")
            retryModule.job_failure_postprocessing(self.taskBuffer, job.PandaID, errors, job.attemptNr)
            self.logger.debug("job_failure_postprocessing is back")
        except Exception as e:
            # best effort: a post-processing failure must not prevent the Closer from running
            self.logger.error(f"job_failure_postprocessing excepted and needs to be investigated ({e}): {traceback.format_exc()}")

        # updateJobs was successful and it may have failed the job with a taskBufferErrorCode
        try:
            self.logger.debug("Watcher.run will peek the job")
            peeked = self.taskBuffer.peekJobs(
                [job.PandaID],
                fromDefined=False,
                fromArchived=True,
                fromWaiting=False,
            )
            job_tmp = peeked[0] if peeked else None
            if job_tmp is not None and job_tmp.taskBufferErrorCode:
                # same signature as the call above: a list of {source, error_code, error_diag} dicts
                tb_errors = [
                    {
                        "source": "taskBufferErrorCode",
                        "error_code": job_tmp.taskBufferErrorCode,
                        "error_diag": job_tmp.taskBufferErrorDiag,
                    }
                ]
                self.logger.debug("Watcher.run 2 will call job_failure_postprocessing")
                retryModule.job_failure_postprocessing(self.taskBuffer, job_tmp.PandaID, tb_errors, job_tmp.attemptNr)
                self.logger.debug("job_failure_postprocessing 2 is back")
        except Exception as e:
            self.logger.error(f"job_failure_postprocessing 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")