File indexing completed on 2026-04-10 08:39:02
"""
Watcher: a thread that watches one job and fails it when its heartbeat has not been updated within the timeout.

"""
0005
0006 import datetime
0007 import threading
0008 import time
0009 import traceback
0010
0011 from pandacommon.pandalogger.LogWrapper import LogWrapper
0012 from pandacommon.pandalogger.PandaLogger import PandaLogger
0013 from pandacommon.pandautils.PandaUtils import naive_utcnow
0014
0015 from pandaserver.dataservice.closer import Closer
0016 from pandaserver.jobdispatcher import ErrorCode
0017 from pandaserver.taskbuffer import EventServiceUtils, retryModule
0018 from pandaserver.taskbuffer.JobSpec import JobSpec
0019 from pandaserver.taskbuffer.SupErrors import SupErrors
0020
0021
# Module-level "Watcher" logger; each Watcher instance wraps it in a LogWrapper tagged with its PandaID.
_logger = PandaLogger().getLogger("Watcher")
0023
0024
class Watcher(threading.Thread):
    """Thread that watches one job and fails it once its heartbeat has timed out.

    The job is read with ``taskBuffer.peekJobs``. When its ``modificationTime``
    (or its ``endTime``, if set) is older than ``sleepTime`` minutes, the job is
    set to ``failed`` with a dispatcher error code explaining why, its output and
    log files are set to ``failed``, the job is written back with
    ``taskBuffer.updateJobs``, the retry rules are applied through
    ``retryModule.job_failure_postprocessing`` and a ``Closer`` is run for the
    affected destination datasets.
    """

    # Job states that are always subject to the heartbeat check.
    _WATCHED_STATUSES = ("running", "sent", "starting", "holding", "stagein", "stageout")
    # Worker states showing the worker had already ended when the job timed out.
    _WORKER_DONE_STATUSES = ("finished", "failed", "cancelled", "missed")

    def __init__(self, taskBuffer, pandaID, single=False, sleepTime=360, sitemapper=None):
        """
        :param taskBuffer: task buffer used to read, update and post-process the job
        :param pandaID: PandaID of the job to watch
        :param single: if True, check the job once instead of looping
        :param sleepTime: heartbeat timeout in minutes, also the wait between checks
        :param sitemapper: site mapper, kept as ``self.siteMapper``
        """
        threading.Thread.__init__(self)
        self.pandaID = pandaID
        self.taskBuffer = taskBuffer
        self.sleepTime = sleepTime
        self.single = single
        self.siteMapper = sitemapper
        self.logger = LogWrapper(_logger, str(pandaID))

    def run(self):
        """Thread body: check the job and fail it if its heartbeat has timed out.

        Repeats every ``sleepTime`` minutes until the job is not found, leaves a
        watched state, or has been failed; with ``single=True`` only one check is
        made. Unexpected exceptions are logged and end the thread.
        """
        try:
            while True:
                self.logger.debug("start")
                # defined, archived and waiting tables are excluded from the lookup
                job = self.taskBuffer.peekJobs(
                    [self.pandaID],
                    fromDefined=False,
                    fromArchived=False,
                    fromWaiting=False,
                )[0]
                if job is None:
                    self.logger.debug("escape : not found")
                    return
                self.logger.debug(f"in {job.jobStatus}")
                if not self._is_watched(job):
                    self.logger.debug(f"escape : wrong status {job.jobStatus}")
                    return

                timeLimit = naive_utcnow() - datetime.timedelta(minutes=self.sleepTime)
                # endTime is the string "NULL" when it has not been set
                if job.modificationTime < timeLimit or (job.endTime != "NULL" and job.endTime < timeLimit):
                    self._fail_job(job)
                    return

                if self.single:
                    return
                time.sleep(60 * self.sleepTime)
        except Exception as e:
            self.logger.error(f"run() : {str(e)} {traceback.format_exc()}")
            return

    @classmethod
    def _is_watched(cls, job):
        """Return True if the job's current status is subject to the heartbeat check."""
        if job.jobStatus in cls._WATCHED_STATUSES:
            return True
        # transferring jobs are watched only for user/panda jobs or when a sub-status is set
        return job.jobStatus == "transferring" and (job.prodSourceLabel in ["user", "panda"] or job.jobSubStatus not in [None, "NULL", ""])

    def _fail_job(self, job):
        """Fail a timed-out job: set diagnostics and statuses, update it, apply retry rules, run the Closer."""
        self.logger.debug(f"{job.jobStatus} lastmod:{str(job.modificationTime)} endtime:{str(job.endTime)}")
        self._set_error_diagnostics(job)
        job.jobStatus = "failed"
        if job.endTime == "NULL":
            # no end time recorded: use the last modification (heartbeat) time
            job.endTime = job.modificationTime
        destDBList = self._mark_output_files_failed(job)
        if EventServiceUtils.isEventServiceJob(job) and not EventServiceUtils.isJobCloningJob(job):
            eventStat = self.taskBuffer.getEventStat(job.jediTaskID, job.PandaID)
            # flag event service jobs that have no finished events
            if EventServiceUtils.ST_finished not in eventStat:
                job.jobSubStatus = "es_heartbeat"
        self.taskBuffer.updateJobs([job], False)
        if job.jobStatus == "failed":
            self._run_failure_postprocessing(job)
        cThr = Closer(self.taskBuffer, destDBList, job)
        cThr.run()
        self.logger.debug("done")

    def _set_error_diagnostics(self, job):
        """Set jobDispatcherErrorCode/Diag (and possibly supErrorCode/Diag) explaining the timeout."""
        if job.jobStatus == "sent":
            # NOTE(review): the message says 30 min but the limit applied is sleepTime minutes — confirm
            job.jobDispatcherErrorCode = ErrorCode.EC_SendError
            job.jobDispatcherErrorDiag = "Sent job didn't receive reply from pilot within 30 min"
        elif job.exeErrorDiag == "NULL" and job.pilotErrorDiag == "NULL":
            # no payload/pilot error reported: treat as lost heartbeat unless a dispatcher diag already exists
            if job.jobDispatcherErrorDiag == "NULL":
                if job.endTime == "NULL":
                    job.jobDispatcherErrorCode = ErrorCode.EC_Watcher
                    job.jobDispatcherErrorDiag = f"lost heartbeat : {str(job.modificationTime)}"
                else:
                    if job.jobStatus == "holding":
                        job.jobDispatcherErrorCode = ErrorCode.EC_Holding
                    elif job.jobStatus == "transferring":
                        job.jobDispatcherErrorCode = ErrorCode.EC_Transferring
                    else:
                        job.jobDispatcherErrorCode = ErrorCode.EC_Timeout
                    job.jobDispatcherErrorDiag = f"timeout in {job.jobStatus} : last heartbeat at {str(job.endTime)}"
                self._set_worker_diagnostics(job)
        else:
            job.jobDispatcherErrorCode = ErrorCode.EC_Recovery
            job.jobDispatcherErrorDiag = f"job recovery failed for {self.sleepTime / 60} hours"

    def _set_worker_diagnostics(self, job):
        """Record a WORKER_ALREADY_DONE sup-error if the job's first worker had already ended."""
        workerSpecs = self.taskBuffer.getWorkersForJob(job.PandaID)
        if not workerSpecs:
            return
        workerSpec = workerSpecs[0]
        if workerSpec.status in self._WORKER_DONE_STATUSES:
            job.supErrorCode = SupErrors.error_codes["WORKER_ALREADY_DONE"]
            job.supErrorDiag = f"worker already {workerSpec.status} at {str(workerSpec.endTime)} with {workerSpec.diagMessage}"
            job.supErrorDiag = JobSpec.truncateStringAttr("supErrorDiag", job.supErrorDiag)

    @staticmethod
    def _mark_output_files_failed(job):
        """Set output/log files to failed; return their destination datasets in first-seen order, without duplicates."""
        destDBList = []
        for fileSpec in job.Files:
            if fileSpec.type in ("output", "log"):
                fileSpec.status = "failed"
                if fileSpec.destinationDBlock not in destDBList:
                    destDBList.append(fileSpec.destinationDBlock)
        return destDBList

    @staticmethod
    def _make_errors(source, error_code, error_diag):
        """Build the ``errors`` list passed to ``retryModule.job_failure_postprocessing``."""
        return [
            {
                "source": source,
                "error_code": error_code,
                "error_diag": error_diag,
            }
        ]

    def _run_failure_postprocessing(self, job):
        """Apply the retry rules for the dispatcher error, then for any task-buffer error."""
        errors = self._make_errors("jobDispatcherErrorCode", job.jobDispatcherErrorCode, job.jobDispatcherErrorDiag)
        try:
            self.logger.debug("Watcher will call job_failure_postprocessing")
            retryModule.job_failure_postprocessing(self.taskBuffer, job.PandaID, errors, job.attemptNr)
            self.logger.debug("job_failure_postprocessing is back")
        except Exception as e:
            self.logger.error(f"job_failure_postprocessing excepted and needs to be investigated ({e}): {traceback.format_exc()}")
        self._retry_on_taskbuffer_error(job.PandaID)

    def _retry_on_taskbuffer_error(self, pandaID):
        """Re-read the job (archived table included) and apply retry rules for a recorded task-buffer error."""
        try:
            self.logger.debug("Watcher.run will peek the job")
            job_tmp = self.taskBuffer.peekJobs(
                [pandaID],
                fromDefined=False,
                fromArchived=True,
                fromWaiting=False,
            )[0]
            # peekJobs yields None for a job it cannot find
            if job_tmp is not None and job_tmp.taskBufferErrorCode:
                errors = self._make_errors("taskBufferErrorCode", job_tmp.taskBufferErrorCode, job_tmp.taskBufferErrorDiag)
                self.logger.debug("Watcher.run 2 will call job_failure_postprocessing")
                # same (taskBuffer, PandaID, errors, attemptNr) form as the dispatcher-error call
                retryModule.job_failure_postprocessing(self.taskBuffer, job_tmp.PandaID, errors, job_tmp.attemptNr)
                self.logger.debug("job_failure_postprocessing 2 is back")
        except IndexError:
            pass
        except Exception as e:
            self.logger.error(f"job_failure_postprocessing 2 excepted and needs to be investigated ({e}): {traceback.format_exc()}")