morningman commented on code in PR #11601:
URL: https://github.com/apache/doris/pull/11601#discussion_r945588010
##########
be/src/vec/exec/format/parquet/vparquet_reader.h:
##########
@@ -58,18 +64,18 @@ class ParquetReadColumn {
 
 class ParquetReader {
 public:
-    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
+    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, size_t batch_size,
                   int64_t range_start_offset, int64_t range_size);
 
     ~ParquetReader();
 
     Status init_reader(const TupleDescriptor* tuple_desc,
                        const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                       const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone);
+                       std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone);
 
-    Status read_next_batch(Block* block);
+    Status read_next_batch(Block* block, bool* eof);
 
-    bool has_next() const { return !_batch_eof; };
+    bool has_next() const { return !*_file_eof; };

Review Comment:
   The `eof` flag of `read_next_batch` already tells the caller whether the end of the file has been reached, so this `has_next()` method is not needed.
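   For example, a caller can drive the scan purely off that flag. This is an illustrative sketch only (not the actual scanner code), assuming a `reader` and a `block` that are already set up, inside a function returning `Status`:

   ```cpp
   // Sketch: with the eof out-parameter there is nothing left for has_next() to do.
   bool eof = false;
   while (!eof) {
       RETURN_IF_ERROR(reader->read_next_batch(block, &eof));
       // hand the filled block to the upper operator, then reset it for the next round
   }
   ```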
##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -45,26 +50,26 @@ void ParquetReader::close() {
 
 Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
                                   const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                                  const std::vector<ExprContext*>& conjunct_ctxs,
+                                  std::vector<ExprContext*>& conjunct_ctxs,
                                   const std::string& timezone) {
     _file_reader->open();
+    _conjunct_ctxs.reset(&conjunct_ctxs);
     RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
     auto metadata = _file_metadata->to_thrift_metadata();

Review Comment:
   This `metadata` looks unused. You could save it as a field of this class.

##########
be/src/vec/exec/format/parquet/vparquet_group_reader.cpp:
##########
@@ -24,235 +24,57 @@ namespace doris::vectorized {
 
 RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
-                               const std::shared_ptr<FileMetaData>& file_metadata,
                                const std::vector<ParquetReadColumn>& read_columns,
-                               const std::map<std::string, int>& map_column,
-                               const std::vector<ExprContext*>& conjunct_ctxs)
+                               const int32_t row_group_id, tparquet::RowGroup& row_group)
         : _file_reader(file_reader),
-          _file_metadata(file_metadata),
           _read_columns(read_columns),
-          _map_column(map_column),
-          _conjunct_ctxs(conjunct_ctxs),
-          _current_row_group(-1) {}
+          _row_group_id(row_group_id),
+          _row_group_meta(row_group),
+          _total_rows(row_group.num_rows) {}
 
 RowGroupReader::~RowGroupReader() {
-    for (auto& column_reader : _column_readers) {
-        auto reader = column_reader.second;
-        reader->close();
-        delete reader;
-        reader = nullptr;
-    }
     _column_readers.clear();
 }
 
-Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t split_start_offset,
-                            int64_t split_size) {
-    _tuple_desc = tuple_desc;
-    _split_start_offset = split_start_offset;
-    _split_size = split_size;
-    _init_conjuncts(tuple_desc, _conjunct_ctxs);
-    RETURN_IF_ERROR(_init_column_readers());
+Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges) {
+    RETURN_IF_ERROR(_init_column_readers(schema, row_ranges));
     return Status::OK();
 }
 
-void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
-                                     const std::vector<ExprContext*>& conjunct_ctxs) {
-    if (tuple_desc->slots().empty()) {
-        return;
-    }
-    for (auto& read_col : _read_columns) {
-        _parquet_column_ids.emplace(read_col.parquet_column_id);
-    }
-
-    for (int i = 0; i < tuple_desc->slots().size(); i++) {
-        auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
-        if (col_iter == _map_column.end()) {
-            continue;
-        }
-        int parquet_col_id = col_iter->second;
-        if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) {
-            continue;
-        }
-        for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
-            Expr* conjunct = conjunct_ctxs[conj_idx]->root();
-            if (conjunct->get_num_children() == 0) {
-                continue;
-            }
-            Expr* raw_slot = conjunct->get_child(0);
-            if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
-                continue;
-            }
-            SlotRef* slot_ref = (SlotRef*)raw_slot;
-            SlotId conjunct_slot_id = slot_ref->slot_id();
-            if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
-                // Get conjuncts by conjunct_slot_id
-                auto iter = _slot_conjuncts.find(conjunct_slot_id);
-                if (_slot_conjuncts.end() == iter) {
-                    std::vector<ExprContext*> conjuncts;
-                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
-                    _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts));
-                } else {
-                    std::vector<ExprContext*> conjuncts = iter->second;
-                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
-                }
-            }
-        }
-    }
-}
-
-Status RowGroupReader::_init_column_readers() {
+Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
+                                            std::vector<RowRange>& row_ranges) {
     for (auto& read_col : _read_columns) {
         SlotDescriptor* slot_desc = read_col.slot_desc;
-        FieldDescriptor schema = _file_metadata->schema();
         TypeDescriptor col_type = slot_desc->type();
-        const auto& field = schema.get_column(slot_desc->col_name());
-        const tparquet::RowGroup row_group =
-                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
-        ParquetColumnReader* reader = nullptr;
-        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, MAX_PARQUET_BLOCK_SIZE, field,
-                                                    read_col, slot_desc->type(), row_group,
-                                                    reader));
+        auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
+        VLOG_DEBUG << "field: " << field->debug_string();
+        std::unique_ptr<ParquetColumnReader> reader;
+        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
+                                                    row_ranges, reader));
         if (reader == nullptr) {
+            VLOG_DEBUG << "Init row group reader failed";
             return Status::Corruption("Init row group reader failed");
         }
-        _column_readers[slot_desc->id()] = reader;
+        _column_readers[slot_desc->id()] = std::move(reader);
     }
     return Status::OK();
 }
 
-Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id) {
-    // get ColumnWithTypeAndName from src_block
+Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) {
+    if (_read_rows >= _total_rows) {
+        *_batch_eof = true;
+    }
     for (auto& read_col : _read_columns) {
-        const tparquet::RowGroup row_group =
-                _file_metadata->to_thrift_metadata().row_groups[_current_row_group];
-        auto& column_with_type_and_name = block->get_by_name(read_col.slot_desc->col_name());
-        RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data(
-                row_group, &column_with_type_and_name.column));
-        VLOG_DEBUG << column_with_type_and_name.name;
+        auto slot_desc = read_col.slot_desc;
+        auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
+        auto column_ptr = column_with_type_and_name.column;
+        auto column_type = column_with_type_and_name.type;
+        RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
+                column_ptr, column_type, batch_size, &_read_rows, _batch_eof));

Review Comment:
   `_read_rows` should be a cumulative value, but here it is just set to the number of values in the current batch.
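   For example, something along these lines (sketch only; `batch_read_rows` is a made-up local, and the accumulation should of course happen once per batch rather than once per column):

   ```cpp
   // Sketch: let read_column_data report this batch's rows, then accumulate.
   int64_t batch_read_rows = 0;
   RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
           column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof));
   _read_rows += batch_read_rows; // cumulative across all batches of this row group
   ```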
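   Also, regarding the unused `metadata` in `ParquetReader::init_reader` above, a minimal sketch of caching it (the member name `_t_metadata` is only illustrative, not from the patch):

   ```cpp
   // Sketch: keep the thrift metadata in a member so later code can reuse it
   // instead of calling to_thrift_metadata() again.
   RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
   _t_metadata = _file_metadata->to_thrift_metadata();
   ```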
##########
be/src/vec/exec/format/parquet/vparquet_column_reader.cpp:
##########
@@ -19,50 +19,82 @@
 #include <common/status.h>
 #include <gen_cpp/parquet_types.h>
+#include <vec/columns/columns_number.h>
 
 #include "schema_desc.h"
 #include "vparquet_column_chunk_reader.h"
 
 namespace doris::vectorized {
 
-Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field,
-                                const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type,
-                                int64_t chunk_size) {
-    // todo1: init column chunk reader
-    // BufferedFileStreamReader stream_reader(reader, 0, chunk_size);
-    // _chunk_reader(&stream_reader, chunk, field);
-    // _chunk_reader.init();
-    return Status();
-}
-
-Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size,
-                                   const FieldSchema* field, const ParquetReadColumn& column,
-                                   const TypeDescriptor& col_type,
+Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
+                                   const ParquetReadColumn& column,
                                    const tparquet::RowGroup& row_group,
-                                   const ParquetColumnReader* reader) {
+                                   std::vector<RowRange>& row_ranges,
+                                   std::unique_ptr<ParquetColumnReader>& reader) {
     if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
         return Status::Corruption("not supported type");
     }
     if (field->type.type == TYPE_ARRAY) {
         return Status::Corruption("not supported array type yet");
     } else {
+        VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index;
+        tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
-        RETURN_IF_ERROR(scalar_reader->init(file, field,
-                                            &row_group.columns[field->physical_column_index],
-                                            col_type, chunk_size));
-        reader = scalar_reader;
+        scalar_reader->init_column_metadata(chunk);
+        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
+        reader.reset(scalar_reader);
     }
     return Status::OK();
 }
 
-Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta,
-                                            ColumnPtr* data) {
-    // todo2: read data with chunk reader to load page data
-    // while (_chunk_reader.has_next) {
-    //     _chunk_reader.next_page();
-    //     _chunk_reader.load_page_data();
-    // }
-    return Status();
+void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chunk) {
+    auto chunk_meta = chunk.meta_data;
+    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_len = chunk_meta.total_compressed_size;
+    _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta));
+}
+
+void ParquetColumnReader::_skipped_pages() {}
+
+Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
+                                std::vector<RowRange>& row_ranges) {
+    BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size());
+    _row_ranges.reset(&row_ranges);
+    _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
+    _chunk_reader->init();
+    return Status::OK();
+}
+
+Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
+                                            size_t batch_size, int64_t* read_rows, bool* eof) {
+    if (_chunk_reader->num_values() <= 0) {
+        // seek to next page header
+        _chunk_reader->next_page();
+        if (_row_ranges->size() != 0) {
+            _skipped_pages();
+        }
+        // load data to decoder
+        _chunk_reader->load_page_data();
+    }
+    size_t read_values =
+            _chunk_reader->num_values() < batch_size ? _chunk_reader->num_values() : batch_size;

Review Comment:
   I think we don't need to check the number of values of the `_chunk_reader` here. Just call `_chunk_reader->decode_values` and return the number of values read.
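   Roughly like this (sketch only; the exact `decode_values` signature is assumed here, not taken from the patch):

   ```cpp
   // Sketch: ask the chunk reader to decode up to batch_size values and tell us
   // how many it actually produced, instead of pre-computing the min ourselves.
   size_t num_decoded = 0;
   RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, batch_size, &num_decoded));
   *read_rows = num_decoded;
   ```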
##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -90,63 +95,240 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup
         auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type;
         column.parquet_type = physical_type;
         _read_columns.emplace_back(column);
+        VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
     }
     return Status::OK();
 }
 
-Status ParquetReader::read_next_batch(Block* block) {
-    int32_t group_id = 0;
-    RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id));
+Status ParquetReader::read_next_batch(Block* block, bool* eof) {
+    DCHECK(_total_groups == _row_group_readers.size());
+    if (_total_groups == 0) {
+        *eof = true;
+    }
+    bool _batch_eof = false;
+    auto row_group_reader = _row_group_readers[_current_row_group_id];
+    RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, &_batch_eof));
+    if (_batch_eof) {
+        _current_row_group_id++;
+        if (_current_row_group_id > _total_groups) {
+            *eof = true;
+        }

Review Comment:
   Missing `else`? In the `else` branch you should open the next row group reader and call `next_batch()` on it (see the sketch at the end of this review).

##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -43,11 +44,12 @@ Status ColumnChunkReader::init() {
 
     // get the block compression codec
    RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
+    VLOG_DEBUG << "initColumnChunkReader finish";
     return Status::OK();
 }
 
 Status ColumnChunkReader::next_page() {
-    RETURN_IF_ERROR(_page_reader->next_page());
+    RETURN_IF_ERROR(_page_reader->next_page_header());
     _num_values = _page_reader->get_page_header()->data_page_header.num_values;

Review Comment:
   Better to rename `_num_values` to `_left_num_values`.
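   With the rename, the bookkeeping reads naturally as a countdown, roughly like this (illustrative fragment only; `num_decoded` is a made-up name):

   ```cpp
   // Sketch: next_page() initializes the number of values left in the page ...
   _left_num_values = _page_reader->get_page_header()->data_page_header.num_values;
   // ... and each decode step counts it down.
   _left_num_values -= num_decoded;
   ```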
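   And here is the sketch mentioned for `ParquetReader::read_next_batch` above. It is only a rough shape, not a drop-in patch, and it assumes `_current_row_group_id` is 0-based:

   ```cpp
   // Sketch: on batch eof, either finish the file or advance to the next
   // row group and immediately try to read from it.
   if (_batch_eof) {
       _current_row_group_id++;
       if (_current_row_group_id >= _total_groups) {
           *eof = true;
       } else {
           // pick up the next row group reader and retry
           return read_next_batch(block, eof);
       }
   }
   return Status::OK();
   ```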