Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:59

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <memory>
0021 
0022 #include "arrow/io/interfaces.h"
0023 #include "arrow/result.h"
0024 #include "arrow/status.h"
0025 #include "arrow/util/checked_cast.h"
0026 #include "arrow/util/macros.h"
0027 #include "arrow/util/visibility.h"
0028 
0029 namespace arrow {
0030 namespace io {
0031 namespace internal {
0032 
// Scoped helper that keeps a "shared" section open for as long as it lives.
//
// LockType must provide LockShared() and UnlockShared().  The constructor
// calls lock->LockShared() and the destructor calls lock->UnlockShared();
// `lock` is dereferenced unconditionally, so it must be non-null and must
// outlive the guard.
//
// Copying is disabled: a copied guard would call UnlockShared() a second time
// for a single LockShared().  Returning a guard by value from a factory
// function still works thanks to C++17 guaranteed copy elision.
template <class LockType>
class SharedLockGuard {
 public:
  explicit SharedLockGuard(LockType* lock) : lock_(lock) { lock_->LockShared(); }

  SharedLockGuard(const SharedLockGuard&) = delete;
  SharedLockGuard& operator=(const SharedLockGuard&) = delete;

  ~SharedLockGuard() { lock_->UnlockShared(); }

 protected:
  // Non-owning pointer to the guarded lock object.
  LockType* lock_;
};
0043 
// Scoped helper that keeps an "exclusive" section open for as long as it
// lives.
//
// LockType must provide LockExclusive() and UnlockExclusive().  The
// constructor calls lock->LockExclusive() and the destructor calls
// lock->UnlockExclusive(); `lock` is dereferenced unconditionally, so it must
// be non-null and must outlive the guard.
//
// Copying is disabled: a copied guard would call UnlockExclusive() a second
// time for a single LockExclusive().  Returning a guard by value from a
// factory function still works thanks to C++17 guaranteed copy elision.
template <class LockType>
class ExclusiveLockGuard {
 public:
  explicit ExclusiveLockGuard(LockType* lock) : lock_(lock) { lock_->LockExclusive(); }

  ExclusiveLockGuard(const ExclusiveLockGuard&) = delete;
  ExclusiveLockGuard& operator=(const ExclusiveLockGuard&) = delete;

  ~ExclusiveLockGuard() { lock_->UnlockExclusive(); }

 protected:
  // Non-owning pointer to the guarded lock object.
  LockType* lock_;
};
0054 
0055 // Debug concurrency checker that marks "shared" and "exclusive" code sections,
0056 // aborting if the concurrency rules get violated.  Does nothing in release mode.
0057 // Note that we intentionally use the same class declaration in debug and
0058 // release builds in order to avoid runtime failures when e.g. loading a
0059 // release-built DLL with a debug-built application, or the reverse.
0060 
0061 class ARROW_EXPORT SharedExclusiveChecker {
0062  public:
0063   SharedExclusiveChecker();
0064   void LockShared();
0065   void UnlockShared();
0066   void LockExclusive();
0067   void UnlockExclusive();
0068 
0069   SharedLockGuard<SharedExclusiveChecker> shared_guard() {
0070     return SharedLockGuard<SharedExclusiveChecker>(this);
0071   }
0072 
0073   ExclusiveLockGuard<SharedExclusiveChecker> exclusive_guard() {
0074     return ExclusiveLockGuard<SharedExclusiveChecker>(this);
0075   }
0076 
0077  protected:
0078   struct Impl;
0079   std::shared_ptr<Impl> impl_;
0080 };
0081 
0082 // Concurrency wrappers for IO classes that check the correctness of
0083 // concurrent calls to various methods.  It is not necessary to wrap all
0084 // IO classes with these, only a few core classes that get used in tests.
0085 //
0086 // We're not using virtual inheritance here as virtual bases have poorly
0087 // understood semantic overhead which we'd be passing on to implementers
0088 // and users of these interfaces.  Instead, we just duplicate the method
0089 // wrappers between those two classes.
0090 
0091 template <class Derived>
0092 class InputStreamConcurrencyWrapper : public InputStream {
0093  public:
0094   Status Close() final {
0095     auto guard = lock_.exclusive_guard();
0096     return derived()->DoClose();
0097   }
0098 
0099   Status Abort() final {
0100     auto guard = lock_.exclusive_guard();
0101     return derived()->DoAbort();
0102   }
0103 
0104   Result<int64_t> Tell() const final {
0105     auto guard = lock_.exclusive_guard();
0106     return derived()->DoTell();
0107   }
0108 
0109   Result<int64_t> Read(int64_t nbytes, void* out) final {
0110     auto guard = lock_.exclusive_guard();
0111     return derived()->DoRead(nbytes, out);
0112   }
0113 
0114   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final {
0115     auto guard = lock_.exclusive_guard();
0116     return derived()->DoRead(nbytes);
0117   }
0118 
0119   Result<std::string_view> Peek(int64_t nbytes) final {
0120     auto guard = lock_.exclusive_guard();
0121     return derived()->DoPeek(nbytes);
0122   }
0123 
0124   /*
0125   Methods to implement in derived class:
0126 
0127   Status DoClose();
0128   Result<int64_t> DoTell() const;
0129   Result<int64_t> DoRead(int64_t nbytes, void* out);
0130   Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
0131 
0132   And optionally:
0133 
0134   Status DoAbort() override;
0135   Result<std::string_view> DoPeek(int64_t nbytes) override;
0136 
0137   These methods should be protected in the derived class and
0138   InputStreamConcurrencyWrapper declared as a friend with
0139 
0140   friend InputStreamConcurrencyWrapper<derived>;
0141   */
0142 
0143  protected:
0144   // Default implementations.  They are virtual because the derived class may
0145   // have derived classes itself.
0146   virtual Status DoAbort() { return derived()->DoClose(); }
0147 
0148   virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
0149     return Status::NotImplemented("Peek not implemented");
0150   }
0151 
0152   Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); }
0153 
0154   const Derived* derived() const {
0155     return ::arrow::internal::checked_cast<const Derived*>(this);
0156   }
0157 
0158   mutable SharedExclusiveChecker lock_;
0159 };
0160 
0161 template <class Derived>
0162 class RandomAccessFileConcurrencyWrapper : public RandomAccessFile {
0163  public:
0164   Status Close() final {
0165     auto guard = lock_.exclusive_guard();
0166     return derived()->DoClose();
0167   }
0168 
0169   Status Abort() final {
0170     auto guard = lock_.exclusive_guard();
0171     return derived()->DoAbort();
0172   }
0173 
0174   Result<int64_t> Tell() const final {
0175     auto guard = lock_.exclusive_guard();
0176     return derived()->DoTell();
0177   }
0178 
0179   Result<int64_t> Read(int64_t nbytes, void* out) final {
0180     auto guard = lock_.exclusive_guard();
0181     return derived()->DoRead(nbytes, out);
0182   }
0183 
0184   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final {
0185     auto guard = lock_.exclusive_guard();
0186     return derived()->DoRead(nbytes);
0187   }
0188 
0189   Result<std::string_view> Peek(int64_t nbytes) final {
0190     auto guard = lock_.exclusive_guard();
0191     return derived()->DoPeek(nbytes);
0192   }
0193 
0194   Status Seek(int64_t position) final {
0195     auto guard = lock_.exclusive_guard();
0196     return derived()->DoSeek(position);
0197   }
0198 
0199   Result<int64_t> GetSize() final {
0200     auto guard = lock_.shared_guard();
0201     return derived()->DoGetSize();
0202   }
0203 
0204   // NOTE: ReadAt doesn't use stream pointer, but it is allowed to update it
0205   // (it's the case on Windows when using ReadFileEx).
0206   // So any method that relies on the current position (even if it doesn't
0207   // update it, such as Peek) cannot run in parallel with ReadAt and has
0208   // to use the exclusive_guard.
0209 
0210   Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final {
0211     auto guard = lock_.shared_guard();
0212     return derived()->DoReadAt(position, nbytes, out);
0213   }
0214 
0215   Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final {
0216     auto guard = lock_.shared_guard();
0217     return derived()->DoReadAt(position, nbytes);
0218   }
0219 
0220   /*
0221   Methods to implement in derived class:
0222 
0223   Status DoClose();
0224   Result<int64_t> DoTell() const;
0225   Result<int64_t> DoRead(int64_t nbytes, void* out);
0226   Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
0227   Status DoSeek(int64_t position);
0228   Result<int64_t> DoGetSize()
0229   Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
0230   Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
0231 
0232   And optionally:
0233 
0234   Status DoAbort() override;
0235   Result<std::string_view> DoPeek(int64_t nbytes) override;
0236 
0237   These methods should be protected in the derived class and
0238   RandomAccessFileConcurrencyWrapper declared as a friend with
0239 
0240   friend RandomAccessFileConcurrencyWrapper<derived>;
0241   */
0242 
0243  protected:
0244   // Default implementations.  They are virtual because the derived class may
0245   // have derived classes itself.
0246   virtual Status DoAbort() { return derived()->DoClose(); }
0247 
0248   virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
0249     return Status::NotImplemented("Peek not implemented");
0250   }
0251 
0252   Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); }
0253 
0254   const Derived* derived() const {
0255     return ::arrow::internal::checked_cast<const Derived*>(this);
0256   }
0257 
0258   mutable SharedExclusiveChecker lock_;
0259 };
0260 
0261 }  // namespace internal
0262 }  // namespace io
0263 }  // namespace arrow