File indexing completed on 2026-04-10 08:39:01
0001 """
0002 Simple plugin of Adder for VOs with Rucio
0003
0004 """
0005
0006 import datetime
0007 import time
0008 import traceback
0009 import uuid
0010
0011 from pandacommon.pandautils.PandaUtils import naive_utcnow
0012 from rucio.common.exception import (
0013 DataIdentifierNotFound,
0014 FileConsistencyMismatch,
0015 InvalidObject,
0016 InvalidPath,
0017 InvalidRSEExpression,
0018 RSEFileNameNotSupported,
0019 RSENotFound,
0020 RSEProtocolNotSupported,
0021 UnsupportedOperation,
0022 )
0023
0024 from pandaserver.dataservice import DataServiceUtils, ErrorCode
0025 from pandaserver.dataservice.ddm import rucioAPI
0026
0027 from .adder_plugin_base import AdderPluginBase
0028
0029
class AdderSimplePlugin(AdderPluginBase):
    """
    Simple plugin of Adder for VOs with Rucio.

    Collects the output and log files of the job, groups them by destination
    dataset and registers them through ``rucioAPI.register_files_in_dataset``.
    Registration is retried a few times on non-fatal errors; the outcome is
    reported through ``self.result`` and, on failure, through the job's
    ``ddmErrorCode`` / ``ddmErrorDiag`` attributes.
    """

    # Substrings of an unexpected registration error that make retrying pointless.
    _FATAL_ERROR_MARKERS = (
        "value too large for column",
        "unique constraint (ATLAS_RUCIO.DIDS_GUID_IDX) violate",
        "unique constraint (ATLAS_RUCIO.DIDS_PK) violated",
        "unique constraint (ATLAS_RUCIO.ARCH_CONTENTS_PK) violated",
    )

    def __init__(self, job, **params) -> None:
        """
        Initialize the AdderSimplePlugin.

        :param job: The job object.
        :param params: Additional parameters, forwarded to the base class as a single dict.
        """
        AdderPluginBase.__init__(self, job, params)

    def execute(self) -> None:
        """
        Execute the simple adder plugin.

        Marks the result as succeeded when there is nothing to register or the
        registration went through, as fatal/temporary when registration failed,
        and as temporary when any unexpected exception escapes.

        :return: None
        """
        try:
            file_map = self._build_file_map()
            # a failed registration has already set the result and the job error fields
            if file_map and not self._register_files(file_map):
                return
            self.result.set_succeeded()
            self.logger.debug("end plugin")
        except Exception as e:
            err_str = f"failed to execute with {str(e)}\n"
            err_str += traceback.format_exc()
            self.logger.error(err_str)
            self.result.set_temporary()

        return

    def _build_file_map(self) -> dict:
        """
        Group the attributes of the job's output and log files by destination dataset.

        Files whose ``destinationSE`` is ``"local"`` are skipped. A file without
        a GUID gets a freshly generated one, stored back on the file spec.

        :return: Mapping of ``destinationDBlock`` to a list of file attribute dicts.
        """
        file_map = {}
        for file_spec in self.job.Files:
            if file_spec.type not in ("output", "log") or file_spec.destinationSE == "local":
                continue

            # best effort: a size that cannot be converted is registered as None
            try:
                file_size = int(file_spec.fsize)
            except Exception:
                file_size = None

            if not file_spec.GUID:
                file_spec.GUID = str(uuid.uuid4())

            file_attrs = {
                "guid": file_spec.GUID,
                "lfn": file_spec.lfn,
                "size": file_size,
                "checksum": file_spec.checksum,
                "ds": file_spec.destinationDBlock,
            }
            if self.extra_info:
                if "surl" in self.extra_info and file_spec.lfn in self.extra_info["surl"]:
                    file_attrs["surl"] = self.extra_info["surl"][file_spec.lfn]
                if "nevents" in self.extra_info and file_spec.lfn in self.extra_info["nevents"]:
                    file_attrs["events"] = self.extra_info["nevents"][file_spec.lfn]

            file_map.setdefault(file_spec.destinationDBlock, []).append(file_attrs)
        return file_map

    def _register_files(self, file_map: dict) -> bool:
        """
        Register the files at the default output endpoint of the computing site.

        Up to three attempts are made, ten seconds apart. On a fatal error, or
        once the attempts are exhausted, the job's DDM error fields and
        ``self.result`` are set before returning.

        :param file_map: Mapping of dataset name to file attribute dicts.
        :return: True when registration succeeded, False when it failed.
        """
        destination_rse = self.siteMapper.getSite(self.job.computingSite).ddm_output["default"]
        destination_id_map = {destination_rse: file_map}
        max_attempt = 3
        for attempt_number in range(max_attempt):
            is_fatal = False
            is_failed = False
            registration_start = naive_utcnow()
            try:
                self.logger.debug(f"registerFilesInDatasets {str(destination_id_map)}")
                out = rucioAPI.register_files_in_dataset(destination_id_map, {})
            except (
                DataIdentifierNotFound,
                FileConsistencyMismatch,
                UnsupportedOperation,
                InvalidPath,
                InvalidObject,
                RSENotFound,
                RSEProtocolNotSupported,
                InvalidRSEExpression,
                RSEFileNameNotSupported,
                KeyError,
            ) as e:
                # these errors are always treated as fatal: no retry
                out = f"failed with {str(e)}\n {traceback.format_exc()}"
                is_fatal = True
                is_failed = True
            except Exception as e:
                # unknown errors are retried unless the message carries a fatal marker
                is_failed = True
                out = f"failed with unknown error: {str(e)}\n {traceback.format_exc()}"
                is_fatal = any(marker in out for marker in self._FATAL_ERROR_MARKERS)
            registration_time = naive_utcnow() - registration_start
            self.logger.debug("took %s.%03d sec" % (registration_time.seconds, registration_time.microseconds // 1000))

            if not is_failed:
                self.logger.debug(f"{str(out)}")
                return True

            self.logger.error(f"{out}")
            if is_fatal or (attempt_number + 1) == max_attempt:
                self._set_ddm_error(out)
                if is_fatal:
                    self.result.set_fatal()
                else:
                    self.result.set_temporary()
                return False
            self.logger.error(f"Try:{attempt_number}")
            # wait before the next attempt
            time.sleep(10)
        # only reached when no attempt was made at all
        return True

    def _set_ddm_error(self, out: str) -> None:
        """
        Store the DDM error code and a diagnostic message on the job.

        The diagnostic uses the text picked by
        ``DataServiceUtils.extractImportantError``; when that is empty it falls
        back to the last non-empty line of ``out``.

        :param out: Full error text of the failed registration attempt.
        """
        self.job.ddmErrorCode = ErrorCode.EC_Adder
        extracted_error = DataServiceUtils.extractImportantError(out)
        err_msg = "Could not add files to DDM: "
        if extracted_error == "":
            self.job.ddmErrorDiag = err_msg + self._last_non_empty_line(out)
        else:
            self.job.ddmErrorDiag = err_msg + extracted_error

    @staticmethod
    def _last_non_empty_line(text: str) -> str:
        """
        Return the last line of ``text`` that is not blank, stripped of surrounding whitespace.

        The error text ends with a formatted traceback, which itself ends with a
        newline, so simply taking the element after the final newline would
        always yield an empty string.

        :param text: Multi-line text.
        :return: The last non-blank line, or an empty string if there is none.
        """
        for line in reversed(text.splitlines()):
            stripped = line.strip()
            if stripped:
                return stripped
        return ""