Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Tobias Wegner, tobias.wegner@cern.ch, 2017-2018
0009 # - Alexey Anisenkov, anisyonk@cern.ch, 2018
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0011 # - Tomas Javurek, tomas.javurek@cern.ch, 2019
0012 # - David Cameron, david.cameron@cern.ch, 2019
0013 
0014 from __future__ import absolute_import  # Python 2 (2to3 complains about this)
0015 
0016 import os
0017 import json
0018 import logging
0019 from time import time
0020 from copy import deepcopy
0021 
0022 from .common import resolve_common_transfer_errors, verify_catalog_checksum, get_timeout
0023 from pilot.common.exception import PilotException, StageOutFailure, ErrorCodes
0024 from pilot.util.timer import timeout, TimedThread
0025 
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Copytool capability switches (module-level, read by whoever drives this copytool).
# require_replicas can be disabled for Rucio if it is allowed to use all RSEs for input.
require_replicas = True    # the copytool needs input replicas to be resolved before stage-in
require_protocols = False  # the copytool does not need protocols to be resolved first for stage-out
tracing_rucio = False      # whether the Rucio client itself should send the trace
0033 
0034 
def is_valid_for_copy_in(files):
    """
    Report whether this copytool can stage in the given files.

    No validation is implemented yet, so every input is accepted (FIX ME LATER).

    :param files: list of `FileSpec` objects (currently not inspected).
    :return: always True.
    """
    del files  # not inspected yet
    return True
0037 
0038 
def is_valid_for_copy_out(files):
    """
    Report whether this copytool can stage out the given files.

    No validation is implemented yet, so every input is accepted (FIX ME LATER).

    :param files: list of `FileSpec` objects (currently not inspected).
    :return: always True.
    """
    del files  # not inspected yet
    return True
0041 
0042 
def verify_stage_out(fspec):
    """
    Ask Rucio whether an uploaded file is physically present at its destination RSE.

    :param fspec: file specification providing `ddmendpoint`, `lfn` and `scope`.
    :return: the result of `rsemanager.exists()` for the single uploaded file.
    """
    from rucio.rse import rsemanager as rsemgr

    settings = rsemgr.get_rse_info(fspec.ddmendpoint)
    did = dict(name=fspec.lfn, scope=fspec.scope)
    logger.info('Checking file: %s', str(fspec.lfn))
    return rsemgr.exists(settings, [did])
0053 
0054 
0055 #@timeout(seconds=10800)
def copy_in(files, **kwargs):
    """
    Download the given files using the Rucio copytool.

    Each file is downloaded through the Rucio download API (`_stage_in_api`) under an overall
    timeout. Its catalog checksum is then verified, and a trace is sent for it.

    :param files: list of `FileSpec` objects.
    :param kwargs: `ignore_errors` (bool, if set then transfer failures do not raise),
                   `trace_report` (trace report object, required), `use_pcache` (bool),
                   `workdir` (destination directory used when `fspec.workdir` is not set).
    :raise: PilotException in case of controlled error.
    :return: the list of files (with updated status fields).
    """
    ignore_errors = kwargs.get('ignore_errors')
    trace_report = kwargs.get('trace_report')
    use_pcache = kwargs.get('use_pcache')

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    logger.debug('RUCIO_LOCAL_SITE_ID=%s', os.environ.get('RUCIO_LOCAL_SITE_ID', '<unknown>'))
    logger.debug('trace_report[localSite]=%s', trace_report.get_value('localSite'))
    # note, env vars might be unknown inside middleware containers, if so get the value already in the trace report
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))
    for fspec in files:
        logger.info('rucio copytool, downloading file with scope:%s lfn:%s', str(fspec.scope), str(fspec.lfn))
        # update the trace report; fall back to this file's endpoint when no local site is known.
        # The fallback is evaluated per file, so one file's endpoint does not leak into the
        # traces of the following files.
        trace_report.update(localSite=localsite or fspec.ddmendpoint, remoteSite=fspec.ddmendpoint,
                            filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(url=fspec.turl if fspec.turl else fspec.surl)
        trace_report.update(catStart=time())  # is this metric still needed? LFC catalog
        fspec.status_code = 0
        dst = fspec.workdir or kwargs.get('workdir') or '.'
        logger.info('the file will be stored in %s', str(dst))

        trace_report_out = []
        transfer_timeout = get_timeout(fspec.filesize)
        ctimeout = transfer_timeout + 10  # give the API a chance to do the time-out first
        logger.info('overall transfer timeout=%s', ctimeout)

        error_msg = ""
        ec = 0
        try:
            ec, trace_report_out = timeout(ctimeout, timer=TimedThread)(_stage_in_api)(
                dst, fspec, trace_report, trace_report_out, transfer_timeout, use_pcache)
        except Exception as error:
            error_msg = str(error)
            error_details = handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=True)
            trace_report.update(protocol=get_protocol(trace_report_out))
            if not ignore_errors:
                trace_report.send()
                msg = ' %s:%s from %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
                raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))
        else:
            trace_report.update(protocol=get_protocol(trace_report_out))

        # make sure there was no missed failure (only way to deal with this until rucio API has been fixed)
        # (using the timeout decorator prevents the trace_report_out from being updated - rucio API should return
        # the proper error immediately instead of encoding it into a dictionary)
        state_reason = trace_report_out[0].get('stateReason') if trace_report_out else None
        if ec and state_reason and not error_msg:
            error_details = handle_rucio_error(state_reason, trace_report, trace_report_out, fspec, stagein=True)
            if not ignore_errors:
                trace_report.send()
                msg = ' %s:%s from %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
                raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))

        # verify checksum; compare local checksum with catalog value (fspec.checksum), use same checksum type
        destination = os.path.join(dst, fspec.lfn)
        if os.path.exists(destination):
            state, diagnostics = verify_catalog_checksum(fspec, destination)
            if diagnostics != "" and not ignore_errors:
                trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
                                    timeEnd=time())
                trace_report.send()
                raise PilotException(diagnostics, code=fspec.status_code, state=state)
        else:
            diagnostics = 'file does not exist: %s (cannot verify catalog checksum)' % destination
            logger.warning(diagnostics)
            state = 'STAGEIN_ATTEMPT_FAILED'
            fspec.status_code = ErrorCodes.STAGEINFAILED
            trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)

        if not fspec.status_code:
            fspec.status_code = 0
            fspec.status = 'transferred'
            trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())

        trace_report.send()

    return files
0152 
0153 
def get_protocol(trace_report_out):
    """
    Extract the protocol used for the transfer from the traces returned by rucio.

    Only the first trace is inspected.

    :param trace_report_out: traces filled by the rucio client (list of dictionaries, may be empty or None).
    :return: protocol (string); '' when there are no traces or the first trace cannot be read,
             None when the first trace has no 'protocol' entry.
    """
    if not trace_report_out:
        # nothing was returned by rucio (e.g. the transfer failed early) - not worth a warning
        return ''

    try:
        return trace_report_out[0].get('protocol')
    except Exception as error:  # best effort: a broken trace must not break the transfer bookkeeping
        logger.warning('exception caught: %s', error)
        return ''
0169 
0170 
def handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=True):
    """
    Translate a rucio transfer error into pilot error details and record the failure.

    The 'stateReason' of the first rucio trace is used as the error message when present;
    the given error_msg is used instead when there are no traces or the reason is empty or 'OK'.
    The file is marked as failed and the trace report is updated with the resolved state.

    :param error_msg: error message, e.g. from the caught exception (string).
    :param trace_report: trace report to update with the failure state.
    :param trace_report_out: traces returned by rucio (list of dictionaries, may be empty).
    :param fspec: file specification; its status and status_code are set.
    :param stagein: True for stage-in, False for stage-out (selects the default client state).
    :return: error details from resolve_common_transfer_errors() (keys read here: 'rcode', 'state', 'error').
    """
    if not trace_report_out:
        logger.debug('no trace_report_out')
        reason = error_msg
    else:
        # try to get a better error message from the traces
        logger.debug('reading stateReason from trace_report_out: %s', trace_report_out)
        reason = trace_report_out[0].get('stateReason', '')
        if not reason or reason == 'OK':
            logger.warning('could not extract error message from trace report - reverting to original error message')
            reason = error_msg
    logger.info('rucio returned an error: "%s"', reason)

    details = resolve_common_transfer_errors(reason, is_stagein=stagein)
    fspec.status = 'failed'
    fspec.status_code = details.get('rcode')

    default_state = 'STAGEIN_ATTEMPT_FAILED' if stagein else 'STAGEOUT_ATTEMPT_FAILED'
    trace_report.update(clientState=details.get('state', default_state),
                        stateReason=details.get('error'), timeEnd=time())

    return details
0202 
0203 
def copy_in_bulk(files, **kwargs):
    """
    Download the given files in one bulk operation using the rucio copytool.

    All files are handed to the rucio download API at once (`_stage_in_bulk`). Afterwards every
    file is validated against its rucio trace, its presence on disk and its catalog checksum,
    and a trace is sent per file.

    :param files: list of `FileSpec` objects.
    :param kwargs: `ignore_errors` (bool, if set then checksum failures are not recorded in the trace),
                   `trace_report` (common trace fields, also used as template for the per-file traces),
                   `workdir` (destination directory used when `fspec.workdir` is not set).
    :raise: PilotException in case of controlled error, including when not all files were downloaded.
    :return: list of successfully transferred files.
    """
    ignore_errors = kwargs.get('ignore_errors')
    trace_common_fields = kwargs.get('trace_report')

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    dst = kwargs.get('workdir') or '.'

    # THE DOWNLOAD
    trace_report_out = []
    try:
        _stage_in_bulk(dst, files, trace_report_out, trace_common_fields)
    except Exception as error:
        # without any trace from rucio the files cannot be validated: send failure traces and abort.
        # If rucio raised but still returned traces, continue with the VALIDATION section
        if not trace_report_out:
            _fail_bulk_stage_in(files, trace_common_fields, str(error))

    # VALIDATION AND TERMINATION
    files_done = []
    for fspec in files:
        if _validate_bulk_file(fspec, dst, trace_report_out, trace_common_fields, ignore_errors):
            files_done.append(fspec)

    if len(files_done) != len(files):
        raise PilotException('Not all files downloaded.', code=ErrorCodes.STAGEINFAILED, state='STAGEIN_ATTEMPT_FAILED')

    return files_done


def _fail_bulk_stage_in(files, trace_common_fields, error_msg):
    """
    Mark all files as failed, send one failure trace per file and raise.

    Used by copy_in_bulk() when rucio returned no traces at all.

    :param files: list of `FileSpec` objects.
    :param trace_common_fields: common trace report used as template (may be None: then no trace is sent).
    :param error_msg: error message of the exception raised by the bulk download (string).
    :raise: PilotException (always).
    """
    state = 'STAGEIN_ATTEMPT_FAILED'
    diagnostics = 'None of the traces received from Rucio. Response from Rucio: %s' % error_msg
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', None)
    trace_report = deepcopy(trace_common_fields) if trace_common_fields is not None else None
    for fspec in files:
        fspec.status = 'failed'
        fspec.status_code = ErrorCodes.STAGEINFAILED
        if trace_report is None:
            continue
        # fall back to this file's endpoint per file, not to the first file's endpoint
        trace_report.update(localSite=localsite or fspec.ddmendpoint, remoteSite=fspec.ddmendpoint,
                            filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(clientState=state, stateReason=diagnostics, timeEnd=time())
        trace_report.send()
    logger.error(diagnostics)
    raise PilotException(diagnostics, code=ErrorCodes.STAGEINFAILED, state=state)


def _bulk_trace_report(trace, trace_common_fields):
    """
    Return a trace report that can be sent for the given rucio trace, or None if none can be built.

    :param trace: a single trace returned by rucio.
    :param trace_common_fields: common trace report used as template (may be None).
    :return: trace report object with a send() method, or None.
    """
    if hasattr(trace, 'send'):
        return trace
    # NOTE(review): traces copied out by the rucio client are assumed to be plain dictionaries
    # without send(); merge them into a copy of the common trace report so they can be sent
    if trace_common_fields is None:
        return None
    report = deepcopy(trace_common_fields)
    report.update(**trace)
    return report


def _validate_bulk_file(fspec, dst, trace_report_out, trace_common_fields, ignore_errors):
    """
    Validate one bulk-downloaded file, update and send its trace.

    :param fspec: the `FileSpec` object to validate.
    :param dst: default destination directory (used when fspec.workdir is not set).
    :param trace_report_out: all traces returned by rucio (list).
    :param trace_common_fields: common trace report used as template (may be None).
    :param ignore_errors: if set then checksum failures are not recorded in the trace.
    :return: True if the file was transferred successfully.
    """
    # traces returned by rucio are not ordered like the input files; a missing or ambiguous
    # trace fails the file (and therefore the whole bulk stage-in)
    trace_candidates = _get_trace(fspec, trace_report_out) or []
    trace = None
    if not trace_candidates:
        logger.error('No trace retrieved for given file. %s', fspec.lfn)
    elif len(trace_candidates) != 1:
        logger.error('Rucio returned too many traces for given file. %s', fspec.lfn)
    else:
        trace = trace_candidates[0]

    trace_report = _bulk_trace_report(trace, trace_common_fields) if trace is not None else None
    if trace_report is not None:
        # use the protocol from this file's own trace, not from the first trace in trace_report_out
        trace_report.update(protocol=get_protocol(trace_candidates))

    # verify checksum; compare local checksum with catalog value (fspec.checksum), use same checksum type.
    # The file was downloaded to fspec.workdir when set (base_dir in _stage_in_bulk), otherwise to dst
    destination = os.path.join(fspec.workdir or dst, fspec.lfn)
    if os.path.exists(destination):
        state, diagnostics = verify_catalog_checksum(fspec, destination)
        if diagnostics != "" and not ignore_errors and trace_report is not None:  # caution, validation against empty string
            trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
                                timeEnd=time())
            logger.error(diagnostics)
    else:
        diagnostics = 'file does not exist: %s (cannot verify catalog checksum)' % destination
        fspec.status_code = ErrorCodes.STAGEINFAILED
        logger.error(diagnostics)
        if trace_report is not None:
            trace_report.update(clientState='STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics, timeEnd=time())

    if trace is None and not fspec.status_code:
        fspec.status_code = ErrorCodes.STAGEINFAILED

    transferred = not fspec.status_code
    if transferred:
        fspec.status_code = 0
        fspec.status = 'transferred'
        if trace_report is not None:
            trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())

    # updating the trace and sending it
    if trace_report is None:
        logger.error('An unknown error occurred when handling the traces. %s', fspec.lfn)
        logger.warning('No trace sent!!!')
    else:
        trace_report.update(guid=fspec.guid.replace('-', ''))
        trace_report.send()

    return transferred
0300 
0301 
0302 def _get_trace(fspec, traces):
0303     """
0304     Traces returned by Rucio are not orderred the same as input files from pilot.
0305     This method finds the proper trace.
0306 
0307     :param: fspec: the file that is seeked
0308     :param: traces: all traces that are received by Rucio
0309 
0310     :return: trace_candiates that correspond to the given file
0311     """
0312     try:
0313         try:
0314             trace_candidates = list(filter(lambda t: t['filename'] == fspec.lfn and t['scope'] == fspec.scope, traces))  # Python 2
0315         except Exception:
0316             trace_candidates = list([t for t in traces if t['filename'] == fspec.lfn and t['scope'] == fspec.scope])  # Python 3
0317         if trace_candidates:
0318             return trace_candidates
0319         else:
0320             logger.warning('File does not match to any trace received from Rucio: %s %s' % (fspec.lfn, fspec.scope))
0321     except Exception as error:
0322         logger.warning('Traces from pilot and rucio could not be merged: %s' % str(error))
0323         return []
0324 
0325 
0326 #@timeout(seconds=10800)
def copy_out(files, **kwargs):  # noqa: C901
    """
    Upload the given files using the rucio copytool.

    Each file is uploaded through the rucio upload API (`_stage_out_api`) under an overall
    timeout. When `summary` is set, the final turl and the remote adler32 checksum are read back
    from the rucio summary JSON (rucio_upload.json in the work directory).

    :param files: list of `FileSpec` objects.
    :param kwargs: `summary` (bool, default True), `ignore_errors` (bool, default False, if set then
                   transfer failures do not raise), `trace_report` (trace report object, required),
                   `workdir` (directory used when `fspec.workdir` is not set).
    :raise: PilotException in case of controlled error.
    :return: the list of files (with updated status fields).
    """
    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    summary = kwargs.pop('summary', True)
    ignore_errors = kwargs.pop('ignore_errors', False)
    trace_report = kwargs.get('trace_report')

    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', None)
    for fspec in files:
        logger.info('rucio copytool, uploading file with scope: %s and lfn: %s', str(fspec.scope), str(fspec.lfn))
        # fall back to this file's endpoint when no local site is known; evaluated per file so that
        # one file's endpoint does not leak into the traces of the following files
        trace_report.update(localSite=localsite or fspec.ddmendpoint, remoteSite=fspec.ddmendpoint)
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        fspec.status_code = 0

        summary_file_path = None
        cwd = fspec.workdir or kwargs.get('workdir') or '.'
        if summary:
            summary_file_path = os.path.join(cwd, 'rucio_upload.json')

        logger.info('the file will be uploaded to %s', str(fspec.ddmendpoint))
        trace_report_out = []
        transfer_timeout = get_timeout(fspec.filesize)
        ctimeout = transfer_timeout + 10  # give the API a chance to do the time-out first
        logger.info('overall transfer timeout=%s', ctimeout)

        error_msg = ""
        ec = 0
        try:
            ec, trace_report_out = timeout(ctimeout, timer=TimedThread)(_stage_out_api)(
                fspec, summary_file_path, trace_report, trace_report_out, transfer_timeout)
        except Exception as error:  # PilotException (e.g. StageOutFailure) and anything from the rucio client
            error_msg = str(error)
            error_details = handle_rucio_error(error_msg, trace_report, trace_report_out, fspec, stagein=False)
            trace_report.update(protocol=get_protocol(trace_report_out))
            if not ignore_errors:
                trace_report.send()
                msg = ' %s:%s to %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
                raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))
        else:
            trace_report.update(protocol=get_protocol(trace_report_out))

        # make sure there was no missed failure (only way to deal with this until rucio API has been fixed)
        # (using the timeout decorator prevents the trace_report_out from being updated - rucio API should return
        # the proper error immediately instead of encoding it into a dictionary)
        state_reason = trace_report_out[0].get('stateReason') if trace_report_out else None
        if ec and state_reason and not error_msg:
            error_details = handle_rucio_error(state_reason, trace_report, trace_report_out, fspec, stagein=False)
            if not ignore_errors:
                trace_report.send()
                msg = ' %s:%s to %s, %s' % (fspec.scope, fspec.lfn, fspec.ddmendpoint, error_details.get('error'))
                raise PilotException(msg, code=error_details.get('rcode'), state=error_details.get('state'))

        if summary:  # resolve final pfn (turl) from the summary JSON
            _verify_upload_summary(fspec, summary_file_path, trace_report, ignore_errors)

        if not fspec.status_code:
            fspec.status_code = 0
            fspec.status = 'transferred'
            trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())

        trace_report.send()

    return files


def _verify_upload_summary(fspec, summary_file_path, trace_report, ignore_errors):
    """
    Set fspec.turl from the rucio upload summary JSON and compare the remote adler32 with the local one.

    :param fspec: the uploaded `FileSpec` object (turl, status and status_code may be set).
    :param summary_file_path: path to the rucio upload summary JSON (string).
    :param trace_report: trace report, updated (and sent when raising) on a checksum mismatch.
    :param ignore_errors: if set then a checksum mismatch does not raise.
    :raise: PilotException on checksum mismatch unless ignore_errors is set.
    """
    if not os.path.exists(summary_file_path):
        logger.error('Failed to resolve Rucio summary JSON, wrong path? file=%s', summary_file_path)
        return

    with open(summary_file_path, 'rb') as f:
        summary_json = json.load(f)

    dat = summary_json.get("%s:%s" % (fspec.scope, fspec.lfn)) or {}
    fspec.turl = dat.get('pfn')
    logger.debug('set turl=%s', fspec.turl)

    # quick transfer verification:
    # the logic should be unified and moved to base layer shared for all the movers
    remote_checksum = dat.get('adler32')
    local_checksum = (fspec.checksum or {}).get('adler32')
    if not (local_checksum and remote_checksum):
        logger.warning('checksum could not be verified: local checksum (%s), remote checksum (%s)',
                       str(local_checksum), str(remote_checksum))
    elif local_checksum == remote_checksum:
        logger.info('local checksum (%s) = remote checksum (%s)', local_checksum, remote_checksum)
    else:
        msg = 'checksum verification failed: local %s != remote %s' % (local_checksum, remote_checksum)
        logger.warning(msg)
        fspec.status = 'failed'
        fspec.status_code = ErrorCodes.PUTADMISMATCH
        trace_report.update(clientState='AD_MISMATCH', stateReason=msg, timeEnd=time())
        if not ignore_errors:
            # only send here when raising; otherwise copy_out() sends the trace once at the end of the iteration
            trace_report.send()
            raise PilotException("Failed to stageout: CRC mismatched",
                                 code=ErrorCodes.PUTADMISMATCH, state='AD_MISMATCH')
0440 
0441 
0442 # stageIn using rucio api.
def _stage_in_api(dst, fspec, trace_report, trace_report_out, transfer_timeout, use_pcache):
    """
    Stage in a single file through the rucio download client API.

    The pfn is downloaded directly when fspec.turl is set, otherwise the DID is downloaded.

    :param dst: destination directory (string).
    :param fspec: file specification to download.
    :param trace_report: trace fields passed to rucio as custom trace fields (may be empty/None).
    :param trace_report_out: list the rucio client fills with the transfer traces.
    :param transfer_timeout: transfer timeout in seconds (not passed when falsy).
    :param use_pcache: enable the pcache check of the download client.
    :return: tuple (ec, trace_report_out); ec is -1 when the client raised but the first trace
             carries a stateReason, 0 otherwise.
    :raise: the client's exception when no error info can be extracted from trace_report_out.
    """
    from rucio.client.downloadclient import DownloadClient

    client = DownloadClient(logger=logger)
    if use_pcache:
        client.check_pcache = True

    # rucio's own tracing is controlled by the module-level tracing_rucio switch
    if hasattr(client, 'tracing'):
        client.tracing = tracing_rucio

    # file specification handed to the client (insertion order matters for the log message)
    item = {
        'did_scope': fspec.scope,
        'did_name': fspec.lfn,
        'did': '%s:%s' % (fspec.scope, fspec.lfn),
        'rse': fspec.ddmendpoint,
        'base_dir': dst,
        'no_subdir': True,
    }
    if fspec.turl:
        item['pfn'] = fspec.turl
    if transfer_timeout:
        item['transfer_timeout'] = transfer_timeout
    item['connection_timeout'] = 60 * 60

    logger.info('rucio API stage-in dictionary: %s', item)
    trace_pattern = trace_report or {}

    exit_code = 0
    # the download client raises an exception if any file failed
    try:
        logger.info('*** rucio API downloading file (taking over logging) ***')
        if not fspec.turl:
            result = client.download_dids([item], trace_custom_fields=trace_pattern,
                                          traces_copy_out=trace_report_out)
        else:
            result = client.download_pfns([item], 1, trace_custom_fields=trace_pattern,
                                          traces_copy_out=trace_report_out)
    except Exception as error:
        logger.warning('*** rucio API download client failed ***')
        logger.warning('caught exception: %s', error)
        logger.debug('trace_report_out=%s', trace_report_out)
        # only re-raise when the error info cannot be extracted from the traces
        if not trace_report_out or not trace_report_out[0].get('stateReason'):
            raise
        exit_code = -1
    else:
        logger.info('*** rucio API download client finished ***')
        logger.debug('client returned %s', result)

    logger.debug('trace_report_out=%s', trace_report_out)

    return exit_code, trace_report_out
0502 
0503 
def _stage_in_bulk(dst, files, trace_report_out=None, trace_common_fields=None):
    """
    Stage in files in bulk using the rucio download client (download_pfns).

    One download thread is requested per file.

    :param dst: default destination directory, used when fspec.workdir is not set (string).
    :param files: list of fspec objects; their status_code is reset to 0.
    :param trace_report_out: list the rucio client fills with the transfer traces.
    :param trace_common_fields: trace fields passed to rucio as custom trace fields (may be None).
    :return: None.
    :raise: the client's exception when no error info can be extracted from trace_report_out.
    """
    from rucio.client.downloadclient import DownloadClient

    client = DownloadClient(logger=logger)

    # rucio's own tracing is controlled by the module-level tracing_rucio switch
    if hasattr(client, 'tracing'):
        client.tracing = tracing_rucio

    def to_item(fspec):
        # build the download dictionary for one file (insertion order kept)
        fspec.status_code = 0
        item = {
            'did_scope': fspec.scope,
            'did_name': fspec.lfn,
            'did': '%s:%s' % (fspec.scope, fspec.lfn),
            'rse': fspec.ddmendpoint,
            'base_dir': fspec.workdir or dst,
            'no_subdir': True,
        }
        if not fspec.turl:
            # NOTE(review): nothing fails here; presumably download_pfns() rejects the item later
            logger.warning('cannot perform bulk download since fspec.turl is not set (required by download_pfns()')
        else:
            item['pfn'] = fspec.turl
        if fspec.filesize:
            item['transfer_timeout'] = get_timeout(fspec.filesize)
        item['connection_timeout'] = 60 * 60
        return item

    file_list = [to_item(fspec) for fspec in files]
    trace_pattern = trace_common_fields or {}

    # the download client raises an exception if any file failed
    logger.info('*** rucio API downloading files (taking over logging) ***')
    try:
        result = client.download_pfns(file_list, len(file_list), trace_custom_fields=trace_pattern,
                                      traces_copy_out=trace_report_out)
    except Exception as error:
        logger.warning('*** rucio API download client failed ***')
        logger.warning('caught exception: %s', error)
        logger.debug('trace_report_out=%s', trace_report_out)
        # only re-raise when the error info cannot be extracted from the traces
        if not trace_report_out or not trace_report_out[0].get('stateReason'):
            raise
    else:
        logger.info('*** rucio API download client finished ***')
        logger.debug('client returned %s', result)
0556     except Exception as error:
0557         logger.warning('*** rucio API download client failed ***')
0558         logger.warning('caught exception: %s', error)
0559         logger.debug('trace_report_out=%s', trace_report_out)
0560         # only raise an exception if the error info cannot be extracted
0561         if not trace_report_out:
0562             raise error
0563         if not trace_report_out[0].get('stateReason'):
0564             raise error
0565     else:
0566         logger.info('*** rucio API download client finished ***')
0567         logger.debug('client returned %s', result)
0568 
0569 
0570 def _stage_out_api(fspec, summary_file_path, trace_report, trace_report_out, transfer_timeout):
0571 
0572     ec = 0
0573 
0574     # init. download client
0575     from rucio.client.uploadclient import UploadClient
0576     upload_client = UploadClient(logger=logger)
0577 
0578     # traces are turned off
0579     if hasattr(upload_client, 'tracing'):
0580         upload_client.tracing = tracing_rucio
0581     if tracing_rucio:
0582         upload_client.trace = trace_report
0583 
0584     # file specifications before the upload
0585     f = {}
0586     f['path'] = fspec.surl or getattr(fspec, 'pfn', None) or os.path.join(fspec.workdir, fspec.lfn)
0587     f['rse'] = fspec.ddmendpoint
0588     f['did_scope'] = fspec.scope
0589     f['no_register'] = True
0590 
0591     if transfer_timeout:
0592         f['transfer_timeout'] = transfer_timeout
0593     f['connection_timeout'] = 60 * 60
0594 
0595     # if fspec.storageId and int(fspec.storageId) > 0:
0596     #     if fspec.turl and fspec.is_nondeterministic:
0597     #         f['pfn'] = fspec.turl
0598     # elif fspec.lfn and '.root' in fspec.lfn:
0599     #     f['guid'] = fspec.guid
0600     if fspec.lfn and '.root' in fspec.lfn:
0601         f['guid'] = fspec.guid
0602 
0603     logger.info('rucio API stage-out dictionary: %s' % f)
0604 
0605     # upload client raises an exception if any file failed
0606     try:
0607         logger.info('*** rucio API uploading file (taking over logging) ***')
0608         logger.debug('summary_file_path=%s' % summary_file_path)
0609         logger.debug('trace_report_out=%s' % trace_report_out)
0610         result = upload_client.upload([f], summary_file_path=summary_file_path, traces_copy_out=trace_report_out)
0611     except Exception as error:
0612         logger.warning('*** rucio API upload client failed ***')
0613         logger.warning('caught exception: %s', error)
0614         import traceback
0615         logger.error(traceback.format_exc())
0616         logger.debug('trace_report_out=%s', trace_report_out)
0617         if not trace_report_out:
0618             raise error
0619         if not trace_report_out[0].get('stateReason'):
0620             raise error
0621         ec = -1
0622     except UnboundLocalError:
0623         logger.warning('*** rucio API upload client failed ***')
0624         logger.warning('rucio still needs a bug fix of the summary in the uploadclient')
0625     else:
0626         logger.warning('*** rucio API upload client finished ***')
0627         logger.debug('client returned %s', result)
0628 
0629     try:
0630         file_exists = verify_stage_out(fspec)
0631         logger.info('file exists at the storage: %s' % str(file_exists))
0632         if not file_exists:
0633             raise StageOutFailure('physical check after upload failed')
0634     except Exception as error:
0635         msg = 'file existence verification failed with: %s' % error
0636         logger.info(msg)
0637         raise StageOutFailure(msg)
0638 
0639     return ec, trace_report_out