File indexing completed on 2025-08-28 08:26:57
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <memory>
0021 #include <string>
0022
0023 #include "arrow/csv/options.h"
0024 #include "arrow/dataset/dataset.h"
0025 #include "arrow/dataset/file_base.h"
0026 #include "arrow/dataset/type_fwd.h"
0027 #include "arrow/dataset/visibility.h"
0028 #include "arrow/ipc/type_fwd.h"
0029 #include "arrow/status.h"
0030 #include "arrow/util/compression.h"
0031
0032 namespace arrow {
0033 namespace dataset {
0034 /// Type-name string returned by type_name() of CsvFileFormat and CsvFragmentScanOptions.
0035 constexpr char kCsvTypeName[] = "csv";
0036
0037 /// \brief FileFormat subclass identified by kCsvTypeName ("csv").
0038 ///
0039 /// Only declarations appear here; every member except type_name() is defined out of line.
0040 // NOTE(review): std::optional and int64_t are used below, yet <optional> and <cstdint> are not
0041 // included directly by this header - presumably they arrive through the arrow/ includes.
0042 class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
0043 public:
0044 /// Parse options stored on the format object, initialised from csv::ParseOptions::Defaults().
0045 // NOTE(review): CsvFragmentScanOptions declares a parse_options member too - confirm which wins.
0046 csv::ParseOptions parse_options = csv::ParseOptions::Defaults();
0047 /// Default constructor; defined out of line.
0048 CsvFileFormat();
0049 /// \return kCsvTypeName, i.e. "csv".
0050 std::string type_name() const override { return kCsvTypeName; }
0051 /// Equality against another FileFormat. NOTE(review): the compared fields are not visible here.
0052 bool Equals(const FileFormat& other) const override;
0053 /// \return a Result<bool>; presumably whether `source` can be handled as CSV - confirm.
0054 Result<bool> IsSupported(const FileSource& source) const override;
0055
0056 /// \return a Result wrapping the Schema obtained for `source`.
0057 Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
0058 /// Asynchronous scan entry point: yields a FragmentScanner through a Future.
0059 Future<std::shared_ptr<FragmentScanner>> BeginScan(
0060 const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0061 const FragmentScanOptions* format_options,
0062 compute::ExecContext* exec_context) const override;
0063 /// \return a Result wrapping a RecordBatchGenerator for `file` under `scan_options`.
0064 Result<RecordBatchGenerator> ScanBatchesAsync(
0065 const std::shared_ptr<ScanOptions>& scan_options,
0066 const std::shared_ptr<FileFragment>& file) const override;
0067 /// \return a Future of the InspectedFragment produced for `source`.
0068 Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0069 const FileSource& source, const FragmentScanOptions* format_options,
0070 compute::ExecContext* exec_context) const override;
0071 /// Non-const override. The optional result presumably means "count unavailable" - confirm.
0072 Future<std::optional<int64_t>> CountRows(
0073 const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
0074 const std::shared_ptr<ScanOptions>& options) override;
0075 /// \return a Result wrapping a FileWriter bound to `destination`; all arguments taken by value.
0076 Result<std::shared_ptr<FileWriter>> MakeWriter(
0077 std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
0078 std::shared_ptr<FileWriteOptions> options,
0079 fs::FileLocator destination_locator) const override;
0080 /// Non-const override. NOTE(review): presumably returns a CsvFileWriteOptions - confirm.
0081 std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
0082 };
0083 /// \brief FragmentScanOptions subclass bundling the CSV option structs; reports kCsvTypeName.
0084 // NOTE(review): how a scan consumes these fields is defined outside this header.
0085 struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
0086 std::string type_name() const override { return kCsvTypeName; }
0087 /// Callable type: takes a shared_ptr<io::InputStream>, returns a Result of the same pointer type.
0088 using StreamWrapFunc = std::function<Result<std::shared_ptr<io::InputStream>>(
0089 std::shared_ptr<io::InputStream>)>;
0090 // NOTE(review): std::function needs <functional>, which this header does not include directly.
0091 /// Conversion options, initialised from csv::ConvertOptions::Defaults().
0092 csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults();
0093
0094
0095
0096 /// Read options, initialised from csv::ReadOptions::Defaults().
0097 csv::ReadOptions read_options = csv::ReadOptions::Defaults();
0098
0099 /// Parse options, initialised from csv::ParseOptions::Defaults().
0100 csv::ParseOptions parse_options = csv::ParseOptions::Defaults();
0101
0102
0103 /// Optional stream wrapper. Value-initialised, so it holds no callable unless one is assigned.
0104 // NOTE(review): presumably applied to each opened fragment stream when non-empty - confirm in
0105 // the scan implementation, which is not part of this header.
0106
0107 StreamWrapFunc stream_transform_func{};
0108 };
0109 /// \brief FileWriteOptions subclass holding a csv::WriteOptions pointer.
0110 class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
0111 public:
0112 /// Shared csv::WriteOptions. No initialiser here and the constructor body is empty, so it starts null.
0113 std::shared_ptr<csv::WriteOptions> write_options;
0114 // Protected + explicit constructor: reachable only from subclasses and the friend declared below.
0115 protected:
0116 explicit CsvFileWriteOptions(std::shared_ptr<FileFormat> format)
0117 : FileWriteOptions(std::move(format)) {}  // hands `format` to the base; <utility> is not included directly
0118 // Friendship gives CsvFileFormat access to the protected constructor.
0119 friend class CsvFileFormat;
0120 };
0121 /// \brief FileWriter subclass for the CSV format; its constructor is private, with CsvFileFormat as friend.
0122 class ARROW_DS_EXPORT CsvFileWriter : public FileWriter {
0123 public:
0124 Status Write(const std::shared_ptr<RecordBatch>& batch) override;  // takes one batch, reports a Status
0125 // Private constructor, defined out of line; all five arguments are taken by value.
0126 private:
0127 CsvFileWriter(std::shared_ptr<io::OutputStream> destination,
0128 std::shared_ptr<ipc::RecordBatchWriter> writer,
0129 std::shared_ptr<Schema> schema,
0130 std::shared_ptr<CsvFileWriteOptions> options,
0131 fs::FileLocator destination_locator);
0132 /// Override returning Future<>; defined out of line.
0133 Future<> FinishInternal() override;
0134 // NOTE(review): ctor args `destination` / `writer` presumably initialise these two - confirm.
0135 std::shared_ptr<io::OutputStream> destination_;
0136 std::shared_ptr<ipc::RecordBatchWriter> batch_writer_;
0137 // Lets CsvFileFormat call the private constructor.
0138 friend class CsvFileFormat;
0139 };
0140
0141
0142
0143 }
0144 }