File indexing completed on 2025-08-28 08:26:53
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <atomic>
0021 #include <cstdint>
0022 #include <optional>
0023 #include <thread>
0024 #include <unordered_map>
0025 #include <vector>
0026
0027 #include "arrow/acero/options.h"
0028 #include "arrow/acero/type_fwd.h"
0029 #include "arrow/buffer.h"
0030 #include "arrow/compute/expression.h"
0031 #include "arrow/compute/util.h"
0032 #include "arrow/memory_pool.h"
0033 #include "arrow/result.h"
0034 #include "arrow/status.h"
0035 #include "arrow/util/bit_util.h"
0036 #include "arrow/util/cpu_info.h"
0037 #include "arrow/util/logging.h"
0038 #include "arrow/util/mutex.h"
0039 #include "arrow/util/thread_pool.h"
0040 #include "arrow/util/type_fwd.h"
0041
0042 namespace arrow {
0043
0044 namespace acero {
0045
0046 ARROW_ACERO_EXPORT
0047 Status ValidateExecNodeInputs(ExecPlan* plan, const std::vector<ExecNode*>& inputs,
0048 int expected_num_inputs, const char* kind_name);
0049
0050 ARROW_ACERO_EXPORT
0051 Result<std::shared_ptr<Table>> TableFromExecBatches(
0052 const std::shared_ptr<Schema>& schema, const std::vector<ExecBatch>& exec_batches);
0053
0054 class ARROW_ACERO_EXPORT AtomicCounter {
0055 public:
0056 AtomicCounter() = default;
0057
0058 int count() const { return count_.load(); }
0059
0060 std::optional<int> total() const {
0061 int total = total_.load();
0062 if (total == -1) return {};
0063 return total;
0064 }
0065
0066
0067 bool Increment() {
0068 ARROW_DCHECK_NE(count_.load(), total_.load());
0069 int count = count_.fetch_add(1) + 1;
0070 if (count != total_.load()) return false;
0071 return DoneOnce();
0072 }
0073
0074
0075 bool SetTotal(int total) {
0076 total_.store(total);
0077 if (count_.load() != total) return false;
0078 return DoneOnce();
0079 }
0080
0081
0082 bool Cancel() { return DoneOnce(); }
0083
0084
0085 bool Completed() { return complete_.load(); }
0086
0087 private:
0088
0089 bool DoneOnce() {
0090 bool expected = false;
0091 return complete_.compare_exchange_strong(expected, true);
0092 }
0093
0094 std::atomic<int> count_{0}, total_{-1};
0095 std::atomic<bool> complete_{false};
0096 };
0097
0098 class ARROW_ACERO_EXPORT ThreadIndexer {
0099 public:
0100 size_t operator()();
0101
0102 static size_t Capacity();
0103
0104 private:
0105 static size_t Check(size_t thread_index);
0106
0107 arrow::util::Mutex mutex_;
0108 std::unordered_map<std::thread::id, size_t> id_to_index_;
0109 };
0110
0111
0112 struct ARROW_ACERO_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
0113 public:
0114 TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
0115 : out_(out), pool_(pool) {}
0116 Status Init(const std::shared_ptr<Schema>& schema,
0117 BackpressureControl* backpressure_control, ExecPlan* plan) override;
0118 Status Consume(ExecBatch batch) override;
0119 Future<> Finish() override;
0120
0121 private:
0122 std::shared_ptr<Table>* out_;
0123 MemoryPool* pool_;
0124 std::shared_ptr<Schema> schema_;
0125 std::vector<std::shared_ptr<RecordBatch>> batches_;
0126 arrow::util::Mutex consume_mutex_;
0127 };
0128
0129 class ARROW_ACERO_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
0130 public:
0131 Status Init(const std::shared_ptr<Schema>&, BackpressureControl*,
0132 ExecPlan* plan) override {
0133 return Status::OK();
0134 }
0135 Status Consume(ExecBatch exec_batch) override { return Status::OK(); }
0136 Future<> Finish() override { return Status::OK(); }
0137
0138 public:
0139 static std::shared_ptr<NullSinkNodeConsumer> Make() {
0140 return std::make_shared<NullSinkNodeConsumer>();
0141 }
0142 };
0143
0144
0145
0146 class ARROW_ACERO_EXPORT TracedNode {
0147 public:
0148
0149
0150
0151
0152 explicit TracedNode(ExecNode* node) : node_(node) {}
0153
0154
0155 [[nodiscard]] ::arrow::internal::tracing::Scope TraceStartProducing(
0156 std::string extra_details) const;
0157
0158
0159 void NoteStartProducing(std::string extra_details) const;
0160
0161
0162
0163
0164
0165
0166 [[nodiscard]] ::arrow::internal::tracing::Scope TraceInputReceived(
0167 const ExecBatch& batch) const;
0168
0169
0170 void NoteInputReceived(const ExecBatch& batch) const;
0171
0172
0173
0174
0175
0176
0177 [[nodiscard]] ::arrow::internal::tracing::Scope TraceFinish() const;
0178
0179 private:
0180 ExecNode* node_;
0181 };
0182
0183 }
0184 }