This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 124b4f7694 [feature-wip](parquet-reader) row group reader ut finish (#11887)
124b4f7694 is described below

commit 124b4f769466dfb8b4272bacbe40dfd49b0364bf
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Thu Aug 18 17:18:14 2022 +0800

    [feature-wip](parquet-reader) row group reader ut finish (#11887)
    
    Co-authored-by: jinzhe <jin...@selectdb.com>
---
 .../vec/exec/format/parquet/parquet_thrift_util.h  |   2 +-
 .../parquet/vparquet_column_chunk_reader.cpp       |   5 -
 .../exec/format/parquet/vparquet_column_reader.cpp |  31 ++--
 .../exec/format/parquet/vparquet_column_reader.h   |  12 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  |  17 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |   7 +-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   6 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   | 177 +++++++++++++++++++++
 8 files changed, 215 insertions(+), 42 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index cb5dc1558b..7852926509 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -34,7 +34,7 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 constexpr int64_t PARQUET_FOOTER_READ_SIZE = 64 * 1024;
 constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
 
-Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file_metadata) {
+static Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file_metadata) {
     // try with buffer on stack
     uint8_t buff[PARQUET_FOOTER_READ_SIZE];
     int64_t file_size = file->size();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 751780fbae..0cb4e8229c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -32,19 +32,14 @@ Status ColumnChunkReader::init() {
                                   ? _metadata.dictionary_page_offset
                                   : _metadata.data_page_offset;
     size_t chunk_size = _metadata.total_compressed_size;
-    VLOG_DEBUG << "create _page_reader";
     _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
-
     if (_metadata.__isset.dictionary_page_offset) {
         RETURN_IF_ERROR(_decode_dict_page());
     }
     // seek to the first data page
     _page_reader->seek_to_page(_metadata.data_page_offset);
-
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
-
-    VLOG_DEBUG << "initColumnChunkReader finish";
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index e7b189e40c..3daf80e7c8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -37,7 +37,6 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
     if (field->type.type == TYPE_ARRAY) {
         return Status::Corruption("not supported array type yet");
     } else {
-        VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index;
         tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
         scalar_reader->init_column_metadata(chunk);
@@ -60,23 +59,27 @@ void ParquetColumnReader::_skipped_pages() {}
 
 Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
                                 std::vector<RowRange>& row_ranges) {
-    BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size());
-    _row_ranges.reset(&row_ranges);
-    _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
-    _chunk_reader->init();
+    _stream_reader =
+            new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size());
+    _row_ranges = &row_ranges;
+    _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field));
+    RETURN_IF_ERROR(_chunk_reader->init());
+    RETURN_IF_ERROR(_chunk_reader->next_page());
+    if (_row_ranges->size() != 0) {
+        _skipped_pages();
+    }
+    RETURN_IF_ERROR(_chunk_reader->load_page_data());
     return Status::OK();
 }
 
 Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                             size_t batch_size, size_t* read_rows, bool* eof) {
     if (_chunk_reader->remaining_num_values() <= 0) {
-        // seek to next page header
-        _chunk_reader->next_page();
+        RETURN_IF_ERROR(_chunk_reader->next_page());
         if (_row_ranges->size() != 0) {
             _skipped_pages();
         }
-        // load data to decoder
-        _chunk_reader->load_page_data();
+        RETURN_IF_ERROR(_chunk_reader->load_page_data());
     }
     size_t read_values = _chunk_reader->remaining_num_values() < batch_size
                                  ? _chunk_reader->remaining_num_values()
@@ -84,14 +87,14 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
     *read_rows = read_values;
     WhichDataType which_type(type);
     switch (_metadata->t_metadata().type) {
-    case tparquet::Type::INT32: {
+    case tparquet::Type::INT32:
+    case tparquet::Type::INT64:
+    case tparquet::Type::FLOAT:
+    case tparquet::Type::DOUBLE:
+    case tparquet::Type::BOOLEAN: {
         _chunk_reader->decode_values(doris_column, type, read_values);
         return Status::OK();
     }
-    case tparquet::Type::INT64: {
-        // todo: test int64
-        return Status::OK();
-    }
     default:
         return Status::Corruption("unsupported parquet data type");
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 696fbe5db0..6c6a0e4013 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -50,7 +50,12 @@ private:
 class ParquetColumnReader {
 public:
     ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {};
-    virtual ~ParquetColumnReader() = default;
+    virtual ~ParquetColumnReader() {
+        if (_stream_reader != nullptr) {
+            delete _stream_reader;
+            _stream_reader = nullptr;
+        }
+    };
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
                                     size_t* read_rows, bool* eof) = 0;
     static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
@@ -64,14 +69,15 @@ protected:
 
 protected:
     const ParquetReadColumn& _column;
+    BufferedFileStreamReader* _stream_reader;
     std::unique_ptr<ParquetColumnMetadata> _metadata;
-    std::unique_ptr<std::vector<RowRange>> _row_ranges;
+    std::vector<RowRange>* _row_ranges;
 };
 
 class ScalarColumnReader : public ParquetColumnReader {
 public:
     ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {};
-    ~ScalarColumnReader() override = default;
+    ~ScalarColumnReader() override { close(); };
     Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
                 std::vector<RowRange>& row_ranges);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 751e43863a..0ac58ce6ed 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -45,10 +45,9 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
 Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
                                             std::vector<RowRange>& row_ranges) {
     for (auto& read_col : _read_columns) {
-        SlotDescriptor* slot_desc = read_col.slot_desc;
+        SlotDescriptor* slot_desc = read_col._slot_desc;
         TypeDescriptor col_type = slot_desc->type();
         auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
-        VLOG_DEBUG << "field: " << field->debug_string();
         std::unique_ptr<ParquetColumnReader> reader;
         RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
                                                     row_ranges, reader));
@@ -62,20 +61,18 @@ Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
 }
 
 Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
-    if (_read_rows >= _total_rows) {
-        *_batch_eof = true;
-    }
     for (auto& read_col : _read_columns) {
-        auto slot_desc = read_col.slot_desc;
+        auto slot_desc = read_col._slot_desc;
         auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
-        auto column_ptr = column_with_type_and_name.column;
-        auto column_type = column_with_type_and_name.type;
+        auto& column_ptr = column_with_type_and_name.column;
+        auto& column_type = column_with_type_and_name.type;
         size_t batch_read_rows = 0;
         RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
                 column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof));
         _read_rows += batch_read_rows;
-        VLOG_DEBUG << "read column: " << column_with_type_and_name.name;
-        VLOG_DEBUG << "read rows in column: " << batch_read_rows;
+        if (_read_rows >= _total_rows) {
+            *_batch_eof = true;
+        }
     }
     // use data fill utils read column data to column ptr
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index b16df6b557..13fb9e8cad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -62,7 +62,6 @@ Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
     }
     auto schema_desc = _file_metadata->schema();
     for (int i = 0; i < _file_metadata->num_columns(); ++i) {
-        // for test
         VLOG_DEBUG << schema_desc.debug_string();
         // Get the Column Reader for the boolean column
         _map_column.emplace(schema_desc.get_column(i)->name, i);
@@ -89,11 +88,7 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
             VLOG_DEBUG << str_error.str();
             return Status::InvalidArgument(str_error.str());
         }
-        ParquetReadColumn column;
-        column.slot_desc = slot_desc;
-        column.parquet_column_id = parquet_col_id;
-        auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type;
-        column.parquet_type = physical_type;
+        ParquetReadColumn column(slot_desc);
         _read_columns.emplace_back(column);
         VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index c1d0ec4247..a979daf692 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -53,11 +53,11 @@ class ParquetReadColumn {
 public:
     friend class ParquetReader;
     friend class RowGroupReader;
+    ParquetReadColumn(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) {};
+    ~ParquetReadColumn() = default;
 
 private:
-    SlotDescriptor* slot_desc;
-    int parquet_column_id;
-    tparquet::Type::type parquet_type;
+    SlotDescriptor* _slot_desc;
     //    int64_t start_offset;
     //    int64_t chunk_size;
 };
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c334b105ed..7aa0f8cbd3 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -33,6 +33,7 @@
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/exec/format/parquet/parquet_thrift_util.h"
 #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
+#include "vec/exec/format/parquet/vparquet_column_reader.h"
 #include "vec/exec/format/parquet/vparquet_file_metadata.h"
 
 namespace doris {
@@ -353,6 +354,182 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
     }
 }
 
+TEST_F(ParquetThriftReaderTest, column_reader) {
+    LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+    auto st = file_reader.open();
+    EXPECT_TRUE(st.ok());
+
+    // prepare metadata
+    std::shared_ptr<FileMetaData> meta_data;
+    parse_thrift_footer(&file_reader, meta_data);
+    tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
+
+    FieldDescriptor schema_descriptor;
+    // todo use schema of meta_data
+    schema_descriptor.parse_from_thrift(t_metadata.schema);
+    // create scalar column reader
+    std::unique_ptr<ParquetColumnReader> reader;
+    auto field = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
+    // create read model
+    TDescriptorTable t_desc_table;
+    // table descriptors
+    TTableDescriptor t_table_desc;
+
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::OLAP_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+    TSlotDescriptor tslot_desc;
+    {
+        tslot_desc.id = 0;
+        tslot_desc.parent = 0;
+        TTypeDesc type;
+        {
+            TTypeNode node;
+            node.__set_type(TTypeNodeType::SCALAR);
+            TScalarType scalar_type;
+            scalar_type.__set_type(TPrimitiveType::TINYINT);
+            node.__set_scalar_type(scalar_type);
+            type.types.push_back(node);
+        }
+        tslot_desc.slotType = type;
+        tslot_desc.columnPos = 0;
+        tslot_desc.byteOffset = 0;
+        tslot_desc.nullIndicatorByte = 0;
+        tslot_desc.nullIndicatorBit = -1;
+        tslot_desc.colName = "tinyint_col";
+        tslot_desc.slotIdx = 0;
+        tslot_desc.isMaterialized = true;
+        t_desc_table.slotDescriptors.push_back(tslot_desc);
+    }
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        // TTupleDescriptor dest
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = 0;
+        t_tuple_desc.byteSize = 16;
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    DescriptorTbl* desc_tbl;
+    ObjectPool obj_pool;
+    DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+    auto slot_desc = desc_tbl->get_slot_descriptor(0);
+    ParquetReadColumn column(slot_desc);
+    std::vector<RowRange> row_ranges = std::vector<RowRange>();
+    ParquetColumnReader::create(&file_reader, field, column, t_metadata.row_groups[0], row_ranges,
+                                reader);
+    std::unique_ptr<vectorized::Block> block;
+    create_block(block);
+    auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
+    auto& column_ptr = column_with_type_and_name.column;
+    auto& column_type = column_with_type_and_name.type;
+    size_t batch_read_rows = 0;
+    bool batch_eof = false;
+    ASSERT_EQ(column_ptr->size(), 0);
+
+    reader->read_column_data(column_ptr, column_type, 1024, &batch_read_rows, &batch_eof);
+    EXPECT_TRUE(!batch_eof);
+    ASSERT_EQ(batch_read_rows, 10);
+    ASSERT_EQ(column_ptr->size(), 10);
+
+    auto* nullable_column =
+            reinterpret_cast<vectorized::ColumnNullable*>((*std::move(column_ptr)).mutate().get());
+    MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+    int int_sum = 0;
+    for (int i = 0; i < column_ptr->size(); i++) {
+        int_sum += (int8_t)column_ptr->get64(i);
+    }
+    ASSERT_EQ(int_sum, 5);
+}
+
+TEST_F(ParquetThriftReaderTest, group_reader) {
+    TDescriptorTable t_desc_table;
+    TTableDescriptor t_table_desc;
+    std::vector<std::string> int_types = {"boolean_col", "tinyint_col", "smallint_col", "int_col",
+                                          "bigint_col",  "float_col",   "double_col"};
+    //        "string_col"
+    t_table_desc.id = 0;
+    t_table_desc.tableType = TTableType::OLAP_TABLE;
+    t_table_desc.numCols = 0;
+    t_table_desc.numClusteringCols = 0;
+    t_desc_table.tableDescriptors.push_back(t_table_desc);
+    t_desc_table.__isset.tableDescriptors = true;
+
+    for (int i = 0; i < int_types.size(); i++) {
+        TSlotDescriptor tslot_desc;
+        {
+            tslot_desc.id = i;
+            tslot_desc.parent = 0;
+            TTypeDesc type;
+            {
+                TTypeNode node;
+                node.__set_type(TTypeNodeType::SCALAR);
+                TScalarType scalar_type;
+                scalar_type.__set_type(TPrimitiveType::type(i + 2));
+                node.__set_scalar_type(scalar_type);
+                type.types.push_back(node);
+            }
+            tslot_desc.slotType = type;
+            tslot_desc.columnPos = 0;
+            tslot_desc.byteOffset = 0;
+            tslot_desc.nullIndicatorByte = 0;
+            tslot_desc.nullIndicatorBit = -1;
+            tslot_desc.colName = int_types[i];
+            tslot_desc.slotIdx = 0;
+            tslot_desc.isMaterialized = true;
+            t_desc_table.slotDescriptors.push_back(tslot_desc);
+        }
+    }
+
+    t_desc_table.__isset.slotDescriptors = true;
+    {
+        // TTupleDescriptor dest
+        TTupleDescriptor t_tuple_desc;
+        t_tuple_desc.id = 0;
+        t_tuple_desc.byteSize = 16;
+        t_tuple_desc.numNullBytes = 0;
+        t_tuple_desc.tableId = 0;
+        t_tuple_desc.__isset.tableId = true;
+        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
+    }
+    DescriptorTbl* desc_tbl;
+    ObjectPool obj_pool;
+    DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+    std::vector<ParquetReadColumn> read_columns;
+    for (int i = 0; i < int_types.size(); i++) {
+        auto slot_desc = desc_tbl->get_slot_descriptor(i);
+        ParquetReadColumn column(slot_desc);
+        read_columns.emplace_back(column);
+    }
+
+    LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
+    auto st = file_reader.open();
+    EXPECT_TRUE(st.ok());
+
+    // prepare metadata
+    std::shared_ptr<FileMetaData> meta_data;
+    parse_thrift_footer(&file_reader, meta_data);
+    tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
+
+    auto row_group = t_metadata.row_groups[0];
+    std::shared_ptr<RowGroupReader> row_group_reader;
+    row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group));
+    std::vector<RowRange> row_ranges = std::vector<RowRange>();
+    auto stg = row_group_reader->init(meta_data->schema(), row_ranges);
+    EXPECT_TRUE(stg.ok());
+
+    std::unique_ptr<vectorized::Block> block;
+    create_block(block);
+    bool batch_eof = false;
+    auto stb = row_group_reader->next_batch(block.get(), 1024, &batch_eof);
+    EXPECT_TRUE(stb.ok());
+    LOG(WARNING) << "block data: " << block->dump_structure();
+}
 } // namespace vectorized
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to