File indexing completed on 2025-08-28 08:26:59
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <memory>
0021
0022 #include "arrow/io/interfaces.h"
0023 #include "arrow/result.h"
0024 #include "arrow/status.h"
0025 #include "arrow/util/checked_cast.h"
0026 #include "arrow/util/macros.h"
0027 #include "arrow/util/visibility.h"
0028
0029 namespace arrow {
0030 namespace io {
0031 namespace internal {
0032
/// \brief Scoped guard that holds a shared lock section for its own lifetime.
///
/// The constructor calls `lock->LockShared()` and the destructor calls
/// `lock->UnlockShared()` on the same object. `LockType` only has to provide
/// those two member functions.
///
/// The guard is neither copyable nor assignable: a copy would run the
/// destructor a second time and release a section that was entered only once.
/// Returning a guard by value from a factory function still works, because
/// C++17 guaranteed copy elision constructs the result in place.
template <class LockType>
class SharedLockGuard {
 public:
  /// \param lock object to lock; it is dereferenced unconditionally here and
  ///        again in the destructor, so it must be non-null and outlive the guard.
  explicit SharedLockGuard(LockType* lock) : lock_(lock) { lock_->LockShared(); }

  // Declaring the copy operations as deleted also suppresses the implicit
  // move operations, so the unlock below runs exactly once per lock.
  SharedLockGuard(const SharedLockGuard&) = delete;
  SharedLockGuard& operator=(const SharedLockGuard&) = delete;

  ~SharedLockGuard() { lock_->UnlockShared(); }

 protected:
  LockType* lock_;
};
0043
/// \brief Scoped guard that holds an exclusive lock section for its own lifetime.
///
/// The constructor calls `lock->LockExclusive()` and the destructor calls
/// `lock->UnlockExclusive()` on the same object. `LockType` only has to
/// provide those two member functions.
///
/// The guard is neither copyable nor assignable: a copy would run the
/// destructor a second time and release a section that was entered only once.
/// Returning a guard by value from a factory function still works, because
/// C++17 guaranteed copy elision constructs the result in place.
template <class LockType>
class ExclusiveLockGuard {
 public:
  /// \param lock object to lock; it is dereferenced unconditionally here and
  ///        again in the destructor, so it must be non-null and outlive the guard.
  explicit ExclusiveLockGuard(LockType* lock) : lock_(lock) { lock_->LockExclusive(); }

  // Declaring the copy operations as deleted also suppresses the implicit
  // move operations, so the unlock below runs exactly once per lock.
  ExclusiveLockGuard(const ExclusiveLockGuard&) = delete;
  ExclusiveLockGuard& operator=(const ExclusiveLockGuard&) = delete;

  ~ExclusiveLockGuard() { lock_->UnlockExclusive(); }

 protected:
  LockType* lock_;
};
0054
0055
0056
0057
0058
0059
0060
0061 class ARROW_EXPORT SharedExclusiveChecker {
0062 public:
0063 SharedExclusiveChecker();
0064 void LockShared();
0065 void UnlockShared();
0066 void LockExclusive();
0067 void UnlockExclusive();
0068
0069 SharedLockGuard<SharedExclusiveChecker> shared_guard() {
0070 return SharedLockGuard<SharedExclusiveChecker>(this);
0071 }
0072
0073 ExclusiveLockGuard<SharedExclusiveChecker> exclusive_guard() {
0074 return ExclusiveLockGuard<SharedExclusiveChecker>(this);
0075 }
0076
0077 protected:
0078 struct Impl;
0079 std::shared_ptr<Impl> impl_;
0080 };
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091 template <class Derived>
0092 class InputStreamConcurrencyWrapper : public InputStream {
0093 public:
0094 Status Close() final {
0095 auto guard = lock_.exclusive_guard();
0096 return derived()->DoClose();
0097 }
0098
0099 Status Abort() final {
0100 auto guard = lock_.exclusive_guard();
0101 return derived()->DoAbort();
0102 }
0103
0104 Result<int64_t> Tell() const final {
0105 auto guard = lock_.exclusive_guard();
0106 return derived()->DoTell();
0107 }
0108
0109 Result<int64_t> Read(int64_t nbytes, void* out) final {
0110 auto guard = lock_.exclusive_guard();
0111 return derived()->DoRead(nbytes, out);
0112 }
0113
0114 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final {
0115 auto guard = lock_.exclusive_guard();
0116 return derived()->DoRead(nbytes);
0117 }
0118
0119 Result<std::string_view> Peek(int64_t nbytes) final {
0120 auto guard = lock_.exclusive_guard();
0121 return derived()->DoPeek(nbytes);
0122 }
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143 protected:
0144
0145
0146 virtual Status DoAbort() { return derived()->DoClose(); }
0147
0148 virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
0149 return Status::NotImplemented("Peek not implemented");
0150 }
0151
0152 Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); }
0153
0154 const Derived* derived() const {
0155 return ::arrow::internal::checked_cast<const Derived*>(this);
0156 }
0157
0158 mutable SharedExclusiveChecker lock_;
0159 };
0160
0161 template <class Derived>
0162 class RandomAccessFileConcurrencyWrapper : public RandomAccessFile {
0163 public:
0164 Status Close() final {
0165 auto guard = lock_.exclusive_guard();
0166 return derived()->DoClose();
0167 }
0168
0169 Status Abort() final {
0170 auto guard = lock_.exclusive_guard();
0171 return derived()->DoAbort();
0172 }
0173
0174 Result<int64_t> Tell() const final {
0175 auto guard = lock_.exclusive_guard();
0176 return derived()->DoTell();
0177 }
0178
0179 Result<int64_t> Read(int64_t nbytes, void* out) final {
0180 auto guard = lock_.exclusive_guard();
0181 return derived()->DoRead(nbytes, out);
0182 }
0183
0184 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final {
0185 auto guard = lock_.exclusive_guard();
0186 return derived()->DoRead(nbytes);
0187 }
0188
0189 Result<std::string_view> Peek(int64_t nbytes) final {
0190 auto guard = lock_.exclusive_guard();
0191 return derived()->DoPeek(nbytes);
0192 }
0193
0194 Status Seek(int64_t position) final {
0195 auto guard = lock_.exclusive_guard();
0196 return derived()->DoSeek(position);
0197 }
0198
0199 Result<int64_t> GetSize() final {
0200 auto guard = lock_.shared_guard();
0201 return derived()->DoGetSize();
0202 }
0203
0204
0205
0206
0207
0208
0209
0210 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final {
0211 auto guard = lock_.shared_guard();
0212 return derived()->DoReadAt(position, nbytes, out);
0213 }
0214
0215 Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final {
0216 auto guard = lock_.shared_guard();
0217 return derived()->DoReadAt(position, nbytes);
0218 }
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235
0236
0237
0238
0239
0240
0241
0242
0243 protected:
0244
0245
0246 virtual Status DoAbort() { return derived()->DoClose(); }
0247
0248 virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) {
0249 return Status::NotImplemented("Peek not implemented");
0250 }
0251
0252 Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); }
0253
0254 const Derived* derived() const {
0255 return ::arrow::internal::checked_cast<const Derived*>(this);
0256 }
0257
0258 mutable SharedExclusiveChecker lock_;
0259 };
0260
0261 }
0262 }
0263 }