Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import os
0002 import os.path
0003 import shutil
0004 import sys
0005 import uuid
0006 
0007 from rucio.client import Client as RucioClient
0008 from rucio.common.exception import RuleNotFound
0009 
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestermover import mover_utils
0012 
0013 from .base_stager import BaseStager
0014 
# logger
# Module-level logger shared by all RucioStager instances; methods bind
# per-job context to it via self.make_logger(...).
baseLogger = core_utils.setup_logger("rucio_stager")
0017 
0018 
0019 # plugin for stage-out with Rucio
# plugin for stage-out with Rucio
class RucioStager(BaseStager):
    """Stage-out plugin that transfers output files with Rucio.

    Output files are copied into a local transfer area (``srcBasePath``),
    registered into temporary Rucio datasets, and replicated to the
    destination RSEs via replication rules.  Rule IDs are stored in each
    file's ``fileAttributes`` so that ``check_stage_out_status`` can poll
    them later.
    """

    # constructor
    def __init__(self, **kwarg):
        BaseStager.__init__(self, **kwarg)
        # default scope used for temporary datasets and zipped files
        if not hasattr(self, "scopeForTmp"):
            self.scopeForTmp = "panda"

    # check status
    def check_stage_out_status(self, jobspec):
        """Poll the Rucio replication rule of each pending output file.

        :param jobspec: job specification whose outFiles are checked
        :return: (True, '') when every rule could be looked up, otherwise
                 (False, <first error message>)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        # loop over all files
        allChecked = True
        oneErrMsg = None
        transferStatus = dict()
        for fileSpec in jobspec.outFiles:
            # skip already done
            if fileSpec.status in ["finished", "failed"]:
                continue
            # get transfer ID
            transferID = fileSpec.fileAttributes["transferID"]
            if transferID not in transferStatus:
                # get status
                try:
                    rucioAPI = RucioClient()
                    ruleInfo = rucioAPI.get_replication_rule(transferID)
                    tmpTransferStatus = ruleInfo["state"]
                    tmpLog.debug(f"got state={tmpTransferStatus} for rule={transferID}")
                except RuleNotFound:
                    # the rule vanished; treat as a permanent failure
                    tmpLog.error(f"rule {transferID} not found")
                    tmpTransferStatus = "FAILED"
                except BaseException:
                    err_type, err_value = sys.exc_info()[:2]
                    errMsg = f"{err_type.__name__} {err_value}"
                    tmpLog.error(f"failed to get status for rule={transferID} with {errMsg}")
                    # set dummy not to lookup again
                    tmpTransferStatus = None
                    allChecked = False
                    # keep one message
                    if oneErrMsg is None:
                        oneErrMsg = errMsg
                # BUGFIX: the status must NOT be overwritten with "OK" here;
                # doing so discarded the real rule state and the FAILED /
                # error results from the except branches above.
                transferStatus[transferID] = tmpTransferStatus
            # final status (None from a lookup error keeps the file pending)
            if transferStatus[transferID] == "OK":
                fileSpec.status = "finished"
            elif transferStatus[transferID] in ["FAILED", "CANCELED"]:
                fileSpec.status = "failed"
        if allChecked:
            return True, ""
        else:
            return False, oneErrMsg

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Copy output files into the transfer area and register them in Rucio.

        Files are grouped per fileType into temporary datasets; one
        replication rule per dataset transfers them to the configured
        destination RSE.  Dataset name and rule ID are written back into
        each file's ``fileAttributes``.

        :param jobspec: job specification whose outFiles are staged out
        :return: (True, '') on success, (False, <error message>) on failure
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")
        # loop over all files
        files = dict()
        transferIDs = dict()
        transferDatasets = dict()
        fileAttrs = jobspec.get_output_file_attributes()
        for fileSpec in jobspec.outFiles:
            # skip zipped files
            if fileSpec.zipFileID is not None:
                continue
            # skip if already processed; just recover dataset/rule bookkeeping
            if "transferDataset" in fileSpec.fileAttributes:
                if fileSpec.fileType not in transferDatasets:
                    transferDatasets[fileSpec.fileType] = fileSpec.fileAttributes["transferDataset"]
                if fileSpec.fileType not in transferIDs:
                    transferIDs[fileSpec.fileType] = fileSpec.fileAttributes["transferID"]
                continue
            # set OS ID
            # BUGFIX: was `== ["es_output", "zip_output"]`, a string-to-list
            # comparison that is always False, so objstoreID was never set
            if fileSpec.fileType in ["es_output", "zip_output"]:
                fileSpec.objstoreID = self.objStoreID_ES
            # make path where file is copied for transfer
            if fileSpec.fileType != "zip_output":
                scope = fileAttrs[fileSpec.lfn]["scope"]
                datasetName = fileAttrs[fileSpec.lfn]["dataset"]
            else:
                # use panda scope for zipped files
                scope = self.scopeForTmp
                datasetName = "dummy"
            srcPath = fileSpec.path
            dstPath = mover_utils.construct_file_path(self.srcBasePath, scope, fileSpec.lfn)
            # remove any stale copy
            if os.path.exists(dstPath):
                os.remove(dstPath)
            # copy
            tmpLog.debug(f"copy src={srcPath} dst={dstPath}")
            dstDir = os.path.dirname(dstPath)
            # exist_ok avoids a race when several workers create the same dir
            os.makedirs(dstDir, exist_ok=True)
            shutil.copyfile(srcPath, dstPath)
            # collect files per fileType for dataset registration
            tmpFile = dict()
            tmpFile["scope"] = scope
            tmpFile["name"] = fileSpec.lfn
            tmpFile["bytes"] = fileSpec.fsize
            files.setdefault(fileSpec.fileType, []).append(tmpFile)
        # loop over all file types to be registered to rucio
        rucioAPI = RucioClient()
        for fileType, fileList in files.items():
            # set destination RSE
            if fileType in ["es_output", "zip_output"]:
                dstRSE = self.dstRSE_ES
            elif fileType == "output":
                dstRSE = self.dstRSE_Out
            elif fileType == "log":
                dstRSE = self.dstRSE_Log
            else:
                errMsg = f"unsupported file type {fileType}"
                tmpLog.error(errMsg)
                return (False, errMsg)
            # skip if destination is None
            if dstRSE is None:
                continue
            # make datasets if missing
            if fileType not in transferDatasets:
                try:
                    tmpScope = self.scopeForTmp
                    tmpDS = f"panda.harvester_stage_out.{str(uuid.uuid4())}"
                    # hidden dataset with a 30-day lifetime
                    rucioAPI.add_dataset(tmpScope, tmpDS, meta={"hidden": True}, lifetime=30 * 24 * 60 * 60, files=fileList, rse=self.srcRSE)
                    transferDatasets[fileType] = tmpDS
                    # add rule
                    tmpDID = dict()
                    tmpDID["scope"] = tmpScope
                    tmpDID["name"] = tmpDS
                    tmpRet = rucioAPI.add_replication_rule([tmpDID], 1, dstRSE, lifetime=30 * 24 * 60 * 60)
                    tmpTransferIDs = tmpRet[0]
                    transferIDs[fileType] = tmpTransferIDs
                    tmpLog.debug(f"register dataset {tmpDS} with rule {str(tmpTransferIDs)}")
                except BaseException:
                    errMsg = core_utils.dump_error_message(tmpLog)
                    return (False, errMsg)
            else:
                # add files to existing dataset
                try:
                    tmpScope = self.scopeForTmp
                    tmpDS = transferDatasets[fileType]
                    rucioAPI.add_files_to_dataset(tmpScope, tmpDS, fileList, self.srcRSE)
                    tmpLog.debug(f"added files to {tmpDS}")
                except BaseException:
                    errMsg = core_utils.dump_error_message(tmpLog)
                    return (False, errMsg)
        # set transfer datasets and rules
        for fileSpec in jobspec.outFiles:
            # skip zipped files
            if fileSpec.zipFileID is not None:
                continue
            # skip already done
            if fileSpec.status in ["finished", "failed"]:
                continue
            # skip if already processed
            if "transferDataset" in fileSpec.fileAttributes:
                continue
            # no destination RSE was configured for this type
            if fileSpec.fileType not in transferDatasets:
                fileSpec.status = "finished"
                continue
            # set dataset
            fileSpec.fileAttributes["transferDataset"] = transferDatasets[fileSpec.fileType]
            # set rule
            fileSpec.fileAttributes["transferID"] = transferIDs[fileSpec.fileType]
            # force update so the attributes are persisted
            fileSpec.force_update("fileAttributes")
        # return
        tmpLog.debug("done")
        return (True, "")

    # zip output files
    def zip_output(self, jobspec):
        """Zip output files via the BaseStager helper."""
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)