This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new db20e1f [refactor](storage) VGenericIterator to reuse Schema (#7858) db20e1f is described below commit db20e1f323744cd2fab91f120df644a4d47106be Author: zuochunwei <zchw...@qq.com> AuthorDate: Wed Feb 9 13:06:03 2022 +0800 [refactor](storage) VGenericIterator to reuse Schema (#7858) 1. reuse Schema to avoid copying, because clone Schema will generate a lot of sub Field object 2. call interface provided by Block to reduce code lines --- be/src/olap/row_block2.h | 2 +- be/src/olap/rowset/beta_rowset_reader.cpp | 6 +-- be/src/olap/rowset/beta_rowset_reader.h | 1 + .../rowset/segment_v2/empty_segment_iterator.h | 2 +- be/src/olap/rowset/segment_v2/segment_iterator.h | 3 +- be/src/vec/olap/vgeneric_iterators.cpp | 61 ++++++++-------------- 6 files changed, 29 insertions(+), 46 deletions(-) diff --git a/be/src/olap/row_block2.h b/be/src/olap/row_block2.h index b98ab95..7f2b79d 100644 --- a/be/src/olap/row_block2.h +++ b/be/src/olap/row_block2.h @@ -111,7 +111,7 @@ public: private: Status _copy_data_to_column(int cid, vectorized::MutableColumnPtr& mutable_column_ptr); - Schema _schema; + const Schema& _schema; size_t _capacity; // _column_vector_batches[cid] == null if cid is not in `_schema`. 
// memory are not allocated from `_pool` because we don't wan't to reallocate them in clear() diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 263a4cc..3aed8eb 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -55,7 +55,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { _stats = _context->stats; } // SegmentIterator will load seek columns on demand - Schema schema(_context->tablet_schema->columns(), *(_context->return_columns)); + _schema = std::make_unique<Schema>(_context->tablet_schema->columns(), *(_context->return_columns)); // convert RowsetReaderContext to StorageReadOptions StorageReadOptions read_options; @@ -102,7 +102,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; for (auto& seg_ptr : _segment_cache_handle.get_segments()) { std::unique_ptr<RowwiseIterator> iter; - auto s = seg_ptr->new_iterator(schema, read_options, _parent_tracker, &iter); + auto s = seg_ptr->new_iterator(*_schema, read_options, _parent_tracker, &iter); if (!s.ok()) { LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); return OLAP_ERR_ROWSET_READER_INIT; @@ -131,7 +131,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { _iterator.reset(final_iterator); // init input block - _input_block.reset(new RowBlockV2(schema, + _input_block.reset(new RowBlockV2(*_schema, std::min(1024, read_context->batch_size), _parent_tracker)); if (!read_context->is_vec) { diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index add0c31..997ab12 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -58,6 +58,7 @@ public: RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } private: + std::unique_ptr<Schema> _schema; 
RowsetReaderContext* _context; BetaRowsetSharedPtr _rowset; diff --git a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h index 3e1a4f9..0c186ed 100644 --- a/be/src/olap/rowset/segment_v2/empty_segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/empty_segment_iterator.h @@ -35,7 +35,7 @@ public: Status next_batch(vectorized::Block* block) override; private: - Schema _schema; + const Schema& _schema; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 0577526..2eae13e 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -103,8 +103,7 @@ private: class BitmapRangeIterator; std::shared_ptr<Segment> _segment; - // TODO(zc): rethink if we need copy it - Schema _schema; + const Schema& _schema; // _column_iterators.size() == _schema.num_columns() // _column_iterators[cid] == nullptr if cid is not in _schema std::vector<ColumnIterator*> _column_iterators; diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 3bee64b..f0f148d 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -100,7 +100,7 @@ public: const Schema& schema() const override { return _schema; } private: - Schema _schema; + const Schema& _schema; size_t _num_rows; size_t _rows_returned; }; @@ -136,12 +136,19 @@ public: { if (!_block) { const Schema& schema = _iter->schema(); - for (auto &column_desc : schema.columns()) { + const auto& column_ids = schema.column_ids(); + for (size_t i = 0; i < schema.num_column_ids(); ++i) { + auto column_desc = schema.column(column_ids[i]); auto data_type = Schema::get_data_type_ptr(column_desc->type()); if (data_type == nullptr) { return Status::RuntimeError("invalid data type"); } - _block.insert(ColumnWithTypeAndName(data_type->create_column(), data_type, 
column_desc->name())); + if (column_desc->is_nullable()) { + data_type = std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)); + } + auto column = data_type->create_column(); + column->reserve(_block_row_max); + _block.insert(ColumnWithTypeAndName(std::move(column), data_type, column_desc->name())); } } else { _block.clear_column_data(); @@ -152,43 +159,17 @@ public: // Initialize this context and will prepare data for current_row() Status init(const StorageReadOptions& opts); - int compare_row(const VMergeIteratorContext& rhs) const { + bool compare(const VMergeIteratorContext& rhs) const { const Schema& schema = _iter->schema(); int num = schema.num_key_columns(); - for (uint32_t cid = 0; cid < num; ++cid) { -#if 0 - auto name = schema.column(cid)->name(); - auto l_col = this->_block.get_by_name(name); - auto r_col = rhs._block.get_by_name(name); - -#else - //because the columns of block will be inserted by cid asc order - //so no need to get column by get_by_name() - auto l_col = this->_block.get_by_position(cid); - auto r_col = rhs._block.get_by_position(cid); -#endif - - auto l_cp = l_col.column; - auto r_cp = r_col.column; - - auto res = l_cp->compare_at(_index_in_block, rhs._index_in_block, *r_cp, -1); - if (res) { - return res; - } - } - - return 0; - } - - bool compare(const VMergeIteratorContext& rhs) const { - int cmp_res = this->compare_row(rhs); + int cmp_res = this->_block.compare_at(_index_in_block, rhs._index_in_block, num, rhs._block, -1); if (cmp_res != 0) { return cmp_res > 0; } return this->data_id() < rhs.data_id(); } - void copy_row_to(vectorized::Block* block) { + void copy_row(vectorized::Block* block) { vectorized::Block& src = _block; vectorized::Block& dst = *block; @@ -230,9 +211,11 @@ private: bool _valid = false; size_t _index_in_block = -1; + int _block_row_max = 4096; }; Status VMergeIteratorContext::init(const StorageReadOptions& opts) { + _block_row_max = opts.block_row_max; RETURN_IF_ERROR(_iter->init(opts)); 
RETURN_IF_ERROR(block_reset()); RETURN_IF_ERROR(_load_next_block()); @@ -246,7 +229,7 @@ Status VMergeIteratorContext::advance() { // NOTE: we increase _index_in_block directly to valid one check do { _index_in_block++; - if (_index_in_block < _block.rows()) { + if (LIKELY(_index_in_block < _block.rows())) { return Status::OK(); } // current batch has no data, load next batch @@ -299,7 +282,7 @@ private: // It will be released after '_merge_heap' has been built. std::vector<RowwiseIterator*> _origin_iters; - std::unique_ptr<Schema> _schema; + const Schema* _schema = nullptr; struct VMergeContextComparator { bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const { @@ -320,10 +303,10 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { if (_origin_iters.empty()) { return Status::OK(); } - _schema.reset(new Schema((*(_origin_iters.begin()))->schema())); + _schema = &(*_origin_iters.begin())->schema(); for (auto iter : _origin_iters) { - std::unique_ptr<VMergeIteratorContext> ctx(new VMergeIteratorContext(iter)); + auto ctx = std::make_unique<VMergeIteratorContext>(iter); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; @@ -347,7 +330,7 @@ Status VMergeIterator::next_batch(vectorized::Block* block) { _merge_heap.pop(); // copy current row to block - ctx->copy_row_to(block); + ctx->copy_row(block); RETURN_IF_ERROR(ctx->advance()); if (ctx->valid()) { @@ -383,7 +366,7 @@ public: const Schema& schema() const override { return *_schema; } private: - std::unique_ptr<Schema> _schema; + const Schema* _schema = nullptr; RowwiseIterator* _cur_iter = nullptr; std::deque<RowwiseIterator*> _origin_iters; }; @@ -396,8 +379,8 @@ Status VUnionIterator::init(const StorageReadOptions& opts) { for (auto iter : _origin_iters) { RETURN_IF_ERROR(iter->init(opts)); } - _schema.reset(new Schema((*(_origin_iters.begin()))->schema())); _cur_iter = *(_origin_iters.begin()); + _schema = &_cur_iter->schema(); return Status::OK(); } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org