File indexing completed on 2026-04-10 08:39:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 import os
0011 import logging
0012
0013 try:
0014 import boto3
0015 from botocore.exceptions import ClientError
0016 except Exception:
0017 pass
0018
0019 from .common import resolve_common_transfer_errors
0020 from pilot.common.errorcodes import ErrorCodes
0021 from pilot.common.exception import PilotException
0022 from pilot.util.ruciopath import get_rucio_path
0023
logger = logging.getLogger(__name__)
errors = ErrorCodes()

# Copytool capability flags — NOTE(review): not read in this module; presumably
# consumed by the pilot's transfer framework when selecting/driving copytools.
require_replicas = False            # this copytool does not require resolved input replicas
require_input_protocols = True      # input protocols must be resolved (manual replica generation)
require_protocols = True            # protocols must be resolved first for stage-out

# schemas this copytool accepts when matching replicas/protocols
allowed_schemas = ['srm', 'gsiftp', 'https', 'davs', 'root', 's3', 's3+rucio']
0032
0033
def is_valid_for_copy_in(files):
    """Accept any file list for stage-in (no copytool-specific validation)."""
    return True
0036
0037
def is_valid_for_copy_out(files):
    """Accept any file list for stage-out (no copytool-specific validation)."""
    return True
0040
0041
def resolve_surl(fspec, protocol, ddmconf, **kwargs):
    """
    Get final destination SURL for file to be transferred to Objectstore.
    Can be customized at the level of specific copytool.

    :param fspec: file spec data.
    :param protocol: suggested protocol (dict with 'endpoint', 'path', 'id').
    :param ddmconf: full ddm storage data.
    :raises PilotException: if the ddmendpoint cannot be resolved, or SURL
        construction is not implemented for the endpoint type.
    :return: dictionary {'surl': surl}.
    """
    ddm = ddmconf.get(fspec.ddmendpoint)
    if not ddm:
        raise PilotException('failed to resolve ddmendpoint by name=%s' % fspec.ddmendpoint)

    if ddm.is_deterministic:
        surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), get_rucio_path(fspec.scope, fspec.lfn))
    elif ddm.type in ['OS_ES', 'OS_LOGS']:
        surl = protocol.get('endpoint', '') + os.path.join(protocol.get('path', ''), fspec.lfn)
        # remember which protocol produced the SURL for later reporting
        fspec.protocol_id = protocol.get('id')
    else:
        # bug fix: the endpoint name was previously passed as a second positional
        # argument to PilotException (swallowed as the error code) instead of
        # being interpolated into the message
        raise PilotException('resolve_surl(): Failed to construct SURL for non deterministic ddm=%s: NOT IMPLEMENTED' % fspec.ddmendpoint)

    return {'surl': surl}
0068
0069
def copy_in(files, **kwargs):
    """
    Download given files from an S3 bucket.

    :param files: list of `FileSpec` objects.
    :raise: PilotException in case of controlled error.
    :return: the (updated) list of `FileSpec` objects.
    """
    for fspec in files:
        # destination directory: per-file workdir wins, then the caller's, then cwd
        destination = fspec.workdir or kwargs.get('workdir') or '.'
        bucket = 'bucket'
        local_path = os.path.join(destination, fspec.lfn)

        logger.info('downloading object %s from bucket=%s to local file %s', fspec.lfn, bucket, local_path)
        ok, diagnostics = download_file(local_path, bucket, object_name=fspec.lfn)

        if ok:
            fspec.status_code = 0
            fspec.status = 'transferred'
        else:
            # map the raw diagnostics onto a pilot error code and abort the transfer
            error = resolve_common_transfer_errors(diagnostics, is_stagein=True)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

    return files
0097
0098
def download_file(path, bucket, object_name=None):
    """
    Download a file from an S3 bucket.

    :param path: path to the local file after download (string).
    :param bucket: bucket to download from.
    :param object_name: S3 object name; if not specified, the basename of path is used.
    :return: True if the file was downloaded (else False), diagnostics (string).
    """
    object_name = os.path.basename(path) if object_name is None else object_name

    try:
        boto3.client('s3').download_file(bucket, object_name, path)
    except ClientError as error:
        diagnostics = 'S3 ClientError: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics
    except Exception as error:
        diagnostics = 'exception caught in s3_client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""
0126
0127
def copy_out(files, **kwargs):
    """
    Upload given files to S3 storage.

    :param files: list of `FileSpec` objects.
    :raise: PilotException in case of controlled error.
    :return: the (updated) list of `FileSpec` objects.
    """
    # robustness fix: default to cwd instead of raising KeyError when no
    # workdir was passed, consistent with copy_in()'s fallback behavior
    workdir = kwargs.pop('workdir', '.')

    for fspec in files:
        path = os.path.join(workdir, fspec.lfn)

        # guard clause: the local output file must exist before upload
        if not os.path.exists(path):
            diagnostics = 'local output file does not exist: %s' % path
            logger.warning(diagnostics)
            fspec.status = 'failed'
            fspec.status_code = errors.STAGEOUTFAILED
            raise PilotException(diagnostics, code=fspec.status_code, state=fspec.status)

        # NOTE(review): bucket name is hard-coded — presumably a placeholder; confirm
        bucket = 'bucket'
        logger.info('uploading %s to bucket=%s using object name=%s', path, bucket, fspec.lfn)
        status, diagnostics = upload_file(path, bucket, object_name=fspec.lfn)

        if not status:
            # map the raw diagnostics onto a pilot error code and abort the transfer
            error = resolve_common_transfer_errors(diagnostics, is_stagein=False)
            fspec.status = 'failed'
            fspec.status_code = error.get('rcode')
            raise PilotException(error.get('error'), code=error.get('rcode'), state=error.get('state'))

        fspec.status = 'transferred'
        fspec.status_code = 0

    return files
0163
0164
def upload_file(file_name, bucket, object_name=None):
    """
    Upload a file to an S3 bucket.

    :param file_name: file to upload.
    :param bucket: bucket to upload to.
    :param object_name: S3 object name; if not specified, file_name is used.
    :return: True if the file was uploaded (else False), diagnostics (string).
    """
    object_name = file_name if object_name is None else object_name

    try:
        client = boto3.client('s3')
        client.upload_file(file_name, bucket, object_name)
    except ClientError as error:
        diagnostics = 'S3 ClientError: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics
    except Exception as error:
        diagnostics = 'exception caught in s3_client: %s' % error
        logger.critical(diagnostics)
        return False, diagnostics

    return True, ""