File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import logging
0013 import os
0014 import re
0015
0016 from pilot.common.errorcodes import ErrorCodes
0017 from pilot.util.filehandling import calculate_checksum, get_checksum_type, get_checksum_value
0018
0019 logger = logging.getLogger(__name__)
0020
0021
def get_timeout(filesize, add=0):
    """
    Compute a transfer time-out that grows with the file size.

    The result is a 300 s base, plus one second per 1e6 units of file size
    (presumably bytes, i.e. roughly 1 MB/s - confirm with callers), plus `add`.
    The sum is capped at three hours.

    :param filesize: file size (int).
    :param add: optional additional time to be added [s] (int).
    :return: time-out in seconds (int).
    """
    base_seconds, cap_seconds = 300, 3 * 3600
    return min(base_seconds + int(filesize / 0.1e7) + add, cap_seconds)
0037
0038
def verify_catalog_checksum(fspec, path):
    """
    Verify that the local and remote (catalog, from fspec) checksum values are the same.

    On an unknown checksum type or a checksum mismatch, fspec.status_code and
    fspec.status are updated in place.

    :param fspec: FileSpec object for a given file (its .checksum and .lfn are read).
    :param path: path to local file (string).
    :return: state (string), diagnostics (string). Both are empty strings when the
             checksums agree or when no local checksum could be calculated.
    """

    diagnostics = ""
    state = ""

    checksum_type = get_checksum_type(fspec.checksum)
    checksum_catalog = get_checksum_value(fspec.checksum)
    if checksum_type == 'unknown':
        diagnostics = 'unknown checksum type for checksum(catalog): %s' % fspec.checksum
        logger.warning(diagnostics)
        fspec.status_code = ErrorCodes.UNKNOWNCHECKSUMTYPE
        fspec.status = 'failed'
        state = 'UNKNOWN_CHECKSUM_TYPE'
        return state, diagnostics

    checksum_local = calculate_checksum(path, algorithm=checksum_type)
    if checksum_type == 'ad32':
        # normalise the short catalog name; every test below uses the long name
        checksum_type = 'adler32'
    is_adler32 = checksum_type == 'adler32'

    logger.info('checksum (catalog): %s (type: %s)', checksum_catalog, checksum_type)
    logger.info('checksum (local): %s', checksum_local)
    if not checksum_local:
        # nothing to compare against (None or empty string); do not claim the values match
        logger.warning('local checksum could not be calculated for %s - skipping comparison', path)
    elif checksum_local != checksum_catalog:
        diagnostics = 'checksum verification failed for LFN=%s: checksum (catalog)=%s != checksum (local)=%s' % \
                      (fspec.lfn, checksum_catalog, checksum_local)
        logger.warning(diagnostics)
        fspec.status_code = ErrorCodes.GETADMISMATCH if is_adler32 else ErrorCodes.GETMD5MISMATCH
        fspec.status = 'failed'
        # previously compared against 'ad32' after renaming, so adler32 mismatches were reported as MD_MISMATCH
        state = 'AD_MISMATCH' if is_adler32 else 'MD_MISMATCH'
    else:
        logger.info('catalog and local checksum values are the same')

    return state, diagnostics
0077
0078
def merge_destinations(files):
    """
    Convert a list of file dictionaries into a dictionary keyed by destination.

    Each file dict is read for the keys 'destination', 'scope' and 'name'. Each file dict
    is updated in place with 'status', 'errmsg' and 'errno':
    - destination path does not exist: status 'failed', errno 1, and the file is not grouped;
    - otherwise: status 'running', errno 2, and the file is added to its destination group.

    :param files: iterable of file dictionaries to merge.
    :return: dict mapping destination -> {'lfns': set of 'scope:name' strings, 'files': list of file dicts}.
    """
    by_destination = {}

    for entry in files:
        target_dir = entry['destination']

        if not os.path.exists(target_dir):
            entry['status'] = 'failed'
            entry['errmsg'] = 'Destination directory does not exist: %s' % target_dir
            entry['errno'] = 1
            continue

        entry['status'] = 'running'
        entry['errmsg'] = 'File not yet successfully downloaded.'
        entry['errno'] = 2
        scoped_name = '%s:%s' % (entry['scope'], entry['name'])
        group = by_destination.setdefault(target_dir, {'lfns': set(), 'files': list()})
        group['lfns'].add(scoped_name)
        group['files'].append(entry)

    return by_destination
0105
0106
def get_copysetup(copytools, copytool_name):
    """
    Return the copysetup for the given copytool.

    :param copytools: copytools dictionary from infosys, mapping copytool name -> settings dict (dict).
    :param copytool_name: name of copytool (string).
    :return: the 'setup' entry of the matching copytool (string), None if that entry has no
             'setup' key, or "" if copytools is empty/None or does not contain copytool_name.
    """
    if not copytools or copytool_name not in copytools:
        return ""

    # direct dict lookup instead of scanning every key for a match
    return copytools[copytool_name].get('setup')
0126
0127
def get_error_info(rcode, state, error_msg):
    """
    Bundle a transfer error into a dictionary.

    Helper function to resolve_common_transfer_errors().

    :param rcode: return code (int).
    :param state: state string used in Rucio traces.
    :param error_msg: transfer command stdout (string).
    :return: dictionary with format {'rcode': rcode, 'state': state, 'error': error_msg}.
    """
    return dict(rcode=rcode, state=state, error=error_msg)
0140
0141
def output_line_scan(ret, output):
    """
    Scan transfer command output line by line for special error markers.

    Helper function to resolve_common_transfer_errors(). The scan never stops early, so a
    later matching line overrides an earlier one:
    - a line containing 'Details:' / 'details :' sets ret['error'] to the text after the colon;
    - otherwise, a line containing 'service_unavailable' sets ret['error'] to 'service_unavailable'
      and ret['rcode'] to ErrorCodes.RUCIOSERVICEUNAVAILABLE.

    :param ret: pre-filled error info dictionary with format {'rcode': rcode, 'state': state, 'error': error_msg}.
    :param output: transfer command stdout (string).
    :return: the same (updated) error info dictionary.
    """
    details_re = re.compile(r"[Dd]etails\s*:\s*(?P<error>.*)")

    for text in output.split('\n'):
        found = details_re.search(text)
        if found is not None:
            ret['error'] = found.group('error')
            continue
        if 'service_unavailable' in text:
            ret['error'] = 'service_unavailable'
            ret['rcode'] = ErrorCodes.RUCIOSERVICEUNAVAILABLE

    return ret
0161
0162
def resolve_common_transfer_errors(output, is_stagein=True):
    """
    Resolve any common transfer related errors.

    The output is matched against an ordered series of known error signatures. The first
    signature found decides the error code and Rucio trace state. Several codes depend on
    the transfer direction (is_stagein). If no signature matches, a generic 'COPY_ERROR'
    stage-in/stage-out failure is kept. Non-empty output is finally passed through
    output_line_scan(), which may refine the error message and return code.

    :param output: stdout from transfer command (string).
    :param is_stagein: True for stage-in, False for stage-out; optional (boolean).
    :return: dict {'rcode': rcode, 'state': state, 'error': error_msg}.
    """
    generic_code = ErrorCodes.STAGEINFAILED if is_stagein else ErrorCodes.STAGEOUTFAILED
    ret = get_error_info(generic_code, 'COPY_ERROR', output)
    if not output:
        # nothing to inspect - report the generic failure unchanged
        return ret

    checksum_mismatch = "does not match the checksum" in output
    no_such_file = "No such file or directory" in output

    # NOTE: the order of these tests matters - the first match wins
    if "timeout" in output:
        ret = get_error_info(ErrorCodes.STAGEINTIMEOUT if is_stagein else ErrorCodes.STAGEOUTTIMEOUT,
                             'CP_TIMEOUT', 'copy command timed out: %s' % output)
    elif "failed xrdadler32" in output or (checksum_mismatch and 'adler32' in output):
        ret = get_error_info(ErrorCodes.GETADMISMATCH if is_stagein else ErrorCodes.PUTADMISMATCH,
                             'AD_MISMATCH', output)
    elif checksum_mismatch:
        # only reached when 'adler32' is absent, so this is an MD5 mismatch
        ret = get_error_info(ErrorCodes.GETMD5MISMATCH if is_stagein else ErrorCodes.PUTMD5MISMATCH,
                             'MD5_MISMATCH', output)
    elif "globus_xio:" in output:
        ret = get_error_info(ErrorCodes.GETGLOBUSSYSERR if is_stagein else ErrorCodes.PUTGLOBUSSYSERR,
                             'GLOBUS_FAIL', "Globus system error: %s" % output)
    elif any(marker in output for marker in ("File exists", 'SRM_FILE_BUSY', 'file already exists')):
        ret = get_error_info(ErrorCodes.FILEEXISTS, 'FILE_EXISTS',
                             "File already exists in the destination: %s" % output)
    elif is_stagein and no_such_file:
        ret = get_error_info(ErrorCodes.MISSINGINPUTFILE, 'MISSING_INPUT', output)
    elif "query chksum is not supported" in output or "Unable to checksum" in output:
        ret = get_error_info(ErrorCodes.CHKSUMNOTSUP, 'CHKSUM_NOTSUP', output)
    elif "Could not establish context" in output:
        context_msg = "Could not establish context: Proxy / VO extension of proxy has probably expired: %s" % output
        ret = get_error_info(ErrorCodes.NOPROXY, 'CONTEXT_FAIL', context_msg)
    elif "No space left on device" in output:
        ret = get_error_info(ErrorCodes.NOLOCALSPACE if is_stagein else ErrorCodes.NOREMOTESPACE,
                             'NO_SPACE', "No available space left on disk: %s" % output)
    elif no_such_file:
        # stage-out only: the stage-in case was handled above as MISSING_INPUT
        ret = get_error_info(ErrorCodes.NOSUCHFILE, 'NO_FILE', output)
    elif "service is not available at the moment" in output:
        ret = get_error_info(ErrorCodes.SERVICENOTAVAILABLE, 'SERVICE_ERROR', output)
    elif "Network is unreachable" in output:
        ret = get_error_info(ErrorCodes.UNREACHABLENETWORK, 'NETWORK_UNREACHABLE', output)
    elif "Run: [ERROR] Server responded with an error" in output:
        ret = get_error_info(ErrorCodes.XRDCPERROR, 'XRDCP_ERROR', output)
    elif "Unable to locate credentials" in output:
        ret = get_error_info(ErrorCodes.MISSINGCREDENTIALS, 'S3_ERROR', output)

    return output_line_scan(ret, output)
0219 return ret