File indexing completed on 2026-04-20 07:59:00
0001 import os
0002 import os.path
0003 import shutil
0004 import sys
0005 import uuid
0006
0007 from rucio.client import Client as RucioClient
0008 from rucio.common.exception import RuleNotFound
0009
0010 from pandaharvester.harvestercore import core_utils
0011 from pandaharvester.harvestermover import mover_utils
0012
0013 from .base_stager import BaseStager
0014
0015
0016 baseLogger = core_utils.setup_logger("rucio_stager")  # module-level logger; RucioStager methods pass it to self.make_logger()
0017
0018
0019
class RucioStager(BaseStager):
    """Stager plugin that stages job output files out through Rucio.

    Output files are copied under ``self.srcBasePath``, registered in a
    temporary Rucio dataset on ``self.srcRSE``, and moved to the destination
    RSE by a replication rule whose ID is stored on each file as
    ``fileAttributes["transferID"]``.

    Besides ``scopeForTmp`` the methods read ``srcBasePath``, ``srcRSE``,
    ``dstRSE_ES``, ``dstRSE_Out``, ``dstRSE_Log`` and (optionally)
    ``objStoreID_ES`` from the instance.
    NOTE(review): these are presumably injected from the plugin configuration
    by ``BaseStager.__init__`` -- confirm against the base class.
    """

    def __init__(self, **kwarg):
        """Initialise the base stager and default ``scopeForTmp`` to "panda"."""
        BaseStager.__init__(self, **kwarg)
        if not hasattr(self, "scopeForTmp"):
            self.scopeForTmp = "panda"

    def check_stage_out_status(self, jobspec):
        """Update output-file statuses from the state of their Rucio rules.

        Files whose status is already "finished" or "failed" are skipped.
        For every other file the rule named by
        ``fileAttributes["transferID"]`` is queried (once per distinct ID per
        call). A rule state of "OK" marks the file "finished"; "FAILED" or
        "CANCELED" marks it "failed"; a rule that no longer exists is treated
        as "FAILED". Any other state leaves the file status untouched.

        Args:
            jobspec: job specification providing ``PandaID`` and ``outFiles``.

        Returns:
            ``(True, "")`` when every rule lookup succeeded, otherwise
            ``(False, message)`` with the first lookup error encountered.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")

        allChecked = True
        oneErrMsg = None
        # rule ID -> state; also caches failed lookups (as None) so that the
        # same rule is not queried again within this call
        transferStatus = dict()
        # created lazily inside the try block so that a client construction
        # failure is reported like any other lookup failure
        rucioAPI = None
        for fileSpec in jobspec.outFiles:
            # skip files that are already done
            if fileSpec.status in ["finished", "failed"]:
                continue

            transferID = fileSpec.fileAttributes["transferID"]
            if transferID not in transferStatus:
                try:
                    if rucioAPI is None:
                        rucioAPI = RucioClient()
                    ruleInfo = rucioAPI.get_replication_rule(transferID)
                    tmpTransferStatus = ruleInfo["state"]
                    tmpLog.debug(f"got state={tmpTransferStatus} for rule={transferID}")
                except RuleNotFound:
                    tmpLog.error(f"rule {transferID} not found")
                    tmpTransferStatus = "FAILED"
                except Exception as exc:
                    errMsg = f"{type(exc).__name__} {exc}"
                    tmpLog.error(f"failed to get status for rule={transferID} with {errMsg}")
                    # dummy value so the rule is not looked up again
                    tmpTransferStatus = None
                    allChecked = False
                    # keep only the first error message for the caller
                    if oneErrMsg is None:
                        oneErrMsg = errMsg
                # The state obtained above is stored as-is; it must not be
                # overwritten, otherwise failed or unfinished transfers would
                # be reported as finished.
                transferStatus[transferID] = tmpTransferStatus

            # translate the rule state into the file status
            if transferStatus[transferID] == "OK":
                fileSpec.status = "finished"
            elif transferStatus[transferID] in ["FAILED", "CANCELED"]:
                fileSpec.status = "failed"

        if allChecked:
            return True, ""
        return False, oneErrMsg

    def trigger_stage_out(self, jobspec):
        """Copy output files to the source area and register them with Rucio.

        Steps:
          1. For each output file that is not part of a zip
             (``zipFileID is None``) and has no ``"transferDataset"``
             attribute yet, copy it to the path built by
             ``mover_utils.construct_file_path(self.srcBasePath, scope, lfn)``
             and collect it per file type.
          2. For each collected file type, either create a new hidden dataset
             plus a replication rule to the destination RSE, or add the files
             to the dataset already recorded for that file type.
          3. Record the dataset name and rule ID in each file's
             ``fileAttributes``; files whose file type has no transfer
             dataset (destination RSE is ``None``) are marked "finished".

        Args:
            jobspec: job specification providing ``PandaID``, ``outFiles`` and
                ``get_output_file_attributes()``.

        Returns:
            ``(True, "")`` on success, or ``(False, message)`` when a file
            type is unsupported or a Rucio call fails.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")

        files = dict()  # file type -> list of file dicts to register
        transferIDs = dict()  # file type -> replication rule ID
        transferDatasets = dict()  # file type -> transfer dataset name
        fileAttrs = jobspec.get_output_file_attributes()
        for fileSpec in jobspec.outFiles:
            # skip files that are shipped inside a zip file
            if fileSpec.zipFileID is not None:
                continue

            # already processed in a previous call: remember its dataset and
            # rule so that new files of the same type can join them
            if "transferDataset" in fileSpec.fileAttributes:
                if fileSpec.fileType not in transferDatasets:
                    transferDatasets[fileSpec.fileType] = fileSpec.fileAttributes["transferDataset"]
                if fileSpec.fileType not in transferIDs:
                    transferIDs[fileSpec.fileType] = fileSpec.fileAttributes["transferID"]
                continue

            # set the object store ID. This must be a membership test: the
            # file type is a string and never equals a list. getattr() keeps
            # configurations without objStoreID_ES working.
            if fileSpec.fileType in ("es_output", "zip_output"):
                fileSpec.objstoreID = getattr(self, "objStoreID_ES", None)

            # zip files use the temporary scope, everything else the scope
            # given in the output file attributes
            if fileSpec.fileType != "zip_output":
                scope = fileAttrs[fileSpec.lfn]["scope"]
            else:
                scope = self.scopeForTmp
            srcPath = fileSpec.path
            dstPath = mover_utils.construct_file_path(self.srcBasePath, scope, fileSpec.lfn)

            # remove a stale copy left at the destination path
            if os.path.exists(dstPath):
                os.remove(dstPath)

            # copy the file to where it is picked up for the transfer
            tmpLog.debug(f"copy src={srcPath} dst={dstPath}")
            # exist_ok avoids failing if the directory appears in between
            os.makedirs(os.path.dirname(dstPath), exist_ok=True)
            shutil.copyfile(srcPath, dstPath)

            # collect the file for registration
            tmpFile = {"scope": scope, "name": fileSpec.lfn, "bytes": fileSpec.fsize}
            files.setdefault(fileSpec.fileType, []).append(tmpFile)

        # lifetime in seconds (30 days) for both the dataset and the rule
        lifetime = 30 * 24 * 60 * 60
        rucioAPI = RucioClient()
        for fileType, fileList in files.items():
            # choose the destination RSE for this file type
            if fileType in ["es_output", "zip_output"]:
                dstRSE = self.dstRSE_ES
            elif fileType == "output":
                dstRSE = self.dstRSE_Out
            elif fileType == "log":
                dstRSE = self.dstRSE_Log
            else:
                errMsg = f"unsupported file type {fileType}"
                tmpLog.error(errMsg)
                return (False, errMsg)

            # no destination: nothing to register for this file type
            if dstRSE is None:
                continue

            tmpScope = self.scopeForTmp
            if fileType not in transferDatasets:
                # create a new hidden dataset and a rule that replicates it
                try:
                    tmpDS = f"panda.harvester_stage_out.{uuid.uuid4()}"
                    rucioAPI.add_dataset(tmpScope, tmpDS, meta={"hidden": True}, lifetime=lifetime, files=fileList, rse=self.srcRSE)
                    transferDatasets[fileType] = tmpDS

                    tmpDID = {"scope": tmpScope, "name": tmpDS}
                    tmpRet = rucioAPI.add_replication_rule([tmpDID], 1, dstRSE, lifetime=lifetime)
                    # the call returns a list of rule IDs; one DID, one rule
                    tmpTransferIDs = tmpRet[0]
                    transferIDs[fileType] = tmpTransferIDs
                    tmpLog.debug(f"register dataset {tmpDS} with rule {tmpTransferIDs}")
                except Exception:
                    errMsg = core_utils.dump_error_message(tmpLog)
                    return (False, errMsg)
            else:
                # add the files to the dataset created by an earlier call
                try:
                    tmpDS = transferDatasets[fileType]
                    rucioAPI.add_files_to_dataset(tmpScope, tmpDS, fileList, self.srcRSE)
                    tmpLog.debug(f"added files to {tmpDS}")
                except Exception:
                    errMsg = core_utils.dump_error_message(tmpLog)
                    return (False, errMsg)

        # record transfer dataset and rule on every file handled above
        for fileSpec in jobspec.outFiles:
            # skip files that are shipped inside a zip file
            if fileSpec.zipFileID is not None:
                continue
            # skip files that are already done
            if fileSpec.status in ["finished", "failed"]:
                continue
            # skip files that were processed by a previous call
            if "transferDataset" in fileSpec.fileAttributes:
                continue
            # no transfer dataset for this file type: nothing to wait for
            if fileSpec.fileType not in transferDatasets:
                fileSpec.status = "finished"
                continue

            fileSpec.fileAttributes["transferDataset"] = transferDatasets[fileSpec.fileType]
            fileSpec.fileAttributes["transferID"] = transferIDs[fileSpec.fileType]
            # fileAttributes was modified in place; flag it explicitly
            fileSpec.force_update("fileAttributes")

        tmpLog.debug("done")
        return (True, "")

    def zip_output(self, jobspec):
        """Zip output files by delegating to ``self.simple_zip_output``.

        Args:
            jobspec: job specification providing ``PandaID``.

        Returns:
            Whatever ``simple_zip_output(jobspec, tmpLog)`` returns.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)