Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:57

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <string>
0021 
0022 #include "arrow/dataset/file_base.h"
0023 #include "arrow/record_batch.h"
0024 #include "arrow/status.h"
0025 #include "arrow/util/async_util.h"
0026 #include "arrow/util/future.h"
0027 
0028 namespace arrow {
0029 namespace dataset {
0030 namespace internal {
0031 
0032 // This lines up with our other defaults in the scanner and execution plan
     //
     // Default limit on the number of queued rows: 8 * 1024 * 1024 = 8,388,608 rows.
     // It is the default value of the `max_rows_queued` argument of both
     // DatasetWriter::Make and the DatasetWriter constructor.
0033 constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 8 * 1024 * 1024;
0034 
0035 /// \brief Utility class that manages a set of writers to different paths
0036 ///
0037 /// Writers may be closed and reopened (and a new file created) based on the dataset
0038 /// write options (for example, max_rows_per_file or max_open_files)
0039 ///
0040 /// The dataset writer enforces its own back pressure based on the # of rows (as opposed
0041 /// to # of batches which is how it is typically enforced elsewhere) and # of files.
     ///
     /// Instances are created through the static Make() factory; the constructor is
     /// protected.  All state lives behind the forward-declared DatasetWriterImpl, which
     /// this class owns through `impl_`.
0042 class ARROW_DS_EXPORT DatasetWriter {
0043  public:
0044   /// \brief Create a dataset writer
0045   ///
0046   /// Will fail if basename_template is invalid or if there is existing data and
0047   /// existing_data_behavior is kError
0048   ///
0049   /// \param write_options options to control how the data should be written
       /// \param scheduler non-owning pointer to an async task scheduler.
       ///                  NOTE(review): presumably write tasks are submitted to it and it
       ///                  must outlive the writer -- confirm against the implementation
       /// \param pause_callback NOTE(review): presumably invoked when the writer wants the
       ///                       data provider to stop sending batches -- confirm
       /// \param resume_callback NOTE(review): presumably invoked when the data provider
       ///                        may send batches again -- confirm
       /// \param finish_callback NOTE(review): presumably invoked once the writer has
       ///                        finished -- confirm when exactly it fires
0050   /// \param max_rows_queued max # of rows allowed to be queued before the dataset_writer
0051   ///                        will ask for backpressure
       ///                        (defaults to kDefaultDatasetWriterMaxRowsQueued)
       /// \return the newly created writer, or an error if creation failed
0052   static Result<std::unique_ptr<DatasetWriter>> Make(
0053       FileSystemDatasetWriteOptions write_options, util::AsyncTaskScheduler* scheduler,
0054       std::function<void()> pause_callback, std::function<void()> resume_callback,
0055       std::function<void()> finish_callback,
0056       uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued);
0057 
       // Only declared here (no inline body).  DatasetWriterImpl is merely forward-declared
       // in this header, so the std::unique_ptr member could not be destroyed by an
       // implicitly-defined inline destructor.
0058   ~DatasetWriter();
0059 
0060   /// \brief Write a batch to the dataset
0061   /// \param[in] batch The batch to write
0062   /// \param[in] directory The directory to write to
       /// \param[in] prefix defaults to the empty string.
       ///                   NOTE(review): presumably prepended to the generated file name
       ///                   -- confirm against the implementation
0063   ///
0064   /// Note: The written filename will be {directory}/{filename_factory(i)} where i is a
0065   /// counter controlled by `max_open_files` and `max_rows_per_file`
0066   ///
0067   /// If multiple WriteRecordBatch calls arrive with the same `directory` then the batches
0068   /// may be written to the same file.
0069   ///
       /// NOTE(review): the next two paragraphs talk about a "returned future", but this
       /// method is declared to return void.  The text looks like it predates the
       /// pause_callback / resume_callback arguments of Make(); presumably back pressure is
       /// now signalled through those callbacks instead -- confirm and reword.
       ///
0070   /// The returned future will be marked finished when the record batch has been queued
0071   /// to be written.  If the returned future is unfinished then this indicates the dataset
0072   /// writer's queue is full and the data provider should pause.
0073   ///
0074   /// This method is NOT async reentrant.  The returned future will only be unfinished
0075   /// if back pressure needs to be applied.  Async reentrancy is not necessary for
0076   /// concurrent writes to happen.  Calling this method again before the previous future
0077   /// completes will not just violate max_rows_queued but likely lead to race conditions.
0078   ///
0079   /// One thing to note is that the ordering of your data can affect your maximum
0080   /// potential parallelism.  If this seems odd then consider a dataset where the first
0081   /// 1000 batches go to the same directory and then the 1001st batch goes to a different
0082   /// directory.  The only way to get two parallel writes immediately would be to queue
0083   /// all 1000 pending writes to the first directory.
0084   void WriteRecordBatch(std::shared_ptr<RecordBatch> batch, const std::string& directory,
0085                         const std::string& prefix = "");
0086 
0087   /// Finish all pending writes and close any open files
       ///
       /// NOTE(review): returns void, so completion and errors are not reported through the
       /// return value; presumably they surface via finish_callback and/or the scheduler
       /// passed to Make() -- confirm.
0088   void Finish();
0089 
0090  protected:
       /// Protected so that instances are created through Make(); the parameters are the
       /// same as those of Make().
0091   DatasetWriter(FileSystemDatasetWriteOptions write_options,
0092                 util::AsyncTaskScheduler* scheduler, std::function<void()> pause_callback,
0093                 std::function<void()> resume_callback,
0094                 std::function<void()> finish_callback,
0095                 uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued);
0096 
       /// Implementation type; only forward-declared in this header.
0097   class DatasetWriterImpl;
       /// Sole owner of the implementation object.
0098   std::unique_ptr<DatasetWriterImpl> impl_;
0099 };
0100 
0101 }  // namespace internal
0102 }  // namespace dataset
0103 }  // namespace arrow