Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <cstdint>
0021 #include <functional>
0022 #include <memory>
0023 #include <vector>
0024 
0025 #include "arrow/acero/exec_plan.h"
0026 #include "arrow/acero/util.h"
0027 #include "arrow/acero/visibility.h"
0028 #include "arrow/compute/type_fwd.h"
0029 #include "arrow/status.h"
0030 #include "arrow/type_fwd.h"
0031 #include "arrow/util/cancel.h"
0032 #include "arrow/util/type_fwd.h"
0033 
0034 namespace arrow {
0035 namespace acero {
0036 
0037 /// A utility base class for simple exec nodes with one input
0038 ///
0039 /// Pause/Resume Producing are forwarded appropriately
0040 /// There is nothing to do in StopProducingImpl
0041 ///
0042 /// An AtomicCounter is used to keep track of when all data has arrived.  When it
0043 /// has the Finish() method will be invoked
0044 class ARROW_ACERO_EXPORT MapNode : public ExecNode, public TracedNode {
0045  public:
0046   MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
0047           std::shared_ptr<Schema> output_schema);
0048 
0049   Status InputFinished(ExecNode* input, int total_batches) override;
0050 
0051   Status StartProducing() override;
0052 
0053   void PauseProducing(ExecNode* output, int32_t counter) override;
0054 
0055   void ResumeProducing(ExecNode* output, int32_t counter) override;
0056 
0057   Status InputReceived(ExecNode* input, ExecBatch batch) override;
0058 
0059   const Ordering& ordering() const override;
0060 
0061  protected:
0062   Status StopProducingImpl() override;
0063 
0064   /// Transform a batch
0065   ///
0066   /// The output batch will have the same guarantee as the input batch
0067   /// If this was the last batch this call may trigger Finish()
0068   virtual Result<ExecBatch> ProcessBatch(ExecBatch batch) = 0;
0069 
0070   /// Function called after all data has been received
0071   ///
0072   /// By default this does nothing.  Override this to provide a custom implementation.
0073   virtual void Finish();
0074 
0075  protected:
0076   // Counter for the number of batches received
0077   AtomicCounter input_counter_;
0078 };
0079 
0080 }  // namespace acero
0081 }  // namespace arrow