Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Wen Guan, wen.guan@cern.ch, 2018
0009 # - Alexey Anisenkov, anisyonk@cern.ch, 2019
0010 # - Paul Nilsson, paul.nilsson@cern.ch, 2019-2021
0011 
0012 import os
0013 import json
0014 import logging
0015 
0016 from .common import resolve_common_transfer_errors
0017 from pilot.common.exception import PilotException, ErrorCodes
0018 #from pilot.info.storageactivitymaps import get_ddm_activity
0019 from pilot.util.container import execute
0020 from pilot.util.ruciopath import get_rucio_path
0021 
logger = logging.getLogger(__name__)

# Flags describing what this copytool needs from the transfer workflow.

# Whether input replicas must be resolved before stage-in
# (can be disabled for Rucio if it is allowed to use any RSE for input).
require_replicas = False

# Whether input protocols must be resolved and input replicas generated manually.
require_input_protocols = True

# Whether protocols must be resolved first for stage-out.
require_protocols = True

# URL schemas allowed for this copytool
# (presumably used to filter protocol endpoints by prefix -- verify against callers).
allowed_schemas = [
    'srm',
    'gsiftp',
    'https',
    'davs',
    'root',
    's3',
    's3+rucio',
]
0031 
0032 
def is_valid_for_copy_in(files):
    """
        Tell whether this copytool can stage in the given files.

        No validation is implemented yet, so every input is accepted.

        :param files: list of `FileSpec` objects (currently not inspected)
        :return: always True
    """
    # TODO: implement a real check (placeholder, originally marked "FIX ME LATER")
    accepted = True
    return accepted
0035 
0036 
def is_valid_for_copy_out(files):
    """
        Tell whether this copytool can stage out the given files.

        No validation is implemented yet, so every input is accepted.

        :param files: list of `FileSpec` objects (currently not inspected)
        :return: always True
    """
    # TODO: implement a real check (placeholder, originally marked "FIX ME LATER")
    accepted = True
    return accepted
0039 
0040 
def resolve_surl(fspec, protocol, ddmconf, **kwargs):
    """
        Get the final destination SURL for a file to be transferred.
        Can be customized at the level of a specific copytool.

        For deterministic storages the path part is built with `get_rucio_path`
        from the file scope and lfn. For OS_ES / OS_LOGS objectstores the plain
        lfn is appended instead, and `fspec.protocol_id` is set to the protocol id.

        :param fspec: file spec data (reads ddmendpoint, scope, lfn; may set protocol_id)
        :param protocol: suggested protocol (reads the 'endpoint', 'path' and 'id' keys)
        :param ddmconf: full ddm storage data, looked up by fspec.ddmendpoint
        :param kwargs: unused; accepted for interface compatibility with other copytools
        :raise: PilotException if the ddmendpoint cannot be resolved, or if the storage is
                non-deterministic and not of type OS_ES / OS_LOGS
        :return: dictionary {'surl': surl}
    """
    ddm = ddmconf.get(fspec.ddmendpoint)
    if not ddm:
        raise PilotException('Failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)

    if ddm.is_deterministic:
        surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), get_rucio_path(fspec.scope, fspec.lfn))
    elif ddm.type in ['OS_ES', 'OS_LOGS']:
        surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), fspec.lfn)
        fspec.protocol_id = protocol.get('id')
    else:
        # substitute the endpoint into the '%s' placeholder; passing it as a separate
        # positional argument would leave the placeholder unfilled and hand the
        # endpoint to a different PilotException parameter
        raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: NOT IMPLEMENTED' % fspec.ddmendpoint)

    return {'surl': surl}
0064 
0065 ## redundant logic, can be removed (anisyonk)
0066 #def resolve_protocol(fspec, activity, ddm):
0067 #    """
0068 #        Resolve protocols to be used to transfer the file with the corresponding activity
0069 #
0070 #        :param fspec: file spec data
0071 #        :param activity: activity name as string
0072 #        :param ddm: ddm storage data
0073 #        :return: protocol as dictionary
0074 #    """
0075 #
0076 #    logger.info("Resolving protocol for file(lfn: %s, ddmendpoint: %s) with activity(%s)", fspec.lfn, fspec.ddmendpoint, activity)
0077 #
0078 #    activity = get_ddm_activity(activity)
0079 #    protocols = ddm.arprotocols.get(activity)
0080 #    protocols_allow = []
0081 #    for schema in allowed_schemas:
0082 #        for protocol in protocols:
0083 #            if schema is None or protocol.get('endpoint', '').startswith("%s://" % schema):
0084 #                protocols_allow.append(protocol)
0085 #    if not protocols_allow:
0086 #        err = "No available allowed protocols for file(lfn: %s, ddmendpoint: %s) with activity(%s)" % (fspec.lfn, fspec.ddmendpoint, activity)
0087 #        logger.error(err)
0088 #        raise PilotException(err)
0089 #    protocol = protocols_allow[0]
0090 #    logger.info("Resolved protocol for file(lfn: %s, ddmendpoint: %s) with activity(%s): %s", fspec.lfn, fspec.ddmendpoint, activity, protocol)
0091 #    return protocol
0092 
0093 
def copy_in(files, **kwargs):
    """
        Stage in the given files with the ``rucio download`` command line client.

        One ``rucio`` command is run per file. The first failing download raises,
        and the remaining files are left untouched.

        :param files: list of `FileSpec` objects
        :param kwargs: ``ddmconf`` is consumed here and ``workdir`` is read; all remaining
                       keyword arguments (including ``workdir``) are forwarded to ``execute()``
        :raise: PilotException in case of controlled error
        :return: the same ``files`` list, with ``status`` / ``status_code`` updated
    """

    # fixed rucio log format: transfer errors are resolved by parsing stderr
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    ddmconf = kwargs.pop('ddmconf', {})

    for fspec in files:
        logger.info("To transfer file: %s", fspec)

        # optional storage-specific setup snippet, placed in front of the rucio command
        ddm = ddmconf.get(fspec.ddmendpoint) if fspec.protocol_id else None
        setup = ddm.get_special_setup(fspec.protocol_id) if ddm else None
        cmd = [setup] if setup else []

        target_dir = fspec.workdir or kwargs.get('workdir') or '.'
        cmd.extend(['/usr/bin/env', 'rucio', '-v', 'download', '--no-subdir', '--dir', target_dir])
        if require_replicas:
            cmd.extend(['--rse', fspec.replicas[0]['ddmendpoint']])

        # the transfer URL (turl) is normally filled in by the calling workflow;
        # the surl is only a fallback
        pfn = fspec.turl or fspec.surl
        if pfn:
            if fspec.ddmendpoint:
                cmd.extend(['--rse', fspec.ddmendpoint])
            cmd.extend(['--pfn', pfn])
        cmd.append('%s:%s' % (fspec.scope, fspec.lfn))

        rcode, _stdout, stderr = execute(" ".join(cmd), **kwargs)
        if rcode:
            # a non-zero exit code is translated into a pilot error from stderr
            error = resolve_common_transfer_errors(stderr, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status_code = 0
        fspec.status = 'transferred'

    return files
0157 
0158 
def copy_out(files, **kwargs):
    """
        Upload given files using the ``rucio upload`` command line client.

        One ``rucio`` command is run per file. The first failure raises and aborts
        the remaining uploads.

        :param files: list of `FileSpec` objects
        :param kwargs: ``no_register`` (default True) adds ``--no-register``;
                       ``summary`` (default False) adds ``--summary``, after which
                       ``rucio_upload.json`` is read from the work directory to set
                       ``fspec.turl`` and compare the adler32 checksum;
                       ``ddmconf`` is consumed here and ``workdir`` is read.
                       All remaining keyword arguments (including ``workdir``)
                       are forwarded to ``execute()``.
        :raise: PilotException in case of controlled error, including an adler32
                mismatch reported by the upload summary
        :return: the same ``files`` list, with ``status`` / ``status_code`` (and possibly ``turl``) updated
    """

    # don't spoil the output, we depend on stderr parsing
    os.environ['RUCIO_LOGGING_FORMAT'] = '%(asctime)s %(levelname)s [%(message)s]'

    no_register = kwargs.pop('no_register', True)
    summary = kwargs.pop('summary', False)
    ddmconf = kwargs.pop('ddmconf', {})

    for fspec in files:
        cmd = []
        if fspec.protocol_id:
            ddm = ddmconf.get(fspec.ddmendpoint)
            if ddm:
                ddm_special_setup = ddm.get_special_setup(fspec.protocol_id)
                if ddm_special_setup:
                    cmd = [ddm_special_setup]

        cmd += ['/usr/bin/env', 'rucio', '-v', 'upload']
        cmd += ['--rse', fspec.ddmendpoint]

        if fspec.scope:
            cmd.extend(['--scope', fspec.scope])
        if fspec.guid:
            cmd.extend(['--guid', fspec.guid])

        if no_register:
            cmd.append('--no-register')

        if summary:
            cmd.append('--summary')

        if fspec.turl:
            cmd.extend(['--pfn', fspec.turl])

        cmd += [fspec.surl]

        rcode, _stdout, stderr = execute(" ".join(cmd), **kwargs)

        if rcode:  ## error occurred
            error = resolve_common_transfer_errors(stderr, is_stagein=False)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        if summary:  # resolve final pfn (turl) from the summary JSON
            cwd = fspec.workdir or kwargs.get('workdir') or '.'
            path = os.path.join(cwd, 'rucio_upload.json')
            if not os.path.exists(path):
                logger.error('Failed to resolve Rucio summary JSON, wrong path? file=%s', path)
            else:
                # parsed JSON gets its own name so the `summary` flag stays unchanged
                # for the remaining files in the loop
                with open(path, 'rb') as f:
                    summary_data = json.load(f)
                dat = summary_data.get("%s:%s" % (fspec.scope, fspec.lfn)) or {}
                fspec.turl = dat.get('pfn')
                # quick transfer verification:
                # the logic should be unified and moved to base layer shared for all the movers
                adler32 = dat.get('adler32')
                expected_adler32 = (fspec.checksum or {}).get('adler32')
                if expected_adler32 and adler32 and expected_adler32 != adler32:
                    # mark the file as failed, consistent with the rcode error path above
                    fspec.status = 'failed'
                    fspec.status_code = ErrorCodes.PUTADMISMATCH
                    raise PilotException("Failed to stageout: CRC mismatched", code=ErrorCodes.PUTADMISMATCH, state='AD_MISMATCH')

        fspec.status_code = 0
        fspec.status = 'transferred'

    return files
0229 
0230     return files