This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b9bfd15b7 [feature-wip](parquet-reader) parquet physical type to 
doris logical type (#11769)
0b9bfd15b7 is described below

commit 0b9bfd15b7ca45bcf5abe17accfcd11333a1229a
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Aug 15 16:08:11 2022 +0800

    [feature-wip](parquet-reader) parquet physical type to doris logical type 
(#11769)
    
    Two improvements have been added:
    1. Translate parquet physical type into doris logical type.
    2. Decode parquet column chunk into doris ColumnPtr, and add unit tests to 
show how to use related API.
---
 be/src/vec/exec/format/parquet/parquet_common.cpp  | 100 +++++++------
 be/src/vec/exec/format/parquet/parquet_common.h    |  81 ++++-------
 .../parquet/vparquet_column_chunk_reader.cpp       |  49 +++++--
 .../format/parquet/vparquet_column_chunk_reader.h  |  17 ++-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   | 158 ++++++++++++++++-----
 5 files changed, 255 insertions(+), 150 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 1d80676b9b..082d0fc57d 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -18,34 +18,36 @@
 #include "parquet_common.h"
 
 #include "util/coding.h"
+#include "vec/data_types/data_type_nullable.h"
 
 namespace doris::vectorized {
 
-Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type 
encoding,
-                           std::unique_ptr<Decoder>& decoder) {
+#define FOR_LOGICAL_NUMERIC_TYPES(M) \
+    M(TypeIndex::Int32, Int32)       \
+    M(TypeIndex::UInt32, UInt32)     \
+    M(TypeIndex::Int64, Int64)       \
+    M(TypeIndex::UInt64, UInt64)     \
+    M(TypeIndex::Float32, Float32)   \
+    M(TypeIndex::Float64, Float64)
+
+Status Decoder::get_decoder(tparquet::Type::type type, 
tparquet::Encoding::type encoding,
+                            std::unique_ptr<Decoder>& decoder) {
     switch (encoding) {
     case tparquet::Encoding::PLAIN:
         switch (type) {
         case tparquet::Type::BOOLEAN:
             decoder.reset(new BoolPlainDecoder());
             break;
-        case tparquet::Type::INT32:
-            decoder.reset(new PlainDecoder<Int32>());
+        case tparquet::Type::BYTE_ARRAY:
+            decoder.reset(new ByteArrayPlainDecoder());
             break;
+        case tparquet::Type::INT32:
         case tparquet::Type::INT64:
-            decoder.reset(new PlainDecoder<Int64>());
-            break;
+        case tparquet::Type::INT96:
         case tparquet::Type::FLOAT:
-            decoder.reset(new PlainDecoder<Float32>());
-            break;
         case tparquet::Type::DOUBLE:
-            decoder.reset(new PlainDecoder<Float64>());
-            break;
-        case tparquet::Type::BYTE_ARRAY:
-            decoder.reset(new BAPlainDecoder());
-            break;
         case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
-            decoder.reset(new FixedLengthBAPlainDecoder());
+            decoder.reset(new PlainDecoder(type));
             break;
         default:
             return Status::InternalError("Unsupported plain type {} in parquet 
decoder",
@@ -60,34 +62,28 @@ Status Decoder::getDecoder(tparquet::Type::type type, 
tparquet::Encoding::type e
     return Status::OK();
 }
 
-Status Decoder::decode_values(ColumnPtr& doris_column, size_t num_values) {
+Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, 
size_t num_values) {
     CHECK(doris_column->is_nullable());
     auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
             (*std::move(doris_column)).mutate().get());
     MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
-    return _decode_values(data_column, num_values);
+    return decode_values(data_column, data_type, num_values);
 }
 
-Status FixedLengthBAPlainDecoder::decode_values(Slice& slice, size_t 
num_values) {
+Status PlainDecoder::decode_values(Slice& slice, size_t num_values) {
     size_t to_read_bytes = _type_length * num_values;
     if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
     }
-    // insert '\0' into the end of each binary
-    if (UNLIKELY(to_read_bytes + num_values > slice.size)) {
+    if (UNLIKELY(to_read_bytes > slice.size)) {
         return Status::IOError("Slice does not have enough space to write out 
the decoding data");
     }
-    uint32_t slice_offset = 0;
-    for (int i = 0; i < num_values; ++i) {
-        memcpy(slice.data + slice_offset, _data->data + _offset, _type_length);
-        slice_offset += _type_length + 1;
-        slice.data[slice_offset - 1] = '\0';
-        _offset += _type_length;
-    }
+    memcpy(slice.data, _data->data + _offset, to_read_bytes);
+    _offset += to_read_bytes;
     return Status::OK();
 }
 
-Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) {
+Status PlainDecoder::skip_values(size_t num_values) {
     _offset += _type_length * num_values;
     if (UNLIKELY(_offset > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
@@ -95,23 +91,43 @@ Status FixedLengthBAPlainDecoder::skip_values(size_t 
num_values) {
     return Status::OK();
 }
 
-Status FixedLengthBAPlainDecoder::_decode_values(MutableColumnPtr& 
doris_column,
-                                                 size_t num_values) {
+Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t 
num_values,
+                                       size_t real_length) {
     if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
     }
-    auto& column_chars_t = 
assert_cast<ColumnString&>(*doris_column).get_chars();
-    auto& column_offsets = 
assert_cast<ColumnString&>(*doris_column).get_offsets();
     for (int i = 0; i < num_values; ++i) {
-        column_chars_t.insert(_data->data + _offset, _data->data + _offset + 
_type_length);
-        column_chars_t.emplace_back('\0');
-        column_offsets.emplace_back(column_chars_t.size());
+        doris_column->insert_data(_data->data + _offset, real_length);
         _offset += _type_length;
     }
     return Status::OK();
 }
 
-Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
+Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
+                                   size_t num_values) {
+    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
+    switch (logical_type) {
+    case TypeIndex::Int8:
+    case TypeIndex::UInt8:
+        return _decode_short_int(doris_column, num_values, 1);
+    case TypeIndex::Int16:
+    case TypeIndex::UInt16:
+        return _decode_short_int(doris_column, num_values, 2);
+#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
+    case NUMERIC_TYPE:                           \
+        return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, num_values);
+        FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
+#undef DISPATCH
+    default:
+        break;
+    }
+
+    return Status::InvalidArgument("Can't decode parquet physical type {} to 
doris logical type {}",
+                                   tparquet::to_string(_physical_type),
+                                   getTypeName(data_type->get_type_id()));
+}
+
+Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) {
     uint32_t slice_offset = 0;
     for (int i = 0; i < num_values; ++i) {
         if (UNLIKELY(_offset + 4 > _data->size)) {
@@ -131,7 +147,7 @@ Status BAPlainDecoder::decode_values(Slice& slice, size_t 
num_values) {
     return Status::OK();
 }
 
-Status BAPlainDecoder::skip_values(size_t num_values) {
+Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
     for (int i = 0; i < num_values; ++i) {
         if (UNLIKELY(_offset + 4 > _data->size)) {
             return Status::IOError("Can't read byte array length from plain 
decoder");
@@ -147,9 +163,8 @@ Status BAPlainDecoder::skip_values(size_t num_values) {
     return Status::OK();
 }
 
-Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t 
num_values) {
-    auto& column_chars_t = 
assert_cast<ColumnString&>(*doris_column).get_chars();
-    auto& column_offsets = 
assert_cast<ColumnString&>(*doris_column).get_offsets();
+Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
+                                            size_t num_values) {
     for (int i = 0; i < num_values; ++i) {
         if (UNLIKELY(_offset + 4 > _data->size)) {
             return Status::IOError("Can't read byte array length from plain 
decoder");
@@ -160,9 +175,7 @@ Status BAPlainDecoder::_decode_values(MutableColumnPtr& 
doris_column, size_t num
         if (UNLIKELY(_offset + length) > _data->size) {
             return Status::IOError("Can't read enough bytes in plain decoder");
         }
-        column_chars_t.insert(_data->data + _offset, _data->data + _offset + 
length);
-        column_chars_t.emplace_back('\0');
-        column_offsets.emplace_back(column_chars_t.size());
+        doris_column->insert_data(_data->data + _offset, length);
         _offset += length;
     }
     return Status::OK();
@@ -203,7 +216,8 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
     return Status::OK();
 }
 
-Status BoolPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t 
num_values) {
+Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
+                                       size_t num_values) {
     auto& column_data = 
static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
     bool value;
     for (int i = 0; i < num_values; ++i) {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h 
b/be/src/vec/exec/format/parquet/parquet_common.h
index 44523ae22d..f0aed43288 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -25,6 +25,7 @@
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
+#include "vec/data_types/data_type.h"
 
 namespace doris::vectorized {
 
@@ -35,8 +36,8 @@ public:
     Decoder() = default;
     virtual ~Decoder() = default;
 
-    static Status getDecoder(tparquet::Type::type type, 
tparquet::Encoding::type encoding,
-                             std::unique_ptr<Decoder>& decoder);
+    static Status get_decoder(tparquet::Type::type type, 
tparquet::Encoding::type encoding,
+                              std::unique_ptr<Decoder>& decoder);
 
     // The type with fix length
     void set_type_length(int32_t type_length) { _type_length = type_length; }
@@ -48,88 +49,63 @@ public:
     }
 
     // Write the decoded values batch to doris's column
-    Status decode_values(ColumnPtr& doris_column, size_t num_values);
+    Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, 
size_t num_values);
+
+    virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
+                                 size_t num_values) = 0;
 
     virtual Status decode_values(Slice& slice, size_t num_values) = 0;
 
     virtual Status skip_values(size_t num_values) = 0;
 
 protected:
-    virtual Status _decode_values(MutableColumnPtr& doris_column, size_t 
num_values) = 0;
-
     int32_t _type_length;
     Slice* _data = nullptr;
     uint32_t _offset = 0;
 };
 
-template <typename T>
 class PlainDecoder final : public Decoder {
 public:
-    PlainDecoder() = default;
+    PlainDecoder(tparquet::Type::type physical_type) : 
_physical_type(physical_type) {};
     ~PlainDecoder() override = default;
 
-    Status decode_values(Slice& slice, size_t num_values) override {
-        size_t to_read_bytes = TYPE_LENGTH * num_values;
-        if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
-            return Status::IOError("Out-of-bounds access in parquet data 
decoder");
-        }
-        if (UNLIKELY(to_read_bytes > slice.size)) {
-            return Status::IOError(
-                    "Slice does not have enough space to write out the 
decoding data");
-        }
-        memcpy(slice.data, _data->data + _offset, to_read_bytes);
-        _offset += to_read_bytes;
-        return Status::OK();
-    }
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
+                         size_t num_values) override;
 
-    Status skip_values(size_t num_values) override {
-        _offset += TYPE_LENGTH * num_values;
-        if (UNLIKELY(_offset > _data->size)) {
-            return Status::IOError("Out-of-bounds access in parquet data 
decoder");
-        }
-        return Status::OK();
-    }
+    Status decode_values(Slice& slice, size_t num_values) override;
+
+    Status skip_values(size_t num_values) override;
 
 protected:
-    enum { TYPE_LENGTH = sizeof(T) };
+    Status _decode_short_int(MutableColumnPtr& doris_column, size_t 
num_values, size_t real_length);
 
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) 
override {
-        size_t to_read_bytes = TYPE_LENGTH * num_values;
+    template <typename Numeric>
+    Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
+        size_t to_read_bytes = _type_length * num_values;
         if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
             return Status::IOError("Out-of-bounds access in parquet data 
decoder");
         }
-        auto& column_data = 
static_cast<ColumnVector<T>&>(*doris_column).get_data();
-        const auto* raw_data = reinterpret_cast<const T*>(_data->data + 
_offset);
+        auto& column_data = 
static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
+        const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + 
_offset);
         column_data.insert(raw_data, raw_data + num_values);
         _offset += to_read_bytes;
         return Status::OK();
     }
-};
-
-class FixedLengthBAPlainDecoder final : public Decoder {
-public:
-    FixedLengthBAPlainDecoder() = default;
-    ~FixedLengthBAPlainDecoder() override = default;
-
-    Status decode_values(Slice& slice, size_t num_values) override;
-
-    Status skip_values(size_t num_values) override;
 
-protected:
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) 
override;
+    tparquet::Type::type _physical_type;
 };
 
-class BAPlainDecoder final : public Decoder {
+class ByteArrayPlainDecoder final : public Decoder {
 public:
-    BAPlainDecoder() = default;
-    ~BAPlainDecoder() override = default;
+    ByteArrayPlainDecoder() = default;
+    ~ByteArrayPlainDecoder() override = default;
+
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
+                         size_t num_values) override;
 
     Status decode_values(Slice& slice, size_t num_values) override;
 
     Status skip_values(size_t num_values) override;
-
-protected:
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) 
override;
 };
 
 /// Decoder bit-packed boolean-encoded values.
@@ -147,6 +123,9 @@ public:
         _offset = 0;
     }
 
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
+                         size_t num_values) override;
+
     Status decode_values(Slice& slice, size_t num_values) override;
 
     Status skip_values(size_t num_values) override;
@@ -167,8 +146,6 @@ protected:
         return true;
     }
 
-    Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) 
override;
-
     /// A buffer to store unpacked values. Must be a multiple of 32 size to 
use the
     /// batch-oriented interface of BatchedBitReader. We use uint8_t instead 
of bool because
     /// bit unpacking is only supported for unsigned integers. The values are 
converted to
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index d4c7f534a3..a0a21b00ca 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -20,13 +20,14 @@
 namespace doris::vectorized {
 
 ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader,
-                                     tparquet::ColumnChunk* column_chunk, 
FieldSchema* fieldSchema)
-        : _max_rep_level(fieldSchema->repetition_level),
-          _max_def_level(fieldSchema->definition_level),
+                                     tparquet::ColumnChunk* column_chunk, 
FieldSchema* field_schema)
+        : _field_schema(field_schema),
+          _max_rep_level(field_schema->repetition_level),
+          _max_def_level(field_schema->definition_level),
           _stream_reader(reader),
           _metadata(column_chunk->meta_data) {}
 
-Status ColumnChunkReader::init(size_t type_length) {
+Status ColumnChunkReader::init() {
     size_t start_offset = _metadata.__isset.dictionary_page_offset
                                   ? _metadata.dictionary_page_offset
                                   : _metadata.data_page_offset;
@@ -41,8 +42,6 @@ Status ColumnChunkReader::init(size_t type_length) {
 
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, 
_block_compress_codec));
-    // -1 means unfixed length type
-    _type_length = type_length;
 
     return Status::OK();
 }
@@ -91,14 +90,13 @@ Status ColumnChunkReader::load_page_data() {
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     } else {
         std::unique_ptr<Decoder> page_decoder;
-        Decoder::getDecoder(_metadata.type, encoding, page_decoder);
+        Decoder::get_decoder(_metadata.type, encoding, page_decoder);
         _decoders[static_cast<int>(encoding)] = std::move(page_decoder);
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     }
     _page_decoder->set_data(&_page_data);
-    if (_type_length > 0) {
-        _page_decoder->set_type_length(_type_length);
-    }
+    // Set type length
+    _page_decoder->set_type_length(_get_type_length());
 
     return Status::OK();
 }
@@ -138,12 +136,22 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, 
size_t n) {
     return _def_level_decoder.get_levels(levels, n);
 }
 
-Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t 
num_values) {
+Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& 
data_type,
+                                        size_t num_values) {
+    if (UNLIKELY(_num_values < num_values)) {
+        return Status::IOError("Decode too many values in current page");
+    }
+    _num_values -= num_values;
+    return _page_decoder->decode_values(doris_column, data_type, num_values);
+}
+
+Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, 
DataTypePtr& data_type,
+                                        size_t num_values) {
     if (UNLIKELY(_num_values < num_values)) {
         return Status::IOError("Decode too many values in current page");
     }
     _num_values -= num_values;
-    return _page_decoder->decode_values(doris_column, num_values);
+    return _page_decoder->decode_values(doris_column, data_type, num_values);
 }
 
 Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
@@ -153,4 +161,21 @@ Status ColumnChunkReader::decode_values(Slice& slice, 
size_t num_values) {
     _num_values -= num_values;
     return _page_decoder->decode_values(slice, num_values);
 }
+
+int32_t ColumnChunkReader::_get_type_length() {
+    switch (_field_schema->physical_type) {
+    case tparquet::Type::INT32:
+    case tparquet::Type::FLOAT:
+        return 4;
+    case tparquet::Type::INT64:
+    case tparquet::Type::DOUBLE:
+        return 8;
+    case tparquet::Type::INT96:
+        return 12;
+    case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+        return _field_schema->parquet_schema.type_length;
+    default:
+        return -1;
+    }
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 282612bd21..f8510d4b37 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -59,13 +59,11 @@ namespace doris::vectorized {
 class ColumnChunkReader {
 public:
     ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* 
column_chunk,
-                      FieldSchema* fieldSchema);
+                      FieldSchema* field_schema);
     ~ColumnChunkReader() = default;
 
     // Initialize chunk reader, will generate the decoder and codec.
-    // We can set the type_length if the length of colum type if fixed,
-    // or not set, the decoder will try to infer the type_length.
-    Status init(size_t type_length = -1);
+    Status init();
 
     // Whether the chunk reader has a more page to read.
     bool has_next_page() { return _page_reader->has_next_page(); }
@@ -86,8 +84,11 @@ public:
     // Load page data into the underlying container,
     // and initialize the repetition and definition level decoder for current 
page data.
     Status load_page_data();
-    // The remaining number of values in current page. Decreased when reading 
or skipping.
+    // The remaining number of values in current page(including null values). 
Decreased when reading or skipping.
     uint32_t num_values() const { return _num_values; };
+    // null values are not analyzing from definition levels
+    // the caller should maintain the consistency after analyzing null values 
from definition levels.
+    void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; };
     // Get the raw data of current page.
     Slice& get_page_data() { return _page_data; }
 
@@ -97,7 +98,8 @@ public:
     size_t get_def_levels(level_t* levels, size_t n);
 
     // Decode values in current page into doris column.
-    Status decode_values(ColumnPtr& doris_column, size_t num_values);
+    Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, 
size_t num_values);
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type, size_t num_values);
     // For test, Decode values in current page into slice.
     Status decode_values(Slice& slice, size_t num_values);
 
@@ -109,7 +111,9 @@ public:
 private:
     Status _decode_dict_page();
     void _reserve_decompress_buf(size_t size);
+    int32_t _get_type_length();
 
+    FieldSchema* _field_schema;
     level_t _max_rep_level;
     level_t _max_def_level;
 
@@ -131,7 +135,6 @@ private:
     // Map: encoding -> Decoder
     // Plain or Dictionary encoding. If the dictionary grows too big, the 
encoding will fall back to the plain encoding
     std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
-    size_t _type_length = -1;
 };
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index db91103c88..95df8bd9a2 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -22,10 +22,15 @@
 
 #include <string>
 
+#include "exec/schema_scanner.h"
 #include "io/buffered_reader.h"
 #include "io/file_reader.h"
 #include "io/local_file_reader.h"
+#include "runtime/string_value.h"
 #include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/exec/format/parquet/parquet_thrift_util.h"
 #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
 #include "vec/exec/format/parquet/vparquet_file_metadata.h"
@@ -125,7 +130,8 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
 }
 
 static Status get_column_values(FileReader* file_reader, 
tparquet::ColumnChunk* column_chunk,
-                                FieldSchema* field_schema, Slice& slice) {
+                                FieldSchema* field_schema, ColumnPtr& 
doris_column,
+                                DataTypePtr& data_type) {
     tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
     size_t start_offset = chunk_meta.__isset.dictionary_page_offset
                                   ? chunk_meta.dictionary_page_offset
@@ -141,7 +147,35 @@ static Status get_column_values(FileReader* file_reader, 
tparquet::ColumnChunk*
     // load page data into underlying container
     chunk_reader.load_page_data();
     // decode page data
-    return chunk_reader.decode_values(slice, chunk_reader.num_values());
+    return chunk_reader.decode_values(doris_column, data_type, 
chunk_reader.num_values());
+}
+
+static void create_block(std::unique_ptr<vectorized::Block>& block) {
+    // Current supported column type:
+    SchemaScanner::ColumnDesc column_descs[] = {
+            {"tinyint_col", TYPE_TINYINT, sizeof(int8_t), true},
+            {"smallint_col", TYPE_SMALLINT, sizeof(int16_t), true},
+            {"int_col", TYPE_INT, sizeof(int32_t), true},
+            {"bigint_col", TYPE_BIGINT, sizeof(int64_t), true},
+            {"boolean_col", TYPE_BOOLEAN, sizeof(bool), true},
+            {"float_col", TYPE_FLOAT, sizeof(float_t), true},
+            {"double_col", TYPE_DOUBLE, sizeof(double_t), true},
+            {"string_col", TYPE_STRING, sizeof(StringValue), true}};
+    SchemaScanner schema_scanner(column_descs,
+                                 sizeof(column_descs) / 
sizeof(SchemaScanner::ColumnDesc));
+    ObjectPool object_pool;
+    SchemaScannerParam param;
+    schema_scanner.init(&param, &object_pool);
+    auto tuple_slots = 
const_cast<TupleDescriptor*>(schema_scanner.tuple_desc())->slots();
+    block.reset(new vectorized::Block());
+    for (const auto& slot_desc : tuple_slots) {
+        auto is_nullable = slot_desc->is_nullable();
+        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
+                                                                               
   is_nullable);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, 
slot_desc->col_name()));
+    }
 }
 
 TEST_F(ParquetThriftReaderTest, type_decoder) {
@@ -164,6 +198,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
      * `date_col` date, // 13
      * `list_string` array<string>) // 14
      */
+
     LocalFileReader 
reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
     /*
      * Data in type-decoder.parquet:
@@ -181,6 +216,8 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
     auto st = reader.open();
     EXPECT_TRUE(st.ok());
 
+    std::unique_ptr<vectorized::Block> block;
+    create_block(block);
     std::shared_ptr<FileMetaData> metaData;
     parse_thrift_footer(&reader, metaData);
     tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
@@ -190,51 +227,98 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
 
     // the physical_type of tinyint_col, smallint_col and int_col are all INT32
     // they are distinguished by converted_type(in 
FieldSchema.parquet_schema.converted_type)
-    for (int col_idx = 0; col_idx < 3; ++col_idx) {
-        char data[4 * rows];
-        Slice slice(data, 4 * rows);
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[col_idx],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(col_idx)), slice);
-        auto out_data = reinterpret_cast<int32_t*>(data);
+    {
+        auto& column_name_with_type = block->get_by_position(0);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[0],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column,
+                          data_type);
         int int_sum = 0;
         for (int i = 0; i < rows; ++i) {
-            int_sum += out_data[i];
+            int_sum += (int8_t)data_column->get64(i);
         }
         ASSERT_EQ(int_sum, 5);
     }
-    // `bigint_col` bigint, // 3
     {
-        char data[8 * rows];
-        Slice slice(data, 8 * rows);
-        get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), slice);
-        auto out_data = reinterpret_cast<int64_t*>(data);
+        auto& column_name_with_type = block->get_by_position(1);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[1],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column,
+                          data_type);
         int int_sum = 0;
         for (int i = 0; i < rows; ++i) {
-            int_sum += out_data[i];
+            int_sum += (int16_t)data_column->get64(i);
+        }
+        ASSERT_EQ(int_sum, 5);
+    }
+    {
+        auto& column_name_with_type = block->get_by_position(2);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[2],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column,
+                          data_type);
+        int int_sum = 0;
+        for (int i = 0; i < rows; ++i) {
+            int_sum += (int32_t)data_column->get64(i);
+        }
+        ASSERT_EQ(int_sum, 5);
+    }
+    {
+        auto& column_name_with_type = block->get_by_position(3);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column,
+                          data_type);
+        int64_t int_sum = 0;
+        for (int i = 0; i < rows; ++i) {
+            int_sum += (int64_t)data_column->get64(i);
         }
         ASSERT_EQ(int_sum, 5);
     }
     // `boolean_col` boolean, // 4
     {
-        char data[1 * rows];
-        Slice slice(data, 1 * rows);
+        auto& column_name_with_type = block->get_by_position(4);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
         get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
-                          
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), slice);
-        auto out_data = reinterpret_cast<bool*>(data);
-        ASSERT_FALSE(out_data[0]);
-        ASSERT_TRUE(out_data[1]);
-        ASSERT_FALSE(out_data[2]);
-        ASSERT_TRUE(out_data[3]);
-        ASSERT_FALSE(out_data[4]);
-        ASSERT_FALSE(out_data[5]);
-        ASSERT_TRUE(out_data[6]);
-        ASSERT_FALSE(out_data[7]);
-        ASSERT_FALSE(out_data[8]);
-        ASSERT_FALSE(out_data[9]);
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column,
+                          data_type);
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(0)));
+        ASSERT_TRUE(static_cast<bool>(data_column->get64(1)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(2)));
+        ASSERT_TRUE(static_cast<bool>(data_column->get64(3)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(4)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(5)));
+        ASSERT_TRUE(static_cast<bool>(data_column->get64(6)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(7)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(8)));
+        ASSERT_FALSE(static_cast<bool>(data_column->get64(9)));
+    }
+    // `double_col` double, // 6
+    {
+        auto& column_name_with_type = block->get_by_position(6);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
+        get_column_values(&reader, &t_metadata.row_groups[0].columns[6],
+                          
const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column,
+                          data_type);
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(data_column)).mutate().get());
+        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
+        ASSERT_EQ(nested_column->get_float64(0), -1.14);
+        ASSERT_EQ(nested_column->get_float64(1), 2.14);
+        ASSERT_EQ(nested_column->get_float64(2), -3.14);
+        ASSERT_EQ(nested_column->get_float64(3), 4.14);
     }
     // `string_col` string, // 7
     {
+        auto& column_name_with_type = block->get_by_position(7);
+        auto& data_column = column_name_with_type.column;
+        auto& data_type = column_name_with_type.type;
         tparquet::ColumnChunk column_chunk = 
t_metadata.row_groups[0].columns[7];
         tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
         size_t start_offset = chunk_meta.__isset.dictionary_page_offset
@@ -242,7 +326,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
                                       : chunk_meta.data_page_offset;
         size_t chunk_size = chunk_meta.total_compressed_size;
         BufferedFileStreamReader stream_reader(&reader, start_offset, 
chunk_size);
-
         ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
                                        
const_cast<FieldSchema*>(schema_descriptor.get_column(7)));
         // initialize chunk reader
@@ -252,8 +335,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
         // load page data into underlying container
         chunk_reader.load_page_data();
 
-        char data[50 * rows];
-        Slice slice(data, 50 * rows);
         level_t defs[rows];
         // Analyze null string
         chunk_reader.get_def_levels(defs, rows);
@@ -261,9 +342,14 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
         ASSERT_EQ(defs[3], 0);
         ASSERT_EQ(defs[7], 0);
 
-        chunk_reader.decode_values(slice, 7);
-        ASSERT_STREQ("s-row0", slice.data);
-        ASSERT_STREQ("s-row2", slice.data + 7);
+        chunk_reader.decode_values(data_column, data_type, 7);
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                (*std::move(data_column)).mutate().get());
+        MutableColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
+        auto row0 = nested_column->get_data_at(0).data;
+        auto row2 = nested_column->get_data_at(1).data;
+        ASSERT_STREQ("s-row0", row0);
+        ASSERT_STREQ("s-row2", row2);
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to