This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0b5bb565a7 [feature-wip](parquet-reader) parquet dictionary decoder
(#11981)
0b5bb565a7 is described below
commit 0b5bb565a7497463e79e1efce7f6739e48b1deba
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Aug 26 19:24:37 2022 +0800
[feature-wip](parquet-reader) parquet dictionary decoder (#11981)
Parse parquet data with dictionary encoding.
Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0
specification.
Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page
for Parquet 2.0+ files.
refer: https://github.com/apache/parquet-format/blob/master/Encodings.md
---
be/src/util/rle_encoding.h | 291 +++++++++++++++++++
be/src/vec/exec/format/parquet/parquet_common.cpp | 176 +++++++-----
be/src/vec/exec/format/parquet/parquet_common.h | 151 ++++++----
.../parquet/vparquet_column_chunk_reader.cpp | 56 +++-
.../format/parquet/vparquet_column_chunk_reader.h | 4 +-
.../test_data/parquet_scanner/dict-decoder.parquet | Bin 0 -> 4282 bytes
.../test_data/parquet_scanner/dict-decoder.txt | 16 ++
.../test_data/parquet_scanner/type-decoder.txt | 14 +
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 315 ++++++---------------
9 files changed, 657 insertions(+), 366 deletions(-)
diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h
index 08a7a23a4d..1409473a09 100644
--- a/be/src/util/rle_encoding.h
+++ b/be/src/util/rle_encoding.h
@@ -568,4 +568,295 @@ inline void RleEncoder<T>::Clear() {
bit_writer_.Clear();
}
+// Copy from
https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h
+// Utility classes to do run length encoding (RLE) for fixed bit width values.
If runs
+// are sufficiently long, RLE is used, otherwise, the values are just
bit-packed
+// (literal encoding).
+//
+// For both types of runs, there is a byte-aligned indicator which encodes the
length
+// of the run and the type of the run.
+//
+// This encoding has the benefit that when there aren't any long enough runs,
values
+// are always decoded at fixed (can be precomputed) bit offsets OR both the
value and
+// the run length are byte aligned. This allows for very efficient decoding
+// implementations.
+// The encoding is:
+// encoded-block := run*
+// run := literal-run | repeated-run
+// literal-run := literal-indicator < literal bytes >
+// repeated-run := repeated-indicator < repeated value. padded to byte
boundary >
+// literal-indicator := varint_encode( number_of_groups << 1 | 1)
+// repeated-indicator := varint_encode( number_of_repetitions << 1 )
+//
+// Each run is preceded by a varint. The varint's least significant bit is
+// used to indicate whether the run is a literal run or a repeated run. The
rest
+// of the varint is used to determine the length of the run (eg how many times
the
+// value repeats).
+//
+// In the case of literal runs, the run length is always a multiple of 8 (i.e.
encode
+// in groups of 8), so that no matter the bit-width of the value, the sequence
will end
+// on a byte boundary without padding.
+// Given that we know it is a multiple of 8, we store the number of 8-groups
rather than
+// the actual number of encoded ints. (This means that the total number of
encoded values
+// can not be determined from the encoded data, since the number of values in
the last
+// group may not be a multiple of 8). For the last group of literal runs, we
pad
+// the group to 8 with zeros. This allows for 8 at a time decoding on the read
side
+// without the need for additional checks.
+//
+// There is a break-even point when it is more storage efficient to do run
length
+// encoding. For 1 bit-width values, that point is 8 values. They require 2
bytes
+// for both the repeated encoding or the literal encoding. This value can
always
+// be computed based on the bit-width.
+// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but
more
+// investigation is needed to do this efficiently, see the reverted
IMPALA-6658.
+// TODO: think about how to use this for strings. The bit packing isn't quite
the same.
+//
+// Examples with bit-width 1 (eg encoding booleans):
+// ----------------------------------------
+// 100 1s followed by 100 0s:
+// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1
byte>
+// - (total 4 bytes)
+//
+// alternating 1s and 0s (200 total):
+// 200 ints = 25 groups of 8
+// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
+// (total 26 bytes, 1 byte overhead)
+
+// RLE decoder with a batch-oriented interface that enables fast decoding.
+// Users of this class must first initialize the class to point to a buffer of
+// RLE-encoded data, passed into the constructor or Reset(). The provided
+// bit_width must be at most min(sizeof(T) * 8,
BatchedBitReader::MAX_BITWIDTH).
+// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to
+// see if the next run is a repeated or literal run, then calling
+// GetRepeatedValue() or GetLiteralValues() respectively to read the values.
+//
+// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
+// Other decoding errors are signalled by functions returning false. If an
+// error is encountered then it is not valid to read any more data until
+// Reset() is called.
+
+template <typename T>
+class RleBatchDecoder {
+public:
+ RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
+ Reset(buffer, buffer_len, bit_width);
+ }
+
+ RleBatchDecoder() = default;
+
+ // Reset the decoder to read from a new buffer.
+ void Reset(uint8_t* buffer, int buffer_len, int bit_width);
+
+ // Return the size of the current repeated run. Returns zero if the
current run is
+ // a literal run or if no more runs can be read from the input.
+ int32_t NextNumRepeats();
+
+ // Get the value of the current repeated run and consume the given number
of repeats.
+ // Only valid to call when NextNumRepeats() > 0. The given number of
repeats cannot
+ // be greater than the remaining number of repeats in the run.
'num_repeats_to_consume'
+ // can be set to 0 to peek at the value without consuming repeats.
+ T GetRepeatedValue(int32_t num_repeats_to_consume);
+
+ // Return the size of the current literal run. Returns zero if the current
run is
+ // a repeated run or if no more runs can be read from the input.
+ int32_t NextNumLiterals();
+
+ // Consume 'num_literals_to_consume' literals from the current literal run,
+ // copying the values to 'values'. 'num_literals_to_consume' must be <=
+ // NextNumLiterals(). Returns true if the requested number of literals were
+ // successfully read or false if an error was encountered, e.g. the input
was
+ // truncated.
+ bool GetLiteralValues(int32_t num_literals_to_consume, T* values)
WARN_UNUSED_RESULT;
+
+ // Consume 'num_values_to_consume' values and copy them to 'values'.
+ // Returns the number of consumed values or 0 if an error occurred.
+ int32_t GetBatch(T* values, int32_t batch_num);
+
+private:
+ // Called when both 'literal_count_' and 'repeat_count_' have been
exhausted.
+ // Sets either 'literal_count_' or 'repeat_count_' to the size of the next
literal
+ // or repeated run, or leaves both at 0 if no more values can be read
(either because
+ // the end of the input was reached or an error was encountered decoding).
+ void NextCounts();
+
+ /// Fill the literal buffer. Invalid to call if there are already buffered
literals.
+ /// Return false if the input was truncated. This does not advance
'literal_count_'.
+ bool FillLiteralBuffer() WARN_UNUSED_RESULT;
+
+ bool HaveBufferedLiterals() const { return literal_buffer_pos_ <
num_buffered_literals_; }
+
+ /// Output buffered literals, advancing 'literal_buffer_pos_' and
decrementing
+ /// 'literal_count_'. Returns the number of literals outputted.
+ int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
+
+ BatchedBitReader bit_reader_;
+
+ // Number of bits needed to encode the value. Must be between 0 and 64
after
+ // the decoder is initialized with a buffer. -1 indicates the decoder was
not
+ // initialized.
+ int bit_width_ = -1;
+
+ // If a repeated run, the number of repeats remaining in the current run
to be read.
+ // If the current run is a literal run, this is 0.
+ int32_t repeat_count_ = 0;
+
+ // If a literal run, the number of literals remaining in the current run
to be read.
+ // If the current run is a repeated run, this is 0.
+ int32_t literal_count_ = 0;
+
+ // If a repeated run, the current repeated value.
+ T repeated_value_;
+
+ // Size of buffer for literal values. Large enough to decode a full batch
of 32
+ // literals. The buffer is needed to allow clients to read in batches that
are not
+ // multiples of 32.
+ static constexpr int LITERAL_BUFFER_LEN = 32;
+
+ // Buffer containing 'num_buffered_literals_' values.
'literal_buffer_pos_' is the
+ // position of the next literal to be read from the buffer.
+ T literal_buffer_[LITERAL_BUFFER_LEN];
+ int num_buffered_literals_ = 0;
+ int literal_buffer_pos_ = 0;
+};
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t
max_to_output, T* values) {
+ int32_t num_to_output =
+ std::min<int32_t>(max_to_output, num_buffered_literals_ -
literal_buffer_pos_);
+ memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) *
num_to_output);
+ literal_buffer_pos_ += num_to_output;
+ literal_count_ -= num_to_output;
+ return num_to_output;
+}
+
+template <typename T>
+inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int
bit_width) {
+ bit_reader_.Reset(buffer, buffer_len);
+ bit_width_ = bit_width;
+ repeat_count_ = 0;
+ literal_count_ = 0;
+ num_buffered_literals_ = 0;
+ literal_buffer_pos_ = 0;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
+ if (repeat_count_ > 0) return repeat_count_;
+ if (literal_count_ == 0) NextCounts();
+ return repeat_count_;
+}
+
+template <typename T>
+inline void RleBatchDecoder<T>::NextCounts() {
+ // Read the next run's indicator int, it could be a literal or repeated
run.
+ // The int is encoded as a ULEB128-encoded value.
+ uint32_t indicator_value = 0;
+ if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) {
+ return;
+ }
+
+ // lsb indicates if it is a literal run or repeated run
+ bool is_literal = indicator_value & 1;
+
+ // Don't try to handle run lengths that don't fit in an int32_t - just
fail gracefully.
+ // The Parquet standard does not allow longer runs - see PARQUET-1290.
+ uint32_t run_len = indicator_value >> 1;
+ if (is_literal) {
+ // Use int64_t to avoid overflowing multiplication.
+ int64_t literal_count = static_cast<int64_t>(run_len) * 8;
+ if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max()))
return;
+ literal_count_ = literal_count;
+ } else {
+ if (UNLIKELY(run_len == 0)) return;
+ bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8),
&repeated_value_);
+ if (UNLIKELY(!result)) return;
+ repeat_count_ = run_len;
+ }
+}
+
+template <typename T>
+inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
+ repeat_count_ -= num_repeats_to_consume;
+ return repeated_value_;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumLiterals() {
+ if (literal_count_ > 0) return literal_count_;
+ if (repeat_count_ == 0) NextCounts();
+ return literal_count_;
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetLiteralValues(int32_t
num_literals_to_consume, T* values) {
+ int32_t num_consumed = 0;
+ // Copy any buffered literals left over from previous calls.
+ if (HaveBufferedLiterals()) {
+ num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
+ }
+
+ int32_t num_remaining = num_literals_to_consume - num_consumed;
+ // Copy literals directly to the output, bypassing 'literal_buffer_' when
possible.
+ // Need to round to a batch of 32 if the caller is consuming only part of
the current
+ // run avoid ending on a non-byte boundary.
+ int32_t num_to_bypass =
+ std::min<int32_t>(literal_count_,
BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+ if (num_to_bypass > 0) {
+ int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass,
values + num_consumed);
+ // If we couldn't read the expected number, that means the input was
truncated.
+ if (num_read < num_to_bypass) return false;
+ literal_count_ -= num_to_bypass;
+ num_consumed += num_to_bypass;
+ num_remaining = num_literals_to_consume - num_consumed;
+ }
+
+ if (num_remaining > 0) {
+ // We weren't able to copy all the literals requested directly from
the input.
+ // Buffer literals and copy over the requested number.
+ if (UNLIKELY(!FillLiteralBuffer())) return false;
+ OutputBufferedLiterals(num_remaining, values + num_consumed);
+ }
+ return true;
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::FillLiteralBuffer() {
+ int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN,
literal_count_);
+ num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_,
num_to_buffer, literal_buffer_);
+ // If we couldn't read the expected number, that means the input was
truncated.
+ if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
+ literal_buffer_pos_ = 0;
+ return true;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::GetBatch(T* values, int32_t batch_num) {
+ int32_t num_consumed = 0;
+ while (num_consumed < batch_num) {
+ // Add RLE encoded values by repeating the current value this number
of times.
+ int32_t num_repeats = NextNumRepeats();
+ if (num_repeats > 0) {
+ int32_t num_repeats_to_set = std::min(num_repeats, batch_num -
num_consumed);
+ T repeated_value = GetRepeatedValue(num_repeats_to_set);
+ for (int i = 0; i < num_repeats_to_set; ++i) {
+ values[num_consumed + i] = repeated_value;
+ }
+ num_consumed += num_repeats_to_set;
+ continue;
+ }
+
+ // Add remaining literal values, if any.
+ int32_t num_literals = NextNumLiterals();
+ if (num_literals == 0) {
+ break;
+ }
+ int32_t num_literals_to_set = std::min(num_literals, batch_num -
num_consumed);
+ if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) {
+ return 0;
+ }
+ num_consumed += num_literals_to_set;
+ }
+ return num_consumed;
+}
+
} // namespace doris
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index be4ec35223..347db41d86 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -44,12 +44,16 @@ Status Decoder::get_decoder(tparquet::Type::type type,
tparquet::Encoding::type
std::unique_ptr<Decoder>& decoder) {
switch (encoding) {
case tparquet::Encoding::PLAIN:
+ case tparquet::Encoding::RLE_DICTIONARY:
switch (type) {
case tparquet::Type::BOOLEAN:
+ if (encoding != tparquet::Encoding::PLAIN) {
+ return Status::InternalError("Bool type can't has dictionary
page");
+ }
decoder.reset(new BoolPlainDecoder());
break;
case tparquet::Type::BYTE_ARRAY:
- decoder.reset(new ByteArrayPlainDecoder());
+ decoder.reset(new ByteArrayDecoder());
break;
case tparquet::Type::INT32:
case tparquet::Type::INT64:
@@ -57,14 +61,12 @@ Status Decoder::get_decoder(tparquet::Type::type type,
tparquet::Encoding::type
case tparquet::Type::FLOAT:
case tparquet::Type::DOUBLE:
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
- decoder.reset(new PlainDecoder(type));
+ decoder.reset(new FixLengthDecoder(type));
break;
default:
- return Status::InternalError("Unsupported plain type {} in parquet
decoder",
+ return Status::InternalError("Unsupported type {} in parquet
decoder",
tparquet::to_string(type));
}
- case tparquet::Encoding::RLE_DICTIONARY:
- break;
default:
return Status::InternalError("Unsupported encoding {} in parquet
decoder",
tparquet::to_string(encoding));
@@ -118,39 +120,55 @@ Status Decoder::decode_values(ColumnPtr& doris_column,
DataTypePtr& data_type, s
return decode_values(data_column, data_type, num_values);
}
-Status PlainDecoder::decode_values(Slice& slice, size_t num_values) {
- size_t to_read_bytes = _type_length * num_values;
- if (UNLIKELY(to_read_bytes > slice.size)) {
- return Status::IOError("Slice does not have enough space to write out
the decoding data");
- }
- memcpy(slice.data, _data->data + _offset, to_read_bytes);
- _offset += to_read_bytes;
+Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t
dict_size) {
+ _has_dict = true;
+ _dict = std::move(dict);
return Status::OK();
}
-Status PlainDecoder::skip_values(size_t num_values) {
- _offset += _type_length * num_values;
- if (UNLIKELY(_offset > _data->size)) {
- return Status::IOError("Out-of-bounds access in parquet data decoder");
+void FixLengthDecoder::set_data(Slice* data) {
+ _data = data;
+ _offset = 0;
+ if (_has_dict) {
+ uint8_t bit_width = *data->data;
+ _index_batch_decoder.reset(
+ new
RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1,
+ static_cast<int>(data->size) -
1, bit_width));
+ }
+}
+
+Status FixLengthDecoder::skip_values(size_t num_values) {
+ if (_has_dict) {
+ _indexes.resize(num_values);
+ _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+ } else {
+ _offset += _type_length * num_values;
+ if (UNLIKELY(_offset > _data->size)) {
+ return Status::IOError("Out-of-bounds access in parquet data
decoder");
+ }
}
return Status::OK();
}
-Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t
num_values,
- size_t real_length) {
+Status FixLengthDecoder::_decode_short_int(MutableColumnPtr& doris_column,
size_t num_values,
+ size_t real_length) {
if (UNLIKELY(_physical_type != tparquet::Type::INT32)) {
return Status::InternalError("Short int can only be decoded from
INT32");
}
for (int i = 0; i < num_values; ++i) {
- doris_column->insert_data(_data->data + _offset, real_length);
- _offset += _type_length;
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+ doris_column->insert_data(buf_start, real_length);
+ _FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
-Status PlainDecoder::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
- size_t num_values) {
- if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
+Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
+ size_t num_values) {
+ if (_has_dict) {
+ _indexes.resize(num_values);
+ _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+ } else if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@@ -228,8 +246,9 @@ Status PlainDecoder::decode_values(MutableColumnPtr&
doris_column, DataTypePtr&
case TypeIndex::FixedString:
if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
for (int i = 0; i < num_values; ++i) {
- doris_column->insert_data(_data->data + _offset, _type_length);
- _offset += _type_length;
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+ doris_column->insert_data(buf_start, _type_length);
+ _FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
@@ -243,48 +262,37 @@ Status PlainDecoder::decode_values(MutableColumnPtr&
doris_column, DataTypePtr&
getTypeName(data_type->get_type_id()));
}
-Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) {
- uint32_t slice_offset = 0;
- for (int i = 0; i < num_values; ++i) {
- if (UNLIKELY(_offset + 4 > _data->size)) {
- return Status::IOError("Can't read byte array length from plain
decoder");
- }
- uint32_t length =
- decode_fixed32_le(reinterpret_cast<const
uint8_t*>(_data->data) + _offset);
- _offset += 4;
- if (UNLIKELY(_offset + length) > _data->size) {
- return Status::IOError("Can't read enough bytes in plain decoder");
- }
- memcpy(slice.data + slice_offset, _data->data + _offset, length);
- slice_offset += length + 1;
- slice.data[slice_offset - 1] = '\0';
- _offset += length;
+Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t
dict_size) {
+ _has_dict = true;
+ _dict = std::move(dict);
+ _dict_offsets.resize(dict_size + 1);
+ uint32_t offset_cursor = 0;
+ for (int i = 0; i < dict_size; ++i) {
+ uint32_t length = decode_fixed32_le(_dict.get() + offset_cursor);
+ offset_cursor += 4;
+ _dict_offsets[i] = offset_cursor;
+ offset_cursor += length;
}
+ _dict_offsets[dict_size] = offset_cursor + 4;
return Status::OK();
}
-Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
- for (int i = 0; i < num_values; ++i) {
- if (UNLIKELY(_offset + 4 > _data->size)) {
- return Status::IOError("Can't read byte array length from plain
decoder");
- }
- uint32_t length =
- decode_fixed32_le(reinterpret_cast<const
uint8_t*>(_data->data) + _offset);
- _offset += 4;
- if (UNLIKELY(_offset + length) > _data->size) {
- return Status::IOError("Can't skip enough bytes in plain decoder");
- }
- _offset += length;
+void ByteArrayDecoder::set_data(Slice* data) {
+ _data = data;
+ _offset = 0;
+ if (_has_dict) {
+ uint8_t bit_width = *data->data;
+ _index_batch_decoder.reset(
+ new
RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1,
+ static_cast<int>(data->size) -
1, bit_width));
}
- return Status::OK();
}
-Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
- size_t num_values) {
- TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
- switch (logical_type) {
- case TypeIndex::String:
- case TypeIndex::FixedString:
+Status ByteArrayDecoder::skip_values(size_t num_values) {
+ if (_has_dict) {
+ _indexes.resize(num_values);
+ _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+ } else {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from
plain decoder");
@@ -293,11 +301,44 @@ Status
ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data
decode_fixed32_le(reinterpret_cast<const
uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
- return Status::IOError("Can't read enough bytes in plain
decoder");
+ return Status::IOError("Can't skip enough bytes in plain
decoder");
}
- doris_column->insert_data(_data->data + _offset, length);
_offset += length;
}
+ }
+ return Status::OK();
+}
+
+Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
+ size_t num_values) {
+ if (_has_dict) {
+ _indexes.resize(num_values);
+ _index_batch_decoder->GetBatch(&_indexes[0], num_values);
+ }
+ TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
+ switch (logical_type) {
+ case TypeIndex::String:
+ case TypeIndex::FixedString:
+ for (int i = 0; i < num_values; ++i) {
+ if (_has_dict) {
+ uint32_t idx = _indexes[i];
+ uint32_t idx_cursor = _dict_offsets[idx];
+ char* buff_start = reinterpret_cast<char*>(_dict.get() +
idx_cursor);
+ doris_column->insert_data(buff_start, _dict_offsets[idx + 1] -
idx_cursor - 4);
+ } else {
+ if (UNLIKELY(_offset + 4 > _data->size)) {
+ return Status::IOError("Can't read byte array length from
plain decoder");
+ }
+ uint32_t length =
+ decode_fixed32_le(reinterpret_cast<const
uint8_t*>(_data->data) + _offset);
+ _offset += 4;
+ if (UNLIKELY(_offset + length) > _data->size) {
+ return Status::IOError("Can't read enough bytes in plain
decoder");
+ }
+ doris_column->insert_data(_data->data + _offset, length);
+ _offset += length;
+ }
+ }
return Status::OK();
case TypeIndex::Decimal32:
return _decode_binary_decimal<Int32>(doris_column, data_type,
num_values);
@@ -313,17 +354,6 @@ Status
ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data
getTypeName(data_type->get_type_id()));
}
-Status BoolPlainDecoder::decode_values(Slice& slice, size_t num_values) {
- bool value;
- for (int i = 0; i < num_values; ++i) {
- if (UNLIKELY(!_decode_value(&value))) {
- return Status::IOError("Can't read enough booleans in plain
decoder");
- }
- slice.data[i] = value ? 1 : 0;
- }
- return Status::OK();
-}
-
Status BoolPlainDecoder::skip_values(size_t num_values) {
int skip_cached = std::min(num_unpacked_values_ - unpacked_value_idx_,
(int)num_values);
unpacked_value_idx_ += skip_cached;
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h
b/be/src/vec/exec/format/parquet/parquet_common.h
index 3620721ddd..9504fd32b5 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -25,6 +25,7 @@
#include "schema_desc.h"
#include "util/bit_stream_utils.inline.h"
#include "util/coding.h"
+#include "util/rle_encoding.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
@@ -110,10 +111,12 @@ public:
virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
size_t num_values) = 0;
- virtual Status decode_values(Slice& slice, size_t num_values) = 0;
-
virtual Status skip_values(size_t num_values) = 0;
+ virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t
dict_size) {
+ return Status::NotSupported("set_dict is not supported");
+ }
+
protected:
int32_t _type_length;
Slice* _data = nullptr;
@@ -146,33 +149,25 @@ void Decoder::init_decimal_converter(DataTypePtr&
data_type) {
}
}
-class PlainDecoder final : public Decoder {
+class FixLengthDecoder final : public Decoder {
public:
- PlainDecoder(tparquet::Type::type physical_type) :
_physical_type(physical_type) {};
- ~PlainDecoder() override = default;
+ FixLengthDecoder(tparquet::Type::type physical_type) :
_physical_type(physical_type) {};
+ ~FixLengthDecoder() override = default;
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
size_t num_values) override;
- Status decode_values(Slice& slice, size_t num_values) override;
-
Status skip_values(size_t num_values) override;
+ Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size)
override;
+
+ void set_data(Slice* data) override;
+
protected:
Status _decode_short_int(MutableColumnPtr& doris_column, size_t
num_values, size_t real_length);
template <typename Numeric>
- Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
- size_t to_read_bytes = _type_length * num_values;
- if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
- return Status::IOError("Out-of-bounds access in parquet data
decoder");
- }
- auto& column_data =
static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
- const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data +
_offset);
- column_data.insert(raw_data, raw_data + num_values);
- _offset += to_read_bytes;
- return Status::OK();
- }
+ Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values);
template <typename CppType, typename ColumnType>
Status _decode_date(MutableColumnPtr& doris_column, TypeIndex&
logical_type, size_t num_values);
@@ -193,16 +188,44 @@ protected:
Status _decode_primitive_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
size_t num_values);
+#define _FIXED_GET_DATA_OFFSET(index)
\
+ _has_dict ? reinterpret_cast<char*>(_dict.get() + _indexes[index] *
_type_length) \
+ : _data->data + _offset
+
+#define _FIXED_SHIFT_DATA_OFFSET() \
+ if (!_has_dict) _offset += _type_length
+
tparquet::Type::type _physical_type;
+ // For dictionary encoding
+ bool _has_dict = false;
+ std::unique_ptr<uint8_t[]> _dict = nullptr;
+ std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
+ std::vector<uint32_t> _indexes;
};
+template <typename Numeric>
+Status FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column,
size_t num_values) {
+ if (_has_dict) {
+ for (int i = 0; i < num_values; ++i) {
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+ doris_column->insert_data(buf_start, _type_length);
+ }
+ } else {
+ auto& column_data =
static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
+ const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data +
_offset);
+ column_data.insert(raw_data, raw_data + num_values);
+ _offset += _type_length * num_values;
+ }
+ return Status::OK();
+}
+
template <typename CppType, typename ColumnType>
-Status PlainDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex&
logical_type,
- size_t num_values) {
+Status FixLengthDecoder::_decode_date(MutableColumnPtr& doris_column,
TypeIndex& logical_type,
+ size_t num_values) {
auto& column_data =
static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
for (int i = 0; i < num_values; ++i) {
- int64_t date_value =
- static_cast<int64_t>(*reinterpret_cast<int32_t*>(_data->data +
_offset));
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+ int64_t date_value =
static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start));
CppType v;
v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); //
day to seconds
if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
@@ -211,17 +234,18 @@ Status PlainDecoder::_decode_date(MutableColumnPtr&
doris_column, TypeIndex& log
}
ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
column_data.emplace_back(cast_value);
- _offset += _type_length;
+ _FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename CppType, typename ColumnType>
-Status PlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
TypeIndex& logical_type,
- size_t num_values) {
+Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
TypeIndex& logical_type,
+ size_t num_values) {
auto& column_data =
static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
for (int i = 0; i < num_values; i++) {
- int64_t& date_value = *reinterpret_cast<int64_t*>(_data->data +
_offset);
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+ int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
CppType v;
v.from_unixtime(date_value / _decode_params->second_mask,
*_decode_params->ctz);
if constexpr (std::is_same_v<CppType,
DateV2Value<DateTimeV2ValueType>>) {
@@ -231,17 +255,18 @@ Status PlainDecoder::_decode_datetime64(MutableColumnPtr&
doris_column, TypeInde
}
ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
column_data.emplace_back(cast_value);
- _offset += _type_length;
+ _FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename CppType, typename ColumnType>
-Status PlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column,
TypeIndex& logical_type,
- size_t num_values) {
+Status FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column,
TypeIndex& logical_type,
+ size_t num_values) {
auto& column_data =
static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
for (int i = 0; i < num_values; ++i) {
- ParquetInt96& datetime96 =
*reinterpret_cast<ParquetInt96*>(_data->data + _offset);
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
+ ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(buf_start);
CppType v;
int64_t micros = datetime96.to_timestamp_micros();
v.from_unixtime(micros / 1000000, *_decode_params->ctz);
@@ -252,20 +277,20 @@ Status PlainDecoder::_decode_datetime96(MutableColumnPtr&
doris_column, TypeInde
}
ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
column_data.emplace_back(cast_value);
- _offset += _type_length;
+ _FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename DecimalPrimitiveType>
-Status PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type,
- size_t num_values) {
+Status FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
+ DataTypePtr& data_type, size_t
num_values) {
init_decimal_converter<DecimalPrimitiveType>(data_type);
auto& column_data =
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
for (int i = 0; i < num_values; ++i) {
- char* buf_start = _data->data + _offset;
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
// When Decimal in parquet is stored in byte arrays, binary and fixed,
// the unscaled number must be encoded as two's complement using
big-endian byte order.
Int128 value = buf_start[0] & 0x80 ? -1 : 0;
@@ -279,21 +304,22 @@ Status
PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, Data
}
DecimalPrimitiveType cast_value(value);
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
- _offset += _type_length;
+ _FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
-Status PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
- DataTypePtr& data_type, size_t
num_values) {
+Status FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr&
doris_column,
+ DataTypePtr& data_type,
size_t num_values) {
init_decimal_converter<DecimalPrimitiveType>(data_type);
auto& column_data =
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
for (int i = 0; i < num_values; ++i) {
+ char* buf_start = _FIXED_GET_DATA_OFFSET(i);
// we should use decimal128 to scale up/down
- Int128 value = *reinterpret_cast<DecimalPhysicalType*>(_data->data +
_offset);
+ Int128 value = *reinterpret_cast<DecimalPhysicalType*>(buf_start);
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
value *= scale_params.scale_factor;
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
@@ -301,44 +327,62 @@ Status
PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
}
DecimalPrimitiveType cast_value(value);
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
- _offset += _type_length;
+ _FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
-class ByteArrayPlainDecoder final : public Decoder {
+class ByteArrayDecoder final : public Decoder {
public:
- ByteArrayPlainDecoder() = default;
- ~ByteArrayPlainDecoder() override = default;
+ ByteArrayDecoder() = default;
+ ~ByteArrayDecoder() override = default;
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
size_t num_values) override;
- Status decode_values(Slice& slice, size_t num_values) override;
-
Status skip_values(size_t num_values) override;
+ void set_data(Slice* data) override;
+
+ Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size)
override;
+
protected:
template <typename DecimalPrimitiveType>
Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
size_t num_values);
+
+ // For dictionary encoding
+ bool _has_dict = false;
+ std::unique_ptr<uint8_t[]> _dict = nullptr;
+ std::vector<uint32_t> _dict_offsets;
+ std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
+ std::vector<uint32_t> _indexes;
};
template <typename DecimalPrimitiveType>
-Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr&
doris_column,
- DataTypePtr& data_type,
size_t num_values) {
+Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
+ DataTypePtr& data_type, size_t
num_values) {
init_decimal_converter<DecimalPrimitiveType>(data_type);
auto& column_data =
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
for (int i = 0; i < num_values; ++i) {
- if (UNLIKELY(_offset + 4 > _data->size)) {
- return Status::IOError("Can't read byte array length from plain
decoder");
+ char* buf_start;
+ uint32_t length;
+ if (_has_dict) {
+ uint32_t idx = _indexes[i];
+ uint32_t idx_cursor = _dict_offsets[idx];
+ buf_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
+ length = _dict_offsets[idx + 1] - idx_cursor - 4;
+ } else {
+ if (UNLIKELY(_offset + 4 > _data->size)) {
+ return Status::IOError("Can't read byte array length from
plain decoder");
+ }
+ length = decode_fixed32_le(reinterpret_cast<const
uint8_t*>(_data->data) + _offset);
+ _offset += 4;
+ buf_start = _data->data + _offset;
+ _offset += length;
}
- uint32_t length =
- decode_fixed32_le(reinterpret_cast<const
uint8_t*>(_data->data) + _offset);
- _offset += 4;
- char* buf_start = _data->data + _offset;
// When Decimal in parquet is stored in byte arrays, binary and fixed,
// the unscaled number must be encoded as two's complement using
big-endian byte order.
Int128 value = buf_start[0] & 0x80 ? -1 : 0;
@@ -351,7 +395,6 @@ Status
ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_col
}
DecimalPrimitiveType cast_value(value);
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
- _offset += length;
}
return Status::OK();
}
@@ -374,8 +417,6 @@ public:
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type,
size_t num_values) override;
- Status decode_values(Slice& slice, size_t num_values) override;
-
Status skip_values(size_t num_values) override;
protected:
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 778e7b1a66..a697fc5038 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -119,7 +119,43 @@ Status ColumnChunkReader::load_page_data() {
Status ColumnChunkReader::_decode_dict_page() {
const tparquet::PageHeader& header = *_page_reader->get_page_header();
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
- // TODO(gaoxin): decode dictionary page
+
+ // Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0
specification.
+ // Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary
page for Parquet 2.0+ files.
+ // refer: https://github.com/apache/parquet-format/blob/master/Encodings.md
+ tparquet::Encoding::type dict_encoding =
header.dictionary_page_header.encoding;
+ if (dict_encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
+ dict_encoding != tparquet::Encoding::PLAIN) {
+ return Status::InternalError("Unsupported dictionary encoding {}",
+ tparquet::to_string(dict_encoding));
+ }
+
+ // Prepare dictionary data
+ int32_t uncompressed_size = header.uncompressed_page_size;
+ std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]);
+ if (_block_compress_codec != nullptr) {
+ Slice compressed_data;
+ RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data));
+ Slice dict_slice(dict_data.get(), uncompressed_size);
+ RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&dict_slice));
+ } else {
+ Slice dict_slice;
+ RETURN_IF_ERROR(_page_reader->get_page_date(dict_slice));
+ // The data is stored by BufferedStreamReader, we should copy it out
+ memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
+ }
+
+ // Cache page decoder
+ std::unique_ptr<Decoder> page_decoder;
+ Decoder::get_decoder(_metadata.type, tparquet::Encoding::RLE_DICTIONARY,
page_decoder);
+ // Set type length
+ page_decoder->set_type_length(_get_type_length());
+ // Initialize the time convert context
+ page_decoder->init(_field_schema, _ctz);
+ // Set the dictionary data
+ RETURN_IF_ERROR(page_decoder->set_dict(dict_data,
header.dictionary_page_header.num_values));
+ _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] =
std::move(page_decoder);
+
return Status::OK();
}
@@ -138,6 +174,16 @@ Status ColumnChunkReader::skip_values(size_t num_values) {
return _page_decoder->skip_values(num_values);
}
+void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t
num_values) {
+ DCHECK_GE(_remaining_num_values, num_values);
+ CHECK(doris_column->is_nullable());
+ auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+ (*std::move(doris_column)).mutate().get());
+ MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
+ data_column->insert_many_defaults(num_values);
+ _remaining_num_values -= num_values;
+}
+
size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) {
DCHECK_GT(_max_rep_level, 0);
return _rep_level_decoder.get_levels(levels, n);
@@ -166,14 +212,6 @@ Status ColumnChunkReader::decode_values(MutableColumnPtr&
doris_column, DataType
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
-Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
- if (UNLIKELY(_remaining_num_values < num_values)) {
- return Status::IOError("Decode too many values in current page");
- }
- _remaining_num_values -= num_values;
- return _page_decoder->decode_values(slice, num_values);
-}
-
int32_t ColumnChunkReader::_get_type_length() {
switch (_field_schema->physical_type) {
case tparquet::Type::INT32:
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index bc3fcedbe1..79fdc204dc 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -87,7 +87,7 @@ public:
uint32_t remaining_num_values() const { return _remaining_num_values; };
// null values are generated from definition levels
// the caller should maintain the consistency after analyzing null values
from definition levels.
- void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num;
};
+ void insert_null_values(ColumnPtr& doris_column, size_t num_values);
// Get the raw data of current page.
Slice& get_page_data() { return _page_data; }
@@ -99,8 +99,6 @@ public:
// Decode values in current page into doris column.
Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values);
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr&
data_type, size_t num_values);
- // For test, Decode values in current page into slice.
- Status decode_values(Slice& slice, size_t num_values);
// Get the repetition level decoder of current page.
LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }
diff --git a/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet
b/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet
new file mode 100644
index 0000000000..268d1d7135
Binary files /dev/null and
b/be/test/exec/test_data/parquet_scanner/dict-decoder.parquet differ
diff --git a/be/test/exec/test_data/parquet_scanner/dict-decoder.txt
b/be/test/exec/test_data/parquet_scanner/dict-decoder.txt
new file mode 100644
index 0000000000..ba0a3fba5a
--- /dev/null
+++ b/be/test/exec/test_data/parquet_scanner/dict-decoder.txt
@@ -0,0 +1,16 @@
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27,
9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
diff --git a/be/test/exec/test_data/parquet_scanner/type-decoder.txt
b/be/test/exec/test_data/parquet_scanner/type-decoder.txt
new file mode 100644
index 0000000000..afe2b6e573
--- /dev/null
+++ b/be/test/exec/test_data/parquet_scanner/type-decoder.txt
@@ -0,0 +1,14 @@
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27,
9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
+| -1| -1|
-1| -1| 0|
-1.140000| -1.140000| s-row0|
b-row0| 2022-08-01 07:23:17|
-1.140000000| c-row0| vc-row0|
2022-08-01| 2022-08-01| 2022-08-01
07:23:17|
+| 2| 2|
2| 2| 1|
2.140000| 2.140000| NULL|
b-row1| 2022-08-02 07:23:18|
2.140000000| c-row1| vc-row1|
2022-08-02| 2022-08-02| 2022-08-02
07:23:18|
+| -3| -3|
-3| -3| 0|
-3.140000| -3.140000| s-row2|
b-row2| 2022-08-03 07:23:19|
-3.140000000| c-row2| vc-row2|
2022-08-03| 2022-08-03| 2022-08-03
07:23:19|
+| 4| 4|
4| 4| 1|
4.140000| 4.140000| NULL|
b-row3| 2022-08-04 07:24:17|
4.140000000| c-row3| vc-row3|
2022-08-04| 2022-08-04| 2022-08-04
07:24:17|
+| -5| -5|
-5| -5| 0|
-5.140000| -5.140000| s-row4|
b-row4| 2022-08-05 07:25:17|
-5.140000000| c-row4| vc-row4|
2022-08-05| 2022-08-05| 2022-08-05
07:25:17|
+| 6| 6|
6| 6| 0|
6.140000| 6.140000| s-row5|
b-row5| 2022-08-06 07:26:17|
6.140000000| c-row5| vc-row5|
2022-08-06| 2022-08-06| 2022-08-06
07:26:17|
+| -7| -7|
-7| -7| 1|
-7.140000| -7.140000| s-row6|
b-row6| 2022-08-07 07:27:17|
-7.140000000| c-row6| vc-row6|
2022-08-07| 2022-08-07| 2022-08-07
07:27:17|
+| 8| 8|
8| 8| 0|
8.140000| 8.140000| NULL|
b-row7| 2022-08-08 07:28:17|
8.140000000| c-row7| vc-row7|
2022-08-08| 2022-08-08| 2022-08-08
07:28:17|
+| -9| -9|
-9| -9| 0|
-9.140000| -9.140000| s-row8|
b-row8| 2022-08-09 07:29:17|
-9.140000000| c-row8| vc-row8|
2022-08-09| 2022-08-09| 2022-08-09
07:29:17|
+| 10| 10|
10| 10| 0|
10.140000| 10.140000| s-row9|
b-row9| 2022-08-10 07:21:17|
10.140000000| c-row9| vc-row9|
2022-08-10| 2022-08-10| 2022-08-10
07:21:17|
++---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c0f62067d3..75cc087d12 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -131,9 +131,25 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
}
+static int fill_nullable_column(ColumnPtr& doris_column, level_t* definitions,
size_t num_values) {
+ CHECK(doris_column->is_nullable());
+ auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+ (*std::move(doris_column)).mutate().get());
+ NullMap& map_data = nullable_column->get_null_map_data();
+ int null_cnt = 0;
+ for (int i = 0; i < num_values; ++i) {
+ bool nullable = definitions[i] == 0;
+ if (nullable) {
+ null_cnt++;
+ }
+ map_data.emplace_back(nullable);
+ }
+ return null_cnt;
+}
+
static Status get_column_values(FileReader* file_reader,
tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema, ColumnPtr&
doris_column,
- DataTypePtr& data_type) {
+ DataTypePtr& data_type, level_t* definitions) {
tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
@@ -150,8 +166,46 @@ static Status get_column_values(FileReader* file_reader,
tparquet::ColumnChunk*
chunk_reader.next_page();
// load page data into underlying container
chunk_reader.load_page_data();
+ int rows = chunk_reader.remaining_num_values();
+ // definition levels
+ if (field_schema->definition_level == 0) { // required field
+ std::fill(definitions, definitions + rows, 1);
+ } else {
+ chunk_reader.get_def_levels(definitions, rows);
+ }
+ // fill nullable values
+ fill_nullable_column(doris_column, definitions, rows);
// decode page data
- return chunk_reader.decode_values(doris_column, data_type,
chunk_reader.remaining_num_values());
+ if (field_schema->definition_level == 0) {
+ // required column
+ return chunk_reader.decode_values(doris_column, data_type, rows);
+ } else {
+ // column with null values
+ level_t level_type = definitions[0];
+ int num_values = 1;
+ for (int i = 1; i < rows; ++i) {
+ if (definitions[i] != level_type) {
+ if (level_type == 0) {
+ // null values
+ chunk_reader.insert_null_values(doris_column, num_values);
+ } else {
+ RETURN_IF_ERROR(
+ chunk_reader.decode_values(doris_column,
data_type, num_values));
+ }
+ level_type = definitions[i];
+ num_values = 1;
+ } else {
+ num_values++;
+ }
+ }
+ if (level_type == 0) {
+ // null values
+ chunk_reader.insert_null_values(doris_column, num_values);
+ } else {
+ RETURN_IF_ERROR(chunk_reader.decode_values(doris_column,
data_type, num_values));
+ }
+ return Status::OK();
+ }
}
static void create_block(std::unique_ptr<vectorized::Block>& block) {
@@ -192,10 +246,11 @@ static void
create_block(std::unique_ptr<vectorized::Block>& block) {
}
}
-TEST_F(ParquetThriftReaderTest, type_decoder) {
+static void read_parquet_data_and_check(const std::string& parquet_file,
+ const std::string& result_file, int
rows) {
/*
- * type-decoder.parquet is the part of following table:
- * create table `type_decoder`(
+ * table schema in parquet file:
+ * create table `decoder`(
* `tinyint_col` tinyint, // 0
* `smallint_col` smallint, // 1
* `int_col` int, // 2
@@ -213,20 +268,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
* `list_string` array<string>) // 14
*/
- LocalFileReader
reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
- /*
- * Data in type-decoder.parquet:
- * -1 -1 -1 -1 false -1.14 -1.14 s-row0 b-row0
2022-08-01 07:23:17 -1.14 c-row0 vc-row0 2022-08-01
["as-0","as-1"]
- * 2 2 2 2 true 2.14 2.14 NULL b-row1
2022-08-02 07:23:18 2.14 c-row1 vc-row1 2022-08-02
[null,"as-3"]
- * -3 -3 -3 -3 false -3.14 -3.14 s-row2 b-row2
2022-08-03 07:23:19 -3.14 c-row2 vc-row2 2022-08-03 []
- * 4 4 4 4 true 4.14 4.14 NULL b-row3
2022-08-04 07:24:17 4.14 c-row3 vc-row3 2022-08-04 ["as-4"]
- * -5 -5 -5 -5 false -5.14 -5.14 s-row4 b-row4
2022-08-05 07:25:17 -5.14 c-row4 vc-row4 2022-08-05
["as-5",null]
- * 6 6 6 6 false 6.14 6.14 s-row5 b-row5
2022-08-06 07:26:17 6.14 c-row5 vc-row5 2022-08-06
[null,null]
- * -7 -7 -7 -7 true -7.14 -7.14 s-row6 b-row6
2022-08-07 07:27:17 -7.14 c-row6 vc-row6 2022-08-07
["as-6","as-7"]
- * 8 8 8 8 false 8.14 8.14 NULL b-row7
2022-08-08 07:28:17 8.14 c-row7 vc-row7 2022-08-08
["as-0","as-8"]
- * -9 -9 -9 -9 false -9.14 -9.14 s-row8 b-row8
2022-08-09 07:29:17 -9.14 c-row8 vc-row8 2022-08-09
["as-9","as-10"]
- * 10 10 10 10 false 10.14 10.14 s-row9 b-row9
2022-08-10 07:21:17 10.14 c-row9 vc-row9 2022-08-10
["as-11","as-12"]
- */
+ LocalFileReader reader(parquet_file, 0);
auto st = reader.open();
EXPECT_TRUE(st.ok());
@@ -237,194 +279,15 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
FieldDescriptor schema_descriptor;
schema_descriptor.parse_from_thrift(t_metadata.schema);
- int rows = 10;
+ level_t defs[rows];
- // the physical_type of tinyint_col, smallint_col and int_col are all INT32
- // they are distinguished by converted_type(in
FieldSchema.parquet_schema.converted_type)
- {
- auto& column_name_with_type = block->get_by_position(0);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[0],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column,
- data_type);
- int int_sum = 0;
- for (int i = 0; i < rows; ++i) {
- int_sum += (int8_t)data_column->get64(i);
- }
- ASSERT_EQ(int_sum, 5);
- }
- {
- auto& column_name_with_type = block->get_by_position(1);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[1],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column,
- data_type);
- int int_sum = 0;
- for (int i = 0; i < rows; ++i) {
- int_sum += (int16_t)data_column->get64(i);
- }
- ASSERT_EQ(int_sum, 5);
- }
- {
- auto& column_name_with_type = block->get_by_position(2);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[2],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column,
- data_type);
- int int_sum = 0;
- for (int i = 0; i < rows; ++i) {
- int_sum += (int32_t)data_column->get64(i);
- }
- ASSERT_EQ(int_sum, 5);
- }
- {
- auto& column_name_with_type = block->get_by_position(3);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column,
- data_type);
- int64_t int_sum = 0;
- for (int i = 0; i < rows; ++i) {
- int_sum += (int64_t)data_column->get64(i);
- }
- ASSERT_EQ(int_sum, 5);
- }
- // `boolean_col` boolean, // 4
- {
- auto& column_name_with_type = block->get_by_position(4);
+ for (int c = 0; c < 14; ++c) {
+ auto& column_name_with_type = block->get_by_position(c);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column,
- data_type);
- ASSERT_FALSE(static_cast<bool>(data_column->get64(0)));
- ASSERT_TRUE(static_cast<bool>(data_column->get64(1)));
- ASSERT_FALSE(static_cast<bool>(data_column->get64(2)));
- ASSERT_TRUE(static_cast<bool>(data_column->get64(3)));
- ASSERT_FALSE(static_cast<bool>(data_column->get64(4)));
- ASSERT_FALSE(static_cast<bool>(data_column->get64(5)));
- ASSERT_TRUE(static_cast<bool>(data_column->get64(6)));
- ASSERT_FALSE(static_cast<bool>(data_column->get64(7)));
- ASSERT_FALSE(static_cast<bool>(data_column->get64(8)));
- ASSERT_FALSE(static_cast<bool>(data_column->get64(9)));
- }
- // `double_col` double, // 6
- {
- auto& column_name_with_type = block->get_by_position(6);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[6],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column,
- data_type);
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(data_column)).mutate().get());
- MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
- ASSERT_EQ(nested_column->get_float64(0), -1.14);
- ASSERT_EQ(nested_column->get_float64(1), 2.14);
- ASSERT_EQ(nested_column->get_float64(2), -3.14);
- ASSERT_EQ(nested_column->get_float64(3), 4.14);
- }
- // `string_col` string, // 7
- {
- auto& column_name_with_type = block->get_by_position(7);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- tparquet::ColumnChunk column_chunk =
t_metadata.row_groups[0].columns[7];
- tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
- size_t start_offset = chunk_meta.__isset.dictionary_page_offset
- ? chunk_meta.dictionary_page_offset
- : chunk_meta.data_page_offset;
- size_t chunk_size = chunk_meta.total_compressed_size;
- BufferedFileStreamReader stream_reader(&reader, start_offset,
chunk_size);
- cctz::time_zone ctz;
- TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone,
ctz);
- ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
-
const_cast<FieldSchema*>(schema_descriptor.get_column(7)),
- &ctz);
- // initialize chunk reader
- chunk_reader.init();
- // seek to next page header
- chunk_reader.next_page();
- // load page data into underlying container
- chunk_reader.load_page_data();
-
- level_t defs[rows];
- // Analyze null string
- chunk_reader.get_def_levels(defs, rows);
- ASSERT_EQ(defs[1], 0);
- ASSERT_EQ(defs[3], 0);
- ASSERT_EQ(defs[7], 0);
-
- chunk_reader.decode_values(data_column, data_type, 7);
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(data_column)).mutate().get());
- MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
- auto row0 = nested_column->get_data_at(0).data;
- auto row2 = nested_column->get_data_at(1).data;
- ASSERT_STREQ("s-row0", row0);
- ASSERT_STREQ("s-row2", row2);
- }
- // `timestamp_col` timestamp, // 9, DATETIME
- {
- auto& column_name_with_type = block->get_by_position(9);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
- data_type);
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(data_column)).mutate().get());
- MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
- int64_t date_value = (int64_t)nested_column->get64(0);
- VecDateTimeInt64Union conv = {.i64 = date_value};
- auto dt = conv.dt;
- ASSERT_EQ(dt.hour(), 7);
- ASSERT_EQ(dt.minute(), 23);
- ASSERT_EQ(dt.second(), 17);
- }
- // `decimal_col` decimal, // 10
- {
- auto& column_name_with_type = block->get_by_position(10);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[10],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(10)), data_column,
- data_type);
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(data_column)).mutate().get());
- MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
- int neg = 1;
- for (int i = 0; i < rows; ++i) {
- neg *= -1;
- auto decimal_field = nested_column->operator[](i)
-
.get<vectorized::DecimalField<vectorized::Decimal128>>();
- EXPECT_EQ(DecimalV2Value(decimal_field.get_value()),
- DecimalV2Value(std::to_string(neg * (1.14 + i))));
- }
- }
- // `date_col` date, // 13, DATE
- {
- auto& column_name_with_type = block->get_by_position(13);
- auto& data_column = column_name_with_type.column;
- auto& data_type = column_name_with_type.type;
- get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
-
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
- data_type);
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(data_column)).mutate().get());
- MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
- for (int i = 0; i < rows; ++i) {
- int64_t date_value = (int64_t)nested_column->get64(i);
- VecDateTimeInt64Union conv = {.i64 = date_value};
- auto dt = conv.dt;
- ASSERT_EQ(dt.year(), 2022);
- ASSERT_EQ(dt.month(), 8);
- ASSERT_EQ(dt.day(), i + 1);
- }
+ get_column_values(&reader, &t_metadata.row_groups[0].columns[c],
+
const_cast<FieldSchema*>(schema_descriptor.get_column(c)), data_column,
+ data_type, defs);
}
// `date_v2_col` date, // 14 - 13, DATEV2
{
@@ -433,18 +296,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
- data_type);
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(data_column)).mutate().get());
- MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
- for (int i = 0; i < rows; ++i) {
- uint32_t date_value = (uint32_t)nested_column->get64(i);
- DateV2UInt32Union conv = {.ui32 = date_value};
- auto dt = conv.dt;
- ASSERT_EQ(dt.year(), 2022);
- ASSERT_EQ(dt.month(), 8);
- ASSERT_EQ(dt.day(), i + 1);
- }
+ data_type, defs);
}
// `timestamp_v2_col` timestamp, // 15 - 9, DATETIMEV2
{
@@ -453,17 +305,28 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
- data_type);
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
- (*std::move(data_column)).mutate().get());
- MutableColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
- uint64_t date_value = nested_column->get64(0);
- DateTimeV2UInt64Union conv = {.ui64 = date_value};
- auto dt = conv.dt;
- ASSERT_EQ(dt.hour(), 7);
- ASSERT_EQ(dt.minute(), 23);
- ASSERT_EQ(dt.second(), 17);
+ data_type, defs);
}
+
+ LocalFileReader result(result_file, 0);
+ auto rst = result.open();
+ EXPECT_TRUE(rst.ok());
+ uint8_t result_buf[result.size() + 1];
+ result_buf[result.size()] = '\0';
+ int64_t bytes_read;
+ bool eof;
+ result.read(result_buf, result.size(), &bytes_read, &eof);
+ ASSERT_STREQ(block->dump_data(0, rows).c_str(),
reinterpret_cast<char*>(result_buf));
+}
+
+TEST_F(ParquetThriftReaderTest, type_decoder) {
+
read_parquet_data_and_check("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet",
+
"./be/test/exec/test_data/parquet_scanner/type-decoder.txt", 10);
+}
+
+TEST_F(ParquetThriftReaderTest, dict_decoder) {
+
read_parquet_data_and_check("./be/test/exec/test_data/parquet_scanner/dict-decoder.parquet",
+
"./be/test/exec/test_data/parquet_scanner/dict-decoder.txt", 12);
}
TEST_F(ParquetThriftReaderTest, column_reader) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]