Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 from pandaharvester.harvesterconfig import harvester_config
0002 from pandaharvester.harvestercore import core_utils
0003 from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
0004 from pandaharvester.harvestermover import mover_utils
0005 from pandaharvester.harvesterstager import go_bulk_stager
0006 from pandaharvester.harvesterstager.go_bulk_stager import GlobusBulkStager
0007 from rucio.client import Client as RucioClient
0008 from rucio.common.exception import (
0009     DataIdentifierAlreadyExists,
0010     DataIdentifierNotFound,
0011     DuplicateRule,
0012     FileAlreadyExists,
0013 )
0014 
0015 # module-level logger for this plugin, created under the name "go_rucio_stager"
0016 _logger = core_utils.setup_logger("go_rucio_stager")
0017 go_bulk_stager._logger = _logger  # rebind go_bulk_stager's module-level _logger to the same object (presumably so inherited base-class code logs under this plugin's name -- confirm)
0018 
0019 
0020 # plugin with Globus + Rucio + bulk transfers
class GlobusRucioStager(GlobusBulkStager):
    """Stager plugin combining Globus bulk transfers with Rucio replication rules.

    The Globus step is delegated to ``GlobusBulkStager.check_stage_out_status``.
    Once that step reports success, every output-file transfer group of the job
    is registered as a hidden dataset in the ``transient`` scope and a
    replication rule to the destination RSE is created and then polled.
    """

    # constructor
    def __init__(self, **kwarg):
        GlobusBulkStager.__init__(self, **kwarg)
        # NOTE(review): presumably stops the base class from changing the file
        # status itself; this class calls set_FileSpec_status(jobspec, "finished")
        # only once every group is done -- confirm against GlobusBulkStager.
        self.changeFileStatusOnSuccess = False

    # look up the state of the replication rule that targets dstRSE
    def _get_rule_state(self, rucioAPI, datasetScope, datasetName, dstRSE, tmpLog):
        """Return the state of the rule on ``datasetScope:datasetName`` for ``dstRSE``.

        :param rucioAPI: Rucio client used for ``list_did_rules``
        :param datasetScope: scope of the dataset
        :param datasetName: name of the dataset
        :param dstRSE: RSE expression the rule must match exactly
        :param tmpLog: logger for debug/error output
        :return: the rule state string; "FAILED" when no rule matches ``dstRSE``
                 or the dataset is not found before any match; None when an
                 unexpected exception occurred
        """
        ruleStatus = "FAILED"
        try:
            tmpLog.debug(f"check state for {datasetScope}:{datasetName}")
            for ruleInfo in rucioAPI.list_did_rules(datasetScope, datasetName):
                if ruleInfo["rse_expression"] != dstRSE:
                    continue
                ruleStatus = ruleInfo["state"]
                tmpLog.debug(f"got state={ruleStatus}")
                if ruleStatus == "OK":
                    break
        except DataIdentifierNotFound:
            tmpLog.error("dataset not found")
        except Exception:
            core_utils.dump_error_message(tmpLog)
            ruleStatus = None
        return ruleStatus

    # check status
    def check_stage_out_status(self, jobspec):
        """Check the Globus transfer and then drive the Rucio rule of each transfer group.

        :param jobspec: job specification; ``PandaID``, ``computingSite``,
                        ``jobParams["nucleus"]`` and
                        ``get_groups_of_output_files()`` are used
        :return: tuple ``(status, message)``. ``status`` is True when all groups
                 are done (files are then set to "finished"), False when a rule
                 failed, and None when the situation is treated as temporary
                 (lock held elsewhere, still replicating, or a Rucio call failed).
                 A non-True result of the base-class check is returned unchanged.
        """
        # make logger
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("executing base check_stage_out_status")
        tmpStat, tmpMsg = GlobusBulkStager.check_stage_out_status(self, jobspec)
        tmpLog.debug(f"got {tmpStat} {tmpMsg}")
        if tmpStat is not True:
            return tmpStat, tmpMsg
        # get transfer groups
        groups = jobspec.get_groups_of_output_files()
        if not groups:
            return tmpStat, tmpMsg
        # get the queueConfig and corresponding objStoreID_ES
        queueConfigMapper = QueueConfigMapper()
        queueConfig = queueConfigMapper.get_queue(jobspec.computingSite)
        # write to debug log queueConfig.stager
        tmpLog.debug(f"jobspec.computingSite - {jobspec.computingSite} queueConfig.stager {queueConfig.stager}")
        # check queueConfig stager section to see if srcRSE is set
        if "srcRSE" in queueConfig.stager:
            srcRSE = queueConfig.stager["srcRSE"]
        else:
            # bind the name so the None check below can report the problem
            srcRSE = None
            tmpLog.debug("Warning srcRSE not defined in stager portion of queue config file")
        # get destination endpoint: first "pr" storage of the first queue whose atlas_site is the nucleus
        nucleus = jobspec.jobParams["nucleus"]
        agis = self.dbInterface.get_cache("panda_queues.json").data
        dstRSE = next((agis[x]["astorages"]["pr"][0] for x in agis if agis[x]["atlas_site"] == nucleus), None)
        # if debugging log source and destination RSEs
        tmpLog.debug(f"srcRSE - {srcRSE} dstRSE - {dstRSE}")
        # test that srcRSE and dstRSE are defined
        errStr = ""
        if srcRSE is None:
            errStr = "Source RSE is not defined "
        if dstRSE is None:
            errStr = errStr + " Desitination RSE is not defined"
        if errStr:
            tmpLog.error(errStr)
            return None, errStr
        # check queueConfig stager section to see if jobtype is set
        if "jobtype" in queueConfig.stager:
            if queueConfig.stager["jobtype"] == "Yoda":
                self.Yodajob = True
        # set the location of the files in fileSpec.objstoreID
        # see file /cvmfs/atlas.cern.ch/repo/sw/local/etc/agis_ddmendpoints.json
        ddm = self.dbInterface.get_cache("agis_ddmendpoints.json").data
        self.objstoreID = ddm[dstRSE]["id"]
        # NOTE(review): self.Yodajob is not initialised in this class; it is
        # expected to be set by the base class -- confirm.
        if self.Yodajob:
            self.pathConvention = int(queueConfig.stager["pathConvention"])
            tmpLog.debug(f"Yoda Job - PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID} pathConvention ={self.pathConvention}")
        else:
            self.pathConvention = None
            tmpLog.debug(f"PandaID = {jobspec.PandaID} objstoreID = {self.objstoreID}")
        # set the location of the files in fileSpec.objstoreID
        self.set_FileSpec_objstoreID(jobspec, self.objstoreID, self.pathConvention)
        # create the Rucio Client
        try:
            rucioAPI = RucioClient()
        except Exception:
            core_utils.dump_error_message(tmpLog)
            # treat as a temporary error; no dataset name exists yet at this point
            return None, "failed to create Rucio client"
        # dataset/rule lifetime in seconds (30 days) and max files per add_files_to_datasets call
        lifetime = 30 * 24 * 60 * 60
        maxfiles = 500
        # loop over all transfers
        tmpStat = True
        tmpMsg = ""
        for transferID in groups:
            if transferID is None:
                continue
            datasetName = f"panda.harvester.{jobspec.PandaID}.{transferID}"
            datasetScope = "transient"
            # lock
            have_db_lock = self.dbInterface.get_object_lock(transferID, lock_interval=120)
            if not have_db_lock:
                msgStr = f"escape since {transferID} is locked by another thread"
                tmpLog.debug(msgStr)
                return None, msgStr
            # the finally clause releases the lock on every exit path, including early returns
            try:
                # get transfer status
                groupStatus = self.dbInterface.get_file_group_status(transferID)
                if "hopped" in groupStatus:
                    # already succeeded
                    pass
                elif "failed" in groupStatus:
                    # transfer failure
                    tmpStat = False
                    tmpMsg = f"rucio rule for {datasetScope}:{datasetName} already failed"
                elif "hopping" in groupStatus:
                    # check rucio rule
                    ruleStatus = self._get_rule_state(rucioAPI, datasetScope, datasetName, dstRSE, tmpLog)
                    if ruleStatus in ["FAILED", "CANCELED"]:
                        # transfer failure
                        tmpStat = False
                        tmpMsg = f"rucio rule for {datasetScope}:{datasetName} failed with {ruleStatus}"
                        # update file group status
                        self.dbInterface.update_file_group_status(transferID, "failed")
                    elif ruleStatus == "OK":
                        # update successful file group status
                        self.dbInterface.update_file_group_status(transferID, "hopped")
                    else:
                        # replicating or temporary error
                        tmpStat = None
                        tmpMsg = f"replicating or temporary error for {datasetScope}:{datasetName}"
                else:
                    # make rucio rule
                    fileSpecs = self.dbInterface.get_files_with_group_id(transferID)
                    fileList = []
                    for fileSpec in fileSpecs:
                        tmpFile = {
                            "scope": datasetScope,
                            "name": fileSpec.lfn,
                            "bytes": fileSpec.fsize,
                            "adler32": fileSpec.chksum,
                        }
                        if fileSpec.fileAttributes is not None and "guid" in fileSpec.fileAttributes:
                            tmpFile["meta"] = {"guid": fileSpec.fileAttributes["guid"]}
                        else:
                            tmpLog.debug(f"File - {fileSpec.lfn} does not have a guid value")
                        tmpLog.debug(f"Adding file {fileSpec.lfn} to fileList")
                        fileList.append(tmpFile)
                    try:
                        # register dataset
                        tmpLog.debug(f"register {datasetScope}:{datasetName} rse = {srcRSE} meta=(hidden: True) lifetime = {lifetime}")
                        try:
                            rucioAPI.add_dataset(datasetScope, datasetName, meta={"hidden": True}, lifetime=lifetime, rse=srcRSE)
                        except DataIdentifierAlreadyExists:
                            # ignore even if the dataset already exists
                            pass
                        except Exception:
                            errMsg = f"Could not create dataset {datasetScope}:{datasetName} srcRSE - {srcRSE}"
                            core_utils.dump_error_message(tmpLog)
                            tmpLog.error(errMsg)
                            # handled by the outer except as a temporary error
                            raise
                        # add files to dataset, at most maxfiles per call;
                        # the stepped range advances even when a chunk is skipped
                        for start in range(0, len(fileList), maxfiles):
                            try:
                                rucioAPI.add_files_to_datasets(
                                    [{"scope": datasetScope, "name": datasetName, "dids": fileList[start : start + maxfiles], "rse": srcRSE}],
                                    ignore_duplicate=True,
                                )
                            except FileAlreadyExists:
                                # ignore if files already exist
                                pass
                            except Exception:
                                errMsg = f"Could not add files to DS - {datasetScope}:{datasetName}  rse - {srcRSE} files - {fileList}"
                                core_utils.dump_error_message(tmpLog)
                                tmpLog.error(errMsg)
                                return None, errMsg
                        # add rule
                        try:
                            tmpDID = {"scope": datasetScope, "name": datasetName}
                            tmpRet = rucioAPI.add_replication_rule([tmpDID], 1, dstRSE, lifetime=lifetime)
                            ruleIDs = tmpRet[0]
                            tmpLog.debug(f"registered dataset {datasetScope}:{datasetName} with rule {str(ruleIDs)}")
                        except DuplicateRule:
                            # ignore duplicated rule
                            tmpLog.debug("rule is already available")
                        except Exception:
                            errMsg = f"Error creating rule for dataset {datasetScope}:{datasetName}"
                            core_utils.dump_error_message(tmpLog)
                            tmpLog.error(errMsg)
                            return None, errMsg
                        # update file group status
                        self.dbInterface.update_file_group_status(transferID, "hopping")
                    except Exception:
                        core_utils.dump_error_message(tmpLog)
                        # treat as a temporary error
                        tmpStat = None
                        tmpMsg = f"failed to add a rule for {datasetScope}:{datasetName}"
            finally:
                # release lock
                self.dbInterface.release_object_lock(transferID)
            # escape if already failed
            if tmpStat is False:
                break
        # all done
        if tmpStat is True:
            self.set_FileSpec_status(jobspec, "finished")
        tmpLog.debug(f"done with {tmpStat} : {tmpMsg}")
        return tmpStat, tmpMsg