Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:59

0001 import datetime
0002 import random
0003 import re
0004 import sys
0005 import time
0006 import traceback
0007 from email.mime.multipart import MIMEMultipart
0008 from email.mime.text import MIMEText
0009 
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 
0012 from pandajedi.jedirefine import RefinerUtils
0013 from pandaserver.taskbuffer import EventServiceUtils
0014 
0015 from .MailTemplates import html_head, jedi_task_html_body, jedi_task_plain
0016 from .PostProcessorBase import PostProcessorBase
0017 
0018 
0019 def format_weight(weight):
0020     power = 1000
0021     n = 0
0022     power_labels = {0: "gCO2", 1: "kgCO2", 2: "tCO2", 3: "MtCO2", 4: "GtCO2"}
0023     while weight > power:
0024         weight /= power
0025         n += 1
0026 
0027     weight_str = f"{weight:.2f} {power_labels[n]}"
0028     return weight_str
0029 
0030 
# post processor for ATLAS production
class AtlasAnalPostProcessor(PostProcessorBase):
    """Post-processor for ATLAS analysis tasks.

    Responsibilities visible in this class:
      * freeze, clean up and (conditionally) delete output/log/lib datasets
        in DDM once a task finishes (doPostProcess)
      * extend replication-rule lifetimes for user output datasets and their
        containers (doPostProcess)
      * compose and send a task-summary email notification, including a
        redacted carbon-footprint report (doFinalProcedure / compose_message)

    NOTE(review): the class comment above says "production" but the class
    name and logic target analysis tasks — likely a stale comment.
    """

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        """Initialize with task-buffer (DB) and DDM interfaces.

        :param taskBufferIF: task buffer interface used for DB reads/writes
        :param ddmIF: DDM interface factory (resolved per-VO in doPostProcess)
        """
        PostProcessorBase.__init__(self, taskBufferIF, ddmIF)
        # decoded task parameter map; populated lazily in doFinalProcedure
        self.taskParamMap = None
        # container lifetime is configured in days; falls back to 14 days
        self.user_container_lifetime = taskBufferIF.getConfigValue("user_output", "OUTPUT_CONTAINER_LIFETIME", "jedi")
        if not self.user_container_lifetime:
            self.user_container_lifetime = 14
        # convert days -> seconds, the unit expected by the DDM rule update
        self.user_container_lifetime *= 24 * 60 * 60

    # main
    def doPostProcess(self, taskSpec, tmp_logger):
        """Freeze/clean the task's datasets in DDM, then run the base post-process.

        For every output/log/lib dataset owned by a user/panda/group scope:
        remove files not recorded as successful, freeze the dataset (unless
        transient), delete transient or empty datasets, extend lifetimes of
        user outputs, and mark the dataset 'closed' in the DB.

        :param taskSpec: task specification holding datasetSpecList
        :param tmp_logger: logger for this invocation
        :return: SC_SUCCEEDED normally; SC_FAILED when the successful-file
                 lookup fails; SC_FATAL when doBasicPostProcess raises
        """
        # freeze datasets
        try:
            # get DDM I/F
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)
            # shuffle datasets
            # presumably to spread DDM load across concurrent post-processors
            random.shuffle(taskSpec.datasetSpecList)
            # loop over all datasets
            use_lib = False
            n_ok_lib = 0
            lock_update_time = naive_utcnow()
            done_containers = set()
            for datasetSpec in taskSpec.datasetSpecList:
                # ignore template
                if datasetSpec.type.startswith("tmpl_"):
                    continue
                # only output, log or lib datasets
                if not datasetSpec.type.endswith("log") and not datasetSpec.type.endswith("output") and not datasetSpec.type == "lib":
                    continue
                # only user group, or panda dataset
                if (
                    not datasetSpec.datasetName.startswith("user")
                    and not datasetSpec.datasetName.startswith("panda")
                    and not datasetSpec.datasetName.startswith("group")
                ):
                    continue
                # check if already closed
                dataset_attrs = self.taskBufferIF.getDatasetAttributes_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID, ["state"])
                if "state" in dataset_attrs and dataset_attrs["state"] == "closed":
                    tmp_logger.info(f"skip freezing closed datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    closed_flag = True
                else:
                    closed_flag = False
                # remove wrong files
                # i.e. files present in DDM but not recorded as successful in the DB
                if not closed_flag and datasetSpec.type in ["output"]:
                    # get successful files
                    ok_files = self.taskBufferIF.getSuccessfulFiles_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID)
                    if ok_files is None:
                        tmp_logger.warning(f"failed to get successful files for {datasetSpec.datasetName}")
                        return self.SC_FAILED
                    # get files in dataset
                    ddm_files = ddmIF.getFilesInDataset(datasetSpec.datasetName, skipDuplicate=False)
                    tmp_logger.debug(
                        f"datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName} has {len(ok_files)} files in DB, {len(ddm_files)} files in DDM"
                    )
                    # check all files
                    to_delete = []
                    for tmpGUID, attMap in ddm_files.items():
                        if attMap["lfn"] not in ok_files:
                            did = {"scope": attMap["scope"], "name": attMap["lfn"]}
                            to_delete.append(did)
                            tmp_logger.debug(f"delete {attMap['lfn']} from {datasetSpec.datasetName}")
                    # delete
                    if to_delete:
                        ddmIF.deleteFilesFromDataset(datasetSpec.datasetName, to_delete)

                # freeze datasets
                # transient datasets (trn_* except trn_log) are left unfrozen
                if not closed_flag and not (datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]):
                    tmp_logger.debug(f"freeze datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    ddmIF.freezeDataset(datasetSpec.datasetName, ignoreUnknown=True)
                else:
                    if datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]:
                        tmp_logger.debug(f"skip freezing transient datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                # update dataset
                datasetSpec.state = "closed"
                datasetSpec.stateCheckTime = naive_utcnow()

                # check if build step was succeeded
                # NOTE(review): n_ok_lib is incremented for every NON-lib
                # dataset, so the "No build jobs succeeded" dialog below only
                # fires when lib datasets are the sole surviving type — confirm
                # this counter is intended (the name suggests counting
                # successful lib datasets instead)
                if datasetSpec.type == "lib":
                    use_lib = True
                else:
                    n_ok_lib += 1
                # delete transient or empty datasets
                # transient datasets are force-deleted; others only if empty
                if not closed_flag:
                    empty_only = True
                    if datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]:
                        empty_only = False
                    retStr = ddmIF.deleteDataset(datasetSpec.datasetName, empty_only, ignoreUnknown=True)
                    tmp_logger.debug(retStr)
                # extend lifetime
                # user output datasets get a fixed 14-day rule extension
                if datasetSpec.type in ["output"] and datasetSpec.datasetName.startswith("user"):
                    tmp_logger.debug(f"extend lifetime datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    ddmIF.updateReplicationRules(
                        datasetSpec.datasetName, {"type=.+": {"lifetime": 14 * 24 * 60 * 60}, "(SCRATCH|USER)DISK": {"lifetime": 14 * 24 * 60 * 60}}
                    )
                    # extend container rules
                    # each container is touched at most once per task
                    if datasetSpec.containerName and datasetSpec.containerName not in done_containers:
                        tmp_logger.debug(f"extend lifetime container:Name={datasetSpec.containerName}")
                        ddmIF.updateReplicationRules(
                            datasetSpec.containerName,
                            {"type=.+": {"lifetime": self.user_container_lifetime}, "(SCRATCH|USER)DISK": {"lifetime": self.user_container_lifetime}},
                        )
                        done_containers.add(datasetSpec.containerName)
                # update dataset in DB
                self.taskBufferIF.updateDatasetAttributes_JEDI(
                    datasetSpec.jediTaskID, datasetSpec.datasetID, {"state": datasetSpec.state, "stateCheckTime": datasetSpec.stateCheckTime}
                )
                # update task lock
                # refresh at most every 5 minutes so the task is not stolen
                # by another post-processor during a long dataset loop
                if naive_utcnow() - lock_update_time > datetime.timedelta(minutes=5):
                    lock_update_time = naive_utcnow()
                    # update lock
                    self.taskBufferIF.updateTaskLock_JEDI(taskSpec.jediTaskID)
            # dialog
            if use_lib and n_ok_lib == 0:
                taskSpec.setErrDiag("No build jobs succeeded", True)
        except Exception:
            # best-effort: dataset freezing failures are logged but do not
            # prevent the basic post-process from running
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.warning(f"failed to freeze datasets with {err_type.__name__}:{err_value}")
        ret_val = self.SC_SUCCEEDED
        try:
            self.doBasicPostProcess(taskSpec, tmp_logger)
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.error(f"doBasicPostProcess failed with {err_type.__name__}:{err_value}")
            ret_val = self.SC_FATAL
        return ret_val

    # final procedure
    def doFinalProcedure(self, taskSpec, tmp_logger):
        """Send the task-summary email to the task owner unless suppressed.

        Gathers the carbon footprint and decoded task parameters, then builds
        a multipart (plain + HTML) message and sends it. Notification is
        skipped when no address is resolvable or the task sets noEmail=True.

        :param taskSpec: finished task specification
        :param tmp_logger: logger for this invocation
        :return: always SC_SUCCEEDED (email failures are only logged)
        """
        # check email address
        to_add = self.getEmail(taskSpec.userName, taskSpec.vo, tmp_logger)

        # calculate carbon footprint for the task
        try:
            carbon_footprint = self.taskBufferIF.get_task_carbon_footprint(taskSpec.jediTaskID, level="global")
            carbon_footprint_redacted = {}
            zero = "0 gCO2"

            # format each known job status; missing statuses read as zero
            for job_status in ["finished", "failed", "cancelled", "total"]:
                if carbon_footprint and job_status in carbon_footprint:
                    carbon_footprint_redacted[job_status] = format_weight(carbon_footprint[job_status])
                else:
                    carbon_footprint_redacted[job_status] = zero
        except Exception:
            # NOTE(review): on failure this stays {}, and compose_message
            # indexes carbon_footprint["finished"] etc. directly — the
            # resulting KeyError is swallowed by the except below, so the
            # whole notification is silently dropped; confirm intended
            carbon_footprint_redacted = {}
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.error(f"failed to calculate task carbon footprint {err_type.__name__}:{err_value}")

        # read task parameters
        try:
            task_parameters = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
            self.taskParamMap = RefinerUtils.decodeJSON(task_parameters)
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.error(f"task param conversion from json failed with {err_type.__name__}:{err_value}")
        if to_add is None or (self.taskParamMap is not None and "noEmail" in self.taskParamMap and self.taskParamMap["noEmail"] is True):
            tmp_logger.debug("email notification is suppressed")
        else:
            try:
                # send email notification
                from_add = self.senderAddress()
                html_text, plain_text, subject = self.compose_message(taskSpec, carbon_footprint_redacted)
                msg = MIMEMultipart("alternative")

                msg["Subject"] = subject
                msg["From"] = from_add
                msg["To"] = to_add

                # Record the MIME types of both parts - text/plain and text/html.
                part1 = MIMEText(plain_text, "plain")
                part2 = MIMEText(html_text, "html")

                # Attach parts into message container.
                # According to RFC 2046, the last part of a multipart message, in this case
                # the HTML message, is best and preferred.
                msg.attach(part1)
                msg.attach(part2)
                self.sendMail(taskSpec.jediTaskID, from_add, to_add, msg.as_string(), 3, False, tmp_logger)
            except Exception:
                tmp_logger.error(traceback.format_exc())
        return self.SC_SUCCEEDED

    # compose mail message
    def compose_message(self, taskSpec, carbon_footprint):
        """Build the notification email bodies and subject.

        Summarizes input/output/log containers and job counts (file-based for
        regular tasks, event-based for HPO workflows) and renders the HTML and
        plain-text templates from MailTemplates.

        :param taskSpec: task specification with datasetSpecList
        :param carbon_footprint: dict with pre-formatted weights for keys
               'finished', 'failed', 'cancelled', 'total' (KeyError if absent)
        :return: (html_message, plain_message, subject) tuple
        """
        # summary
        input_datasets = []
        output_datasets = []
        log_datasets = []
        n_total_jobs = 0
        n_succeeded_jobs = 0
        n_failed_jobs = 0
        n_cancelled_jobs = 0

        if not taskSpec.is_hpo_workflow():
            input_str = "Inputs"
            cancelled_str = "Cancelled  "

            for datasetSpec in taskSpec.datasetSpecList:
                # dataset summary
                # collect distinct container names per dataset type
                if datasetSpec.type == "log":
                    if datasetSpec.containerName not in log_datasets:
                        log_datasets.append(datasetSpec.containerName)
                elif datasetSpec.type == "input":
                    if datasetSpec.containerName not in input_datasets:
                        input_datasets.append(datasetSpec.containerName)
                elif datasetSpec.type == "output":
                    if datasetSpec.containerName not in output_datasets:
                        output_datasets.append(datasetSpec.containerName)
                # process summary
                # job counts are derived from master-input file counters
                if datasetSpec.isMasterInput():
                    if datasetSpec.status == "removed":
                        continue
                    try:
                        n_total_jobs += datasetSpec.nFiles
                        n_succeeded_jobs += datasetSpec.nFilesFinished
                        n_failed_jobs += datasetSpec.nFilesFailed
                    except Exception:
                        # counters may be None; skip datasets without stats
                        pass
        else:
            # HPO workflow: report event statistics instead of file counts
            input_str = "Points"
            cancelled_str = "Unprocessed"
            n_total_jobs = taskSpec.get_total_num_jobs()
            event_stat = self.taskBufferIF.get_event_statistics(taskSpec.jediTaskID)
            if event_stat is not None:
                n_succeeded_jobs = event_stat.get(EventServiceUtils.ST_finished, 0)
                n_failed_jobs = event_stat.get(EventServiceUtils.ST_failed, 0)
        try:
            n_cancelled_jobs = n_total_jobs - n_succeeded_jobs - n_failed_jobs
        except Exception:
            # n_total_jobs may be None (e.g. get_total_num_jobs failure)
            pass
        if n_succeeded_jobs == n_total_jobs:
            msg_succeeded = "All Succeeded"
        else:
            msg_succeeded = "Succeeded"
        input_datasets.sort()
        output_datasets.sort()
        log_datasets.sort()
        dataset_summary = ""
        for tmpDS in input_datasets:
            dataset_summary += f"In  : {tmpDS}\n"
        for tmpDS in output_datasets:
            dataset_summary += f"Out : {tmpDS}\n"
        for tmpDS in log_datasets:
            dataset_summary += f"Log : {tmpDS}\n"
        # drop the trailing newline
        dataset_summary = dataset_summary[:-1]

        # CLI param
        # NOTE(review): assumes doFinalProcedure populated self.taskParamMap;
        # if JSON decoding failed it is still None and the 'in' test raises
        # (caught by the caller's except)
        if "cliParams" in self.taskParamMap:
            cli_parameters = self.taskParamMap["cliParams"]
        else:
            cli_parameters = None

        # make message
        head = html_head.format(title="Task summary notification")
        body = jedi_task_html_body.format(
            jedi_task_id=taskSpec.jediTaskID,
            creation_time=taskSpec.creationDate,
            end_time=taskSpec.endTime,
            task_status=taskSpec.status,
            error_dialog=self.removeTags(taskSpec.errorDialog),
            command=cli_parameters,
            n_total=n_total_jobs,
            n_succeeded=n_succeeded_jobs,
            n_failed=n_failed_jobs,
            n_cancelled=n_cancelled_jobs,
            carbon_succeeded=carbon_footprint["finished"],
            carbon_failed=carbon_footprint["failed"],
            carbon_cancelled=carbon_footprint["cancelled"],
            carbon_total=carbon_footprint["total"],
            datasets_in=input_datasets,
            datasets_out=output_datasets,
            datasets_log=log_datasets,
            msg_succeeded=msg_succeeded,
            input_str=input_str,
            cancelled_str=cancelled_str,
        )
        message_html = head + body

        message_plain = jedi_task_plain.format(
            jedi_task_id=taskSpec.jediTaskID,
            creation_time=taskSpec.creationDate,
            end_time=taskSpec.endTime,
            task_status=taskSpec.status,
            error_dialog=self.removeTags(taskSpec.errorDialog),
            command=cli_parameters,
            n_total=n_total_jobs,
            n_succeeded=n_succeeded_jobs,
            n_failed=n_failed_jobs,
            n_cancelled=n_cancelled_jobs,
            carbon_succeeded=carbon_footprint["finished"],
            carbon_failed=carbon_footprint["failed"],
            carbon_cancelled=carbon_footprint["cancelled"],
            carbon_total=carbon_footprint["total"],
            dataset_summary=dataset_summary,
            msg_succeeded=msg_succeeded,
            input_str=input_str,
            cancelled_str=cancelled_str,
        )

        subject = f"JEDI notification for TaskID:{taskSpec.jediTaskID} ({n_succeeded_jobs}/{n_total_jobs} {msg_succeeded})"

        # return
        return message_html, message_plain, subject

    # get email
    def getEmail(self, user_name, vo, tmp_logger):
        """Resolve the notification email address for a user.

        Tries the PANDAMETA DB first; if the cached entry is stale (older than
        one hour) the address is re-fetched from DDM (dq2 finger) with retries
        and written back to the DB.

        :param user_name: PanDA user name
        :param vo: virtual organization, used to pick the DDM interface
        :param tmp_logger: logger for this invocation
        :return: email address string, or None to suppress the notification
        """
        # return to suppress mail
        ret_suppressed = None
        # get DN
        tmp_logger.debug(f"getting email for {user_name}")

        # get email from PANDAMETA DB
        mail_address_db, dn, db_uptime = self.taskBufferIF.getEmailAddr(user_name, withDN=True)
        tmp_logger.debug(f"email from MetaDB : {mail_address_db}")
        # email notification is suppressed
        # a "notsend:" prefix in the DB marks users who opted out
        not_send_mail = False
        if mail_address_db is not None and mail_address_db.startswith("notsend"):
            not_send_mail = True
        # DN is unavailable
        if dn in ["", None]:
            # there will be no email
            tmp_logger.debug("DN is empty")
        else:
            # avoid too frequent lookup
            # trust the DB value if it was refreshed within the last hour
            if db_uptime is not None and naive_utcnow() - db_uptime < datetime.timedelta(hours=1):
                tmp_logger.debug("no lookup")
                if not_send_mail or mail_address_db in [None, ""]:
                    return ret_suppressed
                else:
                    # strip the optional "notsend:" prefix
                    return mail_address_db.split(":")[-1]
            else:
                # get email from DQ2
                tmp_logger.debug(f"getting email using dq2Info.finger({dn})")
                n_tries = 3
                for iDDMTry in range(n_tries):
                    try:
                        user_info = self.ddmIF.getInterface(vo).finger(dn)
                        mail_address = user_info["email"]
                        tmp_logger.debug(f"email from DQ2 : {mail_address}")
                        if mail_address is None:
                            mail_address = ""
                        # make email field to update DB
                        # preserve the opt-out flag when refreshing the cache
                        mail_addr_to_db = ""
                        if not_send_mail:
                            mail_addr_to_db += "notsend:"
                        mail_addr_to_db += mail_address
                        # update database
                        tmp_logger.debug(f"update email to {mail_addr_to_db}")
                        self.taskBufferIF.setEmailAddr(user_name, mail_addr_to_db)

                        if not_send_mail or mail_address == "":
                            return ret_suppressed
                        return mail_address
                    except Exception:
                        # retry with backoff; log only on the final attempt
                        if iDDMTry + 1 < n_tries:
                            tmp_logger.debug(f"sleep for retry {iDDMTry}/{n_tries}")
                            time.sleep(10)
                        else:
                            err_type, err_value = sys.exc_info()[:2]
                            tmp_logger.error(f"{err_type}:{err_value}")
        # not send email
        return ret_suppressed

    # remove tags
    def removeTags(self, tmp_str):
        """Strip HTML/XML-like tags and their enclosed text from a string.

        Best-effort: returns the input unchanged (including None) when the
        regex substitution fails.

        :param tmp_str: string possibly containing markup, or None
        :return: cleaned string, or the original value on failure/None
        """
        try:
            if tmp_str is not None:
                # drop text sandwiched between tags, then the tags themselves
                tmp_str = re.sub(">[^<]+<", "><", tmp_str)
                tmp_str = re.sub("<[^<]+>", "", tmp_str)
        except Exception:
            pass
        return tmp_str