File indexing completed on 2025-08-28 08:26:52
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019 #include "arrow/acero/exec_plan.h"
0020 #include "arrow/acero/options.h"
0021
0022 #include <memory>
0023
0024 namespace arrow::acero {
0025
0026 class BackpressureHandler {
0027 private:
0028 BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
0029 std::unique_ptr<BackpressureControl> backpressure_control)
0030 : input_(input),
0031 low_threshold_(low_threshold),
0032 high_threshold_(high_threshold),
0033 backpressure_control_(std::move(backpressure_control)) {}
0034
0035 public:
0036 static Result<BackpressureHandler> Make(
0037 ExecNode* input, size_t low_threshold, size_t high_threshold,
0038 std::unique_ptr<BackpressureControl> backpressure_control) {
0039 if (low_threshold >= high_threshold) {
0040 return Status::Invalid("low threshold (", low_threshold,
0041 ") must be less than high threshold (", high_threshold, ")");
0042 }
0043 if (backpressure_control == NULLPTR) {
0044 return Status::Invalid("null backpressure control parameter");
0045 }
0046 BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
0047 std::move(backpressure_control));
0048 return backpressure_handler;
0049 }
0050
0051 void Handle(size_t start_level, size_t end_level) {
0052 if (start_level < high_threshold_ && end_level >= high_threshold_) {
0053 backpressure_control_->Pause();
0054 } else if (start_level > low_threshold_ && end_level <= low_threshold_) {
0055 backpressure_control_->Resume();
0056 }
0057 }
0058
0059 Status ForceShutdown() {
0060
0061
0062
0063 backpressure_control_->Resume();
0064 return input_->StopProducing();
0065 }
0066
0067 private:
0068 ExecNode* input_;
0069 size_t low_threshold_;
0070 size_t high_threshold_;
0071 std::unique_ptr<BackpressureControl> backpressure_control_;
0072 };
0073
0074 }