Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Tobias Wegner, tobias.wegner@cern.ch, 2017
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0010 # - Mario Lassnig, mario.lassnig@cern.ch, 2020
0011 
0012 import logging
0013 import os
0014 import re
0015 
0016 from pilot.common.errorcodes import ErrorCodes
0017 from pilot.util.filehandling import calculate_checksum, get_checksum_type, get_checksum_value
0018 
0019 logger = logging.getLogger(__name__)
0020 
0021 
def get_timeout(filesize, add=0):
    """
    Derive a transfer time-out limit from the size of the file.

    The limit starts at a floor of 300 s, grows by one second for every 0.1e7 units of file size,
    is extended by the optional extra time and is finally capped at three hours.

    :param filesize: file size (int).
    :param add: optional additional time to be added [s] (int).
    :return: time-out in seconds (int).
    """

    floor = 300  # base allowance [s]
    ceiling = 3 * 3600  # never allow more than 3 hours
    size_allowance = int(filesize / 0.1e7)  # approx < 1 Mb/sec

    requested = floor + size_allowance + add
    if requested > ceiling:
        return ceiling

    return requested
0037 
0038 
def verify_catalog_checksum(fspec, path):
    """
    Verify that the local and remote (fspec) checksum values are the same.
    The function will update the fspec object (status and status_code) if the verification fails.

    :param fspec: FileSpec object for a given file.
    :param path: path to local file (string).
    :return: state (string), diagnostics (string). Both are empty strings if no problem was found.
    """

    diagnostics = ""
    state = ""

    checksum_type = get_checksum_type(fspec.checksum)
    checksum_catalog = get_checksum_value(fspec.checksum)
    if checksum_type == 'unknown':
        diagnostics = 'unknown checksum type for checksum(catalog): %s' % fspec.checksum
        logger.warning(diagnostics)
        fspec.status_code = ErrorCodes.UNKNOWNCHECKSUMTYPE
        fspec.status = 'failed'
        state = 'UNKNOWN_CHECKSUM_TYPE'
    else:
        # the checksum is calculated with the type name as given by the catalog ..
        checksum_local = calculate_checksum(path, algorithm=checksum_type)
        # .. while the short form 'ad32' is spelled out for logging and error classification
        if checksum_type == 'ad32':
            checksum_type = 'adler32'
        # decide once, after the normalisation above, so that error code and state always agree
        # (the state was previously compared against 'ad32', which can never match at this point)
        is_adler32 = checksum_type == 'adler32'
        logger.info('checksum (catalog): %s (type: %s)', checksum_catalog, checksum_type)
        logger.info('checksum (local): %s', checksum_local)
        # an empty local checksum is not treated as a mismatch
        if checksum_local and checksum_local != checksum_catalog:
            diagnostics = 'checksum verification failed for LFN=%s: checksum (catalog)=%s != checksum (local)=%s' % \
                          (fspec.lfn, checksum_catalog, checksum_local)
            logger.warning(diagnostics)
            fspec.status_code = ErrorCodes.GETADMISMATCH if is_adler32 else ErrorCodes.GETMD5MISMATCH
            fspec.status = 'failed'
            state = 'AD_MISMATCH' if is_adler32 else 'MD_MISMATCH'
        else:
            logger.info('catalog and local checksum values are the same')

    return state, diagnostics
0077 
0078 
def merge_destinations(files):
    """
    Group the given file dictionaries by their destination directory.

    Every file dictionary is updated in place with 'status', 'errmsg' and 'errno' entries. Files whose
    destination does not exist are marked as failed and are left out of the returned dictionary.

    :param files: file dictionaries with 'destination', 'scope' and 'name' entries.
    :returns: dictionary {destination: {'lfns': set of 'scope:name' strings, 'files': list of file dictionaries}}.
    """
    merged = {}
    for entry in files:
        if not os.path.exists(entry['destination']):
            # nothing can be downloaded to a missing directory
            entry['status'] = 'failed'
            entry['errmsg'] = 'Destination directory does not exist: %s' % entry['destination']
            entry['errno'] = 1
            continue

        # provisional state until the download has been done
        entry['status'] = 'running'
        entry['errmsg'] = 'File not yet successfully downloaded.'
        entry['errno'] = 2
        lfn = '%s:%s' % (entry['scope'], entry['name'])

        target = entry['destination']
        if target not in merged:
            merged[target] = {'lfns': set(), 'files': list()}
        bucket = merged[target]
        bucket['lfns'].add(lfn)
        bucket['files'].append(entry)

    return merged
0105 
0106 
def get_copysetup(copytools, copytool_name):
    """
    Return the copysetup for the given copytool.

    :param copytools: copytools dictionary from infosys, keyed by copytool name. Each value is a dictionary
                      that may contain a 'setup' entry.
    :param copytool_name: name of copytool (string).
    :return: copysetup (string). Empty string if there are no copytools, if the copytool is not listed or if
             it has no setup.
    """
    if not copytools:
        return ""

    # direct dictionary lookup, there is no need to scan the keys for the wanted name
    settings = copytools.get(copytool_name)
    if not settings:
        return ""

    # a missing or null 'setup' entry is reported as an empty string so that a string is always returned
    return settings.get('setup') or ""
0126 
0127 
def get_error_info(rcode, state, error_msg):
    """
    Bundle the details of a transfer error into a dictionary.
    Helper function to resolve_common_transfer_errors().

    :param rcode: return code (int).
    :param state: state string used in Rucio traces.
    :param error_msg: transfer command stdout (string).
    :return: dictionary with format {'rcode': rcode, 'state': state, 'error': error_msg}.
    """

    info = {}
    info['rcode'] = rcode
    info['state'] = state
    info['error'] = error_msg

    return info
0140 
0141 
def output_line_scan(ret, output):
    """
    Scan the transfer command output line by line for special error messages.
    Helper function to resolve_common_transfer_errors().

    A line containing 'Details: <text>' (or 'details: <text>') replaces the error message with <text>. Any other
    line mentioning 'service_unavailable' sets both the error message and the return code. The dictionary is
    updated in place and later lines overrule earlier ones.

    :param ret: pre-filled error info dictionary with format {'rcode': rcode, 'state': state, 'error': error_msg}
    :param output: transfer command stdout (string).
    :return: updated error info dictionary.
    """

    details_pattern = re.compile(r"[Dd]etails\s*:\s*(?P<error>.*)")

    for text in output.split('\n'):
        found = details_pattern.search(text)
        if found is not None:
            ret['error'] = found.group('error')
            continue

        if 'service_unavailable' in text:
            ret['error'] = 'service_unavailable'
            ret['rcode'] = ErrorCodes.RUCIOSERVICEUNAVAILABLE

    return ret
0161 
0162 
def resolve_common_transfer_errors(output, is_stagein=True):  # noqa: C901
    """
    Translate well-known failure signatures in the transfer command output into error info.

    The output is tested against a fixed, ordered series of signatures and the first one found decides the
    return code and state. Afterwards the output is passed through output_line_scan(), which may replace the
    error message (and return code) with a more specific one.

    :param output: stdout from transfer command (string).
    :param is_stagein: optional (boolean).
    :return: dict {'rcode': rcode, 'state': state, 'error': error_msg}.
    """

    # generic failure, returned as is when there is no output to inspect
    generic_code = ErrorCodes.STAGEINFAILED if is_stagein else ErrorCodes.STAGEOUTFAILED
    ret = get_error_info(generic_code, 'COPY_ERROR', output)
    if not output:
        return ret

    file_exists_tokens = ("File exists", 'SRM_FILE_BUSY', 'file already exists')
    no_checksum_tokens = ("query chksum is not supported", "Unable to checksum")

    # NOTE: the order of the tests matters since only the first hit is used
    if "timeout" in output:
        rcode = ErrorCodes.STAGEINTIMEOUT if is_stagein else ErrorCodes.STAGEOUTTIMEOUT
        ret = get_error_info(rcode, 'CP_TIMEOUT', 'copy command timed out: %s' % output)
    elif "failed xrdadler32" in output or ("does not match the checksum" in output and 'adler32' in output):
        rcode = ErrorCodes.GETADMISMATCH if is_stagein else ErrorCodes.PUTADMISMATCH
        ret = get_error_info(rcode, 'AD_MISMATCH', output)
    elif "does not match the checksum" in output:
        # adler32 mismatches were caught by the previous test
        rcode = ErrorCodes.GETMD5MISMATCH if is_stagein else ErrorCodes.PUTMD5MISMATCH
        ret = get_error_info(rcode, 'MD5_MISMATCH', output)
    elif "globus_xio:" in output:
        rcode = ErrorCodes.GETGLOBUSSYSERR if is_stagein else ErrorCodes.PUTGLOBUSSYSERR
        ret = get_error_info(rcode, 'GLOBUS_FAIL', "Globus system error: %s" % output)
    elif any(token in output for token in file_exists_tokens):
        message = "File already exists in the destination: %s" % output
        ret = get_error_info(ErrorCodes.FILEEXISTS, 'FILE_EXISTS', message)
    elif is_stagein and "No such file or directory" in output:
        ret = get_error_info(ErrorCodes.MISSINGINPUTFILE, 'MISSING_INPUT', output)
    elif any(token in output for token in no_checksum_tokens):
        ret = get_error_info(ErrorCodes.CHKSUMNOTSUP, 'CHKSUM_NOTSUP', output)
    elif "Could not establish context" in output:
        message = "Could not establish context: Proxy / VO extension of proxy has probably expired: %s" % output
        ret = get_error_info(ErrorCodes.NOPROXY, 'CONTEXT_FAIL', message)
    elif "No space left on device" in output:
        rcode = ErrorCodes.NOLOCALSPACE if is_stagein else ErrorCodes.NOREMOTESPACE
        ret = get_error_info(rcode, 'NO_SPACE', "No available space left on disk: %s" % output)
    elif "No such file or directory" in output:
        # only reached for stage-out, the stage-in case was handled above
        ret = get_error_info(ErrorCodes.NOSUCHFILE, 'NO_FILE', output)
    elif "service is not available at the moment" in output:
        ret = get_error_info(ErrorCodes.SERVICENOTAVAILABLE, 'SERVICE_ERROR', output)
    elif "Network is unreachable" in output:
        ret = get_error_info(ErrorCodes.UNREACHABLENETWORK, 'NETWORK_UNREACHABLE', output)
    elif "Run: [ERROR] Server responded with an error" in output:
        ret = get_error_info(ErrorCodes.XRDCPERROR, 'XRDCP_ERROR', output)
    elif "Unable to locate credentials" in output:
        ret = get_error_info(ErrorCodes.MISSINGCREDENTIALS, 'S3_ERROR', output)

    # let the line scan replace the error message with the real one, if it can be found in the output
    return output_line_scan(ret, output)