Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Tobias Wegner, tobias.wegner@cern.ch, 2017-2018
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0010 
0011 # Reimplemented by Alexey Anisenkov
0012 
0013 import os
0014 import logging
0015 import re
0016 from time import time
0017 
0018 from .common import resolve_common_transfer_errors, verify_catalog_checksum  #, get_timeout
0019 from pilot.util.container import execute
0020 from pilot.common.exception import PilotException, ErrorCodes
0021 #from pilot.util.timer import timeout
0022 
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

require_replicas = True  ## indicate if given copytool requires input replicas to be resolved
allowed_schemas = ['root']  # prioritized list of supported schemas for transfers by given copytool

# Name of the command-line client; used to build every command string in this module
# (version/help probes as well as the actual transfers).
copy_command = 'xrdcp'
0029 
0030 
def is_valid_for_copy_in(files):
    """
        Tell whether this copytool can be used to stage in the given files.

        Placeholder: no validation is performed yet, `files` is not inspected.

        :param files: list of `FileSpec` objects (currently unused)
        :return: always True
    """

    # FIX ME LATER: real validation is not implemented
    return True
0033 
0034 
def is_valid_for_copy_out(files):
    """
        Tell whether this copytool can be used to stage out the given files.

        Placeholder: no validation is performed yet, `files` is not inspected.

        :param files: list of `FileSpec` objects (currently unused)
        :return: always True
    """

    # FIX ME LATER: real validation is not implemented
    return True
0037 
0038 
def _resolve_checksum_option(setup, **kwargs):
    """
        Probe the installed copy command and pick the command-line option used to
        request a checksum during the transfer.

        Two commands are executed: `<copy_command> --version` (logged only) and
        `<copy_command> -h`, whose combined stdout/stderr is scanned for the
        supported checksum options.

        :param setup: optional setup script; when set, each command is prefixed with `source <setup>; `
        :param kwargs: passed through unchanged to `execute()`
        :return: checksum option string, or "" if the help command failed or no known option was found
    """

    def _with_setup(command):
        # prepend the environment setup only when one was supplied
        return "source %s; %s" % (setup, command) if setup else command

    # step 1: report the client version (informational only, result is not used)
    version_cmd = _with_setup("%s --version" % copy_command)
    logger.info("Execute command (%s) to check xrdcp client version", version_cmd)

    exit_code, out, err = execute(version_cmd, **kwargs)
    logger.info("return code: %s", exit_code)
    logger.info("return output: %s", out + err)

    # step 2: inspect the help text to find out which checksum option is available
    help_cmd = _with_setup("%s -h" % copy_command)
    logger.info("Execute command (%s) to decide which option should be used to calc/verify file checksum..", help_cmd)

    exit_code, out, err = execute(help_cmd, **kwargs)
    help_text = out + err
    logger.info("return code: %s", exit_code)
    logger.debug("return output: %s", help_text)

    checksum_type = 'adler32'  ## consider only adler32 for now

    if exit_code:
        logger.error('FAILED to execute command=%s: %s', help_cmd, help_text)
        option = ""
    elif "--cksum" in help_text:
        option = "--cksum %s:print" % checksum_type
    elif "-adler" in help_text and checksum_type == 'adler32':
        option = "-adler"
    elif "-md5" in help_text and checksum_type == 'md5':
        option = "-md5"
    else:
        option = ""

    if option:
        logger.info("Use %s option to get the checksum for %s command", option, copy_command)

    return option
0079 
0080 
0081 #@timeout(seconds=10800)
def _stagefile(coption, source, destination, filesize, is_stagein, setup=None, **kwargs):
    """
        Stage a single file (stage-in or stage-out) with the copy command.

        :param coption: checksum option to add to the command line ("" to request no checksum)
        :param source: source path/URL handed to the copy command
        :param destination: destination path/URL handed to the copy command
        :param filesize: expected file size (currently unused; kept for the disabled timeout logic)
        :param is_stagein: True for stage-in, False for stage-out (selects the error codes)
        :param setup: optional setup script sourced before the copy command
        :param kwargs: passed through unchanged to `execute()`
        :return: (filesize, checksum, checksum_type) parsed from the command output;
                 all three are None when no checksum option was used
        :raise: PilotException in case of controlled error
    """

    transfer_cmd = '%s -np -f %s %s %s' % (copy_command, coption, source, destination)
    if setup:
        transfer_cmd = "source %s; %s" % (setup, transfer_cmd)

    # NOTE: a size-dependent timeout (get_timeout(filesize)) is not applied at present

    exit_code, out, err = execute(transfer_cmd, **kwargs)
    logger.info('rcode=%d, stdout=%s, stderr=%s', exit_code, out, err)

    if exit_code:
        # translate the raw command output into a known error description
        # (special handling of "checksum not supported" on stage-out is TO BE IMPLEMENTED)
        details = resolve_common_transfer_errors(out + err, is_stagein=is_stagein)
        raise PilotException(details.get('error'), code=details.get('rcode'), state=details.get('state'))

    # file size and checksum are only printed when a checksum option was given
    size_reported, checksum_reported, checksum_kind = None, None, None
    if coption != "":
        size_reported, checksum_reported, checksum_kind = get_file_info_from_output(out + err)

    ## verification of the transfer by the returned checksum is TO BE IMPLEMENTED LATER
    ## (to be moved at the base level); until then the transfer is considered verified
    verified = True

    if not verified:
        mismatch_code = ErrorCodes.GETADMISMATCH if is_stagein else ErrorCodes.PUTADMISMATCH
        raise PilotException("Copy command failed", code=mismatch_code, state='AD_MISMATCH')

    return size_reported, checksum_reported, checksum_kind
0125 
0126 
0127 # @timeout(seconds=10800)
def copy_in(files, **kwargs):
    """
        Download given files using xrdcp command.

        For each file the trace report is updated and sent, the file is copied
        from `fspec.turl` into the work directory, and the checksum reported by
        the copy command is stored in `fspec.checksum` before
        `verify_catalog_checksum()` is called on the downloaded file.

        :param files: list of `FileSpec` objects
        :param kwargs: may contain `copytools` (removed here; `copytools['xrdcp']['setup']` is used as
                       setup script), `trace_report` and `workdir`; the rest is passed on to the helpers
        :return: the same `files` list
        :raise: PilotException in case of controlled error
    """

    setup = kwargs.pop('copytools', {}).get('xrdcp', {}).get('setup')
    coption = _resolve_checksum_option(setup, **kwargs)
    trace_report = kwargs.get('trace_report')

    # note, env vars might be unknown inside middleware containers, if so get the value already in the trace report
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))

    for fspec in files:
        # fall back to the file's endpoint when no local site is known yet
        localsite = localsite or fspec.ddmendpoint

        # update the trace report
        trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(catStart=time())

        target_dir = fspec.workdir or kwargs.get('workdir') or '.'
        destination = os.path.join(target_dir, fspec.lfn)

        try:
            _, checksum_cmd, checksum_type = _stagefile(coption, fspec.turl, destination, fspec.filesize,
                                                        is_stagein=True, setup=setup, **kwargs)
            fspec.status_code = 0
            fspec.status = 'transferred'
        except PilotException as error:
            fspec.status = 'failed'
            fspec.status_code = error.get_error_code()
            diagnostics = error.get_detail()
            state = 'STAGEIN_ATTEMPT_FAILED'
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)

        # only reached after a successful transfer (the handler above always raises):
        # compare checksums
        fspec.checksum[checksum_type] = checksum_cmd  # remote checksum
        state, diagnostics = verify_catalog_checksum(fspec, destination)
        if diagnostics != "":
            failure_state = state or 'STAGEIN_ATTEMPT_FAILED'
            trace_report.update(clientState=failure_state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)

        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0189 
0190 
0191 # @timeout(seconds=10800)
def copy_out(files, **kwargs):
    """
        Upload given files using xrdcp command.

        For each file the trace report is updated, the file is copied from
        `fspec.surl` to `fspec.turl`, and the checksum reported by the copy
        command is stored in `fspec.checksum` before `verify_catalog_checksum()`
        is called with `fspec.surl`. The final 'DONE' trace is sent only after
        that verification passed, so a file never produces both a 'DONE' and a
        failure trace (same ordering as in `copy_in`).

        :param files: list of `FileSpec` objects
        :param kwargs: may contain `copytools` (removed here; `copytools['xrdcp']['setup']` is used as
                       setup script) and `trace_report`; the rest is passed on to the helpers
        :return: the same `files` list
        :raise: PilotException in case of controlled error
    """

    setup = kwargs.pop('copytools', {}).get('xrdcp', {}).get('setup')
    coption = _resolve_checksum_option(setup, **kwargs)
    trace_report = kwargs.get('trace_report')

    for fspec in files:
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        try:
            _, checksum_cmd, checksum_type = _stagefile(coption, fspec.surl, fspec.turl, fspec.filesize,
                                                        is_stagein=False, setup=setup, **kwargs)
            fspec.status_code = 0
            fspec.status = 'transferred'
        except PilotException as error:
            fspec.status = 'failed'
            fspec.status_code = error.get_error_code()
            state = 'STAGEOUT_ATTEMPT_FAILED'
            diagnostics = error.get_detail()
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)
        else:
            # compare checksums
            fspec.checksum[checksum_type] = checksum_cmd  # remote checksum
            state, diagnostics = verify_catalog_checksum(fspec, fspec.surl)
            if diagnostics != "":
                # this is a stage-out, so the fallback state must be the stage-out one
                trace_report.update(clientState=state or 'STAGEOUT_ATTEMPT_FAILED', stateReason=diagnostics,
                                    timeEnd=time())
                trace_report.send()
                raise PilotException(diagnostics, code=fspec.status_code, state=state)

        # report success only once the transfer AND the checksum verification went through
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0234 
0235 
def get_file_info_from_output(output):
    """
    Extract file size and checksum details from the output of an xrdcp --cksum transfer.

    The output is searched for a fragment of the form
    ``<type>: <checksum> <non-blank token> <filesize>`` where ``<type>`` is
    ``md5`` or ``adler32``. The checksum is left-padded with zeros to at least
    8 characters (adler32 values may be printed without leading zeros).

    :param output: combined stdout/stderr text of the copy command (may be empty or None)
    :return: (filesize [int/None], checksum, checksum_type) or (None, None, None) in case of failure
    """

    if not output:
        return None, None, None

    # Cheap sanity check before running the regex. "md5" is accepted as well, since the
    # pattern below supports it; otherwise valid md5 output lacking the word "xrootd"
    # would be rejected without ever being parsed.
    expected_markers = ("xrootd", "XRootD", "adler32", "md5")
    if not any(marker in output for marker in expected_markers):
        logger.warning("WARNING: Failed to extract checksum: Unexpected output: %s", output)
        return None, None, None

    pattern = r"(?P<type>md5|adler32):\ (?P<checksum>[a-zA-Z0-9]+)\ \S+\ (?P<filesize>[0-9]+)"  # Python 3 (added r)

    m = re.search(pattern, output)
    if not m:
        logger.warning("WARNING: Checksum/file size info not found in output: failed to match pattern=%s in output=%s", pattern, output)
        return None, None, None

    checksum_type = m.group('type')
    checksum = m.group('checksum').zfill(8)  # make it 8 chars length (adler32 xrdcp fix)

    filesize = m.group('filesize')
    try:
        filesize = int(filesize)
    except ValueError as error:
        # digits only, so this can only be hit by absurdly long numbers rejected by int()
        logger.warning('failed to convert filesize to int: %s', error)
        filesize = None

    return filesize, checksum, checksum_type