File indexing completed on 2026-04-10 08:38:59
0001 import datetime
0002 import smtplib
0003 import time
0004 import uuid
0005
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007
0008 from pandajedi.jedicore import Interaction
0009 from pandaserver.config import panda_config
0010 from pandaserver.taskbuffer import EventServiceUtils
0011
0012
# SMTP ports tried in rotation by PostProcessorBase.sendMail:
# attempt number i connects to smtpPortList[i % len(smtpPortList)].
smtpPortList = [
    25,
    587,
]
0014
0015
0016
class StderrLogger(object):
    """Minimal file-like sink that forwards written text to ``tmpLog.debug``.

    Each written chunk has surrounding whitespace stripped. Chunks that are
    empty after stripping (e.g. a lone newline) are discarded instead of being
    logged as blank lines.
    """

    def __init__(self, tmpLog):
        # logger object; only its ``debug`` method is used
        self.tmpLog = tmpLog

    def write(self, message):
        """Log ``message`` (stripped) at debug level unless it is blank."""
        text = message.strip()
        if text == "":
            return
        self.tmpLog.debug(text)
0025
0026
0027
class MySMTP(smtplib.SMTP):
    """``smtplib.SMTP`` whose protocol debug output is sent to a log writer.

    After ``set_log(writer)``, every debug line that smtplib emits (once
    ``set_debuglevel`` is non-zero) is passed to ``writer.write`` instead of
    being printed. If the running ``smtplib`` exposes a module-level ``stderr``
    stream (Python 2's did; Python 3 prints to ``sys.stderr`` from
    ``_print_debug``), that stream is also swapped for the writer until
    ``reset_log`` is called.
    """

    # Class-level defaults so reset_log() and _print_debug() are safe even
    # when set_log() was never called on this instance.
    tmpLog = None
    org_stderr = None

    def set_log(self, tmp_log):
        """Route debug output to ``tmp_log`` (any object with a ``write(str)`` method)."""
        self.tmpLog = tmp_log
        try:
            self.org_stderr = getattr(smtplib, "stderr")
            setattr(smtplib, "stderr", tmp_log)
        except AttributeError:
            # this smtplib has no module-level stderr to swap; _print_debug
            # below still redirects the output
            self.org_stderr = None

    def _print_debug(self, *args):
        """Override of smtplib's debug printer: one space-joined line per call."""
        if self.tmpLog is None:
            # no writer configured: keep smtplib's default behaviour
            super()._print_debug(*args)
            return
        self.tmpLog.write(" ".join(map(str, args)))

    def reset_log(self):
        """Restore smtplib's module-level ``stderr`` if ``set_log`` replaced it.

        Safe to call before ``set_log`` and more than once.
        """
        if self.org_stderr is not None:
            setattr(smtplib, "stderr", self.org_stderr)
            self.org_stderr = None
0043
0044
0045
class PostProcessorBase(object):
    """Common post-processing logic for JEDI tasks.

    Decides a task's final status from its dataset counters, persists task and
    dataset updates through ``taskBufferIF``, propagates the outcome to child
    tasks, and sends e-mail notifications. Status codes such as
    ``SC_SUCCEEDED`` are not defined in this class; presumably they are
    attached by ``Interaction.installSC`` when the module is imported.
    """

    def __init__(self, taskBufferIF, ddmIF):
        """
        :param taskBufferIF: interface used for all task/dataset database operations
        :param ddmIF: DDM interface; only stored here (no method of this base class uses it)
        """
        self.ddmIF = ddmIF
        self.taskBufferIF = taskBufferIF
        # message type passed to tmpLog.sendMsg
        self.msgType = "postprocessor"
        # when True, a task whose master inputs contain no files at all is "failed"
        self.failOnZeroOkFile = False
        self.refresh()

    def refresh(self):
        """Reload the site mapper from the task buffer."""
        self.siteMapper = self.taskBufferIF.get_site_mapper()

    def doBasicPostProcess(self, taskSpec, tmpLog):
        """Set and persist the final status of a task and its datasets.

        1. Unlock the task (``lockedBy = None``) and set ``taskSpec.status``
           from ``getFinalTaskStatus``. A failed task that uses preprocessing
           whose ``checkPreProcessed()`` is false gets the error diagnostic
           "Preprocessing step failed".
        2. If the task is failed/broken/aborted, every dataset becomes
           "failed". Otherwise output/log/lib datasets become "finished" when
           some of their files are unfinished, else "done". Datasets whose
           type starts with ``trn_``/``tmpl_`` become "done". Any other
           dataset is left untouched and not updated. Updated output/log/lib
           datasets also get ``nFiles`` reset to ``nFilesFinished``.
        3. Trigger cleanup of internal datasets, stamp ``endTime`` and update
           the task record (with ``updateDEFT=True``).
        4. Kill child tasks if the task ended failed/broken/aborted, otherwise
           kick them.
        """
        taskSpec.lockedBy = None
        taskSpec.status = self.getFinalTaskStatus(taskSpec)
        if taskSpec.status == "failed" and taskSpec.usePrePro() and not taskSpec.checkPreProcessed():
            taskSpec.setErrDiag("Preprocessing step failed", True)
        tmpMsg = f"set task_status={taskSpec.status}"
        tmpLog.info(tmpMsg)
        tmpLog.sendMsg(tmpMsg, self.msgType)

        output_types = ["output", "log", "lib"]
        task_failed = taskSpec.status in ["failed", "broken", "aborted"]
        for datasetSpec in taskSpec.datasetSpecList:
            if task_failed:
                datasetSpec.status = "failed"
            elif datasetSpec.type in output_types:
                # NOTE(review): when nFilesFinished is 0 (or nFiles is 0/None) the
                # dataset is marked "done", not "finished" -- confirm this is intended.
                if datasetSpec.nFiles and datasetSpec.nFilesFinished and datasetSpec.nFiles > datasetSpec.nFilesFinished:
                    datasetSpec.status = "finished"
                else:
                    datasetSpec.status = "done"
            elif datasetSpec.type.startswith(("trn_", "tmpl_")):
                datasetSpec.status = "done"
            else:
                # other dataset types (e.g. inputs) keep their status and are not updated
                continue

            if datasetSpec.type in output_types:
                datasetSpec.nFiles = datasetSpec.nFilesFinished
            self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"datasetID": datasetSpec.datasetID, "jediTaskID": datasetSpec.jediTaskID})

        self.taskBufferIF.trigger_cleanup_internal_datasets(taskSpec.jediTaskID)
        taskSpec.endTime = naive_utcnow()
        self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, updateDEFT=True)

        if task_failed:
            self.taskBufferIF.killChildTasks_JEDI(taskSpec.jediTaskID, taskSpec.status)
        else:
            self.taskBufferIF.kickChildTasks_JEDI(taskSpec.jediTaskID)
        tmpLog.debug(f"doBasicPostProcess done with taskStatus={taskSpec.status}")
        return

    def doFinalProcedure(self, taskSpec, tmpLog):
        """Hook for extra final steps; the base implementation does nothing.

        :return: ``self.SC_SUCCEEDED`` (attribute presumably installed by
                 ``Interaction.installSC``)
        """
        return self.SC_SUCCEEDED

    def sendMail(self, jediTaskID, fromAdd, toAdd, msgBody, nTry, fileBackUp, tmpLog):
        """Send a notification e-mail, retrying on failure.

        Each attempt opens a fresh SMTP connection to
        ``panda_config.emailSMTPsrv``, cycling through ``smtpPortList``
        (attempt i uses ``smtpPortList[i % len(smtpPortList)]``), issues
        EHLO/STARTTLS and sends ``msgBody``. A failed attempt is followed by a
        30 second sleep. After the last failure the error is logged and, if
        ``fileBackUp`` is true, the message is written to a backup file under
        ``panda_config.logdir``.

        :param jediTaskID: task ID; used in the backup file name and content
        :param fromAdd: sender address passed to ``sendmail``
        :param toAdd: recipient(s) passed unchanged to ``sendmail``
        :param msgBody: message text passed unchanged to ``sendmail``
        :param nTry: maximum number of attempts; nothing is sent when <= 0
        :param fileBackUp: write a backup file when every attempt fails
        :param tmpLog: logger; SMTP protocol debug lines go to its ``debug``
        """
        tmpLog.debug(f"sending notification to {toAdd}\n{msgBody}")
        for iTry in range(nTry):
            try:
                smtpPort = smtpPortList[iTry % len(smtpPortList)]
                self._send_mail_once(fromAdd, toAdd, msgBody, smtpPort, tmpLog)
                break
            except Exception as e:
                if iTry + 1 < nTry:
                    tmpLog.debug(f"sleep {iTry} due to {str(e)}")
                    time.sleep(30)
                else:
                    tmpLog.error(f"failed to send notification with {str(e)}")
                    if fileBackUp:
                        try:
                            self._write_mail_backup(panda_config.logdir, jediTaskID, toAdd, msgBody)
                        except OSError as backup_error:
                            tmpLog.error(f"failed to write mail backup with {str(backup_error)}")

    def _send_mail_once(self, fromAdd, toAdd, msgBody, smtpPort, tmpLog):
        """Make one delivery attempt on a fresh SMTP connection; raise on failure."""
        server = MySMTP(panda_config.emailSMTPsrv, smtpPort)
        try:
            server.set_debuglevel(1)
            server.set_log(StderrLogger(tmpLog))
            server.ehlo()
            server.starttls()
            out = server.sendmail(fromAdd, toAdd, msgBody)
            tmpLog.debug(str(out))
            try:
                server.quit()
            except (smtplib.SMTPException, OSError) as e:
                # The message was already accepted. A failing QUIT must not
                # propagate, or the caller would retry and send a duplicate.
                tmpLog.debug(f"ignored error in SMTP QUIT after successful send: {str(e)}")
        finally:
            # Restore smtplib's stream after every attempt. Otherwise a retry's
            # set_log would capture this attempt's logger as the "original".
            try:
                server.reset_log()
            except AttributeError:
                # set_log was never reached, so there is nothing to restore
                pass
            # close() is a no-op after a successful quit(); on failure it
            # releases the socket before the caller sleeps and retries
            server.close()

    def _write_mail_backup(self, logDir, jediTaskID, toAdd, msgBody):
        """Write an undelivered notification to ``<logDir>/jmail_<jediTaskID>_<uuid4>``.

        The file contains the task ID, the recipient(s) and the message,
        separated by newlines.

        :return: path of the written file
        """
        mailFile = f"{logDir}/jmail_{jediTaskID}_{uuid.uuid4()}"
        with open(mailFile, "w", encoding="utf-8") as oMail:
            oMail.write(f"{jediTaskID}\n{toAdd}\n{msgBody}")
        return mailFile

    def senderAddress(self):
        """Return the configured notification sender (``panda_config.emailSender``)."""
        return panda_config.emailSender

    def getTaskCompleteness(self, taskSpec):
        """Aggregate file and event counters over the task's master input datasets.

        Master inputs with status "removed" are skipped. Event counters that
        cannot be added (e.g. None) are ignored.

        :return: tuple ``(nFiles, nFilesFinished, totalInputEvents,
                 totalOkEvents, taskCompleteness)``. ``taskCompleteness`` is
                 the processed fraction multiplied by 1000: events used over
                 input events when any input events are counted, otherwise
                 finished files over files, otherwise 0.
        """
        nFiles = 0
        nFilesFinished = 0
        totalInputEvents = 0
        totalOkEvents = 0
        for datasetSpec in taskSpec.datasetSpecList:
            if not datasetSpec.isMasterInput() or datasetSpec.status == "removed":
                continue
            nFiles += datasetSpec.nFiles
            nFilesFinished += datasetSpec.nFilesFinished
            # event counters are optional; presumably None when not counted
            try:
                totalInputEvents += datasetSpec.nEvents
            except Exception:
                pass
            try:
                totalOkEvents += datasetSpec.nEventsUsed
            except Exception:
                pass

        if totalInputEvents != 0:
            taskCompleteness = float(totalOkEvents) / float(totalInputEvents) * 1000.0
        elif nFiles != 0:
            taskCompleteness = float(nFilesFinished) / float(nFiles) * 1000.0
        else:
            taskCompleteness = 0
        return nFiles, nFilesFinished, totalInputEvents, totalOkEvents, taskCompleteness

    def getFinalTaskStatus(self, taskSpec, checkParent=True, checkGoal=False):
        """Decide the status a task should end with.

        :param checkParent: when all files are processed, also look at the
            parent task's status (if the task has a distinct parent)
        :param checkGoal: instead of a status, return True if the task has an
            explicit goal and its completeness reached it, else False
        :return: one of "broken", "aborted", "paused", "failed", "finished",
                 "done" (or a bool when ``checkGoal`` is true)
        """
        nFiles, nFilesFinished, totalInputEvents, totalOkEvents, taskCompleteness = self.getTaskCompleteness(taskSpec)

        # the goal is on the same x1000 scale as taskCompleteness; default 1000
        taskGoal = 1000 if taskSpec.goal is None else taskSpec.goal

        if checkGoal:
            # The answer depends only on completeness and goal, so return
            # before the status logic below, which may query the task buffer.
            return taskSpec.goal is not None and taskCompleteness >= taskGoal

        if taskSpec.status == "tobroken":
            status = "broken"
        elif taskSpec.status == "toabort":
            status = "aborted"
        elif taskSpec.status == "paused":
            status = "paused"
        elif self.failOnZeroOkFile and nFiles == nFilesFinished == 0:
            status = "failed"
        elif nFiles == nFilesFinished:
            if checkParent and taskSpec.parent_tid not in [None, taskSpec.jediTaskID]:
                parent_status = self.taskBufferIF.getTaskStatus_JEDI(taskSpec.parent_tid)
                if parent_status in ["failed", "broken", "aborted"]:
                    status = "failed"
                elif parent_status != "done":
                    status = "finished"
                else:
                    # parent is done: still "finished" while any master input is mutable
                    inputMutable = any(
                        datasetSpec.isMasterInput() and datasetSpec.state == "mutable"
                        for datasetSpec in taskSpec.datasetSpecList
                    )
                    status = "finished" if inputMutable else "done"
            else:
                status = "done"
        elif nFilesFinished == 0:
            status = "failed"
        else:
            status = "finished"

        # A "finished" task that must fail when its goal is unreached becomes
        # "failed". With useExhausted() this applies only once the task is "passed".
        if (
            taskSpec.failGoalUnreached()
            and status == "finished"
            and (not taskSpec.useExhausted() or taskSpec.status == "passed")
        ):
            if taskCompleteness < taskGoal:
                status = "failed"

        # NOTE(review): for HPO workflows this overrides any status computed
        # above (including broken/aborted/paused) -- confirm that is intended.
        if taskSpec.is_hpo_workflow():
            event_stat = self.taskBufferIF.get_event_statistics(taskSpec.jediTaskID)
            if event_stat is not None and event_stat.get(EventServiceUtils.ST_finished):
                status = "finished"

        return status

    def doPreCheck(self, taskSpec, tmpLog):
        """Move a task to "exhausted" instead of letting it end incomplete.

        Applies when ``taskSpec.useExhausted()`` is true, the task is not yet
        "passed", it would otherwise end as "finished" and it has not reached
        its goal. The error dialog blames the parent task when, ignoring the
        parent, the task would be "done". The task is then unlocked and
        persisted, and its child tasks are kicked.

        :return: True if the task was set to "exhausted", else False
        """
        if (
            taskSpec.useExhausted()
            and taskSpec.status != "passed"
            and self.getFinalTaskStatus(taskSpec) == "finished"
            and not self.getFinalTaskStatus(taskSpec, checkGoal=True)
        ):
            taskSpec.status = "exhausted"
            if self.getFinalTaskStatus(taskSpec, checkParent=False) == "done":
                taskSpec.errorDialog = "exhausted since the parent task was incomplete and didn't reach the goal"
            else:
                taskSpec.errorDialog = "exhausted since the task was incomplete and didn't reach the goal"
            taskSpec.lockedBy = None
            taskSpec.lockedTime = None

            tmpLog.info(f"set task_status={taskSpec.status}")
            self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, updateDEFT=True)

            self.taskBufferIF.kickChildTasks_JEDI(taskSpec.jediTaskID)
            return True
        return False
0267
0268
# Module-level side effect: register PostProcessorBase with Interaction.installSC.
# Presumably this is what provides the status-code attributes (e.g. SC_SUCCEEDED,
# returned by doFinalProcedure) -- confirm against pandajedi.jedicore.Interaction.
Interaction.installSC(PostProcessorBase)