This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2306e46658 [Enhancement](compaction) reduce VMergeIterator copy block 
(#12316)
2306e46658 is described below

commit 2306e46658681a1d294975d810c82c1e209d016b
Author: Pxl <pxl...@qq.com>
AuthorDate: Tue Sep 13 16:19:34 2022 +0800

    [Enhancement](compaction) reduce VMergeIterator copy block (#12316)
    
    This PR makes VMergeIterator support returning row references instead of 
copying a full block.
---
 be/src/olap/compaction.cpp                |   4 +-
 be/src/olap/iterators.h                   |  10 +-
 be/src/olap/rowset/beta_rowset_reader.cpp |  26 ++-
 be/src/olap/rowset/beta_rowset_reader.h   |   2 +
 be/src/olap/rowset/rowset_reader.h        |   5 +-
 be/src/vec/core/block.h                   |  19 +++
 be/src/vec/olap/block_reader.h            |   2 +-
 be/src/vec/olap/vcollect_iterator.cpp     |  90 ++++++-----
 be/src/vec/olap/vcollect_iterator.h       |  64 ++++++--
 be/src/vec/olap/vgeneric_iterators.cpp    | 259 +++++++++++++++++-------------
 10 files changed, 306 insertions(+), 175 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8f9e8910d9..2e9b4a6b55 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -236,7 +236,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
               << ". elapsed time=" << watch.get_elapse_second()
               << "s. cumulative_compaction_policy="
               << _tablet->cumulative_compaction_policy()->name()
-              << ", compact_row_per_second=" << _input_row_num / 
watch.get_elapse_second();
+              << ", compact_row_per_second=" << int(_input_row_num / 
watch.get_elapse_second());
 
     return Status::OK();
 }
@@ -336,7 +336,7 @@ Status Compaction::check_correctness(const 
Merger::Statistics& stats) {
     // 1. check row number
     if (_input_row_num != _output_rowset->num_rows() + stats.merged_rows + 
stats.filtered_rows) {
         LOG(WARNING) << "row_num does not match between cumulative input and 
output! "
-                     << "input_row_num=" << _input_row_num
+                     << "tablet=" << _tablet->full_name() << ", 
input_row_num=" << _input_row_num
                      << ", merged_row_num=" << stats.merged_rows
                      << ", filtered_row_num=" << stats.filtered_rows
                      << ", output_row_num=" << _output_rowset->num_rows();
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 3d9690f70b..22f081d0eb 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -94,8 +94,8 @@ public:
 // Used to read data in RowBlockV2 one by one
 class RowwiseIterator {
 public:
-    RowwiseIterator() {}
-    virtual ~RowwiseIterator() {}
+    RowwiseIterator() = default;
+    virtual ~RowwiseIterator() = default;
 
     // Initialize this iterator and make it ready to read with
     // input options.
@@ -116,6 +116,12 @@ public:
         return Status::NotSupported("to be implemented");
     }
 
+    virtual Status next_block_view(vectorized::BlockView* block_view) {
+        return Status::NotSupported("to be implemented");
+    }
+
+    virtual bool support_return_data_by_ref() { return false; }
+
     virtual Status current_block_row_locations(std::vector<RowLocation>* 
block_row_locations) {
         return Status::NotSupported("to be implemented");
     }
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 17223c9eb6..df15b72f62 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -24,7 +24,6 @@
 #include "olap/row_block.h"
 #include "olap/row_block2.h"
 #include "olap/row_cursor.h"
-#include "olap/rowset/segment_v2/segment_iterator.h"
 #include "olap/schema.h"
 #include "olap/tablet_meta.h"
 #include "vec/core/block.h"
@@ -119,7 +118,9 @@ Status BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
         for (uint32_t seg_id = 0; seg_id < rowset()->num_segments(); ++seg_id) 
{
             auto d = read_context->delete_bitmap->get_agg(
                     {rowset_id, seg_id, read_context->version.second});
-            if (d->isEmpty()) continue; // Empty delete bitmap for the segment
+            if (d->isEmpty()) {
+                continue; // Empty delete bitmap for the segment
+            }
             VLOG_TRACE << "Get the delete bitmap for rowset: " << 
rowset_id.to_string()
                        << ", segment id:" << seg_id << ", size:" << 
d->cardinality();
             read_options.delete_bitmap.emplace(seg_id, std::move(d));
@@ -323,6 +324,27 @@ Status BetaRowsetReader::next_block(vectorized::Block* 
block) {
     return Status::OK();
 }
 
+Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
+    SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
+    if (config::enable_storage_vectorization && _context->is_vec) {
+        do {
+            auto s = _iterator->next_block_view(block_view);
+            if (!s.ok()) {
+                if (s.is_end_of_file()) {
+                    return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+                } else {
+                    LOG(WARNING) << "failed to read next block: " << 
s.to_string();
+                    return 
Status::OLAPInternalError(OLAP_ERR_ROWSET_READ_FAILED);
+                }
+            }
+        } while (block_view->empty());
+    } else {
+        return Status::NotSupported("block view only support 
enable_storage_vectorization");
+    }
+
+    return Status::OK();
+}
+
 bool BetaRowsetReader::_should_push_down_value_predicates() const {
     // if unique table with rowset [0-x] or [0-1] [2-y] [...],
     // value column predicates can be pushdown on rowset [0-x] or [2-y], [2-y] 
must be compaction and not overlapping
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index b987efc9ad..5424722c16 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -38,6 +38,8 @@ public:
     // It's ok, because we only get ref here, the block's owner is this reader.
     Status next_block(RowBlock** block) override;
     Status next_block(vectorized::Block* block) override;
+    Status next_block_view(vectorized::BlockView* block_view) override;
+    bool support_return_data_by_ref() override { return 
_iterator->support_return_data_by_ref(); }
 
     bool delete_flag() override { return _rowset->delete_flag(); }
 
diff --git a/be/src/olap/rowset/rowset_reader.h 
b/be/src/olap/rowset/rowset_reader.h
index 75f780b953..eecf594254 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -38,7 +38,7 @@ using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>;
 
 class RowsetReader {
 public:
-    virtual ~RowsetReader() {}
+    virtual ~RowsetReader() = default;
 
     // reader init
     virtual Status init(RowsetReaderContext* read_context) = 0;
@@ -52,6 +52,9 @@ public:
 
     virtual Status next_block(vectorized::Block* block) = 0;
 
+    virtual Status next_block_view(vectorized::BlockView* block_view) = 0;
+    virtual bool support_return_data_by_ref() { return false; }
+
     virtual bool delete_flag() = 0;
 
     virtual Version version() = 0;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index eb229cd173..aa603a1800 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -504,5 +504,24 @@ public:
     }
 };
 
+struct IteratorRowRef {
+    std::shared_ptr<Block> block;
+    int row_pos;
+    bool is_same;
+
+    template <typename T>
+    int compare(const IteratorRowRef& rhs, const T& compare_arguments) const {
+        return block->compare_at(row_pos, rhs.row_pos, compare_arguments, 
*rhs.block, -1);
+    }
+
+    void reset() {
+        block = nullptr;
+        row_pos = -1;
+        is_same = false;
+    }
+};
+
+using BlockView = std::vector<IteratorRowRef>;
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 0a4a8f807c..682caed065 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -99,7 +99,7 @@ private:
     std::vector<bool> _stored_has_null_tag;
     std::vector<bool> _stored_has_string_tag;
 
-    phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, 
int16_t>>> _temp_ref_map;
+    phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> 
_temp_ref_map;
 
     bool _eof = false;
 
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 159e59be6b..7d4560f107 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -17,15 +17,12 @@
 
 #include "vec/olap/vcollect_iterator.h"
 
-#include <memory>
-
-#include "olap/rowset/beta_rowset_reader.h"
+#include "common/status.h"
+#include "util/defer_op.h"
 
 namespace doris {
 namespace vectorized {
 
-VCollectIterator::~VCollectIterator() {}
-
 #define RETURN_IF_NOT_EOF_AND_OK(stmt)                                         
         \
     do {                                                                       
         \
         const Status& _status_ = (stmt);                                       
         \
@@ -68,9 +65,10 @@ Status 
VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
         return Status::OK();
     } else if (_merge) {
         DCHECK(!rs_readers.empty());
+        bool have_multiple_child = false;
         for (auto [c_iter, r_iter] = std::pair {_children.begin(), 
rs_readers.begin()};
              c_iter != _children.end();) {
-            auto s = (*c_iter)->init();
+            auto s = (*c_iter)->init(have_multiple_child);
             if (!s.ok()) {
                 delete (*c_iter);
                 c_iter = _children.erase(c_iter);
@@ -79,6 +77,7 @@ Status 
VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
                     return s;
                 }
             } else {
+                have_multiple_child = true;
                 ++c_iter;
                 ++r_iter;
             }
@@ -135,11 +134,8 @@ bool 
VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, L
     const IteratorRowRef& rhs_ref = *rhs->current_row_ref();
 
     int cmp_res = UNLIKELY(lhs->compare_columns())
-                          ? lhs_ref.block->compare_at(lhs_ref.row_pos, 
rhs_ref.row_pos,
-                                                      lhs->compare_columns(), 
*rhs_ref.block, -1)
-                          : lhs_ref.block->compare_at(lhs_ref.row_pos, 
rhs_ref.row_pos,
-                                                      
lhs->tablet_schema().num_key_columns(),
-                                                      *rhs_ref.block, -1);
+                          ? lhs_ref.compare(rhs_ref, lhs->compare_columns())
+                          : lhs_ref.compare(rhs_ref, 
lhs->tablet_schema().num_key_columns());
     if (cmp_res != 0) {
         return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0;
     }
@@ -192,15 +188,22 @@ 
VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader
                                                  TabletReader* reader)
         : LevelIterator(reader), _rs_reader(rs_reader), _reader(reader) {
     DCHECK_EQ(RowsetTypePB::BETA_ROWSET, rs_reader->type());
-    _block = std::make_shared<Block>(_schema.create_block(
-            _reader->_return_columns, 
_reader->_tablet_columns_convert_to_null_set));
-    _ref.block = _block;
-    _ref.row_pos = 0;
-    _ref.is_same = false;
 }
 
-Status VCollectIterator::Level0Iterator::init() {
-    return _refresh_current_row();
+Status VCollectIterator::Level0Iterator::init(bool get_data_by_ref) {
+    _get_data_by_ref = get_data_by_ref && 
_rs_reader->support_return_data_by_ref() &&
+                       config::enable_storage_vectorization;
+    if (!_get_data_by_ref) {
+        _block = std::make_shared<Block>(_schema.create_block(
+                _reader->_return_columns, 
_reader->_tablet_columns_convert_to_null_set));
+    }
+    auto st = _refresh_current_row();
+    if (_get_data_by_ref && _block_view.size()) {
+        _ref = _block_view[0];
+    } else {
+        _ref = {_block, 0, false};
+    }
+    return st;
 }
 
 int64_t VCollectIterator::Level0Iterator::version() const {
@@ -209,42 +212,50 @@ int64_t VCollectIterator::Level0Iterator::version() const 
{
 
 Status VCollectIterator::Level0Iterator::_refresh_current_row() {
     do {
-        if (_block->rows() != 0 && _ref.row_pos < _block->rows()) {
+        if (!_is_empty() && _current_valid()) {
             return Status::OK();
         } else {
-            _ref.is_same = false;
-            _ref.row_pos = 0;
-            _block->clear_column_data();
-            auto res = _rs_reader->next_block(_block.get());
+            _reset();
+            auto res = _refresh();
             if (!res.ok() && res.precise_code() != OLAP_ERR_DATA_EOF) {
                 return res;
             }
-            if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() == 
0) {
-                _ref.row_pos = -1;
-                return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+            if (res.precise_code() == OLAP_ERR_DATA_EOF && _is_empty()) {
+                break;
             }
 
             if (UNLIKELY(_reader->_reader_context.record_rowids)) {
                 
RETURN_NOT_OK(_rs_reader->current_block_row_locations(&_block_row_locations));
-                DCHECK_EQ(_block_row_locations.size(), _block->rows());
             }
         }
-    } while (_block->rows() != 0);
+    } while (!_is_empty());
     _ref.row_pos = -1;
+    _current = -1;
     return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
 }
 
 Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) {
-    _ref.row_pos++;
+    if (_get_data_by_ref) {
+        _current++;
+    } else {
+        _ref.row_pos++;
+    }
+
     RETURN_NOT_OK(_refresh_current_row());
+
+    if (_get_data_by_ref) {
+        _ref = _block_view[_current];
+    }
+
     *ref = _ref;
     return Status::OK();
 }
 
 Status VCollectIterator::Level0Iterator::next(Block* block) {
-    if (UNLIKELY(_ref.block->rows() > 0 && _ref.row_pos == 0)) {
+    CHECK(!_get_data_by_ref);
+    if (_ref.row_pos == 0 && _ref.block != nullptr && 
UNLIKELY(_ref.block->rows() > 0)) {
         block->swap(*_ref.block);
-        _ref.row_pos = -1;
+        _ref.reset();
         return Status::OK();
     } else {
         auto res = _rs_reader->next_block(block);
@@ -262,7 +273,7 @@ Status VCollectIterator::Level0Iterator::next(Block* block) 
{
 }
 
 RowLocation VCollectIterator::Level0Iterator::current_row_location() {
-    RowLocation& segment_row_id = _block_row_locations[_ref.row_pos];
+    RowLocation& segment_row_id = _block_row_locations[_get_data_by_ref ? 
_current : _ref.row_pos];
     return RowLocation(_rs_reader->rowset()->rowset_id(), 
segment_row_id.segment_id,
                        segment_row_id.row_id);
 }
@@ -287,7 +298,7 @@ VCollectIterator::Level1Iterator::Level1Iterator(
           _merge(merge),
           _is_reverse(is_reverse),
           _skip_same(skip_same) {
-    _ref.row_pos = -1; // represent eof
+    _ref.reset();
     _batch_size = reader->_batch_size;
 }
 
@@ -303,7 +314,9 @@ VCollectIterator::Level1Iterator::~Level1Iterator() {
         while (!_heap->empty()) {
             auto child = _heap->top();
             _heap->pop();
-            if (child) delete child;
+            if (child) {
+                delete child;
+            }
         }
     }
 }
@@ -315,7 +328,7 @@ VCollectIterator::Level1Iterator::~Level1Iterator() {
 //      Others when error happens
 Status VCollectIterator::Level1Iterator::next(IteratorRowRef* ref) {
     if (UNLIKELY(_cur_child == nullptr)) {
-        _ref.row_pos = -1;
+        _ref.reset();
         return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
     }
     if (_merge) {
@@ -348,7 +361,7 @@ int64_t VCollectIterator::Level1Iterator::version() const {
     return -1;
 }
 
-Status VCollectIterator::Level1Iterator::init() {
+Status VCollectIterator::Level1Iterator::init(bool get_data_by_ref) {
     if (_children.empty()) {
         return Status::OK();
     }
@@ -392,11 +405,12 @@ Status 
VCollectIterator::Level1Iterator::_merge_next(IteratorRowRef* ref) {
         if (!_heap->empty()) {
             _cur_child = _heap->top();
         } else {
+            _ref.reset();
             _cur_child = nullptr;
-            _ref.row_pos = -1;
             return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
         }
     } else {
+        _ref.reset();
         _cur_child = nullptr;
         LOG(WARNING) << "failed to get next from child, res=" << res;
         return res;
@@ -465,7 +479,7 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* 
block) {
                                                      pre_row_ref.row_pos, 
continuous_row_in_block);
             }
             continuous_row_in_block = 0;
-            pre_row_ref.block = nullptr;
+            pre_row_ref.reset();
         }
         auto res = _merge_next(&cur_row);
         if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) {
diff --git a/be/src/vec/olap/vcollect_iterator.h 
b/be/src/vec/olap/vcollect_iterator.h
index eabb0ad2e5..69ef3da70a 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -17,13 +17,13 @@
 
 #pragma once
 
+#include "common/status.h"
 #ifdef USE_LIBCPP
 #include <queue>
 #else
 #include <ext/pb_ds/priority_queue.hpp>
 #endif
 
-#include "olap/olap_define.h"
 #include "olap/reader.h"
 #include "olap/rowset/rowset_reader.h"
 #include "vec/core/block.h"
@@ -34,16 +34,10 @@ class TabletSchema;
 
 namespace vectorized {
 
-struct IteratorRowRef {
-    std::shared_ptr<Block> block;
-    int16_t row_pos;
-    bool is_same;
-};
-
 class VCollectIterator {
 public:
     // Hold reader point to get reader params
-    ~VCollectIterator();
+    ~VCollectIterator() = default;
 
     void init(TabletReader* reader, bool force_merge, bool is_reverse);
 
@@ -83,7 +77,7 @@ private:
                 : _schema(reader->tablet_schema()),
                   
_compare_columns(reader->_reader_context.read_orderby_key_columns) {};
 
-        virtual Status init() = 0;
+        virtual Status init(bool get_data_by_ref = false) = 0;
 
         virtual int64_t version() const = 0;
 
@@ -95,7 +89,7 @@ private:
 
         void set_same(bool same) { _ref.is_same = same; }
 
-        bool is_same() { return _ref.is_same; }
+        bool is_same() const { return _ref.is_same; }
 
         virtual ~LevelIterator() = default;
 
@@ -140,9 +134,9 @@ private:
     class Level0Iterator : public LevelIterator {
     public:
         Level0Iterator(RowsetReaderSharedPtr rs_reader, TabletReader* reader);
-        ~Level0Iterator() {}
+        ~Level0Iterator() override = default;
 
-        Status init() override;
+        Status init(bool get_data_by_ref = false) override;
 
         int64_t version() const override;
 
@@ -156,11 +150,53 @@ private:
 
     private:
         Status _refresh_current_row();
+        Status _next_by_ref(IteratorRowRef* ref);
+        Status _refresh_current_row_by_ref();
+
+        bool _is_empty() {
+            if (_get_data_by_ref) {
+                return _block_view.empty();
+            } else {
+                return _block->rows() == 0;
+            }
+        }
+
+        bool _current_valid() {
+            if (_get_data_by_ref) {
+                return _current < _block_view.size();
+            } else {
+                return _ref.row_pos < _block->rows();
+            }
+        }
+
+        void _reset() {
+            if (_get_data_by_ref) {
+                _block_view.clear();
+                _ref.reset();
+                _current = 0;
+            } else {
+                _ref.is_same = false;
+                _ref.row_pos = 0;
+                _block->clear_column_data();
+            }
+        }
+
+        Status _refresh() {
+            if (_get_data_by_ref) {
+                return _rs_reader->next_block_view(&_block_view);
+            } else {
+                return _rs_reader->next_block(_block.get());
+            }
+        }
 
         RowsetReaderSharedPtr _rs_reader;
         TabletReader* _reader = nullptr;
         std::shared_ptr<Block> _block;
+
+        int _current;
+        BlockView _block_view;
         std::vector<RowLocation> _block_row_locations;
+        bool _get_data_by_ref = false;
     };
 
     // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or 
mixed)
@@ -169,7 +205,7 @@ private:
         Level1Iterator(const std::list<LevelIterator*>& children, 
TabletReader* reader, bool merge,
                        bool is_reverse, bool skip_same);
 
-        Status init() override;
+        Status init(bool get_data_by_ref = false) override;
 
         int64_t version() const override;
 
@@ -181,7 +217,7 @@ private:
 
         Status current_block_row_locations(std::vector<RowLocation>* 
block_row_locations) override;
 
-        ~Level1Iterator();
+        ~Level1Iterator() override;
 
     private:
         Status _merge_next(IteratorRowRef* ref);
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index 9f50040b2e..280d7b054a 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
 #include <queue>
 #include <utility>
 
+#include "common/status.h"
 #include "olap/iterators.h"
-#include "olap/row.h"
-#include "olap/row_block2.h"
+#include "olap/schema.h"
+#include "vec/core/block.h"
 
 namespace doris {
 
@@ -44,17 +46,17 @@ public:
     // Will generate num_rows rows in total
     VAutoIncrementIterator(const Schema& schema, size_t num_rows)
             : _schema(schema), _num_rows(num_rows), _rows_returned() {}
-    ~VAutoIncrementIterator() override {}
+    ~VAutoIncrementIterator() override = default;
 
     // NOTE: Currently, this function will ignore StorageReadOptions
     Status init(const StorageReadOptions& opts) override;
 
-    Status next_batch(vectorized::Block* block) override {
+    Status next_batch(Block* block) override {
         int row_idx = 0;
         while (_rows_returned < _num_rows) {
             for (int j = 0; j < _schema.num_columns(); ++j) {
-                vectorized::ColumnWithTypeAndName& vc = 
block->get_by_position(j);
-                vectorized::IColumn& vi = (vectorized::IColumn&)(*vc.column);
+                ColumnWithTypeAndName& vc = block->get_by_position(j);
+                IColumn& vi = (IColumn&)(*vc.column);
 
                 char data[16] = {};
                 size_t data_len = 0;
@@ -91,7 +93,9 @@ public:
             ++_rows_returned;
         }
 
-        if (row_idx > 0) return Status::OK();
+        if (row_idx > 0) {
+            return Status::OK();
+        }
         return Status::EndOfFile("End of VAutoIncrementIterator");
     }
 
@@ -139,8 +143,8 @@ public:
         _iter = nullptr;
     }
 
-    Status block_reset() {
-        if (!_block) {
+    Status block_reset(const std::shared_ptr<Block>& block) {
+        if (!*block) {
             const Schema& schema = _iter->schema();
             const auto& column_ids = schema.column_ids();
             for (size_t i = 0; i < schema.num_column_ids(); ++i) {
@@ -151,11 +155,11 @@ public:
                 }
                 auto column = data_type->create_column();
                 column->reserve(_block_row_max);
-                _block.insert(
+                block->insert(
                         ColumnWithTypeAndName(std::move(column), data_type, 
column_desc->name()));
             }
         } else {
-            _block.clear_column_data();
+            block->clear_column_data();
         }
         return Status::OK();
     }
@@ -165,10 +169,10 @@ public:
 
     bool compare(const VMergeIteratorContext& rhs) const {
         int cmp_res = UNLIKELY(_compare_columns)
-                              ? this->_block.compare_at(_index_in_block, 
rhs._index_in_block,
-                                                        _compare_columns, 
rhs._block, -1)
-                              : this->_block.compare_at(_index_in_block, 
rhs._index_in_block,
-                                                        _num_key_columns, 
rhs._block, -1);
+                              ? _block->compare_at(_index_in_block, 
rhs._index_in_block,
+                                                   _compare_columns, 
*rhs._block, -1)
+                              : _block->compare_at(_index_in_block, 
rhs._index_in_block,
+                                                   _num_key_columns, 
*rhs._block, -1);
 
         if (cmp_res != 0) {
             return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0;
@@ -176,47 +180,52 @@ public:
 
         auto col_cmp_res = 0;
         if (_sequence_id_idx != -1) {
-            col_cmp_res = this->_block.compare_column_at(_index_in_block, 
rhs._index_in_block,
-                                                         _sequence_id_idx, 
rhs._block, -1);
+            col_cmp_res = _block->compare_column_at(_index_in_block, 
rhs._index_in_block,
+                                                    _sequence_id_idx, 
*rhs._block, -1);
         }
-        auto result = col_cmp_res == 0 ? this->data_id() < rhs.data_id() : 
col_cmp_res < 0;
+        auto result = col_cmp_res == 0 ? data_id() < rhs.data_id() : 
col_cmp_res < 0;
 
         if (_is_unique) {
-            result ? this->set_skip(true) : rhs.set_skip(true);
+            result ? set_skip(true) : rhs.set_skip(true);
         }
         return result;
     }
 
-    // there is two situation in copy_rows:
-    // 1...   `advanced = false` when current block finished, we should copy 
block before advance(iterator)
-    // If we iterator a block from start to end, _index_in_block=rows()-1, and 
_cur_batch_num=rows,
-    // so we should copy from (_index_in_block - _cur_batch_num + 1)
-
-    // 2...   `advanced = true` when current block not finished and we 
advanced to next block, now
-    // cur_batch_num = (pre_block iteraotr num) + 1, but actually pre_block 
iterator num is cur_batch_num -1
-    // so we have a ` if (advanced) start -- `
-    void copy_rows(vectorized::Block* block, bool advanced = true) {
-        vectorized::Block& src = _block;
-        vectorized::Block& dst = *block;
+    // `advanced = false` when current block finished
+    void copy_rows(Block* block, bool advanced = true) {
+        Block& src = *_block;
+        Block& dst = *block;
         if (_cur_batch_num == 0) {
             return;
         }
 
+        // copy a row to dst block column by column
+        size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
+        DCHECK(start >= 0);
+
         for (size_t i = 0; i < _num_columns; ++i) {
             auto& s_col = src.get_by_position(i);
             auto& d_col = dst.get_by_position(i);
 
-            vectorized::ColumnPtr& s_cp = s_col.column;
-            vectorized::ColumnPtr& d_cp = d_col.column;
+            ColumnPtr& s_cp = s_col.column;
+            ColumnPtr& d_cp = d_col.column;
 
-            //copy a row to dst block column by column
-            size_t start = _index_in_block - _cur_batch_num + 1;
-            if (advanced) {
-                start--;
-            }
-            DCHECK(start >= 0);
-            ((vectorized::IColumn&)(*d_cp)).insert_range_from(*s_cp, start, 
_cur_batch_num);
+            d_cp->assume_mutable()->insert_range_from(*s_cp, start, 
_cur_batch_num);
+        }
+        _cur_batch_num = 0;
+    }
+
+    void copy_rows(BlockView* view, bool advanced = true) {
+        if (_cur_batch_num == 0) {
+            return;
+        }
+        size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
+        DCHECK(start >= 0);
+
+        for (size_t i = 0; i < _cur_batch_num; ++i) {
+            view->push_back({_block, static_cast<int>(start + i), false});
         }
+
         _cur_batch_num = 0;
     }
 
@@ -245,12 +254,7 @@ public:
 
     void reset_cur_batch() { _cur_batch_num = 0; }
 
-    bool is_cur_block_finished() {
-        if (_index_in_block == _block.rows() - 1) {
-            return true;
-        }
-        return false;
-    }
+    bool is_cur_block_finished() { return _index_in_block == _block->rows() - 
1; }
 
 private:
     // Load next block into _block
@@ -258,9 +262,6 @@ private:
 
     RowwiseIterator* _iter;
 
-    // used to store data load from iterator->next_batch(Vectorized::Block*)
-    vectorized::Block _block;
-
     int _sequence_id_idx = -1;
     bool _is_unique = false;
     bool _is_reverse = false;
@@ -275,13 +276,17 @@ private:
     std::vector<RowLocation> _block_row_locations;
     bool _record_rowids = false;
     size_t _cur_batch_num = 0;
+
+    // used to store data load from iterator->next_batch(Block*)
+    std::shared_ptr<Block> _block;
+    // used to store data still on block view
+    std::list<std::shared_ptr<Block>> _block_list;
 };
 
 Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
     _block_row_max = opts.block_row_max;
     _record_rowids = opts.record_rowids;
     RETURN_IF_ERROR(_iter->init(opts));
-    RETURN_IF_ERROR(block_reset());
     RETURN_IF_ERROR(_load_next_block());
     if (valid()) {
         RETURN_IF_ERROR(advance());
@@ -294,7 +299,7 @@ Status VMergeIteratorContext::advance() {
     // NOTE: we increase _index_in_block directly to valid one check
     do {
         _index_in_block++;
-        if (LIKELY(_index_in_block < _block.rows())) {
+        if (LIKELY(_index_in_block < _block->rows())) {
             return Status::OK();
         }
         // current batch has no data, load next batch
@@ -305,8 +310,23 @@ Status VMergeIteratorContext::advance() {
 
 Status VMergeIteratorContext::_load_next_block() {
     do {
-        block_reset();
-        Status st = _iter->next_batch(&_block);
+        if (_block != nullptr) {
+            _block_list.push_back(_block);
+            _block = nullptr;
+        }
+        for (auto it = _block_list.begin(); it != _block_list.end(); it++) {
+            if (it->use_count() == 1) {
+                block_reset(*it);
+                _block = *it;
+                _block_list.erase(it);
+                break;
+            }
+        }
+        if (_block == nullptr) {
+            _block = std::make_shared<Block>();
+            block_reset(_block);
+        }
+        Status st = _iter->next_batch(_block.get());
         if (!st.ok()) {
             _valid = false;
             if (st.is_end_of_file()) {
@@ -318,7 +338,7 @@ Status VMergeIteratorContext::_load_next_block() {
         if (UNLIKELY(_record_rowids)) {
             
RETURN_IF_ERROR(_iter->current_block_row_locations(&_block_row_locations));
         }
-    } while (_block.rows() == 0);
+    } while (_block->rows() == 0);
     _index_in_block = -1;
     _valid = true;
     return Status::OK();
@@ -345,7 +365,10 @@ public:
 
     Status init(const StorageReadOptions& opts) override;
 
-    Status next_batch(vectorized::Block* block) override;
+    Status next_batch(Block* block) override { return _next_batch(block); }
+    Status next_block_view(BlockView* block_view) override { return 
_next_batch(block_view); }
+
+    bool support_return_data_by_ref() override { return true; }
 
     const Schema& schema() const override { return *_schema; }
 
@@ -356,6 +379,71 @@ public:
     }
 
 private:
+    int _get_size(Block* block) { return block->rows(); }
+    int _get_size(BlockView* block_view) { return block_view->size(); }
+
+    template <typename T>
+    Status _next_batch(T* block) {
+        if (UNLIKELY(_record_rowids)) {
+            _block_row_locations.resize(_block_row_max);
+        }
+        size_t row_idx = 0;
+        VMergeIteratorContext* pre_ctx = nullptr;
+        while (_get_size(block) < _block_row_max) {
+            if (_merge_heap.empty()) {
+                break;
+            }
+
+            auto ctx = _merge_heap.top();
+            _merge_heap.pop();
+
+            if (!ctx->need_skip()) {
+                ctx->add_cur_batch();
+                if (pre_ctx != ctx) {
+                    if (pre_ctx) {
+                        pre_ctx->copy_rows(block);
+                    }
+                    pre_ctx = ctx;
+                }
+                if (UNLIKELY(_record_rowids)) {
+                    _block_row_locations[row_idx] = ctx->current_row_location();
+                }
+                row_idx++;
+                if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
+                    // current block finished, ctx not advance
+                    // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
+                    ctx->copy_rows(block, false);
+                    pre_ctx = nullptr;
+                }
+            } else if (_merged_rows != nullptr) {
+                (*_merged_rows)++;
+                // need skip cur row, so flush rows in pre_ctx
+                if (pre_ctx) {
+                    pre_ctx->copy_rows(block);
+                    pre_ctx = nullptr;
+                }
+            }
+
+            RETURN_IF_ERROR(ctx->advance());
+            if (ctx->valid()) {
+                _merge_heap.push(ctx);
+            } else {
+                // Release ctx earlier to reduce resource consumed
+                delete ctx;
+            }
+        }
+        if (!_merge_heap.empty()) {
+            return Status::OK();
+        }
+        // Still last batch needs to be processed
+
+        if (UNLIKELY(_record_rowids)) {
+            _block_row_locations.resize(row_idx);
+        }
+
+        return Status::EndOfFile("no more data in segment");
+    }
+
     // It will be released after '_merge_heap' has been built.
     std::vector<RowwiseIterator*> _origin_iters;
 
@@ -406,65 +494,6 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
     return Status::OK();
 }
 
-Status VMergeIterator::next_batch(vectorized::Block* block) {
-    if (UNLIKELY(_record_rowids)) {
-        _block_row_locations.resize(_block_row_max);
-    }
-    size_t row_idx = 0;
-    VMergeIteratorContext* pre_ctx = nullptr;
-    while (block->rows() < _block_row_max) {
-        if (_merge_heap.empty()) break;
-
-        auto ctx = _merge_heap.top();
-        _merge_heap.pop();
-
-        if (!ctx->need_skip()) {
-            ctx->add_cur_batch();
-            if (pre_ctx != ctx) {
-                if (pre_ctx) {
-                    pre_ctx->copy_rows(block);
-                }
-                pre_ctx = ctx;
-            }
-            if (UNLIKELY(_record_rowids)) {
-                _block_row_locations[row_idx] = ctx->current_row_location();
-            }
-            row_idx++;
-            if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
-                // current block finished, ctx not advance
-                // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
-                ctx->copy_rows(block, false);
-                pre_ctx = nullptr;
-            }
-        } else if (_merged_rows != nullptr) {
-            (*_merged_rows)++;
-            // need skip cur row, so flush rows in pre_ctx
-            if (pre_ctx) {
-                pre_ctx->copy_rows(block);
-                pre_ctx = nullptr;
-            }
-        }
-
-        RETURN_IF_ERROR(ctx->advance());
-        if (ctx->valid()) {
-            _merge_heap.push(ctx);
-        } else {
-            // Release ctx earlier to reduce resource consumed
-            delete ctx;
-        }
-    }
-    if (!_merge_heap.empty()) {
-        return Status::OK();
-    }
-    // Still last batch needs to be processed
-
-    if (UNLIKELY(_record_rowids)) {
-        _block_row_locations.resize(row_idx);
-    }
-
-    return Status::EndOfFile("no more data in segment");
-}
-
 // VUnionIterator will read data from input iterator one by one.
 class VUnionIterator : public RowwiseIterator {
 public:
@@ -480,7 +509,7 @@ public:
 
     Status init(const StorageReadOptions& opts) override;
 
-    Status next_batch(vectorized::Block* block) override;
+    Status next_batch(Block* block) override;
 
     const Schema& schema() const override { return *_schema; }
 
@@ -505,7 +534,7 @@ Status VUnionIterator::init(const StorageReadOptions& opts) {
     return Status::OK();
 }
 
-Status VUnionIterator::next_batch(vectorized::Block* block) {
+Status VUnionIterator::next_batch(Block* block) {
     while (_cur_iter != nullptr) {
         auto st = _cur_iter->next_batch(block);
         if (st.is_end_of_file()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to