File indexing completed on 2026-04-20 07:59:00
0001 import os
0002 import subprocess
0003 import uuid
0004
0005 from pandaharvester.harvestercore import core_utils
0006
0007 from .base_stager import BaseStager
0008
0009
0010
0011
# Module-level logger for this stager; each method derives its own per-call
# logger from it via self.make_logger(baseLogger, ...).
baseLogger = core_utils.setup_logger("rucio_stager_hpc")
0013
0014
0015
class RucioStagerHPC(BaseStager):
    """Stager that uploads job output and log files with the ``rucio upload`` CLI.

    Uploads run synchronously inside :meth:`trigger_stage_out` and always use
    ``--no-register``. Per-RSE object-store settings (``storage_id``, S3
    credentials, ``pfn_prefix``) may be supplied through
    ``objectstore_additions``, a mapping keyed by RSE name.
    """

    def __init__(self, **kwarg):
        BaseStager.__init__(self, **kwarg)
        # Defaults for optional settings that are not already set on the instance.
        if not hasattr(self, "scopeForTmp"):
            self.scopeForTmp = "panda"
        if not hasattr(self, "pathConvention"):
            self.pathConvention = None
        if not hasattr(self, "objstoreID"):
            self.objstoreID = None
        if not hasattr(self, "maxAttempts"):
            self.maxAttempts = 3
        if not hasattr(self, "objectstore_additions"):
            self.objectstore_additions = None

    def check_stage_out_status(self, jobspec):
        """Report the stage-out status of ``jobspec``.

        Always returns ``(True, "")``. trigger_stage_out waits for every rucio
        process to exit, so there is no asynchronous transfer left to poll.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
        tmpLog.debug("start")
        return (True, "")

    def trigger_stage_out(self, jobspec):
        """Upload every pending output/log file of ``jobspec`` with ``rucio upload``.

        Handling of individual files:

        * Files already in status ``finished`` or ``failed`` are skipped.
        * Files whose destination RSE (``dstRSE_Out`` / ``dstRSE_Log``) is None
          are skipped without affecting the result.

        For each file that is processed, ``pathConvention``, ``objstoreID``,
        ``attemptNr`` and ``status`` are updated on the file spec.

        :param jobspec: job whose ``outFiles`` are uploaded.
        :return: one of the following tuples:

            * ``(True, "")`` if every processed file was uploaded or already
              existed.
            * ``(False, "unsupported file type ...")`` immediately if a file
              type is not handled.
            * ``(False, message)`` otherwise, where ``message`` lists the
              failed files.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")

        allChecked = True
        failedFiles = []
        # One temporary dataset name shared by every file in this call that has
        # no dataset of its own in the job's output file attributes.
        zip_datasetName = f"harvester_stage_out.{str(uuid.uuid4())}"
        fileAttrs = jobspec.get_output_file_attributes()
        for fileSpec in jobspec.outFiles:
            tmpLog.debug(f"file: {fileSpec.lfn} status: {fileSpec.status}")
            if fileSpec.status in ["finished", "failed"]:
                continue

            fileSpec.pathConvention = self.pathConvention
            fileSpec.objstoreID = self.objstoreID

            if fileSpec.fileType in ["es_output", "zip_output", "output"]:
                dstRSE = self.dstRSE_Out
            elif fileSpec.fileType == "log":
                dstRSE = self.dstRSE_Log
            else:
                errMsg = f"unsupported file type {fileSpec.fileType}"
                tmpLog.error(errMsg)
                return (False, errMsg)

            # No destination configured for this file type: leave the file as is.
            if dstRSE is None:
                continue

            scope, datasetName = self._get_scope_and_dataset(fileSpec, fileAttrs, zip_datasetName)

            rseAdditions = {}
            if self.objectstore_additions and dstRSE in self.objectstore_additions:
                rseAdditions = self.objectstore_additions[dstRSE]
            if "storage_id" in rseAdditions:
                fileSpec.objstoreID = rseAdditions["storage_id"]

            executable, env = self._build_upload_command(fileSpec, dstRSE, scope, datasetName, rseAdditions)
            tmpLog.debug(f"rucio upload command: {executable} ")
            tmpLog.debug(f"rucio upload command (for human): {' '.join(executable)} ")

            # Always run without a shell. S3 credentials, if any, travel in env.
            # stderr is merged into stdout, so communicate() yields only stdout.
            process = subprocess.Popen(executable, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env)
            stdout, _ = process.communicate()
            fileSpec.attemptNr += 1
            # errors="replace" so that non-UTF-8 bytes from rucio cannot crash the stager.
            stdout = stdout.decode(errors="replace") + f" attemptNr: {fileSpec.attemptNr}"
            tmpLog.debug(f"stdout: {stdout}")

            if process.returncode == 0:
                fileSpec.status = "finished"
            else:
                failure = self._classify_rucio_failure(stdout)
                if failure == "exists_on_rse":
                    tmpLog.warning(f"rucio skipped upload and returned stdout: {stdout}")
                if failure in ("exists_in_scope", "exists_on_rse"):
                    tmpLog.debug("file exists, marking transfer as finished")
                    fileSpec.status = "finished"
                elif failure == "session_limit":
                    # Keep the file pending (status unchanged) so a later call
                    # retries it.
                    tmpLog.warning(f"rucio returned error, will retry: stdout: {stdout}")
                    allChecked = False
                    continue
                else:
                    tmpLog.error(f"rucio upload failed with stdout: {stdout}")
                    failedFiles.append(f'{fileSpec.lfn} failed with rucio error stdout="{stdout}"')
                    allChecked = False
                    if fileSpec.attemptNr >= self.maxAttempts:
                        tmpLog.error(f"reached maxattempts: {self.maxAttempts}, marked it as failed")
                        fileSpec.status = "failed"

            fileSpec.force_update("status")
            tmpLog.debug(f"file: {fileSpec.lfn} status: {fileSpec.status}")

        tmpLog.debug("done")
        if allChecked:
            return True, ""
        return False, "These files failed to upload : " + "; ".join(failedFiles)

    def _get_scope_and_dataset(self, fileSpec, fileAttrs, tmpDatasetName):
        """Return ``(scope, datasetName)`` for uploading ``fileSpec``.

        Resolution order:

        * Log files use the job's file attributes. If the LFN is not listed,
          the last dot-separated component of the LFN is dropped and the
          lookup is retried (the original code named this
          ``lfnWithoutWorkerID``). A KeyError propagates if neither key exists.
        * Non-zip outputs listed in ``fileAttrs`` use those attributes.
        * Everything else uses ``self.scopeForTmp`` and ``tmpDatasetName``.
        """
        if fileSpec.fileType == "log":
            if fileSpec.lfn in fileAttrs:
                attrs = fileAttrs[fileSpec.lfn]
            else:
                attrs = fileAttrs[".".join(fileSpec.lfn.split(".")[:-1])]
            return attrs["scope"], attrs["dataset"]
        if fileSpec.fileType != "zip_output" and fileSpec.lfn in fileAttrs:
            attrs = fileAttrs[fileSpec.lfn]
            return attrs["scope"], attrs["dataset"]
        return self.scopeForTmp, tmpDatasetName

    def _build_upload_command(self, fileSpec, dstRSE, scope, datasetName, rseAdditions):
        """Return ``(argv, env)`` for the ``rucio upload`` call of one file.

        Behaviour of the two return values:

        * ``env`` is None, meaning the current environment is inherited,
          unless ``access_key``, ``secret_key`` and ``is_secure`` are all
          configured for the RSE. In that case it is a copy of ``os.environ``
          with the S3_* variables added.
        * When ``pfn_prefix`` is configured, the file is uploaded to
          ``pfn_prefix/<basename of fileSpec.path>`` via ``--pfn``. Otherwise
          the ``scope:datasetName`` argument is passed.
        """
        executable = ["/usr/bin/env", "rucio", "-v", "upload", "--no-register"]
        if hasattr(self, "lifetime"):
            executable += ["--lifetime", "%d" % self.lifetime]
        if fileSpec.fileAttributes is not None and "guid" in fileSpec.fileAttributes:
            executable += ["--guid", fileSpec.fileAttributes["guid"]]
        executable += ["--rse", dstRSE, "--scope", scope]

        pfn_prefix = rseAdditions.get("pfn_prefix")
        if pfn_prefix:
            # Option and value must be separate argv entries: without a shell,
            # a single "--pfn <path>" entry reaches rucio as one argument.
            executable += ["--pfn", os.path.join(pfn_prefix, os.path.basename(fileSpec.path))]
        else:
            executable += [f"{scope}:{datasetName}"]
        executable += [f"{fileSpec.path}"]

        env = None
        if all(key in rseAdditions for key in ("access_key", "secret_key", "is_secure")):
            env = os.environ.copy()
            # str() matches the "%s" formatting the shell exports used before.
            env["S3_ACCESS_KEY"] = str(rseAdditions["access_key"])
            env["S3_SECRET_KEY"] = str(rseAdditions["secret_key"])
            env["S3_IS_SECURE"] = str(rseAdditions["is_secure"])
        return executable, env

    @staticmethod
    def _classify_rucio_failure(stdout):
        """Classify the output of a failed ``rucio upload``.

        :return: one of:

            * ``"exists_in_scope"`` or ``"exists_on_rse"`` when rucio reports
              that the file already exists. The first such line wins.
            * ``"session_limit"`` when the per-user session limit was hit and
              no "exists" line was found.
            * None otherwise.
        """
        sessionLimit = False
        for line in stdout.split("\n"):
            if "File name in specified scope already exists" in line:
                return "exists_in_scope"
            if "File already exists on RSE" in line:
                return "exists_on_rse"
            if "exceeded simultaneous SESSIONS_PER_USER limit" in line:
                sessionLimit = True
        return "session_limit" if sessionLimit else None

    def zip_output(self, jobspec):
        """Zip the job's output files using the base class's simple_zip_output."""
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)