Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:52

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 #include "arrow/acero/exec_plan.h"
0020 #include "arrow/acero/options.h"
0021 
0022 #include <memory>
0023 
0024 namespace arrow::acero {
0025 
0026 class BackpressureHandler {
0027  private:
0028   BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
0029                       std::unique_ptr<BackpressureControl> backpressure_control)
0030       : input_(input),
0031         low_threshold_(low_threshold),
0032         high_threshold_(high_threshold),
0033         backpressure_control_(std::move(backpressure_control)) {}
0034 
0035  public:
0036   static Result<BackpressureHandler> Make(
0037       ExecNode* input, size_t low_threshold, size_t high_threshold,
0038       std::unique_ptr<BackpressureControl> backpressure_control) {
0039     if (low_threshold >= high_threshold) {
0040       return Status::Invalid("low threshold (", low_threshold,
0041                              ") must be less than high threshold (", high_threshold, ")");
0042     }
0043     if (backpressure_control == NULLPTR) {
0044       return Status::Invalid("null backpressure control parameter");
0045     }
0046     BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
0047                                              std::move(backpressure_control));
0048     return backpressure_handler;
0049   }
0050 
0051   void Handle(size_t start_level, size_t end_level) {
0052     if (start_level < high_threshold_ && end_level >= high_threshold_) {
0053       backpressure_control_->Pause();
0054     } else if (start_level > low_threshold_ && end_level <= low_threshold_) {
0055       backpressure_control_->Resume();
0056     }
0057   }
0058 
0059   Status ForceShutdown() {
0060     // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
0061     // Since acero's executor won't terminate if any one node is paused, we need to
0062     // force resume the node before stopping production.
0063     backpressure_control_->Resume();
0064     return input_->StopProducing();
0065   }
0066 
0067  private:
0068   ExecNode* input_;
0069   size_t low_threshold_;
0070   size_t high_threshold_;
0071   std::unique_ptr<BackpressureControl> backpressure_control_;
0072 };
0073 
0074 }  // namespace arrow::acero