Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import os
0002 import re
0003 import subprocess
0004 import tempfile
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestermover import mover_utils
0008 
0009 from .base_stager import BaseStager
0010 
0011 # logger
0012 baseLogger = core_utils.setup_logger("gridftp_stager")
0013 
0014 
0015 # stager plugin with GridFTP
0016 """
0017   -- example of plugin config
0018   "stager":{
0019         "name":"GridFtpStager",
0020         "module":"pandaharvester.harvesterstager.gridftp_stager",
0021         "objstoreID":117,
0022         # base path for local access to the files to be copied
0023         "srcOldBasePath":"/tmp/workdirs",
0024         # base path for access through source GridFTP server to the files to be copied
0025         "srcNewBasePath":"gsiftp://dcdum02.aglt2.org/pnfs/aglt2.org",
0026         # base path for destination GridFTP server
0027         "dstBasePath":"gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio",
0028         # max number of attempts
0029         "maxAttempts": 3,
0030         # options for globus-url-copy
0031         "gulOpts":"-verify-checksum -v",
0032         # A list of base paths of intermediate locations: [ p1, p2, p3, ..., pn ] . The transfers will be done in serial: srcPath -> p1, p1 ->p2, ..., pn -> dstPath
0033         # where p1 can be a path (e.g. "gsiftp://..."), or a list of incoming and outgoing paths (if different, say due to FS mount) of the same files (e.g. ["file:///data/abc", "gsiftp://dtn.server//data/abc"])
0034         # If omitted, a direct transfer srcPath -> dstPath occurs
0035         "intermediateBasePaths":[ ["file:///nfs/at3/scratch/at3sgm001/", "gsiftp://some.dtn.server//nfs/at3/scratch/at3sgm001/"], "gsiftp://another.dtn.server//scratch/data/" ]
0036     }
0037 """
0038 
0039 
class GridFtpStager(BaseStager):
    """Stager plugin that ships job output files with ``globus-url-copy``.

    Each call to :meth:`trigger_stage_out` writes the pending
    ``<source> <destination>`` URL pairs into temporary list files and runs
    one ``globus-url-copy -f <list> -cd`` per transfer step.  Without
    ``intermediateBasePaths`` there is a single step (source -> destination);
    with it, the files hop through every intermediate location in order.

    ``srcOldBasePath``, ``srcNewBasePath`` and ``dstBasePath`` are read by this
    class but never assigned in it -- presumably ``BaseStager.__init__`` sets
    them from the plugin configuration (TODO confirm against BaseStager).
    """

    # constructor
    def __init__(self, **kwarg):
        # defaults are assigned first so that BaseStager.__init__ can
        # override them from kwarg (assumed behaviour of the base class)
        self.scopeForTmp = "transient"
        self.pathConvention = 1000
        self.objstoreID = None
        # extra command-line options for globus-url-copy (whitespace separated)
        self.gulOpts = None
        self.maxAttempts = 3
        # seconds to wait for one globus-url-copy run; None waits forever
        self.timeout = None
        # optional list of intermediate hops for multistep transfers
        self.intermediateBasePaths = None
        BaseStager.__init__(self, **kwarg)

    # check status
    def check_stage_out_status(self, jobspec):
        """Mark every not-yet-done output file of the job as finished.

        The copy itself is done synchronously in :meth:`trigger_stage_out`,
        so there is nothing to poll here; the file specs are only stamped
        with this plugin's ``pathConvention`` and ``objstoreID``.

        :param jobspec: job specification whose output files are updated
        :return: always ``(True, "")``
        """
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
            fileSpec.pathConvention = self.pathConvention
            fileSpec.objstoreID = self.objstoreID
        return True, ""

    # trigger stage out
    def trigger_stage_out(self, jobspec):
        """Copy all pending output files of the job with globus-url-copy.

        Files whose status is already ``finished`` or ``failed`` are skipped;
        ``attemptNr`` of every other file is incremented.

        :param jobspec: job specification providing ``PandaID`` and ``outFiles``
        :return: ``(True, "")`` when every step succeeded or nothing had to be
                 transferred, ``(False, message)`` when a step failed and one
                 of the attempted files has reached ``maxAttempts``, and
                 ``(None, message)`` when a step failed otherwise (presumably
                 the caller retries later -- TODO confirm)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")
        is_multistep = isinstance(self.intermediateBasePaths, list) and len(self.intermediateBasePaths) > 0
        # a single-step transfer is handled as a chain with no intermediate hop
        hops = self.intermediateBasePaths if is_multistep else []
        n_steps = len(hops) + 1
        # one list file per step, created lazily when the first pending file shows up
        guc_inputs = []
        # files whose attemptNr was incremented in this call
        attempted = []
        try:
            # loop over all files
            for fileSpec in jobspec.outFiles:
                # skip if already done
                if fileSpec.status in ["finished", "failed"]:
                    continue
                # scope
                if fileSpec.fileType in ["es_output", "zip_output"]:
                    scope = self.scopeForTmp
                else:
                    scope = fileSpec.fileAttributes.get("scope")
                    if scope is None:
                        scope = fileSpec.scope
                # construct source and destination paths
                # NOTE(review): the base paths are used as regular expressions here
                srcPath = re.sub(self.srcOldBasePath, self.srcNewBasePath, fileSpec.path)
                dstPath = mover_utils.construct_file_path(self.dstBasePath, scope, fileSpec.lfn)
                # make tempfiles of paths to transfer
                if not guc_inputs:
                    for step in range(n_steps):
                        suffix = f"_guc_out_{step}.tmp" if is_multistep else "_guc_out.tmp"
                        guc_inputs.append(tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=suffix))
                # make input for globus-url-copy
                for step, guc_input in enumerate(guc_inputs):
                    step_src, step_dst = self._step_paths(hops, step, srcPath, dstPath)
                    guc_input.write(f"{step_src} {step_dst}\n")
                    if is_multistep:
                        tmpLog.debug(f"step {step + 1}: {step_src} {step_dst}")
                fileSpec.attemptNr += 1
                attempted.append(fileSpec)
            # nothing to transfer
            if not guc_inputs:
                tmpLog.debug("done with no transfers (multistep)" if is_multistep else "done with no transfers")
                return True, ""
            # flush the lists to disk before globus-url-copy reads them
            for guc_input in guc_inputs:
                guc_input.close()
            # transfer
            if is_multistep:
                tmpLog.debug("start multistep transfer")
            for step, guc_input in enumerate(guc_inputs, start=1):
                return_code = self._run_guc(guc_input.name, tmpLog)
                if return_code != 0:
                    errMsg = f"step {step} failed with {return_code}" if is_multistep else f"failed with {return_code}"
                    tmpLog.error(errMsg)
                    # check attemptNr of the files that were actually attempted
                    for fileSpec in attempted:
                        if fileSpec.attemptNr >= self.maxAttempts:
                            errMsg = "gave up due to max attempts"
                            tmpLog.error(errMsg)
                            return False, errMsg
                    return None, errMsg
                tmpLog.debug(f"step {step} succeeded" if is_multistep else "succeeded")
            if is_multistep:
                tmpLog.debug(f"multistep transfer ({n_steps} steps) succeeded")
            return True, ""
        finally:
            # the list files are created with delete=False, so remove them on
            # every exit path, including the lists of steps that never ran
            self._discard_inputs(guc_inputs, tmpLog)

    # source and destination of one transfer step
    def _step_paths(self, hops, step, srcPath, dstPath):
        """Return the ``(source, destination)`` URL pair for one step.

        Step 0 reads from ``srcPath`` and the last step writes to ``dstPath``.
        In between, the URL is ``srcPath`` with ``srcNewBasePath`` replaced by
        the hop's base path.  A hop given as a list supplies its incoming
        path at index 0 and its outgoing path at index 1.
        """
        if step == 0:
            step_src = srcPath
        else:
            previous = hops[step - 1]
            src_base = previous[1] if isinstance(previous, list) else previous
            step_src = re.sub(self.srcNewBasePath, src_base, srcPath)
        if step == len(hops):
            step_dst = dstPath
        else:
            upcoming = hops[step]
            dst_base = upcoming[0] if isinstance(upcoming, list) else upcoming
            step_dst = re.sub(self.srcNewBasePath, dst_base, srcPath)
        return step_src, step_dst

    # run globus-url-copy once
    def _run_guc(self, input_name, tmpLog):
        """Run globus-url-copy on one list file and return its exit code.

        A timed-out process is killed and its (non-zero) exit code returned.
        Any exception while launching or waiting is logged and reported as
        exit code 1.
        """
        args = ["globus-url-copy", "-f", input_name, "-cd"]
        if self.gulOpts is not None:
            args += self.gulOpts.split()
        try:
            tmpLog.debug("execute: " + " ".join(args))
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                stdout, stderr = p.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                p.kill()
                stdout, stderr = p.communicate()
                tmpLog.warning("command timeout")
            return_code = p.returncode
            tmpLog.debug(f"stdout: {self._flatten_output(stdout)}")
            tmpLog.debug(f"stderr: {self._flatten_output(stderr)}")
        except Exception:
            core_utils.dump_error_message(tmpLog)
            return_code = 1
        return return_code

    # make process output loggable on one line
    @staticmethod
    def _flatten_output(stream):
        """Return process output as single-line text (``None`` stays ``None``)."""
        if stream is None:
            return None
        if not isinstance(stream, str):
            # undecodable bytes must not turn a finished transfer into an error
            stream = stream.decode(errors="replace")
        return stream.replace("\n", " ")

    # remove temporary list files
    @staticmethod
    def _discard_inputs(guc_inputs, tmpLog):
        """Close and delete the temporary list files, logging any failure."""
        for guc_input in guc_inputs:
            try:
                guc_input.close()
                os.remove(guc_input.name)
            except OSError as exc:
                tmpLog.warning(f"failed to remove {guc_input.name}: {exc}")