Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <string>
0021 
0022 #include "arrow/acero/options.h"
0023 #include "arrow/acero/test_util_internal.h"
0024 #include "arrow/testing/random.h"
0025 
0026 namespace arrow {
0027 namespace acero {
0028 
0029 // \brief Make a delaying source that is optionally noisy (prints when it emits)
0030 AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
0031     Iterator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
0032     bool noisy = false);
0033 
0034 // \brief Make a delaying source that is optionally noisy (prints when it emits)
0035 AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
0036     AsyncGenerator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
0037     bool noisy = false);
0038 
0039 // \brief Make a delaying source that is optionally noisy (prints when it emits)
0040 AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(BatchesWithSchema src,
0041                                                         std::string label,
0042                                                         double delay_sec,
0043                                                         bool noisy = false);
0044 
0045 /// A node that slightly resequences the input at random
0046 struct JitterNodeOptions : public ExecNodeOptions {
0047   random::SeedType seed;
0048   /// The max amount to add to a node's "cost".
0049   int max_jitter_modifier;
0050 
0051   explicit JitterNodeOptions(random::SeedType seed, int max_jitter_modifier = 5)
0052       : seed(seed), max_jitter_modifier(max_jitter_modifier) {}
0053   static constexpr std::string_view kName = "jitter";
0054 };
0055 
0056 class GateImpl;
0057 
0058 class Gate {
0059  public:
0060   static std::shared_ptr<Gate> Make();
0061 
0062   Gate();
0063   virtual ~Gate();
0064 
0065   void ReleaseAllBatches();
0066   void ReleaseOneBatch();
0067   Future<> WaitForNextReleasedBatch();
0068 
0069  private:
0070   ARROW_DISALLOW_COPY_AND_ASSIGN(Gate);
0071 
0072   GateImpl* impl_;
0073 };
0074 
0075 // A node that holds all input batches until a given gate is released
0076 struct GatedNodeOptions : public ExecNodeOptions {
0077   explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
0078   Gate* gate;
0079 
0080   static constexpr std::string_view kName = "gated";
0081 };
0082 
0083 void RegisterTestNodes();
0084 
0085 }  // namespace acero
0086 }  // namespace arrow