File indexing completed on 2026-04-20 07:58:59
0001 import os
0002 import subprocess
0003 import tempfile
0004
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestermover import mover_utils
0008
0009
# Module-level logger shared by all XrdcpPreparator instances; per-call
# loggers are derived from it with self.make_logger(baseLogger, ...).
baseLogger = core_utils.setup_logger("xrdcp_preparator")
0011
0012
0013
0014 """
0015 -- Example of plugin config
0016 "preparator": {
0017 "name": "XrdcpPreparator",
0018 "module": "pandaharvester.harvesterpreparator.xrdcp_preparator",
    # base URL of the source xrootd server (no leading/trailing whitespace)
    "srcBasePath": "root://dcgftp.usatlas.bnl.gov:1096//pnfs/usatlas.bnl.gov/BNLT0D1/rucio",
0021 # base path for local access to the copied files
0022 "localBasePath": "/hpcgpfs01/scratch/benjamin/harvester/rucio-data-area",
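    # maximum number of xrdcp attempts per file before giving up
    "maxAttempts": 3,
    # skip files that already exist under localBasePath with a matching adler32 checksum
    "checkLocalPath": true,
    # extra command-line options passed to xrdcp
    "xrdcpOpts": "--retry 3 --cksum adler32 --debug 1",
    # optional timeout in seconds for each xrdcp invocation (null = wait indefinitely)
    "timeout": null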
0029 }
0030 """
0031
0032
class XrdcpPreparator(PluginBase):
    """Preparator that stages job input files with ``xrdcp``.

    Each input file is copied from ``srcBasePath`` to ``localBasePath`` (both
    laid out by ``mover_utils.construct_file_path``) synchronously inside
    :meth:`trigger_preparation`, so :meth:`check_stage_in_status` has nothing
    left to poll.

    Configurable attributes (defaults set in ``__init__``; presumably
    overridden from the plugin config by ``PluginBase.__init__``):

    * ``srcBasePath`` -- base URL on the source xrootd server (no default
      here; must come from the config).
    * ``localBasePath`` -- base directory for the local copies (no default
      here; must come from the config).
    * ``maxAttempts`` -- give up once a file's ``attemptNr`` reaches this.
    * ``timeout`` -- seconds to wait for one xrdcp run; ``None`` waits forever.
    * ``checkLocalPath`` -- skip files already present locally with a
      matching adler32 checksum.
    * ``xrdcpOpts`` -- extra xrdcp options, tokenized the way a shell would
      (quotes honoured, no variable expansion).
    """

    def __init__(self, **kwarg):
        # Defaults first; PluginBase.__init__ runs afterwards so that
        # (presumably) values from the plugin config override them.
        self.xrdcpOpts = None
        self.maxAttempts = 3
        self.timeout = None
        self.checkLocalPath = True
        PluginBase.__init__(self, **kwarg)

    def _local_path(self, file_attrs, lfn):
        """Return the path under localBasePath where lfn is (to be) staged."""
        return mover_utils.construct_file_path(self.localBasePath, file_attrs["scope"], lfn)

    def _source_path(self, file_attrs, lfn):
        """Return the source URL of lfn under srcBasePath."""
        # strip(): xrdcp used to be launched through a shell, which silently
        # dropped stray whitespace around the configured URL (e.g. a leading
        # space); keep tolerating such configs now that no shell is involved.
        return mover_utils.construct_file_path(self.srcBasePath.strip(), file_attrs["scope"], lfn)

    @staticmethod
    def _is_already_staged(local_path, file_attrs):
        """Return True if local_path exists and matches the expected checksum.

        The expected value is ``file_attrs["checksum"]``, compared against
        ``"ad:<adler32 of local_path>"``. A missing checksum attribute is
        treated as "not staged", so the file is transferred again.
        """
        if not os.path.exists(local_path):
            return False
        checksum = f"ad:{core_utils.calc_adler32(local_path)}"
        return checksum == file_attrs.get("checksum")

    @staticmethod
    def _as_log_text(data):
        """Turn captured subprocess output (str, bytes or None) into one log line."""
        if data is None:
            return None
        if not isinstance(data, str):
            # errors="replace": undecodable output must not turn a successful
            # copy into a failure.
            data = data.decode(errors="replace")
        return data.replace("\n", " ")

    def _run_xrdcp(self, src_path, local_path, env, tmp_log):
        """Copy src_path to local_path with xrdcp and return its exit status.

        Returns 0 on success. On failure it returns a non-zero value: xrdcp's
        own exit code, a negative signal number if the process was killed after
        ``self.timeout`` seconds, or 1 if the command could not be run at all
        (the error is dumped to tmp_log).
        """
        import shlex

        try:
            args = ["xrdcp", "--nopbar", "--force"]
            if self.xrdcpOpts is not None:
                # shlex reproduces the shell's quoting rules for the options.
                args += shlex.split(self.xrdcpOpts)
            args += [src_path, local_path]
            xrdcp_cmd = " ".join(args)
            tmp_log.debug(f"execute: {xrdcp_cmd}")
            # argv list without a shell: paths built from job metadata are never
            # shell-interpreted, and p.kill() on timeout targets xrdcp itself.
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
            try:
                stdout, stderr = p.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                p.kill()
                stdout, stderr = p.communicate()
                tmp_log.warning("command timeout")
            tmp_log.debug(f"stdout: {self._as_log_text(stdout)}")
            tmp_log.debug(f"stderr: {self._as_log_text(stderr)}")
            return p.returncode
        except Exception:
            core_utils.dump_error_message(tmp_log)
            return 1

    def trigger_preparation(self, jobspec):
        """Copy every input file of jobspec that is not already staged locally.

        Each copy increments the file's ``attemptNr``.

        Returns:
            ``(True, "")`` if every needed copy succeeded or nothing had to be
            copied; ``(None, errMsg)`` if at least one copy failed but may be
            retried; ``(False, errMsg)`` as soon as a failed file has reached
            ``maxAttempts``.
        """
        tmpLog = self.make_logger(baseLogger, f"PandaID={jobspec.PandaID}", method_name="trigger_preparation")
        tmpLog.debug("start")

        harvester_env = os.environ.copy()
        inFileInfo = jobspec.get_input_file_attributes()
        n_transfers = 0
        failures = []
        for tmpFileSpec in jobspec.inFiles:
            file_attrs = inFileInfo[tmpFileSpec.lfn]
            srcPath = self._source_path(file_attrs, tmpFileSpec.lfn)
            localPath = self._local_path(file_attrs, tmpFileSpec.lfn)

            if self.checkLocalPath and self._is_already_staged(localPath, file_attrs):
                continue

            local_dir = os.path.dirname(localPath)
            if not os.path.isdir(local_dir):
                # exist_ok: another worker may create the directory concurrently
                os.makedirs(local_dir, exist_ok=True)
                tmpLog.debug(f"Make directory - {local_dir}")

            n_transfers += 1
            tmpLog.debug("execute xrdcp")
            tmpFileSpec.attemptNr += 1
            return_code = self._run_xrdcp(srcPath, localPath, harvester_env, tmpLog)
            if return_code != 0:
                failures.append(f"file - {localPath} did not transfer error code {return_code} ")
                tmpLog.error(f"failed with {return_code}")
                if tmpFileSpec.attemptNr >= self.maxAttempts:
                    errMsg = "gave up due to max attempts"
                    tmpLog.error(errMsg)
                    return False, errMsg

        if n_transfers == 0:
            tmpLog.debug("done with no transfers")
            return True, ""
        if not failures:
            return True, ""
        # None signals a retryable failure to the caller
        return None, "".join(failures)

    def check_stage_in_status(self, jobspec):
        """Report stage-in as complete.

        trigger_preparation performs the copies synchronously, so there is no
        asynchronous transfer to poll.
        """
        return True, ""

    def resolve_input_paths(self, jobspec):
        """Record on jobspec the local path of every input file.

        The paths are the same ones trigger_preparation writes to.
        """
        inFileInfo = jobspec.get_input_file_attributes()
        pathInfo = {
            tmpFileSpec.lfn: {"path": self._local_path(inFileInfo[tmpFileSpec.lfn], tmpFileSpec.lfn)}
            for tmpFileSpec in jobspec.inFiles
        }
        jobspec.set_input_file_paths(pathInfo)
        return True, ""