Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <atomic>
0021 #include <cstdint>
0022 #include <optional>
0023 #include <thread>
0024 #include <unordered_map>
0025 #include <vector>
0026 
0027 #include "arrow/acero/options.h"
0028 #include "arrow/acero/type_fwd.h"
0029 #include "arrow/buffer.h"
0030 #include "arrow/compute/expression.h"
0031 #include "arrow/compute/util.h"
0032 #include "arrow/memory_pool.h"
0033 #include "arrow/result.h"
0034 #include "arrow/status.h"
0035 #include "arrow/util/bit_util.h"
0036 #include "arrow/util/cpu_info.h"
0037 #include "arrow/util/logging.h"
0038 #include "arrow/util/mutex.h"
0039 #include "arrow/util/thread_pool.h"
0040 #include "arrow/util/type_fwd.h"
0041 
0042 namespace arrow {
0043 
0044 namespace acero {
0045 
/// \brief Check the inputs supplied when constructing an ExecNode of kind `kind_name`.
///
/// Returns a Status describing the outcome.
/// NOTE(review): the definition is not in this header; presumably this verifies that
/// `inputs` has `expected_num_inputs` entries and reports `kind_name` in the error
/// message otherwise — confirm against the implementation.
ARROW_ACERO_EXPORT
Status ValidateExecNodeInputs(ExecPlan* plan, const std::vector<ExecNode*>& inputs,
                              int expected_num_inputs, const char* kind_name);

/// \brief Build a Table with the given `schema` from a list of ExecBatches.
///
/// Returns the table, or an error Result on failure.
/// NOTE(review): presumably every batch is expected to conform to `schema` — confirm
/// against the implementation.
ARROW_ACERO_EXPORT
Result<std::shared_ptr<Table>> TableFromExecBatches(
    const std::shared_ptr<Schema>& schema, const std::vector<ExecBatch>& exec_batches);
0053 
0054 class ARROW_ACERO_EXPORT AtomicCounter {
0055  public:
0056   AtomicCounter() = default;
0057 
0058   int count() const { return count_.load(); }
0059 
0060   std::optional<int> total() const {
0061     int total = total_.load();
0062     if (total == -1) return {};
0063     return total;
0064   }
0065 
0066   // return true if the counter is complete
0067   bool Increment() {
0068     ARROW_DCHECK_NE(count_.load(), total_.load());
0069     int count = count_.fetch_add(1) + 1;
0070     if (count != total_.load()) return false;
0071     return DoneOnce();
0072   }
0073 
0074   // return true if the counter is complete
0075   bool SetTotal(int total) {
0076     total_.store(total);
0077     if (count_.load() != total) return false;
0078     return DoneOnce();
0079   }
0080 
0081   // return true if the counter has not already been completed
0082   bool Cancel() { return DoneOnce(); }
0083 
0084   // return true if the counter has finished or been cancelled
0085   bool Completed() { return complete_.load(); }
0086 
0087  private:
0088   // ensure there is only one true return from Increment(), SetTotal(), or Cancel()
0089   bool DoneOnce() {
0090     bool expected = false;
0091     return complete_.compare_exchange_strong(expected, true);
0092   }
0093 
0094   std::atomic<int> count_{0}, total_{-1};
0095   std::atomic<bool> complete_{false};
0096 };
0097 
/// \brief Associates threads with integer indices.
///
/// Holds a map from std::thread::id to a size_t index, plus a mutex.
/// NOTE(review): the member functions are defined out of line; presumably
/// operator() returns the calling thread's index (assigning one on first use),
/// Capacity() is the maximum number of indices, and Check() validates an index
/// against that capacity — confirm against the implementation.
class ARROW_ACERO_EXPORT ThreadIndexer {
 public:
  size_t operator()();

  static size_t Capacity();

 private:
  static size_t Check(size_t thread_index);

  arrow::util::Mutex mutex_;  // NOTE(review): presumably guards id_to_index_
  std::unordered_map<std::thread::id, size_t> id_to_index_;
};
0110 
/// \brief A consumer that collects results into an in-memory table
///
/// `out` and `pool` are stored as raw pointers and are not owned by this consumer;
/// the caller must keep them alive for the consumer's lifetime.
/// NOTE(review): presumably Consume() accumulates batches into batches_ and Finish()
/// assembles them into a Table (using pool_) written to `*out` — confirm against the
/// out-of-line definitions.
struct ARROW_ACERO_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
 public:
  TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
      : out_(out), pool_(pool) {}
  Status Init(const std::shared_ptr<Schema>& schema,
              BackpressureControl* backpressure_control, ExecPlan* plan) override;
  Status Consume(ExecBatch batch) override;
  Future<> Finish() override;

 private:
  std::shared_ptr<Table>* out_;  // caller-provided destination; not owned
  MemoryPool* pool_;             // not owned
  std::shared_ptr<Schema> schema_;
  std::vector<std::shared_ptr<RecordBatch>> batches_;
  arrow::util::Mutex consume_mutex_;  // NOTE(review): presumably serializes Consume()
};
0128 
0129 class ARROW_ACERO_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
0130  public:
0131   Status Init(const std::shared_ptr<Schema>&, BackpressureControl*,
0132               ExecPlan* plan) override {
0133     return Status::OK();
0134   }
0135   Status Consume(ExecBatch exec_batch) override { return Status::OK(); }
0136   Future<> Finish() override { return Status::OK(); }
0137 
0138  public:
0139   static std::shared_ptr<NullSinkNodeConsumer> Make() {
0140     return std::make_shared<NullSinkNodeConsumer>();
0141   }
0142 };
0143 
/// \brief Helper that performs tracing calls on behalf of an ExecNode.
///
/// This is an ordinary (non-template) class, not a CRTP base: it is constructed with
/// a pointer to the node being traced and stores that pointer without owning it.
/// The tracing member functions are defined out of line.
class ARROW_ACERO_EXPORT TracedNode {
 public:
  // All nodes should call TraceStartProducing or NoteStartProducing exactly once.
  // Most nodes will be fine with a call to NoteStartProducing since the StartProducing
  // call is usually fairly cheap and simply schedules tasks to fetch the actual data.

  explicit TracedNode(ExecNode* node) : node_(node) {}

  // Create a span to record the StartProducing work
  [[nodiscard]] ::arrow::internal::tracing::Scope TraceStartProducing(
      std::string extra_details) const;

  // Record a call to StartProducing without creating a span
  void NoteStartProducing(std::string extra_details) const;

  // All nodes should call TraceInputReceived for each batch they receive.  This call
  // should track the time spent processing the batch.  NoteInputReceived is available
  // but usually won't be used unless a node is simply adding batches to a trivial queue.

  // Create a span to record the InputReceived work
  [[nodiscard]] ::arrow::internal::tracing::Scope TraceInputReceived(
      const ExecBatch& batch) const;

  // Record a call to InputReceived without creating a span
  void NoteInputReceived(const ExecBatch& batch) const;

  // Create a span to record any "finish" work.  This should NOT be called as part of
  // InputFinished and many nodes may not need to call this at all.  This should be used
  // when a node has some extra work that has to be done once it has received all of its
  // data.  For example, an aggregation node calculating aggregations.  This will
  // typically be called as a result of InputFinished OR InputReceived.
  [[nodiscard]] ::arrow::internal::tracing::Scope TraceFinish() const;

 private:
  ExecNode* node_;  // the node being traced; not owned
};
0182 
0183 }  // namespace acero
0184 }  // namespace arrow