Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:14

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Mario Lassnig, mario.lassnig@cern.ch, 2017
0009 # - Paul Nilsson, paul.nilsson@cern.ch, 2017-2021
0010 # - Tobias Wegner, tobias.wegner@cern.ch, 2017-2018
0011 # - Alexey Anisenkov, anisyonk@cern.ch, 2018-2019
0012 
0013 # refactored by Alexey Anisenkov
0014 
0015 import os
0016 import hashlib
0017 import logging
0018 import time
0019 
0020 try:
0021     from functools import reduce  # Python 3
0022 #except ModuleNotFoundError:  # Python 3
0023 except Exception:
0024     pass
0025 
0026 from pilot.info import infosys
0027 from pilot.common.exception import PilotException, ErrorCodes, SizeTooLarge, NoLocalSpace, ReplicasNotFound
0028 from pilot.util.auxiliary import show_memory_usage
0029 from pilot.util.config import config
0030 from pilot.util.filehandling import calculate_checksum, write_json
0031 from pilot.util.math import convert_mb_to_b
0032 from pilot.util.parameters import get_maximum_input_sizes
0033 from pilot.util.workernode import get_local_disk_space
0034 from pilot.util.timer import TimeoutException
0035 from pilot.util.tracereport import TraceReport
0036 
0037 
class StagingClient(object):
    """
        Base Staging Client

        Resolves replicas and protocols (via infosys) and drives file transfers
        using the configured copytool modules. The staging direction is defined
        by the `mode` attribute, set by subclasses (e.g. StageInClient).
    """

    mode = ""  # stage-in/out, set by the inheritor of the class
    # mapping of copytool name -> importable module under pilot.copytool
    copytool_modules = {'rucio': {'module_name': 'rucio'},
                        'gfal': {'module_name': 'gfal'},
                        'gfalcopy': {'module_name': 'gfal'},
                        'xrdcp': {'module_name': 'xrdcp'},
                        'mv': {'module_name': 'mv'},
                        'objectstore': {'module_name': 'objectstore'},
                        's3': {'module_name': 's3'},
                        'gs': {'module_name': 'gs'},
                        'lsm': {'module_name': 'lsm'}
                        }

    # list of allowed schemas to be used for direct access mode from REMOTE replicas
    direct_remoteinput_allowed_schemas = ['root', 'https']
    # list of schemas to be used for direct access mode from LOCAL replicas
    direct_localinput_allowed_schemas = ['root', 'dcache', 'dcap', 'file', 'https', 'davs']
    # list of allowed schemas to be used for transfers from REMOTE sites
    remoteinput_allowed_schemas = ['root', 'gsiftp', 'dcap', 'davs', 'srm', 'storm', 'https']
0061 
0062     def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_copytools='rucio', trace_report=None):
0063         """
0064             If `acopytools` is not specified then it will be automatically resolved via infosys. In this case `infosys` requires initialization.
0065             :param acopytools: dict of copytool names per activity to be used for transfers. Accepts also list of names or string value without activity passed.
0066             :param logger: logging.Logger object to use for logging (None means no logging)
0067             :param default_copytools: copytool name(s) to be used in case of unknown activity passed. Accepts either list of names or single string value.
0068         """
0069 
0070         super(StagingClient, self).__init__()
0071 
0072         if not logger:
0073             logger = logging.getLogger(__name__ + '.null')
0074             logger.disabled = True
0075 
0076         self.logger = logger
0077         self.infosys = infosys_instance or infosys
0078 
0079         try:
0080             if isinstance(acopytools, basestring):  # Python 2  # noqa: F821
0081                 acopytools = {'default': [acopytools]} if acopytools else {}
0082         except Exception:
0083             if isinstance(acopytools, str):  # Python 3
0084                 acopytools = {'default': [acopytools]} if acopytools else {}
0085 
0086         if isinstance(acopytools, (list, tuple)):
0087             acopytools = {'default': acopytools} if acopytools else {}
0088 
0089         self.acopytools = acopytools or {}
0090 
0091         if self.infosys.queuedata:
0092             self.set_acopytools()
0093 
0094         if not self.acopytools.get('default'):
0095             self.acopytools['default'] = self.get_default_copytools(default_copytools)
0096 
0097         # get an initialized trace report (has to be updated for get/put if not defined before)
0098         self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''))
0099 
0100         if not self.acopytools:
0101             msg = 'failed to initilize StagingClient: no acopytools options found, acopytools=%s' % self.acopytools
0102             logger.error(msg)
0103             self.trace_report.update(clientState='BAD_COPYTOOL', stateReason=msg)
0104             self.trace_report.send()
0105             raise PilotException("failed to resolve acopytools settings")
0106         logger.info('configured copytools per activity: acopytools=%s', self.acopytools)
0107 
0108     def set_acopytools(self):
0109         """
0110         Set the internal acopytools.
0111 
0112         :return:
0113         """
0114         if not self.acopytools:  # resolve from queuedata.acopytools using infosys
0115             self.acopytools = (self.infosys.queuedata.acopytools or {}).copy()
0116         if not self.acopytools:  # resolve from queuedata.copytools using infosys
0117             self.acopytools = dict(default=list((self.infosys.queuedata.copytools or {}).keys()))  # Python 2/3
0118             #self.acopytools = dict(default=(self.infosys.queuedata.copytools or {}).keys())  # Python 2
0119 
0120     @staticmethod
0121     def get_default_copytools(default_copytools):
0122         """
0123         Get the default copytools.
0124 
0125         :param default_copytools:
0126         :return: default copytools (string).
0127         """
0128         try:
0129             if isinstance(default_copytools, basestring):  # Python 2 # noqa: F821
0130                 default_copytools = [default_copytools] if default_copytools else []
0131         except Exception:
0132             if isinstance(default_copytools, str):  # Python 3
0133                 default_copytools = [default_copytools] if default_copytools else []
0134         return default_copytools
0135 
0136     @classmethod
0137     def get_preferred_replica(self, replicas, allowed_schemas):
0138         """
0139             Get preferred replica from the `replicas` list suitable for `allowed_schemas`
0140             :return: first matched replica or None if not found
0141         """
0142 
0143         for replica in replicas:
0144             pfn = replica.get('pfn')
0145             for schema in allowed_schemas:
0146                 if pfn and (not schema or pfn.startswith('%s://' % schema)):
0147                     return replica
0148 
0149     def prepare_sources(self, files, activities=None):
0150         """
0151             Customize/prepare source data for each entry in `files` optionally checking data for requested `activities`
0152             (custom StageClient could extend the logic if need)
0153             :param files: list of `FileSpec` objects to be processed
0154             :param activities: string or ordered list of activities to resolve `astorages` (optional)
0155             :return: None
0156         """
0157 
0158         return
0159 
0160     def prepare_inputddms(self, files, activities=None):
0161         """
0162             Populates filespec.inputddms for each entry from `files` list
0163             :param files: list of `FileSpec` objects
0164             :param activities: sting or ordered list of activities to resolve astorages (optional)
0165             :return: None
0166         """
0167 
0168         activities = activities or 'read_lan'
0169         try:
0170             if isinstance(activities, basestring):  # Python 2  # noqa: F821
0171                 activities = [activities]
0172         except Exception:
0173             if isinstance(activities, str):  # Python 3
0174                 activities = [activities]
0175 
0176         astorages = self.infosys.queuedata.astorages if self.infosys and self.infosys.queuedata else {}
0177 
0178         storages = []
0179         for a in activities:
0180             storages = astorages.get(a, [])
0181             if storages:
0182                 break
0183 
0184         #activity = activities[0]
0185         #if not storages:  ## ignore empty astorages
0186         #    raise PilotException("Failed to resolve input sources: no associated storages defined for activity=%s (%s)"
0187         #                         % (activity, ','.join(activities)), code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')
0188 
0189         for fdat in files:
0190             if not fdat.inputddms:
0191                 fdat.inputddms = storages
0192             if not fdat.inputddms and fdat.ddmendpoint:
0193                 fdat.inputddms = [fdat.ddmendpoint]
0194 
0195     @classmethod
0196     def sort_replicas(self, replicas, inputddms):
0197         """
0198         Sort input replicas: consider first affected replicas from inputddms
0199         :param replicas: Prioritized list of replicas [(pfn, dat)]
0200         :param inputddms: preferred list of ddmebdpoint
0201         :return: sorted `replicas`
0202         """
0203 
0204         if not inputddms:
0205             return replicas
0206 
0207         # group replicas by ddmendpoint to properly consider priority of inputddms
0208         ddmreplicas = {}
0209         for pfn, xdat in replicas:
0210             ddmreplicas.setdefault(xdat.get('rse'), []).append((pfn, xdat))
0211 
0212         # process LAN first (keep fspec.inputddms priorities)
0213         xreplicas = []
0214         for ddm in inputddms:
0215             xreplicas.extend(ddmreplicas.get(ddm) or [])
0216 
0217         for pfn, xdat in replicas:
0218             if (pfn, xdat) in xreplicas:
0219                 continue
0220             xreplicas.append((pfn, xdat))
0221 
0222         return replicas
0223 
    def resolve_replicas(self, files, use_vp=False):
        """
        Populate filespec.replicas for each entry from the `files` list using Rucio.

            fdat.replicas = [{'ddmendpoint':'ddmendpoint', 'pfn':'replica', 'domain':'domain value'}]

        Also cross-checks (and fills in, when missing) the filesize and checksum
        values of each file against the Rucio replica information.

        :param files: list of `FileSpec` objects.
        :param use_vp: True for VP jobs (boolean).
        :raise: PilotException if the client location or the replica list cannot be resolved.
        :return: `files`
        """

        logger = self.logger
        xfiles = []

        show_memory_usage()

        for fdat in files:
            ## skip fdat if need for further workflow (e.g. to properly handle OS ddms)
            xfiles.append(fdat)

        show_memory_usage()

        if not xfiles:  # no files for replica look-up
            return files

        # load replicas from Rucio
        from rucio.client import Client
        c = Client()

        show_memory_usage()

        # client location (ip/ip6/fqdn/site) is required for geoip sorting of replicas
        location = self.detect_client_location()
        if not location:
            raise PilotException("Failed to get client location for Rucio", code=ErrorCodes.RUCIOLOCATIONFAILED)

        query = {
            'schemes': ['srm', 'root', 'davs', 'gsiftp', 'https', 'storm'],
            'dids': [dict(scope=e.scope, name=e.lfn) for e in xfiles],
        }
        query.update(sort='geoip', client_location=location)
        # reset the schemas for VP jobs
        if use_vp:
            query['schemes'] = ['root']
            query['rse_expression'] = 'istape=False\\type=SPECIAL'

        # add signature lifetime for signed URL storages
        query.update(signature_lifetime=24 * 3600)  # note: default is otherwise 1h

        logger.info('calling rucio.list_replicas() with query=%s', query)

        try:
            replicas = c.list_replicas(**query)
        except Exception as exc:
            raise PilotException("Failed to get replicas from Rucio: %s" % exc, code=ErrorCodes.RUCIOLISTREPLICASFAILED)

        show_memory_usage()

        replicas = list(replicas)
        logger.debug("replicas received from Rucio: %s", replicas)

        # map (scope, lfn) -> FileSpec to match returned replicas against requested files
        files_lfn = dict(((e.scope, e.lfn), e) for e in xfiles)
        for replica in replicas:
            k = replica['scope'], replica['name']
            fdat = files_lfn.get(k)
            if not fdat:  # not requested replica
                continue

            # add the replicas to the fdat structure
            fdat = self.add_replicas(fdat, replica)

            # verify filesize and checksum values
            self.trace_report.update(validateStart=time.time())
            status = True
            if fdat.filesize != replica['bytes']:
                logger.warning("Filesize of input file=%s mismatched with value from Rucio replica: filesize=%s, replica.filesize=%s, fdat=%s",
                               fdat.lfn, fdat.filesize, replica['bytes'], fdat)
                status = False

            if not fdat.filesize:
                fdat.filesize = replica['bytes']
                logger.warning("Filesize value for input file=%s is not defined, assigning info from Rucio replica: filesize=%s", fdat.lfn, replica['bytes'])

            for ctype in ['adler32', 'md5']:
                if fdat.checksum.get(ctype) != replica[ctype] and replica[ctype]:
                    logger.warning("Checksum value of input file=%s mismatched with info got from Rucio replica: checksum=%s, replica.checksum=%s, fdat=%s",
                                   fdat.lfn, fdat.checksum, replica[ctype], fdat)
                    status = False

                if not fdat.checksum.get(ctype) and replica[ctype]:
                    fdat.checksum[ctype] = replica[ctype]

            # NOTE(review): this condition looks inverted -- the "verification done"/DONE
            # trace update is only performed when a mismatch was found (status is False);
            # confirm whether `if status:` was intended here
            if not status:
                logger.info("filesize and checksum verification done")
                self.trace_report.update(clientState="DONE")

        show_memory_usage()

        logger.info('Number of resolved replicas:\n' +
                    '\n'.join(["lfn=%s: replicas=%s, is_directaccess=%s"
                               % (f.lfn, len(f.replicas or []), f.is_directaccess(ensure_replica=False)) for f in files]))

        return files
0326 
0327     def add_replicas(self, fdat, replica):
0328         """
0329         Add the replicas to the fdat structure.
0330 
0331         :param fdat:
0332         :param replica:
0333         :return: updated fdat.
0334         """
0335 
0336         fdat.replicas = []  # reset replicas list
0337 
0338         # sort replicas by priority value
0339         try:
0340             sorted_replicas = sorted(replica.get('pfns', {}).iteritems(), key=lambda x: x[1]['priority'])  # Python 2
0341         except Exception:
0342             sorted_replicas = sorted(iter(list(replica.get('pfns', {}).items())),
0343                                      key=lambda x: x[1]['priority'])  # Python 3
0344 
0345         # prefer replicas from inputddms first
0346         xreplicas = self.sort_replicas(sorted_replicas, fdat.inputddms)
0347 
0348         for pfn, xdat in xreplicas:
0349 
0350             if xdat.get('type') != 'DISK':  # consider only DISK replicas
0351                 continue
0352 
0353             rinfo = {'pfn': pfn, 'ddmendpoint': xdat.get('rse'), 'domain': xdat.get('domain')}
0354 
0355             ## (TEMPORARY?) consider fspec.inputddms as a primary source for local/lan source list definition
0356             ## backward compartible logic -- FIX ME LATER if NEED
0357             ## in case we should rely on domain value from Rucio, just remove the overwrite line below
0358             rinfo['domain'] = 'lan' if rinfo['ddmendpoint'] in fdat.inputddms else 'wan'
0359 
0360             if not fdat.allow_lan and rinfo['domain'] == 'lan':
0361                 continue
0362             if not fdat.allow_wan and rinfo['domain'] == 'wan':
0363                 continue
0364 
0365             fdat.replicas.append(rinfo)
0366 
0367         if not fdat.replicas:
0368             self.logger.warning('no replicas were selected (verify replica type, allow_lan/wan and domain values)')
0369 
0370         return fdat
0371 
0372     @classmethod
0373     def detect_client_location(self):
0374         """
0375         Open a UDP socket to a machine on the internet, to get the local IPv4 and IPv6
0376         addresses of the requesting client.
0377         Try to determine the sitename automatically from common environment variables,
0378         in this order: SITE_NAME, ATLAS_SITE_NAME, OSG_SITE_NAME. If none of these exist
0379         use the fixed string 'ROAMING'.
0380         """
0381 
0382         ip = '0.0.0.0'
0383         try:
0384             import socket
0385             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
0386             s.connect(("8.8.8.8", 80))
0387             ip = s.getsockname()[0]
0388         except Exception:
0389             pass
0390 
0391         ip6 = '::'
0392         try:
0393             s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
0394             s.connect(("2001:4860:4860:0:0:0:0:8888", 80))
0395             ip6 = s.getsockname()[0]
0396         except Exception:
0397             pass
0398 
0399         site = os.environ.get('PILOT_RUCIO_SITENAME', 'unknown')
0400 #        site = os.environ.get('SITE_NAME',
0401 #                              os.environ.get('ATLAS_SITE_NAME',
0402 #                                             os.environ.get('OSG_SITE_NAME',
0403 #                                                            'ROAMING')))
0404 
0405         return {'ip': ip,
0406                 'ip6': ip6,
0407                 'fqdn': socket.getfqdn(),
0408                 'site': site}
0409 
0410     def transfer_files(self, copytool, files, **kwargs):
0411         """
0412             Apply transfer of given `files` using passed `copytool` module
0413             Should be implemented by custom Staging Client
0414             :param copytool: copytool module
0415             :param files: list of `FileSpec` objects
0416             :param kwargs: extra kwargs to be passed to copytool transfer handler
0417             :raise: PilotException in case of controlled error
0418         """
0419 
0420         raise NotImplementedError()
0421 
0422     def transfer(self, files, activity='default', **kwargs):  # noqa: C901
0423         """
0424             Automatically stage passed files using copy tools related to given `activity`
0425             :param files: list of `FileSpec` objects
0426             :param activity: list of activity names used to determine appropriate copytool (prioritized list)
0427             :param kwargs: extra kwargs to be passed to copytool transfer handler
0428             :raise: PilotException in case of controlled error
0429             :return: list of processed `FileSpec` objects
0430         """
0431 
0432         self.trace_report.update(relativeStart=time.time(), transferStart=time.time())
0433 
0434         try:
0435             if isinstance(activity, basestring):  # Python 2 # noqa: F821
0436                 activity = [activity]
0437         except Exception:
0438             if isinstance(activity, str):  # Python 3
0439                 activity = [activity]
0440         if 'default' not in activity:
0441             activity.append('default')
0442 
0443         copytools = None
0444         for aname in activity:
0445             copytools = self.acopytools.get(aname)
0446             if copytools:
0447                 break
0448 
0449         if not copytools:
0450             raise PilotException('failed to resolve copytool by preferred activities=%s, acopytools=%s' %
0451                                  (activity, self.acopytools))
0452 
0453         # populate inputddms if need
0454         self.prepare_inputddms(files)
0455 
0456         # initialize ddm_activity name for requested files if not set
0457         for fspec in files:
0458             if fspec.ddm_activity:  # skip already initialized data
0459                 continue
0460             if self.mode == 'stage-in':
0461                 if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena':
0462                     fspec.status = 'no_transfer'
0463 
0464                 try:
0465                     fspec.ddm_activity = filter(None, ['read_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'read_wan'])  # Python 2
0466                 except Exception:
0467                     fspec.ddm_activity = [_f for _f in
0468                                           ['read_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'read_wan'] if
0469                                           _f]  # Python 3
0470             else:
0471                 try:
0472                     fspec.ddm_activity = filter(None, ['write_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'write_wan'])  # Python 2
0473                 except Exception:
0474                     fspec.ddm_activity = [_f for _f in
0475                                           ['write_lan' if fspec.ddmendpoint in fspec.inputddms else None, 'write_wan']
0476                                           if _f]  # Python 3
0477         caught_errors = []
0478 
0479         for name in copytools:
0480 
0481             # get remain files that need to be transferred by copytool
0482             remain_files = [e for e in files if e.status not in ['remote_io', 'transferred', 'no_transfer']]
0483 
0484             if not remain_files:
0485                 break
0486 
0487             try:
0488                 if name not in self.copytool_modules:
0489                     raise PilotException('passed unknown copytool with name=%s .. skipped' % name,
0490                                          code=ErrorCodes.UNKNOWNCOPYTOOL)
0491 
0492                 module = self.copytool_modules[name]['module_name']
0493                 self.logger.info('trying to use copytool=%s for activity=%s', name, activity)
0494                 copytool = __import__('pilot.copytool.%s' % module, globals(), locals(), [module], 0)  # Python 2/3
0495                 #self.trace_report.update(protocol=name)
0496 
0497             except PilotException as exc:
0498                 caught_errors.append(exc)
0499                 self.logger.debug('error: %s', exc)
0500                 continue
0501             except Exception as exc:
0502                 self.logger.warning('failed to import copytool module=%s, error=%s', module, exc)
0503                 continue
0504 
0505             try:
0506                 result = self.transfer_files(copytool, remain_files, activity, **kwargs)
0507                 self.logger.debug('transfer_files() using copytool=%s completed with result=%s', copytool, str(result))
0508                 show_memory_usage()
0509                 break
0510             except PilotException as exc:
0511                 self.logger.warning('failed to transfer_files() using copytool=%s .. skipped; error=%s', copytool, exc)
0512                 caught_errors.append(exc)
0513             except TimeoutException as exc:
0514                 self.logger.warning('function timed out: %s', exc)
0515                 caught_errors.append(exc)
0516             except Exception as exc:
0517                 self.logger.warning('failed to transfer files using copytool=%s .. skipped; error=%s', copytool, exc)
0518                 caught_errors.append(exc)
0519                 import traceback
0520                 self.logger.error(traceback.format_exc())
0521 
0522             if caught_errors and isinstance(caught_errors[-1], PilotException) and \
0523                     caught_errors[-1].get_error_code() == ErrorCodes.MISSINGOUTPUTFILE:
0524                 raise caught_errors[-1]
0525 
0526         remain_files = [fspec for fspec in files if fspec.status not in ['remote_io', 'transferred', 'no_transfer']]
0527 
0528         if remain_files:  # failed or incomplete transfer
0529             # propagate message from first error back up
0530             errmsg = str(caught_errors[0]) if caught_errors else ''
0531             if caught_errors and "Cannot authenticate" in str(caught_errors):
0532                 code = ErrorCodes.STAGEINAUTHENTICATIONFAILURE
0533             elif caught_errors and "bad queue configuration" in str(caught_errors):
0534                 code = ErrorCodes.BADQUEUECONFIGURATION
0535             elif caught_errors and isinstance(caught_errors[0], PilotException):
0536                 code = caught_errors[0].get_error_code()
0537                 errmsg = caught_errors[0].get_last_error()
0538             elif caught_errors and isinstance(caught_errors[0], TimeoutException):
0539                 code = ErrorCodes.STAGEINTIMEOUT if self.mode == 'stage-in' else ErrorCodes.STAGEOUTTIMEOUT  # is it stage-in/out?
0540                 self.logger.warning('caught time-out exception: %s', caught_errors[0])
0541             else:
0542                 code = ErrorCodes.STAGEINFAILED if self.mode == 'stage-in' else ErrorCodes.STAGEOUTFAILED  # is it stage-in/out?
0543             details = str(caught_errors) + ":" + 'failed to transfer files using copytools=%s' % copytools
0544             self.logger.fatal(details)
0545             raise PilotException(details, code=code)
0546 
0547         return files
0548 
    def require_protocols(self, files, copytool, activity, local_dir=''):
        """
            Populates fspec.protocols and fspec.turl for each entry in `files` according to preferred fspec.ddm_activity
            :param files: list of `FileSpec` objects
            :param copytool: copytool module (provides optional `allowed_schemas` and `resolve_surl` attributes)
            :param activity: str or ordered list of transfer activity names to resolve acopytools related data
            :param local_dir: when set, stage to/from this local directory instead of resolving storage protocols (optional)
            :raise: PilotException when no protocol can be resolved for a file
            :return: None
        """

        # schemas allowed by the copytool itself, possibly overridden by queuedata
        allowed_schemas = getattr(copytool, 'allowed_schemas', None)

        if self.infosys and self.infosys.queuedata:
            copytool_name = copytool.__name__.rsplit('.', 1)[-1]
            allowed_schemas = self.infosys.queuedata.resolve_allowed_schemas(activity, copytool_name) or allowed_schemas

        if local_dir:
            # local staging: point every file at the local directory instead of storage protocols
            for fdat in files:
                if not local_dir.endswith('/'):
                    local_dir += '/'
                fdat.protocols = [{'endpoint': local_dir, 'flavour': '', 'id': 0, 'path': ''}]
        else:
            files = self.resolve_protocols(files)

        ddmconf = self.infosys.resolve_storage_data()

        for fspec in files:

            protocols = self.resolve_protocol(fspec, allowed_schemas)
            if not protocols and 'mv' not in self.infosys.queuedata.copytools:  # no protocols found
                error = 'Failed to resolve protocol for file=%s, allowed_schemas=%s, fspec=%s' % (fspec.lfn, allowed_schemas, fspec)
                self.logger.error("resolve_protocol: %s", error)
                raise PilotException(error, code=ErrorCodes.NOSTORAGEPROTOCOL)

            # take first available protocol for copytool: FIX ME LATER if need (do iterate over all allowed protocols?)
            # NOTE(review): when 'mv' is configured and protocols is empty, the next line
            # raises IndexError -- presumably 'mv' always yields protocols; confirm
            protocol = protocols[0]

            self.logger.info("Resolved protocol to be used for transfer: \'%s\': lfn=\'%s\'", protocol, fspec.lfn)

            # the copytool may provide its own surl resolution; otherwise use the client's
            resolve_surl = getattr(copytool, 'resolve_surl', None)
            if not callable(resolve_surl):
                resolve_surl = self.resolve_surl

            r = resolve_surl(fspec, protocol, ddmconf, local_dir=local_dir)  # pass ddmconf for possible custom look up at the level of copytool
            if r.get('surl'):
                fspec.turl = r['surl']

            if r.get('ddmendpoint'):
                fspec.ddmendpoint = r['ddmendpoint']
0596 
0597     def resolve_protocols(self, files):
0598         """
0599             Populates filespec.protocols for each entry from `files` according to preferred `fspec.ddm_activity` value
0600             :param files: list of `FileSpec` objects
0601             fdat.protocols = [dict(endpoint, path, flavour), ..]
0602             :return: `files`
0603         """
0604 
0605         ddmconf = self.infosys.resolve_storage_data()
0606 
0607         for fdat in files:
0608             ddm = ddmconf.get(fdat.ddmendpoint)
0609             if not ddm:
0610                 error = 'Failed to resolve output ddmendpoint by name=%s (from PanDA), please check configuration.' % fdat.ddmendpoint
0611                 self.logger.error("resolve_protocols: %s, fspec=%s", error, fdat)
0612                 raise PilotException(error, code=ErrorCodes.NOSTORAGE)
0613 
0614             protocols = []
0615             for aname in fdat.ddm_activity:
0616                 protocols = ddm.arprotocols.get(aname)
0617                 if protocols:
0618                     break
0619 
0620             fdat.protocols = protocols
0621 
0622         return files
0623 
0624     @classmethod
0625     def resolve_protocol(self, fspec, allowed_schemas=None):
0626         """
0627             Resolve protocols according to allowed schema
0628             :param fspec: `FileSpec` instance
0629             :param allowed_schemas: list of allowed schemas or any if None
0630             :return: list of dict(endpoint, path, flavour)
0631         """
0632 
0633         if not fspec.protocols:
0634             return []
0635 
0636         protocols = []
0637 
0638         allowed_schemas = allowed_schemas or [None]
0639         for schema in allowed_schemas:
0640             for pdat in fspec.protocols:
0641                 if schema is None or pdat.get('endpoint', '').startswith("%s://" % schema):
0642                     protocols.append(pdat)
0643 
0644         return protocols
0645 
0646 
class StageInClient(StagingClient):
    """
        Stage-in specialization of StagingClient: resolves input replicas
        and direct-access settings for transfers into the worker node.
    """

    mode = "stage-in"
0650 
    def resolve_replica(self, fspec, primary_schemas=None, allowed_schemas=None, domain=None):
        """
            Resolve input replica (matched by `domain` if need) first according to `primary_schemas`,
            if not found then look up within `allowed_schemas`.
            Primary schemas ignore replica priority (used to resolve direct access replica, which could be not with top priority set)
            :param fspec: input `FileSpec` objects
            :param primary_schemas: prioritized schemas to try first, ignoring replica priority (optional)
            :param allowed_schemas: list of allowed schemas or any if None
            :param domain: domain value the replica must match; replicas from other domains are skipped (optional)
            :return: dict(surl, ddmendpoint, pfn, domain) or None if replica not found
        """

        if not fspec.replicas:
            self.logger.warning('resolve_replica() received no fspec.replicas')
            return

        allowed_schemas = allowed_schemas or [None]
        primary_replica, replica = None, None

        # group by ddmendpoint to look up related surl/srm value
        # (grouping covers every replica seen, including those whose domain does not match)
        replicas = {}

        for rinfo in fspec.replicas:

            replicas.setdefault(rinfo['ddmendpoint'], []).append(rinfo)

            if rinfo['domain'] != domain:  # only consider replicas from the requested domain
                continue
            if primary_schemas and not primary_replica:  # look up primary schemas if requested
                primary_replica = self.get_preferred_replica([rinfo], primary_schemas)
            if not replica:
                replica = self.get_preferred_replica([rinfo], allowed_schemas)

            # stop early only once both lookups have succeeded
            if replica and primary_replica:
                break

        # a primary-schema match wins over the priority-ordered one
        replica = primary_replica or replica

        if not replica:  # replica not found
            schemas = 'any' if not allowed_schemas[0] else ','.join(allowed_schemas)
            pschemas = 'any' if primary_schemas and not primary_schemas[0] else ','.join(primary_schemas or [])

            error = 'Failed to find replica for file=%s, domain=%s, allowed_schemas=%s, pschemas=%s, fspec=%s' % (fspec.lfn, domain, schemas, pschemas, fspec)
            self.logger.info("resolve_replica: %s", error)
            return

        # prefer SRM protocol for surl -- to be verified, can it be deprecated?
        rse_replicas = replicas.get(replica['ddmendpoint'], [])
        surl = self.get_preferred_replica(rse_replicas, ['srm']) or rse_replicas[0]
        self.logger.info("[stage-in] surl (srm replica) from Rucio: pfn=%s, ddmendpoint=%s", surl['pfn'], surl['ddmendpoint'])

        return {'surl': surl['pfn'], 'ddmendpoint': replica['ddmendpoint'], 'pfn': replica['pfn'], 'domain': replica['domain']}
0701 
0702     def get_direct_access_variables(self, job):
0703         """
0704         Return the direct access settings for the PQ.
0705 
0706         :param job: job object.
0707         :return: allow_direct_access (bool), direct_access_type (string).
0708         """
0709 
0710         allow_direct_access, direct_access_type = False, ''
0711         if self.infosys.queuedata:  # infosys is initialized
0712             allow_direct_access = self.infosys.queuedata.direct_access_lan or self.infosys.queuedata.direct_access_wan
0713             if self.infosys.queuedata.direct_access_lan:
0714                 direct_access_type = 'LAN'
0715             if self.infosys.queuedata.direct_access_wan:
0716                 direct_access_type = 'WAN'
0717         else:
0718             self.logger.info('infosys.queuedata is not initialized: direct access mode will be DISABLED by default')
0719 
0720         if job and not job.is_analysis() and job.transfertype != 'direct':  # task forbids direct access
0721             allow_direct_access = False
0722             self.logger.info('switched off direct access mode for production job since transfertype=%s', job.transfertype)
0723 
0724         return allow_direct_access, direct_access_type
0725 
0726     def transfer_files(self, copytool, files, activity=None, **kwargs):  # noqa: C901
0727         """
0728         Automatically stage in files using the selected copy tool module.
0729 
0730         :param copytool: copytool module
0731         :param files: list of `FileSpec` objects
0732         :param kwargs: extra kwargs to be passed to copytool transfer handler
0733 
0734         :return: list of processed `FileSpec` objects
0735         :raise: PilotException in case of controlled error
0736         """
0737 
0738         if getattr(copytool, 'require_replicas', False) and files:
0739             if files[0].replicas is None:  # look up replicas only once
0740                 files = self.resolve_replicas(files, use_vp=kwargs['use_vp'])
0741 
0742             allowed_schemas = getattr(copytool, 'allowed_schemas', None)
0743 
0744             if self.infosys and self.infosys.queuedata:
0745                 copytool_name = copytool.__name__.rsplit('.', 1)[-1]
0746                 allowed_schemas = self.infosys.queuedata.resolve_allowed_schemas(activity, copytool_name) or allowed_schemas
0747 
0748             # overwrite allowed_schemas for VP jobs
0749             if kwargs['use_vp']:
0750                 allowed_schemas = ['root']
0751                 self.logger.debug('overwrote allowed_schemas for VP job: %s', str(allowed_schemas))
0752 
0753             for fspec in files:
0754                 resolve_replica = getattr(copytool, 'resolve_replica', None)
0755                 resolve_replica = self.resolve_replica if not callable(resolve_replica) else resolve_replica
0756 
0757                 replica = None
0758 
0759                 # process direct access logic  ## TODO move to upper level, should not be dependent on copytool (anisyonk)
0760                 # check local replicas first
0761                 if fspec.allow_lan:
0762                     # prepare schemas which will be used to look up first the replicas allowed for direct access mode
0763                     primary_schemas = (self.direct_localinput_allowed_schemas if fspec.direct_access_lan and
0764                                        fspec.is_directaccess(ensure_replica=False) else None)
0765                     replica = resolve_replica(fspec, primary_schemas, allowed_schemas, domain='lan')
0766                 else:
0767                     self.logger.info("[stage-in] LAN access is DISABLED for lfn=%s (fspec.allow_lan=%s)", fspec.lfn, fspec.allow_lan)
0768 
0769                 if not replica and fspec.allow_lan:
0770                     self.logger.info("[stage-in] No LAN replica found for lfn=%s, primary_schemas=%s, allowed_schemas=%s",
0771                                      fspec.lfn, primary_schemas, allowed_schemas)
0772 
0773                 # check remote replicas
0774                 if not replica and fspec.allow_wan:
0775                     # prepare schemas which will be used to look up first the replicas allowed for direct access mode
0776                     primary_schemas = (self.direct_remoteinput_allowed_schemas if fspec.direct_access_wan and
0777                                        fspec.is_directaccess(ensure_replica=False) else None)
0778                     xschemas = self.remoteinput_allowed_schemas
0779                     allowed_schemas = [schema for schema in allowed_schemas if schema in xschemas] if allowed_schemas else xschemas
0780                     replica = resolve_replica(fspec, primary_schemas, allowed_schemas, domain='wan')
0781 
0782                 if not replica and fspec.allow_wan:
0783                     self.logger.info("[stage-in] No WAN replica found for lfn=%s, primary_schemas=%s, allowed_schemas=%s",
0784                                      fspec.lfn, primary_schemas, allowed_schemas)
0785                 if not replica:
0786                     raise ReplicasNotFound('No replica found for lfn=%s (allow_lan=%s, allow_wan=%s)' % (fspec.lfn, fspec.allow_lan, fspec.allow_wan))
0787 
0788                 if replica.get('pfn'):
0789                     fspec.turl = replica['pfn']
0790                 if replica.get('surl'):
0791                     fspec.surl = replica['surl']  # TO BE CLARIFIED if it's still used and need
0792                 if replica.get('ddmendpoint'):
0793                     fspec.ddmendpoint = replica['ddmendpoint']
0794                 if replica.get('domain'):
0795                     fspec.domain = replica['domain']
0796 
0797                 self.logger.info("[stage-in] found replica to be used for lfn=%s: ddmendpoint=%s, pfn=%s", fspec.lfn, fspec.ddmendpoint, fspec.turl)
0798 
0799         # prepare files (resolve protocol/transfer url)
0800         if getattr(copytool, 'require_input_protocols', False) and files:
0801             self.require_protocols(files, copytool, activity, local_dir=kwargs['input_dir'])
0802 
0803         # mark direct access files with status=remote_io
0804         self.set_status_for_direct_access(files, kwargs.get('workdir', ''))
0805 
0806         # get remain files that need to be transferred by copytool
0807         remain_files = [e for e in files if e.status not in ['direct', 'remote_io', 'transferred', 'no_transfer']]
0808 
0809         if not remain_files:
0810             return files
0811 
0812         if not copytool.is_valid_for_copy_in(remain_files):
0813             msg = 'input is not valid for transfers using copytool=%s' % copytool
0814             self.logger.warning(msg)
0815             self.logger.debug('input: %s', remain_files)
0816             self.trace_report.update(clientState='NO_REPLICA', stateReason=msg)
0817             self.trace_report.send()
0818             raise PilotException('invalid input data for transfer operation')
0819 
0820         if self.infosys:
0821             if self.infosys.queuedata:
0822                 kwargs['copytools'] = self.infosys.queuedata.copytools
0823             kwargs['ddmconf'] = self.infosys.resolve_storage_data()
0824         kwargs['activity'] = activity
0825 
0826         # verify file sizes and available space for stage-in
0827         if getattr(copytool, 'check_availablespace', True):
0828             if self.infosys.queuedata.maxinputsize != -1:
0829                 self.check_availablespace(remain_files)
0830             else:
0831                 self.logger.info('skipping input file size check since maxinputsize=-1')
0832 
0833         show_memory_usage()
0834 
0835         # add the trace report
0836         kwargs['trace_report'] = self.trace_report
0837         self.logger.info('ready to transfer (stage-in) files: %s', remain_files)
0838 
0839         # use bulk downloads if necessary
0840         # if kwargs['use_bulk_transfer']
0841         # return copytool.copy_in_bulk(remain_files, **kwargs)
0842         return copytool.copy_in(remain_files, **kwargs)
0843 
0844     def set_status_for_direct_access(self, files, workdir):
0845         """
0846         Update the FileSpec status with 'remote_io' for direct access mode.
0847         Should be called only once since the function sends traces
0848 
0849         :param files: list of FileSpec objects.
0850         :param workdir: work directory (string).
0851         :return: None
0852         """
0853 
0854         for fspec in files:
0855             direct_lan = (fspec.domain == 'lan' and fspec.direct_access_lan and
0856                           fspec.is_directaccess(ensure_replica=True, allowed_replica_schemas=self.direct_localinput_allowed_schemas))
0857             direct_wan = (fspec.domain == 'wan' and fspec.direct_access_wan and
0858                           fspec.is_directaccess(ensure_replica=True, allowed_replica_schemas=self.direct_remoteinput_allowed_schemas))
0859 
0860             # testing direct acess
0861             #if 'CYFRONET' in os.environ.get('PILOT_SITENAME', ''):
0862             #    if '.root.' in fspec.lfn:
0863             #        direct_lan = True
0864 
0865             if not direct_lan and not direct_wan:
0866                 self.logger.debug('direct lan/wan transfer will not be used for lfn=%s', fspec.lfn)
0867             self.logger.debug('lfn=%s, direct_lan=%s, direct_wan=%s, direct_access_lan=%s, direct_access_wan=%s, '
0868                               'direct_localinput_allowed_schemas=%s, remoteinput_allowed_schemas=%s, domain=%s',
0869                               fspec.lfn, direct_lan, direct_wan, fspec.direct_access_lan, fspec.direct_access_wan,
0870                               str(self.direct_localinput_allowed_schemas), str(self.direct_remoteinput_allowed_schemas), fspec.domain)
0871 
0872             if direct_lan or direct_wan:
0873                 fspec.status_code = 0
0874                 fspec.status = 'remote_io'
0875 
0876                 alrb_xcache_proxy = os.environ.get('ALRB_XCACHE_PROXY', None)
0877                 if alrb_xcache_proxy and direct_lan:  #fspec.is_directaccess(ensure_replica=False):
0878                     fspec.turl = '${ALRB_XCACHE_PROXY}' + fspec.turl
0879 
0880                 self.logger.info('stage-in: direct access (remote i/o) will be used for lfn=%s (direct_lan=%s, direct_wan=%s), turl=%s',
0881                                  fspec.lfn, direct_lan, direct_wan, fspec.turl)
0882 
0883                 # send trace
0884                 localsite = os.environ.get('RUCIO_LOCAL_SITE_ID')
0885                 localsite = localsite or fspec.ddmendpoint
0886                 self.trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
0887                 self.trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
0888                 self.trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
0889                 self.trace_report.update(url=fspec.turl, clientState='FOUND_ROOT', stateReason='direct_access')
0890 
0891                 # do not send the trace report at this point if remote file verification is to be done
0892                 # note also that we can't verify the files at this point since root will not be available from inside
0893                 # the rucio container
0894                 if config.Pilot.remotefileverification_log:
0895                     # store the trace report for later use (the trace report class inherits from dict, so just write it as JSON)
0896                     # outside of the container, it will be available in the normal work dir
0897                     # use the normal work dir if we are not in a container
0898                     _workdir = workdir if os.path.exists(workdir) else '.'
0899                     path = os.path.join(_workdir, config.Pilot.base_trace_report)
0900                     if not os.path.exists(_workdir):
0901                         path = os.path.join('/srv', config.Pilot.base_trace_report)
0902                     if not os.path.exists(path):
0903                         self.logger.debug('writing base trace report to: %s', path)
0904                         write_json(path, self.trace_report)
0905                 else:
0906                     self.trace_report.send()
0907 
0908     def check_availablespace(self, files):
0909         """
0910         Verify that enough local space is available to stage in and run the job
0911 
0912         :param files: list of FileSpec objects.
0913         :raise: PilotException in case of not enough space or total input size too large
0914         """
0915 
0916         for f in files:
0917             self.logger.debug('lfn=%s filesize=%d accessmode=%s', f.lfn, f.filesize, f.accessmode)
0918 
0919         maxinputsize = convert_mb_to_b(get_maximum_input_sizes())
0920         totalsize = reduce(lambda x, y: x + y.filesize, files, 0)
0921 
0922         # verify total filesize
0923         if maxinputsize and totalsize > maxinputsize:
0924             error = "too many/too large input files (%s). total file size=%s B > maxinputsize=%s B" % \
0925                     (len(files), totalsize, maxinputsize)
0926             raise SizeTooLarge(error)
0927 
0928         self.logger.info("total input file size=%s B within allowed limit=%s B (zero value means unlimited)", totalsize, maxinputsize)
0929 
0930         # get available space
0931         available_space = convert_mb_to_b(get_local_disk_space(os.getcwd()))
0932         self.logger.info("locally available space: %d B", available_space)
0933 
0934         # are we within the limit?
0935         if totalsize > available_space:
0936             error = "not enough local space for staging input files and run the job (need %d B, but only have %d B)" % \
0937                     (totalsize, available_space)
0938             raise NoLocalSpace(error)
0939 
0940 
class StageOutClient(StagingClient):
    """
        Stage-out client: resolves destination RSEs/SURLs and transfers output files
        with the selected copytool.
    """

    mode = "stage-out"

    def prepare_destinations(self, files, activities):
        """
            Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`
            Apply Pilot-side logic to choose proper destination
            :param files: list of FileSpec objects to be processed
            :param activities: ordered list of activities to be used to resolve astorages
            :return: updated fspec entries
            :raise: PilotException if no activity or no associated storage can be resolved
        """

        if not self.infosys.queuedata:  # infosys is not initialized: not able to fix destination if need, nothing to do
            return files

        try:
            if isinstance(activities, (str, unicode)):  # Python 2 # noqa: F821
                activities = [activities]
        except Exception:
            if isinstance(activities, str):  # Python 3
                activities = [activities]

        if not activities:
            raise PilotException("Failed to resolve destination: passed empty activity list. Internal error.",
                                 code=ErrorCodes.INTERNALPILOTPROBLEM, state='INTERNAL_ERROR')

        astorages = self.infosys.queuedata.astorages or {}

        # pick the first activity that has associated storages defined
        storages = None
        activity = activities[0]
        for a in activities:
            storages = astorages.get(a, {})
            if storages:
                break

        if not storages:
            if 'mv' in self.infosys.queuedata.copytools:
                # the mv copytool does not require a remote destination
                return files
            else:
                raise PilotException("Failed to resolve destination: no associated storages defined for activity=%s (%s)"
                                     % (activity, ','.join(activities)), code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

        # take the first choice for now, extend the logic later if need
        ddm = storages[0]

        self.logger.info("[prepare_destinations][%s]: allowed (local) destinations: %s", activity, storages)
        self.logger.info("[prepare_destinations][%s]: resolved default destination ddm=%s", activity, ddm)

        for e in files:
            if not e.ddmendpoint:  # no preferences => use default destination
                self.logger.info("[prepare_destinations][%s]: fspec.ddmendpoint is not set for lfn=%s"
                                 " .. will use default ddm=%s as (local) destination", activity, e.lfn, ddm)
                e.ddmendpoint = ddm
            elif e.ddmendpoint not in storages:  # fspec.ddmendpoint is not in associated storages => assume it as final (non local) alternative destination
                self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
                                 " .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
                # save the originally requested endpoint as the alternative location
                # BEFORE overwriting it with the default (local) destination
                # (the previous assignment order always tagged ddm as its own alternative)
                e.ddmendpoint_alt = e.ddmendpoint
                e.ddmendpoint = ddm

        return files

    @classmethod
    def get_path(cls, scope, lfn, prefix='rucio'):
        """
            Construct a partial Rucio PFN using the scope and the LFN

            :param scope: Rucio scope (string)
            :param lfn: logical file name (string)
            :param prefix: kept for backward compatibility, currently not included in the path
            :return: partial PFN path (string)
        """

        # <prefix=rucio>/<scope>/md5(<scope>:<lfn>)[0:2]/md5(<scope:lfn>)[2:4]/<lfn>

        s = '%s:%s' % (scope, lfn)
        hash_hex = hashlib.md5(s.encode('utf-8')).hexdigest()  # Python 2/3

        # exclude prefix from the path: this should be properly considered in protocol/AGIS for today
        paths = scope.split('.') + [hash_hex[0:2], hash_hex[2:4], lfn]
        # remove empty parts to avoid double /-chars (works on both Python 2 and 3)
        paths = [_f for _f in paths if _f]

        return '/'.join(paths)

    def resolve_surl(self, fspec, protocol, ddmconf, **kwargs):
        """
            Get final destination SURL for file to be transferred
            Can be customized at the level of specific copytool
            :param fspec: `FileSpec` object to be processed
            :param protocol: suggested protocol
            :param ddmconf: full ddmconf data
            :param kwargs: extra arguments (recognized here: local_dir)
            :return: dict with keys ('surl',)
            :raise: PilotException for unknown or non-deterministic ddmendpoint
        """

        local_dir = kwargs.get('local_dir', '')
        if not local_dir:
            # consider only deterministic sites (output destination) - unless local input/output
            ddm = ddmconf.get(fspec.ddmendpoint)
            if not ddm:
                raise PilotException('Failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)

            # path = protocol.get('path', '').rstrip('/')
            # if not (ddm.is_deterministic or (path and path.endswith('/rucio'))):
            if not ddm.is_deterministic:
                raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: '
                                     'NOT IMPLEMENTED' % fspec.ddmendpoint, code=ErrorCodes.NONDETERMINISTICDDM)

        surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), self.get_path(fspec.scope, fspec.lfn))
        return {'surl': surl}

    def transfer_files(self, copytool, files, activity, **kwargs):
        """
            Automatically stage out files using the selected copy tool module.

            :param copytool: copytool module
            :param files: list of `FileSpec` objects
            :param activity: ordered list of preferred activity names to resolve SE protocols
            :param kwargs: extra kwargs to be passed to copytool transfer handler
                           (recognized here: workdir, output_dir)

            :return: the output of the copytool transfer operation
            :raise: PilotException in case of controlled error
        """

        # check if files exist before actual processing
        # populate filesize if need, calc checksum
        for fspec in files:

            if not fspec.ddmendpoint:  # ensure that output destination is properly set
                if 'mv' not in self.infosys.queuedata.copytools:
                    msg = 'no output RSE defined for file=%s' % fspec.lfn
                    self.logger.error(msg)
                    raise PilotException(msg, code=ErrorCodes.NOSTORAGE, state='NO_OUTPUTSTORAGE_DEFINED')

            pfn = fspec.surl or getattr(fspec, 'pfn', None) or os.path.join(kwargs.get('workdir', ''), fspec.lfn)
            if not os.path.exists(pfn) or not os.access(pfn, os.R_OK):
                msg = "output pfn file/directory does not exist: %s" % pfn
                self.logger.error(msg)
                self.trace_report.update(clientState='MISSINGOUTPUTFILE', stateReason=msg)
                self.trace_report.send()
                raise PilotException(msg, code=ErrorCodes.MISSINGOUTPUTFILE, state="FILE_INFO_FAIL")
            if not fspec.filesize:
                fspec.filesize = os.path.getsize(pfn)

            if not fspec.filesize:
                msg = 'output file has size zero: %s' % fspec.lfn
                self.logger.fatal(msg)
                raise PilotException(msg, code=ErrorCodes.ZEROFILESIZE, state="ZERO_FILE_SIZE")

            fspec.surl = pfn
            fspec.activity = activity
            # checksum is only calculated for regular files (not directories)
            if os.path.isfile(pfn) and not fspec.checksum.get('adler32'):
                fspec.checksum['adler32'] = calculate_checksum(pfn)

        # prepare files (resolve protocol/transfer url)
        if getattr(copytool, 'require_protocols', True) and files:
            # .get() instead of try/except around the kwargs lookup
            output_dir = kwargs.get('output_dir', "")
            self.require_protocols(files, copytool, activity, local_dir=output_dir)

        if not copytool.is_valid_for_copy_out(files):
            self.logger.warning('Input is not valid for transfers using copytool=%s', copytool)
            self.logger.debug('Input: %s', files)
            raise PilotException('Invalid input for transfer operation')

        self.logger.info('ready to transfer (stage-out) files: %s', files)

        if self.infosys:
            kwargs['copytools'] = self.infosys.queuedata.copytools

            # some copytools will need to know endpoint specifics (e.g. the space token) stored in ddmconf, add it
            kwargs['ddmconf'] = self.infosys.resolve_storage_data()

        if not files:
            msg = 'nothing to stage-out - an internal Pilot error has occurred'
            self.logger.fatal(msg)
            raise PilotException(msg, code=ErrorCodes.INTERNALPILOTPROBLEM)

        # add the trace report
        kwargs['trace_report'] = self.trace_report

        return copytool.copy_out(files, **kwargs)
1123 
1124 #class StageInClientAsync(object):
1125 #
1126 #    def __init__(self, site):
1127 #        raise NotImplementedError
1128 #
1129 #    def queue(self, files):
1130 #        raise NotImplementedError
1131 #
1132 #    def is_transferring(self):
1133 #        raise NotImplementedError
1134 #
1135 #    def start(self):
1136 #        raise NotImplementedError
1137 #
1138 #    def finish(self):
1139 #        raise NotImplementedError
1140 #
1141 #    def status(self):
1142 #        raise NotImplementedError
1143 #
1144 #
1145 #class StageOutClientAsync(object):
1146 #
1147 #    def __init__(self, site):
1148 #        raise NotImplementedError
1149 #
1150 #    def queue(self, files):
1151 #        raise NotImplementedError
1152 #
1153 #    def is_transferring(self):
1154 #        raise NotImplementedError
1155 #
1156 #    def start(self):
1157 #        raise NotImplementedError
1158 #
1159 #    def finish(self):
1160 #        raise NotImplementedError
1161 #
1162 #    def status(self):
1163 #        raise NotImplementedError