File indexing completed on 2026-04-20 07:59:00
0001 import gc
0002 import os
0003 import subprocess
0004 import tempfile
0005 import uuid
0006
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestermover import mover_utils
0009 from pandaharvester.harvesterstager.base_stager import BaseStager
0010
0011
_logger = core_utils.setup_logger("xrdcp_stager")


"""
-- Example of plugin config
"stager": {
    "name": "XrdcpStager",
    "module": "pandaharvester.harvesterstager.xrdcp_stager",
    # base path for destination xrdcp server
    "dstBasePath": "root://dcgftp.usatlas.bnl.gov:1096//pnfs/usatlas.bnl.gov/BNLT0D1/rucio",
    # base path for local access to the copied files
    "localBasePath": "/hpcgpfs01/scratch/benjamin/harvester/rucio-data-area",
    # max number of attempts
    "maxAttempts": 3,
    # check paths under localBasePath.
    "checkLocalPath": true,
    # options for xrdcp
    "xrdcpOpts": "--retry 3 --cksum adler32 --debug 1",
    # (optional) timeout in seconds for each xrdcp command; no timeout if omitted
    "timeout": 3600
}
"""
0032
0033
0034
class XrdcpStager(BaseStager):
    """Stager plugin that copies job output files to an xrootd endpoint with ``xrdcp``.

    Attributes read by this plugin (expected to come from the plugin config passed to
    ``BaseStager.__init__``):

    * ``dstBasePath``    -- base URL of the destination xrootd server (required).
    * ``localBasePath``  -- base path under which the copied files are visible locally;
      required when ``checkLocalPath`` is enabled.
    * ``xrdcpOpts``      -- extra command-line options for xrdcp (default: None).
    * ``maxAttempts``    -- attempts per file before it is marked failed (default: 3).
    * ``timeout``        -- timeout in seconds for each xrdcp command (default: None, no timeout).
    * ``checkLocalPath`` -- skip files whose copy under ``localBasePath`` already has the
      expected adler32 checksum (default: True).
    """

    def __init__(self, **kwarg):
        BaseStager.__init__(self, **kwarg)
        # defaults for optional settings that were not given in the plugin config
        if not hasattr(self, "xrdcpOpts"):
            self.xrdcpOpts = None
        if not hasattr(self, "maxAttempts"):
            self.maxAttempts = 3
        if not hasattr(self, "timeout"):
            self.timeout = None
        if not hasattr(self, "checkLocalPath"):
            self.checkLocalPath = True

    def check_stage_out_status(self, jobspec):
        """Check the status of the stage-out procedure.

        Staging-out is done synchronously in trigger_stage_out, so this method marks every
        output file that is not yet 'finished' or 'failed' (i.e. returned by
        jobspec.get_output_file_specs(skip_done=True)) as 'finished' and always succeeds.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: transfer success, False: fatal transfer failure,
                 None: on-going or temporary failure) and error dialog; always (True, "") here
        :rtype: (bool, string)
        """
        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            fileSpec.status = "finished"
        return True, ""

    def trigger_stage_out(self, jobspec):
        """Copy the job's output files to ``dstBasePath`` with xrdcp, synchronously.

        Only files not yet 'finished' or 'failed' are processed
        (jobspec.get_output_file_specs(skip_done=True)). For each of them:

        * if ``checkLocalPath`` is set and the copy under ``localBasePath`` already exists with
          the expected checksum, the file is marked 'finished' without running xrdcp;
        * otherwise ``fileSpec.path`` is copied to the destination. FileSpec.attemptNr is
          incremented; on success the file is marked 'finished', on failure it is marked
          'failed' once attemptNr reaches ``maxAttempts``.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True: success, False: fatal failure, None: temporary failure)
                 and error dialog. None is returned when at least one copy failed in this call;
                 files already marked 'failed' are skipped on later calls.
        :rtype: (bool, string)
        """
        import shlex

        # let gc clean up memory
        gc.collect()

        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="trigger_stage_out")
        tmpLog.debug("start")

        harvester_env = os.environ.copy()
        # xrdcp is run without a shell, so split the user options with shell-like quoting rules
        extra_opts = shlex.split(self.xrdcpOpts) if self.xrdcpOpts is not None else []
        # a shell used to drop whitespace around the base path; keep tolerating such configs
        dst_base_path = self.dstBasePath.strip()

        transfer_attempted = False
        allfiles_transfered = True
        error_messages = []
        fileAttrs = jobspec.get_output_file_attributes()

        for fileSpec in jobspec.get_output_file_specs(skip_done=True):
            attrs = fileAttrs[fileSpec.lfn]
            dstPath = mover_utils.construct_file_path(dst_base_path, attrs["scope"], fileSpec.lfn)
            srcPath = fileSpec.path
            tmpLog.debug(f"fileSpec.path - {fileSpec.path} fileSpec.lfn = {fileSpec.lfn}")

            if self.checkLocalPath:
                # copied files are visible under localBasePath: check the copy, not the source
                copiedPath = mover_utils.construct_file_path(self.localBasePath, attrs["scope"], fileSpec.lfn)
                if os.path.exists(copiedPath):
                    checksum = f"ad:{core_utils.calc_adler32(copiedPath)}"
                    if checksum == attrs["checksum"]:
                        tmpLog.debug(f"{copiedPath} already exists with checksum {checksum}, skip xrdcp")
                        fileSpec.status = "finished"
                        fileSpec.force_update("status")
                        continue

            transfer_attempted = True
            args = ["xrdcp", "--nopbar", "--force", *extra_opts, srcPath, dstPath]
            fileSpec.attemptNr += 1
            return_code = self._run_xrdcp(args, harvester_env, tmpLog)
            if return_code == 0:
                fileSpec.status = "finished"
            else:
                error_messages.append(f"file - {srcPath} did not transfer error code {return_code} ")
                allfiles_transfered = False
                tmpLog.error(f"failed with {return_code}")
                if fileSpec.attemptNr >= self.maxAttempts:
                    tmpLog.error(f"reached maxattempts: {self.maxAttempts}, marked it as failed")
                    fileSpec.status = "failed"
            fileSpec.force_update("status")
            tmpLog.debug(f"file: {fileSpec.lfn} status: {fileSpec.status}")

        if not transfer_attempted:
            tmpLog.debug("done with no transfers")
            return True, ""

        tmpLog.debug("done")
        if allfiles_transfered:
            return True, ""
        return None, "".join(error_messages)

    def _run_xrdcp(self, args, env, tmpLog):
        """Run one xrdcp command (no shell) and return its exit status.

        :param args: full command line as a list of arguments
        :param env: environment for the child process
        :param tmpLog: logger used to report the command and its output
        :return: the process return code (non-zero/negative on failure or after being killed
                 on timeout), or 1 if the command could not be run at all
        :rtype: int
        """
        tmpLog.debug("execute xrdcp")
        tmpLog.debug(f"execute: {' '.join(args)}")
        try:
            with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) as process:
                try:
                    stdout, stderr = process.communicate(timeout=self.timeout)
                except subprocess.TimeoutExpired:
                    process.kill()
                    stdout, stderr = process.communicate()
                    tmpLog.warning("command timeout")
        except Exception:
            # e.g. xrdcp not found on PATH
            core_utils.dump_error_message(tmpLog)
            return 1
        # decode leniently: undecodable output must not turn a successful copy into a failure
        stdout_text = stdout.decode(errors="replace").replace("\n", " ")
        stderr_text = stderr.decode(errors="replace").replace("\n", " ")
        tmpLog.debug(f"stdout: {stdout_text}")
        tmpLog.debug(f"stderr: {stderr_text}")
        return process.returncode

    def zip_output(self, jobspec):
        """OBSOLETE : zip functions should be implemented in zipper plugins.
        Zip output files. This method loops over jobspec.outFiles, which is a list of zip file's FileSpecs,
        to make a zip file for each zip file's FileSpec. FileSpec.associatedFiles is a list of FileSpecs of
        associated files to be zipped. The path of each associated file is available in associated
        file's FileSpec.path. Once zip files are made, their FileSpec.path, FileSpec.fsize and
        FileSpec.chksum need to be set.

        Delegates to self.simple_zip_output.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="zip_output")
        return self.simple_zip_output(jobspec, tmpLog)

    def async_zip_output(self, jobspec):
        """OBSOLETE : zip functions should be implemented in zipper plugins.
        Zip output files asynchronously. This method is followed by post_zip_output(),
        which is typically useful to trigger an asynchronous zipping mechanism such as batch job.

        Here it only assigns all of jobspec.outFiles to one new group (random UUID) whose
        status is "zipping".

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="async_zip_output")

        groupID = str(uuid.uuid4())
        lfns = [fileSpec.lfn for fileSpec in jobspec.outFiles]
        jobspec.set_groups_to_files({groupID: {"lfns": lfns, "groupStatus": "zipping"}})
        tmpLog.debug(f"set groupID={groupID} to {len(lfns)} files")
        return True, ""

    def post_zip_output(self, jobspec):
        """OBSOLETE : zip functions should be implemented in zipper plugins.
        This method is executed after async_zip_output(), to do post-processing for zipping.
        Once zip files are made, this method needs to look over jobspec.outFiles to set their
        FileSpec.path, FileSpec.fsize, and FileSpec.chksum.

        :param jobspec: job specifications
        :type jobspec: JobSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(_logger, f"PandaID={jobspec.PandaID}", method_name="post_zip_output")

        # NOTE(review): these are hard-coded placeholder values, not real zip metadata.
        # Confirm this obsolete code path is never used in production.
        for fileSpec in jobspec.outFiles:
            fileSpec.path = "/path/to/zip"
            fileSpec.fsize = 12345
            fileSpec.chksum = "66bb0985"
        tmpLog.debug(f"set placeholder zip attributes to {len(jobspec.outFiles)} files")
        return True, ""