This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2306e46658 [Enhancement](compaction) reduce VMergeIterator copy block (#12316) 2306e46658 is described below commit 2306e46658681a1d294975d810c82c1e209d016b Author: Pxl <pxl...@qq.com> AuthorDate: Tue Sep 13 16:19:34 2022 +0800 [Enhancement](compaction) reduce VMergeIterator copy block (#12316) This pr change make VMergeIterator support return row reference to instead copy a full block. --- be/src/olap/compaction.cpp | 4 +- be/src/olap/iterators.h | 10 +- be/src/olap/rowset/beta_rowset_reader.cpp | 26 ++- be/src/olap/rowset/beta_rowset_reader.h | 2 + be/src/olap/rowset/rowset_reader.h | 5 +- be/src/vec/core/block.h | 19 +++ be/src/vec/olap/block_reader.h | 2 +- be/src/vec/olap/vcollect_iterator.cpp | 90 ++++++----- be/src/vec/olap/vcollect_iterator.h | 64 ++++++-- be/src/vec/olap/vgeneric_iterators.cpp | 259 +++++++++++++++++------------- 10 files changed, 306 insertions(+), 175 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8f9e8910d9..2e9b4a6b55 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -236,7 +236,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { << ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy=" << _tablet->cumulative_compaction_policy()->name() - << ", compact_row_per_second=" << _input_row_num / watch.get_elapse_second(); + << ", compact_row_per_second=" << int(_input_row_num / watch.get_elapse_second()); return Status::OK(); } @@ -336,7 +336,7 @@ Status Compaction::check_correctness(const Merger::Statistics& stats) { // 1. check row number if (_input_row_num != _output_rowset->num_rows() + stats.merged_rows + stats.filtered_rows) { LOG(WARNING) << "row_num does not match between cumulative input and output! " - << "input_row_num=" << _input_row_num + << "tablet=" << _tablet->full_name() << ", input_row_num=" << _input_row_num << ", merged_row_num=" << stats.merged_rows << ", filtered_row_num=" << stats.filtered_rows << ", output_row_num=" << _output_rowset->num_rows(); diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 3d9690f70b..22f081d0eb 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -94,8 +94,8 @@ public: // Used to read data in RowBlockV2 one by one class RowwiseIterator { public: - RowwiseIterator() {} - virtual ~RowwiseIterator() {} + RowwiseIterator() = default; + virtual ~RowwiseIterator() = default; // Initialize this iterator and make it ready to read with // input options. @@ -116,6 +116,12 @@ public: return Status::NotSupported("to be implemented"); } + virtual Status next_block_view(vectorized::BlockView* block_view) { + return Status::NotSupported("to be implemented"); + } + + virtual bool support_return_data_by_ref() { return false; } + virtual Status current_block_row_locations(std::vector<RowLocation>* block_row_locations) { return Status::NotSupported("to be implemented"); } diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 17223c9eb6..df15b72f62 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -24,7 +24,6 @@ #include "olap/row_block.h" #include "olap/row_block2.h" #include "olap/row_cursor.h" -#include "olap/rowset/segment_v2/segment_iterator.h" #include "olap/schema.h" #include "olap/tablet_meta.h" #include "vec/core/block.h" @@ -119,7 +118,9 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) { for (uint32_t seg_id = 0; seg_id < rowset()->num_segments(); ++seg_id) { auto d = read_context->delete_bitmap->get_agg( {rowset_id, seg_id, read_context->version.second}); - if (d->isEmpty()) continue; // Empty delete bitmap for the segment + if (d->isEmpty()) { + continue; // Empty delete bitmap for the segment + } VLOG_TRACE << "Get the delete bitmap for rowset: " << rowset_id.to_string() << ", segment id:" << seg_id << ", size:" << d->cardinality(); read_options.delete_bitmap.emplace(seg_id, std::move(d)); @@ -323,6 +324,27 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) { return Status::OK(); } +Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) { + SCOPED_RAW_TIMER(&_stats->block_fetch_ns); + if (config::enable_storage_vectorization && _context->is_vec) { + do { + auto s = _iterator->next_block_view(block_view); + if (!s.ok()) { + if (s.is_end_of_file()) { + return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); + } else { + LOG(WARNING) << "failed to read next block: " << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_ROWSET_READ_FAILED); + } + } + } while (block_view->empty()); + } else { + return Status::NotSupported("block view only support enable_storage_vectorization"); + } + + return Status::OK(); +} + bool BetaRowsetReader::_should_push_down_value_predicates() const { // if unique table with rowset [0-x] or [0-1] [2-y] [...], // value column predicates can be pushdown on rowset [0-x] or [2-y], [2-y] must be compaction and not overlapping diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index b987efc9ad..5424722c16 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -38,6 +38,8 @@ public: // It's ok, because we only get ref here, the block's owner is this reader. Status next_block(RowBlock** block) override; Status next_block(vectorized::Block* block) override; + Status next_block_view(vectorized::BlockView* block_view) override; + bool support_return_data_by_ref() override { return _iterator->support_return_data_by_ref(); } bool delete_flag() override { return _rowset->delete_flag(); } diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 75f780b953..eecf594254 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -38,7 +38,7 @@ using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>; class RowsetReader { public: - virtual ~RowsetReader() {} + virtual ~RowsetReader() = default; // reader init virtual Status init(RowsetReaderContext* read_context) = 0; @@ -52,6 +52,9 @@ public: virtual Status next_block(vectorized::Block* block) = 0; + virtual Status next_block_view(vectorized::BlockView* block_view) = 0; + virtual bool support_return_data_by_ref() { return false; } + virtual bool delete_flag() = 0; virtual Version version() = 0; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index eb229cd173..aa603a1800 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -504,5 +504,24 @@ public: } }; +struct IteratorRowRef { + std::shared_ptr<Block> block; + int row_pos; + bool is_same; + + template <typename T> + int compare(const IteratorRowRef& rhs, const T& compare_arguments) const { + return block->compare_at(row_pos, rhs.row_pos, compare_arguments, *rhs.block, -1); + } + + void reset() { + block = nullptr; + row_pos = -1; + is_same = false; + } +}; + +using BlockView = std::vector<IteratorRowRef>; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 0a4a8f807c..682caed065 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -99,7 +99,7 @@ private: std::vector<bool> _stored_has_null_tag; std::vector<bool> _stored_has_string_tag; - phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t, int16_t>>> _temp_ref_map; + phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> _temp_ref_map; bool _eof = false; diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 159e59be6b..7d4560f107 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -17,15 +17,12 @@ #include "vec/olap/vcollect_iterator.h" -#include <memory> - -#include "olap/rowset/beta_rowset_reader.h" +#include "common/status.h" +#include "util/defer_op.h" namespace doris { namespace vectorized { -VCollectIterator::~VCollectIterator() {} - #define RETURN_IF_NOT_EOF_AND_OK(stmt) \ do { \ const Status& _status_ = (stmt); \ @@ -68,9 +65,10 @@ Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade return Status::OK(); } else if (_merge) { DCHECK(!rs_readers.empty()); + bool have_multiple_child = false; for (auto [c_iter, r_iter] = std::pair {_children.begin(), rs_readers.begin()}; c_iter != _children.end();) { - auto s = (*c_iter)->init(); + auto s = (*c_iter)->init(have_multiple_child); if (!s.ok()) { delete (*c_iter); c_iter = _children.erase(c_iter); @@ -79,6 +77,7 @@ Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade return s; } } else { + have_multiple_child = true; ++c_iter; ++r_iter; } @@ -135,11 +134,8 @@ bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, L const IteratorRowRef& rhs_ref = *rhs->current_row_ref(); int cmp_res = UNLIKELY(lhs->compare_columns()) - ? lhs_ref.block->compare_at(lhs_ref.row_pos, rhs_ref.row_pos, - lhs->compare_columns(), *rhs_ref.block, -1) - : lhs_ref.block->compare_at(lhs_ref.row_pos, rhs_ref.row_pos, - lhs->tablet_schema().num_key_columns(), - *rhs_ref.block, -1); + ? lhs_ref.compare(rhs_ref, lhs->compare_columns()) + : lhs_ref.compare(rhs_ref, lhs->tablet_schema().num_key_columns()); if (cmp_res != 0) { return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0; } @@ -192,15 +188,22 @@ VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader TabletReader* reader) : LevelIterator(reader), _rs_reader(rs_reader), _reader(reader) { DCHECK_EQ(RowsetTypePB::BETA_ROWSET, rs_reader->type()); - _block = std::make_shared<Block>(_schema.create_block( - _reader->_return_columns, _reader->_tablet_columns_convert_to_null_set)); - _ref.block = _block; - _ref.row_pos = 0; - _ref.is_same = false; } -Status VCollectIterator::Level0Iterator::init() { - return _refresh_current_row(); +Status VCollectIterator::Level0Iterator::init(bool get_data_by_ref) { + _get_data_by_ref = get_data_by_ref && _rs_reader->support_return_data_by_ref() && + config::enable_storage_vectorization; + if (!_get_data_by_ref) { + _block = std::make_shared<Block>(_schema.create_block( + _reader->_return_columns, _reader->_tablet_columns_convert_to_null_set)); + } + auto st = _refresh_current_row(); + if (_get_data_by_ref && _block_view.size()) { + _ref = _block_view[0]; + } else { + _ref = {_block, 0, false}; + } + return st; } int64_t VCollectIterator::Level0Iterator::version() const { @@ -209,42 +212,50 @@ int64_t VCollectIterator::Level0Iterator::version() const { Status VCollectIterator::Level0Iterator::_refresh_current_row() { do { - if (_block->rows() != 0 && _ref.row_pos < _block->rows()) { + if (!_is_empty() && _current_valid()) { return Status::OK(); } else { - _ref.is_same = false; - _ref.row_pos = 0; - _block->clear_column_data(); - auto res = _rs_reader->next_block(_block.get()); + _reset(); + auto res = _refresh(); if (!res.ok() && res.precise_code() != OLAP_ERR_DATA_EOF) { return res; } - if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() == 0) { - _ref.row_pos = -1; - return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); + if (res.precise_code() == OLAP_ERR_DATA_EOF && _is_empty()) { + break; } if (UNLIKELY(_reader->_reader_context.record_rowids)) { RETURN_NOT_OK(_rs_reader->current_block_row_locations(&_block_row_locations)); - DCHECK_EQ(_block_row_locations.size(), _block->rows()); } } - } while (_block->rows() != 0); + } while (!_is_empty()); _ref.row_pos = -1; + _current = -1; return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); } Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) { - _ref.row_pos++; + if (_get_data_by_ref) { + _current++; + } else { + _ref.row_pos++; + } + RETURN_NOT_OK(_refresh_current_row()); + + if (_get_data_by_ref) { + _ref = _block_view[_current]; + } + *ref = _ref; return Status::OK(); } Status VCollectIterator::Level0Iterator::next(Block* block) { - if (UNLIKELY(_ref.block->rows() > 0 && _ref.row_pos == 0)) { + CHECK(!_get_data_by_ref); + if (_ref.row_pos == 0 && _ref.block != nullptr && UNLIKELY(_ref.block->rows() > 0)) { block->swap(*_ref.block); - _ref.row_pos = -1; + _ref.reset(); return Status::OK(); } else { auto res = _rs_reader->next_block(block); @@ -262,7 +273,7 @@ Status VCollectIterator::Level0Iterator::next(Block* block) { } RowLocation VCollectIterator::Level0Iterator::current_row_location() { - RowLocation& segment_row_id = _block_row_locations[_ref.row_pos]; + RowLocation& segment_row_id = _block_row_locations[_get_data_by_ref ? _current : _ref.row_pos]; return RowLocation(_rs_reader->rowset()->rowset_id(), segment_row_id.segment_id, segment_row_id.row_id); } @@ -287,7 +298,7 @@ VCollectIterator::Level1Iterator::Level1Iterator( _merge(merge), _is_reverse(is_reverse), _skip_same(skip_same) { - _ref.row_pos = -1; // represent eof + _ref.reset(); _batch_size = reader->_batch_size; } @@ -303,7 +314,9 @@ VCollectIterator::Level1Iterator::~Level1Iterator() { while (!_heap->empty()) { auto child = _heap->top(); _heap->pop(); - if (child) delete child; + if (child) { + delete child; + } } } } @@ -315,7 +328,7 @@ VCollectIterator::Level1Iterator::~Level1Iterator() { // Others when error happens Status VCollectIterator::Level1Iterator::next(IteratorRowRef* ref) { if (UNLIKELY(_cur_child == nullptr)) { - _ref.row_pos = -1; + _ref.reset(); return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); } if (_merge) { @@ -348,7 +361,7 @@ int64_t VCollectIterator::Level1Iterator::version() const { return -1; } -Status VCollectIterator::Level1Iterator::init() { +Status VCollectIterator::Level1Iterator::init(bool get_data_by_ref) { if (_children.empty()) { return Status::OK(); } @@ -392,11 +405,12 @@ Status VCollectIterator::Level1Iterator::_merge_next(IteratorRowRef* ref) { if (!_heap->empty()) { _cur_child = _heap->top(); } else { + _ref.reset(); _cur_child = nullptr; - _ref.row_pos = -1; return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); } } else { + _ref.reset(); _cur_child = nullptr; LOG(WARNING) << "failed to get next from child, res=" << res; return res; @@ -465,7 +479,7 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { pre_row_ref.row_pos, continuous_row_in_block); } continuous_row_in_block = 0; - pre_row_ref.block = nullptr; + pre_row_ref.reset(); } auto res = _merge_next(&cur_row); if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) { diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h index eabb0ad2e5..69ef3da70a 100644 --- a/be/src/vec/olap/vcollect_iterator.h +++ b/be/src/vec/olap/vcollect_iterator.h @@ -17,13 +17,13 @@ #pragma once +#include "common/status.h" #ifdef USE_LIBCPP #include <queue> #else #include <ext/pb_ds/priority_queue.hpp> #endif -#include "olap/olap_define.h" #include "olap/reader.h" #include "olap/rowset/rowset_reader.h" #include "vec/core/block.h" @@ -34,16 +34,10 @@ class TabletSchema; namespace vectorized { -struct IteratorRowRef { - std::shared_ptr<Block> block; - int16_t row_pos; - bool is_same; -}; - class VCollectIterator { public: // Hold reader point to get reader params - ~VCollectIterator(); + ~VCollectIterator() = default; void init(TabletReader* reader, bool force_merge, bool is_reverse); @@ -83,7 +77,7 @@ private: : _schema(reader->tablet_schema()), _compare_columns(reader->_reader_context.read_orderby_key_columns) {}; - virtual Status init() = 0; + virtual Status init(bool get_data_by_ref = false) = 0; virtual int64_t version() const = 0; @@ -95,7 +89,7 @@ private: void set_same(bool same) { _ref.is_same = same; } - bool is_same() { return _ref.is_same; } + bool is_same() const { return _ref.is_same; } virtual ~LevelIterator() = default; @@ -140,9 +134,9 @@ private: class Level0Iterator : public LevelIterator { public: Level0Iterator(RowsetReaderSharedPtr rs_reader, TabletReader* reader); - ~Level0Iterator() {} + ~Level0Iterator() override = default; - Status init() override; + Status init(bool get_data_by_ref = false) override; int64_t version() const override; @@ -156,11 +150,53 @@ private: private: Status _refresh_current_row(); + Status _next_by_ref(IteratorRowRef* ref); + Status _refresh_current_row_by_ref(); + + bool _is_empty() { + if (_get_data_by_ref) { + return _block_view.empty(); + } else { + return _block->rows() == 0; + } + } + + bool _current_valid() { + if (_get_data_by_ref) { + return _current < _block_view.size(); + } else { + return _ref.row_pos < _block->rows(); + } + } + + void _reset() { + if (_get_data_by_ref) { + _block_view.clear(); + _ref.reset(); + _current = 0; + } else { + _ref.is_same = false; + _ref.row_pos = 0; + _block->clear_column_data(); + } + } + + Status _refresh() { + if (_get_data_by_ref) { + return _rs_reader->next_block_view(&_block_view); + } else { + return _rs_reader->next_block(_block.get()); + } + } RowsetReaderSharedPtr _rs_reader; TabletReader* _reader = nullptr; std::shared_ptr<Block> _block; + + int _current; + BlockView _block_view; std::vector<RowLocation> _block_row_locations; + bool _get_data_by_ref = false; }; // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed) @@ -169,7 +205,7 @@ private: Level1Iterator(const std::list<LevelIterator*>& children, TabletReader* reader, bool merge, bool is_reverse, bool skip_same); - Status init() override; + Status init(bool get_data_by_ref = false) override; int64_t version() const override; @@ -181,7 +217,7 @@ private: Status current_block_row_locations(std::vector<RowLocation>* block_row_locations) override; - ~Level1Iterator(); + ~Level1Iterator() override; private: Status _merge_next(IteratorRowRef* ref); diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 9f50040b2e..280d7b054a 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. +#include <memory> #include <queue> #include <utility> +#include "common/status.h" #include "olap/iterators.h" -#include "olap/row.h" -#include "olap/row_block2.h" +#include "olap/schema.h" +#include "vec/core/block.h" namespace doris { @@ -44,17 +46,17 @@ public: // Will generate num_rows rows in total VAutoIncrementIterator(const Schema& schema, size_t num_rows) : _schema(schema), _num_rows(num_rows), _rows_returned() {} - ~VAutoIncrementIterator() override {} + ~VAutoIncrementIterator() override = default; // NOTE: Currently, this function will ignore StorageReadOptions Status init(const StorageReadOptions& opts) override; - Status next_batch(vectorized::Block* block) override { + Status next_batch(Block* block) override { int row_idx = 0; while (_rows_returned < _num_rows) { for (int j = 0; j < _schema.num_columns(); ++j) { - vectorized::ColumnWithTypeAndName& vc = block->get_by_position(j); - vectorized::IColumn& vi = (vectorized::IColumn&)(*vc.column); + ColumnWithTypeAndName& vc = block->get_by_position(j); + IColumn& vi = (IColumn&)(*vc.column); char data[16] = {}; size_t data_len = 0; @@ -91,7 +93,9 @@ public: ++_rows_returned; } - if (row_idx > 0) return Status::OK(); + if (row_idx > 0) { + return Status::OK(); + } return Status::EndOfFile("End of VAutoIncrementIterator"); } @@ -139,8 +143,8 @@ public: _iter = nullptr; } - Status block_reset() { - if (!_block) { + Status block_reset(const std::shared_ptr<Block>& block) { + if (!*block) { const Schema& schema = _iter->schema(); const auto& column_ids = schema.column_ids(); for (size_t i = 0; i < schema.num_column_ids(); ++i) { @@ -151,11 +155,11 @@ public: } auto column = data_type->create_column(); column->reserve(_block_row_max); - _block.insert( + block->insert( ColumnWithTypeAndName(std::move(column), data_type, column_desc->name())); } } else { - _block.clear_column_data(); + block->clear_column_data(); } return Status::OK(); } @@ -165,10 +169,10 @@ public: bool compare(const VMergeIteratorContext& rhs) const { int cmp_res = UNLIKELY(_compare_columns) - ? this->_block.compare_at(_index_in_block, rhs._index_in_block, - _compare_columns, rhs._block, -1) - : this->_block.compare_at(_index_in_block, rhs._index_in_block, - _num_key_columns, rhs._block, -1); + ? _block->compare_at(_index_in_block, rhs._index_in_block, + _compare_columns, *rhs._block, -1) + : _block->compare_at(_index_in_block, rhs._index_in_block, + _num_key_columns, *rhs._block, -1); if (cmp_res != 0) { return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0; @@ -176,47 +180,52 @@ public: auto col_cmp_res = 0; if (_sequence_id_idx != -1) { - col_cmp_res = this->_block.compare_column_at(_index_in_block, rhs._index_in_block, - _sequence_id_idx, rhs._block, -1); + col_cmp_res = _block->compare_column_at(_index_in_block, rhs._index_in_block, + _sequence_id_idx, *rhs._block, -1); } - auto result = col_cmp_res == 0 ? this->data_id() < rhs.data_id() : col_cmp_res < 0; + auto result = col_cmp_res == 0 ? data_id() < rhs.data_id() : col_cmp_res < 0; if (_is_unique) { - result ? this->set_skip(true) : rhs.set_skip(true); + result ? set_skip(true) : rhs.set_skip(true); } return result; } - // there is two situation in copy_rows: - // 1... `advanced = false` when current block finished, we should copy block before advance(iterator) - // If we iterator a block from start to end, _index_in_block=rows()-1, and _cur_batch_num=rows, - // so we should copy from (_index_in_block - _cur_batch_num + 1) - - // 2... `advanced = true` when current block not finished and we advanced to next block, now - // cur_batch_num = (pre_block iteraotr num) + 1, but actually pre_block iterator num is cur_batch_num -1 - // so we have a ` if (advanced) start -- ` - void copy_rows(vectorized::Block* block, bool advanced = true) { - vectorized::Block& src = _block; - vectorized::Block& dst = *block; + // `advanced = false` when current block finished + void copy_rows(Block* block, bool advanced = true) { + Block& src = *_block; + Block& dst = *block; if (_cur_batch_num == 0) { return; } + // copy a row to dst block column by column + size_t start = _index_in_block - _cur_batch_num + 1 - advanced; + DCHECK(start >= 0); + for (size_t i = 0; i < _num_columns; ++i) { auto& s_col = src.get_by_position(i); auto& d_col = dst.get_by_position(i); - vectorized::ColumnPtr& s_cp = s_col.column; - vectorized::ColumnPtr& d_cp = d_col.column; + ColumnPtr& s_cp = s_col.column; + ColumnPtr& d_cp = d_col.column; - //copy a row to dst block column by column - size_t start = _index_in_block - _cur_batch_num + 1; - if (advanced) { - start--; - } - DCHECK(start >= 0); - ((vectorized::IColumn&)(*d_cp)).insert_range_from(*s_cp, start, _cur_batch_num); + d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num); + } + _cur_batch_num = 0; + } + + void copy_rows(BlockView* view, bool advanced = true) { + if (_cur_batch_num == 0) { + return; + } + size_t start = _index_in_block - _cur_batch_num + 1 - advanced; + DCHECK(start >= 0); + + for (size_t i = 0; i < _cur_batch_num; ++i) { + view->push_back({_block, static_cast<int>(start + i), false}); } + _cur_batch_num = 0; } @@ -245,12 +254,7 @@ public: void reset_cur_batch() { _cur_batch_num = 0; } - bool is_cur_block_finished() { - if (_index_in_block == _block.rows() - 1) { - return true; - } - return false; - } + bool is_cur_block_finished() { return _index_in_block == _block->rows() - 1; } private: // Load next block into _block @@ -258,9 +262,6 @@ private: RowwiseIterator* _iter; - // used to store data load from iterator->next_batch(Vectorized::Block*) - vectorized::Block _block; - int _sequence_id_idx = -1; bool _is_unique = false; bool _is_reverse = false; @@ -275,13 +276,17 @@ private: std::vector<RowLocation> _block_row_locations; bool _record_rowids = false; size_t _cur_batch_num = 0; + + // used to store data load from iterator->next_batch(Block*) + std::shared_ptr<Block> _block; + // used to store data still on block view + std::list<std::shared_ptr<Block>> _block_list; }; Status VMergeIteratorContext::init(const StorageReadOptions& opts) { _block_row_max = opts.block_row_max; _record_rowids = opts.record_rowids; RETURN_IF_ERROR(_iter->init(opts)); - RETURN_IF_ERROR(block_reset()); RETURN_IF_ERROR(_load_next_block()); if (valid()) { RETURN_IF_ERROR(advance()); @@ -294,7 +299,7 @@ Status VMergeIteratorContext::advance() { // NOTE: we increase _index_in_block directly to valid one check do { _index_in_block++; - if (LIKELY(_index_in_block < _block.rows())) { + if (LIKELY(_index_in_block < _block->rows())) { return Status::OK(); } // current batch has no data, load next batch @@ -305,8 +310,23 @@ Status VMergeIteratorContext::advance() { Status VMergeIteratorContext::_load_next_block() { do { - block_reset(); - Status st = _iter->next_batch(&_block); + if (_block != nullptr) { + _block_list.push_back(_block); + _block = nullptr; + } + for (auto it = _block_list.begin(); it != _block_list.end(); it++) { + if (it->use_count() == 1) { + block_reset(*it); + _block = *it; + _block_list.erase(it); + break; + } + } + if (_block == nullptr) { + _block = std::make_shared<Block>(); + block_reset(_block); + } + Status st = _iter->next_batch(_block.get()); if (!st.ok()) { _valid = false; if (st.is_end_of_file()) { @@ -318,7 +338,7 @@ Status VMergeIteratorContext::_load_next_block() { if (UNLIKELY(_record_rowids)) { RETURN_IF_ERROR(_iter->current_block_row_locations(&_block_row_locations)); } - } while (_block.rows() == 0); + } while (_block->rows() == 0); _index_in_block = -1; _valid = true; return Status::OK(); @@ -345,7 +365,10 @@ public: Status init(const StorageReadOptions& opts) override; - Status next_batch(vectorized::Block* block) override; + Status next_batch(Block* block) override { return _next_batch(block); } + Status next_block_view(BlockView* block_view) override { return _next_batch(block_view); } + + bool support_return_data_by_ref() override { return true; } const Schema& schema() const override { return *_schema; } @@ -356,6 +379,71 @@ public: } private: + int _get_size(Block* block) { return block->rows(); } + int _get_size(BlockView* block_view) { return block_view->size(); } + + template <typename T> + Status _next_batch(T* block) { + if (UNLIKELY(_record_rowids)) { + _block_row_locations.resize(_block_row_max); + } + size_t row_idx = 0; + VMergeIteratorContext* pre_ctx = nullptr; + while (_get_size(block) < _block_row_max) { + if (_merge_heap.empty()) { + break; + } + + auto ctx = _merge_heap.top(); + _merge_heap.pop(); + + if (!ctx->need_skip()) { + ctx->add_cur_batch(); + if (pre_ctx != ctx) { + if (pre_ctx) { + pre_ctx->copy_rows(block); + } + pre_ctx = ctx; + } + if (UNLIKELY(_record_rowids)) { + _block_row_locations[row_idx] = ctx->current_row_location(); + } + row_idx++; + if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) { + // current block finished, ctx not advance + // so copy start_idx = (_index_in_block - _cur_batch_num + 1) + ctx->copy_rows(block, false); + pre_ctx = nullptr; + } + } else if (_merged_rows != nullptr) { + (*_merged_rows)++; + // need skip cur row, so flush rows in pre_ctx + if (pre_ctx) { + pre_ctx->copy_rows(block); + pre_ctx = nullptr; + } + } + + RETURN_IF_ERROR(ctx->advance()); + if (ctx->valid()) { + _merge_heap.push(ctx); + } else { + // Release ctx earlier to reduce resource consumed + delete ctx; + } + } + if (!_merge_heap.empty()) { + return Status::OK(); + } + // Still last batch needs to be processed + + if (UNLIKELY(_record_rowids)) { + _block_row_locations.resize(row_idx); + } + + return Status::EndOfFile("no more data in segment"); + } + // It will be released after '_merge_heap' has been built. std::vector<RowwiseIterator*> _origin_iters; @@ -406,65 +494,6 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { return Status::OK(); } -Status VMergeIterator::next_batch(vectorized::Block* block) { - if (UNLIKELY(_record_rowids)) { - _block_row_locations.resize(_block_row_max); - } - size_t row_idx = 0; - VMergeIteratorContext* pre_ctx = nullptr; - while (block->rows() < _block_row_max) { - if (_merge_heap.empty()) break; - - auto ctx = _merge_heap.top(); - _merge_heap.pop(); - - if (!ctx->need_skip()) { - ctx->add_cur_batch(); - if (pre_ctx != ctx) { - if (pre_ctx) { - pre_ctx->copy_rows(block); - } - pre_ctx = ctx; - } - if (UNLIKELY(_record_rowids)) { - _block_row_locations[row_idx] = ctx->current_row_location(); - } - row_idx++; - if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) { - // current block finished, ctx not advance - // so copy start_idx = (_index_in_block - _cur_batch_num + 1) - ctx->copy_rows(block, false); - pre_ctx = nullptr; - } - } else if (_merged_rows != nullptr) { - (*_merged_rows)++; - // need skip cur row, so flush rows in pre_ctx - if (pre_ctx) { - pre_ctx->copy_rows(block); - pre_ctx = nullptr; - } - } - - RETURN_IF_ERROR(ctx->advance()); - if (ctx->valid()) { - _merge_heap.push(ctx); - } else { - // Release ctx earlier to reduce resource consumed - delete ctx; - } - } - if (!_merge_heap.empty()) { - return Status::OK(); - } - // Still last batch needs to be processed - - if (UNLIKELY(_record_rowids)) { - _block_row_locations.resize(row_idx); - } - - return Status::EndOfFile("no more data in segment"); -} - // VUnionIterator will read data from input iterator one by one. class VUnionIterator : public RowwiseIterator { public: @@ -480,7 +509,7 @@ public: Status init(const StorageReadOptions& opts) override; - Status next_batch(vectorized::Block* block) override; + Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } @@ -505,7 +534,7 @@ Status VUnionIterator::init(const StorageReadOptions& opts) { return Status::OK(); } -Status VUnionIterator::next_batch(vectorized::Block* block) { +Status VUnionIterator::next_batch(Block* block) { while (_cur_iter != nullptr) { auto st = _cur_iter->next_batch(block); if (st.is_end_of_file()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org