![]() |
|
|||
File indexing completed on 2025-08-28 08:26:52
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <cstdint> 0021 #include <functional> 0022 #include <optional> 0023 #include <vector> 0024 0025 #include "arrow/compute/exec.h" 0026 #include "arrow/result.h" 0027 0028 namespace arrow { 0029 namespace acero { 0030 namespace util { 0031 0032 using arrow::compute::ExecBatch; 0033 0034 /// \brief A container that accumulates batches until they are ready to 0035 /// be processed. 0036 class AccumulationQueue { 0037 public: 0038 AccumulationQueue() : row_count_(0) {} 0039 ~AccumulationQueue() = default; 0040 0041 // We should never be copying ExecBatch around 0042 AccumulationQueue(const AccumulationQueue&) = delete; 0043 AccumulationQueue& operator=(const AccumulationQueue&) = delete; 0044 0045 AccumulationQueue(AccumulationQueue&& that); 0046 AccumulationQueue& operator=(AccumulationQueue&& that); 0047 0048 void Concatenate(AccumulationQueue&& that); 0049 void InsertBatch(ExecBatch batch); 0050 int64_t row_count() { return row_count_; } 0051 size_t batch_count() { return batches_.size(); } 0052 bool empty() const { return batches_.empty(); } 0053 void Clear(); 0054 ExecBatch& operator[](size_t i); 0055 0056 private: 0057 int64_t row_count_; 0058 std::vector<ExecBatch> batches_; 0059 }; 0060 0061 /// A queue that sequences incoming batches 0062 /// 0063 /// This can be used when a node needs to do some kind of ordered processing on 0064 /// the stream. 0065 /// 0066 /// Batches can be inserted in any order. The process_callback will be called on 0067 /// the batches, in order, without reentrant calls. For this reason the callback 0068 /// should be quick. 0069 /// 0070 /// For example, in a top-n node, the process callback should determine how many 0071 /// rows need to be delivered for the given batch, and then return a task to actually 0072 /// deliver those rows. 0073 class SequencingQueue { 0074 public: 0075 using Task = std::function<Status()>; 0076 0077 /// Strategy that describes how to handle items 0078 class Processor { 0079 public: 0080 /// Process the batch, potentially generating a task 0081 /// 0082 /// This method will be called on each batch in order. Calls to this method 0083 /// will be serialized and it will not be called reentrantly. This makes it 0084 /// safe to do things that rely on order but minimal time should be spent here 0085 /// to avoid becoming a bottleneck. 0086 /// 0087 /// \return a follow-up task that will be scheduled. The follow-up task(s) are 0088 /// is not guaranteed to run in any particular order. If nullopt is 0089 /// returned then nothing will be scheduled. 0090 virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0; 0091 /// Schedule a task 0092 virtual void Schedule(Task task) = 0; 0093 }; 0094 0095 virtual ~SequencingQueue() = default; 0096 0097 /// Insert a batch into the queue 0098 /// 0099 /// This will insert the batch into the queue. If this batch was the next batch 0100 /// to deliver then this will trigger 1+ calls to the process callback to generate 0101 /// 1+ tasks. 0102 /// 0103 /// The task generated by this call will be executed immediately. The remaining 0104 /// tasks will be scheduled using the schedule callback. 0105 /// 0106 /// From a data pipeline perspective the sequencing queue is a "sometimes" breaker. If 0107 /// a task arrives in order then this call will usually execute the downstream pipeline. 0108 /// If this task arrives early then this call will only queue the data. 0109 virtual Status InsertBatch(ExecBatch batch) = 0; 0110 0111 /// Create a queue 0112 /// \param processor describes how to process the batches, must outlive the queue 0113 static std::unique_ptr<SequencingQueue> Make(Processor* processor); 0114 }; 0115 0116 /// A queue that sequences incoming batches 0117 /// 0118 /// Unlike SequencingQueue the Process method is not expected to schedule new tasks. 0119 /// 0120 /// If a batch arrives and another thread is currently processing then the batch 0121 /// will be queued and control will return. In other words, delivery of batches will 0122 /// not block on the Process method. 0123 /// 0124 /// It can be helpful to think of this as if a dedicated thread is running Process as 0125 /// batches arrive 0126 class SerialSequencingQueue { 0127 public: 0128 /// Strategy that describes how to handle items 0129 class Processor { 0130 public: 0131 virtual ~Processor() = default; 0132 /// Process the batch 0133 /// 0134 /// This method will be called on each batch in order. Calls to this method 0135 /// will be serialized and it will not be called reentrantly. This makes it 0136 /// safe to do things that rely on order. 0137 /// 0138 /// If this falls behind then data may accumulate 0139 /// 0140 /// TODO: Could add backpressure if needed but right now all uses of this should 0141 /// be pretty fast and so are unlikely to block. 0142 virtual Status Process(ExecBatch batch) = 0; 0143 }; 0144 0145 virtual ~SerialSequencingQueue() = default; 0146 0147 /// Insert a batch into the queue 0148 /// 0149 /// This will insert the batch into the queue. If this batch was the next batch 0150 /// to deliver then this may trigger calls to the processor which will be run 0151 /// as part of this call. 0152 virtual Status InsertBatch(ExecBatch batch) = 0; 0153 0154 /// Create a queue 0155 /// \param processor describes how to process the batches, must outlive the queue 0156 static std::unique_ptr<SerialSequencingQueue> Make(Processor* processor); 0157 }; 0158 0159 } // namespace util 0160 } // namespace acero 0161 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |