# File indexing completed on 2026-04-20 07:59:00
0001 from pandaharvester.harvesterconfig import harvester_config
0002 from pandaharvester.harvestercore import core_utils
0003 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0004 from pandaharvester.harvestermover import mover_utils
0005 from pandaharvester.harvesterstager import go_bulk_stager
0006 from pandaharvester.harvesterstager.go_bulk_stager import GlobusBulkStager
0007 from rucio.client import Client as RucioClient
0008 from rucio.common.exception import (
0009 DataIdentifierAlreadyExists,
0010 DataIdentifierNotFound,
0011 DuplicateRule,
0012 FileAlreadyExists,
0013 )
0014
0015
0016 _logger = core_utils.setup_logger("go_rucio_stager")
0017 go_bulk_stager._logger = _logger
0018
0019
0020
class GlobusRucioStager(GlobusBulkStager):
    """Stager that uses Rucio to hop staged-out files to their final RSE.

    Extends GlobusBulkStager: once the base class reports that the Globus
    transfer to the source RSE is complete, this plugin registers a transient
    Rucio dataset per transfer group and adds a replication rule to move it
    to the destination RSE of the job's nucleus, then polls the rule state.
    """

    def __init__(self, **kwarg):
        """Set up the plugin; file status is flipped to 'finished' here (at the
        bottom of check_stage_out_status), not by the base class."""
        GlobusBulkStager.__init__(self, **kwarg)
        self.changeFileStatusOnSuccess = False

    def check_stage_out_status(self, jobspec):
        """Check the Globus transfer, then drive the Rucio hop for each group.

        :param jobspec: job specification whose output file groups are checked
        :return: (True, '') when every group has hopped,
                 (False, msg) on a fatal/failed rule,
                 (None, msg) for transient states or recoverable errors (retry)
        """
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("executing base check_stage_out_status")
        tmpStat, tmpMsg = GlobusBulkStager.check_stage_out_status(self, jobspec)
        tmpLog.debug(f"got {tmpStat} {tmpMsg}")
        if tmpStat is not True:
            return tmpStat, tmpMsg

        # nothing to hop if the job has no output file groups
        groups = jobspec.get_groups_of_output_files()
        if len(groups) == 0:
            return tmpStat, tmpMsg

        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        tmpLog.debug(f"jobspec.computingSite - {jobspec.computingSite} queueConfig.stager {queueConfig.stager}")

        # BUG FIX: srcRSE used to be left unassigned (NameError below) when the
        # key was missing; .get() keeps it None so the error path below fires.
        srcRSE = queueConfig.stager.get("srcRSE")
        if srcRSE is None:
            tmpLog.debug("Warning srcRSE not defined in stager portion of queue config file")

        # destination RSE comes from the nucleus' first "pr" storage in AGIS.
        # BUG FIX: next(..., None) instead of [...][0] so an unknown nucleus
        # reaches the dstRSE-is-None error handling instead of IndexError.
        nucleus = jobspec.jobParams["nucleus"]
        agis = self.dbInterface.get_cache("panda_queues.json").data
        dstRSE = next((agis[x]["astorages"]["pr"][0] for x in agis if agis[x]["atlas_site"] == nucleus), None)

        tmpLog.debug(f"srcRSE - {srcRSE} dstRSE - {dstRSE}")
        errStr = ""
        if srcRSE is None:
            errStr = "Source RSE is not defined "
        if dstRSE is None:
            errStr = errStr + " Destination RSE is not defined"
        if (srcRSE is None) or (dstRSE is None):
            tmpLog.error(errStr)
            return None, errStr

        # BUG FIX: guard against AttributeError when "jobtype" is absent and
        # nothing else has set self.Yodajob yet.
        self.Yodajob = getattr(self, "Yodajob", False)
        if "jobtype" in queueConfig.stager:
            if queueConfig.stager["jobtype"] == "Yoda":
                self.Yodajob = True

        ddm = self.dbInterface.get_cache("agis_ddmendpoints.json").data
        self.objstoreID = ddm[dstRSE]["id"]
        if self.Yodajob:
            self.pathConvention = int(queueConfig.stager["pathConvention"])
            tmpLog.debug(f"Yoda Job - PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID} pathConvention ={self.pathConvention}")
        else:
            self.pathConvention = None
            tmpLog.debug(f"PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID}")

        # tag every output FileSpec with the destination object-store ID
        self.set_FileSpec_objstoreID(jobspec, self.objstoreID, self.pathConvention)

        try:
            rucioAPI = RucioClient()
        except Exception:
            core_utils.dump_error_message(tmpLog)
            # BUG FIX: the old message referenced datasetScope/datasetName,
            # which are not defined yet at this point (NameError).
            return None, "failed to create a Rucio client"

        tmpStat = True
        tmpMsg = ""
        for transferID in groups:
            if transferID is None:
                continue
            # one transient dataset per transfer group
            datasetName = f"panda.harvester.{jobspec.PandaID}.{transferID}"
            datasetScope = "transient"

            # serialize work on this group across harvester threads
            have_db_lock = self.dbInterface.get_object_lock(transferID, lock_interval=120)
            if not have_db_lock:
                msgStr = f"escape since {transferID} is locked by another thread"
                tmpLog.debug(msgStr)
                return None, msgStr

            groupStatus = self.dbInterface.get_file_group_status(transferID)
            if "hopped" in groupStatus:
                # already done for this group
                pass
            elif "failed" in groupStatus:
                tmpStat = False
                tmpMsg = f"rucio rule for {datasetScope}:{datasetName} already failed"
            elif "hopping" in groupStatus:
                # rule exists; poll its state on the destination RSE
                ruleStatus = "FAILED"
                try:
                    tmpLog.debug(f"check state for {datasetScope}:{datasetName}")
                    for ruleInfo in rucioAPI.list_did_rules(datasetScope, datasetName):
                        if ruleInfo["rse_expression"] != dstRSE:
                            continue
                        ruleStatus = ruleInfo["state"]
                        tmpLog.debug(f"got state={ruleStatus}")
                        if ruleStatus == "OK":
                            break
                except DataIdentifierNotFound:
                    tmpLog.error("dataset not found")
                except Exception:
                    core_utils.dump_error_message(tmpLog)
                    ruleStatus = None  # temporary error -> retry later
                if ruleStatus in ["FAILED", "CANCELED"]:
                    tmpStat = False
                    tmpMsg = f"rucio rule for {datasetScope}:{datasetName} failed with {ruleStatus}"
                    self.dbInterface.update_file_group_status(transferID, "failed")
                elif ruleStatus == "OK":
                    self.dbInterface.update_file_group_status(transferID, "hopped")
                else:
                    tmpStat = None
                    tmpMsg = f"replicating or temporary error for {datasetScope}:{datasetName}"
            else:
                # first time for this group: register dataset, files and rule
                fileSpecs = self.dbInterface.get_files_with_group_id(transferID)
                fileList = []
                for fileSpec in fileSpecs:
                    tmpFile = dict()
                    tmpFile["scope"] = datasetScope
                    tmpFile["name"] = fileSpec.lfn
                    tmpFile["bytes"] = fileSpec.fsize
                    tmpFile["adler32"] = fileSpec.chksum
                    if fileSpec.fileAttributes is not None and "guid" in fileSpec.fileAttributes:
                        tmpFile["meta"] = {"guid": fileSpec.fileAttributes["guid"]}
                    else:
                        tmpLog.debug(f"File - {fileSpec.lfn} does not have a guid value")
                    tmpLog.debug(f"Adding file {fileSpec.lfn} to fileList")
                    fileList.append(tmpFile)

                # fallback: derive srcRSE from the last file's objstoreID.
                # NOTE(review): unreachable in practice since we already
                # returned above when srcRSE is None; guarded against an
                # empty fileSpecs list to avoid NameError.
                if srcRSE is None and fileSpecs and fileSpecs[-1].objstoreID is not None:
                    ddm = self.dbInterface.get_cache("agis_ddmendpoints.json").data
                    srcRSE = next((x for x in ddm if ddm[x]["id"] == fileSpecs[-1].objstoreID), None)

                try:
                    tmpLog.debug(f"register {datasetScope}:{datasetName} rse = {srcRSE} meta=(hidden: True) lifetime = {30 * 24 * 60 * 60}")
                    try:
                        rucioAPI.add_dataset(datasetScope, datasetName, meta={"hidden": True}, lifetime=30 * 24 * 60 * 60, rse=srcRSE)
                    except DataIdentifierAlreadyExists:
                        # another thread/attempt already created it
                        pass
                    except Exception:
                        errMsg = f"Could not create dataset {datasetScope}:{datasetName} srcRSE - {srcRSE}"
                        core_utils.dump_error_message(tmpLog)
                        tmpLog.error(errMsg)
                        raise

                    # attach files in slices of at most maxfiles.
                    # BUG FIX: floor division (true division yields a float,
                    # which breaks range() on Python 3).
                    numfiles = len(fileList)
                    maxfiles = 500
                    numslices = numfiles // maxfiles
                    if (numfiles % maxfiles) > 0:
                        numslices = numslices + 1
                    start = 0
                    for i in range(numslices):
                        stop = start + maxfiles
                        if stop > numfiles:
                            stop = numfiles
                        try:
                            rucioAPI.add_files_to_datasets(
                                [{"scope": datasetScope, "name": datasetName, "dids": fileList[start:stop], "rse": srcRSE}], ignore_duplicate=True
                            )
                        except FileAlreadyExists:
                            # files already attached; still advance the window
                            # (BUG FIX: start previously stayed put here, so the
                            # same slice was retried and later slices skipped)
                            pass
                        except Exception:
                            errMsg = f"Could not add files to DS - {datasetScope}:{datasetName} rse - {srcRSE} files - {fileList}"
                            core_utils.dump_error_message(tmpLog)
                            tmpLog.error(errMsg)
                            # BUG FIX: release the DB lock before the early return
                            self.dbInterface.release_object_lock(transferID)
                            return None, errMsg
                        start = stop

                    try:
                        tmpDID = dict()
                        tmpDID["scope"] = datasetScope
                        tmpDID["name"] = datasetName
                        tmpRet = rucioAPI.add_replication_rule([tmpDID], 1, dstRSE, lifetime=30 * 24 * 60 * 60)
                        ruleIDs = tmpRet[0]
                        tmpLog.debug(f"registered dataset {datasetScope}:{datasetName} with rule {str(ruleIDs)}")
                    except DuplicateRule:
                        tmpLog.debug("rule is already available")
                    except Exception:
                        errMsg = f"Error creating rule for dataset {datasetScope}:{datasetName}"
                        core_utils.dump_error_message(tmpLog)
                        tmpLog.debug(errMsg)
                        # BUG FIX: release the DB lock before the early return
                        self.dbInterface.release_object_lock(transferID)
                        return None, errMsg

                    self.dbInterface.update_file_group_status(transferID, "hopping")
                except Exception:
                    # best-effort: record transient failure and retry next cycle
                    core_utils.dump_error_message(tmpLog)
                    tmpStat = None
                    tmpMsg = f"failed to add a rule for {datasetScope}:{datasetName}"

            self.dbInterface.release_object_lock(transferID)

            # one fatal group fails the whole job; stop scanning
            if tmpStat is False:
                break

        if tmpStat is True:
            self.set_FileSpec_status(jobspec, "finished")
        tmpLog.debug(f"done with {tmpStat} : {tmpMsg}")
        return tmpStat, tmpMsg