Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:59

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <memory>
0021 #include <string>
0022 #include <vector>
0023 
0024 #include "arrow/filesystem/filesystem.h"
0025 #include "arrow/util/macros.h"
0026 #include "arrow/util/uri.h"
0027 
0028 namespace Aws {
     // Forward declarations of AWS SDK types. This header only names them
     // (AWSCredentialsProvider and STSClient appear below as std::shared_ptr
     // template arguments), so incomplete types are sufficient here.
0029 namespace Auth {
0030 
0031 class AWSCredentialsProvider;
     // NOTE(review): not referenced anywhere else in the visible part of this header.
0032 class STSAssumeRoleCredentialsProvider;
0033 
0034 }  // namespace Auth
0035 namespace STS {
0036 class STSClient;
0037 }
0038 }  // namespace Aws
0039 
0040 namespace arrow {
0041 namespace fs {
0042 
0043 /// Options for using a proxy for S3
     ///
     /// The fields mirror the components of a proxy URI of the form
     /// scheme://username:password@host:port (see FromUri).
0044 struct ARROW_EXPORT S3ProxyOptions {
       /// URI scheme of the proxy (the FromUri examples use "http")
0045   std::string scheme;
       /// Proxy host name
0046   std::string host;
       /// Proxy port; defaults to -1
       /// (NOTE(review): presumably -1 means "not set" -- confirm in the implementation)
0047   int port = -1;
       /// User name for the proxy; absent in the second FromUri URI form
0048   std::string username;
       /// Password for the proxy; absent in the second FromUri URI form
0049   std::string password;
0050 
0051   /// Initialize from URI such as http://username:password@host:port
0052   /// or http://host:port
       ///
       /// Two overloads are provided: one taking the URI as a string and one
       /// taking an already-constructed ::arrow::util::Uri.
0053   static Result<S3ProxyOptions> FromUri(const std::string& uri);
0054   static Result<S3ProxyOptions> FromUri(const ::arrow::util::Uri& uri);
0055 
       /// Compare with another S3ProxyOptions
       /// (NOTE(review): presumably a field-by-field comparison -- confirm)
0056   bool Equals(const S3ProxyOptions& other) const;
0057 };
0058 
0059 enum class S3CredentialsKind : int8_t {
       // Kind of credentials in use; stored in S3Options::credentials_kind.
0060   /// Anonymous access (no credentials used)
0061   Anonymous,
0062   /// Use default AWS credentials, configured through environment variables
0063   Default,
0064   /// Use explicitly-provided access key pair
0065   Explicit,
0066   /// Assume role through a role ARN
0067   Role,
0068   /// Use web identity token to assume role, configured through environment variables
0069   WebIdentity
0070 };
0071 
0072 /// Pure virtual class for describing custom S3 retry strategies
     ///
     /// An instance can be supplied through S3Options::retry_strategy.
0073 class ARROW_EXPORT S3RetryStrategy {
0074  public:
0075   virtual ~S3RetryStrategy() = default;
0076 
0077   /// Simple struct where each field corresponds to a field in Aws::Client::AWSError
       ///
       /// NOTE(review): `error_type` and `should_retry` have no default member
       /// initializers, so a default-initialized AWSErrorDetail leaves them
       /// indeterminate; set every field before passing the struct on.
0078   struct AWSErrorDetail {
0079     /// Corresponds to AWSError::GetErrorType()
0080     int error_type;
0081     /// Corresponds to AWSError::GetMessage()
0082     std::string message;
0083     /// Corresponds to AWSError::GetExceptionName()
0084     std::string exception_name;
0085     /// Corresponds to AWSError::ShouldRetry()
0086     bool should_retry;
0087   };
0088   /// Returns true if the S3 request resulting in the provided error should be retried.
0089   virtual bool ShouldRetry(const AWSErrorDetail& error, int64_t attempted_retries) = 0;
0090   /// Returns the time in milliseconds the S3 client should sleep for until retrying.
0091   virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
0092                                                 int64_t attempted_retries) = 0;
0093   /// Returns a stock AWS Default retry strategy.
       /// (NOTE(review): `max_attempts` presumably caps the number of attempts -- confirm)
0094   static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy(
0095       int64_t max_attempts);
0096   /// Returns a stock AWS Standard retry strategy.
       /// (NOTE(review): `max_attempts` presumably caps the number of attempts -- confirm)
0097   static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy(
0098       int64_t max_attempts);
0099 };
0100 
0101 /// Options for the S3FileSystem implementation.
     ///
     /// Instances are usually obtained from one of the static factories declared
     /// at the end of this struct (Defaults, Anonymous, FromAccessKey,
     /// FromAssumeRole, FromAssumeRoleWithWebIdentity, FromUri) and then passed
     /// to S3FileSystem::Make.
0102 struct ARROW_EXPORT S3Options {
0103   /// \brief AWS region to connect to.
0104   ///
0105   /// If unset, the AWS SDK will choose a default value.  The exact algorithm
0106   /// depends on the SDK version.  Before 1.8, the default is hardcoded
0107   /// to "us-east-1".  Since 1.8, several heuristics are used to determine
0108   /// the region (environment variables, configuration profile, EC2 metadata
0109   /// server).
0110   std::string region;
0111 
0112   /// \brief Socket connection timeout, in seconds
0113   ///
0114   /// If negative, the AWS SDK default value is used (typically 1 second).
0115   double connect_timeout = -1;
0116 
0117   /// \brief Socket read timeout on Windows and macOS, in seconds
0118   ///
0119   /// If negative, the AWS SDK default value is used (typically 3 seconds).
0120   /// This option is ignored on non-Windows, non-macOS systems.
0121   double request_timeout = -1;
0122 
0123   /// If non-empty, override region with a connect string such as "localhost:9000"
0124   // XXX perhaps instead take a URL like "http://localhost:9000"?
0125   std::string endpoint_override;
0126   /// S3 connection transport, default "https"
0127   std::string scheme = "https";
0128 
0129   /// ARN of role to assume
0130   std::string role_arn;
0131   /// Optional identifier for an assumed role session.
0132   std::string session_name;
0133   /// Optional external identifier to pass to STS when assuming a role
0134   std::string external_id;
0135   /// Frequency (in seconds) to refresh temporary credentials from assumed role
0136   int load_frequency = 900;
0137 
0138   /// If connection is through a proxy, set options here
0139   S3ProxyOptions proxy_options;
0140 
0141   /// AWS credentials provider
0142   std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
0143 
0144   /// Type of credentials being used. Set along with credentials_provider.
0145   S3CredentialsKind credentials_kind = S3CredentialsKind::Default;
0146 
0147   /// Whether to use virtual addressing of buckets
0148   ///
0149   /// If true, then virtual addressing is always enabled.
0150   /// If false, then virtual addressing is only enabled if `endpoint_override` is empty.
0151   ///
0152   /// This can be used for non-AWS backends that only support virtual hosted-style access.
0153   bool force_virtual_addressing = false;
0154 
0155   /// Whether OutputStream writes will be issued in the background, without blocking.
0156   bool background_writes = true;
0157 
0158   /// Whether to allow creation of buckets
0159   ///
0160   /// When S3FileSystem creates new buckets, it does not pass any non-default settings.
0161   /// In AWS S3, the bucket and all objects will be not publicly visible, and there
0162   /// will be no bucket policies and no resource tags. To have more control over how
0163   /// buckets are created, use a different API to create them.
0164   bool allow_bucket_creation = false;
0165 
0166   /// Whether to allow deletion of buckets
0167   bool allow_bucket_deletion = false;
0168 
0169   /// Whether to allow pessimistic directory creation in CreateDir function
0170   ///
0171   /// By default, CreateDir function will try to create the directory without checking its
0172   /// existence. It's an optimization to try directory creation and catch the error,
0173   /// rather than issue two dependent I/O calls.
0174   /// Though for key/value storage like Google Cloud Storage, too many creation calls will
0175   /// breach the rate limit for object mutation operations and cause serious consequences.
0176   /// It's also possible you don't have creation access for the parent directory. Set it
0177   /// to be true to address these scenarios.
0178   bool check_directory_existence_before_creation = false;
0179 
0180   /// Whether to allow file-open methods to return before the actual open.
0181   ///
0182   /// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`,
0183   /// and similar methods, by reducing the number of roundtrips necessary. It may also
0184   /// allow usage of more efficient S3 APIs for small files.
0185   /// The downside is that failure conditions such as attempting to open a file in a
0186   /// non-existing bucket will only be reported when actual I/O is done (at worst,
0187   /// when attempting to close the file).
0188   bool allow_delayed_open = false;
0189 
0190   /// \brief Default metadata for OpenOutputStream.
0191   ///
0192   /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
0193   std::shared_ptr<const KeyValueMetadata> default_metadata;
0194 
0195   /// Optional retry strategy to determine which error types should be retried, and the
0196   /// delay between retries.
0197   std::shared_ptr<S3RetryStrategy> retry_strategy;
0198 
0199   /// Optional customer-provided key for server-side encryption (SSE-C).
0200   ///
0201   /// This should be the 32-byte AES-256 key, unencoded.
0202   std::string sse_customer_key;
0203 
0204   /// Optional path to a single PEM file holding all TLS CA certificates
0205   ///
0206   /// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
0207   /// if the corresponding global filesystem option is also empty, the underlying
0208   /// TLS library's defaults will be used.
0209   ///
0210   /// Note this option may be ignored on some systems (Windows, macOS).
0211   std::string tls_ca_file_path;
0212 
0213   /// Optional path to a directory holding TLS CA certificates
0214   ///
0215   /// The given directory should contain CA certificates as individual PEM files
0216   /// named along the OpenSSL "hashed" format.
0217   ///
0218   /// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
0219   /// if the corresponding global filesystem option is also empty, the underlying
0220   /// TLS library's defaults will be used.
0221   ///
0222   /// Note this option may be ignored on some systems (Windows, macOS).
0223   std::string tls_ca_dir_path;
0224 
0225   /// Whether to verify the S3 endpoint's TLS certificate
0226   ///
0227   /// This option applies if the scheme is "https".
0228   bool tls_verify_certificates = true;
0229 
       /// Default constructor, defined out of line.
       /// (NOTE(review): whether it also sets up a credentials provider is not
       /// visible in this header -- confirm in the implementation)
0230   S3Options();
0231 
0232   /// Configure with the default AWS credentials provider chain.
0233   void ConfigureDefaultCredentials();
0234 
0235   /// Configure with anonymous credentials.  This will only let you access public buckets.
0236   void ConfigureAnonymousCredentials();
0237 
0238   /// Configure with explicit access and secret key.
       ///
       /// `session_token` is optional and defaults to an empty string.
0239   void ConfigureAccessKey(const std::string& access_key, const std::string& secret_key,
0240                           const std::string& session_token = "");
0241 
0242   /// Configure with credentials from an assumed role.
       ///
       /// Only `role_arn` is required. The default for `load_frequency` (900) is the
       /// same value as the default of the `load_frequency` member above; `stsClient`
       /// may be left null.
0243   void ConfigureAssumeRoleCredentials(
0244       const std::string& role_arn, const std::string& session_name = "",
0245       const std::string& external_id = "", int load_frequency = 900,
0246       const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
0247 
0248   /// Configure with credentials from role assumed using a web identity token
0249   void ConfigureAssumeRoleWithWebIdentityCredentials();
0250 
       /// Accessors for the access key, secret key and session token.
       /// (NOTE(review): presumably read from `credentials_provider` -- confirm)
0251   std::string GetAccessKey() const;
0252   std::string GetSecretKey() const;
0253   std::string GetSessionToken() const;
0254 
       /// Compare with another S3Options
       /// (NOTE(review): which fields take part in the comparison is not visible here)
0255   bool Equals(const S3Options& other) const;
0256 
0257   /// \brief Initialize with default credentials provider chain
0258   ///
0259   /// This is recommended if you use the standard AWS environment variables
0260   /// and/or configuration file.
0261   static S3Options Defaults();
0262 
0263   /// \brief Initialize with anonymous credentials.
0264   ///
0265   /// This will only let you access public buckets.
0266   static S3Options Anonymous();
0267 
0268   /// \brief Initialize with explicit access and secret key.
0269   ///
0270   /// Optionally, a session token may also be provided for temporary credentials
0271   /// (from STS).
0272   static S3Options FromAccessKey(const std::string& access_key,
0273                                  const std::string& secret_key,
0274                                  const std::string& session_token = "");
0275 
0276   /// \brief Initialize from an assumed role.
       ///
       /// Takes the same parameters, with the same defaults, as
       /// ConfigureAssumeRoleCredentials.
0277   static S3Options FromAssumeRole(
0278       const std::string& role_arn, const std::string& session_name = "",
0279       const std::string& external_id = "", int load_frequency = 900,
0280       const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
0281 
0282   /// \brief Initialize from an assumed role with web-identity.
0283   /// Uses the AWS SDK which uses environment variables to
0284   /// generate temporary credentials.
0285   static S3Options FromAssumeRoleWithWebIdentity();
0286 
       /// \brief Initialize from a URI, given either parsed or as a string.
       ///
       /// `out_path` is optional (defaults to NULLPTR).
       /// (NOTE(review): presumably receives the path component of the URI when
       /// non-null -- confirm in the implementation)
0287   static Result<S3Options> FromUri(const ::arrow::util::Uri& uri,
0288                                    std::string* out_path = NULLPTR);
0289   static Result<S3Options> FromUri(const std::string& uri,
0290                                    std::string* out_path = NULLPTR);
0291 };
0292 
0293 /// S3-backed FileSystem implementation.
0294 ///
0295 /// Some implementation notes:
0296 /// - buckets are special and the operations available on them may be limited
0297 ///   or more expensive than desired.
     ///
     /// The constructor is protected; instances are created through the static
     /// Make() factory declared below.
0298 class ARROW_EXPORT S3FileSystem : public FileSystem {
0299  public:
0300   ~S3FileSystem() override;
0301 
       /// Type name of this filesystem: always "s3"
0302   std::string type_name() const override { return "s3"; }
0303 
0304   /// Return the original S3 options when constructing the filesystem
0305   S3Options options() const;
0306   /// Return the actual region this filesystem connects to
0307   std::string region() const;
0308 
0309   bool Equals(const FileSystem& other) const override;
0310   Result<std::string> PathFromUri(const std::string& uri_string) const override;
0311 
       // The using-declarations below keep the FileSystem base-class overloads
       // visible alongside the overrides declared in this class.
0312   /// \cond FALSE
0313   using FileSystem::CreateDir;
0314   using FileSystem::DeleteDirContents;
0315   using FileSystem::DeleteDirContentsAsync;
0316   using FileSystem::GetFileInfo;
0317   using FileSystem::OpenAppendStream;
0318   using FileSystem::OpenOutputStream;
0319   /// \endcond
0320 
0321   Result<FileInfo> GetFileInfo(const std::string& path) override;
0322   Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
0323 
0324   FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
0325 
0326   Status CreateDir(const std::string& path, bool recursive) override;
0327 
0328   Status DeleteDir(const std::string& path) override;
0329   Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override;
0330   Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override;
0331   Status DeleteRootDirContents() override;
0332 
0333   Status DeleteFile(const std::string& path) override;
0334 
0335   Status Move(const std::string& src, const std::string& dest) override;
0336 
0337   Status CopyFile(const std::string& src, const std::string& dest) override;
0338 
0339   /// Create a sequential input stream for reading from a S3 object.
0340   ///
0341   /// NOTE: Reads from the stream will be synchronous and unbuffered.
0342   /// You may want to wrap the stream in a BufferedInputStream or use
0343   /// a custom readahead strategy to avoid idle waits.
0344   Result<std::shared_ptr<io::InputStream>> OpenInputStream(
0345       const std::string& path) override;
0346   /// Create a sequential input stream for reading from a S3 object.
0347   ///
0348   /// This override avoids a HEAD request by assuming the FileInfo
0349   /// contains correct information.
0350   Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;
0351 
0352   /// Create a random access file for reading from a S3 object.
0353   ///
0354   /// See OpenInputStream for performance notes.
0355   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
0356       const std::string& path) override;
0357   /// Create a random access file for reading from a S3 object.
0358   ///
0359   /// This override avoids a HEAD request by assuming the FileInfo
0360   /// contains correct information.
0361   Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
0362       const FileInfo& info) override;
0363 
0364   /// Create a sequential output stream for writing to a S3 object.
0365   ///
0366   /// NOTE: Writes to the stream will be buffered.  Depending on
0367   /// S3Options.background_writes, they can be synchronous or not.
0368   /// It is recommended to enable background_writes unless you prefer
0369   /// implementing your own background execution strategy.
0370   Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
0371       const std::string& path,
0372       const std::shared_ptr<const KeyValueMetadata>& metadata) override;
0373 
       // NOTE(review): this header does not state how appending behaves on S3;
       // check the implementation before relying on this method.
0374   Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
0375       const std::string& path,
0376       const std::shared_ptr<const KeyValueMetadata>& metadata) override;
0377 
0378   /// Create a S3FileSystem instance from the given options.
       ///
       /// The IOContext argument is optional and defaults to io::default_io_context().
0379   static Result<std::shared_ptr<S3FileSystem>> Make(
0380       const S3Options& options, const io::IOContext& = io::default_io_context());
0381 
0382  protected:
0383   explicit S3FileSystem(const S3Options& options, const io::IOContext&);
0384 
       // Implementation class, only forward-declared here and held through a
       // shared pointer, so its definition stays out of this header.
0385   class Impl;
0386   std::shared_ptr<Impl> impl_;
0387 };
0388 
/// Log levels for S3-originating messages (see S3GlobalOptions::log_level).
0389 enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };
0390 
0391 struct ARROW_EXPORT S3GlobalOptions {
       // Process-wide options passed to InitializeS3().
0392   /// The log level for S3-originating messages.
       ///
       /// Carries a default member initializer, like every other member of this
       /// struct, so that a default-initialized S3GlobalOptions never holds an
       /// indeterminate log level.
       /// NOTE(review): Fatal is chosen to match the default log level documented
       /// for Arrow's S3 initialization -- confirm against Defaults().
0393   S3LogLevel log_level = S3LogLevel::Fatal;
0394 
0395   /// The number of threads to configure when creating AWS' I/O event loop
0396   ///
0397   /// Defaults to 1 as recommended by AWS' doc when the # of connections is
0398   /// expected to be, at most, in the hundreds
0399   ///
0400   /// For more details see Aws::Crt::Io::EventLoopGroup
0401   int num_event_loop_threads = 1;
0402 
0403   /// Whether to install a process-wide SIGPIPE handler
0404   ///
0405   /// The AWS SDK may sometimes emit SIGPIPE signals for certain errors;
0406   /// by default, they would abort the current process.
0407   /// This option, if enabled, will install a process-wide signal handler
0408   /// that logs and otherwise ignores incoming SIGPIPE signals.
0409   ///
0410   /// This option has no effect on Windows.
0411   bool install_sigpipe_handler = false;
0412 
0413   /// \brief Initialize with default options
0414   ///
0415   /// For log_level, this method first tries to extract a suitable value from the
0416   /// environment variable ARROW_S3_LOG_LEVEL.
0417   static S3GlobalOptions Defaults();
0418 };
0419 
0420 /// \brief Initialize the S3 APIs with the specified set of options.
0421 ///
0422 /// It is required to call this function at least once before using S3FileSystem.
0423 ///
0424 /// Once this function is called you MUST call FinalizeS3 before the end of the
0425 /// application in order to avoid a segmentation fault at shutdown.
     ///
     /// Calls to this function and to FinalizeS3() should be serialized by the
     /// application.
     ///
     /// \param options global options to initialize with (see S3GlobalOptions)
0426 ARROW_EXPORT
0427 Status InitializeS3(const S3GlobalOptions& options);
0428 
0429 /// \brief Ensure the S3 APIs are initialized, but only if not already done.
0430 ///
0431 /// If necessary, this will call InitializeS3() with some default options.
     /// The serialization requirement documented for InitializeS3() and
     /// FinalizeS3() applies to this function as well.
0432 ARROW_EXPORT
0433 Status EnsureS3Initialized();
0434 
0435 /// \brief Whether S3 was initialized, and not finalized.
0436 ARROW_EXPORT
0437 bool IsS3Initialized();
0438 
0439 /// \brief Whether S3 was finalized.
0440 ARROW_EXPORT
0441 bool IsS3Finalized();
0442 
0443 /// \brief Shutdown the S3 APIs.
0444 ///
0445 /// This can wait for some S3 concurrent calls to finish so as to avoid
0446 /// race conditions.
0447 /// After this function has been called, all S3 calls will fail with an error.
0448 ///
0449 /// Calls to InitializeS3() and FinalizeS3() should be serialized by the
0450 /// application (this also applies to EnsureS3Initialized() and
0451 /// EnsureS3Finalized()).
     ///
     /// This is the call that InitializeS3() requires before the end of the
     /// application.
0452 ARROW_EXPORT
0453 Status FinalizeS3();
0454 
0455 /// \brief Ensure the S3 APIs are shutdown, but only if not already done.
0456 ///
0457 /// If necessary, this will call FinalizeS3().
     /// The serialization requirement documented for InitializeS3() and
     /// FinalizeS3() applies to this function as well.
0458 ARROW_EXPORT
0459 Status EnsureS3Finalized();
0460 
/// \brief Resolve the region of the S3 bucket named `bucket`.
///
/// NOTE(review): presumably returns the AWS region name on success and an
/// error Result otherwise; whether this needs the S3 APIs to be initialized
/// first is not visible in this header -- confirm in the implementation.
0461 ARROW_EXPORT
0462 Result<std::string> ResolveS3BucketRegion(const std::string& bucket);
0463 
0464 }  // namespace fs
0465 }  // namespace arrow