Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import os
0002 import subprocess
0003 import uuid
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 
0007 from .base_stager import BaseStager
0008 
0009 # TODO: retry of failed transfers
0010 
0011 # logger
0012 baseLogger = core_utils.setup_logger("rucio_stager_hpc")
0013 
0014 
0015 # plugin for stage-out with Rucio on an HPC site that must copy output elsewhere
0016 class RucioStagerHPC(BaseStager):
0017     # constructor
0018     def __init__(self, **kwarg):
0019         BaseStager.__init__(self, **kwarg)
0020         if not hasattr(self, "scopeForTmp"):
0021             self.scopeForTmp = "panda"
0022         if not hasattr(self, "pathConvention"):
0023             self.pathConvention = None
0024         if not hasattr(self, "objstoreID"):
0025             self.objstoreID = None
0026         if not hasattr(self, "maxAttempts"):
0027             self.maxAttempts = 3
0028         if not hasattr(self, "objectstore_additions"):
0029             self.objectstore_additions = None
0030 
0031     # check status
0032     def check_stage_out_status(self, jobspec):
0033         # make logger
0034         tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="check_stage_out_status")
0035         tmpLog.debug("start")
0036         return (True, "")
0037 
0038     # trigger stage out
0039     def trigger_stage_out(self, jobspec):
0040         # make logger
0041         tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
0042         tmpLog.debug("start")
0043         # loop over all files
0044         allChecked = True
0045         ErrMsg = "These files failed to upload : "
0046         zip_datasetName = f"harvester_stage_out.{str(uuid.uuid4())}"
0047         fileAttrs = jobspec.get_output_file_attributes()
0048         for fileSpec in jobspec.outFiles:
0049             # fileSpec.fileAttributes['transferID'] = None  # synchronius transfer
0050             # skip already done
0051             tmpLog.debug(f"file: {fileSpec.lfn} status: {fileSpec.status}")
0052             if fileSpec.status in ["finished", "failed"]:
0053                 continue
0054 
0055             fileSpec.pathConvention = self.pathConvention
0056             fileSpec.objstoreID = self.objstoreID
0057             # set destination RSE
0058             if fileSpec.fileType in ["es_output", "zip_output", "output"]:
0059                 dstRSE = self.dstRSE_Out
0060             elif fileSpec.fileType == "log":
0061                 dstRSE = self.dstRSE_Log
0062             else:
0063                 errMsg = f"unsupported file type {fileSpec.fileType}"
0064                 tmpLog.error(errMsg)
0065                 return (False, errMsg)
0066             # skip if destination is None
0067             if dstRSE is None:
0068                 continue
0069 
0070             # get/set scope and dataset name
0071             if fileSpec.fileType == "log":
0072                 if fileSpec.lfn in fileAttrs:
0073                     scope = fileAttrs[fileSpec.lfn]["scope"]
0074                     datasetName = fileAttrs[fileSpec.lfn]["dataset"]
0075                 else:
0076                     lfnWithoutWorkerID = ".".join(fileSpec.lfn.split(".")[:-1])
0077                     scope = fileAttrs[lfnWithoutWorkerID]["scope"]
0078                     datasetName = fileAttrs[lfnWithoutWorkerID]["dataset"]
0079             elif fileSpec.fileType != "zip_output" and fileSpec.lfn in fileAttrs:
0080                 scope = fileAttrs[fileSpec.lfn]["scope"]
0081                 datasetName = fileAttrs[fileSpec.lfn]["dataset"]
0082             else:
0083                 # use panda scope for zipped files
0084                 scope = self.scopeForTmp
0085                 datasetName = zip_datasetName
0086 
0087             # for now mimic behaviour and code of pilot v2 rucio copy tool (rucio download) change when needed
0088 
0089             executable_prefix = None
0090             pfn_prefix = None
0091             if self.objectstore_additions and dstRSE in self.objectstore_additions:
0092                 if "storage_id" in self.objectstore_additions[dstRSE]:
0093                     fileSpec.objstoreID = self.objectstore_additions[dstRSE]["storage_id"]
0094                 if (
0095                     "access_key" in self.objectstore_additions[dstRSE]
0096                     and "secret_key" in self.objectstore_additions[dstRSE]
0097                     and "is_secure" in self.objectstore_additions[dstRSE]
0098                 ):
0099                     executable_prefix = "export S3_ACCESS_KEY=%s; export S3_SECRET_KEY=%s; export S3_IS_SECURE=%s" % (
0100                         self.objectstore_additions[dstRSE]["access_key"],
0101                         self.objectstore_additions[dstRSE]["secret_key"],
0102                         self.objectstore_additions[dstRSE]["is_secure"],
0103                     )
0104                 if "pfn_prefix" in self.objectstore_additions[dstRSE]:
0105                     pfn_prefix = self.objectstore_additions[dstRSE]["pfn_prefix"]
0106 
0107             executable = ["/usr/bin/env", "rucio", "-v", "upload"]
0108             executable += ["--no-register"]
0109             if hasattr(self, "lifetime"):
0110                 executable += ["--lifetime", ("%d" % self.lifetime)]
0111                 if fileSpec.fileAttributes is not None and "guid" in fileSpec.fileAttributes:
0112                     executable += ["--guid", fileSpec.fileAttributes["guid"]]
0113 
0114             executable += ["--rse", dstRSE]
0115             executable += ["--scope", scope]
0116             if pfn_prefix:
0117                 executable += [f"--pfn {os.path.join(pfn_prefix, os.path.basename(fileSpec.path))}"]
0118             else:
0119                 executable += [f"{scope}:{datasetName}"]
0120             executable += [f"{fileSpec.path}"]
0121 
0122             tmpLog.debug(f"rucio upload command: {executable} ")
0123             tmpLog.debug(f"rucio upload command (for human): {' '.join(executable)} ")
0124 
0125             if executable_prefix:
0126                 cmd = executable_prefix + "; " + " ".join(executable)
0127                 process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
0128             else:
0129                 process = subprocess.Popen(executable, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
0130 
0131             stdout, stderr = process.communicate()
0132             fileSpec.attemptNr += 1
0133             stdout = stdout.decode() + f" attemptNr: {fileSpec.attemptNr}"
0134             tmpLog.debug(f"stdout: {stdout}")
0135             tmpLog.debug(f"stderr: {stderr}")
0136             if process.returncode == 0:
0137                 fileSpec.status = "finished"
0138             else:
0139                 # check what failed
0140                 file_exists = False
0141                 rucio_sessions_limit_error = False
0142                 for line in stdout.split("\n"):
0143                     if "File name in specified scope already exists" in line:
0144                         file_exists = True
0145                         break
0146                     elif "File already exists on RSE" in line:
0147                         # can skip if file exist on RSE since no register
0148                         tmpLog.warning(f"rucio skipped upload and returned stdout: {stdout}")
0149                         file_exists = True
0150                         break
0151                     elif "exceeded simultaneous SESSIONS_PER_USER limit" in line:
0152                         rucio_sessions_limit_error = True
0153                 if file_exists:
0154                     tmpLog.debug("file exists, marking transfer as finished")
0155                     fileSpec.status = "finished"
0156                 elif rucio_sessions_limit_error:
0157                     # do nothing
0158                     tmpLog.warning(f"rucio returned error, will retry: stdout: {stdout}")
0159                     # do not change fileSpec.status and Harvester will retry if this function returns False
0160                     allChecked = False
0161                     continue
0162                 else:
0163                     tmpLog.error(f"rucio upload failed with stdout: {stdout}")
0164                     ErrMsg += f'{fileSpec.lfn} failed with rucio error stdout="{stdout}"'
0165                     allChecked = False
0166                     if fileSpec.attemptNr >= self.maxAttempts:
0167                         tmpLog.error(f"reached maxattempts: {self.maxAttempts}, marked it as failed")
0168                         fileSpec.status = "failed"
0169 
0170             # force update
0171             fileSpec.force_update("status")
0172 
0173             tmpLog.debug(f"file: {fileSpec.lfn} status: {fileSpec.status}")
0174 
0175         # return
0176         tmpLog.debug("done")
0177         if allChecked:
0178             return True, ""
0179         else:
0180             return False, ErrMsg
0181 
0182     # zip output files
0183     def zip_output(self, jobspec):
0184         # make logger
0185         tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
0186         return self.simple_zip_output(jobspec, tmpLog)