morningman commented on code in PR #11601:
URL: https://github.com/apache/doris/pull/11601#discussion_r945588010

##########
be/src/vec/exec/format/parquet/vparquet_reader.h:
##########
@@ -58,18 +64,18 @@ class ParquetReadColumn {
 
 class ParquetReader {
 public:
-    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
+    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, 
size_t batch_size,
                   int64_t range_start_offset, int64_t range_size);
 
     ~ParquetReader();
 
     Status init_reader(const TupleDescriptor* tuple_desc,
                        const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                       const std::vector<ExprContext*>& conjunct_ctxs, const 
std::string& timezone);
+                       std::vector<ExprContext*>& conjunct_ctxs, const 
std::string& timezone);
 
-    Status read_next_batch(Block* block);
+    Status read_next_batch(Block* block, bool* eof);
 
-    bool has_next() const { return !_batch_eof; };
+    bool has_next() const { return !*_file_eof; };

Review Comment:
   the `eof` of `read_next_batch` already told you the end-of-file info. No 
need this `has_next()` method.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -45,26 +50,26 @@ void ParquetReader::close() {
 
 Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc,
                                   const std::vector<SlotDescriptor*>& 
tuple_slot_descs,
-                                  const std::vector<ExprContext*>& 
conjunct_ctxs,
+                                  std::vector<ExprContext*>& conjunct_ctxs,
                                   const std::string& timezone) {
     _file_reader->open();
+    _conjunct_ctxs.reset(&conjunct_ctxs);
     RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
     auto metadata = _file_metadata->to_thrift_metadata();

Review Comment:
   This `metadata` is unused?
   You can save it as a field of this class.



##########
be/src/vec/exec/format/parquet/vparquet_group_reader.cpp:
##########
@@ -24,235 +24,57 @@
 namespace doris::vectorized {
 
 RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
-                               const std::shared_ptr<FileMetaData>& 
file_metadata,
                                const std::vector<ParquetReadColumn>& 
read_columns,
-                               const std::map<std::string, int>& map_column,
-                               const std::vector<ExprContext*>& conjunct_ctxs)
+                               const int32_t row_group_id, tparquet::RowGroup& 
row_group)
         : _file_reader(file_reader),
-          _file_metadata(file_metadata),
           _read_columns(read_columns),
-          _map_column(map_column),
-          _conjunct_ctxs(conjunct_ctxs),
-          _current_row_group(-1) {}
+          _row_group_id(row_group_id),
+          _row_group_meta(row_group),
+          _total_rows(row_group.num_rows) {}
 
 RowGroupReader::~RowGroupReader() {
-    for (auto& column_reader : _column_readers) {
-        auto reader = column_reader.second;
-        reader->close();
-        delete reader;
-        reader = nullptr;
-    }
     _column_readers.clear();
 }
 
-Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t 
split_start_offset,
-                            int64_t split_size) {
-    _tuple_desc = tuple_desc;
-    _split_start_offset = split_start_offset;
-    _split_size = split_size;
-    _init_conjuncts(tuple_desc, _conjunct_ctxs);
-    RETURN_IF_ERROR(_init_column_readers());
+Status RowGroupReader::init(const FieldDescriptor& schema, 
std::vector<RowRange>& row_ranges) {
+    RETURN_IF_ERROR(_init_column_readers(schema, row_ranges));
     return Status::OK();
 }
 
-void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
-                                     const std::vector<ExprContext*>& 
conjunct_ctxs) {
-    if (tuple_desc->slots().empty()) {
-        return;
-    }
-    for (auto& read_col : _read_columns) {
-        _parquet_column_ids.emplace(read_col.parquet_column_id);
-    }
-
-    for (int i = 0; i < tuple_desc->slots().size(); i++) {
-        auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name());
-        if (col_iter == _map_column.end()) {
-            continue;
-        }
-        int parquet_col_id = col_iter->second;
-        if (_parquet_column_ids.end() == 
_parquet_column_ids.find(parquet_col_id)) {
-            continue;
-        }
-        for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) {
-            Expr* conjunct = conjunct_ctxs[conj_idx]->root();
-            if (conjunct->get_num_children() == 0) {
-                continue;
-            }
-            Expr* raw_slot = conjunct->get_child(0);
-            if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
-                continue;
-            }
-            SlotRef* slot_ref = (SlotRef*)raw_slot;
-            SlotId conjunct_slot_id = slot_ref->slot_id();
-            if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
-                // Get conjuncts by conjunct_slot_id
-                auto iter = _slot_conjuncts.find(conjunct_slot_id);
-                if (_slot_conjuncts.end() == iter) {
-                    std::vector<ExprContext*> conjuncts;
-                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
-                    _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, 
conjuncts));
-                } else {
-                    std::vector<ExprContext*> conjuncts = iter->second;
-                    conjuncts.emplace_back(conjunct_ctxs[conj_idx]);
-                }
-            }
-        }
-    }
-}
-
-Status RowGroupReader::_init_column_readers() {
+Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema,
+                                            std::vector<RowRange>& row_ranges) 
{
     for (auto& read_col : _read_columns) {
         SlotDescriptor* slot_desc = read_col.slot_desc;
-        FieldDescriptor schema = _file_metadata->schema();
         TypeDescriptor col_type = slot_desc->type();
-        const auto& field = schema.get_column(slot_desc->col_name());
-        const tparquet::RowGroup row_group =
-                
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
-        ParquetColumnReader* reader = nullptr;
-        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, 
MAX_PARQUET_BLOCK_SIZE, field,
-                                                    read_col, 
slot_desc->type(), row_group,
-                                                    reader));
+        auto field = 
const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name()));
+        VLOG_DEBUG << "field: " << field->debug_string();
+        std::unique_ptr<ParquetColumnReader> reader;
+        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, 
read_col, _row_group_meta,
+                                                    row_ranges, reader));
         if (reader == nullptr) {
+            VLOG_DEBUG << "Init row group reader failed";
             return Status::Corruption("Init row group reader failed");
         }
-        _column_readers[slot_desc->id()] = reader;
+        _column_readers[slot_desc->id()] = std::move(reader);
     }
     return Status::OK();
 }
 
-Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id) 
{
-    // get ColumnWithTypeAndName from src_block
+Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* 
_batch_eof) {
+    if (_read_rows >= _total_rows) {
+        *_batch_eof = true;
+    }
     for (auto& read_col : _read_columns) {
-        const tparquet::RowGroup row_group =
-                
_file_metadata->to_thrift_metadata().row_groups[_current_row_group];
-        auto& column_with_type_and_name = 
block->get_by_name(read_col.slot_desc->col_name());
-        
RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data(
-                row_group, &column_with_type_and_name.column));
-        VLOG_DEBUG << column_with_type_and_name.name;
+        auto slot_desc = read_col.slot_desc;
+        auto& column_with_type_and_name = 
block->get_by_name(slot_desc->col_name());
+        auto column_ptr = column_with_type_and_name.column;
+        auto column_type = column_with_type_and_name.type;
+        RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
+                column_ptr, column_type, batch_size, &_read_rows, _batch_eof));

Review Comment:
   `_read_rows` should be a cumulative value. But here it just be set to the 
number values of current batch.



##########
be/src/vec/exec/format/parquet/vparquet_column_reader.cpp:
##########
@@ -19,50 +19,82 @@
 
 #include <common/status.h>
 #include <gen_cpp/parquet_types.h>
+#include <vec/columns/columns_number.h>
 
 #include "schema_desc.h"
 #include "vparquet_column_chunk_reader.h"
 
 namespace doris::vectorized {
 
-Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* 
field,
-                                const tparquet::ColumnChunk* chunk, const 
TypeDescriptor& col_type,
-                                int64_t chunk_size) {
-    // todo1: init column chunk reader
-    // BufferedFileStreamReader stream_reader(reader, 0, chunk_size);
-    // _chunk_reader(&stream_reader, chunk, field);
-    // _chunk_reader.init();
-    return Status();
-}
-
-Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size,
-                                   const FieldSchema* field, const 
ParquetReadColumn& column,
-                                   const TypeDescriptor& col_type,
+Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
+                                   const ParquetReadColumn& column,
                                    const tparquet::RowGroup& row_group,
-                                   const ParquetColumnReader* reader) {
+                                   std::vector<RowRange>& row_ranges,
+                                   std::unique_ptr<ParquetColumnReader>& 
reader) {
     if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
         return Status::Corruption("not supported type");
     }
     if (field->type.type == TYPE_ARRAY) {
         return Status::Corruption("not supported array type yet");
     } else {
+        VLOG_DEBUG << "field->physical_column_index: " << 
field->physical_column_index;
+        tparquet::ColumnChunk chunk = 
row_group.columns[field->physical_column_index];
         ScalarColumnReader* scalar_reader = new ScalarColumnReader(column);
-        RETURN_IF_ERROR(scalar_reader->init(file, field,
-                                            
&row_group.columns[field->physical_column_index],
-                                            col_type, chunk_size));
-        reader = scalar_reader;
+        scalar_reader->init_column_metadata(chunk);
+        RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges));
+        reader.reset(scalar_reader);
     }
     return Status::OK();
 }
 
-Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& 
row_group_meta,
-                                            ColumnPtr* data) {
-    // todo2: read data with chunk reader to load page data
-    // while (_chunk_reader.has_next) {
-    // _chunk_reader.next_page();
-    // _chunk_reader.load_page_data();
-    // }
-    return Status();
+void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& 
chunk) {
+    auto chunk_meta = chunk.meta_data;
+    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+                                  ? chunk_meta.dictionary_page_offset
+                                  : chunk_meta.data_page_offset;
+    size_t chunk_len = chunk_meta.total_compressed_size;
+    _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, 
chunk_meta));
+}
+
+void ParquetColumnReader::_skipped_pages() {}
+
+Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, 
tparquet::ColumnChunk* chunk,
+                                std::vector<RowRange>& row_ranges) {
+    BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), 
_metadata->size());
+    _row_ranges.reset(&row_ranges);
+    _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field));
+    _chunk_reader->init();
+    return Status::OK();
+}
+
+Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
+                                            size_t batch_size, int64_t* 
read_rows, bool* eof) {
+    if (_chunk_reader->num_values() <= 0) {
+        // seek to next page header
+        _chunk_reader->next_page();
+        if (_row_ranges->size() != 0) {
+            _skipped_pages();
+        }
+        // load data to decoder
+        _chunk_reader->load_page_data();
+    }
+    size_t read_values =
+            _chunk_reader->num_values() < batch_size ? 
_chunk_reader->num_values() : batch_size;

Review Comment:
   I think we don't need to check the number of values of a `_chunk_reader`.
   Just call `_chunk_reader->decode_values` and return the number of values 
read.



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -90,63 +95,240 @@ Status ParquetReader::_init_read_columns(const 
std::vector<SlotDescriptor*>& tup
         auto physical_type = 
_file_metadata->schema().get_column(parquet_col_id)->physical_type;
         column.parquet_type = physical_type;
         _read_columns.emplace_back(column);
+        VLOG_DEBUG << "slot_desc " << slot_desc->debug_string();
     }
     return Status::OK();
 }
 
-Status ParquetReader::read_next_batch(Block* block) {
-    int32_t group_id = 0;
-    RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id));
+Status ParquetReader::read_next_batch(Block* block, bool* eof) {
+    DCHECK(_total_groups == _row_group_readers.size());
+    if (_total_groups == 0) {
+        *eof = true;
+    }
+    bool _batch_eof = false;
+    auto row_group_reader = _row_group_readers[_current_row_group_id];
+    RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, 
&_batch_eof));
+    if (_batch_eof) {
+        _current_row_group_id++;
+        if (_current_row_group_id > _total_groups) {
+            *eof = true;
+        }

Review Comment:
   missing `else`?
   In `else` block, you should open the next row group reader and call 
`next_batch()`



##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -43,11 +44,12 @@ Status ColumnChunkReader::init() {
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, 
_block_compress_codec));
 
+    VLOG_DEBUG << "initColumnChunkReader finish";
     return Status::OK();
 }
 
 Status ColumnChunkReader::next_page() {
-    RETURN_IF_ERROR(_page_reader->next_page());
+    RETURN_IF_ERROR(_page_reader->next_page_header());
     _num_values = _page_reader->get_page_header()->data_page_header.num_values;

Review Comment:
   Better rename `_num_values` to `_left_num_values`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to