Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:57

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <algorithm>
0021 #include <cstddef>
0022 #include <cstdint>
0023 #include <memory>
0024 #include <string_view>
0025 #include <vector>
0026 
0027 #include "arrow/buffer.h"
0028 #include "arrow/csv/options.h"
0029 #include "arrow/csv/type_fwd.h"
0030 #include "arrow/status.h"
0031 #include "arrow/util/macros.h"
0032 #include "arrow/util/visibility.h"
0033 
0034 namespace arrow {
0035 
0036 class MemoryPool;
0037 
0038 namespace csv {
0039 
0040 /// Skip at most num_rows from the given input.  The input pointer is updated
0041 /// and the number of actually skipped rows is returns (may be less than
0042 /// requested if the input is too short).
0043 ARROW_EXPORT
0044 int32_t SkipRows(const uint8_t* data, uint32_t size, int32_t num_rows,
0045                  const uint8_t** out_data);
0046 
0047 class BlockParserImpl;
0048 
0049 namespace detail {
0050 
/// \brief Descriptor of one boundary between parsed values
///
/// DataBatch reads descriptors in consecutive pairs: a value starts at the
/// `offset` of one descriptor and stops at the `offset` of the next one, and
/// its `quoted` flag is taken from that next descriptor.
struct ParsedValueDesc {
  // Position in the parsed data area (31 bits)
  uint32_t offset : 31;
  // Quoting flag of the value that ends at `offset` (1 bit)
  bool quoted : 1;
};
0055 
0056 class ARROW_EXPORT DataBatch {
0057  public:
0058   explicit DataBatch(int32_t num_cols) : num_cols_(num_cols) {}
0059 
0060   /// \brief Return the number of parsed rows (not skipped)
0061   int32_t num_rows() const { return num_rows_; }
0062   /// \brief Return the number of parsed columns
0063   int32_t num_cols() const { return num_cols_; }
0064   /// \brief Return the total size in bytes of parsed data
0065   uint32_t num_bytes() const { return parsed_size_; }
0066   /// \brief Return the number of skipped rows
0067   int32_t num_skipped_rows() const { return static_cast<int32_t>(skipped_rows_.size()); }
0068 
0069   template <typename Visitor>
0070   Status VisitColumn(int32_t col_index, int64_t first_row, Visitor&& visit) const {
0071     using detail::ParsedValueDesc;
0072 
0073     int32_t batch_row = 0;
0074     for (size_t buf_index = 0; buf_index < values_buffers_.size(); ++buf_index) {
0075       const auto& values_buffer = values_buffers_[buf_index];
0076       const auto values = reinterpret_cast<const ParsedValueDesc*>(values_buffer->data());
0077       const auto max_pos =
0078           static_cast<int32_t>(values_buffer->size() / sizeof(ParsedValueDesc)) - 1;
0079       for (int32_t pos = col_index; pos < max_pos; pos += num_cols_, ++batch_row) {
0080         auto start = values[pos].offset;
0081         auto stop = values[pos + 1].offset;
0082         auto quoted = values[pos + 1].quoted;
0083         Status status = visit(parsed_ + start, stop - start, quoted);
0084         if (ARROW_PREDICT_FALSE(!status.ok())) {
0085           return DecorateWithRowNumber(std::move(status), first_row, batch_row);
0086         }
0087       }
0088     }
0089     return Status::OK();
0090   }
0091 
0092   template <typename Visitor>
0093   Status VisitLastRow(Visitor&& visit) const {
0094     using detail::ParsedValueDesc;
0095 
0096     const auto& values_buffer = values_buffers_.back();
0097     const auto values = reinterpret_cast<const ParsedValueDesc*>(values_buffer->data());
0098     const auto start_pos =
0099         static_cast<int32_t>(values_buffer->size() / sizeof(ParsedValueDesc)) -
0100         num_cols_ - 1;
0101     for (int32_t col_index = 0; col_index < num_cols_; ++col_index) {
0102       auto start = values[start_pos + col_index].offset;
0103       auto stop = values[start_pos + col_index + 1].offset;
0104       auto quoted = values[start_pos + col_index + 1].quoted;
0105       ARROW_RETURN_NOT_OK(visit(parsed_ + start, stop - start, quoted));
0106     }
0107     return Status::OK();
0108   }
0109 
0110  protected:
0111   Status DecorateWithRowNumber(Status&& status, int64_t first_row,
0112                                int32_t batch_row) const {
0113     if (first_row >= 0) {
0114       // `skipped_rows_` is in ascending order by construction, so use bisection
0115       // to find out how many rows were skipped before `batch_row`.
0116       const auto skips_before =
0117           std::upper_bound(skipped_rows_.begin(), skipped_rows_.end(), batch_row) -
0118           skipped_rows_.begin();
0119       status = status.WithMessage("Row #", batch_row + skips_before + first_row, ": ",
0120                                   status.message());
0121     }
0122     // Use return_if so that when extra context is enabled it will be added
0123     ARROW_RETURN_IF_(true, std::move(status), ARROW_STRINGIFY(status));
0124     return std::move(status);
0125   }
0126 
0127   // The number of rows in this batch (not including any skipped ones)
0128   int32_t num_rows_ = 0;
0129   // The number of columns
0130   int32_t num_cols_ = 0;
0131 
0132   // XXX should we ensure the parsed buffer is padded with 8 or 16 excess zero bytes?
0133   // It may help with null parsing...
0134   std::vector<std::shared_ptr<Buffer>> values_buffers_;
0135   std::shared_ptr<Buffer> parsed_buffer_;
0136   const uint8_t* parsed_ = NULLPTR;
0137   int32_t parsed_size_ = 0;
0138 
0139   // Record the current num_rows_ each time a row is skipped
0140   std::vector<int32_t> skipped_rows_;
0141 
0142   friend class ::arrow::csv::BlockParserImpl;
0143 };
0144 
0145 }  // namespace detail
0146 
/// Default value of the `max_num_rows` argument of the BlockParser constructors
constexpr int32_t kMaxParserNumRows = 100'000;
0148 
0149 /// \class BlockParser
0150 /// \brief A reusable block-based parser for CSV data
0151 ///
0152 /// The parser takes a block of CSV data and delimits rows and fields,
0153 /// unquoting and unescaping them on the fly.  Parsed data is own by the
0154 /// parser, so the original buffer can be discarded after Parse() returns.
0155 ///
0156 /// If the block is truncated (i.e. not all data can be parsed), it is up
0157 /// to the caller to arrange the next block to start with the trailing data.
0158 /// Also, if the previous block ends with CR (0x0d) and a new block starts
0159 /// with LF (0x0a), the parser will consider the leading newline as an empty
0160 /// line; the caller should therefore strip it.
0161 class ARROW_EXPORT BlockParser {
0162  public:
0163   explicit BlockParser(ParseOptions options, int32_t num_cols = -1,
0164                        int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);
0165   explicit BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols = -1,
0166                        int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);
0167   ~BlockParser();
0168 
0169   /// \brief Parse a block of data
0170   ///
0171   /// Parse a block of CSV data, ingesting up to max_num_rows rows.
0172   /// The number of bytes actually parsed is returned in out_size.
0173   Status Parse(std::string_view data, uint32_t* out_size);
0174 
0175   /// \brief Parse sequential blocks of data
0176   ///
0177   /// Only the last block is allowed to be truncated.
0178   Status Parse(const std::vector<std::string_view>& data, uint32_t* out_size);
0179 
0180   /// \brief Parse the final block of data
0181   ///
0182   /// Like Parse(), but called with the final block in a file.
0183   /// The last row may lack a trailing line separator.
0184   Status ParseFinal(std::string_view data, uint32_t* out_size);
0185 
0186   /// \brief Parse the final sequential blocks of data
0187   ///
0188   /// Only the last block is allowed to be truncated.
0189   Status ParseFinal(const std::vector<std::string_view>& data, uint32_t* out_size);
0190 
0191   /// \brief Return the number of parsed rows
0192   int32_t num_rows() const { return parsed_batch().num_rows(); }
0193   /// \brief Return the number of parsed columns
0194   int32_t num_cols() const { return parsed_batch().num_cols(); }
0195   /// \brief Return the total size in bytes of parsed data
0196   uint32_t num_bytes() const { return parsed_batch().num_bytes(); }
0197 
0198   /// \brief Return the total number of rows including rows which were skipped
0199   int32_t total_num_rows() const {
0200     return parsed_batch().num_rows() + parsed_batch().num_skipped_rows();
0201   }
0202 
0203   /// \brief Return the row number of the first row in the block or -1 if unsupported
0204   int64_t first_row_num() const;
0205 
0206   /// \brief Visit parsed values in a column
0207   ///
0208   /// The signature of the visitor is
0209   /// Status(const uint8_t* data, uint32_t size, bool quoted)
0210   template <typename Visitor>
0211   Status VisitColumn(int32_t col_index, Visitor&& visit) const {
0212     return parsed_batch().VisitColumn(col_index, first_row_num(),
0213                                       std::forward<Visitor>(visit));
0214   }
0215 
0216   template <typename Visitor>
0217   Status VisitLastRow(Visitor&& visit) const {
0218     return parsed_batch().VisitLastRow(std::forward<Visitor>(visit));
0219   }
0220 
0221  protected:
0222   std::unique_ptr<BlockParserImpl> impl_;
0223 
0224   const detail::DataBatch& parsed_batch() const;
0225 };
0226 
0227 }  // namespace csv
0228 }  // namespace arrow