This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 0b9bfd15b7 [feature-wip](parquet-reader) parquet physical type to doris logical type (#11769)
0b9bfd15b7 is described below

commit 0b9bfd15b7ca45bcf5abe17accfcd11333a1229a
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Aug 15 16:08:11 2022 +0800

    [feature-wip](parquet-reader) parquet physical type to doris logical type (#11769)

    Two improvements have been added:
    1. Translate parquet physical type into doris logical type.
    2. Decode parquet column chunk into doris ColumnPtr, and add unit tests to show how to use related API.
---
 be/src/vec/exec/format/parquet/parquet_common.cpp | 100 +++++++------
 be/src/vec/exec/format/parquet/parquet_common.h   |  81 ++++-------
 .../parquet/vparquet_column_chunk_reader.cpp      |  49 +++++--
 .../format/parquet/vparquet_column_chunk_reader.h |  17 ++-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp  | 158 ++++++++++++++++-----
 5 files changed, 255 insertions(+), 150 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 1d80676b9b..082d0fc57d 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -18,34 +18,36 @@
 #include "parquet_common.h"
 
 #include "util/coding.h"
+#include "vec/data_types/data_type_nullable.h"
 
 namespace doris::vectorized {
 
-Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
-                           std::unique_ptr<Decoder>& decoder) {
+#define FOR_LOGICAL_NUMERIC_TYPES(M) \
+    M(TypeIndex::Int32, Int32)       \
+    M(TypeIndex::UInt32, UInt32)     \
+    M(TypeIndex::Int64, Int64)       \
+    M(TypeIndex::UInt64, UInt64)     \
+    M(TypeIndex::Float32, Float32)   \
+    M(TypeIndex::Float64, Float64)
+
+Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                            std::unique_ptr<Decoder>& decoder) {
     switch (encoding) {
     case tparquet::Encoding::PLAIN:
         switch (type) {
        case tparquet::Type::BOOLEAN:
            decoder.reset(new BoolPlainDecoder());
            break;
-        case tparquet::Type::INT32:
-            decoder.reset(new PlainDecoder<Int32>());
+        case tparquet::Type::BYTE_ARRAY:
+            decoder.reset(new ByteArrayPlainDecoder());
            break;
+        case tparquet::Type::INT32:
        case tparquet::Type::INT64:
-            decoder.reset(new PlainDecoder<Int64>());
-            break;
+        case tparquet::Type::INT96:
        case tparquet::Type::FLOAT:
-            decoder.reset(new PlainDecoder<Float32>());
-            break;
        case tparquet::Type::DOUBLE:
-            decoder.reset(new PlainDecoder<Float64>());
-            break;
-        case tparquet::Type::BYTE_ARRAY:
-            decoder.reset(new BAPlainDecoder());
-            break;
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
-            decoder.reset(new FixedLengthBAPlainDecoder());
+            decoder.reset(new PlainDecoder(type));
            break;
        default:
            return Status::InternalError("Unsupported plain type {} in parquet decoder",
@@ -60,34 +62,28 @@ Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type e
     return Status::OK();
 }
 
-Status Decoder::decode_values(ColumnPtr& doris_column, size_t num_values) {
+Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) {
     CHECK(doris_column->is_nullable());
     auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
             (*std::move(doris_column)).mutate().get());
     MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
-    return _decode_values(data_column, num_values);
+    return decode_values(data_column, data_type, num_values);
 }
 
-Status FixedLengthBAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
+Status PlainDecoder::decode_values(Slice& slice, size_t num_values) {
     size_t to_read_bytes = _type_length * num_values;
     if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
     }
-    // insert '\0' into the end of each binary
-    if (UNLIKELY(to_read_bytes + num_values > slice.size)) {
+    if (UNLIKELY(to_read_bytes > slice.size)) {
         return Status::IOError("Slice does not have enough space to write out the decoding data");
     }
-    uint32_t slice_offset = 0;
-    for (int i = 0; i < num_values; ++i) {
-        memcpy(slice.data + slice_offset, _data->data + _offset, _type_length);
-        slice_offset += _type_length + 1;
-        slice.data[slice_offset - 1] = '\0';
-        _offset += _type_length;
-    }
+    memcpy(slice.data, _data->data + _offset, to_read_bytes);
+    _offset += to_read_bytes;
     return Status::OK();
 }
 
-Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) {
+Status PlainDecoder::skip_values(size_t num_values) {
     _offset += _type_length * num_values;
     if (UNLIKELY(_offset > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
@@ -95,23 +91,43 @@ Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) {
     return Status::OK();
 }
 
-Status FixedLengthBAPlainDecoder::_decode_values(MutableColumnPtr& doris_column,
-                                                 size_t num_values) {
+Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t num_values,
+                                       size_t real_length) {
     if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
     }
-    auto& column_chars_t = assert_cast<ColumnString&>(*doris_column).get_chars();
-    auto& column_offsets = assert_cast<ColumnString&>(*doris_column).get_offsets();
     for (int i = 0; i < num_values; ++i) {
-        column_chars_t.insert(_data->data + _offset, _data->data + _offset + _type_length);
-        column_chars_t.emplace_back('\0');
-        column_offsets.emplace_back(column_chars_t.size());
+        doris_column->insert_data(_data->data + _offset, real_length);
        _offset += _type_length;
    }
    return Status::OK();
 }
 
-Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
+Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                   size_t num_values) {
+    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
+    switch (logical_type) {
+    case TypeIndex::Int8:
+    case TypeIndex::UInt8:
+        return _decode_short_int(doris_column, num_values, 1);
+    case TypeIndex::Int16:
+    case TypeIndex::UInt16:
+        return _decode_short_int(doris_column, num_values, 2);
+#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
+    case NUMERIC_TYPE:                           \
+        return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, num_values);
+        FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
+#undef DISPATCH
+    default:
+        break;
+    }
+
+    return Status::InvalidArgument("Can't decode parquet physical type {} to doris logical type {}",
+                                   tparquet::to_string(_physical_type),
+                                   getTypeName(data_type->get_type_id()));
+}
+
+Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) {
     uint32_t slice_offset = 0;
     for (int i = 0; i < num_values; ++i) {
         if (UNLIKELY(_offset + 4 > _data->size)) {
@@ -131,7 +147,7 @@ Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
     return Status::OK();
 }
 
-Status BAPlainDecoder::skip_values(size_t num_values) {
+Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
     for (int i = 0; i < num_values; ++i) {
         if (UNLIKELY(_offset + 4 > _data->size)) {
             return Status::IOError("Can't read byte array length from plain decoder");
@@ -147,9 +163,8 @@ Status BAPlainDecoder::skip_values(size_t num_values) {
     return Status::OK();
 }
 
-Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) {
-    auto& column_chars_t = assert_cast<ColumnString&>(*doris_column).get_chars();
-    auto& column_offsets = assert_cast<ColumnString&>(*doris_column).get_offsets();
+Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                            size_t num_values) {
     for (int i = 0; i < num_values; ++i) {
         if (UNLIKELY(_offset + 4 > _data->size)) {
             return Status::IOError("Can't read byte array length from plain decoder");
@@ -160,9 +175,7 @@ Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num
         if (UNLIKELY(_offset + length) > _data->size) {
             return Status::IOError("Can't read enough bytes in plain decoder");
         }
-        column_chars_t.insert(_data->data + _offset, _data->data + _offset + length);
-        column_chars_t.emplace_back('\0');
-        column_offsets.emplace_back(column_chars_t.size());
+        doris_column->insert_data(_data->data + _offset, length);
         _offset += length;
     }
     return Status::OK();
@@ -203,7 +216,8 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
     return Status::OK();
 }
 
-Status BoolPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) {
+Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                       size_t num_values) {
     auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
     bool value;
     for (int i = 0; i < num_values; ++i) {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index 44523ae22d..f0aed43288 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -25,6 +25,7 @@
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
+#include "vec/data_types/data_type.h"
 
 namespace doris::vectorized {
 
@@ -35,8 +36,8 @@ public:
     Decoder() = default;
     virtual ~Decoder() = default;
 
-    static Status getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
-                             std::unique_ptr<Decoder>& decoder);
+    static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                              std::unique_ptr<Decoder>& decoder);
 
     // The type with fix length
     void set_type_length(int32_t type_length) { _type_length = type_length; }
@@ -48,88 +49,63 @@ public:
     }
 
     // Write the decoded values batch to doris's column
-    Status decode_values(ColumnPtr& doris_column, size_t num_values);
+    Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
+
+    virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                 size_t num_values) = 0;
 
     virtual Status decode_values(Slice& slice, size_t num_values) = 0;
 
     virtual Status skip_values(size_t num_values) = 0;
 
 protected:
-    virtual Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) = 0;
-
     int32_t _type_length;
     Slice* _data = nullptr;
     uint32_t _offset = 0;
 };
 
-template <typename T>
 class PlainDecoder final : public Decoder {
 public:
-    PlainDecoder() = default;
+    PlainDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {};
     ~PlainDecoder() override = default;
 
-    Status decode_values(Slice& slice, size_t num_values) override {
-        size_t to_read_bytes = TYPE_LENGTH * num_values;
-        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
-            return Status::IOError("Out-of-bounds access in parquet data decoder");
-        }
-        if (UNLIKELY(to_read_bytes > slice.size)) {
-            return Status::IOError(
-                    "Slice does not have enough space to write out the decoding data");
-        }
-        memcpy(slice.data, _data->data + _offset, to_read_bytes);
-        _offset += to_read_bytes;
-        return Status::OK();
-    }
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         size_t num_values) override;
 
-    Status skip_values(size_t num_values) override {
-        _offset += TYPE_LENGTH * num_values;
-        if (UNLIKELY(_offset > _data->size)) {
-            return Status::IOError("Out-of-bounds access in parquet data decoder");
-        }
-        return Status::OK();
-    }
+    Status decode_values(Slice& slice, size_t num_values) override;
+
+    Status skip_values(size_t num_values) override;
 
 protected:
-    enum { TYPE_LENGTH = sizeof(T) };
+    Status _decode_short_int(MutableColumnPtr& doris_column, size_t num_values, size_t real_length);
 
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override {
-        size_t to_read_bytes = TYPE_LENGTH * num_values;
+    template <typename Numeric>
+    Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
+        size_t to_read_bytes = _type_length * num_values;
         if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
             return Status::IOError("Out-of-bounds access in parquet data decoder");
         }
-        auto& column_data = static_cast<ColumnVector<T>&>(*doris_column).get_data();
-        const auto* raw_data = reinterpret_cast<const T*>(_data->data + _offset);
+        auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
+        const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + _offset);
         column_data.insert(raw_data, raw_data + num_values);
         _offset += to_read_bytes;
         return Status::OK();
     }
-};
-
-class FixedLengthBAPlainDecoder final : public Decoder {
-public:
-    FixedLengthBAPlainDecoder() = default;
-    ~FixedLengthBAPlainDecoder() override = default;
-
-    Status decode_values(Slice& slice, size_t num_values) override;
-
-    Status skip_values(size_t num_values) override;
 
-protected:
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override;
+    tparquet::Type::type _physical_type;
 };
 
-class BAPlainDecoder final : public Decoder {
+class ByteArrayPlainDecoder final : public Decoder {
 public:
-    BAPlainDecoder() = default;
-    ~BAPlainDecoder() override = default;
+    ByteArrayPlainDecoder() = default;
+    ~ByteArrayPlainDecoder() override = default;
+
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         size_t num_values) override;
 
     Status decode_values(Slice& slice, size_t num_values) override;
 
     Status skip_values(size_t num_values) override;
-
-protected:
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override;
 };
 
 /// Decoder bit-packed boolean-encoded values.
@@ -147,6 +123,9 @@ public:
         _offset = 0;
     }
 
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         size_t num_values) override;
+
     Status decode_values(Slice& slice, size_t num_values) override;
 
     Status skip_values(size_t num_values) override;
@@ -167,8 +146,6 @@ protected:
         return true;
     }
 
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override;
-
     /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
     /// batch-oriented interface of BatchedBitReader. We use uint8_t instead of bool because
    /// bit unpacking is only supported for unsigned integers. The values are converted to
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index d4c7f534a3..a0a21b00ca 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -20,13 +20,14 @@
 namespace doris::vectorized {
 
 ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader,
-                                     tparquet::ColumnChunk* column_chunk, FieldSchema* fieldSchema)
-        : _max_rep_level(fieldSchema->repetition_level),
-          _max_def_level(fieldSchema->definition_level),
+                                     tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema)
+        : _field_schema(field_schema),
+          _max_rep_level(field_schema->repetition_level),
+          _max_def_level(field_schema->definition_level),
           _stream_reader(reader),
           _metadata(column_chunk->meta_data) {}
 
-Status ColumnChunkReader::init(size_t type_length) {
+Status ColumnChunkReader::init() {
     size_t start_offset = _metadata.__isset.dictionary_page_offset
                                   ? _metadata.dictionary_page_offset
                                   : _metadata.data_page_offset;
@@ -41,8 +42,6 @@ Status ColumnChunkReader::init(size_t type_length) {
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
 
-    // -1 means unfixed length type
-    _type_length = type_length;
     return Status::OK();
 }
 
@@ -91,14 +90,13 @@ Status ColumnChunkReader::load_page_data() {
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     } else {
         std::unique_ptr<Decoder> page_decoder;
-        Decoder::getDecoder(_metadata.type, encoding, page_decoder);
+        Decoder::get_decoder(_metadata.type, encoding, page_decoder);
         _decoders[static_cast<int>(encoding)] = std::move(page_decoder);
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     }
     _page_decoder->set_data(&_page_data);
-    if (_type_length > 0) {
-        _page_decoder->set_type_length(_type_length);
-    }
+    // Set type length
+    _page_decoder->set_type_length(_get_type_length());
 
     return Status::OK();
 }
@@ -138,12 +136,22 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
     return _def_level_decoder.get_levels(levels, n);
 }
 
-Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t num_values) {
+Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
+                                        size_t num_values) {
+    if (UNLIKELY(_num_values < num_values)) {
+        return Status::IOError("Decode too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->decode_values(doris_column, data_type, num_values);
+}
+
+Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                        size_t num_values) {
     if (UNLIKELY(_num_values < num_values)) {
         return Status::IOError("Decode too many values in current page");
     }
     _num_values -= num_values;
-    return _page_decoder->decode_values(doris_column, num_values);
+    return _page_decoder->decode_values(doris_column, data_type, num_values);
 }
 
 Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
@@ -153,4 +161,21 @@ Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
     _num_values -= num_values;
     return _page_decoder->decode_values(slice, num_values);
 }
+
+int32_t ColumnChunkReader::_get_type_length() {
+    switch (_field_schema->physical_type) {
+    case tparquet::Type::INT32:
+    case tparquet::Type::FLOAT:
+        return 4;
+    case tparquet::Type::INT64:
+    case tparquet::Type::DOUBLE:
+        return 8;
+    case tparquet::Type::INT96:
+        return 12;
+    case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+        return _field_schema->parquet_schema.type_length;
+    default:
+        return -1;
+    }
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 282612bd21..f8510d4b37 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -59,13 +59,11 @@ namespace doris::vectorized {
 class ColumnChunkReader {
 public:
     ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
-                      FieldSchema* fieldSchema);
+                      FieldSchema* field_schema);
     ~ColumnChunkReader() = default;
 
     // Initialize chunk reader, will generate the decoder and codec.
-    // We can set the type_length if the length of colum type if fixed,
-    // or not set, the decoder will try to infer the type_length.
-    Status init(size_t type_length = -1);
+    Status init();
 
     // Whether the chunk reader has a more page to read.
     bool has_next_page() { return _page_reader->has_next_page(); }
@@ -86,8 +84,11 @@ public:
     // Load page data into the underlying container,
     // and initialize the repetition and definition level decoder for current page data.
     Status load_page_data();
-    // The remaining number of values in current page. Decreased when reading or skipping.
+    // The remaining number of values in current page(including null values). Decreased when reading or skipping.
     uint32_t num_values() const { return _num_values; };
+    // null values are not analyzing from definition levels
+    // the caller should maintain the consistency after analyzing null values from definition levels.
+    void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; };
 
     // Get the raw data of current page.
     Slice& get_page_data() { return _page_data; }
@@ -97,7 +98,8 @@ public:
     size_t get_def_levels(level_t* levels, size_t n);
 
     // Decode values in current page into doris column.
-    Status decode_values(ColumnPtr& doris_column, size_t num_values);
+    Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
 
     // For test, Decode values in current page into slice.
     Status decode_values(Slice& slice, size_t num_values);
@@ -109,7 +111,9 @@ public:
 private:
     Status _decode_dict_page();
     void _reserve_decompress_buf(size_t size);
+    int32_t _get_type_length();
 
+    FieldSchema* _field_schema;
     level_t _max_rep_level;
     level_t _max_def_level;
 
@@ -131,7 +135,6 @@ private:
     // Map: encoding -> Decoder
     // Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
     std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
-    size_t _type_length = -1;
 };
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index db91103c88..95df8bd9a2 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -22,10 +22,15 @@
 
 #include <string>
 
+#include "exec/schema_scanner.h"
 #include "io/buffered_reader.h"
 #include "io/file_reader.h"
 #include "io/local_file_reader.h"
+#include "runtime/string_value.h"
 #include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/exec/format/parquet/parquet_thrift_util.h"
 #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
 #include "vec/exec/format/parquet/vparquet_file_metadata.h"
@@ -125,7 +130,8 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
 }
 
 static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* column_chunk,
-                                FieldSchema* field_schema, Slice& slice) {
+                                FieldSchema* field_schema, ColumnPtr& doris_column,
+                                DataTypePtr& data_type) {
     tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
     size_t start_offset = chunk_meta.__isset.dictionary_page_offset
                                   ? chunk_meta.dictionary_page_offset
@@ -141,7 +147,35 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk*
     // load page data into underlying container
     chunk_reader.load_page_data();
     // decode page data
-    return chunk_reader.decode_values(slice, chunk_reader.num_values());
+    return chunk_reader.decode_values(doris_column, data_type, chunk_reader.num_values());
+}
+
+static void create_block(std::unique_ptr<vectorized::Block>& block) {
+    // Current supported column type:
+    SchemaScanner::ColumnDesc column_descs[] = {
+            {"tinyint_col", TYPE_TINYINT, sizeof(int8_t), true},
+            {"smallint_col", TYPE_SMALLINT, sizeof(int16_t), true},
+            {"int_col", TYPE_INT, sizeof(int32_t), true},
+            {"bigint_col", TYPE_BIGINT, sizeof(int64_t), true},
+            {"boolean_col", TYPE_BOOLEAN, sizeof(bool), true},
+            {"float_col", TYPE_FLOAT, sizeof(float_t), true},
+            {"double_col", TYPE_DOUBLE, sizeof(double_t), true},
+            {"string_col", TYPE_STRING, sizeof(StringValue), true}};
+    SchemaScanner schema_scanner(column_descs,
+                                 sizeof(column_descs) / sizeof(SchemaScanner::ColumnDesc));
+    ObjectPool object_pool;
+    SchemaScannerParam param;
+    schema_scanner.init(&param, &object_pool);
+    auto tuple_slots = const_cast<TupleDescriptor*>(schema_scanner.tuple_desc())->slots();
+    block.reset(new vectorized::Block());
+    for (const auto& slot_desc : tuple_slots) {
+        auto is_nullable = slot_desc->is_nullable();
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
+                                                                                  is_nullable);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
 }
 
 TEST_F(ParquetThriftReaderTest, type_decoder) {
@@ -164,6 +198,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
      * `date_col` date,  // 13
     * `list_string` array<string>)  // 14
     */
+
     LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
     /*
     * Data in type-decoder.parquet:
@@ -181,6 +216,8 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
     auto st = reader.open();
     EXPECT_TRUE(st.ok());
 
+    std::unique_ptr<vectorized::Block> block;
+    create_block(block);
     std::shared_ptr<FileMetaData> metaData;
     parse_thrift_footer(&reader, metaData);
     tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
@@ -190,51 +227,98 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
 
     // the physical_type of tinyint_col, smallint_col and int_col are all INT32
     // they are distinguished by converted_type(in FieldSchema.parquet_schema.converted_type)
-    for (int col_idx = 0; col_idx < 3; ++col_idx) {
-        char data[4 * rows];
-        Slice slice(data, 4 * rows);
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[col_idx],
-                          const_cast<FieldSchema*>(schema_descriptor.get_column(col_idx)), slice);
-        auto out_data = reinterpret_cast<int32_t*>(data);
+    {
+        auto& column_name_with_type = block->get_by_position(0);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[0],
+                          const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column,
+                          data_type);
         int int_sum = 0;
         for (int i = 0; i < rows; ++i) {
-            int_sum += out_data[i];
+            int_sum += (int8_t)data_column->get64(i);
        }
        ASSERT_EQ(int_sum, 5);
    }
-    // `bigint_col` bigint,  // 3
     {
-        char data[8 * rows];
-        Slice slice(data, 8 * rows);
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
-                          const_cast<FieldSchema*>(schema_descriptor.get_column(3)), slice);
-        auto out_data = reinterpret_cast<int64_t*>(data);
+        auto& column_name_with_type = block->get_by_position(1);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[1],
+                          const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column,
+                          data_type);
         int int_sum = 0;
         for (int i = 0; i < rows; ++i) {
-            int_sum += out_data[i];
+            int_sum += (int16_t)data_column->get64(i);
+        }
+        ASSERT_EQ(int_sum, 5);
+    }
+    {
+        auto& column_name_with_type = block->get_by_position(2);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[2],
+                          const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column,
+                          data_type);
+        int int_sum = 0;
+        for (int i = 0; i < rows; ++i) {
+            int_sum += (int32_t)data_column->get64(i);
+        }
+        ASSERT_EQ(int_sum, 5);
+    }
+    {
+        auto& column_name_with_type = block->get_by_position(3);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
+                          const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column,
+                          data_type);
+        int64_t int_sum = 0;
+        for (int i = 0; i < rows; ++i) {
+            int_sum += (int64_t)data_column->get64(i);
         }
         ASSERT_EQ(int_sum, 5);
     }
     // `boolean_col` boolean,  // 4
     {
-        char data[1 * rows];
-        Slice slice(data, 1 * rows);
+        auto& column_name_with_type = block->get_by_position(4);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
         get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
-                          const_cast<FieldSchema*>(schema_descriptor.get_column(4)), slice);
-        auto out_data = reinterpret_cast<bool*>(data);
-        ASSERT_FALSE(out_data[0]);
-        ASSERT_TRUE(out_data[1]);
-        ASSERT_FALSE(out_data[2]);
-        ASSERT_TRUE(out_data[3]);
-        ASSERT_FALSE(out_data[4]);
-        ASSERT_FALSE(out_data[5]);
-        ASSERT_TRUE(out_data[6]);
-        ASSERT_FALSE(out_data[7]);
-        ASSERT_FALSE(out_data[8]);
-        ASSERT_FALSE(out_data[9]);
+                          const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column,
+                          data_type);
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(0)));
+        ASSERT_TRUE(static_cast<bool>(data_column->get64(1)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(2)));
+        ASSERT_TRUE(static_cast<bool>(data_column->get64(3)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(4)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(5)));
+        ASSERT_TRUE(static_cast<bool>(data_column->get64(6)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(7)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(8)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(9)));
+    }
+    // `double_col` double,  // 6
+    {
+        auto& column_name_with_type = block->get_by_position(6);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[6],
+                          const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column,
+                          data_type);
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(data_column)).mutate().get());
+        MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+        ASSERT_EQ(nested_column->get_float64(0), -1.14);
+        ASSERT_EQ(nested_column->get_float64(1), 2.14);
+        ASSERT_EQ(nested_column->get_float64(2), -3.14);
+        ASSERT_EQ(nested_column->get_float64(3), 4.14);
     }
     // `string_col` string,  // 7
     {
+        auto& column_name_with_type = block->get_by_position(7);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
         tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[7];
         tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
         size_t start_offset = chunk_meta.__isset.dictionary_page_offset
@@ -242,7 +326,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
                                       : chunk_meta.data_page_offset;
         size_t chunk_size = chunk_meta.total_compressed_size;
         BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size);
-
         ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
                                        const_cast<FieldSchema*>(schema_descriptor.get_column(7)));
         // initialize chunk reader
@@ -252,8 +335,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
         // load page data into underlying container
         chunk_reader.load_page_data();
 
-        char data[50 * rows];
-        Slice slice(data, 50 * rows);
         level_t defs[rows];
         // Analyze null string
         chunk_reader.get_def_levels(defs, rows);
@@ -261,9 +342,14 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
         ASSERT_EQ(defs[3], 0);
         ASSERT_EQ(defs[7], 0);
 
-        chunk_reader.decode_values(slice, 7);
-        ASSERT_STREQ("s-row0", slice.data);
-        ASSERT_STREQ("s-row2", slice.data + 7);
+        chunk_reader.decode_values(data_column, data_type, 7);
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(data_column)).mutate().get());
+        MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+        auto row0 = nested_column->get_data_at(0).data;
+        auto row2 = nested_column->get_data_at(1).data;
+        ASSERT_STREQ("s-row0", row0);
+        ASSERT_STREQ("s-row2", row2);
     }
 }
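The unit tests above double as documentation for the reworked decoder API. Distilled from them, here is a minimal sketch of driving the new entry points directly, outside of ColumnChunkReader. It assumes the Doris BE headers and identifiers shown in the diff; the helper name decode_int32_plain_page and the use of DataTypeInt32 from vec/data_types/data_type_number.h are illustrative assumptions, not part of this commit.

// Minimal usage sketch, assuming the APIs introduced in the diff above.
// decode_int32_plain_page() is a hypothetical helper for illustration only.
#include <memory>

#include "common/status.h"
#include "vec/data_types/data_type_number.h"
#include "vec/exec/format/parquet/parquet_common.h"

namespace doris::vectorized {

Status decode_int32_plain_page(Slice* page_data, size_t num_values,
                               MutableColumnPtr& out_column) {
    // PLAIN-encoded INT32 now routes to the unified PlainDecoder(physical_type).
    std::unique_ptr<Decoder> decoder;
    RETURN_IF_ERROR(
            Decoder::get_decoder(tparquet::Type::INT32, tparquet::Encoding::PLAIN, decoder));
    decoder->set_data(page_data);
    decoder->set_type_length(4); // 4 bytes per INT32, cf. ColumnChunkReader::_get_type_length()
    // The doris logical type drives the dispatch in PlainDecoder::decode_values():
    // TypeIndex::Int32 selects _decode_numeric<Int32>(), which appends num_values
    // values to out_column in one bulk insert.
    DataTypePtr data_type = std::make_shared<DataTypeInt32>(); // assumed DataTypeNumber<Int32> alias
    // out_column is the nested (non-nullable) column here; the ColumnPtr overload
    // of Decoder::decode_values() performs the nullable unwrapping itself.
    return decoder->decode_values(out_column, data_type, num_values);
}

} // namespace doris::vectorized

Inside the reader proper, the same steps are wrapped by ColumnChunkReader: load_page_data() selects a decoder per page encoding via Decoder::get_decoder(), hands it the page Slice plus the type length derived from the schema in _get_type_length(), and ColumnChunkReader::decode_values() then forwards the target column and logical type to the page decoder.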