File indexing completed on 2025-08-28 08:26:57
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <algorithm>
0021 #include <cstddef>
0022 #include <cstdint>
0023 #include <memory>
0024 #include <string_view>
0025 #include <vector>
0026
0027 #include "arrow/buffer.h"
0028 #include "arrow/csv/options.h"
0029 #include "arrow/csv/type_fwd.h"
0030 #include "arrow/status.h"
0031 #include "arrow/util/macros.h"
0032 #include "arrow/util/visibility.h"
0033
0034 namespace arrow {
0035
0036 class MemoryPool;
0037
0038 namespace csv {
0039
0040 /// NOTE(review): only the declaration is visible in this header, so the following is
0041 /// inferred from the signature -- confirm against the definition. Presumably skips up to
0042 /// `num_rows` rows within the `size` bytes at `data`, sets `*out_data`, and returns a row count.
0043 ARROW_EXPORT
0044 int32_t SkipRows(const uint8_t* data, uint32_t size, int32_t num_rows,
0045 const uint8_t** out_data);
0046
0047 class BlockParserImpl;  // forward declaration: friend of detail::DataBatch, owned by BlockParser::impl_
0048
0049 namespace detail {
0050 // Entry type of DataBatch's values buffers, which DataBatch reads in consecutive pairs:
0051 struct ParsedValueDesc {  // value i spans [entry[i].offset, entry[i + 1].offset)
0052 uint32_t offset : 31;  // 31-bit offset; DataBatch adds it to `parsed_` to locate the value's bytes
0053 bool quoted : 1;  // 1-bit flag; DataBatch passes entry[i + 1].quoted to the visitor
0054 };
0055
0056 class ARROW_EXPORT DataBatch {  // holds parsed values; exposes const accessors and visitors only
0057 public:
0058 explicit DataBatch(int32_t num_cols) : num_cols_(num_cols) {}  // only the column count is set here
0059 // No mutators are defined in this class; BlockParserImpl is declared a friend (see the end of the class).
0060 /// \brief Return num_rows_; skipped rows are counted separately by num_skipped_rows()
0061 int32_t num_rows() const { return num_rows_; }
0062 /// \brief Return the column count (num_cols_)
0063 int32_t num_cols() const { return num_cols_; }
0064 /// \brief Return parsed_size_ (stored as int32_t, returned as uint32_t)
0065 uint32_t num_bytes() const { return parsed_size_; }
0066 /// \brief Return the number of entries in skipped_rows_
0067 int32_t num_skipped_rows() const { return static_cast<int32_t>(skipped_rows_.size()); }
0068 // Visit every value of column `col_index`, walking the values buffers in order.
0069 template <typename Visitor>  // `visit` is called as visit(parsed_ + start, stop - start, quoted)
0070 Status VisitColumn(int32_t col_index, int64_t first_row, Visitor&& visit) const {
0071 using detail::ParsedValueDesc;
0072 // batch_row keeps counting across buffers; it is not reset per buffer.
0073 int32_t batch_row = 0;
0074 for (size_t buf_index = 0; buf_index < values_buffers_.size(); ++buf_index) {
0075 const auto& values_buffer = values_buffers_[buf_index];
0076 const auto values = reinterpret_cast<const ParsedValueDesc*>(values_buffer->data());
0077 const auto max_pos =
0078 static_cast<int32_t>(values_buffer->size() / sizeof(ParsedValueDesc)) - 1;  // index of the last entry
0079 for (int32_t pos = col_index; pos < max_pos; pos += num_cols_, ++batch_row) {  // step of num_cols_ entries per row
0080 auto start = values[pos].offset;
0081 auto stop = values[pos + 1].offset;  // the next entry's offset marks the end of this value
0082 auto quoted = values[pos + 1].quoted;  // the flag is read from the next entry as well
0083 Status status = visit(parsed_ + start, stop - start, quoted);
0084 if (ARROW_PREDICT_FALSE(!status.ok())) {
0085 return DecorateWithRowNumber(std::move(status), first_row, batch_row);  // stop at the first failure
0086 }
0087 }
0088 }
0089 return Status::OK();
0090 }
0091 // Visit the num_cols_ values that precede the final entry of the last values buffer.
0092 template <typename Visitor>  // returns the first non-OK Status from `visit` unchanged (no row number added)
0093 Status VisitLastRow(Visitor&& visit) const {
0094 using detail::ParsedValueDesc;
0095 // back() is not guarded: values_buffers_ must be non-empty when this is called.
0096 const auto& values_buffer = values_buffers_.back();
0097 const auto values = reinterpret_cast<const ParsedValueDesc*>(values_buffer->data());
0098 const auto start_pos =
0099 static_cast<int32_t>(values_buffer->size() / sizeof(ParsedValueDesc)) -
0100 num_cols_ - 1;  // NOTE(review): negative if the buffer holds fewer than num_cols_ + 1 entries -- confirm callers
0101 for (int32_t col_index = 0; col_index < num_cols_; ++col_index) {
0102 auto start = values[start_pos + col_index].offset;
0103 auto stop = values[start_pos + col_index + 1].offset;
0104 auto quoted = values[start_pos + col_index + 1].quoted;
0105 ARROW_RETURN_NOT_OK(visit(parsed_ + start, stop - start, quoted));
0106 }
0107 return Status::OK();
0108 }
0109 // When first_row >= 0, rewrite the message as "Row #<n>: <message>" with n = batch_row + skips_before + first_row.
0110 protected:
0111 Status DecorateWithRowNumber(Status&& status, int64_t first_row,
0112 int32_t batch_row) const {
0113 if (first_row >= 0) {
0114 // skips_before = number of skipped_rows_ entries that are <= batch_row. std::upper_bound
0115 // needs a sorted range -- assumes skipped_rows_ is in ascending order (TODO confirm in BlockParserImpl).
0116 const auto skips_before =
0117 std::upper_bound(skipped_rows_.begin(), skipped_rows_.end(), batch_row) -
0118 skipped_rows_.begin();
0119 status = status.WithMessage("Row #", batch_row + skips_before + first_row, ": ",
0120 status.message());
0121 }
0122 // NOTE(review): the condition is the constant `true`; presumably the macro returns here and may add context -- confirm its definition.
0123 ARROW_RETURN_IF_(true, std::move(status), ARROW_STRINGIFY(status));
0124 return std::move(status);  // fallback return after the macro
0125 }
0126
0127 // Row count reported by num_rows()
0128 int32_t num_rows_ = 0;
0129 // Column count, initialised by the constructor
0130 int32_t num_cols_ = 0;
0131
0132 // values_buffers_ are read as arrays of ParsedValueDesc; parsed_ is the base pointer their offsets are added to.
0133 // NOTE(review): parsed_ presumably points into parsed_buffer_ -- confirm in BlockParserImpl.
0134 std::vector<std::shared_ptr<Buffer>> values_buffers_;
0135 std::shared_ptr<Buffer> parsed_buffer_;
0136 const uint8_t* parsed_ = NULLPTR;
0137 int32_t parsed_size_ = 0;
0138
0139 // Compared against batch row indices in DecorateWithRowNumber; its size is what num_skipped_rows() returns.
0140 std::vector<int32_t> skipped_rows_;
0141
0142 friend class ::arrow::csv::BlockParserImpl;
0143 };
0144
0145 }
0146
0147 constexpr int32_t kMaxParserNumRows = 100000;  // default value of BlockParser's `max_num_rows` constructor argument
0148
0149 /// \class BlockParser
0150 /// \brief CSV block parser interface declared in this header.
0151 ///
0152 /// The only data member is `impl_` (a BlockParserImpl, defined elsewhere). The inline
0153 /// accessors and visitors forward to the detail::DataBatch returned by parsed_batch().
0154 ///
0155 /// NOTE(review): the constructors, destructor, Parse()/ParseFinal(), first_row_num() and
0156 /// parsed_batch() are only declared here; their exact behaviour must be confirmed against
0157 /// the implementation file. Constructor defaults are num_cols = -1, first_row = -1 and
0158 /// max_num_rows = kMaxParserNumRows. Presumably `first_row` is what first_row_num() returns;
0159 /// DataBatch only adds a "Row #" prefix to visitor errors when that value is >= 0.
0160
0161 class ARROW_EXPORT BlockParser {
0162 public:
0163 explicit BlockParser(ParseOptions options, int32_t num_cols = -1,
0164 int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);
0165 explicit BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols = -1,
0166 int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);
0167 ~BlockParser();
0168
0169 /// \brief Parse one block of data.
0170 ///
0171 /// NOTE(review): declaration only. Presumably `*out_size` receives the number of bytes
0172 /// consumed from `data` -- confirm against the definition.
0173 Status Parse(std::string_view data, uint32_t* out_size);
0174
0175 /// \brief Overload of Parse() taking several string_views.
0176 ///
0177 /// NOTE(review): presumably the views are handled in order as one logical block -- confirm.
0178 Status Parse(const std::vector<std::string_view>& data, uint32_t* out_size);
0179
0180 /// \brief Variant of Parse() with the same parameters.
0181 ///
0182 /// NOTE(review): the name suggests it is meant for the last block of an input
0183 /// -- confirm how it differs from Parse() in the definition.
0184 Status ParseFinal(std::string_view data, uint32_t* out_size);
0185
0186 /// \brief Overload of ParseFinal() taking several string_views.
0187 ///
0188 /// NOTE(review): semantics are not visible here; the notes on the overloads above apply.
0189 Status ParseFinal(const std::vector<std::string_view>& data, uint32_t* out_size);
0190
0191 /// \brief Row count of the parsed batch; skipped rows are not included (compare total_num_rows())
0192 int32_t num_rows() const { return parsed_batch().num_rows(); }
0193 /// \brief Column count of the parsed batch
0194 int32_t num_cols() const { return parsed_batch().num_cols(); }
0195 /// \brief Result of DataBatch::num_bytes() for the parsed batch
0196 uint32_t num_bytes() const { return parsed_batch().num_bytes(); }
0197
0198 /// \brief num_rows() plus the parsed batch's skipped-row count
0199 int32_t total_num_rows() const {
0200 return parsed_batch().num_rows() + parsed_batch().num_skipped_rows();
0201 }
0202
0203 /// \brief Declared only; VisitColumn() passes its result to DataBatch::VisitColumn as `first_row`
0204 int64_t first_row_num() const;
0205
0206 /// \brief Visit every value of column `col_index` in the parsed batch.
0207 ///
0208 /// Forwards to DataBatch::VisitColumn with first_row_num(). `visit` is invoked with
0209 /// (pointer to the value's bytes, length, quoted flag) and its result is used as a Status.
0210 template <typename Visitor>
0211 Status VisitColumn(int32_t col_index, Visitor&& visit) const {
0212 return parsed_batch().VisitColumn(col_index, first_row_num(),
0213 std::forward<Visitor>(visit));
0214 }
0215 /// \brief Forward to DataBatch::VisitLastRow on the parsed batch
0216 template <typename Visitor>
0217 Status VisitLastRow(Visitor&& visit) const {
0218 return parsed_batch().VisitLastRow(std::forward<Visitor>(visit));
0219 }
0220
0221 protected:
0222 std::unique_ptr<BlockParserImpl> impl_;  // sole data member; BlockParserImpl is only forward-declared in this header
0223
0224 const detail::DataBatch& parsed_batch() const;  // declared only; every inline accessor above reads through it
0225 };
0226
0227 }
0228 }