File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import subprocess
0003 import tempfile
0004
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestermover import mover_utils
0008
0009
# Module-level base logger named "gridftp_preparator"; trigger_preparation derives
# a per-job logger (tagged with the PandaID) from it via make_logger.
baseLogger = core_utils.setup_logger("gridftp_preparator")
0011
0012
0013
0014 """
-- Example of plugin config
"preparator": {
    "name": "GridFtpPreparator",
    "module": "pandaharvester.harvesterpreparator.gridftp_preparator",
    # base path for source GridFTP server
    "srcBasePath": "gsiftp://dcdum02.aglt2.org/pnfs/aglt2.org/atlasdatadisk/rucio/",
    # base path for destination GridFTP server
    "dstBasePath": "gsiftp://dcgftp.usatlas.bnl.gov:2811/pnfs/usatlas.bnl.gov/atlasscratchdisk/rucio/",
    # base path for local access to the copied files
    "localBasePath": "/data/rucio",
    # max number of transfer attempts per input file before giving up
    "maxAttempts": 3,
    # skip files already present under localBasePath with a matching adler32 checksum,
    # and create missing local directories. Set to false if the destination is on a remote node
    "checkLocalPath": true,
    # optional timeout in seconds for globus-url-copy (no timeout if omitted)
    "timeout": 3600,
    # options for globus-url-copy
    "gulOpts": "-cred /tmp/x509_u1234 -sync -sync-level 3 -verify-checksum -v"
}
0032 """
0033
0034
class GridFtpPreparator(PluginBase):
    """Stage-in preparator that copies input files with globus-url-copy.

    Every input file of a job that still needs transferring is written as a
    "<srcPath> <dstPath>" line into a temporary list file, which is passed to a
    single synchronous ``globus-url-copy -f <file> -cd`` invocation. Because the
    copy finishes inside trigger_preparation, check_stage_in_status always
    reports success.

    Configuration attributes: srcBasePath, dstBasePath, localBasePath,
    maxAttempts, checkLocalPath, gulOpts, timeout.
    """

    def __init__(self, **kwarg):
        # Defaults; PluginBase.__init__ runs afterwards, presumably applying the
        # plugin config on top of them (NOTE(review): confirm in PluginBase).
        self.gulOpts = None
        self.maxAttempts = 3
        self.timeout = None
        self.checkLocalPath = True
        PluginBase.__init__(self, **kwarg)

    def trigger_preparation(self, jobspec):
        """Copy the job's input files that are not already available locally.

        :param jobspec: job specification. attemptNr of each input file that is
            queued for transfer is incremented in place.
        :return: (True, "") when the copy succeeded or nothing needed copying;
            (False, errMsg) when the copy failed and at least one input file has
            reached maxAttempts; (None, errMsg) for any other copy failure.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")

        transfers = self._collect_transfers(jobspec)
        if not transfers:
            tmpLog.debug("done with no transfers")
            return True, ""

        tmpLog.debug("execute globus-url-copy")
        # delete=False keeps the file on disk after close so globus-url-copy can
        # read it; the finally clause guarantees removal even on errors.
        gucInput = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix="_guc_in.tmp")
        try:
            with gucInput:
                gucInput.writelines(f"{srcPath} {dstPath}\n" for srcPath, dstPath in transfers)
            return_code = self._run_globus_url_copy(tmpLog, gucInput.name)
        finally:
            os.remove(gucInput.name)

        if return_code == 0:
            tmpLog.debug("succeeded")
            return True, ""

        errMsg = f"failed with {return_code}"
        tmpLog.error(errMsg)
        # give up permanently once any input file has exhausted its attempts
        if any(tmpFileSpec.attemptNr >= self.maxAttempts for tmpFileSpec in jobspec.inFiles):
            errMsg = "gave up due to max attempts"
            tmpLog.error(errMsg)
            return False, errMsg
        return None, errMsg

    def _collect_transfers(self, jobspec):
        """Return (srcPath, dstPath) pairs for input files that still need copying."""
        inFileInfo = jobspec.get_input_file_attributes()
        transfers = []
        for tmpFileSpec in jobspec.inFiles:
            fileAttrs = inFileInfo[tmpFileSpec.lfn]
            scope = fileAttrs["scope"]
            srcPath = mover_utils.construct_file_path(self.srcBasePath, scope, tmpFileSpec.lfn)
            dstPath = mover_utils.construct_file_path(self.dstBasePath, scope, tmpFileSpec.lfn)
            if self.checkLocalPath:
                accPath = mover_utils.construct_file_path(self.localBasePath, scope, tmpFileSpec.lfn)
                # skip files already present locally with a matching adler32 checksum
                if os.path.exists(accPath) and f"ad:{core_utils.calc_adler32(accPath)}" == fileAttrs["checksum"]:
                    continue
                # exist_ok avoids failing when the directory appears concurrently
                os.makedirs(os.path.dirname(accPath), exist_ok=True)
            transfers.append((srcPath, dstPath))
            tmpFileSpec.attemptNr += 1
        return transfers

    def _run_globus_url_copy(self, tmpLog, gucInputName):
        """Run globus-url-copy on the list file; return its exit code, or 1 if it could not be run."""
        args = ["globus-url-copy", "-f", gucInputName, "-cd"]
        if self.gulOpts is not None:
            args += self.gulOpts.split()
        try:
            tmpLog.debug("execute: " + " ".join(args))
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                stdout, stderr = p.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                p.kill()
                stdout, stderr = p.communicate()
                tmpLog.warning("command timeout")
        except Exception:
            core_utils.dump_error_message(tmpLog)
            return 1
        # logging happens outside the try so an output-decoding problem can
        # never overwrite the real exit code of the transfer
        tmpLog.debug(f"stdout: {self._flatten_output(stdout)}")
        tmpLog.debug(f"stderr: {self._flatten_output(stderr)}")
        return p.returncode

    @staticmethod
    def _flatten_output(output):
        """Decode captured process output (undecodable bytes replaced) onto a single line."""
        if output is None:
            return None
        if not isinstance(output, str):
            output = output.decode(errors="replace")
        return output.replace("\n", " ")

    def check_stage_in_status(self, jobspec):
        """Always return (True, "") because trigger_preparation copies synchronously."""
        return True, ""

    def resolve_input_paths(self, jobspec):
        """Set the local access path (under localBasePath) of every input file on jobspec.

        :return: (True, "")
        """
        inFileInfo = jobspec.get_input_file_attributes()
        pathInfo = {}
        for tmpFileSpec in jobspec.inFiles:
            accPath = mover_utils.construct_file_path(self.localBasePath, inFileInfo[tmpFileSpec.lfn]["scope"], tmpFileSpec.lfn)
            pathInfo[tmpFileSpec.lfn] = {"path": accPath}
        jobspec.set_input_file_paths(pathInfo)
        return True, ""