Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Pavlo Svirin, pavlo.svirin@cern.ch, 2017
0009 # - Tobias Wegner, tobias.wegner@cern.ch, 2018
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2018-2021
0011 
0012 import os
0013 import logging
0014 import errno
0015 from time import time
0016 
0017 from .common import get_copysetup, verify_catalog_checksum, resolve_common_transfer_errors  #, get_timeout
0018 from pilot.common.exception import StageInFailure, StageOutFailure, PilotException, ErrorCodes
0019 from pilot.util.container import execute
0020 #from pilot.util.timer import timeout
0021 
0022 
logger = logging.getLogger(__name__)

# This copytool needs the input replicas to be resolved before copy_in() is called.
require_replicas = True

# Transfer schemas supported by this copytool, listed in order of preference.
allowed_schemas = [
    'srm',
    'gsiftp',
    'root',
]
0028 
0029 
def is_valid_for_copy_in(files):
    """
    Report whether the given files can be handled by copy_in().

    No validation is performed yet: the argument is ignored and True is
    always returned.
    NOTE(review): an older check for the dict keys 'name', 'source' and
    'destination' was disabled, presumably because copy_in() now receives
    FileSpec objects rather than dicts -- confirm before re-enabling.
    """
    accepted = True
    return accepted
0036 
0037 
def is_valid_for_copy_out(files):
    """
    Report whether the given files can be handled by copy_out().

    No validation is performed yet: the argument is ignored and True is
    always returned.
    """
    accepted = True
    return accepted
0043 
0044 
def copy_in_old(files):
    """
    Legacy stage-in: download the given files with lsm-get via move_all_files_in().

    :param files: list of dicts with 'name', 'source' and 'destination' keys
                  (the format move_all_files_in() consumes).
    :raises PilotException: StageInFailure when lsm-get cannot be found, or when a
                            transfer fails (the failing command's stdout is the message).
    """
    lsm_available = check_for_lsm(dst_in=True)
    if not lsm_available:
        raise StageInFailure("No LSM tools found")

    rc, out, _err = move_all_files_in(files)
    if rc != 0:
        raise StageInFailure(out)
0059 
0060 
def copy_in(files, **kwargs):
    """
    Download given files using the lsm-get command.

    For every file the trace report is updated, the file is copied from
    fspec.turl to <workdir>/<lfn> with move(), its checksum is verified
    against the catalog value, and the trace report is sent.

    :param files: list of `FileSpec` objects.
    :param kwargs: 'copytools' (used to resolve the lsm copysetup),
                   'trace_report' (required; it is dereferenced unconditionally),
                   'workdir' (destination directory used when fspec.workdir is unset).
    :raise: PilotException in case of controlled error (transfer or checksum failure).
    :return: files `FileSpec` object (status/status_code updated in place).
    """

    copytools = kwargs.get('copytools') or []
    copysetup = get_copysetup(copytools, 'lsm')
    trace_report = kwargs.get('trace_report')

    # note, env vars might be unknown inside middleware containers, if so get the value already in the trace report
    localsite = os.environ.get('RUCIO_LOCAL_SITE_ID', trace_report.get_value('localSite'))

    for fspec in files:
        # update the trace report; once localsite has a value (possibly the first
        # file's ddmendpoint) it is kept for the remaining files
        localsite = localsite if localsite else fspec.ddmendpoint
        trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
        trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset)
        trace_report.update(catStart=time())

        dst = fspec.workdir or kwargs.get('workdir') or '.'
        source = fspec.turl
        destination = os.path.join(dst, fspec.lfn)

        logger.info("transferring file %s from %s to %s", fspec.lfn, source, destination)

        exit_code, stdout, stderr = move(source, destination, dst_in=True, copysetup=copysetup)

        if exit_code != 0:
            logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)

            # the failure may be reported on stdout only (move() does so when execute()
            # raises), so fall back to stdout, as copy_out() does, to keep the diagnostics
            error = resolve_common_transfer_errors(stderr or stdout, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            trace_report.update(clientState=error.get('state') or 'STAGEIN_ATTEMPT_FAILED',
                                stateReason=error.get('error'), timeEnd=time())
            trace_report.send()
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        # verify checksum; compare local checksum with catalog value (fspec.checksum), use same checksum type
        state, diagnostics = verify_catalog_checksum(fspec, destination)
        if diagnostics != "":
            trace_report.update(clientState=state or 'STAGEIN_ATTEMPT_FAILED', stateReason=diagnostics,
                                timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=fspec.status_code, state=state)

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0137 
0138 
def copy_out(files, **kwargs):
    """
    Upload given files using lsm copytool (lsm-put).

    For every file the trace report is updated, the space token is resolved
    from ddmconf[fspec.ddmendpoint], <workdir>/<lfn> is copied to fspec.turl
    with move() (passing size, token, adler32 checksum and guid as options),
    and the trace report is sent.

    :param files: list of `FileSpec` objects.
    :param kwargs: 'copytools' (used to resolve the lsm copysetup),
                   'trace_report' (required; updated and sent per file),
                   'ddmconf' (required; mapping of ddmendpoint -> object with a `token`),
                   'workdir' (source directory used when fspec.workdir is unset).
    :raise: PilotException in case of controlled error (missing ddmconf/token, transfer failure).
    :return: files `FileSpec` object (status/status_code updated in place).
    """

    copytools = kwargs.get('copytools') or []
    copysetup = get_copysetup(copytools, 'lsm')
    trace_report = kwargs.get('trace_report')
    ddmconf = kwargs.get('ddmconf', None)
    if not ddmconf:
        raise PilotException("copy_out() failed to resolve ddmconf from function arguments",
                             code=ErrorCodes.STAGEOUTFAILED,
                             state='COPY_ERROR')

    for fspec in files:
        trace_report.update(scope=fspec.scope, dataset=fspec.dataset, url=fspec.surl, filesize=fspec.filesize)
        trace_report.update(catStart=time(), filename=fspec.lfn, guid=fspec.guid.replace('-', ''))

        # resolve token value from fspec.ddmendpoint; an endpoint missing from ddmconf
        # is reported like a missing token instead of failing with AttributeError
        ddm = ddmconf.get(fspec.ddmendpoint)
        token = ddm.token if ddm else None
        if not token:
            diagnostics = "copy_out() failed to resolve token value for ddmendpoint=%s" % (fspec.ddmendpoint)
            trace_report.update(clientState='STAGEOUT_ATTEMPT_FAILED',
                                stateReason=diagnostics,
                                timeEnd=time())
            trace_report.send()
            raise PilotException(diagnostics, code=ErrorCodes.STAGEOUTFAILED, state='COPY_ERROR')

        src = fspec.workdir or kwargs.get('workdir') or '.'
        source = os.path.join(src, fspec.lfn)
        destination = fspec.turl

        # checksum has been calculated in the previous step - transfer_files() in api/data
        # note: pilot is handing over checksum to the command - which will/should verify it after the transfer
        checksum = "adler32:%s" % fspec.checksum.get('adler32')

        # define the command options; dict.items() works on both Python 2 and Python 3
        opts = {'--size': fspec.filesize,
                '-t': token,
                '--checksum': checksum,
                '--guid': fspec.guid}
        opts = " ".join(["%s %s" % (k, v) for (k, v) in opts.items()])

        logger.info("transferring file %s from %s to %s", fspec.lfn, source, destination)

        # a single attempt is made; any lsm-put failure is reported and raised immediately
        exit_code, stdout, stderr = move(source, destination, dst_in=False, copysetup=copysetup, options=opts)
        if exit_code != 0:
            if stderr == "":
                stderr = stdout
            error = resolve_common_transfer_errors(stderr, is_stagein=False)
            # the error code is read from the 'rcode' key, the same key copy_in() reads
            # NOTE(review): confirm key name against pilot.copytool.common
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            trace_report.update(clientState=error.get('state', None) or 'STAGEOUT_ATTEMPT_FAILED',
                                stateReason=error.get('error', 'unknown error'),
                                timeEnd=time())
            trace_report.send()
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        logger.info('all successful')

        fspec.status_code = 0
        fspec.status = 'transferred'
        trace_report.update(clientState='DONE', stateReason='OK', timeEnd=time())
        trace_report.send()

    return files
0217 
0218 
def copy_out_old(files):
    """
    Legacy stage-out: upload the given files with lsm-put via move_all_files_out().

    :param files: list of dicts with 'name', 'source' and 'destination' keys
                  (the format move_all_files_out() consumes).
    :raises PilotException: StageOutFailure when lsm-put cannot be found, or when a
                            transfer fails (the failing command's stdout is the message).
    """
    lsm_available = check_for_lsm(dst_in=False)
    if not lsm_available:
        raise StageOutFailure("No LSM tools found")

    rc, out, _err = move_all_files_out(files)
    if rc != 0:
        raise StageOutFailure(out)
0234 
0235 
def move_all_files_in(files, nretries=1):
    """
    Stage in a list of files with lsm-get, one file at a time.

    Each file gets up to ``nretries`` attempts. Another attempt is made only when
    the previous one failed with errno.ETIMEDOUT or errno.ETIME, since a timed-out
    copy may succeed on the next try. Any other failure, or a timeout on the final
    attempt, is logged and returned immediately and the remaining files are skipped.

    :param files: list of dicts {'name': <filename>, 'source': <dir>, 'destination': <dir>}.
    :param nretries: total number of attempts per file; values below 1 are treated as 1.
    :return: exit_code, stdout, stderr of the last move() call, or (0, "", "") for an empty list.
    """
    # with nretries < 1 the attempt loop would never run and success would be
    # reported without any transfer having been made
    attempts = max(1, nretries)

    exit_code, stdout, stderr = 0, "", ""
    for entry in files:
        name = entry['name']
        logger.info("transferring file %s from %s to %s", name, entry['source'], entry['destination'])

        # NOTE(review): the source is joined with a literal '/' rather than os.path.join,
        # presumably because it may be a remote URL -- confirm
        source = entry['source'] + '/' + name
        destination = os.path.join(entry['destination'], name)

        for attempt in range(1, attempts + 1):
            exit_code, stdout, stderr = move(source, destination, dst_in=True)
            if exit_code == 0:
                break

            timed_out = exit_code in (errno.ETIMEDOUT, errno.ETIME)
            if not timed_out or attempt == attempts:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

            logger.warning("transfer of %s timed out (exit code = %d), retrying (attempt %d of %d)",
                           name, exit_code, attempt + 1, attempts)

    return exit_code, stdout, stderr
0265 
0266 
def move_all_files_out(files, nretries=1):
    """
    Stage out a list of files with lsm-put, one file at a time.

    Each file gets up to ``nretries`` attempts. Another attempt is made only when
    the previous one failed with errno.ETIMEDOUT or errno.ETIME. Any other failure,
    or a timeout on the final attempt, is logged and returned immediately and the
    remaining files are skipped.

    :param files: list of dicts {'name': <filename>, 'source': <dir>, 'destination': <dir>}.
    :param nretries: total number of attempts per file; values below 1 are treated as 1.
    :return: exit_code, stdout, stderr of the last move() call, or (0, "", "") for an empty list.
    """
    # with nretries < 1 the attempt loop would never run and success would be
    # reported without any transfer having been made
    attempts = max(1, nretries)

    exit_code, stdout, stderr = 0, "", ""
    for entry in files:
        name = entry['name']
        logger.info("transferring file %s from %s to %s", name, entry['source'], entry['destination'])

        # NOTE(review): the destination is joined with a literal '/' rather than
        # os.path.join, presumably because it may be a remote URL -- confirm
        destination = entry['destination'] + '/' + name
        source = os.path.join(entry['source'], name)

        for attempt in range(1, attempts + 1):
            exit_code, stdout, stderr = move(source, destination, dst_in=False)
            if exit_code == 0:
                break

            timed_out = exit_code in (errno.ETIMEDOUT, errno.ETIME)
            if not timed_out or attempt == attempts:
                logger.warning("transfer failed: exit code = %d, stdout = %s, stderr = %s", exit_code, stdout, stderr)
                return exit_code, stdout, stderr

            logger.warning("transfer of %s timed out (exit code = %d), retrying (attempt %d of %d)",
                           name, exit_code, attempt + 1, attempts)

    return exit_code, stdout, stderr
0295 
0296 
# NOTE: the @timeout(seconds=10800) decorator from pilot.util.timer is currently disabled for this function.
def move(source, destination, dst_in=True, copysetup="", options=None):
    """
    Use lsm-get or lsm-put to transfer the file.

    The command is run through execute(..., usecontainer=False, copytool=True).
    When copysetup is given, 'source <copysetup>;' is prepended so the setup runs
    in the same shell invocation as the copy command.

    :param source: path to source (string).
    :param destination: path to destination (string).
    :param dst_in: True for stage-in (lsm-get), False for stage-out (lsm-put) (boolean).
    :param copysetup: optional setup script to source first; empty or None means none (string).
    :param options: optional pre-formatted option string placed before the paths (string).
    :return: exit code, stdout, stderr. If execute() raises, the exit code is
             ErrorCodes.STAGEINFAILED / STAGEOUTFAILED, stdout carries the exception
             message and stderr is empty.
    """

    # a None copysetup is treated like an empty one (no 'source None;' prefix)
    cmd = 'source %s;' % copysetup if copysetup else ''

    args = "%s %s" % (source, destination)
    if options:
        args = "%s %s" % (options, args)

    tool = 'lsm-get' if dst_in else 'lsm-put'
    cmd += "%s %s" % (tool, args)

    try:
        exit_code, stdout, stderr = execute(cmd, usecontainer=False, copytool=True)
    except Exception as error:
        # report the problem as an ordinary transfer failure instead of propagating it
        exit_code = ErrorCodes.STAGEINFAILED if dst_in else ErrorCodes.STAGEOUTFAILED
        # the %s placeholder is required: without it the % operator itself raises TypeError
        stdout = 'exception caught: %s' % error
        stderr = ''
        logger.warning(stdout)

    logger.info('exit_code=%d, stdout=%s, stderr=%s', exit_code, stdout, stderr)
    return exit_code, stdout, stderr
0336 
0337 
def check_for_lsm(dst_in=True):
    """
    Check whether the LSM command for the requested direction is on the PATH.

    Runs 'which lsm-get' (stage-in) or 'which lsm-put' (stage-out) via execute().

    :param dst_in: True to look for lsm-get, False to look for lsm-put (boolean).
    :return: True when the 'which' command exits with code 0 (boolean).
    """
    tool = 'lsm-get' if dst_in else 'lsm-put'
    rc, _stdout, _stderr = execute('which %s' % tool)
    return rc == 0