Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:57

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <memory>
0021 #include <string>
0022 
0023 #include "arrow/csv/options.h"
0024 #include "arrow/dataset/dataset.h"
0025 #include "arrow/dataset/file_base.h"
0026 #include "arrow/dataset/type_fwd.h"
0027 #include "arrow/dataset/visibility.h"
0028 #include "arrow/ipc/type_fwd.h"
0029 #include "arrow/status.h"
0030 #include "arrow/util/compression.h"
0031 
0032 namespace arrow {
0033 namespace dataset {
0034 
/// \brief Type name string of the CSV file format.
///
/// Returned by the inline type_name() members of CsvFileFormat and
/// CsvFragmentScanOptions declared in this header.
///
/// Declared `inline` on purpose: a plain namespace-scope `constexpr` array has
/// internal linkage, so each translation unit including this header would get
/// its own copy, and the inline member functions that odr-use it would refer to
/// a different object in every translation unit. An inline variable is a single
/// entity shared across all translation units.
inline constexpr char kCsvTypeName[] = "csv";
0036 
0037 /// \addtogroup dataset-file-formats
0038 ///
0039 /// @{
0040 
0041 /// \brief A FileFormat implementation that reads from and writes to Csv files
     ///
     /// Overrides the FileFormat interface for schema inspection, scanning, row
     /// counting and writer creation. Apart from type_name(), the member functions
     /// are only declared here; their definitions are not part of this header.
     ///
     /// NOTE(review): this class uses std::optional and int64_t, but <optional> and
     /// <cstdint> are not included directly by this header; it appears to rely on
     /// transitive includes -- confirm.
0042 class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
0043  public:
0044   // TODO(ARROW-18328) Remove this, moved to CsvFragmentScanOptions
0045   /// Options affecting the parsing of CSV files
       ///
       /// Initialized to csv::ParseOptions::Defaults(). CsvFragmentScanOptions
       /// declares a member of the same name and type, which the TODO above names
       /// as the replacement for this one.
0046   csv::ParseOptions parse_options = csv::ParseOptions::Defaults();
0047 
       /// \brief Construct a CsvFileFormat (defined out of line).
0048   CsvFileFormat();
0049 
       /// \brief Return the type name of this format, i.e. kCsvTypeName.
0050   std::string type_name() const override { return kCsvTypeName; }
0051 
       /// \brief Compare this format with another FileFormat.
       ///
       /// The comparison criteria are defined by the implementation, which is not
       /// visible in this header.
0052   bool Equals(const FileFormat& other) const override;
0053 
       /// \brief Report whether `source` is supported by this format.
       ///
       /// \param[in] source the file source to check
       /// \return a boolean on success, or an error Status wrapped in the Result
0054   Result<bool> IsSupported(const FileSource& source) const override;
0055 
0056   /// \brief Return the schema of the file if possible.
       ///
       /// \param[in] source the file source to inspect
0057   Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
0058 
       /// \brief Begin scanning a previously inspected fragment.
       ///
       /// \param[in] request the scan request
       /// \param[in] inspected_fragment the inspection result for the fragment
       /// \param[in] format_options format-specific scan options, passed by pointer;
       ///            presumably a CsvFragmentScanOptions and possibly null -- confirm
       /// \param[in] exec_context execution context passed by pointer
       /// \return a Future that resolves to a FragmentScanner
0059   Future<std::shared_ptr<FragmentScanner>> BeginScan(
0060       const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0061       const FragmentScanOptions* format_options,
0062       compute::ExecContext* exec_context) const override;
0063 
       /// \brief Create a generator of record batches for a file fragment.
       ///
       /// \param[in] scan_options the scan options to apply
       /// \param[in] file the file fragment to scan
0064   Result<RecordBatchGenerator> ScanBatchesAsync(
0065       const std::shared_ptr<ScanOptions>& scan_options,
0066       const std::shared_ptr<FileFragment>& file) const override;
0067 
       /// \brief Inspect a file source, producing an InspectedFragment.
       ///
       /// \param[in] source the file source to inspect
       /// \param[in] format_options format-specific scan options, passed by pointer
       /// \param[in] exec_context execution context passed by pointer
       /// \return a Future that resolves to the InspectedFragment
0068   Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0069       const FileSource& source, const FragmentScanOptions* format_options,
0070       compute::ExecContext* exec_context) const override;
0071 
       /// \brief Count the rows of a file fragment matching a predicate.
       ///
       /// Unlike the scanning and inspection overrides above, this member function
       /// is not const-qualified.
       ///
       /// \param[in] file the file fragment whose rows are counted
       /// \param[in] predicate the filter expression
       /// \param[in] options the scan options to apply
       /// \return a Future of an optional row count; presumably an empty optional
       ///         means the count could not be determined -- confirm against
       ///         the FileFormat base class
0072   Future<std::optional<int64_t>> CountRows(
0073       const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
0074       const std::shared_ptr<ScanOptions>& options) override;
0075 
       /// \brief Create a FileWriter for the given destination.
       ///
       /// \param[in] destination the output stream to write to
       /// \param[in] schema the schema of the data to be written
       /// \param[in] options write options; presumably expected to be a
       ///            CsvFileWriteOptions -- confirm
       /// \param[in] destination_locator locator describing the destination
0076   Result<std::shared_ptr<FileWriter>> MakeWriter(
0077       std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
0078       std::shared_ptr<FileWriteOptions> options,
0079       fs::FileLocator destination_locator) const override;
0080 
       /// \brief Return the default write options for this format.
       ///
       /// Not const-qualified.
0081   std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
0082 };
0083 
0084 /// \brief Per-scan options for CSV fragments
     ///
     /// Bundles the csv conversion, read and parse options together with an
     /// optional input stream wrapper. Each options member is initialized from the
     /// corresponding Defaults() factory.
     ///
     /// NOTE(review): StreamWrapFunc is a std::function, but <functional> is not
     /// included directly by this header; it appears to rely on a transitive
     /// include -- confirm.
0085 struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
       /// \brief Return the type name of these options, i.e. kCsvTypeName
       /// (the same string CsvFileFormat::type_name() returns).
0086   std::string type_name() const override { return kCsvTypeName; }
0087 
       /// Signature of the stream wrapping callback: takes an input stream and
       /// returns either a (possibly different) input stream or an error.
0088   using StreamWrapFunc = std::function<Result<std::shared_ptr<io::InputStream>>(
0089       std::shared_ptr<io::InputStream>)>;
0090 
0091   /// CSV conversion options
0092   csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults();
0093 
0094   /// CSV reading options
0095   ///
0096   /// Note that use_threads is always ignored.
0097   csv::ReadOptions read_options = csv::ReadOptions::Defaults();
0098 
0099   /// CSV parse options
0100   csv::ParseOptions parse_options = csv::ParseOptions::Defaults();
0101 
0102   /// Optional stream wrapping function
0103   ///
0104   /// If defined, all open dataset file fragments will be passed
0105   /// through this function.  One possible use case is to transparently
0106   /// transcode all input files from a given character set to utf8.
       ///
       /// Value-initialized, i.e. an empty std::function, by default.
0107   StreamWrapFunc stream_transform_func{};
0108 };
0109 
     /// \brief Write options for CSV files
     ///
     /// The constructor is protected and CsvFileFormat is declared a friend, so
     /// instances can only be constructed by CsvFileFormat or by derived classes.
0110 class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
0111  public:
0112   /// Options passed to csv::MakeCSVWriter.
       ///
       /// Default-initialized here, i.e. a null pointer until assigned.
0113   std::shared_ptr<csv::WriteOptions> write_options;
0114 
0115  protected:
       /// \brief Construct write options bound to a file format.
       ///
       /// \param[in] format the file format, moved into the FileWriteOptions base
0116   explicit CsvFileWriteOptions(std::shared_ptr<FileFormat> format)
0117       : FileWriteOptions(std::move(format)) {}
0118 
       // Grants CsvFileFormat access to the protected constructor.
0119   friend class CsvFileFormat;
0120 };
0121 
     /// \brief A FileWriter for CSV files
     ///
     /// The constructor is private and CsvFileFormat is declared a friend, so
     /// instances can only be constructed by CsvFileFormat (presumably through
     /// CsvFileFormat::MakeWriter -- confirm in the implementation).
0122 class ARROW_DS_EXPORT CsvFileWriter : public FileWriter {
0123  public:
       /// \brief Write a record batch.
       ///
       /// \param[in] batch the record batch to write
       /// \return a Status reporting success or failure
0124   Status Write(const std::shared_ptr<RecordBatch>& batch) override;
0125 
0126  private:
       /// \brief Construct a writer (defined out of line).
       ///
       /// \param[in] destination the output stream
       /// \param[in] writer the record batch writer; presumably stored in
       ///            batch_writer_ -- confirm in the implementation
       /// \param[in] schema the schema of the data to be written
       /// \param[in] options the CSV write options
       /// \param[in] destination_locator locator describing the destination
0127   CsvFileWriter(std::shared_ptr<io::OutputStream> destination,
0128                 std::shared_ptr<ipc::RecordBatchWriter> writer,
0129                 std::shared_ptr<Schema> schema,
0130                 std::shared_ptr<CsvFileWriteOptions> options,
0131                 fs::FileLocator destination_locator);
0132 
       /// \brief Finishing hook overridden from FileWriter; returns a Future that
       /// carries no value.
0133   Future<> FinishInternal() override;
0134 
       /// Output stream held by this writer.
0135   std::shared_ptr<io::OutputStream> destination_;
       /// Record batch writer held by this writer.
0136   std::shared_ptr<ipc::RecordBatchWriter> batch_writer_;
0137 
       // Grants CsvFileFormat access to the private constructor.
0138   friend class CsvFileFormat;
0139 };
0140 
0141 /// @}
0142 
0143 }  // namespace dataset
0144 }  // namespace arrow