# File indexing completed on 2026-04-10 08:38:59
0001 import datetime
0002 import random
0003 import re
0004 import sys
0005 import time
0006 import traceback
0007 from email.mime.multipart import MIMEMultipart
0008 from email.mime.text import MIMEText
0009
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011
0012 from pandajedi.jedirefine import RefinerUtils
0013 from pandaserver.taskbuffer import EventServiceUtils
0014
0015 from .MailTemplates import html_head, jedi_task_html_body, jedi_task_plain
0016 from .PostProcessorBase import PostProcessorBase
0017
0018
def format_weight(weight):
    """Format a carbon-footprint weight in grams of CO2 as a human-readable string.

    Repeatedly divides by 1000, selecting the matching unit label
    (gCO2 -> kgCO2 -> tCO2 -> MtCO2 -> GtCO2).

    :param weight: weight in grams of CO2 (int or float)
    :return: formatted string, e.g. "1.50 kgCO2"
    """
    power = 1000
    n = 0
    power_labels = {0: "gCO2", 1: "kgCO2", 2: "tCO2", 3: "MtCO2", 4: "GtCO2"}
    # clamp at the largest known unit; the unguarded loop raised KeyError
    # for weights beyond the GtCO2 range
    while weight > power and n < len(power_labels) - 1:
        weight /= power
        n += 1

    weight_str = f"{weight:.2f} {power_labels[n]}"
    return weight_str
0029
0030
0031
class AtlasAnalPostProcessor(PostProcessorBase):
    """Post-processor for ATLAS analysis tasks.

    After a task reaches a final state, this class freezes and cleans up the
    task's output datasets in DDM, extends replication-rule lifetimes for user
    outputs, and sends an HTML + plain-text summary e-mail to the task owner
    (including the task's carbon footprint).
    """

    def __init__(self, taskBufferIF, ddmIF):
        """Set up the post-processor.

        :param taskBufferIF: JEDI task-buffer interface
        :param ddmIF: DDM interface factory (per-VO interfaces via getInterface)
        """
        PostProcessorBase.__init__(self, taskBufferIF, ddmIF)
        # task parameter map; populated lazily in doFinalProcedure
        self.taskParamMap = None
        # lifetime for user output containers: days from config (default 14),
        # converted to seconds below
        self.user_container_lifetime = taskBufferIF.getConfigValue("user_output", "OUTPUT_CONTAINER_LIFETIME", "jedi")
        if not self.user_container_lifetime:
            self.user_container_lifetime = 14
        self.user_container_lifetime *= 24 * 60 * 60

    def doPostProcess(self, taskSpec, tmp_logger):
        """Freeze/clean the task's datasets in DDM, then run the basic post-process.

        :param taskSpec: task specification holding datasetSpecList
        :param tmp_logger: per-task logger
        :return: self.SC_SUCCEEDED normally; self.SC_FAILED when the successful-file
                 lookup fails; self.SC_FATAL when doBasicPostProcess raises
        """
        try:
            # VO-specific DDM interface
            ddmIF = self.ddmIF.getInterface(taskSpec.vo)
            # randomize the processing order (presumably to spread DDM load
            # across concurrent post-processors — TODO confirm)
            random.shuffle(taskSpec.datasetSpecList)

            use_lib = False
            n_ok_lib = 0
            lock_update_time = naive_utcnow()
            done_containers = set()
            for datasetSpec in taskSpec.datasetSpecList:
                # skip template datasets
                if datasetSpec.type.startswith("tmpl_"):
                    continue
                # only log/output/lib dataset types are post-processed
                if not datasetSpec.type.endswith("log") and not datasetSpec.type.endswith("output") and not datasetSpec.type == "lib":
                    continue
                # only user/panda/group scoped dataset names
                if (
                    not datasetSpec.datasetName.startswith("user")
                    and not datasetSpec.datasetName.startswith("panda")
                    and not datasetSpec.datasetName.startswith("group")
                ):
                    continue
                # datasets already closed in the DB are not frozen again
                dataset_attrs = self.taskBufferIF.getDatasetAttributes_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID, ["state"])
                if "state" in dataset_attrs and dataset_attrs["state"] == "closed":
                    tmp_logger.info(f"skip freezing closed datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    closed_flag = True
                else:
                    closed_flag = False
                # for output datasets, remove files present in DDM but not
                # recorded as successful in the DB
                if not closed_flag and datasetSpec.type in ["output"]:
                    ok_files = self.taskBufferIF.getSuccessfulFiles_JEDI(datasetSpec.jediTaskID, datasetSpec.datasetID)
                    if ok_files is None:
                        tmp_logger.warning(f"failed to get successful files for {datasetSpec.datasetName}")
                        return self.SC_FAILED

                    ddm_files = ddmIF.getFilesInDataset(datasetSpec.datasetName, skipDuplicate=False)
                    tmp_logger.debug(
                        f"datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName} has {len(ok_files)} files in DB, {len(ddm_files)} files in DDM"
                    )

                    # collect DIDs of files to delete from the dataset
                    to_delete = []
                    for tmpGUID, attMap in ddm_files.items():
                        if attMap["lfn"] not in ok_files:
                            did = {"scope": attMap["scope"], "name": attMap["lfn"]}
                            to_delete.append(did)
                            tmp_logger.debug(f"delete {attMap['lfn']} from {datasetSpec.datasetName}")

                    if to_delete:
                        ddmIF.deleteFilesFromDataset(datasetSpec.datasetName, to_delete)

                # freeze the dataset, except transient (trn_*, non-log) ones
                if not closed_flag and not (datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]):
                    tmp_logger.debug(f"freeze datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    ddmIF.freezeDataset(datasetSpec.datasetName, ignoreUnknown=True)
                else:
                    if datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]:
                        tmp_logger.debug(f"skip freezing transient datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")

                # mark the dataset closed in the DB regardless of freeze outcome
                datasetSpec.state = "closed"
                datasetSpec.stateCheckTime = naive_utcnow()

                # bookkeeping for the build-job check after the loop.
                # NOTE(review): n_ok_lib counts every non-lib dataset rather than
                # successful lib files — looks suspicious; confirm intent
                if datasetSpec.type == "lib":
                    use_lib = True
                else:
                    n_ok_lib += 1

                # delete the dataset: transient ones unconditionally, others only when empty
                if not closed_flag:
                    empty_only = True
                    if datasetSpec.type.startswith("trn_") and datasetSpec.type not in ["trn_log"]:
                        empty_only = False
                    retStr = ddmIF.deleteDataset(datasetSpec.datasetName, empty_only, ignoreUnknown=True)
                    tmp_logger.debug(retStr)

                # extend replication-rule lifetime for user output datasets (14 days)
                if datasetSpec.type in ["output"] and datasetSpec.datasetName.startswith("user"):
                    tmp_logger.debug(f"extend lifetime datasetID={datasetSpec.datasetID}:Name={datasetSpec.datasetName}")
                    ddmIF.updateReplicationRules(
                        datasetSpec.datasetName, {"type=.+": {"lifetime": 14 * 24 * 60 * 60}, "(SCRATCH|USER)DISK": {"lifetime": 14 * 24 * 60 * 60}}
                    )
                    # extend the enclosing container's lifetime only once per container
                    if datasetSpec.containerName and datasetSpec.containerName not in done_containers:
                        tmp_logger.debug(f"extend lifetime container:Name={datasetSpec.containerName}")
                        ddmIF.updateReplicationRules(
                            datasetSpec.containerName,
                            {"type=.+": {"lifetime": self.user_container_lifetime}, "(SCRATCH|USER)DISK": {"lifetime": self.user_container_lifetime}},
                        )
                        done_containers.add(datasetSpec.containerName)

                # persist the new dataset state
                self.taskBufferIF.updateDatasetAttributes_JEDI(
                    datasetSpec.jediTaskID, datasetSpec.datasetID, {"state": datasetSpec.state, "stateCheckTime": datasetSpec.stateCheckTime}
                )
                # refresh the task lock every 5 minutes so long loops don't lose it
                if naive_utcnow() - lock_update_time > datetime.timedelta(minutes=5):
                    lock_update_time = naive_utcnow()

                    self.taskBufferIF.updateTaskLock_JEDI(taskSpec.jediTaskID)

            # flag tasks whose build (lib) jobs never succeeded
            if use_lib and n_ok_lib == 0:
                taskSpec.setErrDiag("No build jobs succeeded", True)
        except Exception:
            # best-effort: freezing failures are logged but do not fail the task
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.warning(f"failed to freeze datasets with {err_type.__name__}:{err_value}")
        ret_val = self.SC_SUCCEEDED
        try:
            self.doBasicPostProcess(taskSpec, tmp_logger)
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.error(f"doBasicPostProcess failed with {err_type.__name__}:{err_value}")
            ret_val = self.SC_FATAL
        return ret_val

    def doFinalProcedure(self, taskSpec, tmp_logger):
        """Send the task-summary notification e-mail to the task owner.

        Always returns self.SC_SUCCEEDED; e-mail/lookup failures are only logged.
        """
        # recipient address; None suppresses the notification
        to_add = self.getEmail(taskSpec.userName, taskSpec.vo, tmp_logger)

        # collect the carbon footprint per job status, pre-formatted for display
        try:
            carbon_footprint = self.taskBufferIF.get_task_carbon_footprint(taskSpec.jediTaskID, level="global")
            carbon_footprint_redacted = {}
            zero = "0 gCO2"

            for job_status in ["finished", "failed", "cancelled", "total"]:
                if carbon_footprint and job_status in carbon_footprint:
                    carbon_footprint_redacted[job_status] = format_weight(carbon_footprint[job_status])
                else:
                    carbon_footprint_redacted[job_status] = zero
        except Exception:
            carbon_footprint_redacted = {}
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.error(f"failed to calculate task carbon footprint {err_type.__name__}:{err_value}")

        # decode the task parameters (used by the noEmail check and compose_message)
        try:
            task_parameters = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
            self.taskParamMap = RefinerUtils.decodeJSON(task_parameters)
        except Exception:
            err_type, err_value = sys.exc_info()[:2]
            tmp_logger.error(f"task param conversion from json failed with {err_type.__name__}:{err_value}")
        if to_add is None or (self.taskParamMap is not None and "noEmail" in self.taskParamMap and self.taskParamMap["noEmail"] is True):
            tmp_logger.debug("email notification is suppressed")
        else:
            try:
                # build a multipart/alternative (plain + HTML) message and send it
                from_add = self.senderAddress()
                html_text, plain_text, subject = self.compose_message(taskSpec, carbon_footprint_redacted)
                msg = MIMEMultipart("alternative")

                msg["Subject"] = subject
                msg["From"] = from_add
                msg["To"] = to_add

                # clients prefer the last attached part, so HTML is attached second
                part1 = MIMEText(plain_text, "plain")
                part2 = MIMEText(html_text, "html")

                msg.attach(part1)
                msg.attach(part2)
                self.sendMail(taskSpec.jediTaskID, from_add, to_add, msg.as_string(), 3, False, tmp_logger)
            except Exception:
                tmp_logger.error(traceback.format_exc())
        return self.SC_SUCCEEDED

    def compose_message(self, taskSpec, carbon_footprint):
        """Build the notification e-mail bodies and subject line.

        :param taskSpec: task specification
        :param carbon_footprint: dict of formatted weights keyed by
                                 finished/failed/cancelled/total
        :return: tuple (html_text, plain_text, subject)
        """
        input_datasets = []
        output_datasets = []
        log_datasets = []
        n_total_jobs = 0
        n_succeeded_jobs = 0
        n_failed_jobs = 0
        n_cancelled_jobs = 0

        if not taskSpec.is_hpo_workflow():
            # regular task: derive job counts from master-input file counters
            input_str = "Inputs"
            cancelled_str = "Cancelled "

            for datasetSpec in taskSpec.datasetSpecList:
                # collect unique container names per dataset type
                if datasetSpec.type == "log":
                    if datasetSpec.containerName not in log_datasets:
                        log_datasets.append(datasetSpec.containerName)
                elif datasetSpec.type == "input":
                    if datasetSpec.containerName not in input_datasets:
                        input_datasets.append(datasetSpec.containerName)
                elif datasetSpec.type == "output":
                    if datasetSpec.containerName not in output_datasets:
                        output_datasets.append(datasetSpec.containerName)

                if datasetSpec.isMasterInput():
                    if datasetSpec.status == "removed":
                        continue
                    try:
                        n_total_jobs += datasetSpec.nFiles
                        n_succeeded_jobs += datasetSpec.nFilesFinished
                        n_failed_jobs += datasetSpec.nFilesFailed
                    except Exception:
                        # counters may be None; keep whatever partial sums we have
                        pass
        else:
            # HPO workflow: report processed points from event statistics instead
            input_str = "Points"
            cancelled_str = "Unprocessed"
            n_total_jobs = taskSpec.get_total_num_jobs()
            event_stat = self.taskBufferIF.get_event_statistics(taskSpec.jediTaskID)
            if event_stat is not None:
                n_succeeded_jobs = event_stat.get(EventServiceUtils.ST_finished, 0)
                n_failed_jobs = event_stat.get(EventServiceUtils.ST_failed, 0)
            try:
                n_cancelled_jobs = n_total_jobs - n_succeeded_jobs - n_failed_jobs
            except Exception:
                pass
        if n_succeeded_jobs == n_total_jobs:
            msg_succeeded = "All Succeeded"
        else:
            msg_succeeded = "Succeeded"
        input_datasets.sort()
        output_datasets.sort()
        log_datasets.sort()
        # plain-text dataset listing; trailing newline trimmed afterwards
        dataset_summary = ""
        for tmpDS in input_datasets:
            dataset_summary += f"In : {tmpDS}\n"
        for tmpDS in output_datasets:
            dataset_summary += f"Out : {tmpDS}\n"
        for tmpDS in log_datasets:
            dataset_summary += f"Log : {tmpDS}\n"
        dataset_summary = dataset_summary[:-1]

        # original client command line, when the task parameters carry it.
        # NOTE(review): self.taskParamMap can still be None when the JSON decode
        # in doFinalProcedure failed — the membership test would raise; the
        # caller's broad except swallows that. Confirm whether that is intended.
        if "cliParams" in self.taskParamMap:
            cli_parameters = self.taskParamMap["cliParams"]
        else:
            cli_parameters = None

        # fill the HTML template
        head = html_head.format(title="Task summary notification")
        body = jedi_task_html_body.format(
            jedi_task_id=taskSpec.jediTaskID,
            creation_time=taskSpec.creationDate,
            end_time=taskSpec.endTime,
            task_status=taskSpec.status,
            error_dialog=self.removeTags(taskSpec.errorDialog),
            command=cli_parameters,
            n_total=n_total_jobs,
            n_succeeded=n_succeeded_jobs,
            n_failed=n_failed_jobs,
            n_cancelled=n_cancelled_jobs,
            carbon_succeeded=carbon_footprint["finished"],
            carbon_failed=carbon_footprint["failed"],
            carbon_cancelled=carbon_footprint["cancelled"],
            carbon_total=carbon_footprint["total"],
            datasets_in=input_datasets,
            datasets_out=output_datasets,
            datasets_log=log_datasets,
            msg_succeeded=msg_succeeded,
            input_str=input_str,
            cancelled_str=cancelled_str,
        )
        message_html = head + body

        # fill the plain-text template
        message_plain = jedi_task_plain.format(
            jedi_task_id=taskSpec.jediTaskID,
            creation_time=taskSpec.creationDate,
            end_time=taskSpec.endTime,
            task_status=taskSpec.status,
            error_dialog=self.removeTags(taskSpec.errorDialog),
            command=cli_parameters,
            n_total=n_total_jobs,
            n_succeeded=n_succeeded_jobs,
            n_failed=n_failed_jobs,
            n_cancelled=n_cancelled_jobs,
            carbon_succeeded=carbon_footprint["finished"],
            carbon_failed=carbon_footprint["failed"],
            carbon_cancelled=carbon_footprint["cancelled"],
            carbon_total=carbon_footprint["total"],
            dataset_summary=dataset_summary,
            msg_succeeded=msg_succeeded,
            input_str=input_str,
            cancelled_str=cancelled_str,
        )

        subject = f"JEDI notification for TaskID:{taskSpec.jediTaskID} ({n_succeeded_jobs}/{n_total_jobs} {msg_succeeded})"

        return message_html, message_plain, subject

    def getEmail(self, user_name, vo, tmp_logger):
        """Resolve the notification e-mail address for a user.

        Checks the cached address in the meta DB first; if the cache is older
        than an hour, refreshes it from DDM (finger) and writes it back.

        :return: the address, or None when notification is suppressed
                 (opt-out marker, empty address, or lookup failure)
        """
        ret_suppressed = None

        tmp_logger.debug(f"getting email for {user_name}")

        # cached address from the meta DB, with the user's DN and last-update time
        mail_address_db, dn, db_uptime = self.taskBufferIF.getEmailAddr(user_name, withDN=True)
        tmp_logger.debug(f"email from MetaDB : {mail_address_db}")

        # a "notsend" prefix in the DB entry marks a user who opted out
        not_send_mail = False
        if mail_address_db is not None and mail_address_db.startswith("notsend"):
            not_send_mail = True

        if dn in ["", None]:
            # cannot query DDM without a DN
            tmp_logger.debug("DN is empty")
        else:
            # trust the cached entry if it was refreshed within the last hour
            if db_uptime is not None and naive_utcnow() - db_uptime < datetime.timedelta(hours=1):
                tmp_logger.debug("no lookup")
                if not_send_mail or mail_address_db in [None, ""]:
                    return ret_suppressed
                else:
                    return mail_address_db.split(":")[-1]
            else:
                # refresh from DDM, retrying up to n_tries times with 10 s backoff
                tmp_logger.debug(f"getting email using dq2Info.finger({dn})")
                n_tries = 3
                for iDDMTry in range(n_tries):
                    try:
                        user_info = self.ddmIF.getInterface(vo).finger(dn)
                        mail_address = user_info["email"]
                        tmp_logger.debug(f"email from DQ2 : {mail_address}")
                        if mail_address is None:
                            mail_address = ""

                        # write the refreshed address back, preserving the opt-out marker
                        mail_addr_to_db = ""
                        if not_send_mail:
                            mail_addr_to_db += "notsend:"
                        mail_addr_to_db += mail_address

                        tmp_logger.debug(f"update email to {mail_addr_to_db}")
                        self.taskBufferIF.setEmailAddr(user_name, mail_addr_to_db)

                        if not_send_mail or mail_address == "":
                            return ret_suppressed
                        return mail_address
                    except Exception:
                        if iDDMTry + 1 < n_tries:
                            tmp_logger.debug(f"sleep for retry {iDDMTry}/{n_tries}")
                            time.sleep(10)
                        else:
                            err_type, err_value = sys.exc_info()[:2]
                            tmp_logger.error(f"{err_type}:{err_value}")
        # fall through: no usable address
        return ret_suppressed

    def removeTags(self, tmp_str):
        """Best-effort strip of HTML/XML-like tags (and tagged text) from tmp_str.

        Returns the input unchanged on None or any regex failure.
        """
        try:
            if tmp_str is not None:
                tmp_str = re.sub(">[^<]+<", "><", tmp_str)
                tmp_str = re.sub("<[^<]+>", "", tmp_str)
        except Exception:
            pass
        return tmp_str