Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:01

0001 """
0002 Simple plugin of Adder for VOs with Rucio
0003 
0004 """
0005 
0006 import datetime
0007 import time
0008 import traceback
0009 import uuid
0010 
0011 from pandacommon.pandautils.PandaUtils import naive_utcnow
0012 from rucio.common.exception import (
0013     DataIdentifierNotFound,
0014     FileConsistencyMismatch,
0015     InvalidObject,
0016     InvalidPath,
0017     InvalidRSEExpression,
0018     RSEFileNameNotSupported,
0019     RSENotFound,
0020     RSEProtocolNotSupported,
0021     UnsupportedOperation,
0022 )
0023 
0024 from pandaserver.dataservice import DataServiceUtils, ErrorCode
0025 from pandaserver.dataservice.ddm import rucioAPI
0026 
0027 from .adder_plugin_base import AdderPluginBase
0028 
0029 
class AdderSimplePlugin(AdderPluginBase):
    """
    Simple plugin of Adder for VOs with Rucio.

    Collects the job's non-local output and log files, groups them by
    destination dataset block, and registers them through ``rucioAPI``.
    The outcome is reported via ``self.result`` and, on failure, via the
    ``ddmErrorCode`` / ``ddmErrorDiag`` attributes of the job.

    NOTE(review): ``self.job``, ``self.logger``, ``self.result``,
    ``self.siteMapper`` and ``self.extra_info`` are not set in this class;
    presumably they are provided by ``AdderPluginBase`` - confirm there.
    """

    # total number of registration attempts before giving up
    _MAX_ATTEMPTS = 3
    # pause between two registration attempts, in seconds
    _RETRY_INTERVAL_SEC = 10
    # exception types for which a retry is not attempted
    _FATAL_EXCEPTIONS = (
        DataIdentifierNotFound,
        FileConsistencyMismatch,
        UnsupportedOperation,
        InvalidPath,
        InvalidObject,
        RSENotFound,
        RSEProtocolNotSupported,
        InvalidRSEExpression,
        RSEFileNameNotSupported,
        KeyError,
    )
    # substrings of an otherwise unknown error that make it fatal
    _FATAL_ERROR_MARKERS = (
        "value too large for column",
        "unique constraint (ATLAS_RUCIO.DIDS_GUID_IDX) violate",
        "unique constraint (ATLAS_RUCIO.DIDS_PK) violated",
        "unique constraint (ATLAS_RUCIO.ARCH_CONTENTS_PK) violated",
    )

    # constructor
    def __init__(self, job, **params) -> None:
        """
        Initialize the AdderSimplePlugin.

        :param job: The job object.
        :param params: Additional parameters, forwarded to the base class as a single dict.
        """
        super().__init__(job, params)

    # main
    def execute(self) -> None:
        """
        Execute the simple adder plugin.

        Sets ``self.result`` to succeeded when there is nothing to register or
        registration worked, to fatal/temporary when registration failed, and
        to temporary when any unexpected exception escapes.

        :return: None
        """
        try:
            file_map = self._collect_files_to_register()
            if file_map and not self._register_files(file_map):
                # the failure has already been recorded on the job and the result
                return
            # done
            self.result.set_succeeded()
            self.logger.debug("end plugin")
        except Exception as e:
            err_str = f"failed to execute with {str(e)}\n"
            err_str += traceback.format_exc()
            self.logger.error(err_str)
            self.result.set_temporary()
        # return
        return

    def _collect_files_to_register(self) -> dict:
        """
        Build the map of files to register, keyed by destination dataset block.

        Input files and files whose destination is ``"local"`` are skipped.
        A file without GUID gets a freshly generated UUID4, which is written
        back to the file spec.

        :return: dict mapping ``destinationDBlock`` to a list of per-file attribute dicts.
        """
        file_map = {}
        for file_spec in self.job.Files:
            # ignore inputs
            if file_spec.type not in ["output", "log"]:
                continue
            # ignore local
            if file_spec.destinationSE == "local":
                continue
            # the size is optional: anything not convertible to int becomes None
            try:
                file_size = int(file_spec.fsize)
            except Exception:
                file_size = None
            # set GUID if empty
            if not file_spec.GUID:
                file_spec.GUID = str(uuid.uuid4())
            file_attrs = {
                "guid": file_spec.GUID,
                "lfn": file_spec.lfn,
                "size": file_size,
                "checksum": file_spec.checksum,
                "ds": file_spec.destinationDBlock,
            }
            # optional per-LFN extras
            if self.extra_info:
                if "surl" in self.extra_info and file_spec.lfn in self.extra_info["surl"]:
                    file_attrs["surl"] = self.extra_info["surl"][file_spec.lfn]
                if "nevents" in self.extra_info and file_spec.lfn in self.extra_info["nevents"]:
                    file_attrs["events"] = self.extra_info["nevents"][file_spec.lfn]
            file_map.setdefault(file_spec.destinationDBlock, []).append(file_attrs)
        return file_map

    def _register_files(self, file_map: dict) -> bool:
        """
        Register the collected files, retrying on non-fatal failures.

        :param file_map: dict as returned by ``_collect_files_to_register``.
        :return: True when registration succeeded, False when it failed and the
                 failure was recorded on the job and the result.
        """
        destination_rse = self.siteMapper.getSite(self.job.computingSite).ddm_output["default"]
        destination_id_map = {destination_rse: file_map}
        for attempt_number in range(self._MAX_ATTEMPTS):
            is_fatal = False
            is_failed = False
            registration_start = naive_utcnow()
            try:
                self.logger.debug(f"registerFilesInDatasets {str(destination_id_map)}")
                out = rucioAPI.register_files_in_dataset(destination_id_map, {})
            except self._FATAL_EXCEPTIONS as e:
                # fatal errors
                out = f"failed with {str(e)}\n {traceback.format_exc()}"
                is_fatal = True
                is_failed = True
            except Exception as e:
                # unknown errors: fatal only when the message carries a known marker
                is_failed = True
                out = f"failed with unknown error: {str(e)}\n {traceback.format_exc()}"
                is_fatal = any(marker in out for marker in self._FATAL_ERROR_MARKERS)
            registration_time = naive_utcnow() - registration_start
            self.logger.debug(f"took {registration_time.seconds}.{registration_time.microseconds // 1000:03d} sec")

            if not is_failed:
                self.logger.debug(f"{str(out)}")
                return True

            # failed
            self.logger.error(f"{out}")
            if is_fatal or (attempt_number + 1) == self._MAX_ATTEMPTS:
                self._record_failure(out, is_fatal)
                return False
            self.logger.error(f"Try:{attempt_number}")
            # sleep
            time.sleep(self._RETRY_INTERVAL_SEC)
        # only reachable if the attempt budget is zero, i.e. nothing was tried
        return True

    def _record_failure(self, out: str, is_fatal: bool) -> None:
        """
        Store the registration error on the job and flag the result.

        :param out: Full error text (message followed by the traceback).
        :param is_fatal: True to flag the result as fatal, False for temporary.
        """
        self.job.ddmErrorCode = ErrorCode.EC_Adder
        # extract important error string
        extracted_error = DataServiceUtils.extractImportantError(out)
        if extracted_error == "":
            # The traceback text ends with a newline, so the last element of a
            # plain split would be empty; strip first to get the final line,
            # which carries the exception summary.
            extracted_error = out.rstrip().split("\n")[-1]
        self.job.ddmErrorDiag = "Could not add files to DDM: " + extracted_error
        if is_fatal:
            self.result.set_fatal()
        else:
            self.result.set_temporary()