![]() |
|
|||
File indexing completed on 2025-08-28 08:26:59
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <memory> 0021 #include <string> 0022 #include <vector> 0023 0024 #include "arrow/filesystem/filesystem.h" 0025 #include "arrow/util/macros.h" 0026 #include "arrow/util/uri.h" 0027 0028 namespace Aws { 0029 namespace Auth { 0030 0031 class AWSCredentialsProvider; 0032 class STSAssumeRoleCredentialsProvider; 0033 0034 } // namespace Auth 0035 namespace STS { 0036 class STSClient; 0037 } 0038 } // namespace Aws 0039 0040 namespace arrow { 0041 namespace fs { 0042 0043 /// Options for using a proxy for S3 0044 struct ARROW_EXPORT S3ProxyOptions { 0045 std::string scheme; 0046 std::string host; 0047 int port = -1; 0048 std::string username; 0049 std::string password; 0050 0051 /// Initialize from URI such as http://username:password@host:port 0052 /// or http://host:port 0053 static Result<S3ProxyOptions> FromUri(const std::string& uri); 0054 static Result<S3ProxyOptions> FromUri(const ::arrow::util::Uri& uri); 0055 0056 bool Equals(const S3ProxyOptions& other) const; 0057 }; 0058 0059 enum class S3CredentialsKind : int8_t { 0060 /// Anonymous access (no credentials used) 0061 Anonymous, 0062 /// Use default AWS credentials, configured through environment variables 0063 Default, 0064 /// Use explicitly-provided access key pair 0065 Explicit, 0066 /// Assume role through a role ARN 0067 Role, 0068 /// Use web identity token to assume role, configured through environment variables 0069 WebIdentity 0070 }; 0071 0072 /// Pure virtual class for describing custom S3 retry strategies 0073 class ARROW_EXPORT S3RetryStrategy { 0074 public: 0075 virtual ~S3RetryStrategy() = default; 0076 0077 /// Simple struct where each field corresponds to a field in Aws::Client::AWSError 0078 struct AWSErrorDetail { 0079 /// Corresponds to AWSError::GetErrorType() 0080 int error_type; 0081 /// Corresponds to AWSError::GetMessage() 0082 std::string message; 0083 /// Corresponds to AWSError::GetExceptionName() 0084 std::string exception_name; 0085 /// Corresponds to AWSError::ShouldRetry() 0086 bool should_retry; 0087 }; 0088 /// Returns true if the S3 request resulting in the provided error should be retried. 0089 virtual bool ShouldRetry(const AWSErrorDetail& error, int64_t attempted_retries) = 0; 0090 /// Returns the time in milliseconds the S3 client should sleep for until retrying. 0091 virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error, 0092 int64_t attempted_retries) = 0; 0093 /// Returns a stock AWS Default retry strategy. 0094 static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy( 0095 int64_t max_attempts); 0096 /// Returns a stock AWS Standard retry strategy. 0097 static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy( 0098 int64_t max_attempts); 0099 }; 0100 0101 /// Options for the S3FileSystem implementation. 0102 struct ARROW_EXPORT S3Options { 0103 /// \brief AWS region to connect to. 0104 /// 0105 /// If unset, the AWS SDK will choose a default value. The exact algorithm 0106 /// depends on the SDK version. Before 1.8, the default is hardcoded 0107 /// to "us-east-1". Since 1.8, several heuristics are used to determine 0108 /// the region (environment variables, configuration profile, EC2 metadata 0109 /// server). 0110 std::string region; 0111 0112 /// \brief Socket connection timeout, in seconds 0113 /// 0114 /// If negative, the AWS SDK default value is used (typically 1 second). 0115 double connect_timeout = -1; 0116 0117 /// \brief Socket read timeout on Windows and macOS, in seconds 0118 /// 0119 /// If negative, the AWS SDK default value is used (typically 3 seconds). 0120 /// This option is ignored on non-Windows, non-macOS systems. 0121 double request_timeout = -1; 0122 0123 /// If non-empty, override region with a connect string such as "localhost:9000" 0124 // XXX perhaps instead take a URL like "http://localhost:9000"? 0125 std::string endpoint_override; 0126 /// S3 connection transport, default "https" 0127 std::string scheme = "https"; 0128 0129 /// ARN of role to assume 0130 std::string role_arn; 0131 /// Optional identifier for an assumed role session. 0132 std::string session_name; 0133 /// Optional external identifier to pass to STS when assuming a role 0134 std::string external_id; 0135 /// Frequency (in seconds) to refresh temporary credentials from assumed role 0136 int load_frequency = 900; 0137 0138 /// If connection is through a proxy, set options here 0139 S3ProxyOptions proxy_options; 0140 0141 /// AWS credentials provider 0142 std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider; 0143 0144 /// Type of credentials being used. Set along with credentials_provider. 0145 S3CredentialsKind credentials_kind = S3CredentialsKind::Default; 0146 0147 /// Whether to use virtual addressing of buckets 0148 /// 0149 /// If true, then virtual addressing is always enabled. 0150 /// If false, then virtual addressing is only enabled if `endpoint_override` is empty. 0151 /// 0152 /// This can be used for non-AWS backends that only support virtual hosted-style access. 0153 bool force_virtual_addressing = false; 0154 0155 /// Whether OutputStream writes will be issued in the background, without blocking. 0156 bool background_writes = true; 0157 0158 /// Whether to allow creation of buckets 0159 /// 0160 /// When S3FileSystem creates new buckets, it does not pass any non-default settings. 0161 /// In AWS S3, the bucket and all objects will be not publicly visible, and there 0162 /// will be no bucket policies and no resource tags. To have more control over how 0163 /// buckets are created, use a different API to create them. 0164 bool allow_bucket_creation = false; 0165 0166 /// Whether to allow deletion of buckets 0167 bool allow_bucket_deletion = false; 0168 0169 /// Whether to allow pessimistic directory creation in CreateDir function 0170 /// 0171 /// By default, CreateDir function will try to create the directory without checking its 0172 /// existence. It's an optimization to try directory creation and catch the error, 0173 /// rather than issue two dependent I/O calls. 0174 /// Though for key/value storage like Google Cloud Storage, too many creation calls will 0175 /// breach the rate limit for object mutation operations and cause serious consequences. 0176 /// It's also possible you don't have creation access for the parent directory. Set it 0177 /// to be true to address these scenarios. 0178 bool check_directory_existence_before_creation = false; 0179 0180 /// Whether to allow file-open methods to return before the actual open. 0181 /// 0182 /// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`, 0183 /// and similar methods, by reducing the number of roundtrips necessary. It may also 0184 /// allow usage of more efficient S3 APIs for small files. 0185 /// The downside is that failure conditions such as attempting to open a file in a 0186 /// non-existing bucket will only be reported when actual I/O is done (at worse, 0187 /// when attempting to close the file). 0188 bool allow_delayed_open = false; 0189 0190 /// \brief Default metadata for OpenOutputStream. 0191 /// 0192 /// This will be ignored if non-empty metadata is passed to OpenOutputStream. 0193 std::shared_ptr<const KeyValueMetadata> default_metadata; 0194 0195 /// Optional retry strategy to determine which error types should be retried, and the 0196 /// delay between retries. 0197 std::shared_ptr<S3RetryStrategy> retry_strategy; 0198 0199 /// Optional customer-provided key for server-side encryption (SSE-C). 0200 /// 0201 /// This should be the 32-byte AES-256 key, unencoded. 0202 std::string sse_customer_key; 0203 0204 /// Optional path to a single PEM file holding all TLS CA certificates 0205 /// 0206 /// If empty, global filesystem options will be used (see FileSystemGlobalOptions); 0207 /// if the corresponding global filesystem option is also empty, the underlying 0208 /// TLS library's defaults will be used. 0209 /// 0210 /// Note this option may be ignored on some systems (Windows, macOS). 0211 std::string tls_ca_file_path; 0212 0213 /// Optional path to a directory holding TLS CA 0214 /// 0215 /// The given directory should contain CA certificates as individual PEM files 0216 /// named along the OpenSSL "hashed" format. 0217 /// 0218 /// If empty, global filesystem options will be used (see FileSystemGlobalOptions); 0219 /// if the corresponding global filesystem option is also empty, the underlying 0220 /// TLS library's defaults will be used. 0221 /// 0222 /// Note this option may be ignored on some systems (Windows, macOS). 0223 std::string tls_ca_dir_path; 0224 0225 /// Whether to verify the S3 endpoint's TLS certificate 0226 /// 0227 /// This option applies if the scheme is "https". 0228 bool tls_verify_certificates = true; 0229 0230 S3Options(); 0231 0232 /// Configure with the default AWS credentials provider chain. 0233 void ConfigureDefaultCredentials(); 0234 0235 /// Configure with anonymous credentials. This will only let you access public buckets. 0236 void ConfigureAnonymousCredentials(); 0237 0238 /// Configure with explicit access and secret key. 0239 void ConfigureAccessKey(const std::string& access_key, const std::string& secret_key, 0240 const std::string& session_token = ""); 0241 0242 /// Configure with credentials from an assumed role. 0243 void ConfigureAssumeRoleCredentials( 0244 const std::string& role_arn, const std::string& session_name = "", 0245 const std::string& external_id = "", int load_frequency = 900, 0246 const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR); 0247 0248 /// Configure with credentials from role assumed using a web identity token 0249 void ConfigureAssumeRoleWithWebIdentityCredentials(); 0250 0251 std::string GetAccessKey() const; 0252 std::string GetSecretKey() const; 0253 std::string GetSessionToken() const; 0254 0255 bool Equals(const S3Options& other) const; 0256 0257 /// \brief Initialize with default credentials provider chain 0258 /// 0259 /// This is recommended if you use the standard AWS environment variables 0260 /// and/or configuration file. 0261 static S3Options Defaults(); 0262 0263 /// \brief Initialize with anonymous credentials. 0264 /// 0265 /// This will only let you access public buckets. 0266 static S3Options Anonymous(); 0267 0268 /// \brief Initialize with explicit access and secret key. 0269 /// 0270 /// Optionally, a session token may also be provided for temporary credentials 0271 /// (from STS). 0272 static S3Options FromAccessKey(const std::string& access_key, 0273 const std::string& secret_key, 0274 const std::string& session_token = ""); 0275 0276 /// \brief Initialize from an assumed role. 0277 static S3Options FromAssumeRole( 0278 const std::string& role_arn, const std::string& session_name = "", 0279 const std::string& external_id = "", int load_frequency = 900, 0280 const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR); 0281 0282 /// \brief Initialize from an assumed role with web-identity. 0283 /// Uses the AWS SDK which uses environment variables to 0284 /// generate temporary credentials. 0285 static S3Options FromAssumeRoleWithWebIdentity(); 0286 0287 static Result<S3Options> FromUri(const ::arrow::util::Uri& uri, 0288 std::string* out_path = NULLPTR); 0289 static Result<S3Options> FromUri(const std::string& uri, 0290 std::string* out_path = NULLPTR); 0291 }; 0292 0293 /// S3-backed FileSystem implementation. 0294 /// 0295 /// Some implementation notes: 0296 /// - buckets are special and the operations available on them may be limited 0297 /// or more expensive than desired. 0298 class ARROW_EXPORT S3FileSystem : public FileSystem { 0299 public: 0300 ~S3FileSystem() override; 0301 0302 std::string type_name() const override { return "s3"; } 0303 0304 /// Return the original S3 options when constructing the filesystem 0305 S3Options options() const; 0306 /// Return the actual region this filesystem connects to 0307 std::string region() const; 0308 0309 bool Equals(const FileSystem& other) const override; 0310 Result<std::string> PathFromUri(const std::string& uri_string) const override; 0311 0312 /// \cond FALSE 0313 using FileSystem::CreateDir; 0314 using FileSystem::DeleteDirContents; 0315 using FileSystem::DeleteDirContentsAsync; 0316 using FileSystem::GetFileInfo; 0317 using FileSystem::OpenAppendStream; 0318 using FileSystem::OpenOutputStream; 0319 /// \endcond 0320 0321 Result<FileInfo> GetFileInfo(const std::string& path) override; 0322 Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override; 0323 0324 FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; 0325 0326 Status CreateDir(const std::string& path, bool recursive) override; 0327 0328 Status DeleteDir(const std::string& path) override; 0329 Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; 0330 Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override; 0331 Status DeleteRootDirContents() override; 0332 0333 Status DeleteFile(const std::string& path) override; 0334 0335 Status Move(const std::string& src, const std::string& dest) override; 0336 0337 Status CopyFile(const std::string& src, const std::string& dest) override; 0338 0339 /// Create a sequential input stream for reading from a S3 object. 0340 /// 0341 /// NOTE: Reads from the stream will be synchronous and unbuffered. 0342 /// You way want to wrap the stream in a BufferedInputStream or use 0343 /// a custom readahead strategy to avoid idle waits. 0344 Result<std::shared_ptr<io::InputStream>> OpenInputStream( 0345 const std::string& path) override; 0346 /// Create a sequential input stream for reading from a S3 object. 0347 /// 0348 /// This override avoids a HEAD request by assuming the FileInfo 0349 /// contains correct information. 0350 Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override; 0351 0352 /// Create a random access file for reading from a S3 object. 0353 /// 0354 /// See OpenInputStream for performance notes. 0355 Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile( 0356 const std::string& path) override; 0357 /// Create a random access file for reading from a S3 object. 0358 /// 0359 /// This override avoids a HEAD request by assuming the FileInfo 0360 /// contains correct information. 0361 Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile( 0362 const FileInfo& info) override; 0363 0364 /// Create a sequential output stream for writing to a S3 object. 0365 /// 0366 /// NOTE: Writes to the stream will be buffered. Depending on 0367 /// S3Options.background_writes, they can be synchronous or not. 0368 /// It is recommended to enable background_writes unless you prefer 0369 /// implementing your own background execution strategy. 0370 Result<std::shared_ptr<io::OutputStream>> OpenOutputStream( 0371 const std::string& path, 0372 const std::shared_ptr<const KeyValueMetadata>& metadata) override; 0373 0374 Result<std::shared_ptr<io::OutputStream>> OpenAppendStream( 0375 const std::string& path, 0376 const std::shared_ptr<const KeyValueMetadata>& metadata) override; 0377 0378 /// Create a S3FileSystem instance from the given options. 0379 static Result<std::shared_ptr<S3FileSystem>> Make( 0380 const S3Options& options, const io::IOContext& = io::default_io_context()); 0381 0382 protected: 0383 explicit S3FileSystem(const S3Options& options, const io::IOContext&); 0384 0385 class Impl; 0386 std::shared_ptr<Impl> impl_; 0387 }; 0388 0389 enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace }; 0390 0391 struct ARROW_EXPORT S3GlobalOptions { 0392 /// The log level for S3-originating messages. 0393 S3LogLevel log_level; 0394 0395 /// The number of threads to configure when creating AWS' I/O event loop 0396 /// 0397 /// Defaults to 1 as recommended by AWS' doc when the # of connections is 0398 /// expected to be, at most, in the hundreds 0399 /// 0400 /// For more details see Aws::Crt::Io::EventLoopGroup 0401 int num_event_loop_threads = 1; 0402 0403 /// Whether to install a process-wide SIGPIPE handler 0404 /// 0405 /// The AWS SDK may sometimes emit SIGPIPE signals for certain errors; 0406 /// by default, they would abort the current process. 0407 /// This option, if enabled, will install a process-wide signal handler 0408 /// that logs and otherwise ignore incoming SIGPIPE signals. 0409 /// 0410 /// This option has no effect on Windows. 0411 bool install_sigpipe_handler = false; 0412 0413 /// \brief Initialize with default options 0414 /// 0415 /// For log_level, this method first tries to extract a suitable value from the 0416 /// environment variable ARROW_S3_LOG_LEVEL. 0417 static S3GlobalOptions Defaults(); 0418 }; 0419 0420 /// \brief Initialize the S3 APIs with the specified set of options. 0421 /// 0422 /// It is required to call this function at least once before using S3FileSystem. 0423 /// 0424 /// Once this function is called you MUST call FinalizeS3 before the end of the 0425 /// application in order to avoid a segmentation fault at shutdown. 0426 ARROW_EXPORT 0427 Status InitializeS3(const S3GlobalOptions& options); 0428 0429 /// \brief Ensure the S3 APIs are initialized, but only if not already done. 0430 /// 0431 /// If necessary, this will call InitializeS3() with some default options. 0432 ARROW_EXPORT 0433 Status EnsureS3Initialized(); 0434 0435 /// Whether S3 was initialized, and not finalized. 0436 ARROW_EXPORT 0437 bool IsS3Initialized(); 0438 0439 /// Whether S3 was finalized. 0440 ARROW_EXPORT 0441 bool IsS3Finalized(); 0442 0443 /// \brief Shutdown the S3 APIs. 0444 /// 0445 /// This can wait for some S3 concurrent calls to finish so as to avoid 0446 /// race conditions. 0447 /// After this function has been called, all S3 calls will fail with an error. 0448 /// 0449 /// Calls to InitializeS3() and FinalizeS3() should be serialized by the 0450 /// application (this also applies to EnsureS3Initialized() and 0451 /// EnsureS3Finalized()). 0452 ARROW_EXPORT 0453 Status FinalizeS3(); 0454 0455 /// \brief Ensure the S3 APIs are shutdown, but only if not already done. 0456 /// 0457 /// If necessary, this will call FinalizeS3(). 0458 ARROW_EXPORT 0459 Status EnsureS3Finalized(); 0460 0461 ARROW_EXPORT 0462 Result<std::string> ResolveS3BucketRegion(const std::string& bucket); 0463 0464 } // namespace fs 0465 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |