File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 import os
0014 import logging
0015 import re
0016 from time import time
0017
0018 from .common import resolve_common_transfer_errors, verify_catalog_checksum
0019 from pilot.util.container import execute
0020 from pilot.common.exception import PilotException, ErrorCodes
0021
0022
# Module-level logger shared by every function of this copytool.
logger = logging.getLogger(__name__)

# Executable used for both the capability probes and the actual transfers.
copy_command = 'xrdcp'

# Copytool capability flags (presumably consulted by the code that selects
# copytools: replicas must be resolved first, only root:// URLs are handled -- confirm).
require_replicas = True
allowed_schemas = ['root']
0029
0030
def is_valid_for_copy_in(files):
    """
    Report whether this copytool accepts the given files for stage-in.

    No per-file restriction is applied, so every input is accepted.

    :param files: list of `FileSpec` objects (not inspected).
    :return: always True.
    """
    accepted = True
    return accepted
0033
0034
def is_valid_for_copy_out(files):
    """
    Report whether this copytool accepts the given files for stage-out.

    No per-file restriction is applied, so every input is accepted.

    :param files: list of `FileSpec` objects (not inspected).
    :return: always True.
    """
    accepted = True
    return accepted
0037
0038
def _resolve_checksum_option(setup, *, checksum_type='adler32', **kwargs):
    """
    Decide which xrdcp option to use to calculate/verify the transferred file checksum.

    The client version is logged first (informational only). Then the output of
    `xrdcp -h` is searched for a supported checksum option. The preference order is:

    - the `--cksum <type>:print` option;
    - the legacy `-adler` flag (only when `checksum_type` is 'adler32');
    - the legacy `-md5` flag (only when `checksum_type` is 'md5').

    :param setup: optional setup script sourced before running xrdcp (None/empty for none).
    :param checksum_type: checksum algorithm to request ('adler32' or 'md5'). The default
                          'adler32' is the value this function previously hard-coded.
    :param kwargs: forwarded unchanged to `execute`.
    :return: the option string to pass to xrdcp, or "" if none could be determined
             (including when `xrdcp -h` fails).
    """

    def _with_setup(command):
        # run xrdcp from the environment prepared by the setup script, if any
        return "source %s; %s" % (setup, command) if setup else command

    cmd = _with_setup("%s --version" % copy_command)
    logger.info("Execute command (%s) to check xrdcp client version", cmd)

    # the version probe is informational only: its return code is logged, not acted upon
    rcode, stdout, stderr = execute(cmd, **kwargs)
    logger.info("return code: %s", rcode)
    logger.info("return output: %s", stdout + stderr)

    cmd = _with_setup("%s -h" % copy_command)
    logger.info("Execute command (%s) to decide which option should be used to calc/verify file checksum..", cmd)

    rcode, stdout, stderr = execute(cmd, **kwargs)
    output = stdout + stderr
    logger.info("return code: %s", rcode)
    logger.debug("return output: %s", output)

    coption = ""
    if rcode:
        logger.error('FAILED to execute command=%s: %s', cmd, output)
    elif "--cksum" in output:
        coption = "--cksum %s:print" % checksum_type
    elif "-adler" in output and checksum_type == 'adler32':
        coption = "-adler"
    elif "-md5" in output and checksum_type == 'md5':
        coption = "-md5"

    if coption:
        logger.info("Use %s option to get the checksum for %s command", coption, copy_command)

    return coption
0079
0080
0081
def _stagefile(coption, source, destination, filesize, is_stagein, setup=None, **kwargs):
    """
    Stage the file (stagein or stageout) with a single xrdcp invocation.

    The command is executed as a shell command string (it may be prefixed with
    `source <setup>;`), so source and destination are shell-quoted. Otherwise,
    characters such as `&`, `;`, `?` or spaces in a URL would be interpreted by the
    shell. Ordinary paths/URLs are left textually unchanged by `shlex.quote`.

    :param coption: xrdcp checksum option (as chosen by _resolve_checksum_option), "" for none.
    :param source: source path or URL.
    :param destination: destination path or URL.
    :param filesize: expected file size (currently unused).
    :param is_stagein: True for stage-in, False for stage-out (selects error codes).
    :param setup: optional setup script sourced before running xrdcp.
    :param kwargs: forwarded unchanged to `execute`.
    :return: destination file details (filesize, checksum, checksum_type) parsed from the
             xrdcp output when a checksum option was used, otherwise (None, None, None)
    :raise: PilotException in case of controlled error
    """
    import shlex  # local import: only needed to quote arguments of the shell command

    filesize_cmd, checksum_cmd, checksum_type = None, None, None

    # str() keeps the previous textual behavior for non-string values (e.g. None)
    cmd = '%s -np -f %s %s %s' % (copy_command, coption,
                                  shlex.quote(str(source)), shlex.quote(str(destination)))
    if setup:
        cmd = "source %s; %s" % (setup, cmd)

    logger.info("Executing command: %s", cmd)
    rcode, stdout, stderr = execute(cmd, **kwargs)
    logger.info('rcode=%d, stdout=%s, stderr=%s', rcode, stdout, stderr)

    if rcode:
        # translate the tool output into a pilot error code/state and abort this transfer
        error = resolve_common_transfer_errors(stdout + stderr, is_stagein=is_stagein)
        raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

    # file size/checksum are only parsed when a checksum option was requested
    if coption != "":
        filesize_cmd, checksum_cmd, checksum_type = get_file_info_from_output(stdout + stderr)

    # TODO: verify the transfer (e.g. compare the returned checksum); until implemented,
    # every successful copy is treated as verified
    is_verified = True

    if not is_verified:
        rcode = ErrorCodes.GETADMISMATCH if is_stagein else ErrorCodes.PUTADMISMATCH
        raise PilotException("Copy command failed", code=rcode, state='AD_MISMATCH')

    return filesize_cmd, checksum_cmd, checksum_type
0125
0126
0127
def copy_in(files, **kwargs):
    """
    Download given files using xrdcp command.

    For each file:

    - the trace report is filled in;
    - the file is staged in to `<workdir>/<lfn>`;
    - on success, the checksum is verified with `verify_catalog_checksum`.

    The first failure aborts the whole call.

    :param files: list of `FileSpec` objects
    :param kwargs: must contain 'trace_report'. May contain 'copytools' (popped here; its
                   'xrdcp' -> 'setup' entry is the setup script) and 'workdir'. The remaining
                   kwargs are forwarded to the transfer helpers.
    :return: the same `files` list with status/status_code updated
    :raise: PilotException in case of controlled error
    """

    setup = kwargs.pop('copytools', {}).get('xrdcp', {}).get('setup')
    coption = _resolve_checksum_option(setup, **kwargs)
    trace_report = kwargs.get('trace_report')

    # site-wide value; when empty, each file falls back to its own endpoint below
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))
    for fspec in files:
        # per-file fallback so one file's endpoint does not leak into the following files
        file_localsite = localsite or fspec.ddmendpoint
        trace_report.update(localSite=file_localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(catStart=time())

        dst = fspec.workdir or kwargs.get('workdir') or '.'
        destination = os.path.join(dst, fspec.lfn)
        try:
            _, checksum_cmd, checksum_type = _stagefile(coption, fspec.turl, destination, fspec.filesize,
                                                        is_stagein=True, setup=setup, **kwargs)
            fspec.status_code = 0
            fspec.status = 'transferred'
        except PilotException as error:
            fspec.status = 'failed'
            fspec.status_code = error.get_error_code()
            diagnostics = error.get_detail()
            state = 'STAGEIN_ATTEMPT_FAILED'
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state) from error
        else:
            # record the checksum reported by xrdcp; skip when none was parsed (avoids a None key)
            if checksum_type:
                fspec.checksum[checksum_type] = checksum_cmd
            state, diagnostics = verify_catalog_checksum(fspec, destination)
            if diagnostics != "":
                failed_state = state or 'STAGEIN_ATTEMPT_FAILED'
                trace_report.update(clientState=failed_state, stateReason=diagnostics, timeEnd=time())
                trace_report.send()
                # NOTE(review): assumes verify_catalog_checksum sets fspec.status_code on failure -- confirm
                raise PilotException(diagnostics, code=fspec.status_code, state=failed_state)

        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0189
0190
0191
def copy_out(files, **kwargs):
    """
    Upload given files using xrdcp command.

    For each file:

    - the trace report is filled in;
    - the file is copied from `fspec.surl` to `fspec.turl`;
    - the checksum is verified with `verify_catalog_checksum` against the local source.

    The 'DONE' trace is only sent once that verification succeeded. The first failure
    aborts the whole call.

    :param files: list of `FileSpec` objects
    :param kwargs: must contain 'trace_report'. May contain 'copytools' (popped here; its
                   'xrdcp' -> 'setup' entry is the setup script). The remaining kwargs are
                   forwarded to the transfer helpers.
    :return: the same `files` list with status/status_code updated
    :raise: PilotException in case of controlled error
    """

    setup = kwargs.pop('copytools', {}).get('xrdcp', {}).get('setup')
    coption = _resolve_checksum_option(setup, **kwargs)
    trace_report = kwargs.get('trace_report')

    for fspec in files:
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        try:
            _, checksum_cmd, checksum_type = _stagefile(coption, fspec.surl, fspec.turl, fspec.filesize,
                                                        is_stagein=False, setup=setup, **kwargs)
            fspec.status_code = 0
            fspec.status = 'transferred'
        except PilotException as error:
            fspec.status = 'failed'
            fspec.status_code = error.get_error_code()
            state = 'STAGEOUT_ATTEMPT_FAILED'
            diagnostics = error.get_detail()
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state) from error
        else:
            # record the checksum reported by xrdcp; skip when none was parsed (avoids a None key)
            if checksum_type:
                fspec.checksum[checksum_type] = checksum_cmd
            state, diagnostics = verify_catalog_checksum(fspec, fspec.surl)
            if diagnostics != "":
                # this is a stage-out, so the fallback state must be the STAGEOUT one
                failed_state = state or 'STAGEOUT_ATTEMPT_FAILED'
                trace_report.update(clientState=failed_state, stateReason=diagnostics, timeEnd=time())
                trace_report.send()
                # NOTE(review): assumes verify_catalog_checksum sets fspec.status_code on failure -- confirm
                raise PilotException(diagnostics, code=fspec.status_code, state=failed_state)

        # report success only after the checksum verification passed (one trace per file)
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0234
0235
def get_file_info_from_output(output):
    """
    Extract file size, checksum value and checksum type from xrdcp --cksum command output.

    The parsed fragment has the form `<type>: <checksum> <url> <filesize>`, where <type>
    is 'md5' or 'adler32'. The checksum is left-padded with zeros to its full hex width
    (8 for adler32, 32 for md5), presumably because leading zeros can be dropped in the
    printed value.

    :param output: combined stdout+stderr of the xrdcp command (may be empty/None)
    :return: (filesize [int], checksum [str], checksum_type [str]) or (None, None, None) in case of failure
    """

    if not output:
        return None, None, None

    # cheap sanity check before the regex; 'md5' is included so md5 outputs are not rejected
    lowered = output.lower()
    if not any(marker in lowered for marker in ("xrootd", "adler32", "md5")):
        logger.warning("WARNING: Failed to extract checksum: Unexpected output: %s", output)
        return None, None, None

    pattern = r"(?P<type>md5|adler32):\ (?P<checksum>[a-zA-Z0-9]+)\ \S+\ (?P<filesize>[0-9]+)"
    match = re.search(pattern, output)
    if not match:
        logger.warning("WARNING: Checksum/file size info not found in output: failed to match pattern=%s in output=%s", pattern, output)
        return None, None, None

    checksum_type = match.group('type')
    width = 32 if checksum_type == 'md5' else 8
    checksum = match.group('checksum').zfill(width)
    # the pattern only admits digits here, so int() cannot fail
    filesize = int(match.group('filesize'))

    return filesize, checksum, checksum_type
0268 return filesize, checksum, checksum_type