Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import datetime
0002 import smtplib
0003 import time
0004 import uuid
0005 
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007 
0008 from pandajedi.jedicore import Interaction
0009 from pandaserver.config import panda_config
0010 from pandaserver.taskbuffer import EventServiceUtils
0011 
# ports for the SMTP server; sendMail uses smtpPortList[iTry % len(smtpPortList)],
# i.e. it cycles through these ports on successive attempts
smtpPortList = [25, 587]
0014 
0015 
# file-like adapter so that smtplib debug output ends up in a logger
class StderrLogger(object):
    """Minimal writable object that forwards text to ``tmpLog.debug``.

    :param tmpLog: logger exposing a ``debug(message)`` method
    """

    def __init__(self, tmpLog):
        self.tmpLog = tmpLog

    def write(self, message):
        """Log ``message`` with surrounding whitespace removed; skip it if nothing remains."""
        stripped = message.strip()
        if stripped == "":
            return
        self.tmpLog.debug(stripped)
0026 
# wrapper of SMTP to redirect messages
class MySMTP(smtplib.SMTP):
    """SMTP client whose protocol debug output is sent to a supplied writer.

    Call :meth:`set_log` before enabling debug output and :meth:`reset_log`
    when done, so that any module-level ``smtplib.stderr`` patched by
    :meth:`set_log` is restored.
    """

    # writer receiving debug lines (object with ``write(str)``); None until set_log is called
    tmpLog = None
    # original smtplib.stderr saved by set_log, or None when nothing was patched
    org_stderr = None

    def set_log(self, tmp_log):
        """Route debug output to ``tmp_log`` (an object with a ``write`` method).

        If smtplib exposes a module-level ``stderr`` attribute, it is saved and
        replaced by ``tmp_log`` until :meth:`reset_log` is called.
        """
        self.tmpLog = tmp_log
        try:
            current = getattr(smtplib, "stderr")
        except AttributeError:
            # nothing to patch (recent Python 3 smtplib prints to sys.stderr instead)
            return
        # keep the first saved original so repeated calls can still restore it
        if self.org_stderr is None:
            self.org_stderr = current
        setattr(smtplib, "stderr", tmp_log)

    def _print_debug(self, *args):
        """Send one debug line to the configured writer, or use smtplib's default when unset."""
        if self.tmpLog is None:
            # set_log not called yet: behave like a plain smtplib.SMTP
            super()._print_debug(*args)
            return
        self.tmpLog.write(" ".join(map(str, args)))

    def reset_log(self):
        """Restore the ``smtplib.stderr`` saved by :meth:`set_log`, if any; safe to call repeatedly."""
        if self.org_stderr is not None:
            setattr(smtplib, "stderr", self.org_stderr)
            self.org_stderr = None
0044 
# base class for post process
class PostProcessorBase(object):
    """Base class holding post-processing steps shared by JEDI tasks.

    It computes a task's final status, finalizes the task and its datasets,
    sends notification mails, and moves incomplete tasks to ``exhausted``.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        """Store the interfaces and load the site mapper.

        :param taskBufferIF: task buffer interface used for all DB access in this class
        :param ddmIF: DDM interface; stored but not used by the methods of this class
        """
        self.ddmIF = ddmIF
        self.taskBufferIF = taskBufferIF
        self.msgType = "postprocessor"
        # when True, a task whose master input has no files at all ends up failed
        self.failOnZeroOkFile = False
        self.refresh()

    # refresh
    def refresh(self):
        """Reload the site mapper from the task buffer."""
        self.siteMapper = self.taskBufferIF.get_site_mapper()

    # basic post procedure
    def doBasicPostProcess(self, taskSpec, tmpLog):
        """Finalize a task and its datasets, then kill or kick its child tasks.

        Steps:
          * unlock the task and set ``taskSpec.status`` from :meth:`getFinalTaskStatus`;
          * if the task is failed/broken/aborted, mark every dataset ``failed``;
            otherwise set output/log/lib datasets to ``finished`` when
            0 < nFilesFinished < nFiles (else ``done``) and transient (``trn_*``)
            or template (``tmpl_*``) datasets to ``done``, leaving other
            (input) datasets untouched;
          * reset ``nFiles`` of output/log/lib datasets to ``nFilesFinished``;
          * trigger internal dataset cleanup, stamp ``endTime``, persist the task;
          * kill child tasks if the task is failed/broken/aborted, otherwise kick them.

        :param taskSpec: task to finalize; modified in place
        :param tmpLog: logger providing ``info``, ``debug`` and ``sendMsg``
        """
        # update task status
        taskSpec.lockedBy = None
        taskSpec.status = self.getFinalTaskStatus(taskSpec)
        if taskSpec.status == "failed":
            # set dialog for preprocess
            if taskSpec.usePrePro() and not taskSpec.checkPreProcessed():
                taskSpec.setErrDiag("Preprocessing step failed", True)
        tmpMsg = f"set task_status={taskSpec.status}"
        tmpLog.info(tmpMsg)
        tmpLog.sendMsg(tmpMsg, self.msgType)
        taskFailed = taskSpec.status in ["failed", "broken", "aborted"]
        # update dataset
        for datasetSpec in taskSpec.datasetSpecList:
            if taskFailed:
                datasetSpec.status = "failed"
            elif datasetSpec.type in ["output", "log", "lib"]:
                # NOTE(review): a dataset with nFilesFinished == 0 ends up "done" here
                # (and nFiles is reset to 0 below) - confirm this is intended
                if datasetSpec.nFiles and datasetSpec.nFilesFinished and datasetSpec.nFiles > datasetSpec.nFilesFinished:
                    datasetSpec.status = "finished"
                else:
                    datasetSpec.status = "done"
            elif datasetSpec.type.startswith(("trn_", "tmpl_")):
                # set done for template or transient datasets
                datasetSpec.status = "done"
            else:
                # not for input
                continue
            # set nFiles
            if datasetSpec.type in ["output", "log", "lib"]:
                datasetSpec.nFiles = datasetSpec.nFilesFinished
            self.taskBufferIF.updateDataset_JEDI(datasetSpec, {"datasetID": datasetSpec.datasetID, "jediTaskID": datasetSpec.jediTaskID})
        # trigger internal dataset cleanup
        self.taskBufferIF.trigger_cleanup_internal_datasets(taskSpec.jediTaskID)
        # end time
        taskSpec.endTime = naive_utcnow()
        # update task
        self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, updateDEFT=True)
        # kill or kick child tasks
        if taskSpec.status in ["failed", "broken", "aborted"]:
            self.taskBufferIF.killChildTasks_JEDI(taskSpec.jediTaskID, taskSpec.status)
        else:
            self.taskBufferIF.kickChildTasks_JEDI(taskSpec.jediTaskID)
        tmpLog.debug(f"doBasicPostProcess done with taskStatus={taskSpec.status}")
        return

    # final procedure
    def doFinalProcedure(self, taskSpec, tmpLog):
        """Hook for additional finalization; this default does nothing.

        :return: ``self.SC_SUCCEEDED`` (a status-code attribute presumably installed
            on the class by ``Interaction.installSC`` - confirm)
        """
        return self.SC_SUCCEEDED

    # send mail
    def sendMail(self, jediTaskID, fromAdd, toAdd, msgBody, nTry, fileBackUp, tmpLog):
        """Send a notification mail with retries, optionally backing it up to a file.

        Every attempt opens a fresh connection to ``panda_config.emailSMTPsrv``,
        cycling through ``smtpPortList``, does EHLO and STARTTLS, and sends
        ``msgBody`` verbatim. SMTP debug output is routed to ``tmpLog``. The
        connection is released after every attempt. Failed attempts are retried
        after 30 seconds, up to ``nTry`` attempts in total. If all attempts fail
        and ``fileBackUp`` is set, the mail is written to
        ``<panda_config.logdir>/jmail_<jediTaskID>_<uuid4>``.

        :param jediTaskID: task ID, used in the backup file name and content
        :param fromAdd: sender address
        :param toAdd: recipient address
        :param msgBody: message passed as-is to ``sendmail``
        :param nTry: maximum number of attempts
        :param fileBackUp: write a backup file when every attempt fails
        :param tmpLog: logger providing ``debug`` and ``error``
        """
        tmpLog.debug(f"sending notification to {toAdd}\n{msgBody}")
        for iTry in range(nTry):
            server = None
            try:
                stderrLog = StderrLogger(tmpLog)
                # rotate through the configured ports on successive attempts
                smtpPort = smtpPortList[iTry % len(smtpPortList)]
                server = MySMTP(panda_config.emailSMTPsrv, smtpPort)
                server.set_debuglevel(1)
                server.set_log(stderrLog)
                server.ehlo()
                server.starttls()
                out = server.sendmail(fromAdd, toAdd, msgBody)
                tmpLog.debug(str(out))
                server.quit()
                return
            except Exception as e:
                errStr = str(e)
            finally:
                # restore smtplib logging and drop the connection before any retry
                self._release_smtp(server)
            if iTry + 1 < nTry:
                # sleep for retry
                tmpLog.debug(f"sleep {iTry} due to {errStr}")
                time.sleep(30)
            else:
                tmpLog.error(f"failed to send notification with {errStr}")
                if fileBackUp:
                    self._write_mail_backup(jediTaskID, toAdd, msgBody, tmpLog)

    @staticmethod
    def _release_smtp(server):
        """Best-effort reset of logging and close of an SMTP client; no-op for None."""
        if server is None:
            return
        # cleanup must never mask the outcome of the send attempt
        try:
            server.reset_log()
        except Exception:
            pass
        try:
            server.close()
        except Exception:
            pass

    def _write_mail_backup(self, jediTaskID, toAdd, msgBody, tmpLog):
        """Write an undelivered mail to ``<panda_config.logdir>/jmail_<jediTaskID>_<uuid4>``."""
        # write to file which is processed in add.py
        mailFile = f"{panda_config.logdir}/jmail_{jediTaskID}_{uuid.uuid4()}"
        try:
            with open(mailFile, "w") as oMail:
                oMail.write(str(jediTaskID) + "\n" + toAdd + "\n" + msgBody)
        except OSError as e:
            tmpLog.error(f"failed to write mail backup {mailFile} with {str(e)}")

    # return email sender
    def senderAddress(self):
        """Return the sender address configured as ``panda_config.emailSender``."""
        return panda_config.emailSender

    # get task completeness
    def getTaskCompleteness(self, taskSpec):
        """Summarize progress over the task's master input datasets.

        Master input datasets with status ``removed`` are ignored. Event
        counters that cannot be added (e.g. ``None``) are skipped.

        :return: ``(nFiles, nFilesFinished, totalInputEvents, totalOkEvents,
            taskCompleteness)``. ``taskCompleteness`` is per-mille (0-1000). It is
            based on events when any input events are counted, otherwise on
            files, and is 0 when there is no input.
        """
        nFiles = 0
        nFilesFinished = 0
        totalInputEvents = 0
        totalOkEvents = 0
        for datasetSpec in taskSpec.datasetSpecList:
            if not datasetSpec.isMasterInput() or datasetSpec.status == "removed":
                continue
            nFiles += datasetSpec.nFiles
            nFilesFinished += datasetSpec.nFilesFinished
            try:
                totalInputEvents += datasetSpec.nEvents
            except Exception:
                pass
            try:
                totalOkEvents += datasetSpec.nEventsUsed
            except Exception:
                pass
        # completeness
        if totalInputEvents != 0:
            taskCompleteness = float(totalOkEvents) / float(totalInputEvents) * 1000.0
        elif nFiles != 0:
            taskCompleteness = float(nFilesFinished) / float(nFiles) * 1000.0
        else:
            taskCompleteness = 0
        return nFiles, nFilesFinished, totalInputEvents, totalOkEvents, taskCompleteness

    # get final task status
    def getFinalTaskStatus(self, taskSpec, checkParent=True, checkGoal=False):
        """Compute the status a task should end in, or check whether it met its goal.

        :param taskSpec: task to evaluate (not modified)
        :param checkParent: when all master input files are finished, also
            consult the parent task's status before declaring ``done``
        :param checkGoal: if True, return a bool instead. It is True only when
            the task has an explicit goal and its completeness reached it.
        :return: one of ``broken``, ``aborted``, ``paused``, ``done``,
            ``finished`` or ``failed``; or a bool when ``checkGoal`` is True
        """
        # count nFiles and nEvents
        nFiles, nFilesFinished, totalInputEvents, totalOkEvents, taskCompleteness = self.getTaskCompleteness(taskSpec)
        # task goal, in the same per-mille units as taskCompleteness; 1000 when unset
        if taskSpec.goal is None:
            taskGoal = 1000
        else:
            taskGoal = taskSpec.goal
        # check goal only: the answer depends solely on the goal and completeness,
        # so skip the status logic below and its task-buffer lookups
        if checkGoal:
            return taskSpec.goal is not None and taskCompleteness >= taskGoal
        # set new task status
        if taskSpec.status == "tobroken":
            status = "broken"
        elif taskSpec.status == "toabort":
            status = "aborted"
        elif taskSpec.status == "paused":
            status = "paused"
        elif self.failOnZeroOkFile and nFiles == nFilesFinished == 0:
            status = "failed"
        elif nFiles == nFilesFinished:
            if checkParent and taskSpec.parent_tid not in [None, taskSpec.jediTaskID]:
                # all input processed: the outcome also depends on the parent task
                parent_status = self.taskBufferIF.getTaskStatus_JEDI(taskSpec.parent_tid)
                if parent_status in ["failed", "broken", "aborted"]:
                    status = "failed"
                elif parent_status != "done":
                    status = "finished"
                elif any(ds.isMasterInput() and ds.state == "mutable" for ds in taskSpec.datasetSpecList):
                    # mutable master input (presumably still open for new files) keeps it finished
                    status = "finished"
                else:
                    status = "done"
            else:
                status = "done"
        elif nFilesFinished == 0:
            status = "failed"
        else:
            status = "finished"
        # fail if goal is not reached
        if (
            taskSpec.failGoalUnreached()
            and status == "finished"
            and (not taskSpec.useExhausted() or taskSpec.status in ["passed"])
            and taskCompleteness < taskGoal
        ):
            status = "failed"
        # HPO tasks with at least one finished event go to finished
        if taskSpec.is_hpo_workflow():
            event_stat = self.taskBufferIF.get_event_statistics(taskSpec.jediTaskID)
            if event_stat is not None and event_stat.get(EventServiceUtils.ST_finished):
                status = "finished"
        # return status
        return status

    # pre-check
    def doPreCheck(self, taskSpec, tmpLog):
        """Move a task to ``exhausted`` if it would otherwise end ``finished`` below its goal.

        Applies only to tasks that use the exhausted mechanism and are not in
        ``passed``. The error dialog distinguishes an incomplete parent from an
        incomplete task. The task is unlocked, persisted, and its children
        are kicked.

        :return: True if the task was set to ``exhausted``, False otherwise
        """
        # send task to exhausted
        if (
            taskSpec.useExhausted()
            and taskSpec.status not in ["passed"]
            and self.getFinalTaskStatus(taskSpec) in ["finished"]
            and not self.getFinalTaskStatus(taskSpec, checkGoal=True)
        ):
            taskSpec.status = "exhausted"
            # "done" without the parent check means only the parent made it incomplete
            if self.getFinalTaskStatus(taskSpec, checkParent=False) == "done":
                taskSpec.errorDialog = "exhausted since the parent task was incomplete and didn't reach the goal"
            else:
                taskSpec.errorDialog = "exhausted since the task was incomplete and didn't reach the goal"
            taskSpec.lockedBy = None
            taskSpec.lockedTime = None
            # update task
            tmpLog.info(f"set task_status={taskSpec.status}")
            self.taskBufferIF.updateTask_JEDI(taskSpec, {"jediTaskID": taskSpec.jediTaskID}, updateDEFT=True)
            # kick child tasks
            self.taskBufferIF.kickChildTasks_JEDI(taskSpec.jediTaskID)
            return True
        return False
0267 
0268 
# install status-code attributes on the class; presumably this is what provides
# self.SC_SUCCEEDED used by doFinalProcedure (TODO confirm in pandajedi.jedicore.Interaction)
Interaction.installSC(PostProcessorBase)