Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:52

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/result.h"
0027 
0028 namespace arrow {
0029 namespace acero {
0030 namespace util {
0031 
0032 using arrow::compute::ExecBatch;
0033 
0034 /// \brief A container that accumulates batches until they are ready to
0035 ///        be processed.
0036 class AccumulationQueue {
0037  public:
0038   AccumulationQueue() : row_count_(0) {}
0039   ~AccumulationQueue() = default;
0040 
0041   // We should never be copying ExecBatch around
0042   AccumulationQueue(const AccumulationQueue&) = delete;
0043   AccumulationQueue& operator=(const AccumulationQueue&) = delete;
0044 
0045   AccumulationQueue(AccumulationQueue&& that);
0046   AccumulationQueue& operator=(AccumulationQueue&& that);
0047 
0048   void Concatenate(AccumulationQueue&& that);
0049   void InsertBatch(ExecBatch batch);
0050   int64_t row_count() { return row_count_; }
0051   size_t batch_count() { return batches_.size(); }
0052   bool empty() const { return batches_.empty(); }
0053   void Clear();
0054   ExecBatch& operator[](size_t i);
0055 
0056  private:
0057   int64_t row_count_;
0058   std::vector<ExecBatch> batches_;
0059 };
0060 
0061 /// A queue that sequences incoming batches
0062 ///
0063 /// This can be used when a node needs to do some kind of ordered processing on
0064 /// the stream.
0065 ///
0066 /// Batches can be inserted in any order.  The process_callback will be called on
0067 /// the batches, in order, without reentrant calls. For this reason the callback
0068 /// should be quick.
0069 ///
0070 /// For example, in a top-n node, the process callback should determine how many
0071 /// rows need to be delivered for the given batch, and then return a task to actually
0072 /// deliver those rows.
0073 class SequencingQueue {
0074  public:
0075   using Task = std::function<Status()>;
0076 
0077   /// Strategy that describes how to handle items
0078   class Processor {
0079    public:
0080     /// Process the batch, potentially generating a task
0081     ///
0082     /// This method will be called on each batch in order.  Calls to this method
0083     /// will be serialized and it will not be called reentrantly.  This makes it
0084     /// safe to do things that rely on order but minimal time should be spent here
0085     /// to avoid becoming a bottleneck.
0086     ///
0087     /// \return a follow-up task that will be scheduled.  The follow-up task(s) are
0088     ///         is not guaranteed to run in any particular order.  If nullopt is
0089     ///         returned then nothing will be scheduled.
0090     virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
0091     /// Schedule a task
0092     virtual void Schedule(Task task) = 0;
0093   };
0094 
0095   virtual ~SequencingQueue() = default;
0096 
0097   /// Insert a batch into the queue
0098   ///
0099   /// This will insert the batch into the queue.  If this batch was the next batch
0100   /// to deliver then this will trigger 1+ calls to the process callback to generate
0101   /// 1+ tasks.
0102   ///
0103   /// The task generated by this call will be executed immediately.  The remaining
0104   /// tasks will be scheduled using the schedule callback.
0105   ///
0106   /// From a data pipeline perspective the sequencing queue is a "sometimes" breaker.  If
0107   /// a task arrives in order then this call will usually execute the downstream pipeline.
0108   /// If this task arrives early then this call will only queue the data.
0109   virtual Status InsertBatch(ExecBatch batch) = 0;
0110 
0111   /// Create a queue
0112   /// \param processor describes how to process the batches, must outlive the queue
0113   static std::unique_ptr<SequencingQueue> Make(Processor* processor);
0114 };
0115 
0116 /// A queue that sequences incoming batches
0117 ///
0118 /// Unlike SequencingQueue the Process method is not expected to schedule new tasks.
0119 ///
0120 /// If a batch arrives and another thread is currently processing then the batch
0121 /// will be queued and control will return.  In other words, delivery of batches will
0122 /// not block on the Process method.
0123 ///
0124 /// It can be helpful to think of this as if a dedicated thread is running Process as
0125 /// batches arrive
0126 class SerialSequencingQueue {
0127  public:
0128   /// Strategy that describes how to handle items
0129   class Processor {
0130    public:
0131     virtual ~Processor() = default;
0132     /// Process the batch
0133     ///
0134     /// This method will be called on each batch in order.  Calls to this method
0135     /// will be serialized and it will not be called reentrantly.  This makes it
0136     /// safe to do things that rely on order.
0137     ///
0138     /// If this falls behind then data may accumulate
0139     ///
0140     /// TODO: Could add backpressure if needed but right now all uses of this should
0141     ///       be pretty fast and so are unlikely to block.
0142     virtual Status Process(ExecBatch batch) = 0;
0143   };
0144 
0145   virtual ~SerialSequencingQueue() = default;
0146 
0147   /// Insert a batch into the queue
0148   ///
0149   /// This will insert the batch into the queue.  If this batch was the next batch
0150   /// to deliver then this may trigger calls to the processor which will be run
0151   /// as part of this call.
0152   virtual Status InsertBatch(ExecBatch batch) = 0;
0153 
0154   /// Create a queue
0155   /// \param processor describes how to process the batches, must outlive the queue
0156   static std::unique_ptr<SerialSequencingQueue> Make(Processor* processor);
0157 };
0158 
0159 }  // namespace util
0160 }  // namespace acero
0161 }  // namespace arrow