File indexing completed on 2026-04-20 07:59:00
0001 import os
0002 import re
0003 import subprocess
0004 import tempfile
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestermover import mover_utils
0008
0009 from .base_stager import BaseStager
0010
0011
# Module-level logger for this plugin; trigger_stage_out derives its per-job logger from it via make_logger.
baseLogger = core_utils.setup_logger("gridftp_stager")
0013
0014
0015
0016 """
0017 -- example of plugin config
0018 "stager":{
0019 "name":"GridFtpStager",
0020 "module":"pandaharvester.harvesterstager.gridftp_stager",
0021 "objstoreID":117,
0022 # base path for local access to the files to be copied
0023 "srcOldBasePath":"/tmp/workdirs",
0024 # base path for access through source GridFTP server to the files to be copied
0025 "srcNewBasePath":"gsiftp://dcdum02.aglt2.org/pnfs/aglt2.org",
0026 # base path for destination GridFTP server
0027 "dstBasePath":"gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio",
0028 # max number of attempts
0029 "maxAttempts": 3,
0030 # options for globus-url-copy
0031 "gulOpts":"-verify-checksum -v",
0032 # A list of base paths of intermediate locations: [ p1, p2, p3, ..., pn ] . The transfers will be done in serial: srcPath -> p1, p1 ->p2, ..., pn -> dstPath
0033 # where p1 can be a path (e.g. "gsiftp://..."), or a list of incoming and outgoing paths (if different, say due to FS mount) of the same files (e.g. ["file:///data/abc", "gsiftp://dtn.server//data/abc"])
0034 # If omitted, a direct transfer srcPath -> dstPath is done
0035 "intermediateBasePaths":[ ["file:///nfs/at3/scratch/at3sgm001/", "gsiftp://some.dtn.server//nfs/at3/scratch/at3sgm001/"], "gsiftp://another.dtn.server//scratch/data/" ]
0036 }
0037 """
0038
0039
class GridFtpStager(BaseStager):
    """Stage-out plugin that copies job output files with globus-url-copy.

    The copy runs synchronously inside trigger_stage_out (the command is waited
    on); check_stage_out_status then just marks remaining output files finished.
    When ``intermediateBasePaths`` is a non-empty list, every file is relayed
    srcPath -> p1 -> ... -> pn -> dstPath with one globus-url-copy run per hop.
    """

    def __init__(self, **kwarg):
        # Defaults are assigned before BaseStager.__init__ so that, presumably,
        # plugin-config values passed in kwarg override them. srcOldBasePath,
        # srcNewBasePath and dstBasePath get no default here and must be supplied
        # by the configuration (trigger_stage_out reads them).
        self.scopeForTmp = "transient"
        self.pathConvention = 1000
        self.objstoreID = None
        self.gulOpts = None
        self.maxAttempts = 3
        self.timeout = None
        self.intermediateBasePaths = None
        BaseStager.__init__(self, **kwarg)

    def check_stage_out_status(self, jobspec):
        """Mark every not-yet-done output file as finished.

        trigger_stage_out already waited for the transfer, so nothing is polled.

        :param jobspec: job specification
        :return: (True, "")
        """
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
            fileSpec.pathConvention = self.pathConvention
            fileSpec.objstoreID = self.objstoreID
        return True, ""

    def trigger_stage_out(self, jobspec):
        """Copy the job's pending output files to dstBasePath with globus-url-copy.

        Files whose status is "finished" or "failed" are skipped; every other
        output file gets its ``attemptNr`` incremented once per call.

        :param jobspec: job specification whose ``outFiles`` are staged out
        :return: (status, error message) where status is True on success (or
                 when there is nothing to transfer), None for a failure that may
                 be retried, and False once an attempted file has reached
                 ``maxAttempts``
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")

        is_multistep = isinstance(self.intermediateBasePaths, list) and len(self.intermediateBasePaths) > 0
        n_steps = len(self.intermediateBasePaths) + 1 if is_multistep else 1
        # one globus-url-copy transfer list per step, created when the first file is queued
        guc_inputs = [None] * n_steps
        attempted_files = []
        try:
            for fileSpec in jobspec.outFiles:
                if fileSpec.status in ["finished", "failed"]:
                    continue

                if fileSpec.fileType in ["es_output", "zip_output"]:
                    scope = self.scopeForTmp
                else:
                    scope = fileSpec.fileAttributes.get("scope")
                    if scope is None:
                        scope = fileSpec.scope

                # NOTE: srcOldBasePath is a regular-expression pattern for re.sub (and
                # srcNewBasePath a replacement template); plain paths work as long as they
                # contain no regex metacharacters / backslashes.
                srcPath = re.sub(self.srcOldBasePath, self.srcNewBasePath, fileSpec.path)
                dstPath = mover_utils.construct_file_path(self.dstBasePath, scope, fileSpec.lfn)

                if is_multistep:
                    transfers = self._multistep_transfers(srcPath, dstPath)
                else:
                    transfers = [(srcPath, dstPath)]
                for step_i, (step_src, step_dst) in enumerate(transfers):
                    if guc_inputs[step_i] is None:
                        suffix = f"_guc_out_{step_i}.tmp" if is_multistep else "_guc_out.tmp"
                        guc_inputs[step_i] = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=suffix)
                    guc_inputs[step_i].write(f"{step_src} {step_dst}\n")
                    if is_multistep:
                        tmpLog.debug(f"step {step_i + 1}: {step_src} {step_dst}")
                fileSpec.attemptNr += 1
                attempted_files.append(fileSpec)

            if not attempted_files:
                tmpLog.debug("done with no transfers (multistep)" if is_multistep else "done with no transfers")
                return True, ""

            # flush the transfer lists to disk before globus-url-copy reads them
            for guc_input in guc_inputs:
                guc_input.close()

            if is_multistep:
                tmpLog.debug("start multistep transfer")
                for step_no, guc_input in enumerate(guc_inputs, start=1):
                    return_code = self._run_globus_url_copy(tmpLog, guc_input.name)
                    if return_code != 0:
                        errMsg = f"step {step_no} failed with {return_code}"
                        tmpLog.error(errMsg)
                        return self._check_max_attempts(tmpLog, attempted_files, errMsg)
                    tmpLog.debug(f"step {step_no} succeeded")
                tmpLog.debug(f"multistep transfer ({n_steps} steps) succeeded")
                return True, ""

            return_code = self._run_globus_url_copy(tmpLog, guc_inputs[0].name)
            if return_code == 0:
                tmpLog.debug("succeeded")
                return True, ""
            errMsg = f"failed with {return_code}"
            tmpLog.error(errMsg)
            return self._check_max_attempts(tmpLog, attempted_files, errMsg)
        finally:
            # remove every transfer list, including those of steps that never ran
            # because an earlier step failed or an exception was raised
            for guc_input in guc_inputs:
                if guc_input is None:
                    continue
                guc_input.close()
                try:
                    os.remove(guc_input.name)
                except OSError:
                    tmpLog.warning(f"failed to remove {guc_input.name}")

    def _multistep_transfers(self, srcPath, dstPath):
        """Return the (source, destination) URL pair of every hop of a multistep transfer.

        Intermediate copies keep the file's path relative to srcNewBasePath. Each
        entry of intermediateBasePaths is either one base path, or a list
        [incoming, outgoing] when the same files are written and read through
        different URLs (e.g. due to a filesystem mount).
        """
        hop_in = []
        hop_out = []
        for base in self.intermediateBasePaths:
            incoming, outgoing = (base[0], base[1]) if isinstance(base, list) else (base, base)
            # literal replacement: these base paths are URLs, not regex patterns
            hop_in.append(srcPath.replace(self.srcNewBasePath, incoming))
            hop_out.append(srcPath.replace(self.srcNewBasePath, outgoing))
        return list(zip([srcPath] + hop_out, hop_in + [dstPath]))

    def _run_globus_url_copy(self, tmpLog, input_name):
        """Run globus-url-copy on the transfer list ``input_name`` and return its exit code.

        On timeout the process is killed and its return code is returned; any other
        exception is logged and reported as exit code 1.
        """
        args = ["globus-url-copy", "-f", input_name, "-cd"]
        if self.gulOpts is not None:
            args += self.gulOpts.split()
        try:
            tmpLog.debug("execute: " + " ".join(args))
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                stdout, stderr = p.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                p.kill()
                stdout, stderr = p.communicate()
                tmpLog.warning("command timeout")
            return_code = p.returncode
            tmpLog.debug(f"stdout: {self._log_text(stdout)}")
            tmpLog.debug(f"stderr: {self._log_text(stderr)}")
        except Exception:
            core_utils.dump_error_message(tmpLog)
            return_code = 1
        return return_code

    @staticmethod
    def _log_text(data):
        """Return command output as a single-line str for logging (None passes through)."""
        if data is None:
            return None
        if not isinstance(data, str):
            # errors="replace": undecodable output must not turn a finished copy into a failure
            data = data.decode(errors="replace")
        return data.replace("\n", " ")

    def _check_max_attempts(self, tmpLog, fileSpecs, errMsg):
        """Return (False, msg) if any of ``fileSpecs`` reached maxAttempts, else (None, errMsg) to retry."""
        for fileSpec in fileSpecs:
            if fileSpec.attemptNr >= self.maxAttempts:
                errMsg = "gave up due to max attempts"
                tmpLog.error(errMsg)
                return False, errMsg
        return None, errMsg