This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new df1f06e Optimized the read performance of the table when have multi versions (#4958) df1f06e is described below commit df1f06e60b1339ef6e2756d0c4cb492cb64986c7 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Tue Dec 1 12:25:11 2020 +0800 Optimized the read performance of the table when have multi versions (#4958) * Optimized the read performance of a table that has multiple versions: the merge method of the unique table was changed so that the cumulative version data is merged first and the result is then merged with the base version. When the data has only one base version, it is read directly without merging. --- .gitignore | 13 +- be/src/olap/CMakeLists.txt | 1 + be/src/olap/collect_iterator.cpp | 327 +++++++++++++++++++++++++++++++++++++++ be/src/olap/collect_iterator.h | 171 ++++++++++++++++++++ be/src/olap/reader.cpp | 297 ++++++----------------------------- be/src/olap/reader.h | 54 +------ be/src/olap/tablet_manager.cpp | 2 +- 7 files changed, 565 insertions(+), 300 deletions(-) diff --git a/.gitignore b/.gitignore index 8fb5733..322ddd5 100644 --- a/.gitignore +++ b/.gitignore @@ -50,8 +50,19 @@ be/src/gen_cpp/*.cc be/src/gen_cpp/*.cpp be/src/gen_cpp/*.h be/src/gen_cpp/opcode -be/ut_build_ASAN/ +be/ut_build_* be/tags +be/build* +be/test/olap/test_data/tablet_meta_test.hdr +build + +ui/dist +ui/node_modules +ui/package-lock.json +docs/package-lock.json +fe/fe-common/.classpath +rpc_data/ + #ignore vscode project file .vscode diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 4fa49fa..f9827f8 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -33,6 +33,7 @@ add_library(Olap STATIC bloom_filter_reader.cpp bloom_filter_writer.cpp byte_buffer.cpp + collect_iterator.cpp compaction.cpp compaction_permit_limiter.cpp comparison_predicate.cpp diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp new file mode 100644 index 0000000..fba50f3 --- 
/dev/null +++ b/be/src/olap/collect_iterator.cpp @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/collect_iterator.h" + +#include "olap/reader.h" +#include "olap/row.h" +#include "olap/row_block.h" +#include "olap/row_cursor.h" + +namespace doris { + +CollectIterator::~CollectIterator() {} + +void CollectIterator::init(Reader* reader) { + _reader = reader; + // when aggregate is enabled or key_type is DUP_KEYS, we don't merge + // multiple data to aggregate for performance in user fetch + if (_reader->_reader_type == READER_QUERY && + (_reader->_aggregation || _reader->_tablet->keys_type() == KeysType::DUP_KEYS)) { + _merge = false; + } +} + +OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { + std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader)); + RETURN_NOT_OK(child->init()); + if (child->current_row() == nullptr) { + return OLAP_SUCCESS; + } + + LevelIterator* child_ptr = child.release(); + _children.push_back(child_ptr); + _rs_readers.push_back(rs_reader); + return OLAP_SUCCESS; +} + +// Build a merge heap. 
If _merge is true, a rowset with the max rownum +// status will be used as the base rowset, and the other rowsets will be merged first and +// then merged with the base rowset. +void CollectIterator::build_heap() { + DCHECK(_rs_readers.size() == _children.size()); + _reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS; + if (_children.empty()) { + _inner_iter.reset(nullptr); + return; + } else if (_merge) { + DCHECK(!_rs_readers.empty()); + // build merge heap with two children, a base rowset as level0iterator and + // other cumulative rowsets as a level1iterator + if (_children.size() > 1) { + // find base rowset(max rownum), + RowsetReaderSharedPtr base_reader = _rs_readers[0]; + int base_reader_idx = 0; + for (size_t i = 1; i < _rs_readers.size(); ++i) { + if (_rs_readers[i]->rowset()->rowset_meta()->num_rows() > + base_reader->rowset()->rowset_meta()->num_rows()) { + base_reader = _rs_readers[i]; + base_reader_idx = i; + } + } + std::vector<LevelIterator*> cumu_children; + for (size_t i = 0; i < _rs_readers.size(); ++i) { + if (i != base_reader_idx) { + cumu_children.push_back(_children[i]); + } + } + Level1Iterator* cumu_iter = + new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse); + cumu_iter->init(); + std::vector<LevelIterator*> children; + children.push_back(_children[base_reader_idx]); + children.push_back(cumu_iter); + _inner_iter.reset(new Level1Iterator(children, _merge, _reverse)); + } else { + _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); + } + } else { + _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); + } + _inner_iter->init(); +} + +bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a, + const LevelIterator* b) { + // First compare row cursor. 
+ const RowCursor* first = a->current_row(); + const RowCursor* second = b->current_row(); + int cmp_res = compare_row(*first, *second); + if (cmp_res != 0) { + return cmp_res > 0; + } + // if row cursors equal, compare data version. + // read data from higher version to lower version. + // for UNIQUE_KEYS just read the highest version and no need agg_update. + // for AGG_KEYS if a version is deleted, the lower version no need to agg_update + if (_reverse) { + return a->version() < b->version(); + } + return a->version() > b->version(); +} + +void CollectIterator::clear() { + for (auto child : _children) { + if (child != nullptr) { + delete child; + child = nullptr; + } + } + _children.clear(); +} + +const RowCursor* CollectIterator::current_row(bool* delete_flag) const { + if (LIKELY(_inner_iter)) { + return _inner_iter->current_row(delete_flag); + } + return nullptr; +} + +OLAPStatus CollectIterator::next(const RowCursor** row, bool* delete_flag) { + if (LIKELY(_inner_iter)) { + return _inner_iter->next(row, delete_flag); + } else { + return OLAP_ERR_DATA_EOF; + } +} + +CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader) + : _rs_reader(rs_reader), _is_delete(rs_reader->delete_flag()), _reader(reader) {} + +CollectIterator::Level0Iterator::~Level0Iterator() {} + +OLAPStatus CollectIterator::Level0Iterator::init() { + auto res = _row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to init row cursor, res=" << res; + return res; + } + RETURN_NOT_OK(_refresh_current_row()); + return OLAP_SUCCESS; +} + +const RowCursor* CollectIterator::Level0Iterator::current_row(bool* delete_flag) const { + *delete_flag = _is_delete || _current_row->is_delete(); + return _current_row; +} + +const RowCursor* CollectIterator::Level0Iterator::current_row() const { + return _current_row; +} + +int32_t CollectIterator::Level0Iterator::version() const { + return 
_rs_reader->version().second; +} + +OLAPStatus CollectIterator::Level0Iterator::_refresh_current_row() { + do { + if (_row_block != nullptr && _row_block->has_remaining()) { + size_t pos = _row_block->pos(); + _row_block->get_row(pos, &_row_cursor); + if (_row_block->block_status() == DEL_PARTIAL_SATISFIED && + _reader->_delete_handler.is_filter_data(_rs_reader->version().second, + _row_cursor)) { + _reader->_stats.rows_del_filtered++; + _row_block->pos_inc(); + continue; + } + _current_row = &_row_cursor; + return OLAP_SUCCESS; + } else { + auto res = _rs_reader->next_block(&_row_block); + if (res != OLAP_SUCCESS) { + _current_row = nullptr; + return res; + } + } + } while (_row_block != nullptr); + _current_row = nullptr; + return OLAP_ERR_DATA_EOF; +} + +OLAPStatus CollectIterator::Level0Iterator::next(const RowCursor** row, bool* delete_flag) { + _row_block->pos_inc(); + auto res = _refresh_current_row(); + *row = _current_row; + *delete_flag = _is_delete; + if (_current_row != nullptr) { + *delete_flag = _is_delete || _current_row->is_delete(); + } + return res; +} + +CollectIterator::Level1Iterator::Level1Iterator( + const std::vector<CollectIterator::LevelIterator*>& children, bool merge, bool reverse) + : _children(children), _merge(merge), _reverse(reverse) {} + +CollectIterator::LevelIterator::~LevelIterator() {} + +CollectIterator::Level1Iterator::~Level1Iterator() { + for (auto child : _children) { + if (child != nullptr) { + delete child; + child = nullptr; + } + } +} + +// Read next row into *row. +// Returns +// OLAP_SUCCESS when read successfully. +// OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. 
+// Others when error happens +OLAPStatus CollectIterator::Level1Iterator::next(const RowCursor** row, bool* delete_flag) { + if (UNLIKELY(_children.size() == 0)) { + return OLAP_ERR_DATA_EOF; + } + if (_merge) { + return _merge_next(row, delete_flag); + } else { + return _normal_next(row, delete_flag); + } +} + +// Get top row of the heap, nullptr if reach end. +const RowCursor* CollectIterator::Level1Iterator::current_row(bool* delete_flag) const { + if (_cur_child != nullptr) { + return _cur_child->current_row(delete_flag); + } + return nullptr; +} + +// Get top row of the heap, nullptr if reach end. +const RowCursor* CollectIterator::Level1Iterator::current_row() const { + if (_cur_child != nullptr) { + return _cur_child->current_row(); + } + return nullptr; +} + +int32_t CollectIterator::Level1Iterator::version() const { + if (_cur_child != nullptr) { + return _cur_child->version(); + } + return -1; +} + +OLAPStatus CollectIterator::Level1Iterator::init() { + if (_children.size() == 0) { + return OLAP_SUCCESS; + } + // Only when there are multiple children that need to be merged + if (_merge && _children.size() > 1) { + _heap.reset(new MergeHeap(LevelIteratorComparator(_reverse))); + for (auto child : _children) { + if (child == nullptr || child->current_row() == nullptr) { + continue; + } + _heap->push(child); + _cur_child = _heap->top(); + } + } else { + _merge = false; + _heap.reset(nullptr); + _cur_child = _children[_child_idx]; + } + return OLAP_SUCCESS; +} + +inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor** row, + bool* delete_flag) { + _heap->pop(); + auto res = _cur_child->next(row, delete_flag); + if (res == OLAP_SUCCESS) { + _heap->push(_cur_child); + _cur_child = _heap->top(); + } else if (res == OLAP_ERR_DATA_EOF) { + if (!_heap->empty()) { + _cur_child = _heap->top(); + } else { + _cur_child = nullptr; + return OLAP_ERR_DATA_EOF; + } + } else { + LOG(WARNING) << "failed to get next from child, res=" << res; + return 
res; + } + *row = _cur_child->current_row(delete_flag); + return OLAP_SUCCESS; +} + +inline OLAPStatus CollectIterator::Level1Iterator::_normal_next(const RowCursor** row, + bool* delete_flag) { + auto res = _cur_child->next(row, delete_flag); + if (LIKELY(res == OLAP_SUCCESS)) { + return OLAP_SUCCESS; + } else if (res == OLAP_ERR_DATA_EOF) { + // this child has been read, to read next + _child_idx++; + if (_child_idx < _children.size()) { + _cur_child = _children[_child_idx]; + *row = _cur_child->current_row(delete_flag); + return OLAP_SUCCESS; + } else { + _cur_child = nullptr; + return OLAP_ERR_DATA_EOF; + } + } else { + LOG(WARNING) << "failed to get next from child, res=" << res; + return res; + } +} + +} // namespace doris diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h new file mode 100644 index 0000000..97a4834 --- /dev/null +++ b/be/src/olap/collect_iterator.h @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "olap/olap_define.h" +#include "olap/row_cursor.h" +#include "olap/rowset/rowset_reader.h" + +namespace doris { + +class Reader; +class RowCursor; + +class CollectIterator { +public: + ~CollectIterator(); + + // Hold reader point to get reader params + void init(Reader* reader); + + OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); + + void build_heap(); + + // Get top row of the heap, nullptr if reach end. + const RowCursor* current_row(bool* delete_flag) const; + + // Read next row into *row. + // Returns + // OLAP_SUCCESS when read successfully. + // OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. + // Others when error happens + OLAPStatus next(const RowCursor** row, bool* delete_flag); + + // Clear the MergeSet element and reset state. + void clear(); + +private: + // This interface is the actual implementation of the new version of iterator. + // It currently contains two implementations, one is Level0Iterator, + // which only reads data from the rowset reader, and the other is Level1Iterator, + // which can read merged data from multiple LevelIterators through MergeHeap. + // By using Level1Iterator, some rowset readers can be merged in advance and + // then merged with other rowset readers. + class LevelIterator { + public: + virtual OLAPStatus init() = 0; + + virtual const RowCursor* current_row(bool* delete_flag) const = 0; + + virtual const RowCursor* current_row() const = 0; + + virtual int32_t version() const = 0; + + virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0; + virtual ~LevelIterator() = 0; + }; + // Compare row cursors between multiple merge elements, + // if row cursors equal, compare data version. 
+ class LevelIteratorComparator { + public: + LevelIteratorComparator(const bool reverse = false) : _reverse(reverse) {} + bool operator()(const LevelIterator* a, const LevelIterator* b); + + private: + bool _reverse; + OlapReaderStatistics* _stats; + }; + + typedef std::priority_queue<LevelIterator*, std::vector<LevelIterator*>, + LevelIteratorComparator> + MergeHeap; + // Iterate from rowset reader. This Iterator usually like a leaf node + class Level0Iterator : public LevelIterator { + public: + Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader); + + OLAPStatus init(); + + const RowCursor* current_row(bool* delete_flag) const; + + const RowCursor* current_row() const; + + int32_t version() const; + + OLAPStatus next(const RowCursor** row, bool* delete_flag); + + ~Level0Iterator(); + + private: + // refresh_current_row + OLAPStatus _refresh_current_row(); + + RowsetReaderSharedPtr _rs_reader; + const RowCursor* _current_row = nullptr; + bool _is_delete = false; + Reader* _reader = nullptr; + // point to rows inside `_row_block` + RowCursor _row_cursor; + RowBlock* _row_block = nullptr; + }; + // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed) + class Level1Iterator : public LevelIterator { + public: + Level1Iterator(const std::vector<LevelIterator*>& children, bool merge, bool reverse); + + OLAPStatus init(); + + const RowCursor* current_row(bool* delete_flag) const; + + const RowCursor* current_row() const; + + int32_t version() const; + + OLAPStatus next(const RowCursor** row, bool* delete_flag); + + ~Level1Iterator(); + + private: + inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag); + inline OLAPStatus _normal_next(const RowCursor** row, bool* delete_flag); + + // each Level0Iterator corresponds to a rowset reader + const std::vector<LevelIterator*> _children; + // point to the Level0Iterator containing the next output row. + // null when CollectIterator hasn't been initialized or reaches EOF. 
+ LevelIterator* _cur_child = nullptr; + + // when `_merge == true`, rowset reader returns ordered rows and CollectIterator uses a priority queue to merge + // sort them. The output of CollectIterator is also ordered. + // When `_merge == false`, rowset reader returns *partial* ordered rows. CollectIterator simply returns all rows + // from the first rowset, the second rowset, .., the last rowset. The output of CollectorIterator is also + // *partially* ordered. + bool _merge = true; + bool _reverse = false; + // used when `_merge == true` + std::unique_ptr<MergeHeap> _heap; + // used when `_merge == false` + int _child_idx = 0; + }; + + std::unique_ptr<LevelIterator> _inner_iter; + + // each LevelIterator corresponds to a rowset reader + std::vector<LevelIterator*> _children; + + bool _merge = true; + bool _reverse = false; + + // Hold reader point to access read params, such as fetch conditions. + Reader* _reader = nullptr; + std::vector<RowsetReaderSharedPtr> _rs_readers; + +}; + +} // namespace doris diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 8a8e96f..43598e5 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -19,6 +19,7 @@ #include <sstream> +#include "olap/collect_iterator.h" #include "olap/comparison_predicate.h" #include "olap/in_list_predicate.h" #include "olap/null_predicate.h" @@ -39,261 +40,54 @@ using std::vector; namespace doris { -class CollectIterator { -public: - ~CollectIterator(); - - // Hold reader point to get reader params - void init(Reader* reader); - - OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); - - // Get top row of the heap, nullptr if reach end. - const RowCursor* current_row(bool* delete_flag) const { - if (_cur_child != nullptr) { - return _cur_child->current_row(delete_flag); - } - return nullptr; - } - - // Read next row into *row. - // Returns - // OLAP_SUCCESS when read successfully. - // OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. 
- // Others when error happens - OLAPStatus next(const RowCursor** row, bool* delete_flag) { - DCHECK(_cur_child != nullptr); - if (_merge) { - return _merge_next(row, delete_flag); - } else { - return _normal_next(row, delete_flag); - } +void ReaderParams::check_validation() const { + if (UNLIKELY(version.first == -1)) { + LOG(FATAL) << "version is not set. tablet=" << tablet->full_name(); } +} - // Clear the MergeSet element and reset state. - void clear(); - -private: - class ChildCtx { - public: - ChildCtx(RowsetReaderSharedPtr rs_reader, Reader* reader) - : _rs_reader(rs_reader), _is_delete(rs_reader->delete_flag()), _reader(reader) {} - - OLAPStatus init() { - auto res = _row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to init row cursor, res=" << res; - return res; - } - RETURN_NOT_OK(_refresh_current_row()); - return OLAP_SUCCESS; - } - - const RowCursor* current_row(bool* delete_flag) const { - *delete_flag = _is_delete || _current_row->is_delete(); - return _current_row; - } - - const RowCursor* current_row() const { return _current_row; } - - int32_t version() const { return _rs_reader->version().second; } - - OLAPStatus next(const RowCursor** row, bool* delete_flag) { - _row_block->pos_inc(); - auto res = _refresh_current_row(); - *row = _current_row; - *delete_flag = _is_delete; - if (_current_row != nullptr) { - *delete_flag = _is_delete || _current_row->is_delete(); - }; - return res; - } - - private: - // refresh_current_row - OLAPStatus _refresh_current_row() { - do { - if (_row_block != nullptr && _row_block->has_remaining()) { - size_t pos = _row_block->pos(); - _row_block->get_row(pos, &_row_cursor); - if (_row_block->block_status() == DEL_PARTIAL_SATISFIED && - _reader->_delete_handler.is_filter_data(_rs_reader->version().second, - _row_cursor)) { - _reader->_stats.rows_del_filtered++; - _row_block->pos_inc(); - continue; - } - _current_row = &_row_cursor; - 
return OLAP_SUCCESS; - } else { - auto res = _rs_reader->next_block(&_row_block); - if (res != OLAP_SUCCESS) { - _current_row = nullptr; - return res; - } - } - } while (_row_block != nullptr); - _current_row = nullptr; - return OLAP_ERR_DATA_EOF; - } +std::string ReaderParams::to_string() { + std::stringstream ss; + ss << "tablet=" << tablet->full_name() << " reader_type=" << reader_type + << " aggregation=" << aggregation << " version=" << version << " range=" << range + << " end_range=" << end_range; - RowsetReaderSharedPtr _rs_reader; - const RowCursor* _current_row = nullptr; - bool _is_delete = false; - Reader* _reader = nullptr; - - RowCursor _row_cursor; // point to rows inside `_row_block` - RowBlock* _row_block = nullptr; - }; - - // Compare row cursors between multiple merge elements, - // if row cursors equal, compare data version. - class ChildCtxComparator { - public: - ChildCtxComparator(const bool& revparam = false) { _reverse = revparam; } - bool operator()(const ChildCtx* a, const ChildCtx* b); - - private: - bool _reverse; - }; - - inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag); - inline OLAPStatus _normal_next(const RowCursor** row, bool* delete_flag); - - // each ChildCtx corresponds to a rowset reader - std::vector<ChildCtx*> _children; - // point to the ChildCtx containing the next output row. - // null when CollectIterator hasn't been initialized or reaches EOF. - ChildCtx* _cur_child = nullptr; - - // when `_merge == true`, rowset reader returns ordered rows and CollectIterator uses a priority queue to merge - // sort them. The output of CollectIterator is also ordered. - // When `_merge == false`, rowset reader returns *partial* ordered rows. CollectIterator simply returns all rows - // from the first rowset, the second rowset, .., the last rowset. The output of CollectorIterator is also - // *partially* ordered. 
- bool _merge = true; - // used when `_merge == true` - typedef std::priority_queue<ChildCtx*, std::vector<ChildCtx*>, ChildCtxComparator> MergeHeap; - std::unique_ptr<MergeHeap> _heap; - // used when `_merge == false` - int _child_idx = 0; - - // Hold reader point to access read params, such as fetch conditions. - Reader* _reader = nullptr; -}; - -CollectIterator::~CollectIterator() { - for (auto child : _children) { - delete child; + for (auto& key : start_key) { + ss << " keys=" << key; } -} -void CollectIterator::init(Reader* reader) { - _reader = reader; - // when aggregate is enabled or key_type is DUP_KEYS, we don't merge - // multiple data to aggregate for performance in user fetch - if (_reader->_reader_type == READER_QUERY && - (_reader->_aggregation || _reader->_tablet->keys_type() == KeysType::DUP_KEYS)) { - _merge = false; - _heap.reset(nullptr); - } else if (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS) { - _heap.reset(new MergeHeap(ChildCtxComparator(true))); - } else { - _heap.reset(new MergeHeap()); + for (auto& key : end_key) { + ss << " end_keys=" << key; } -} -OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { - std::unique_ptr<ChildCtx> child(new ChildCtx(rs_reader, _reader)); - RETURN_NOT_OK(child->init()); - if (child->current_row() == nullptr) { - return OLAP_SUCCESS; + for (auto& condition : conditions) { + ss << " conditions=" << apache::thrift::ThriftDebugString(condition); } - ChildCtx* child_ptr = child.release(); - _children.push_back(child_ptr); - if (_merge) { - _heap->push(child_ptr); - _cur_child = _heap->top(); - } else { - if (_cur_child == nullptr) { - _cur_child = _children[_child_idx]; - } - } - return OLAP_SUCCESS; + return ss.str(); } - -inline OLAPStatus CollectIterator::_merge_next(const RowCursor** row, bool* delete_flag) { - _heap->pop(); - auto res = _cur_child->next(row, delete_flag); - if (res == OLAP_SUCCESS) { - _heap->push(_cur_child); - _cur_child = _heap->top(); - } else if (res == 
OLAP_ERR_DATA_EOF) { - if (!_heap->empty()) { - _cur_child = _heap->top(); - } else { - _cur_child = nullptr; - return OLAP_ERR_DATA_EOF; - } - } else { - LOG(WARNING) << "failed to get next from child, res=" << res; - return res; +Reader::KeysParam::~KeysParam() { + for (auto start_key : start_keys) { + SAFE_DELETE(start_key); } - *row = _cur_child->current_row(delete_flag); - return OLAP_SUCCESS; -} -inline OLAPStatus CollectIterator::_normal_next(const RowCursor** row, bool* delete_flag) { - auto res = _cur_child->next(row, delete_flag); - if (LIKELY(res == OLAP_SUCCESS)) { - return OLAP_SUCCESS; - } else if (res == OLAP_ERR_DATA_EOF) { - // this child has been read, to read next - _child_idx++; - if (_child_idx < _children.size()) { - _cur_child = _children[_child_idx]; - *row = _cur_child->current_row(delete_flag); - return OLAP_SUCCESS; - } else { - _cur_child = nullptr; - return OLAP_ERR_DATA_EOF; - } - } else { - LOG(WARNING) << "failed to get next from child, res=" << res; - return res; + for (auto end_key : end_keys) { + SAFE_DELETE(end_key); } } -bool CollectIterator::ChildCtxComparator::operator()(const ChildCtx* a, const ChildCtx* b) { - // First compare row cursor. - const RowCursor* first = a->current_row(); - const RowCursor* second = b->current_row(); - int cmp_res = compare_row(*first, *second); - if (cmp_res != 0) { - return cmp_res > 0; - } - // if row cursors equal, compare data version. - // read data from higher version to lower version. - // for UNIQUE_KEYS just read the highest version and no need agg_update. 
- // for AGG_KEYS if a version is deleted, the lower version no need to agg_update - if (_reverse) { - return a->version() < b->version(); - } - return a->version() > b->version(); -} +std::string Reader::KeysParam::to_string() const { + std::stringstream ss; + ss << "range=" << range << " end_range=" << end_range; -void CollectIterator::clear() { - while (_heap != nullptr && !_heap->empty()) { - _heap->pop(); + for (auto start_key : start_keys) { + ss << " keys=" << start_key->to_string(); } - for (auto child : _children) { - delete child; + for (auto end_key : end_keys) { + ss << " end_keys=" << end_key->to_string(); } - // _children.swap(std::vector<ChildCtx*>()); - _children.clear(); - _cur_child = nullptr; - _child_idx = 0; + + return ss.str(); } Reader::Reader() { @@ -326,18 +120,23 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return res; } - switch (_tablet->keys_type()) { - case KeysType::DUP_KEYS: + if (_rs_readers.size() == 1 && + !_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) { _next_row_func = &Reader::_dup_key_next_row; - break; - case KeysType::UNIQUE_KEYS: - _next_row_func = &Reader::_unique_key_next_row; - break; - case KeysType::AGG_KEYS: - _next_row_func = &Reader::_agg_key_next_row; - break; - default: - break; + } else { + switch (_tablet->keys_type()) { + case KeysType::DUP_KEYS: + _next_row_func = &Reader::_dup_key_next_row; + break; + case KeysType::UNIQUE_KEYS: + _next_row_func = &Reader::_unique_key_next_row; + break; + case KeysType::AGG_KEYS: + _next_row_func = &Reader::_agg_key_next_row; + break; + default: + break; + } } DCHECK(_next_row_func != nullptr) << "No next row function for type:" << _tablet->keys_type(); @@ -555,7 +354,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { } _rs_readers.push_back(rs_reader); } - + _collect_iter->build_heap(); _next_key = _collect_iter->current_row(&_next_delete_flag); return OLAP_SUCCESS; } diff --git a/be/src/olap/reader.h 
b/be/src/olap/reader.h index d096b74..0cd7142 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -74,32 +74,9 @@ struct ReaderParams { RuntimeProfile* profile = nullptr; RuntimeState* runtime_state = nullptr; - void check_validation() const { - if (UNLIKELY(version.first == -1)) { - LOG(FATAL) << "version is not set. tablet=" << tablet->full_name(); - } - } - - std::string to_string() { - std::stringstream ss; - ss << "tablet=" << tablet->full_name() << " reader_type=" << reader_type - << " aggregation=" << aggregation << " version=" << version << " range=" << range - << " end_range=" << end_range; - - for (auto& key : start_key) { - ss << " keys=" << key; - } - - for (auto& key : end_key) { - ss << " end_keys=" << key; - } + void check_validation() const; - for (auto& condition : conditions) { - ss << " conditions=" << apache::thrift::ThriftDebugString(condition); - } - - return ss.str(); - } + std::string to_string(); }; class Reader { @@ -132,30 +109,9 @@ public: private: struct KeysParam { - ~KeysParam() { - for (auto start_key : start_keys) { - SAFE_DELETE(start_key); - } - - for (auto end_key : end_keys) { - SAFE_DELETE(end_key); - } - } - - std::string to_string() const { - std::stringstream ss; - ss << "range=" << range << " end_range=" << end_range; - - for (auto start_key : start_keys) { - ss << " keys=" << start_key->to_string(); - } - - for (auto end_key : end_keys) { - ss << " end_keys=" << end_key->to_string(); - } - - return ss.str(); - } + ~KeysParam(); + + std::string to_string() const; std::string range; std::string end_range; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index ea6b969..19540f2 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -725,7 +725,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( } if (now_ms - last_failure_ms <= config::min_compaction_failure_interval_sec * 1000) { - VLOG(1) << "Too often to check compaction, skip it." 
+ VLOG(1) << "Too often to check compaction, skip it. " << "compaction_type=" << compaction_type_str << ", last_failure_time_ms=" << last_failure_ms << ", tablet_id=" << tablet_ptr->tablet_id(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org