Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Pavlo Svirin, pavlo.svirin@cern.ch, 2017
0009 # - Tobias Wegner, tobias.wegner@cern.ch, 2018
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2018
0011 # - Alexey Anisenkov, anisyonk@cern.ch, 2018
0012 
0013 import os
0014 import logging
0015 import errno
0016 from time import time
0017 
0018 from .common import resolve_common_transfer_errors, get_timeout
0019 from pilot.common.exception import PilotException, ErrorCodes, StageInFailure, StageOutFailure
0020 from pilot.util.container import execute
0021 #from pilot.util.timer import timeout
0022 
logger = logging.getLogger(__name__)

# This copytool needs the input replicas to be resolved before copy_in() is called.
require_replicas = True

# Transfer schemas supported by this copytool, listed in priority order (first = preferred).
allowed_schemas = [
    'srm',
    'gsiftp',
    'https',
    'davs',
    'root',
]
0028 
0029 
def is_valid_for_copy_in(files):
    """
    Tell whether the given files can be handled by copy_in().

    Validation is not implemented yet, so every input is accepted.

    :param files: files to be staged in (currently not inspected)
    :return: always True
    """
    # TODO: validate the input; an earlier draft required 'name', 'source' and
    # 'destination' keys on every entry.
    accepted = True
    return accepted
0036 
0037 
def is_valid_for_copy_out(files):
    """
    Tell whether the given files can be handled by copy_out().

    Validation is not implemented yet, so every input is accepted.

    :param files: files to be staged out (currently not inspected)
    :return: always True
    """
    # TODO: validate the input; an earlier draft required 'name', 'source' and
    # 'destination' keys on every entry.
    accepted = True
    return accepted
0044 
0045 
def copy_in(files, **kwargs):
    """
    Download given files using the gfal-copy command.

    Files are processed in order. For each file the trace report is filled in and sent,
    both on success and on failure. The first failed transfer aborts the loop.

    :param files: list of `FileSpec` objects
    :param kwargs: must provide 'trace_report' (its get_value/update/send methods are used);
                   'workdir' is used as destination directory when fspec.workdir is not set.
                   All kwargs are also forwarded to execute().
    :return: the input list of files, each marked status='transferred', status_code=0
    :raise: StageInFailure if gfal-copy cannot be found,
            PilotException in case of controlled (transfer) error
    """

    trace_report = kwargs.get('trace_report')

    if not check_for_gfal():
        raise StageInFailure("No GFAL2 tools found")

    # note, env vars might be unknown inside middleware containers, if so get the value already in the trace report
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))
    for fspec in files:
        # update the trace report; when the local site is unknown, fall back to this file's own
        # endpoint. This is resolved per file so that the first file's endpoint is not reused for all others.
        file_localsite = localsite if localsite else fspec.ddmendpoint
        trace_report.update(localSite=file_localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)

        trace_report.update(catStart=time())

        dst = fspec.workdir or kwargs.get('workdir') or '.'

        timeout = get_timeout(fspec.filesize)
        source = fspec.turl
        destination = "file://%s" % os.path.abspath(os.path.join(dst, fspec.lfn))

        cmd = ['gfal-copy --verbose -f', ' -t %s' % timeout]

        if fspec.checksum:
            # the first checksum entry is passed to -K as '<key>:<value>'
            cmd += ['-K', '%s:%s' % list(fspec.checksum.items())[0]]  # Python 2/3

        cmd += [source, destination]

        # NOTE(review): source/destination are joined unquoted into one command string; names with
        # spaces or shell metacharacters would break it if execute() runs it through a shell -- confirm.
        rcode, stdout, stderr = execute(" ".join(cmd), **kwargs)

        if rcode:  ## error occurred
            if rcode in [errno.ETIMEDOUT, errno.ETIME]:
                # exit codes equal to ETIMEDOUT/ETIME are reported as a stage-in timeout
                error = {'rcode': ErrorCodes.STAGEINTIMEOUT,
                         'state': 'CP_TIMEOUT',
                         'error': 'Copy command timed out: %s' % stderr}
            else:
                error = resolve_common_transfer_errors(stdout + stderr, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            trace_report.update(clientState=error.get('state') or 'STAGEIN_ATTEMPT_FAILED',
                                stateReason=error.get('error'), timeEnd=time())
            trace_report.send()

            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0115 
0116 
def copy_out(files, **kwargs):
    """
    Upload the given files with gfal-copy.

    Each file is copied from its local path (fspec.surl, or <workdir>/<lfn> when surl is
    empty) to fspec.turl. The trace report is updated and sent after every attempt. The
    first failed transfer aborts the loop.

    :param files: list of `FileSpec` objects to upload
    :param kwargs: expected to carry 'trace_report'; 'workdir' is used when fspec.workdir is unset.
                   All kwargs are forwarded to execute().
    :return: the input list, each entry marked status='transferred' with status_code 0
    :raises: StageOutFailure when gfal-copy is not available,
             PilotException on the first failed transfer
    """

    if not check_for_gfal():
        raise StageOutFailure("No GFAL2 tools found")

    report = kwargs.get('trace_report')

    for fspec in files:
        report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        workdir = fspec.workdir or kwargs.get('workdir') or '.'
        timeout = get_timeout(fspec.filesize)
        local_file = os.path.abspath(fspec.surl or os.path.join(workdir, fspec.lfn))

        args = ['gfal-copy --verbose -f', ' -t %s' % timeout]
        if fspec.checksum:
            # the first checksum entry goes to -K as '<key>:<value>'
            args.extend(['-K', '%s:%s' % list(fspec.checksum.items())[0]])  # Python 2/3
        args.extend(["file://%s" % local_file, fspec.turl])

        # NOTE(review): paths are joined unquoted into one command string -- confirm execute() semantics.
        rcode, stdout, stderr = execute(" ".join(args), **kwargs)

        if not rcode:
            # success: mark the file and close its trace record
            fspec.status_code = 0
            fspec.status = 'transferred'
            report.update(clientState='DONE', stateReason='OK', timeEnd=time())
            report.send()
            continue

        # failure: exit codes equal to ETIMEDOUT/ETIME are reported as a stage-out timeout
        if rcode in (errno.ETIMEDOUT, errno.ETIME):
            error = {
                'rcode': ErrorCodes.STAGEOUTTIMEOUT,
                'state': 'CP_TIMEOUT',
                'error': 'Copy command timed out: %s' % stderr,
            }
        else:
            error = resolve_common_transfer_errors(stdout + stderr, is_stagein=False)

        fspec.status = 'failed'
        fspec.status_code = error.get('rcode')
        report.update(
            clientState=error.get('state') or 'STAGEOUT_ATTEMPT_FAILED',
            stateReason=error.get('error', 'unknown error'),
            timeEnd=time(),
        )
        report.send()
        raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

    return files
0171 
0172 
def move_all_files_in(files, nretries=1):   ### NOT USED -- TO BE DEPRECATED
    """
    Copy all files from their source location into a local destination directory with gfal-copy.

    :param files: list of dicts {'name': <filename>, 'source': <dir>, 'destination': <dir>},
                  optionally with 'recursive': True to copy with gfal-copy -r
    :param nretries: number of attempts per file; sometimes there can be a timeout copying, but the
                     next attempt may succeed. Values below 1 are treated as 1, so every file is
                     tried at least once.
    :return: exit_code, stdout, stderr of the last copy attempt ((0, "", "") when `files` is empty)
    """

    exit_code = 0
    stdout = ""
    stderr = ""
    # always make at least one attempt: with nretries <= 0 the retry loop would be skipped and
    # the function would report success without copying anything
    attempts = max(nretries, 1)

    for entry in files:  # entry = {'name':<filename>, 'source':<dir>, 'destination':<dir>}
        logger.info("transferring file %s from %s to %s", entry['name'], entry['source'], entry['destination'])

        source = entry['source'] + '/' + entry['name']
        # 'file:///' + an absolute path yields four slashes ('file:////...'). gfal-copy sometimes
        # complains about the file:// protocol, and with four slashes this does not seem to happen.
        destination = 'file:///' + os.path.join(entry['destination'], entry['name'])
        for retry in range(attempts):
            exit_code, stdout, stderr = move(source, destination, entry.get('recursive', False))

            if exit_code == 0:  # all successful
                break
            # only timeouts are retried; any other error, or a failure on the last attempt, aborts
            if exit_code not in (errno.ETIMEDOUT, errno.ETIME) or retry + 1 == attempts:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
0204 
0205 
def move_all_files_out(files, nretries=1):  ### NOT USED -- TO BE DEPRECATED
    """
    Copy all local files to their destination location with gfal-copy.

    :param files: list of dicts {'name': <filename>, 'source': <dir>, 'destination': <dir>},
                  optionally with 'recursive': True to copy with gfal-copy -r (same as move_all_files_in)
    :param nretries: number of attempts per file; only timeouts are retried. Values below 1 are
                     treated as 1, so every file is tried at least once.
    :return: exit_code, stdout, stderr of the last copy attempt ((0, "", "") when `files` is empty)
    """

    exit_code = 0
    stdout = ""
    stderr = ""
    # always make at least one attempt: with nretries <= 0 the retry loop would be skipped and
    # the function would report success without copying anything
    attempts = max(nretries, 1)

    for entry in files:  # entry = {'name':<filename>, 'source':<dir>, 'destination':<dir>}
        logger.info("transferring file %s from %s to %s", entry['name'], entry['source'], entry['destination'])

        destination = entry['destination'] + '/' + entry['name']
        # 'file:///' + an absolute path yields four slashes ('file:////...'). gfal-copy sometimes
        # complains about the file:// protocol, and with four slashes this does not seem to happen.
        source = 'file:///' + os.path.join(entry['source'], entry['name'])
        for retry in range(attempts):
            # honour the optional 'recursive' flag, as move_all_files_in does
            exit_code, stdout, stderr = move(source, destination, entry.get('recursive', False))

            if exit_code == 0:  # all successful
                break
            # only timeouts are retried; any other error, or a failure on the last attempt, aborts
            if exit_code not in (errno.ETIMEDOUT, errno.ETIME) or retry + 1 == attempts:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

    return exit_code, stdout, stderr
0236 
0237 
# timeout decorator currently disabled (its import at the top of the file is commented out too)
#@timeout(seconds=10800)
def move(source, destination, recursive=False):
    """
    Copy `source` to `destination` with gfal-copy.

    :param source: source path or URL
    :param destination: destination path or URL
    :param recursive: if True, pass -r to gfal-copy
    :return: exit_code, stdout, stderr as returned by execute()
    """
    if recursive:
        cmd = "gfal-copy -r %s %s" % (source, destination)
    else:
        cmd = "gfal-copy %s %s" % (source, destination)
    # report the command through the module logger rather than printing to stdout
    logger.info("executing command: %s", cmd)
    exit_code, stdout, stderr = execute(cmd)

    return exit_code, stdout, stderr
0249 
0250 
def check_for_gfal():
    """
    Check whether the gfal-copy command is available.

    The lookup (`which gfal-copy`) goes through execute(), the same runner used for the copy
    commands in this module.

    :return: True if `which gfal-copy` exits with code 0, False otherwise
    """
    exit_code, _, _ = execute('which gfal-copy')
    return exit_code == 0