Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:15

0001 #!/usr/bin/env python
0002 # Licensed under the Apache License, Version 2.0 (the "License");
0003 # you may not use this file except in compliance with the License.
0004 # You may obtain a copy of the License at
0005 # http://www.apache.org/licenses/LICENSE-2.0
0006 #
0007 # Authors:
0008 # - Paul Nilsson, paul.nilsson@cern.ch, 2021
0009 
0010 import os
0011 import logging
0012 
0013 try:
0014     import boto3
0015     from botocore.exceptions import ClientError
0016 except Exception:
0017     pass
0018 
0019 from .common import resolve_common_transfer_errors
0020 from pilot.common.errorcodes import ErrorCodes
0021 from pilot.common.exception import PilotException
0022 from pilot.util.ruciopath import get_rucio_path
0023 
# module-level logger and shared error code table used by the functions below
logger = logging.getLogger(__name__)
errors = ErrorCodes()

# Copytool capability flags (NOTE(review): presumably read by the generic
# stage-in/stage-out machinery; they are not referenced within this file)
require_replicas = False  # indicates if given copytool requires input replicas to be resolved
require_input_protocols = True  # indicates if given copytool requires input protocols and manual generation of input replicas
require_protocols = True  # indicates if given copytool requires protocols to be resolved first for stage-out

# URL schemas associated with this copytool (not referenced within this file)
allowed_schemas = ['srm', 'gsiftp', 'https', 'davs', 'root', 's3', 's3+rucio']
0032 
0033 
def is_valid_for_copy_in(files):
    """
    Placeholder stage-in validity check.

    No validation is performed yet: every input is accepted.

    :param files: files to be checked (currently unused).
    :return: always True.
    """
    # FIX ME LATER: implement a real check
    accepted = True
    return accepted
0036 
0037 
def is_valid_for_copy_out(files):
    """
    Placeholder stage-out validity check.

    No validation is performed yet: every input is accepted.

    :param files: files to be checked (currently unused).
    :return: always True.
    """
    # FIX ME LATER: implement a real check
    accepted = True
    return accepted
0040 
0041 
def resolve_surl(fspec, protocol, ddmconf, **kwargs):
    """
        Get final destination SURL for file to be transferred to Objectstore
        Can be customized at the level of specific copytool

        :param fspec: file spec data (ddmendpoint, scope and lfn are read; protocol_id is set
                      for OS_ES/OS_LOGS endpoints)
        :param protocol: suggested protocol (dictionary with 'endpoint', 'path' and 'id' keys)
        :param ddmconf: full ddm storage data (dictionary keyed by ddmendpoint name)
        :raise: PilotException if the ddmendpoint is unknown or the SURL cannot be constructed
        :return: dictionary {'surl': surl}
    """
    ddm = ddmconf.get(fspec.ddmendpoint)
    if not ddm:
        raise PilotException('failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)

    if ddm.is_deterministic:
        # deterministic storage: the relative path is derived from scope and lfn
        relative_path = get_rucio_path(fspec.scope, fspec.lfn)
    elif ddm.type in ['OS_ES', 'OS_LOGS']:
        # objectstore: the object is stored directly under the protocol path
        relative_path = fspec.lfn
        fspec.protocol_id = protocol.get('id')
    else:
        # the endpoint name must be interpolated into the message here; passing it as a
        # separate positional argument leaves the '%s' placeholder unformatted
        raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: NOT IMPLEMENTED' % fspec.ddmendpoint)

    surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), relative_path)

    # example:
    #   protocol = {u'path': u'/atlas-eventservice', u'endpoint': u's3://s3.cern.ch:443/', u'flavour': u'AWS-S3-SSL', u'id': 175}
    #   surl = 's3://s3.cern.ch:443//atlas-eventservice/EventService_premerge_24706191-5013009653-24039149400-322-5.tar'
    return {'surl': surl}
0068 
0069 
def copy_in(files, **kwargs):
    """
    Stage in the given files by downloading each one from an S3 bucket.

    Each file is written to its own workdir (falling back to the 'workdir'
    keyword argument, then to the current directory). The status fields of
    every file spec are updated; processing stops at the first failure.

    :param files: list of `FileSpec` objects
    :raise: PilotException in case of controlled error
    :return: the same list of files that was passed in.
    """

    for fspec in files:

        target_dir = fspec.workdir or kwargs.get('workdir') or '.'
        local_path = os.path.join(target_dir, fspec.lfn)
        bucket = 'bucket'  # UPDATE ME

        logger.info('downloading object %s from bucket=%s to local file %s', fspec.lfn, bucket, local_path)
        ok, diagnostics = download_file(local_path, bucket, object_name=fspec.lfn)

        if ok:
            fspec.status_code = 0
            fspec.status = 'transferred'
            continue

        # an error occurred: translate the diagnostics into a pilot error and abort
        error = resolve_common_transfer_errors(diagnostics, is_stagein=True)
        fspec.status = 'failed'
        fspec.status_code = error.get('rcode')
        raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

    return files
0097 
0098 
def download_file(path, bucket, object_name=None):
    """
    Download a file from an S3 bucket.

    Any failure is reported through the return value rather than raised. This
    includes the case where the optional boto3/botocore import at the top of
    the module did not succeed.

    :param path: Path to local file after download (string).
    :param bucket: Bucket to download from.
    :param object_name: S3 object name. If not specified then file_name from path is used.
    :return: True if file was downloaded (else False), diagnostics (string).
    """

    # if S3 object_name was not specified, use file name from path
    if object_name is None:
        object_name = os.path.basename(path)

    # boto3/botocore are imported optionally at module level, so ClientError may be
    # undefined here; naming it directly in an except clause would raise a NameError
    # that escapes this function
    client_error = globals().get('ClientError')

    try:
        s3 = boto3.client('s3')
        s3.download_file(bucket, object_name, path)
    except Exception as error:
        if client_error is not None and isinstance(error, client_error):
            diagnostics = 'S3 ClientError: %s' % error
        else:
            diagnostics = 'exception caught in s3_client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""
0126 
0127 
def copy_out(files, **kwargs):
    """
    Stage out the given files by uploading each one to S3 storage.

    Every file is expected to be found in the directory given by the mandatory
    'workdir' keyword argument. The status fields of every file spec are
    updated; processing stops at the first failure.

    :param files: list of `FileSpec` objects
    :raise: PilotException in case of controlled error
    :return: the same list of files that was passed in.
    """

    workdir = kwargs.pop('workdir')

    for fspec in files:

        local_path = os.path.join(workdir, fspec.lfn)

        # nothing to upload if the local output file is missing
        if not os.path.exists(local_path):
            diagnostics = 'local output file does not exist: %s' % local_path
            logger.warning(diagnostics)
            fspec.status = 'failed'
            fspec.status_code = errors.STAGEOUTFAILED
            raise PilotException(diagnostics, code=fspec.status_code, state=fspec.status)

        bucket = 'bucket'  # UPDATE ME
        logger.info('uploading %s to bucket=%s using object name=%s', local_path, bucket, fspec.lfn)
        ok, diagnostics = upload_file(local_path, bucket, object_name=fspec.lfn)

        if not ok:  ## an error occurred
            # create new error code(s) in ErrorCodes.py and set it/them in resolve_common_transfer_errors()
            error = resolve_common_transfer_errors(diagnostics, is_stagein=False)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status = 'transferred'
        fspec.status_code = 0

    return files
0163 
0164 
def upload_file(file_name, bucket, object_name=None):
    """
    Upload a file to an S3 bucket.

    Any failure is reported through the return value rather than raised. This
    includes the case where the optional boto3/botocore import at the top of
    the module did not succeed.

    :param file_name: File to upload.
    :param bucket: Bucket to upload to.
    :param object_name: S3 object name. If not specified then file_name is used.
    :return: True if file was uploaded (else False), diagnostics (string).
    """

    # if S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name

    # boto3/botocore are imported optionally at module level, so ClientError may be
    # undefined here; naming it directly in an except clause would raise a NameError
    # that escapes this function
    client_error = globals().get('ClientError')

    # upload the file
    try:
        s3_client = boto3.client('s3')
        s3_client.upload_file(file_name, bucket, object_name)
    except Exception as error:
        if client_error is not None and isinstance(error, client_error):
            diagnostics = 'S3 ClientError: %s' % error
        else:
            diagnostics = 'exception caught in s3_client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""
0192 
0193     return True, ""