This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new db20e1f  [refactor](storage) VGenericIterator to reuse Schema (#7858)
db20e1f is described below

commit db20e1f323744cd2fab91f120df644a4d47106be
Author: zuochunwei <zchw...@qq.com>
AuthorDate: Wed Feb 9 13:06:03 2022 +0800

    [refactor](storage) VGenericIterator to reuse Schema (#7858)
    
    1. reuse Schema to avoid copying, because clone Schema will generate a lot 
of sub Field object
    2. call interface provided by Block to reduce code lines
---
 be/src/olap/row_block2.h                           |  2 +-
 be/src/olap/rowset/beta_rowset_reader.cpp          |  6 +--
 be/src/olap/rowset/beta_rowset_reader.h            |  1 +
 .../rowset/segment_v2/empty_segment_iterator.h     |  2 +-
 be/src/olap/rowset/segment_v2/segment_iterator.h   |  3 +-
 be/src/vec/olap/vgeneric_iterators.cpp             | 61 ++++++++--------------
 6 files changed, 29 insertions(+), 46 deletions(-)

diff --git a/be/src/olap/row_block2.h b/be/src/olap/row_block2.h
index b98ab95..7f2b79d 100644
--- a/be/src/olap/row_block2.h
+++ b/be/src/olap/row_block2.h
@@ -111,7 +111,7 @@ public:
 private:
     Status _copy_data_to_column(int cid, vectorized::MutableColumnPtr& 
mutable_column_ptr);
 
-    Schema _schema;
+    const Schema& _schema;
     size_t _capacity;
     // _column_vector_batches[cid] == null if cid is not in `_schema`.
     // memory are not allocated from `_pool` because we don't wan't to 
reallocate them in clear()
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 263a4cc..3aed8eb 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -55,7 +55,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
         _stats = _context->stats;
     }
     // SegmentIterator will load seek columns on demand
-    Schema schema(_context->tablet_schema->columns(), 
*(_context->return_columns));
+    _schema = std::make_unique<Schema>(_context->tablet_schema->columns(), 
*(_context->return_columns));
 
     // convert RowsetReaderContext to StorageReadOptions
     StorageReadOptions read_options;
@@ -102,7 +102,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
     std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
     for (auto& seg_ptr : _segment_cache_handle.get_segments()) {
         std::unique_ptr<RowwiseIterator> iter;
-        auto s = seg_ptr->new_iterator(schema, read_options, _parent_tracker, 
&iter);
+        auto s = seg_ptr->new_iterator(*_schema, read_options, 
_parent_tracker, &iter);
         if (!s.ok()) {
             LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << 
"]: " << s.to_string();
             return OLAP_ERR_ROWSET_READER_INIT;
@@ -131,7 +131,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
     _iterator.reset(final_iterator);
 
     // init input block
-    _input_block.reset(new RowBlockV2(schema,
+    _input_block.reset(new RowBlockV2(*_schema,
             std::min(1024, read_context->batch_size), _parent_tracker));
 
     if (!read_context->is_vec) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index add0c31..997ab12 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -58,6 +58,7 @@ public:
     RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
 
 private:
+    std::unique_ptr<Schema> _schema;
     RowsetReaderContext* _context;
     BetaRowsetSharedPtr _rowset;
 
diff --git a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h 
b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
index 3e1a4f9..0c186ed 100644
--- a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h
@@ -35,7 +35,7 @@ public:
     Status next_batch(vectorized::Block* block) override;
 
 private:
-    Schema _schema;
+    const Schema& _schema;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 0577526..2eae13e 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -103,8 +103,7 @@ private:
     class BitmapRangeIterator;
 
     std::shared_ptr<Segment> _segment;
-    // TODO(zc): rethink if we need copy it
-    Schema _schema;
+    const Schema& _schema;
     // _column_iterators.size() == _schema.num_columns()
     // _column_iterators[cid] == nullptr if cid is not in _schema
     std::vector<ColumnIterator*> _column_iterators;
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index 3bee64b..f0f148d 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -100,7 +100,7 @@ public:
     const Schema& schema() const override { return _schema; }
 
 private:
-    Schema _schema;
+    const Schema& _schema;
     size_t _num_rows;
     size_t _rows_returned;
 };
@@ -136,12 +136,19 @@ public:
     {
         if (!_block) {
             const Schema& schema = _iter->schema();
-            for (auto &column_desc : schema.columns()) {
+            const auto& column_ids = schema.column_ids();
+            for (size_t i = 0; i < schema.num_column_ids(); ++i) {
+                auto column_desc = schema.column(column_ids[i]);
                 auto data_type = 
Schema::get_data_type_ptr(column_desc->type());
                 if (data_type == nullptr) {
                     return Status::RuntimeError("invalid data type");
                 }
-                
_block.insert(ColumnWithTypeAndName(data_type->create_column(), data_type, 
column_desc->name()));
+                if (column_desc->is_nullable()) {
+                    data_type = 
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
+                }
+                auto column = data_type->create_column();
+                column->reserve(_block_row_max);
+                _block.insert(ColumnWithTypeAndName(std::move(column), 
data_type, column_desc->name()));
             }
         } else {
             _block.clear_column_data();
@@ -152,43 +159,17 @@ public:
     // Initialize this context and will prepare data for current_row()
     Status init(const StorageReadOptions& opts);
 
-    int compare_row(const VMergeIteratorContext& rhs) const {
+    bool compare(const VMergeIteratorContext& rhs) const {
         const Schema& schema = _iter->schema();
         int num = schema.num_key_columns();
-        for (uint32_t cid = 0; cid < num; ++cid) {
-#if 0
-            auto name = schema.column(cid)->name();
-            auto l_col = this->_block.get_by_name(name);
-            auto r_col = rhs._block.get_by_name(name);
-
-#else
-            //because the columns of block will be inserted by cid asc order
-            //so no need to get column by get_by_name()
-            auto l_col = this->_block.get_by_position(cid);
-            auto r_col = rhs._block.get_by_position(cid);
-#endif
-
-            auto l_cp = l_col.column;
-            auto r_cp = r_col.column;
-
-            auto res = l_cp->compare_at(_index_in_block, rhs._index_in_block, 
*r_cp, -1);
-            if (res) {
-                return res;
-            }
-        }
-
-        return 0;
-    }
-
-    bool compare(const VMergeIteratorContext& rhs) const {
-        int cmp_res = this->compare_row(rhs);
+        int cmp_res = this->_block.compare_at(_index_in_block, 
rhs._index_in_block, num, rhs._block, -1);
         if (cmp_res != 0) {
             return cmp_res > 0;
         }
         return this->data_id() < rhs.data_id();
     }
 
-    void copy_row_to(vectorized::Block* block) {
+    void copy_row(vectorized::Block* block) {
         vectorized::Block& src = _block;
         vectorized::Block& dst = *block;
 
@@ -230,9 +211,11 @@ private:
 
     bool _valid = false;
     size_t _index_in_block = -1;
+    int _block_row_max = 4096;
 };
 
 Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
+    _block_row_max = opts.block_row_max;
     RETURN_IF_ERROR(_iter->init(opts));
     RETURN_IF_ERROR(block_reset());
     RETURN_IF_ERROR(_load_next_block());
@@ -246,7 +229,7 @@ Status VMergeIteratorContext::advance() {
     // NOTE: we increase _index_in_block directly to valid one check
     do {
         _index_in_block++;
-        if (_index_in_block < _block.rows()) {
+        if (LIKELY(_index_in_block < _block.rows())) {
             return Status::OK();
         }
         // current batch has no data, load next batch
@@ -299,7 +282,7 @@ private:
     // It will be released after '_merge_heap' has been built.
     std::vector<RowwiseIterator*> _origin_iters;
 
-    std::unique_ptr<Schema> _schema;
+    const Schema* _schema = nullptr;
 
     struct VMergeContextComparator {
         bool operator()(const VMergeIteratorContext* lhs, const 
VMergeIteratorContext* rhs) const {
@@ -320,10 +303,10 @@ Status VMergeIterator::init(const StorageReadOptions& 
opts) {
     if (_origin_iters.empty()) {
         return Status::OK();
     }
-    _schema.reset(new Schema((*(_origin_iters.begin()))->schema()));
+    _schema = &(*_origin_iters.begin())->schema();
 
     for (auto iter : _origin_iters) {
-        std::unique_ptr<VMergeIteratorContext> ctx(new 
VMergeIteratorContext(iter));
+        auto ctx = std::make_unique<VMergeIteratorContext>(iter);
         RETURN_IF_ERROR(ctx->init(opts));
         if (!ctx->valid()) {
             continue;
@@ -347,7 +330,7 @@ Status VMergeIterator::next_batch(vectorized::Block* block) 
{
         _merge_heap.pop();
 
         // copy current row to block
-        ctx->copy_row_to(block);
+        ctx->copy_row(block);
 
         RETURN_IF_ERROR(ctx->advance());
         if (ctx->valid()) {
@@ -383,7 +366,7 @@ public:
     const Schema& schema() const override { return *_schema; }
 
 private:
-    std::unique_ptr<Schema> _schema;
+    const Schema* _schema = nullptr;
     RowwiseIterator* _cur_iter = nullptr;
     std::deque<RowwiseIterator*> _origin_iters;
 };
@@ -396,8 +379,8 @@ Status VUnionIterator::init(const StorageReadOptions& opts) 
{
     for (auto iter : _origin_iters) {
         RETURN_IF_ERROR(iter->init(opts));
     }
-    _schema.reset(new Schema((*(_origin_iters.begin()))->schema()));
     _cur_iter = *(_origin_iters.begin());
+    _schema = &_cur_iter->schema();
     return Status::OK();
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to