File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 import os
0014 import logging
0015 import errno
0016 from time import time
0017
0018 from .common import resolve_common_transfer_errors, get_timeout
0019 from pilot.common.exception import PilotException, ErrorCodes, StageInFailure, StageOutFailure
0020 from pilot.util.container import execute
0021
0022
# Module-level logger shared by every function in this copytool.
logger = logging.getLogger(__name__)

# Copytool capability flag: this tool expects replicas to be resolved for it.
# NOTE(review): presumably read by the data-movement framework — confirm.
require_replicas = True

# URL schemes this copytool is declared to handle.
allowed_schemas = [
    'srm',
    'gsiftp',
    'https',
    'davs',
    'root',
]
0028
0029
def is_valid_for_copy_in(files):
    """
    Tell whether this copytool accepts the given stage-in files.

    No per-file checks are performed, so every input is accepted.

    :param files: list of `FileSpec` objects (not inspected)
    :return: always True
    """
    accepted = True
    return accepted
0032
0033
0034
0035
0036
0037
def is_valid_for_copy_out(files):
    """
    Tell whether this copytool accepts the given stage-out files.

    No per-file checks are performed, so every input is accepted.

    :param files: list of `FileSpec` objects (not inspected)
    :return: always True
    """
    accepted = True
    return accepted
0040
0041
0042
0043
0044
0045
def copy_in(files, **kwargs):
    """
    Download given files using the gfal-copy command.

    For each file the trace report is filled in and a single gfal-copy call is
    made. When ``fspec.checksum`` is set, checksum verification is requested
    via ``-K``. ``fspec.status`` / ``fspec.status_code`` are then updated.
    Processing stops at the first failed transfer.

    :param files: list of `FileSpec` objects
    :param kwargs: must provide ``trace_report``. ``workdir`` is used as the
        destination directory when ``fspec.workdir`` is empty. All kwargs are
        also forwarded to ``execute()``.
    :return: the same ``files`` list, with status fields updated
    :raise: StageInFailure if gfal-copy cannot be found;
            PilotException in case of controlled error (failed transfer)
    """
    import shlex  # local import keeps this block self-contained

    trace_report = kwargs.get('trace_report')

    if not check_for_gfal():
        raise StageInFailure("No GFAL2 tools found")

    # Site-wide local site. It may be empty, in which case each file falls back
    # to its own ddmendpoint below.
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))
    for fspec in files:
        # Resolve the fallback per file so that one file's ddmendpoint is not
        # reused as localSite for all the files after it.
        file_localsite = localsite if localsite else fspec.ddmendpoint
        trace_report.update(localSite=file_localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(catStart=time())

        dst = fspec.workdir or kwargs.get('workdir') or '.'

        timeout = get_timeout(fspec.filesize)
        source = fspec.turl
        destination = "file://%s" % os.path.abspath(os.path.join(dst, fspec.lfn))

        cmd = ['gfal-copy --verbose -f', ' -t %s' % timeout]

        if fspec.checksum:
            # The first (key, value) entry of the checksum mapping is passed
            # to -K as "key:value".
            cmd += ['-K', '%s:%s' % next(iter(fspec.checksum.items()))]

        # execute() receives a single command string. Quote the URLs so that
        # characters such as '&', ';' or spaces cannot split or alter the
        # command; plain URLs are left unchanged by shlex.quote.
        # NOTE(review): assumes execute() hands the string to a shell.
        cmd += [shlex.quote(source), shlex.quote(destination)]

        rcode, stdout, stderr = execute(" ".join(cmd), **kwargs)

        if rcode:
            # ETIMEDOUT/ETIME exit codes are reported as a copy timeout; any
            # other failure is classified from the command output.
            if rcode in [errno.ETIMEDOUT, errno.ETIME]:
                error = {'rcode': ErrorCodes.STAGEINTIMEOUT,
                         'state': 'CP_TIMEOUT',
                         'error': 'Copy command timed out: %s' % stderr}
            else:
                error = resolve_common_transfer_errors(stdout + stderr, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            trace_report.update(clientState=error.get('state') or 'STAGEIN_ATTEMPT_FAILED',
                                stateReason=error.get('error'), timeEnd=time())
            trace_report.send()

            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0115
0116
def copy_out(files, **kwargs):
    """
    Upload given files using the gfal-copy command.

    For each file the trace report is filled in and a single gfal-copy call is
    made. When ``fspec.checksum`` is set, checksum verification is requested
    via ``-K``. ``fspec.status`` / ``fspec.status_code`` are then updated.
    Processing stops at the first failed transfer.

    The local source is ``fspec.surl`` if set, otherwise
    ``<workdir>/<lfn>``. It is made absolute and prefixed with ``file://``.

    :param files: list of `FileSpec` objects to upload
    :param kwargs: must provide ``trace_report``. ``workdir`` is used as the
        source directory when ``fspec.workdir`` is empty. All kwargs are also
        forwarded to ``execute()``.
    :return: the same ``files`` list, with status fields updated
    :raises: StageOutFailure if gfal-copy cannot be found;
             PilotException in case of errors (failed transfer)
    """
    import shlex  # local import keeps this block self-contained

    if not check_for_gfal():
        raise StageOutFailure("No GFAL2 tools found")

    trace_report = kwargs.get('trace_report')

    for fspec in files:
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        src = fspec.workdir or kwargs.get('workdir') or '.'

        timeout = get_timeout(fspec.filesize)

        source = "file://%s" % os.path.abspath(fspec.surl or os.path.join(src, fspec.lfn))
        destination = fspec.turl

        cmd = ['gfal-copy --verbose -f', ' -t %s' % timeout]

        if fspec.checksum:
            # The first (key, value) entry of the checksum mapping is passed
            # to -K as "key:value".
            cmd += ['-K', '%s:%s' % next(iter(fspec.checksum.items()))]

        # execute() receives a single command string. Quote the URLs so that
        # characters such as '&', ';' or spaces cannot split or alter the
        # command; plain URLs are left unchanged by shlex.quote.
        # NOTE(review): assumes execute() hands the string to a shell.
        cmd += [shlex.quote(source), shlex.quote(destination)]

        rcode, stdout, stderr = execute(" ".join(cmd), **kwargs)

        if rcode:
            # ETIMEDOUT/ETIME exit codes are reported as a copy timeout; any
            # other failure is classified from the command output.
            if rcode in [errno.ETIMEDOUT, errno.ETIME]:
                error = {'rcode': ErrorCodes.STAGEOUTTIMEOUT,
                         'state': 'CP_TIMEOUT',
                         'error': 'Copy command timed out: %s' % stderr}
            else:
                error = resolve_common_transfer_errors(stdout + stderr, is_stagein=False)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            trace_report.update(clientState=error.get('state') or 'STAGEOUT_ATTEMPT_FAILED',
                                stateReason=error.get('error', 'unknown error'),
                                timeEnd=time())
            trace_report.send()
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0171
0172
def move_all_files_in(files, nretries=1):
    """
    Copy all listed files into local directories with gfal-copy.

    :param files: list of dicts with keys 'name', 'source' (location prefix,
        joined to the name with '/') and 'destination' (local directory,
        addressed with a file:/// URL). An optional 'recursive' flag is
        passed on to `move`.
    :param nretries: number of attempts per file. Only timeouts
        (ETIMEDOUT/ETIME) are retried, since the next attempt may succeed.
        Values below 1 are treated as 1 so every file is attempted at least
        once instead of being reported as successfully transferred.
    :return: exit_code, stdout, stderr of the last transfer; returns early on
        the first file that fails
    """
    exit_code = 0
    stdout = ""
    stderr = ""
    attempts = max(1, nretries)

    for entry in files:
        logger.info("transferring file %s from %s to %s", entry['name'], entry['source'], entry['destination'])

        source = entry['source'] + '/' + entry['name']
        destination = 'file:///' + os.path.join(entry['destination'], entry['name'])

        for attempt in range(1, attempts + 1):
            exit_code, stdout, stderr = move(source, destination, entry.get('recursive', False))
            if exit_code == 0:
                break
            # Give up on non-timeout errors, or when attempts are exhausted.
            if exit_code not in (errno.ETIMEDOUT, errno.ETIME) or attempt == attempts:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s",
                               exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
0204
0205
def move_all_files_out(files, nretries=1):
    """
    Copy all listed local files to their destinations with gfal-copy.

    :param files: list of dicts with keys 'name', 'source' (local directory,
        addressed with a file:/// URL) and 'destination' (location prefix,
        joined to the name with '/')
    :param nretries: number of attempts per file. Only timeouts
        (ETIMEDOUT/ETIME) are retried, since the next attempt may succeed.
        Values below 1 are treated as 1 so every file is attempted at least
        once instead of being reported as successfully transferred.
    :return: exit_code, stdout, stderr of the last transfer; returns early on
        the first file that fails
    """
    exit_code = 0
    stdout = ""
    stderr = ""
    attempts = max(1, nretries)

    for entry in files:
        logger.info("transferring file %s from %s to %s", entry['name'], entry['source'], entry['destination'])

        destination = entry['destination'] + '/' + entry['name']
        source = 'file:///' + os.path.join(entry['source'], entry['name'])

        for attempt in range(1, attempts + 1):
            exit_code, stdout, stderr = move(source, destination)
            if exit_code == 0:
                break
            # Give up on non-timeout errors, or when attempts are exhausted.
            if exit_code not in (errno.ETIMEDOUT, errno.ETIME) or attempt == attempts:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s",
                               exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
0236
0237
0238
def move(source, destination, recursive=False):
    """
    Copy ``source`` to ``destination`` with a single gfal-copy call.

    :param source: source URL or path
    :param destination: destination URL or path
    :param recursive: if True, pass ``-r`` to gfal-copy
    :return: exit_code, stdout, stderr as returned by ``execute()``
    """
    import shlex  # local import keeps this block self-contained

    options = '-r ' if recursive else ''
    # Quote the URLs so characters such as '&', ';' or spaces cannot split or
    # alter the command string; plain URLs are left unchanged by shlex.quote.
    # NOTE(review): assumes execute() hands the string to a shell.
    cmd = "gfal-copy %s%s %s" % (options, shlex.quote(source), shlex.quote(destination))
    # Report the command through the module logger (previously a bare print).
    logger.info("executing command: %s", cmd)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr
0249
0250
def check_for_gfal():
    """
    Report whether the gfal-copy executable can be located.

    The lookup is done by running ``which gfal-copy`` through ``execute()``.

    :return: True if the lookup exited with status 0, otherwise False
    """
    status, _, _ = execute('which gfal-copy')
    found = status == 0
    return found