File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 from __future__ import absolute_import
0015
0016 import os
0017 import json
0018 import logging
0019 from time import time
0020 from copy import deepcopy
0021
0022 from .common import resolve_common_transfer_errors, verify_catalog_checksum, get_timeout
0023 from pilot.common.exception import PilotException, StageOutFailure, ErrorCodes
0024 from pilot.util.timer import timeout, TimedThread
0025
0026 logger = logging.getLogger(__name__)
0027 logger.setLevel(logging.DEBUG)
0028
0029
0030 require_replicas = True
0031 require_protocols = False
0032 tracing_rucio = False
0033
0034
0035 def is_valid_for_copy_in(files):
0036 return True
0037
0038
0039 def is_valid_for_copy_out(files):
0040 return True
0041
0042
0043 def verify_stage_out(fspec):
0044 """
0045 Checks that the uploaded file is physically at the destination.
0046 :param fspec: file specifications
0047 """
0048 from rucio.rse import rsemanager as rsemgr
0049 rse_settings = rsemgr.get_rse_info(fspec.ddmendpoint)
0050 uploaded_file = {'name': fspec.lfn, 'scope': fspec.scope}
0051 logger.info('Checking file: %s', str(fspec.lfn))
0052 return rsemgr.exists(rse_settings, [uploaded_file])
0053
0054
0055
0056 def copy_in(files, **kwargs):
0057 """
0058 Download given files using rucio copytool.
0059
0060 :param files: list of `FileSpec` objects
0061 :param ignore_errors: boolean, if specified then transfer failures will be ignored
0062 :raise: PilotException in case of controlled error
0063 """
0064
0065 ignore_errors = kwargs.get('ignore_errors')
0066 trace_report = kwargs.get('trace_report')
0067 use_pcache = kwargs.get('use_pcache')
0068
0069
0070
0071 os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'
0072
0073 logger.debug('RUCIO_LOCAL_SITE_ID=%s', os.environ.get('RUCIO_LOCAL_SITE_ID', '<unknown>'))
0074 logger.debug('trace_report[localSite]=%s', trace_report.get_value('localSite'))
0075
0076 localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))
0077 for fspec in files:
0078 logger.info('rucio copytool, downloading file with scope:%s lfn:%s', str(fspec.scope), str(fspec.lfn))
0079
0080 localsite = localsite if localsite else fspec.ddmendpoint
0081 trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
0082 trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
0083 trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
0084 trace_report.update(url=fspec.turl if fspec.turl else fspec.surl)
0085 trace_report.update(catStart=time())
0086 fspec.status_code = 0
0087 dst = fspec.workdir or kwargs.get('workdir') or '.'
0088 logger.info('the file will be stored in %s' % str(dst))
0089
0090 trace_report_out = []
0091 transfer_timeout = get_timeout(fspec.filesize)
0092 ctimeout = transfer_timeout + 10
0093 logger.info('overall transfer timeout=%s' % ctimeout)
0094
0095 error_msg = ""
0096 ec = 0
0097 try:
0098 ec, trace_report_out = timeout(ctimeout, timer=TimedThread)(_stage_in_api)(dst, fspec, trace_report, trace_report_out, transfer_timeout, use_pcache)
0099
0100 except Exception as error:
0101 error_msg = str(error)
0102 error_details = handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=True)
0103 protocol = get_protocol(trace_report_out)
0104 trace_report.update(protocol=protocol)
0105 if not ignore_errors:
0106 trace_report.send()
0107 msg = ' %s:%s from %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
0108 raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))
0109 else:
0110 protocol = get_protocol(trace_report_out)
0111 trace_report.update(protocol=protocol)
0112
0113
0114
0115
0116 state_reason = None if not trace_report_out else trace_report_out[0].get('stateReason')
0117 if ec and state_reason and not error_msg:
0118 error_details = handle_rucio_error(state_reason, trace_report, trace_report_out, fspec, stagein=True)
0119
0120 if not ignore_errors:
0121 trace_report.send()
0122 msg = ' %s:%s from %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
0123 raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))
0124
0125
0126 destination = os.path.join(dst, fspec.lfn)
0127 if os.path.exists(destination):
0128 state, diagnostics = verify_catalog_checksum(fspec, destination)
0129 if diagnostics != "" and not ignore_errors:
0130 trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
0131 timeEnd=time())
0132 trace_report.send()
0133 raise PilotException(diagnostics, code=fspec.status_code, state=state)
0134 else:
0135 diagnostics = 'file does not exist: %s (cannot verify catalog checksum)' % destination
0136 logger.warning(diagnostics)
0137 state = 'STAGEIN_ATTEMPT_FAILED'
0138 fspec.status_code = ErrorCodes.STAGEINFAILED
0139 trace_report.update(clientState=state, stateReason=diagnostics,
0140 timeEnd=time())
0141 trace_report.send()
0142 raise PilotException(diagnostics, code=fspec.status_code, state=state)
0143
0144 if not fspec.status_code:
0145 fspec.status_code = 0
0146 fspec.status = 'transferred'
0147 trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
0148
0149 trace_report.send()
0150
0151 return files
0152
0153
0154 def get_protocol(trace_report_out):
0155 """
0156 Extract the protocol used for the transfer from the dictionary returned by rucio.
0157
0158 :param trace_report_out: returned rucio transfer dictionary (dictionary).
0159 :return: protocol (string).
0160 """
0161
0162 try:
0163 p = trace_report_out[0].get('protocol')
0164 except Exception as error:
0165 logger.warning('exception caught: %s' % error)
0166 p = ''
0167
0168 return p
0169
0170
0171 def handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=True):
0172 """
0173
0174 :param error_msg:
0175 :param trace_report:
0176 :param trace_report_out:
0177 :param fspec:
0178 :return:
0179 """
0180
0181
0182 error_msg_org = error_msg
0183 if trace_report_out:
0184 logger.debug('reading stateReason from trace_report_out: %s' % trace_report_out)
0185 error_msg = trace_report_out[0].get('stateReason', '')
0186 if not error_msg or error_msg == 'OK':
0187 logger.warning('could not extract error message from trace report - reverting to original error message')
0188 error_msg = error_msg_org
0189 else:
0190 logger.debug('no trace_report_out')
0191 logger.info('rucio returned an error: \"%s\"' % error_msg)
0192
0193 error_details = resolve_common_transfer_errors(error_msg, is_stagein=stagein)
0194 fspec.status = 'failed'
0195 fspec.status_code = error_details.get('rcode')
0196
0197 msg = 'STAGEIN_ATTEMPT_FAILED' if stagein else 'STAGEOUT_ATTEMPT_FAILED'
0198 trace_report.update(clientState=error_details.get('state', msg),
0199 stateReason=error_details.get('error'), timeEnd=time())
0200
0201 return error_details
0202
0203
0204 def copy_in_bulk(files, **kwargs):
0205 """
0206 Download given files using rucio copytool.
0207
0208 :param files: list of `FileSpec` objects
0209 :param ignore_errors: boolean, if specified then transfer failures will be ignored
0210 :raise: PilotException in case of controlled error
0211 """
0212
0213
0214 ignore_errors = kwargs.get('ignore_errors')
0215 trace_common_fields = kwargs.get('trace_report')
0216
0217
0218 os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'
0219
0220 dst = kwargs.get('workdir') or '.'
0221
0222
0223 trace_report_out = []
0224 try:
0225
0226
0227 _stage_in_bulk(dst, files, trace_report_out, trace_common_fields)
0228 except Exception as error:
0229 error_msg = str(error)
0230
0231
0232 if not trace_report_out:
0233 trace_report = deepcopy(trace_common_fields)
0234 localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', None)
0235 diagnostics = 'None of the traces received from Rucio. Response from Rucio: %s' % error_msg
0236 for fspec in files:
0237 localsite = localsite if localsite else fspec.ddmendpoint
0238 trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
0239 trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
0240 trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
0241 trace_report.update('STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics, timeEnd=time())
0242 trace_report.send()
0243 logger.error(diagnostics)
0244 raise PilotException(diagnostics, code=fspec.status_code, state='STAGEIN_ATTEMPT_FAILED')
0245
0246
0247 files_done = []
0248 for fspec in files:
0249
0250
0251
0252 trace_candidates = _get_trace(fspec, trace_report_out)
0253 protocol = get_protocol(trace_report_out)
0254 trace_report.update(protocol=protocol)
0255 trace_report = None
0256 diagnostics = 'unknown'
0257 if len(trace_candidates) == 0:
0258 diagnostics = 'No trace retrieved for given file.'
0259 logger.error('No trace retrieved for given file. %s' % fspec.lfn)
0260 elif len(trace_candidates) != 1:
0261 diagnostics = 'Too many traces for given file.'
0262 logger.error('Rucio returned too many traces for given file. %s' % fspec.lfn)
0263 else:
0264 trace_report = trace_candidates[0]
0265
0266
0267 destination = os.path.join(dst, fspec.lfn)
0268 if os.path.exists(destination):
0269 state, diagnostics = verify_catalog_checksum(fspec, destination)
0270 if diagnostics != "" and not ignore_errors and trace_report:
0271 trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
0272 timeEnd=time())
0273 logger.error(diagnostics)
0274 elif trace_report:
0275 diagnostics = 'file does not exist: %s (cannot verify catalog checksum)' % destination
0276 state = 'STAGEIN_ATTEMPT_FAILED'
0277 fspec.status_code = ErrorCodes.STAGEINFAILED
0278 trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
0279 logger.error(diagnostics)
0280 else:
0281 fspec.status_code = ErrorCodes.STAGEINFAILED
0282
0283 if not fspec.status_code:
0284 fspec.status_code = 0
0285 fspec.status = 'transferred'
0286 trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
0287 files_done.append(fspec)
0288
0289
0290 if not trace_report:
0291 logger.error('An unknown error occurred when handling the traces. %s' % fspec.lfn)
0292 logger.warning('No trace sent!!!')
0293 trace_report.update(guid=fspec.guid.replace('-', ''))
0294 trace_report.send()
0295
0296 if len(files_done) != len(files):
0297 raise PilotException('Not all files downloaded.', code=ErrorCodes.STAGEINFAILED, state='STAGEIN_ATTEMPT_FAILED')
0298
0299 return files_done
0300
0301
0302 def _get_trace(fspec, traces):
0303 """
0304 Traces returned by Rucio are not orderred the same as input files from pilot.
0305 This method finds the proper trace.
0306
0307 :param: fspec: the file that is seeked
0308 :param: traces: all traces that are received by Rucio
0309
0310 :return: trace_candiates that correspond to the given file
0311 """
0312 try:
0313 try:
0314 trace_candidates = list(filter(lambda t: t['filename'] == fspec.lfn and t['scope'] == fspec.scope, traces))
0315 except Exception:
0316 trace_candidates = list([t for t in traces if t['filename'] == fspec.lfn and t['scope'] == fspec.scope])
0317 if trace_candidates:
0318 return trace_candidates
0319 else:
0320 logger.warning('File does not match to any trace received from Rucio: %s %s' % (fspec.lfn, fspec.scope))
0321 except Exception as error:
0322 logger.warning('Traces from pilot and rucio could not be merged: %s' % str(error))
0323 return []
0324
0325
0326
0327 def copy_out(files, **kwargs):
0328 """
0329 Upload given files using rucio copytool.
0330
0331 :param files: list of `FileSpec` objects
0332 :param ignore_errors: boolean, if specified then transfer failures will be ignored
0333 :raise: PilotException in case of controlled error
0334 """
0335
0336
0337 os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'
0338
0339 summary = kwargs.pop('summary', True)
0340 ignore_errors = kwargs.pop('ignore_errors', False)
0341 trace_report = kwargs.get('trace_report')
0342
0343 localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', None)
0344 for fspec in files:
0345 logger.info('rucio copytool, uploading file with scope: %s and lfn: %s' % (str(fspec.scope), str(fspec.lfn)))
0346 localsite = localsite if localsite else fspec.ddmendpoint
0347 trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint)
0348 trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
0349 trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
0350 fspec.status_code = 0
0351
0352 summary_file_path = None
0353 cwd = fspec.workdir or kwargs.get('workdir') or '.'
0354 if summary:
0355 summary_file_path = os.path.join(cwd, 'rucio_upload.json')
0356
0357 logger.info('the file will be uploaded to %s' % str(fspec.ddmendpoint))
0358 trace_report_out = []
0359 transfer_timeout = get_timeout(fspec.filesize)
0360 ctimeout = transfer_timeout + 10
0361 logger.info('overall transfer timeout=%s' % ctimeout)
0362
0363 error_msg = ""
0364 ec = 0
0365 try:
0366 ec, trace_report_out = timeout(ctimeout, TimedThread)(_stage_out_api)(fspec, summary_file_path, trace_report, trace_report_out, transfer_timeout)
0367
0368 except PilotException as error:
0369 error_msg = str(error)
0370 error_details = handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=False)
0371 protocol = get_protocol(trace_report_out)
0372 trace_report.update(protocol=protocol)
0373 if not ignore_errors:
0374 trace_report.send()
0375 msg = ' %s:%s to %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
0376 raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))
0377 except Exception as error:
0378 error_msg = str(error)
0379 error_details = handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=False)
0380 protocol = get_protocol(trace_report_out)
0381 trace_report.update(protocol=protocol)
0382 if not ignore_errors:
0383 trace_report.send()
0384 msg = ' %s:%s to %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
0385 raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))
0386 else:
0387 protocol = get_protocol(trace_report_out)
0388 trace_report.update(protocol=protocol)
0389
0390
0391
0392
0393 state_reason = None if not trace_report_out else trace_report_out[0].get('stateReason')
0394 if ec and state_reason and not error_msg:
0395 error_details = handle_rucio_error(state_reason, trace_report, trace_report_out, fspec, stagein=False)
0396
0397 if not ignore_errors:
0398 trace_report.send()
0399 msg = ' %s:%s from %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
0400 raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))
0401
0402 if summary:
0403 if not os.path.exists(summary_file_path):
0404 logger.error('Failed to resolve Rucio summary JSON, wrong path? file=%s' % summary_file_path)
0405 else:
0406 with open(summary_file_path, 'rb') as f:
0407 summary_json = json.load(f)
0408 dat = summary_json.get("%s:%s" % (fspec.scope, fspec.lfn)) or {}
0409 fspec.turl = dat.get('pfn')
0410 logger.debug('set turl=%s' % fspec.turl)
0411
0412
0413 adler32 = dat.get('adler32')
0414 local_checksum = fspec.checksum.get('adler32')
0415 if local_checksum and adler32 and local_checksum != adler32:
0416 msg = 'checksum verification failed: local %s != remote %s' % \
0417 (local_checksum, adler32)
0418 logger.warning(msg)
0419 fspec.status = 'failed'
0420 fspec.status_code = ErrorCodes.PUTADMISMATCH
0421 trace_report.update(clientState='AD_MISMATCH', stateReason=msg, timeEnd=time())
0422 trace_report.send()
0423 if not ignore_errors:
0424 raise PilotException("Failed to stageout: CRC mismatched",
0425 code=ErrorCodes.PUTADMISMATCH, state='AD_MISMATCH')
0426 else:
0427 if local_checksum and adler32 and local_checksum == adler32:
0428 logger.info('local checksum (%s) = remote checksum (%s)' % (local_checksum, adler32))
0429 else:
0430 logger.warning('checksum could not be verified: local checksum (%s), remote checksum (%s)' %
0431 (str(local_checksum), str(adler32)))
0432 if not fspec.status_code:
0433 fspec.status_code = 0
0434 fspec.status = 'transferred'
0435 trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
0436
0437 trace_report.send()
0438
0439 return files
0440
0441
0442
0443 def _stage_in_api(dst, fspec, trace_report, trace_report_out, transfer_timeout, use_pcache):
0444
0445 ec = 0
0446
0447
0448 from rucio.client.downloadclient import DownloadClient
0449 download_client = DownloadClient(logger=logger)
0450 if use_pcache:
0451 download_client.check_pcache = True
0452
0453
0454 if hasattr(download_client, 'tracing'):
0455 download_client.tracing = tracing_rucio
0456
0457
0458 f = {}
0459 f['did_scope'] = fspec.scope
0460 f['did_name'] = fspec.lfn
0461 f['did'] = '%s:%s' % (fspec.scope, fspec.lfn)
0462 f['rse'] = fspec.ddmendpoint
0463 f['base_dir'] = dst
0464 f['no_subdir'] = True
0465 if fspec.turl:
0466 f['pfn'] = fspec.turl
0467
0468 if transfer_timeout:
0469 f['transfer_timeout'] = transfer_timeout
0470 f['connection_timeout'] = 60 * 60
0471
0472
0473 logger.info('rucio API stage-in dictionary: %s' % f)
0474 trace_pattern = {}
0475 if trace_report:
0476 trace_pattern = trace_report
0477
0478
0479 try:
0480 logger.info('*** rucio API downloading file (taking over logging) ***')
0481 if fspec.turl:
0482 result = download_client.download_pfns([f], 1, trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out)
0483 else:
0484 result = download_client.download_dids([f], trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out)
0485 except Exception as error:
0486 logger.warning('*** rucio API download client failed ***')
0487 logger.warning('caught exception: %s', error)
0488 logger.debug('trace_report_out=%s', trace_report_out)
0489
0490 if not trace_report_out:
0491 raise error
0492 if not trace_report_out[0].get('stateReason'):
0493 raise error
0494 ec = -1
0495 else:
0496 logger.info('*** rucio API download client finished ***')
0497 logger.debug('client returned %s', result)
0498
0499 logger.debug('trace_report_out=%s', trace_report_out)
0500
0501 return ec, trace_report_out
0502
0503
0504 def _stage_in_bulk(dst, files, trace_report_out=None, trace_common_fields=None):
0505 """
0506 Stage-in files in bulk using the Rucio API.
0507
0508 :param dst: destination (string).
0509 :param files: list of fspec objects.
0510 :param trace_report:
0511 :param trace_report_out:
0512 :return:
0513 """
0514
0515 from rucio.client.downloadclient import DownloadClient
0516 download_client = DownloadClient(logger=logger)
0517
0518
0519 if hasattr(download_client, 'tracing'):
0520 download_client.tracing = tracing_rucio
0521
0522
0523 file_list = []
0524
0525 for fspec in files:
0526 fspec.status_code = 0
0527
0528
0529 f = {}
0530 f['did_scope'] = fspec.scope
0531 f['did_name'] = fspec.lfn
0532 f['did'] = '%s:%s' % (fspec.scope, fspec.lfn)
0533 f['rse'] = fspec.ddmendpoint
0534 f['base_dir'] = fspec.workdir or dst
0535 f['no_subdir'] = True
0536 if fspec.turl:
0537 f['pfn'] = fspec.turl
0538 else:
0539 logger.warning('cannot perform bulk download since fspec.turl is not set (required by download_pfns()')
0540
0541
0542 if fspec.filesize:
0543 f['transfer_timeout'] = get_timeout(fspec.filesize)
0544 f['connection_timeout'] = 60 * 60
0545
0546 file_list.append(f)
0547
0548
0549 trace_pattern = trace_common_fields if trace_common_fields else {}
0550
0551
0552 num_threads = len(file_list)
0553 logger.info('*** rucio API downloading files (taking over logging) ***')
0554 try:
0555 result = download_client.download_pfns(file_list, num_threads, trace_custom_fields=trace_pattern, traces_copy_out=trace_report_out)
0556 except Exception as error:
0557 logger.warning('*** rucio API download client failed ***')
0558 logger.warning('caught exception: %s', error)
0559 logger.debug('trace_report_out=%s', trace_report_out)
0560
0561 if not trace_report_out:
0562 raise error
0563 if not trace_report_out[0].get('stateReason'):
0564 raise error
0565 else:
0566 logger.info('*** rucio API download client finished ***')
0567 logger.debug('client returned %s', result)
0568
0569
0570 def _stage_out_api(fspec, summary_file_path, trace_report, trace_report_out, transfer_timeout):
0571
0572 ec = 0
0573
0574
0575 from rucio.client.uploadclient import UploadClient
0576 upload_client = UploadClient(logger=logger)
0577
0578
0579 if hasattr(upload_client, 'tracing'):
0580 upload_client.tracing = tracing_rucio
0581 if tracing_rucio:
0582 upload_client.trace = trace_report
0583
0584
0585 f = {}
0586 f['path'] = fspec.surl or getattr(fspec, 'pfn', None) or os.path.join(fspec.workdir, fspec.lfn)
0587 f['rse'] = fspec.ddmendpoint
0588 f['did_scope'] = fspec.scope
0589 f['no_register'] = True
0590
0591 if transfer_timeout:
0592 f['transfer_timeout'] = transfer_timeout
0593 f['connection_timeout'] = 60 * 60
0594
0595
0596
0597
0598
0599
0600 if fspec.lfn and '.root' in fspec.lfn:
0601 f['guid'] = fspec.guid
0602
0603 logger.info('rucio API stage-out dictionary: %s' % f)
0604
0605
0606 try:
0607 logger.info('*** rucio API uploading file (taking over logging) ***')
0608 logger.debug('summary_file_path=%s' % summary_file_path)
0609 logger.debug('trace_report_out=%s' % trace_report_out)
0610 result = upload_client.upload([f], summary_file_path=summary_file_path, traces_copy_out=trace_report_out)
0611 except Exception as error:
0612 logger.warning('*** rucio API upload client failed ***')
0613 logger.warning('caught exception: %s', error)
0614 import traceback
0615 logger.error(traceback.format_exc())
0616 logger.debug('trace_report_out=%s', trace_report_out)
0617 if not trace_report_out:
0618 raise error
0619 if not trace_report_out[0].get('stateReason'):
0620 raise error
0621 ec = -1
0622 except UnboundLocalError:
0623 logger.warning('*** rucio API upload client failed ***')
0624 logger.warning('rucio still needs a bug fix of the summary in the uploadclient')
0625 else:
0626 logger.warning('*** rucio API upload client finished ***')
0627 logger.debug('client returned %s', result)
0628
0629 try:
0630 file_exists = verify_stage_out(fspec)
0631 logger.info('file exists at the storage: %s' % str(file_exists))
0632 if not file_exists:
0633 raise StageOutFailure('physical check after upload failed')
0634 except Exception as error:
0635 msg = 'file existence verification failed with: %s' % error
0636 logger.info(msg)
0637 raise StageOutFailure(msg)
0638
0639 return ec, trace_report_out