This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 124b4f7694 [feature-wip](parquet-reader) row group reader ut finish (#11887) 124b4f7694 is described below commit 124b4f769466dfb8b4272bacbe40dfd49b0364bf Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Thu Aug 18 17:18:14 2022 +0800 [feature-wip](parquet-reader) row group reader ut finish (#11887) Co-authored-by: jinzhe <jin...@selectdb.com> --- .../vec/exec/format/parquet/parquet_thrift_util.h | 2 +- .../parquet/vparquet_column_chunk_reader.cpp | 5 - .../exec/format/parquet/vparquet_column_reader.cpp | 31 ++-- .../exec/format/parquet/vparquet_column_reader.h | 12 +- .../exec/format/parquet/vparquet_group_reader.cpp | 17 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 7 +- be/src/vec/exec/format/parquet/vparquet_reader.h | 6 +- be/test/vec/exec/parquet/parquet_thrift_test.cpp | 177 +++++++++++++++++++++ 8 files changed, 215 insertions(+), 42 deletions(-) diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h index cb5dc1558b..7852926509 100644 --- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h +++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h @@ -34,7 +34,7 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'}; constexpr int64_t PARQUET_FOOTER_READ_SIZE = 64 * 1024; constexpr uint32_t PARQUET_FOOTER_SIZE = 8; -Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file_metadata) { +static Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file_metadata) { // try with buffer on stack uint8_t buff[PARQUET_FOOTER_READ_SIZE]; int64_t file_size = file->size(); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 751780fbae..0cb4e8229c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -32,19 +32,14 @@ Status ColumnChunkReader::init() { ? _metadata.dictionary_page_offset : _metadata.data_page_offset; size_t chunk_size = _metadata.total_compressed_size; - VLOG_DEBUG << "create _page_reader"; _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size); - if (_metadata.__isset.dictionary_page_offset) { RETURN_IF_ERROR(_decode_dict_page()); } // seek to the first data page _page_reader->seek_to_page(_metadata.data_page_offset); - // get the block compression codec RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec)); - - VLOG_DEBUG << "initColumnChunkReader finish"; return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index e7b189e40c..3daf80e7c8 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -37,7 +37,6 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, if (field->type.type == TYPE_ARRAY) { return Status::Corruption("not supported array type yet"); } else { - VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index; tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index]; ScalarColumnReader* scalar_reader = new ScalarColumnReader(column); scalar_reader->init_column_metadata(chunk); @@ -60,23 +59,27 @@ void ParquetColumnReader::_skipped_pages() {} Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, std::vector<RowRange>& row_ranges) { - BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size()); - _row_ranges.reset(&row_ranges); - _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field)); - _chunk_reader->init(); + _stream_reader = + new BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size()); + _row_ranges = &row_ranges; + _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field)); + RETURN_IF_ERROR(_chunk_reader->init()); + RETURN_IF_ERROR(_chunk_reader->next_page()); + if (_row_ranges->size() != 0) { + _skipped_pages(); + } + RETURN_IF_ERROR(_chunk_reader->load_page_data()); return Status::OK(); } Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, size_t* read_rows, bool* eof) { if (_chunk_reader->remaining_num_values() <= 0) { - // seek to next page header - _chunk_reader->next_page(); + RETURN_IF_ERROR(_chunk_reader->next_page()); if (_row_ranges->size() != 0) { _skipped_pages(); } - // load data to decoder - _chunk_reader->load_page_data(); + RETURN_IF_ERROR(_chunk_reader->load_page_data()); } size_t read_values = _chunk_reader->remaining_num_values() < batch_size ? _chunk_reader->remaining_num_values() @@ -84,14 +87,14 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr *read_rows = read_values; WhichDataType which_type(type); switch (_metadata->t_metadata().type) { - case tparquet::Type::INT32: { + case tparquet::Type::INT32: + case tparquet::Type::INT64: + case tparquet::Type::FLOAT: + case tparquet::Type::DOUBLE: + case tparquet::Type::BOOLEAN: { _chunk_reader->decode_values(doris_column, type, read_values); return Status::OK(); } - case tparquet::Type::INT64: { - // todo: test int64 - return Status::OK(); - } default: return Status::Corruption("unsupported parquet data type"); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 696fbe5db0..6c6a0e4013 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -50,7 +50,12 @@ private: class ParquetColumnReader { public: ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {}; - virtual ~ParquetColumnReader() = default; + virtual ~ParquetColumnReader() { + if (_stream_reader != nullptr) { + delete _stream_reader; + _stream_reader = nullptr; + } + }; virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, size_t* read_rows, bool* eof) = 0; static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column, @@ -64,14 +69,15 @@ protected: protected: const ParquetReadColumn& _column; + BufferedFileStreamReader* _stream_reader; std::unique_ptr<ParquetColumnMetadata> _metadata; - std::unique_ptr<std::vector<RowRange>> _row_ranges; + std::vector<RowRange>* _row_ranges; }; class ScalarColumnReader : public ParquetColumnReader { public: ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {}; - ~ScalarColumnReader() override = default; + ~ScalarColumnReader() override { close(); }; Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, std::vector<RowRange>& row_ranges); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 751e43863a..0ac58ce6ed 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -45,10 +45,9 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange> Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges) { for (auto& read_col : _read_columns) { - SlotDescriptor* slot_desc = read_col.slot_desc; + SlotDescriptor* slot_desc = read_col._slot_desc; TypeDescriptor col_type = slot_desc->type(); auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name())); - VLOG_DEBUG << "field: " << field->debug_string(); std::unique_ptr<ParquetColumnReader> reader; RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta, row_ranges, reader)); @@ -62,20 +61,18 @@ Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema, } Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) { - if (_read_rows >= _total_rows) { - *_batch_eof = true; - } for (auto& read_col : _read_columns) { - auto slot_desc = read_col.slot_desc; + auto slot_desc = read_col._slot_desc; auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); - auto column_ptr = column_with_type_and_name.column; - auto column_type = column_with_type_and_name.type; + auto& column_ptr = column_with_type_and_name.column; + auto& column_type = column_with_type_and_name.type; size_t batch_read_rows = 0; RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data( column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof)); _read_rows += batch_read_rows; - VLOG_DEBUG << "read column: " << column_with_type_and_name.name; - VLOG_DEBUG << "read rows in column: " << batch_read_rows; + if (_read_rows >= _total_rows) { + *_batch_eof = true; + } } // use data fill utils read column data to column ptr return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index b16df6b557..13fb9e8cad 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -62,7 +62,6 @@ Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc, } auto schema_desc = _file_metadata->schema(); for (int i = 0; i < _file_metadata->num_columns(); ++i) { - // for test VLOG_DEBUG << schema_desc.debug_string(); // Get the Column Reader for the boolean column _map_column.emplace(schema_desc.get_column(i)->name, i); @@ -89,11 +88,7 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup VLOG_DEBUG << str_error.str(); return Status::InvalidArgument(str_error.str()); } - ParquetReadColumn column; - column.slot_desc = slot_desc; - column.parquet_column_id = parquet_col_id; - auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type; - column.parquet_type = physical_type; + ParquetReadColumn column(slot_desc); _read_columns.emplace_back(column); VLOG_DEBUG << "slot_desc " << slot_desc->debug_string(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index c1d0ec4247..a979daf692 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -53,11 +53,11 @@ class ParquetReadColumn { public: friend class ParquetReader; friend class RowGroupReader; + ParquetReadColumn(SlotDescriptor* slot_desc) : _slot_desc(slot_desc) {}; + ~ParquetReadColumn() = default; private: - SlotDescriptor* slot_desc; - int parquet_column_id; - tparquet::Type::type parquet_type; + SlotDescriptor* _slot_desc; // int64_t start_offset; // int64_t chunk_size; }; diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index c334b105ed..7aa0f8cbd3 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -33,6 +33,7 @@ #include "vec/data_types/data_type_factory.hpp" #include "vec/exec/format/parquet/parquet_thrift_util.h" #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" +#include "vec/exec/format/parquet/vparquet_column_reader.h" #include "vec/exec/format/parquet/vparquet_file_metadata.h" namespace doris { @@ -353,6 +354,182 @@ TEST_F(ParquetThriftReaderTest, type_decoder) { } } +TEST_F(ParquetThriftReaderTest, column_reader) { + LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); + auto st = file_reader.open(); + EXPECT_TRUE(st.ok()); + + // prepare metadata + std::shared_ptr<FileMetaData> meta_data; + parse_thrift_footer(&file_reader, meta_data); + tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata(); + + FieldDescriptor schema_descriptor; + // todo use schema of meta_data + schema_descriptor.parse_from_thrift(t_metadata.schema); + // create scalar column reader + std::unique_ptr<ParquetColumnReader> reader; + auto field = const_cast<FieldSchema*>(schema_descriptor.get_column(0)); + // create read model + TDescriptorTable t_desc_table; + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::OLAP_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + TSlotDescriptor tslot_desc; + { + tslot_desc.id = 0; + tslot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::TINYINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + tslot_desc.slotType = type; + tslot_desc.columnPos = 0; + tslot_desc.byteOffset = 0; + tslot_desc.nullIndicatorByte = 0; + tslot_desc.nullIndicatorBit = -1; + tslot_desc.colName = "tinyint_col"; + tslot_desc.slotIdx = 0; + tslot_desc.isMaterialized = true; + t_desc_table.slotDescriptors.push_back(tslot_desc); + } + t_desc_table.__isset.slotDescriptors = true; + { + // TTupleDescriptor dest + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = 0; + t_tuple_desc.byteSize = 16; + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + DescriptorTbl* desc_tbl; + ObjectPool obj_pool; + DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl); + auto slot_desc = desc_tbl->get_slot_descriptor(0); + ParquetReadColumn column(slot_desc); + std::vector<RowRange> row_ranges = std::vector<RowRange>(); + ParquetColumnReader::create(&file_reader, field, column, t_metadata.row_groups[0], row_ranges, + reader); + std::unique_ptr<vectorized::Block> block; + create_block(block); + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + auto& column_ptr = column_with_type_and_name.column; + auto& column_type = column_with_type_and_name.type; + size_t batch_read_rows = 0; + bool batch_eof = false; + ASSERT_EQ(column_ptr->size(), 0); + + reader->read_column_data(column_ptr, column_type, 1024, &batch_read_rows, &batch_eof); + EXPECT_TRUE(!batch_eof); + ASSERT_EQ(batch_read_rows, 10); + ASSERT_EQ(column_ptr->size(), 10); + + auto* nullable_column = + reinterpret_cast<vectorized::ColumnNullable*>((*std::move(column_ptr)).mutate().get()); + MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + int int_sum = 0; + for (int i = 0; i < column_ptr->size(); i++) { + int_sum += (int8_t)column_ptr->get64(i); + } + ASSERT_EQ(int_sum, 5); +} + +TEST_F(ParquetThriftReaderTest, group_reader) { + TDescriptorTable t_desc_table; + TTableDescriptor t_table_desc; + std::vector<std::string> int_types = {"boolean_col", "tinyint_col", "smallint_col", "int_col", + "bigint_col", "float_col", "double_col"}; + // "string_col" + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::OLAP_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + + for (int i = 0; i < int_types.size(); i++) { + TSlotDescriptor tslot_desc; + { + tslot_desc.id = i; + tslot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::type(i + 2)); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + tslot_desc.slotType = type; + tslot_desc.columnPos = 0; + tslot_desc.byteOffset = 0; + tslot_desc.nullIndicatorByte = 0; + tslot_desc.nullIndicatorBit = -1; + tslot_desc.colName = int_types[i]; + tslot_desc.slotIdx = 0; + tslot_desc.isMaterialized = true; + t_desc_table.slotDescriptors.push_back(tslot_desc); + } + } + + t_desc_table.__isset.slotDescriptors = true; + { + // TTupleDescriptor dest + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = 0; + t_tuple_desc.byteSize = 16; + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + DescriptorTbl* desc_tbl; + ObjectPool obj_pool; + DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl); + std::vector<ParquetReadColumn> read_columns; + for (int i = 0; i < int_types.size(); i++) { + auto slot_desc = desc_tbl->get_slot_descriptor(i); + ParquetReadColumn column(slot_desc); + read_columns.emplace_back(column); + } + + LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); + auto st = file_reader.open(); + EXPECT_TRUE(st.ok()); + + // prepare metadata + std::shared_ptr<FileMetaData> meta_data; + parse_thrift_footer(&file_reader, meta_data); + tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata(); + + auto row_group = t_metadata.row_groups[0]; + std::shared_ptr<RowGroupReader> row_group_reader; + row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group)); + std::vector<RowRange> row_ranges = std::vector<RowRange>(); + auto stg = row_group_reader->init(meta_data->schema(), row_ranges); + EXPECT_TRUE(stg.ok()); + + std::unique_ptr<vectorized::Block> block; + create_block(block); + bool batch_eof = false; + auto stb = row_group_reader->next_batch(block.get(), 1024, &batch_eof); + EXPECT_TRUE(stb.ok()); + LOG(WARNING) << "block data: " << block->dump_structure(); +} } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org