Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 """
0002 Plugin class for adding data to datasets in ATLAS.
0003 Inherits from AdderPluginBase.
0004 
0005 """
0006 
0007 import datetime
0008 import gc
0009 import re
0010 import sys
0011 import time
0012 import traceback
0013 from typing import Dict, List
0014 
0015 from pandacommon.pandautils.PandaUtils import naive_utcnow
0016 from rucio.common.exception import (
0017     DataIdentifierNotFound,
0018     FileConsistencyMismatch,
0019     InsufficientAccountLimit,
0020     InvalidObject,
0021     InvalidPath,
0022     InvalidRSEExpression,
0023     RSENotFound,
0024     RSEProtocolNotSupported,
0025     UnsupportedOperation,
0026 )
0027 
0028 from pandaserver.config import panda_config
0029 from pandaserver.dataservice import DataServiceUtils, ErrorCode
0030 from pandaserver.dataservice.adder_plugin_base import AdderPluginBase
0031 from pandaserver.dataservice.DataServiceUtils import select_scope
0032 from pandaserver.dataservice.ddm import rucioAPI
0033 from pandaserver.srvcore.MailUtils import MailUtils
0034 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0035 
0036 
0037 class AdderAtlasPlugin(AdderPluginBase):
0038     """
0039     Plugin class for adding data to datasets in ATLAS.
0040     Inherits from AdderPluginBase.
0041     """
0042 
0043     # constructor
0044     def __init__(self, job, **params):
0045         """
0046         Constructor for AdderAtlasPlugin.
0047 
0048         Args:
0049             job: The job object containing job details.
0050             **params: Additional parameters.
0051         """
0052         AdderPluginBase.__init__(self, job, params)
0053         self.job_id = self.job.PandaID
0054         self.job_status = self.job.jobStatus
0055         self.dataset_map = {}
0056         self.add_to_top_only = False
0057         self.go_to_transferring = False
0058         self.log_transferring = False
0059         self.subscription_map = {}
0060         self.go_to_merging = False
0061 
0062     # main
0063     def execute(self) -> None:
0064         """
0065         Main execution method for the plugin.
0066         Handles the logic for adding files to datasets based on job status and other conditions.
0067         """
0068         try:
0069             self.logger.debug(f"start plugin : {self.job_status}")
0070             # add files only to top-level datasets for transferring jobs
0071             if self.job.jobStatus == "transferring":
0072                 self.add_to_top_only = True
0073                 self.logger.debug("adder for transferring")
0074             self.check_output_file_metadata()
0075             # check if the job goes to merging
0076             if self.job.produceUnMerge():
0077                 self.go_to_merging = True
0078             # check if the job should go to transferring
0079             src_site_spec = self.siteMapper.getSite(self.job.computingSite)
0080             _, scope_src_site_spec_output = select_scope(src_site_spec, self.job.prodSourceLabel, self.job.job_label)
0081             tmp_src_ddm = src_site_spec.ddm_output[scope_src_site_spec_output]
0082             if self.job.prodSourceLabel == "user" and self.job.destinationSE not in self.siteMapper.siteSpecList:
0083                 # DQ2 ID was set by using --destSE for analysis job to transfer output
0084                 tmp_dst_ddm = self.job.destinationSE
0085             else:
0086                 dst_site_spec = self.siteMapper.getSite(self.job.destinationSE)
0087                 _, scope_dst_site_spec_output = select_scope(dst_site_spec, self.job.prodSourceLabel, self.job.job_label)
0088                 tmp_dst_ddm = dst_site_spec.ddm_output[scope_dst_site_spec_output]
0089                 # protection against disappearance of dest from schedconfig
0090                 if self.job.destinationSE not in ["NULL", None] and not self.siteMapper.checkSite(self.job.destinationSE) and self.job.destinationSE != "local":
0091                     self.job.ddmErrorCode = ErrorCode.EC_Adder
0092                     self.job.ddmErrorDiag = f"destinationSE {self.job.destinationSE} is unknown in schedconfig"
0093                     self.logger.error(f"{self.job.ddmErrorDiag}")
0094                     # set fatal error code and return
0095                     self.result.set_fatal()
0096                     return
0097 
0098             # protection against disappearance of src from schedconfig
0099             if not self.siteMapper.checkSite(self.job.computingSite):
0100                 self.job.ddmErrorCode = ErrorCode.EC_Adder
0101                 self.job.ddmErrorDiag = f"computingSite {self.job.computingSite} is unknown in schedconfig"
0102                 self.logger.error(f"{self.job.ddmErrorDiag}")
0103                 # set fatal error code and return
0104                 self.result.set_fatal()
0105                 return
0106             # check if the job has something to transfer
0107             self.logger.debug(f"alt stage-out:{str(self.job.altStgOutFileList())}")
0108             something_to_transfer = False
0109             for file in self.job.Files:
0110                 if file.type in {"output", "log"}:
0111                     if file.status == "nooutput":
0112                         continue
0113                     if DataServiceUtils.getDistributedDestination(file.destinationDBlockToken) is None and file.lfn not in self.job.altStgOutFileList():
0114                         something_to_transfer = True
0115                         break
0116             self.logger.debug(f"DDM src:{tmp_src_ddm} dst:{tmp_dst_ddm}")
0117             job_type = JobUtils.translate_prodsourcelabel_to_jobtype(src_site_spec.type, self.job.prodSourceLabel)
0118             if re.search("^ANALY_", self.job.computingSite) is not None:
0119                 # analysis site. Should be obsoleted by the next check
0120                 pass
0121             elif job_type == JobUtils.ANALY_PS:
0122                 # analysis job
0123                 pass
0124             elif self.job.computingSite == self.job.destinationSE:
0125                 # same site ID for computingSite and destinationSE
0126                 pass
0127             elif tmp_src_ddm == tmp_dst_ddm:
0128                 # same DDMID for src/dest
0129                 pass
0130             elif self.job.destinationSE in ["NULL", None]:
0131                 # jobs stayed without destinationSE, e.g. there is no job output
0132                 pass
0133             elif self.add_to_top_only:
0134                 # already in transferring
0135                 pass
0136             elif self.go_to_merging:
0137                 # no transferring for merging
0138                 pass
0139             elif self.job.jobStatus == "failed":
0140                 # failed jobs
0141                 if self.job.prodSourceLabel in ["managed", "test"]:
0142                     self.log_transferring = True
0143             elif (
0144                 self.job.jobStatus == "finished"
0145                 and (EventServiceUtils.isEventServiceJob(self.job) or EventServiceUtils.isJumboJob(self.job))
0146                 and not EventServiceUtils.isJobCloningJob(self.job)
0147             ):
0148                 # transfer only log file for normal ES jobs
0149                 self.log_transferring = True
0150             elif not something_to_transfer:
0151                 # nothing to transfer
0152                 pass
0153             else:
0154                 self.go_to_transferring = True
0155             self.logger.debug(f"somethingToTransfer={something_to_transfer}")
0156             self.logger.debug(f"goToTransferring={self.go_to_transferring}")
0157             self.logger.debug(f"logTransferring={self.log_transferring}")
0158             self.logger.debug(f"goToMerging={self.go_to_merging}")
0159             self.logger.debug(f"addToTopOnly={self.add_to_top_only}")
0160             return_output = self.update_outputs()
0161             self.logger.debug(f"added outputs with {return_output}")
0162             if return_output != 0:
0163                 self.logger.debug("terminated when adding")
0164                 return
0165             # succeeded
0166             self.result.set_succeeded()
0167             self.logger.debug("end plugin")
0168         except Exception:
0169             error_type, error_value = sys.exc_info()[:2]
0170             err_str = f"execute() : {error_type} {error_value}"
0171             err_str += traceback.format_exc()
0172             self.logger.debug(err_str)
0173             # set fatal error code
0174             self.result.set_fatal()
0175         # return
0176         return
0177 
0178     # check output file metadata
0179     def check_output_file_metadata(self):
0180         """
0181         Check the metadata of output files for consistency and change the job status if inconsistencies are found.
0182         """
0183         # check only successful jobs
0184         if self.job_status in ["failed", "cancelled", "closed"]:
0185             return
0186         # get the zip file map
0187         zip_file_map = self.job.getZipFileMap()
0188         # check only output files
0189         for file in self.job.Files:
0190             is_zip_file = file.lfn in zip_file_map
0191             # check num events and GUID
0192             if file.type == "output" and not is_zip_file and not self.add_to_top_only and self.job.prodSourceLabel == "managed":
0193                 if file.lfn not in self.extra_info["nevents"]:
0194                     to_skip = False
0195                     # exclude some formats
0196                     for patt in ["TXT", "NTUP", "HIST"]:
0197                         if file.lfn.startswith(patt):
0198                             to_skip = True
0199                     if not to_skip:
0200                         err_msg = f"nEvents is missing in jobReport for {file.lfn}"
0201                         self.logger.error(err_msg)
0202                         self.job.ddmErrorCode = ErrorCode.EC_MissingNumEvents
0203                         self.job.ddmErrorDiag = err_msg
0204                         self.job.jobStatus = "failed"
0205                         self.job_status = "failed"
0206                         return
0207                 if file.lfn not in self.extra_info["guid"] or file.GUID != self.extra_info["guid"][file.lfn]:
0208                     self.logger.debug(f"extra_info = {str(self.extra_info)}")
0209                     err_msg = f"GUID is inconsistent between jobReport and pilot report for {file.lfn}"
0210                     self.logger.error(err_msg)
0211                     self.job.ddmErrorCode = ErrorCode.EC_InconsistentGUID
0212                     self.job.ddmErrorDiag = err_msg
0213                     self.job.jobStatus = "failed"
0214                     self.job_status = "failed"
0215                     return
0216 
0217     # update output files
0218     def update_outputs(self):
0219         """
0220         Update output files for the job.
0221         Handles the logic for adding files to datasets and registering them with Rucio.
0222 
0223         Returns:
0224             int: 0 if successful, 1 if there was an error.
0225         """
0226         # return if non-DQ2
0227         if self.job.destinationSE == "local":
0228             return 0
0229 
0230         # Get the computingsite spec and scope
0231         src_site_spec = self.siteMapper.getSite(self.job.computingSite)
0232         _, scope_src_site_spec_output = select_scope(src_site_spec, self.job.prodSourceLabel, self.job.job_label)
0233 
0234         # Get the zip file map
0235         zip_file_map = self.job.getZipFileMap()
0236 
0237         # Get campaign and nEvents input
0238         campaign = None
0239         n_events_input = {}
0240 
0241         if self.job.jediTaskID not in [0, None, "NULL"]:
0242             tmp_ret = self.taskBuffer.getTaskAttributesPanda(self.job.jediTaskID, ["campaign"])
0243             campaign = tmp_ret.get("campaign")
0244             for file_spec in self.job.Files:
0245                 if file_spec.type == "input":
0246                     tmp_dict = self.taskBuffer.getJediFileAttributes(
0247                         file_spec.PandaID, file_spec.jediTaskID, file_spec.datasetID, file_spec.fileID, ["nEvents"]
0248                     )
0249                     if "nEvents" in tmp_dict:
0250                         n_events_input[file_spec.lfn] = tmp_dict["nEvents"]
0251 
0252         # Add nEvents info for zip files
0253         for tmp_zip_file_name, tmp_zip_contents in zip_file_map.items():
0254             if tmp_zip_file_name not in self.extra_info["nevents"]:
0255                 for tmp_zip_content in tmp_zip_contents:
0256                     for tmp_lfn in list(n_events_input):
0257                         # checking if a logical file name (LFN) matches a pattern corresponding to a zip file content and if the LFN exists in the n_events_input dictionary with a non-None value.
0258                         # If these conditions are met, the code updates the extra_info["nevents"] dictionary by setting a default value of 0 for the zip file name.
0259                         if re.search("^" + tmp_zip_content + "$", tmp_lfn) is not None and tmp_lfn in n_events_input and n_events_input[tmp_lfn] is not None:
0260                             self.extra_info["nevents"].setdefault(tmp_zip_file_name, 0)
0261                             self.extra_info["nevents"][tmp_zip_file_name] += n_events_input[tmp_lfn]
0262 
0263         # check files
0264         id_map = {}
0265         # fileList = []
0266         sub_map = {}
0267         dataset_destination_map = {}
0268         dist_datasets = set()
0269         map_for_alt_stage_out = {}
0270         alt_staged_files = set()
0271         zip_files = {}
0272         cont_zip_map = {}
0273         sub_to_ds_map = {}
0274         ds_id_to_ds_map = self.taskBuffer.getOutputDatasetsJEDI(self.job.PandaID)
0275         self.logger.debug(f"dsInJEDI={str(ds_id_to_ds_map)}")
0276 
0277         for file in self.job.Files:
0278             # gc
0279             gc.collect()
0280             if file.type not in {"output", "log"}:
0281                 continue
0282 
0283             # prepare the site spec and scope for the destinationSE site
0284             destination_se_site_spec = self.siteMapper.getSite(file.destinationSE)
0285             _, scope_dst_se_site_spec_output = select_scope(destination_se_site_spec, self.job.prodSourceLabel, self.job.job_label)
0286 
0287             # added to map
0288             sub_to_ds_map[file.destinationDBlock] = ds_id_to_ds_map.get(file.datasetID, file.dataset)
0289 
0290             # add only log file for failed jobs
0291             if self.job_status == "failed" and file.type != "log":
0292                 continue
0293             # add only log file for successful ES jobs
0294             if (
0295                 self.job.jobStatus == "finished"
0296                 and EventServiceUtils.isEventServiceJob(self.job)
0297                 and not EventServiceUtils.isJobCloningJob(self.job)
0298                 and file.type != "log"
0299             ):
0300                 continue
0301             # add only log for jumbo jobs
0302             if EventServiceUtils.isJumboJob(self.job) and file.type != "log":
0303                 continue
0304             # skip no output or failed
0305             if file.status in ["nooutput", "failed"]:
0306                 continue
0307 
0308             # check if zip file
0309             if file.lfn in zip_file_map:
0310                 is_zip_file = True
0311                 if file.lfn not in zip_files and not self.add_to_top_only:
0312                     zip_files[file.lfn] = {}
0313             else:
0314                 is_zip_file = False
0315 
0316             # check if zip content
0317             zip_file_name = None
0318             if not is_zip_file and not self.add_to_top_only:
0319                 for tmp_zip_file_name in zip_file_map:
0320                     tmp_zip_contents = zip_file_map[tmp_zip_file_name]
0321                     for tmp_zip_content in tmp_zip_contents:
0322                         if re.search("^" + tmp_zip_content + "$", file.lfn) is not None:
0323                             zip_file_name = tmp_zip_file_name
0324                             break
0325                     if zip_file_name is not None:
0326                         break
0327                 if zip_file_name is not None:
0328                     if zip_file_name not in zip_files:
0329                         zip_files[zip_file_name] = {}
0330                     cont_zip_map[file.lfn] = zip_file_name
0331 
0332             try:
0333                 # check nevents
0334                 if file.type == "output" and not is_zip_file and not self.add_to_top_only and self.job.prodSourceLabel == "managed":
0335                     if file.lfn not in self.extra_info["nevents"]:
0336                         to_skip = False
0337                         # exclude some formats
0338                         for patt in ["TXT", "NTUP", "HIST"]:
0339                             if file.lfn.startswith(patt):
0340                                 to_skip = True
0341                         if not to_skip:
0342                             err_msg = f"nevents is missing in jobReport for {file.lfn}"
0343                             self.logger.warning(err_msg)
0344                             self.job.ddmErrorCode = ErrorCode.EC_MissingNumEvents
0345                             raise ValueError(err_msg)
0346                     if file.lfn not in self.extra_info["guid"] or file.GUID != self.extra_info["guid"][file.lfn]:
0347                         self.logger.debug(f"extra_info = {str(self.extra_info)}")
0348                         err_msg = f"GUID is inconsistent between jobReport and pilot report for {file.lfn}"
0349                         self.logger.warning(err_msg)
0350                         self.job.ddmErrorCode = ErrorCode.EC_InconsistentGUID
0351                         raise ValueError(err_msg)
0352 
0353                 # fsize
0354                 fsize = None
0355                 if file.fsize not in ["NULL", "", 0]:
0356                     try:
0357                         fsize = int(file.fsize)
0358                     except Exception:
0359                         error_type, error_value, _ = sys.exc_info()
0360                         self.logger.error(f"{self.job_id} : {error_type} {error_value}")
0361                 # use top-level dataset name for alternative stage-out
0362                 if file.lfn not in self.job.altStgOutFileList():
0363                     file_destination_dispatch_block = file.destinationDBlock
0364                 else:
0365                     file_destination_dispatch_block = file.dataset
0366                 # append to map
0367                 if file_destination_dispatch_block not in id_map:
0368                     id_map[file_destination_dispatch_block] = []
0369                 file_attrs = {
0370                     "guid": file.GUID,
0371                     "lfn": file.lfn,
0372                     "size": fsize,
0373                     "checksum": file.checksum,
0374                     "ds": file_destination_dispatch_block,
0375                 }
0376                 # add SURLs if LFC registration is required
0377                 if not self.add_to_top_only:
0378                     file_attrs["surl"] = self.extra_info["surl"][file.lfn]
0379                     if file_attrs["surl"] is None:
0380                         del file_attrs["surl"]
0381 
0382                     # get destination
0383                     if file_destination_dispatch_block not in dataset_destination_map:
0384                         if file.lfn in self.job.altStgOutFileList():
0385                             # alternative stage-out
0386                             tmp_dest_list = (
0387                                 [DataServiceUtils.getDestinationSE(file.destinationDBlockToken)]
0388                                 if DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
0389                                 else [
0390                                     destination_se_site_spec.setokens_output[scope_dst_se_site_spec_output].get(
0391                                         file.destinationDBlockToken, destination_se_site_spec.ddm_output[scope_dst_se_site_spec_output]
0392                                     )
0393                                 ]
0394                             )
0395                         elif file.destinationDBlockToken in ["", None, "NULL"]:
0396                             # use default endpoint
0397                             tmp_dest_list = [src_site_spec.ddm_output[scope_src_site_spec_output]]
0398                         elif (
0399                             DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
0400                             and src_site_spec.ddm_output[scope_src_site_spec_output] == destination_se_site_spec.ddm_output[scope_dst_se_site_spec_output]
0401                         ):
0402                             tmp_dest_list = [DataServiceUtils.getDestinationSE(file.destinationDBlockToken)]
0403                             # RSE is specified
0404                         elif DataServiceUtils.getDistributedDestination(file.destinationDBlockToken):
0405                             tmp_dest_list = [DataServiceUtils.getDistributedDestination(file.destinationDBlockToken)]
0406                             dist_datasets.add(file_destination_dispatch_block)
0407                             # RSE is specified for distributed datasets
0408                         elif src_site_spec.cloud != self.job.cloud and (self.job.prodSourceLabel not in ["user", "panda"]):
0409                             # T1 used as T2
0410                             tmp_dest_list = [src_site_spec.ddm_output[scope_src_site_spec_output]]
0411                         else:
0412                             tmp_dest_list = []
0413                             tmp_se_tokens = src_site_spec.setokens_output[scope_src_site_spec_output]
0414                             for tmp_dest_token in file.destinationDBlockToken.split(","):
0415                                 if tmp_dest_token in tmp_se_tokens:
0416                                     tmp_dest = tmp_se_tokens[tmp_dest_token]
0417                                 else:
0418                                     tmp_dest = src_site_spec.ddm_output[scope_src_site_spec_output]
0419                                 if tmp_dest not in tmp_dest_list:
0420                                     tmp_dest_list.append(tmp_dest)
0421                         # add
0422                         dataset_destination_map[file_destination_dispatch_block] = tmp_dest_list
0423 
0424                 # extra meta data
0425                 if file.lfn in self.extra_info["lbnr"]:
0426                     file_attrs["lumiblocknr"] = self.extra_info["lbnr"][file.lfn]
0427                 if file.lfn in self.extra_info["nevents"]:
0428                     file_attrs["events"] = self.extra_info["nevents"][file.lfn]
0429                 elif self.extra_info["nevents"] != {}:
0430                     file_attrs["events"] = None
0431                 # if file.jediTaskID not in [0,None,'NULL']:
0432                 #    fileAttrs['task_id'] = file.jediTaskID
0433                 file_attrs["panda_id"] = file.PandaID
0434                 if campaign not in ["", None]:
0435                     file_attrs["campaign"] = campaign
0436 
0437                 # For files uploaded to alternative RSEs
0438                 has_normal_url = True
0439                 if file.lfn in self.extra_info["endpoint"] and self.extra_info["endpoint"][file.lfn] != []:
0440                     for pilot_end_point in self.extra_info["endpoint"][file.lfn]:
0441                         # pilot uploaded to endpoint consistently with original job definition
0442                         if pilot_end_point in dataset_destination_map[file_destination_dispatch_block]:
0443                             has_normal_url = True
0444                         else:
0445                             # alternative stage-out
0446                             alt_staged_files.add(file.lfn)
0447                             map_for_alt_stage_out.setdefault(pilot_end_point, {})
0448                             if file_destination_dispatch_block in dist_datasets:
0449                                 # add files to top-level distributed datasets without triggering aggregation
0450                                 tmp_destination_dispatch_block = sub_to_ds_map[file_destination_dispatch_block]
0451                             else:
0452                                 # add files to dispatch blocks for aggregation to the destination
0453                                 tmp_destination_dispatch_block = file_destination_dispatch_block
0454                             map_for_alt_stage_out[pilot_end_point].setdefault(tmp_destination_dispatch_block, [])
0455                             map_for_alt_stage_out[pilot_end_point][tmp_destination_dispatch_block].append(file_attrs)
0456 
0457                 if has_normal_url:
0458                     # add file to be added to dataset
0459                     id_map[file_destination_dispatch_block].append(file_attrs)
0460                     # add file to be added to zip
0461                     if not is_zip_file and zip_file_name:
0462                         if "files" not in zip_files[zip_file_name]:
0463                             zip_files[zip_file_name]["files"] = []
0464                         zip_files[zip_file_name]["files"].append(file_attrs)
0465                     if is_zip_file and not self.add_to_top_only:
0466                         # copy file attribute for zip file registration
0467                         for tmp_file_attr_name, tmp_file_attr_val in file_attrs.items():
0468                             zip_files[file.lfn][tmp_file_attr_name] = tmp_file_attr_val
0469                         zip_files[file.lfn]["scope"] = file.scope
0470                         zip_files[file.lfn]["rse"] = dataset_destination_map[file_destination_dispatch_block]
0471 
0472                 # for subscription
0473                 if re.search("_sub\d+$", file_destination_dispatch_block) and (not self.add_to_top_only) and self.job.destinationSE != "local":
0474                     if not self.siteMapper:
0475                         self.logger.error("SiteMapper is None")
0476                     else:
0477                         # get dataset spec
0478                         if file_destination_dispatch_block not in self.dataset_map:
0479                             tmp_dataset = self.taskBuffer.queryDatasetWithMap({"name": file_destination_dispatch_block})
0480                             self.dataset_map[file_destination_dispatch_block] = tmp_dataset
0481                         # check if valid dataset
0482                         if self.dataset_map[file_destination_dispatch_block] is None:
0483                             self.logger.error(f": cannot find {file_destination_dispatch_block} in DB")
0484                         else:
0485                             if self.dataset_map[file_destination_dispatch_block].status not in ["defined"]:
0486                                 # not a fresh dataset
0487                                 self.logger.debug(
0488                                     f": subscription was already made for {self.dataset_map[file_destination_dispatch_block].status}:{file_destination_dispatch_block}"
0489                                 )
0490                             else:
0491                                 # get DDM IDs
0492                                 tmp_src_ddm = src_site_spec.ddm_output[scope_src_site_spec_output]
0493                                 if self.job.prodSourceLabel == "user" and file.destinationSE not in self.siteMapper.siteSpecList:
0494                                     # DDM ID was set by using --destSE for analysis job to transfer output
0495                                     tmp_dst_ddm = file.destinationSE
0496                                 else:
0497                                     if DataServiceUtils.getDestinationSE(file.destinationDBlockToken):
0498                                         tmp_dst_ddm = DataServiceUtils.getDestinationSE(file.destinationDBlockToken)
0499                                     else:
0500                                         tmp_dst_ddm = destination_se_site_spec.ddm_output[scope_dst_se_site_spec_output]
0501                                 # if src != dest or multi-token
0502                                 if (tmp_src_ddm != tmp_dst_ddm) or (tmp_src_ddm == tmp_dst_ddm and file.destinationDBlockToken.count(",") != 0):
0503                                     opt_sub = {
0504                                         "DATASET_COMPLETE_EVENT": [
0505                                             f"http://{panda_config.pserverhosthttp}:{panda_config.pserverporthttp}/server/panda/datasetCompleted"
0506                                         ]
0507                                     }
0508                                     # append
0509                                     if file_destination_dispatch_block not in sub_map:
0510                                         sub_map[file_destination_dispatch_block] = []
0511                                         # sources
0512                                         opt_source = {}
0513                                         # set sources
0514                                         if file.destinationDBlockToken in [
0515                                             "NULL",
0516                                             "",
0517                                         ]:
0518                                             # use default DDM ID as source
0519                                             opt_source[tmp_src_ddm] = {"policy": 0}
0520                                         else:
0521                                             # convert token to DDM ID
0522                                             ddm_id = tmp_src_ddm
0523                                             # use the first token's location as source for T1D1
0524                                             tmp_src_token = file.destinationDBlockToken.split(",")[0]
0525                                             if tmp_src_token in src_site_spec.setokens_output[scope_src_site_spec_output]:
0526                                                 ddm_id = src_site_spec.setokens_output[scope_src_site_spec_output][tmp_src_token]
0527                                             opt_source[ddm_id] = {"policy": 0}
0528                                         # T1 used as T2
0529                                         if src_site_spec.cloud != self.job.cloud and (self.job.prodSourceLabel not in ["user", "panda"]):
0530                                             if tmp_src_ddm not in opt_source:
0531                                                 opt_source[tmp_src_ddm] = {"policy": 0}
0532                                         # use another location when token is set
0533                                         if file.destinationDBlockToken not in [
0534                                             "NULL",
0535                                             "",
0536                                         ]:
0537                                             tmp_ddm_id_list = []
0538                                             tmp_dst_tokens = file.destinationDBlockToken.split(",")
0539                                             # remove the first one because it is already used as a location
0540                                             if tmp_src_ddm == tmp_dst_ddm:
0541                                                 tmp_dst_tokens = tmp_dst_tokens[1:]
0542                                             # loop over all tokens
0543                                             for idx_token, tmp_dst_token in enumerate(tmp_dst_tokens):
0544                                                 ddm_id = tmp_dst_ddm
0545                                                 if tmp_dst_token in destination_se_site_spec.setokens_output[scope_dst_se_site_spec_output]:
0546                                                     ddm_id = destination_se_site_spec.setokens_output[scope_dst_se_site_spec_output][tmp_dst_token]
0547                                                 # keep the fist destination for multi-hop
0548                                                 if idx_token == 0:
0549                                                     first_destination_ddm = ddm_id
0550                                                 else:
0551                                                     # use the fist destination as source for T1D1
0552                                                     opt_source = {}
0553                                                     opt_source[first_destination_ddm] = {"policy": 0}
0554                                                 # remove looping subscription
0555                                                 if ddm_id == tmp_src_ddm:
0556                                                     continue
0557                                                 # avoid duplication
0558                                                 if ddm_id not in tmp_ddm_id_list:
0559                                                     sub_map[file_destination_dispatch_block].append((ddm_id, opt_sub, opt_source))
0560                                         else:
0561                                             # use default DDM
0562                                             for ddm_id in tmp_dst_ddm.split(","):
0563                                                 sub_map[file_destination_dispatch_block].append((ddm_id, opt_sub, opt_source))
0564             except Exception as e:
0565                 err_str = str(e)
0566                 self.logger.error(err_str + " " + traceback.format_exc())
0567                 self.result.set_fatal()
0568                 self.job.ddmErrorDiag = "failed before adding files : " + err_str
0569                 return 1
0570 
0571         # release some memory
0572         del ds_id_to_ds_map
0573         gc.collect()
0574 
0575         # zipping input files
0576         if len(zip_file_map) > 0 and not self.add_to_top_only:
0577             for file_spec in self.job.Files:
0578                 if file_spec.type != "input":
0579                     continue
0580 
0581                 zip_file_name = None
0582                 for tmp_zip_file_name in zip_file_map:
0583                     tmp_zip_contents = zip_file_map[tmp_zip_file_name]
0584                     for tmp_zip_content in tmp_zip_contents:
0585                         if re.search("^" + tmp_zip_content + "$", file_spec.lfn) is not None:
0586                             zip_file_name = tmp_zip_file_name
0587                             break
0588                     if zip_file_name:
0589                         break
0590                 if zip_file_name:
0591                     if zip_file_name not in zip_files:
0592                         continue
0593                     if "files" not in zip_files[zip_file_name]:
0594                         zip_files[zip_file_name]["files"] = []
0595                     file_attrs = {
0596                         "guid": file_spec.GUID,
0597                         "lfn": file_spec.lfn,
0598                         "size": file_spec.fsize,
0599                         "checksum": file_spec.checksum,
0600                         "ds": file_spec.dataset,
0601                     }
0602                     zip_files[zip_file_name]["files"].append(file_attrs)
0603 
0604         # cleanup submap
0605         sub_map = {k: v for k, v in sub_map.items() if v}
0606 
0607         # add data to original dataset
0608         for destination_dispatch_block in list(id_map):
0609             orig_dispatch_block = None
0610             match = re.search("^(.+)_sub\d+$", destination_dispatch_block)
0611             if match:
0612                 # add files to top-level datasets
0613                 orig_dispatch_block = sub_to_ds_map[destination_dispatch_block]
0614                 if (not self.go_to_transferring) or (not self.add_to_top_only and destination_dispatch_block in dist_datasets):
0615                     id_map[orig_dispatch_block] = id_map[destination_dispatch_block]
0616             # add files to top-level datasets only
0617             if self.add_to_top_only or self.go_to_merging or (destination_dispatch_block in dist_datasets and orig_dispatch_block):
0618                 del id_map[destination_dispatch_block]
0619             # skip sub unless getting transferred
0620             if orig_dispatch_block:
0621                 if not self.go_to_transferring and not self.log_transferring and destination_dispatch_block in id_map:
0622                     del id_map[destination_dispatch_block]
0623 
0624         # print idMap
0625         self.logger.debug(f"dataset to files map = {id_map}")
0626         self.logger.debug(f"transfer rules = {sub_map}")
0627         self.logger.debug(f"dataset to destination map = {dataset_destination_map}")
0628         self.logger.debug(f"extra info = {str(self.extra_info)}")
0629         self.logger.debug(f"""alternatively staged files = {",".join(list(alt_staged_files)) if alt_staged_files else "NA"}""")
0630 
0631         # check consistency of destinationDBlock
0632         has_sub = False
0633         for destination_dispatch_block in id_map:
0634             match = re.search("^(.+)_sub\d+$", destination_dispatch_block)
0635             if match is not None:
0636                 has_sub = True
0637                 break
0638         if id_map and self.go_to_transferring and not has_sub:
0639             err_str = "no sub datasets for transferring. destinationDBlock may be wrong"
0640             self.logger.error(err_str)
0641             self.result.set_fatal()
0642             self.job.ddmErrorDiag = "failed before adding files : " + err_str
0643             return 1
0644 
0645         # add data
0646         self.logger.debug("addFiles start")
0647         # count the number of files
0648         reg_file_list = {tmp_reg_item["lfn"] for tmp_reg_list in id_map.values() for tmp_reg_item in tmp_reg_list}
0649         reg_num_files = len(reg_file_list)
0650 
0651         # decompose idMap
0652         if self.add_to_top_only:
0653             dest_id_map = {None: id_map}
0654         else:
0655             dest_id_map = self.decompose_id_map(id_map, dataset_destination_map, map_for_alt_stage_out, sub_to_ds_map, alt_staged_files)
0656 
0657         # add files
0658         result = self.register_files(reg_num_files, zip_files, dest_id_map, cont_zip_map)
0659         if result == 1:
0660             return 1
0661 
0662         # release some memory
0663         del dest_id_map
0664         del dataset_destination_map
0665         del map_for_alt_stage_out
0666         del zip_files
0667         del cont_zip_map
0668         gc.collect()
0669 
0670         if self.job.processingType == "urgent" or self.job.currentPriority > 1000:
0671             sub_activity = "Express"
0672         else:
0673             sub_activity = "Production Output"
0674 
0675         # register dataset subscription
0676         self.process_subscriptions(sub_map, sub_to_ds_map, dist_datasets, sub_activity)
0677 
0678         # collect list of merging files
0679         if self.go_to_merging and self.job_status not in ["failed", "cancelled", "closed"]:
0680             for tmp_file_list in id_map.values():
0681                 for tmp_file in tmp_file_list:
0682                     if tmp_file["lfn"] not in self.result.merging_files:
0683                         self.result.merging_files.append(tmp_file["lfn"])
0684 
0685         # register ES files
0686         if (EventServiceUtils.isEventServiceJob(self.job) or EventServiceUtils.isJumboJob(self.job)) and not EventServiceUtils.isJobCloningJob(self.job):
0687             if self.job.registerEsFiles():
0688                 try:
0689                     self.register_event_service_files()
0690                 except Exception:
0691                     err_type, err_value = sys.exc_info()[:2]
0692                     self.logger.error(f"failed to register ES files with {err_type}:{err_value}")
0693                     self.result.set_temporary()
0694                     return 1
0695 
0696         # properly finished
0697         self.logger.debug("addFiles end")
0698         return 0
0699 
0700     def register_files(self, reg_num_files: int, zip_files: list, dest_id_map: dict, cont_zip_map: dict) -> int | None:
0701         """
0702         Register files with Rucio.
0703 
0704         :param reg_num_files: Number of files to register.
0705         :param zip_files: List of zip files to register.
0706         :param dest_id_map: Destination ID map.
0707         :param cont_zip_map: Container zip map.
0708         :return: 1 if registration fails, None otherwise.
0709         """
0710         max_attempt = 3
0711         for attempt_number in range(max_attempt):
0712             is_fatal = False
0713             is_failed = False
0714             reg_start = naive_utcnow()
0715             try:
0716                 if self.add_to_top_only:
0717                     reg_msg_str = f"File registration for {reg_num_files} files "
0718                 else:
0719                     reg_msg_str = f"File registration with rucio for {reg_num_files} files "
0720                 if len(zip_files) > 0:
0721                     self.logger.debug(f"registerZipFiles {str(zip_files)}")
0722                     rucioAPI.register_zip_files(zip_files)
0723                 self.logger.debug(f"registerFilesInDatasets {str(dest_id_map)} zip={str(cont_zip_map)}")
0724                 out = rucioAPI.register_files_in_dataset(dest_id_map, cont_zip_map)
0725             except (
0726                 DataIdentifierNotFound,
0727                 FileConsistencyMismatch,
0728                 UnsupportedOperation,
0729                 InvalidPath,
0730                 InvalidObject,
0731                 RSENotFound,
0732                 RSEProtocolNotSupported,
0733                 InvalidRSEExpression,
0734                 KeyError,
0735             ):
0736                 # fatal errors
0737                 err_type, err_value = sys.exc_info()[:2]
0738                 out = f"{err_type} : {err_value}"
0739                 out += traceback.format_exc()
0740                 is_fatal = True
0741                 is_failed = True
0742             except Exception:
0743                 # unknown errors
0744                 err_type, err_value = sys.exc_info()[:2]
0745                 out = f"{err_type} : {err_value}"
0746                 out += traceback.format_exc()
0747                 is_fatal = (
0748                     "value too large for column" in out
0749                     or "unique constraint (ATLAS_RUCIO.DIDS_GUID_IDX) violate" in out
0750                     or "unique constraint (ATLAS_RUCIO.DIDS_PK) violated" in out
0751                     or "unique constraint (ATLAS_RUCIO.ARCH_CONTENTS_PK) violated" in out
0752                 )
0753                 is_failed = True
0754             reg_time = naive_utcnow() - reg_start
0755             self.logger.debug(f"{reg_msg_str} took {reg_time.seconds}.{reg_time.microseconds // 1000:03d} sec")
0756             # failed
0757             if is_failed or is_fatal:
0758                 self.logger.error(f"{out}")
0759                 if (attempt_number + 1) == max_attempt or is_fatal:
0760                     self.job.ddmErrorCode = ErrorCode.EC_Adder
0761                     # extract important error string
0762                     extracted_err_str = DataServiceUtils.extractImportantError(out)
0763                     err_msg = "Could not add files to DDM: "
0764                     if extracted_err_str == "":
0765                         self.job.ddmErrorDiag = err_msg + out.split("\n")[-1]
0766                     else:
0767                         self.job.ddmErrorDiag = err_msg + extracted_err_str
0768                     if is_fatal:
0769                         self.result.set_fatal()
0770                     else:
0771                         self.result.set_temporary()
0772                     return 1
0773                 self.logger.error(f"Try:{attempt_number}")
0774                 # sleep
0775                 time.sleep(10)
0776             else:
0777                 self.logger.debug(f"{str(out)}")
0778                 break
0779 
0780     def process_subscriptions(self, sub_map: Dict[str, str], sub_to_ds_map: Dict[str, List[str]], dist_datasets: List[str], sub_activity: str):
0781         """
0782         Process the subscriptions for the job.
0783 
0784         This method handles the processing of subscriptions, including keeping subscriptions,
0785         collecting transferring jobs, and sending requests to DaTRI.
0786 
0787         :param sub_map: A dictionary mapping subscription names to their values.
0788         :param sub_to_ds_map: A dictionary mapping subscriptions to datasets.
0789         :param dist_datasets: A list of distributed datasets.
0790         :param sub_activity: The subscription activity type.
0791         """
0792         if self.job.prodSourceLabel not in ["user"]:
0793             for tmp_name, tmp_val in sub_map.items():
0794                 for ddm_id, opt_sub, opt_source in tmp_val:
0795                     if not self.go_to_merging:
0796                         # make subscription for prod jobs
0797                         rep_life_time = 14
0798                         self.logger.debug(
0799                             f"registerDatasetSubscription {tmp_name} {ddm_id} {{'activity': {sub_activity}, 'replica_lifetime': {rep_life_time}}}"
0800                         )
0801                         for _ in range(3):
0802                             is_failed = False
0803                             try:
0804                                 status = rucioAPI.register_dataset_subscription(
0805                                     tmp_name,
0806                                     [ddm_id],
0807                                     owner="panda",
0808                                     activity=sub_activity,
0809                                     lifetime=rep_life_time,
0810                                 )
0811                                 out = "OK"
0812                                 break
0813                             except InvalidRSEExpression:
0814                                 status = False
0815                                 err_type, err_value = sys.exc_info()[:2]
0816                                 out = f"{err_type} {err_value}"
0817                                 is_failed = True
0818                                 self.job.ddmErrorCode = ErrorCode.EC_Subscription
0819                                 break
0820                             except Exception:
0821                                 status = False
0822                                 err_type, err_value = sys.exc_info()[:2]
0823                                 out = f"{err_type} {err_value}"
0824                                 is_failed = True
0825                                 # retry for temporary errors
0826                                 time.sleep(10)
0827                         if is_failed:
0828                             self.logger.error(f"{out}")
0829                             if self.job.ddmErrorCode == ErrorCode.EC_Subscription:
0830                                 # fatal error
0831                                 self.job.ddmErrorDiag = f"subscription failure with {out}"
0832                                 self.result.set_fatal()
0833                             else:
0834                                 # temporary errors
0835                                 self.job.ddmErrorCode = ErrorCode.EC_Adder
0836                                 self.job.ddmErrorDiag = f"could not register subscription : {tmp_name}"
0837                                 self.result.set_temporary()
0838                             return 1
0839                         self.logger.debug(f"{str(out)}")
0840                     else:
0841                         # register location
0842                         tmp_dataset_name_location = sub_to_ds_map[tmp_name]
0843                         rep_life_time = 14
0844                         for tmp_location_name in opt_source:
0845                             self.logger.debug(f"registerDatasetLocation {tmp_dataset_name_location} {tmp_location_name} {{'lifetime': '14 days'}}")
0846 
0847                             for _ in range(3):
0848                                 is_failed = False
0849                                 try:
0850                                     rucioAPI.register_dataset_location(
0851                                         tmp_dataset_name_location,
0852                                         [tmp_location_name],
0853                                         owner="panda",
0854                                         activity=sub_activity,
0855                                         lifetime=rep_life_time,
0856                                     )
0857                                     out = "OK"
0858                                     break
0859                                 except Exception:
0860                                     status = False
0861                                     err_type, err_value = sys.exc_info()[:2]
0862                                     out = f"{err_type} {err_value}"
0863                                     is_failed = True
0864                                     # retry for temporary errors
0865                                     time.sleep(10)
0866                             if is_failed:
0867                                 self.logger.error(f"{out}")
0868                                 if self.job.ddmErrorCode == ErrorCode.EC_Location:
0869                                     # fatal error
0870                                     self.job.ddmErrorDiag = f"location registration failure with {out}"
0871                                     self.result.set_fatal()
0872                                 else:
0873                                     # temporary errors
0874                                     self.job.ddmErrorCode = ErrorCode.EC_Adder
0875                                     self.job.ddmErrorDiag = f"could not register location : {tmp_dataset_name_location}"
0876                                     self.result.set_temporary()
0877                                 return 1
0878                             self.logger.debug(f"{str(out)}")
0879 
0880                     # set dataset status
0881                     self.dataset_map[tmp_name].status = "running"
0882 
0883             # keep subscriptions
0884             self.subscription_map = sub_map
0885 
0886             # collect list of transferring jobs
0887             for tmp_file in self.job.Files:
0888                 if tmp_file.type in ["log", "output"]:
0889                     if self.go_to_transferring or (self.log_transferring and tmp_file.type == "log"):
0890                         # don't go to transferring for successful ES jobs
0891                         if (
0892                             self.job.jobStatus == "finished"
0893                             and (EventServiceUtils.isEventServiceJob(self.job) and EventServiceUtils.isJumboJob(self.job))
0894                             and not EventServiceUtils.isJobCloningJob(self.job)
0895                         ):
0896                             continue
0897                         # skip distributed datasets
0898                         if tmp_file.destinationDBlock in dist_datasets:
0899                             continue
0900                         # skip no output
0901                         if tmp_file.status == "nooutput":
0902                             continue
0903                         # skip alternative stage-out
0904                         if tmp_file.lfn in self.job.altStgOutFileList():
0905                             continue
0906                         self.result.transferring_files.append(tmp_file.lfn)
0907 
0908         elif "--mergeOutput" not in self.job.jobParameters:
0909             # send request to DaTRI unless files will be merged
0910             tmp_top_datasets = {}
0911             # collect top-level datasets
0912             for tmp_name, tmp_val in sub_map.items():
0913                 for ddm_id, opt_sub, opt_source in tmp_val:
0914                     tmp_top_name = sub_to_ds_map[tmp_name]
0915                     # append
0916                     if tmp_top_name not in tmp_top_datasets:
0917                         tmp_top_datasets[tmp_top_name] = []
0918                     if ddm_id not in tmp_top_datasets[tmp_top_name]:
0919                         tmp_top_datasets[tmp_top_name].append(ddm_id)
0920             # remove redundant CN from DN
0921             tmp_dn = self.job.prodUserID
0922             # send request
0923             if tmp_top_datasets and self.job_status == "finished":
0924                 try:
0925                     status, user_info = rucioAPI.finger(tmp_dn)
0926                     if not status:
0927                         raise RuntimeError(f"user info not found for {tmp_dn} with {user_info}")
0928                     user_endpoints = []
0929                     # loop over all output datasets
0930                     for tmp_dataset_name, ddm_id_list in tmp_top_datasets.items():
0931                         for tmp_ddm_id in ddm_id_list:
0932                             if tmp_ddm_id == "NULL":
0933                                 continue
0934                             if tmp_ddm_id not in user_endpoints:
0935                                 user_endpoints.append(tmp_ddm_id)
0936                             # use group account for group.*
0937                             if tmp_dataset_name.startswith("group") and self.job.workingGroup not in ["", "NULL", None]:
0938                                 tmp_dn = self.job.workingGroup
0939                             else:
0940                                 tmp_dn = user_info["nickname"]
0941                             tmp_msg = f"registerDatasetLocation for Rucio ds={tmp_dataset_name} site={tmp_ddm_id} id={tmp_dn}"
0942                             self.logger.debug(tmp_msg)
0943                             rucioAPI.register_dataset_location(
0944                                 tmp_dataset_name,
0945                                 [tmp_ddm_id],
0946                                 owner=tmp_dn,
0947                                 activity="Analysis Output",
0948                             )
0949                     # set dataset status
0950                     for tmp_name in sub_map:
0951                         self.dataset_map[tmp_name].status = "running"
0952                 except (InsufficientAccountLimit, InvalidRSEExpression) as err_type:
0953                     tmp_msg = f"Rucio rejected to transfer files to {','.join(user_endpoints)} since {err_type}"
0954                     self.logger.error(tmp_msg)
0955                     self.job.ddmErrorCode = ErrorCode.EC_Adder
0956                     self.job.ddmErrorDiag = f"Rucio failed with {err_type}"
0957                     # set dataset status
0958                     for tmp_name in sub_map:
0959                         self.dataset_map[tmp_name].status = "running"
0960                     # send warning
0961                     tmp_st = self.taskBuffer.update_problematic_resource_info(self.job.prodUserName, self.job.jediTaskID, user_endpoints[0], "dest")
0962                     if not tmp_st:
0963                         self.logger.debug("skip to send warning since already done")
0964                     else:
0965                         to_adder = self.taskBuffer.getEmailAddr(self.job.prodUserName)
0966                         if to_adder is None or to_adder.startswith("notsend"):
0967                             self.logger.debug("skip to send warning since suppressed")
0968                         else:
0969                             tmp_sm = self.send_email(to_adder, tmp_msg, self.job.jediTaskID)
0970                             self.logger.debug(f"sent warning with {tmp_sm}")
0971                 except Exception:
0972                     err_type, err_value = sys.exc_info()[:2]
0973                     tmp_msg = f"registerDatasetLocation failed with {err_type} {err_value}"
0974                     self.logger.error(tmp_msg)
0975                     self.job.ddmErrorCode = ErrorCode.EC_Adder
0976                     self.job.ddmErrorDiag = f"Rucio failed with {err_type} {err_value}"
0977 
0978     # decompose idMap
0979     def decompose_id_map(self, id_map, dataset_destination_map, map_for_alt_stage_out, sub_to_dataset_map, alt_staged_files):
0980         """
0981         Decompose the idMap into a structure suitable for file registration.
0982 
0983         Args:
0984             id_map (dict): A dictionary mapping dataset names to file attributes.
0985             dataset_destination_map (dict): A dictionary mapping dataset names to destination sites.
0986             map_for_alt_stage_out (dict): A dictionary mapping alternative destination sites to file attributes.
0987             sub_to_dataset_map (dict): A dictionary mapping sub-dataset names to top-level dataset names.
0988             alt_staged_files (set): A set of files uploaded with alternative stage-out.
0989 
0990         Returns:
0991             dict: A decomposed idMap suitable for file registration.
0992         """
0993         # add item for top datasets
0994         for tmp_dataset in list(dataset_destination_map):
0995             tmp_top_dataset = sub_to_dataset_map[tmp_dataset]
0996             if tmp_top_dataset != tmp_dataset:
0997                 dataset_destination_map[tmp_top_dataset] = dataset_destination_map[tmp_dataset]
0998 
0999         destination_id_map = {}
1000         for tmp_dataset in id_map:
1001             # exclude files uploaded with alternative stage-out
1002             tmp_files = [f for f in id_map[tmp_dataset] if f["lfn"] not in alt_staged_files]
1003             if not tmp_files:
1004                 continue
1005             for tmp_dest in dataset_destination_map[tmp_dataset]:
1006                 if tmp_dest not in destination_id_map:
1007                     destination_id_map[tmp_dest] = {}
1008                 destination_id_map[tmp_dest][tmp_dataset] = tmp_files
1009 
1010         # add alternative stage-out info
1011         for tmp_dest in map_for_alt_stage_out:
1012             tmp_id_map = map_for_alt_stage_out[tmp_dest]
1013             for tmp_dataset in tmp_id_map:
1014                 tmp_files = tmp_id_map[tmp_dataset]
1015                 if tmp_dest not in destination_id_map:
1016                     destination_id_map[tmp_dest] = {}
1017                 destination_id_map[tmp_dest][tmp_dataset] = tmp_files
1018         return destination_id_map
1019 
1020     # send email notification
1021     def send_email(self, to_adder, message, jedi_task_id):
1022         """
1023         Send an email notification.
1024 
1025         Args:
1026             to_adder (str): The email address to send the notification to.
1027             message (str): The message body of the email.
1028             jedi_task_id (int): The JEDI task ID associated with the email.
1029 
1030         Returns:
1031             bool: True if the email was sent successfully, False otherwise.
1032         """
1033         # subject
1034         mail_subject = f"PANDA WARNING for TaskID:{jedi_task_id} with --destSE"
1035         # message
1036         mail_body = f"Hello,\n\nTaskID:{jedi_task_id} cannot process the --destSE request\n\n"
1037         mail_body += f"Reason : {message}\n"
1038         # send
1039         ret_val = MailUtils().send(to_adder, mail_subject, mail_body)
1040         # return
1041         return ret_val
1042 
1043     # register ES files
1044     def register_event_service_files(self) -> None:
1045         """
1046         Register Event Service (ES) files with Rucio.
1047 
1048         Raises:
1049             Exception: If there is an error during the registration process.
1050         """
1051         self.logger.debug("registering ES files")
1052         try:
1053             # get ES dataset name
1054             event_service_dataset = EventServiceUtils.getEsDatasetName(self.job.jediTaskID)
1055             # collect files
1056             id_map = {}
1057             file_set = set()
1058             for file_spec in self.job.Files:
1059                 if file_spec.type != "zipoutput":
1060                     continue
1061                 if file_spec.lfn in file_set:
1062                     continue
1063                 file_set.add(file_spec.lfn)
1064                 # make file data
1065                 file_data = {
1066                     "scope": EventServiceUtils.esScopeDDM,
1067                     "name": file_spec.lfn,
1068                     "bytes": file_spec.fsize,
1069                     "panda_id": file_spec.PandaID,
1070                     "task_id": file_spec.jediTaskID,
1071                 }
1072                 if file_spec.GUID not in [None, "NULL", ""]:
1073                     file_data["guid"] = file_spec.GUID
1074                 if file_spec.dispatchDBlockToken not in [None, "NULL", ""]:
1075                     try:
1076                         file_data["events"] = int(file_spec.dispatchDBlockToken)
1077                     except Exception:
1078                         pass
1079                 if file_spec.checksum not in [None, "NULL", ""]:
1080                     file_data["checksum"] = file_spec.checksum
1081                 # get endpoint ID
1082                 endpoint_id = int(file_spec.destinationSE.split("/")[0])
1083                 # convert to DDM endpoint
1084                 rse = self.taskBuffer.convertObjIDtoEndPoint(panda_config.endpoint_mapfile, endpoint_id)
1085                 if rse is not None and rse["is_deterministic"]:
1086                     endpoint_name = rse["name"]
1087                     if endpoint_name not in id_map:
1088                         id_map[endpoint_name] = {}
1089                     if event_service_dataset not in id_map[endpoint_name]:
1090                         id_map[endpoint_name][event_service_dataset] = []
1091                     id_map[endpoint_name][event_service_dataset].append(file_data)
1092 
1093             # add files to dataset
1094             if id_map:
1095                 self.logger.debug(f"adding ES files {str(id_map)}")
1096                 try:
1097                     rucioAPI.register_files_in_dataset(id_map)
1098                 except DataIdentifierNotFound:
1099                     self.logger.debug("ignored DataIdentifierNotFound")
1100         except Exception:
1101             err_type, err_value = sys.exc_info()[:2]
1102             err_str = f" : {err_type} {err_value}"
1103             err_str += traceback.format_exc()
1104             self.logger.error(err_str)
1105             raise
1106         self.logger.debug("done")