File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import os
0013 import logging
0014 import errno
0015 from time import time
0016
0017 from .common import get_copysetup, verify_catalog_checksum, resolve_common_transfer_errors
0018 from pilot.common.exception import StageInFailure, StageOutFailure, PilotException, ErrorCodes
0019 from pilot.util.container import execute
0020
0021
0022
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): presumably read by the code that selects copytools to decide
# whether replicas must be resolved before calling this module - confirm.
require_replicas = True

# NOTE(review): looks like the URL schemas this copytool accepts - confirm against the caller.
allowed_schemas = [
    'srm',
    'gsiftp',
    'root',
]
0028
0029
def is_valid_for_copy_in(files):
    """
    Tell whether the given files can be staged in with this copytool.

    No per-file check is performed: the argument is not inspected and every
    input is accepted.

    :param files: files to be downloaded (unused).
    :return: always True (boolean).
    """

    return True
0032
0033
0034
0035
0036
0037
def is_valid_for_copy_out(files):
    """
    Tell whether the given files can be staged out with this copytool.

    No per-file check is performed: the argument is not inspected and every
    input is accepted.

    :param files: files to be uploaded (unused).
    :return: always True (boolean).
    """

    return True
0043
0044
def copy_in_old(files):
    """
    Download the given files by calling lsm-get directly.

    The availability of lsm-get is checked first with check_for_lsm(); the
    files are then transferred with move_all_files_in().

    :param files: files to download (entries as expected by move_all_files_in()).
    :raises PilotException: StageInFailure if lsm-get is not found, or if the
        transfer returns a non-zero exit code (the exception then carries the
        transfer's stdout).
    """

    lsm_found = check_for_lsm(dst_in=True)
    if not lsm_found:
        raise StageInFailure("No LSM tools found")

    exit_code, stdout, _ = move_all_files_in(files)
    if exit_code != 0:
        # report the failure to the caller using the captured stdout
        raise StageInFailure(stdout)
0059
0060
def copy_in(files, **kwargs):
    """
    Download the given files using the lsm-get command.

    Each file is fetched from its ``turl`` into ``<workdir>/<lfn>`` with move()
    and then checked with verify_catalog_checksum(). The trace report is updated
    and sent once per file, on success as well as on failure. Processing stops
    at the first file that fails.

    Keyword arguments used:
      - ``copytools``: passed to get_copysetup() to resolve the 'lsm' setup.
      - ``trace_report``: required; must provide get_value(), update() and send().
      - ``workdir``: destination directory for files that carry no workdir of
        their own (the current directory is used if neither is set).

    :param files: list of `FileSpec` objects.
    :raise: PilotException in case of controlled error (failed transfer or
        non-empty diagnostics from the checksum verification).
    :return: the given list of `FileSpec` objects, with status and status_code updated.
    """

    copytools = kwargs.get('copytools') or []
    copysetup = get_copysetup(copytools, 'lsm')
    trace_report = kwargs.get('trace_report')

    # the environment variable takes precedence over the site stored in the trace report
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))

    for fspec in files:
        # if no local site is known yet, use the endpoint of the current file
        # (the value then sticks for the remaining files)
        localsite = localsite if localsite else fspec.ddmendpoint
        trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(catStart=time())

        dst = fspec.workdir or kwargs.get('workdir') or '.'
        source = fspec.turl
        destination = os.path.join(dst, fspec.lfn)

        logger.info("transferring file %s from %s to %s", fspec.lfn, source, destination)

        exit_code, stdout, stderr = move(source, destination, dst_in=True, copysetup=copysetup)

        if exit_code != 0:
            logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)

            # move() reports a caught exception on stdout and leaves stderr empty;
            # fall back to stdout so the error is not resolved from an empty
            # string (copy_out() applies the same fallback)
            error = resolve_common_transfer_errors(stderr or stdout, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            trace_report.update(clientState=error.get('state') or 'STAGEIN_ATTEMPT_FAILED',
                                stateReason=error.get('error'), timeEnd=time())
            trace_report.send()
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        # any non-empty diagnostics from the checksum verification is a failure
        state, diagnostics = verify_catalog_checksum(fspec, destination)
        if diagnostics != "":
            trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
                                timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0137
0138
def copy_out(files, **kwargs):
    """
    Upload the given files using the lsm-put command.

    Each file is sent from ``<workdir>/<lfn>`` to its ``turl`` with move(). The
    file size, the token of the destination endpoint, the adler32 checksum and
    the guid are passed to lsm-put as options. The trace report is updated and
    sent once per file, on success as well as on failure. Processing stops at
    the first file that fails.

    Keyword arguments used:
      - ``copytools``: passed to get_copysetup() to resolve the 'lsm' setup.
      - ``trace_report``: must provide update() and send().
      - ``ddmconf``: required mapping from ddmendpoint name to an object with a
        ``token`` attribute.
      - ``workdir``: source directory for files that carry no workdir of their
        own (the current directory is used if neither is set).

    :param files: list of `FileSpec` objects.
    :raise: PilotException in case of controlled error (missing ddmconf,
        unresolved token, failed transfer).
    :return: the given list of `FileSpec` objects, with status and status_code updated.
    """

    copytools = kwargs.get('copytools') or []
    copysetup = get_copysetup(copytools, 'lsm')
    trace_report = kwargs.get('trace_report')
    ddmconf = kwargs.get('ddmconf', None)
    if not ddmconf:
        raise PilotException("copy_out() failed to resolve ddmconf from function arguments",
                             code=ErrorCodes.STAGEOUTFAILED,
                             state='COPY_ERROR')

    for fspec in files:
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        # resolve the token from the endpoint; an endpoint missing from ddmconf
        # is reported through the same controlled error as an empty token
        # instead of failing with an AttributeError
        ddm = ddmconf.get(fspec.ddmendpoint)
        token = ddm.token if ddm is not None else None
        if not token:
            diagnostics = "copy_out() failed to resolve token value for ddmendpoint=%s" % (fspec.ddmendpoint)
            trace_report.update(clientState='STAGEOUT_ATTEMPT_FAILED',
                                stateReason=diagnostics,
                                timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=ErrorCodes.STAGEOUTFAILED, state='COPY_ERROR')

        src = fspec.workdir or kwargs.get('workdir') or '.'
        source = os.path.join(src, fspec.lfn)
        destination = fspec.turl

        checksum = "adler32:%s" % fspec.checksum.get('adler32')

        # dict.items() is available on both Python 2 and 3, so no iteritems() fallback is needed
        lsm_options = {'--size': fspec.filesize,
                       '-t': token,
                       '--checksum': checksum,
                       '--guid': fspec.guid}
        opts = " ".join("%s %s" % (key, value) for key, value in lsm_options.items())

        logger.info("transferring file %s from %s to %s", fspec.lfn, source, destination)

        # single attempt: a failure raises immediately, so there is nothing to retry
        exit_code, stdout, stderr = move(source, destination, dst_in=False, copysetup=copysetup, options=opts)

        if exit_code != 0:
            # move() may report the problem on stdout only
            if stderr == "":
                stderr = stdout
            error = resolve_common_transfer_errors(stderr, is_stagein=False)
            # copy_in() reads the code of the same helper's result under 'rcode';
            # 'exit_code' is kept as a fallback and the generic stage-out code
            # guarantees that the failure never carries an empty code
            rcode = error.get('rcode') or error.get('exit_code') or ErrorCodes.STAGEOUTFAILED
            fspec.status = 'failed'
            fspec.status_code = rcode
            trace_report.update(clientState=error.get('state', None) or 'STAGEOUT_ATTEMPT_FAILED',
                                stateReason=error.get('error', 'unknown error'),
                                timeEnd=time())
            trace_report.send()
            raise PilotException(error.get('error'), code=rcode, state=error.get('state'))

        logger.info('all successful')

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0217
0218
def copy_out_old(files):
    """
    Upload the given files by calling lsm-put directly.

    The availability of lsm-put is checked first with check_for_lsm(); the
    files are then transferred with move_all_files_out().

    :param files: files to upload (entries as expected by move_all_files_out()).
    :raises PilotException: StageOutFailure if lsm-put is not found, or if the
        transfer returns a non-zero exit code (the exception then carries the
        transfer's stdout).
    """

    lsm_found = check_for_lsm(dst_in=False)
    if not lsm_found:
        raise StageOutFailure("No LSM tools found")

    exit_code, stdout, _ = move_all_files_out(files)
    if exit_code != 0:
        # report the failure to the caller using the captured stdout
        raise StageOutFailure(stdout)
0234
0235
def move_all_files_in(files, nretries=1):
    """
    Stage in all given files, one after the other, using move().

    A transfer is attempted again only if it ended with a timeout exit code
    (errno.ETIMEDOUT or errno.ETIME) and attempts remain. Any other failure,
    or a timeout on the last attempt, aborts the whole operation.

    :param files: iterable of dictionaries with the keys 'name', 'source' and 'destination'.
    :param nretries: maximum number of attempts per file (int).
    :return: exit_code, stdout, stderr of the last move() call
        (0, "", "" if nothing was transferred).
    """

    exit_code, stdout, stderr = 0, "", ""
    timeout_codes = (errno.ETIMEDOUT, errno.ETIME)

    for entry in files:
        logger.info("transferring file %s from %s to %s", entry['name'], entry['source'], entry['destination'])

        # the source is joined with a plain '/', the local destination with os.path.join
        source = entry['source'] + '/' + entry['name']
        destination = os.path.join(entry['destination'], entry['name'])

        for attempt in range(1, nretries + 1):
            exit_code, stdout, stderr = move(source, destination, dst_in=True)

            if exit_code == 0:
                # this file is done, continue with the next one
                break

            if exit_code not in timeout_codes or attempt == nretries:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
0265
0266
def move_all_files_out(files, nretries=1):
    """
    Stage out all given files, one after the other, using move().

    A transfer is attempted again only if it ended with a timeout exit code
    (errno.ETIMEDOUT or errno.ETIME) and attempts remain. Any other failure,
    or a timeout on the last attempt, aborts the whole operation.

    :param files: iterable of dictionaries with the keys 'name', 'source' and 'destination'.
    :param nretries: maximum number of attempts per file (int).
    :return: exit_code, stdout, stderr of the last move() call
        (0, "", "" if nothing was transferred).
    """

    exit_code, stdout, stderr = 0, "", ""
    timeout_codes = (errno.ETIMEDOUT, errno.ETIME)

    for entry in files:
        logger.info("transferring file %s from %s to %s", entry['name'], entry['source'], entry['destination'])

        # the destination is joined with a plain '/', the local source with os.path.join
        destination = entry['destination'] + '/' + entry['name']
        source = os.path.join(entry['source'], entry['name'])

        for attempt in range(1, nretries + 1):
            exit_code, stdout, stderr = move(source, destination, dst_in=False)

            if exit_code == 0:
                # this file is done, continue with the next one
                break

            if exit_code not in timeout_codes or attempt == nretries:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
0295
0296
0297
def move(source, destination, dst_in=True, copysetup="", options=None):
    """
    Use lsm-get or lsm-put to transfer the file.

    The command line has the form
    ``[source <copysetup>;]lsm-get|lsm-put [<options>] <source> <destination>``
    and is run with execute(). An exception raised by execute() is not
    propagated: it is converted into a stage-in/stage-out error code, with the
    exception text returned on stdout and an empty stderr.

    :param source: path to source (string).
    :param destination: path to destination (string).
    :param dst_in: True for stage-in, False for stage-out (boolean).
    :param copysetup: path to a setup script to source before the transfer; skipped if empty (string).
    :param options: additional command line options placed before the source (string or None).
    :return: exit code, stdout, stderr
    """

    # a missing setup (empty string or None) must not produce a 'source None;' prefix
    cmd = 'source %s;' % copysetup if copysetup else ''

    args = "%s %s" % (source, destination)
    if options:
        args = "%s %s" % (options, args)

    if dst_in:
        cmd += "lsm-get %s" % args
    else:
        cmd += "lsm-put %s" % args

    try:
        exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True)
    except Exception as error:
        exit_code = ErrorCodes.STAGEINFAILED if dst_in else ErrorCodes.STAGEOUTFAILED
        # the format string needs a %s placeholder; without one the % operator
        # raises a TypeError inside this handler and the real error is lost
        stdout = 'exception caught: %s' % error
        stderr = ''
        logger.warning(stdout)

    logger.info('exit_code=%d, stdout=%s, stderr=%s', exit_code, stdout, stderr)
    return exit_code, stdout, stderr
0336
0337
def check_for_lsm(dst_in=True):
    """
    Check whether the required LSM command can be located with ``which``.

    :param dst_in: True to look for lsm-get (stage-in), False to look for lsm-put (stage-out) (boolean).
    :return: True if the ``which`` command exits with code 0, False otherwise (boolean).
    """

    cmd = 'which lsm-get' if dst_in else 'which lsm-put'

    # only the exit code matters; the printed path and stderr are ignored
    exit_code, _, _ = execute(cmd)
    return exit_code == 0
0345 return exit_code == 0