![]() |
|
|||
File indexing completed on 2025-08-28 08:26:57
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <string> 0021 0022 #include "arrow/dataset/file_base.h" 0023 #include "arrow/record_batch.h" 0024 #include "arrow/status.h" 0025 #include "arrow/util/async_util.h" 0026 #include "arrow/util/future.h" 0027 0028 namespace arrow { 0029 namespace dataset { 0030 namespace internal { 0031 0032 // This lines up with our other defaults in the scanner and execution plan 0033 constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 8 * 1024 * 1024; 0034 0035 /// \brief Utility class that manages a set of writers to different paths 0036 /// 0037 /// Writers may be closed and reopened (and a new file created) based on the dataset 0038 /// write options (for example, max_rows_per_file or max_open_files) 0039 /// 0040 /// The dataset writer enforces its own back pressure based on the # of rows (as opposed 0041 /// to # of batches which is how it is typically enforced elsewhere) and # of files. 0042 class ARROW_DS_EXPORT DatasetWriter { 0043 public: 0044 /// \brief Create a dataset writer 0045 /// 0046 /// Will fail if basename_template is invalid or if there is existing data and 0047 /// existing_data_behavior is kError 0048 /// 0049 /// \param write_options options to control how the data should be written 0050 /// \param max_rows_queued max # of rows allowed to be queued before the dataset_writer 0051 /// will ask for backpressure 0052 static Result<std::unique_ptr<DatasetWriter>> Make( 0053 FileSystemDatasetWriteOptions write_options, util::AsyncTaskScheduler* scheduler, 0054 std::function<void()> pause_callback, std::function<void()> resume_callback, 0055 std::function<void()> finish_callback, 0056 uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued); 0057 0058 ~DatasetWriter(); 0059 0060 /// \brief Write a batch to the dataset 0061 /// \param[in] batch The batch to write 0062 /// \param[in] directory The directory to write to 0063 /// 0064 /// Note: The written filename will be {directory}/{filename_factory(i)} where i is a 0065 /// counter controlled by `max_open_files` and `max_rows_per_file` 0066 /// 0067 /// If multiple WriteRecordBatch calls arrive with the same `directory` then the batches 0068 /// may be written to the same file. 0069 /// 0070 /// The returned future will be marked finished when the record batch has been queued 0071 /// to be written. If the returned future is unfinished then this indicates the dataset 0072 /// writer's queue is full and the data provider should pause. 0073 /// 0074 /// This method is NOT async reentrant. The returned future will only be unfinished 0075 /// if back pressure needs to be applied. Async reentrancy is not necessary for 0076 /// concurrent writes to happen. Calling this method again before the previous future 0077 /// completes will not just violate max_rows_queued but likely lead to race conditions. 0078 /// 0079 /// One thing to note is that the ordering of your data can affect your maximum 0080 /// potential parallelism. If this seems odd then consider a dataset where the first 0081 /// 1000 batches go to the same directory and then the 1001st batch goes to a different 0082 /// directory. The only way to get two parallel writes immediately would be to queue 0083 /// all 1000 pending writes to the first directory. 0084 void WriteRecordBatch(std::shared_ptr<RecordBatch> batch, const std::string& directory, 0085 const std::string& prefix = ""); 0086 0087 /// Finish all pending writes and close any open files 0088 void Finish(); 0089 0090 protected: 0091 DatasetWriter(FileSystemDatasetWriteOptions write_options, 0092 util::AsyncTaskScheduler* scheduler, std::function<void()> pause_callback, 0093 std::function<void()> resume_callback, 0094 std::function<void()> finish_callback, 0095 uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued); 0096 0097 class DatasetWriterImpl; 0098 std::unique_ptr<DatasetWriterImpl> impl_; 0099 }; 0100 0101 } // namespace internal 0102 } // namespace dataset 0103 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |