This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git
commit bf98cd8c7efa74242970d421b1174b8eaec85059 Author: weixiang <weixian...@meituan.com> AuthorDate: Fri Apr 29 18:04:35 2022 +0800 [wx-opt-mem-feature] add profile in memtable --- be/src/olap/memtable.cpp | 151 ++++++++++++++++++++++++++--------------------- be/src/olap/memtable.h | 24 ++++++++ 2 files changed, 107 insertions(+), 68 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index e1c97f2d4d..7f431a7289 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -55,6 +55,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS); _block_aggregator = std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true); + _init_profile(); } else { _vec_skip_list = nullptr; if (_keys_type == KeysType::DUP_KEYS) { @@ -100,6 +101,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) { MemTable::~MemTable() { std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>()); _mem_tracker->release(_mem_usage); + print_profile(); } MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {} @@ -117,31 +119,35 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, } bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) { - if (_is_first_insertion) { - _is_first_insertion = false; - auto cloneBlock = block->clone_without_columns(); - _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock); - if (_keys_type != KeysType::DUP_KEYS) { - _init_agg_functions(block); + { + SCOPED_TIMER(_insert_time); + if (_is_first_insertion) { + _is_first_insertion = false; + auto cloneBlock = block->clone_without_columns(); + _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock); + if (_keys_type != KeysType::DUP_KEYS) { + _init_agg_functions(block); + } } - } - 
_block->add_rows(block, row_pos, num_rows); - _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows(); - // Memtalbe is full, do not flush immediately - // First try to merge these blocks - // If the merged memtable is still full or we can not benefit a lot from merge at first - // Then flush the memtable into disk. - bool is_flush = false; - if (is_full()) { - size_t before_merge_bytes = bytes_allocated(); - _merge(); - size_t after_merged_bytes = bytes_allocated(); - // TODO(weixiang): magic number here, make it configurable later. - if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) { - is_flush = true; + _block->add_rows(block, row_pos, num_rows); + _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows(); + // Memtalbe is full, do not flush immediately + // First try to merge these blocks + // If the merged memtable is still full or we can not benefit a lot from merge at first + // Then flush the memtable into disk. + bool is_flush = false; + if (is_full()) { + size_t before_merge_bytes = bytes_allocated(); + _merge(); + size_t after_merged_bytes = bytes_allocated(); + // TODO(weixiang): magic number here, make it configurable later. + if (is_full() || + (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) { + is_flush = true; + } } + return is_flush; } - return is_flush; } size_t MemTable::bytes_allocated() const { @@ -162,35 +168,41 @@ void MemTable::_merge() { } void MemTable::_agg(const bool finalize) { - // note that the _block had been sorted before. - if (_sorted_block == nullptr || _sorted_block->rows() <= 0) { - return; - } - vectorized::Block sorted_block = _sorted_block->to_block(); - _block_aggregator->append_block(&sorted_block); - _block_aggregator->partial_sort_merged_aggregate(); - if (finalize) { - _sorted_block.reset(); - } else { - _sorted_block->clear_column_data(); + { + SCOPED_TIMER(_agg_time); + // note that the _block had been sorted before. 
+ if (_sorted_block == nullptr || _sorted_block->rows() <= 0) { + return; + } + vectorized::Block sorted_block = _sorted_block->to_block(); + _block_aggregator->append_block(&sorted_block); + _block_aggregator->partial_sort_merged_aggregate(); + if (finalize) { + _sorted_block.reset(); + } else { + _sorted_block->clear_column_data(); + } } } void MemTable::_sort(const bool finalize) { - _index_for_sort.resize(_block->rows()); - for (uint32_t i = 0; i < _block->rows(); i++) { - _index_for_sort[i] = {i, i}; - } + { + SCOPED_TIMER(_sort_time); + _index_for_sort.resize(_block->rows()); + for (uint32_t i = 0; i < _block->rows(); i++) { + _index_for_sort[i] = {i, i}; + } - _sort_block_by_rows(); - _sorted_block = _block->create_same_struct_block(0); - _append_sorted_block(_block.get(), _sorted_block.get()); - if (finalize) { - _block.reset(); - } else { - _block->clear_column_data(); + _sort_block_by_rows(); + _sorted_block = _block->create_same_struct_block(0); + _append_sorted_block(_block.get(), _sorted_block.get()); + if (finalize) { + _block.reset(); + } else { + _block->clear_column_data(); + } + _block_bytes_usage = 0; } - _block_bytes_usage = 0; } void MemTable::_sort_block_by_rows() { @@ -219,32 +231,35 @@ void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::M } void MemTable::finalize() { - //TODO(weixiang): check here - if (_block == nullptr) { - return; - } - - if (_keys_type != KeysType::DUP_KEYS) { - // agg mode - if (_block->rows() > 0) { - _merge(); - } - if (_merge_count > 1) { - _block = _block_aggregator->get_partial_agged_block(); - _block_aggregator->reset_aggregator(); - _sort(true); - _agg(true); - } else { - _block.reset(); - _sorted_block.reset(); + { + SCOPED_TIMER(_finalize_time); + //TODO(weixiang): check here + if (_block == nullptr) { + return; } - _block_bytes_usage = 0; - _sorted_block = _block_aggregator->get_partial_agged_block(); + if (_keys_type != KeysType::DUP_KEYS) { + // agg mode + if (_block->rows() > 0) 
{ + _merge(); + } + if (_merge_count > 1) { + _block = _block_aggregator->get_partial_agged_block(); + _block_aggregator->reset_aggregator(); + _sort(true); + _agg(true); + } else { + _block.reset(); + _sorted_block.reset(); + } - } else { - // dup mode - _sort(true); + _block_bytes_usage = 0; + _sorted_block = _block_aggregator->get_partial_agged_block(); + + } else { + // dup mode + _sort(true); + } } } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index daca1f71fc..3df94c6f97 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -23,6 +23,7 @@ #include "olap/olap_define.h" #include "olap/skiplist.h" #include "runtime/mem_tracker.h" +#include "util/runtime_profile.h" #include "util/tuple_row_zorder_compare.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/common/string_ref.h" @@ -162,6 +163,21 @@ private: void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst); + void _init_profile() { + _profile.reset(new RuntimeProfile("Memtable")); + _insert_time = ADD_TIMER(_profile, "insert time"); + _sort_time = ADD_TIMER(_profile, "sort time"); + _agg_time = ADD_TIMER(_profile, "agg time"); + _finalize_time = ADD_TIMER(_profile, "finalize time"); + } + + void print_profile() { + std::stringstream ss; + _profile->pretty_print(&ss); + LOG(INFO) << ss.str(); + } + + int64_t _tablet_id; Schema* _schema; const TabletSchema* _tablet_schema; @@ -239,6 +255,14 @@ private: size_t _block_bytes_usage = 0; size_t _agg_bytes_usage = 0; int _merge_count = 0; + + std::unique_ptr<RuntimeProfile> _profile; + RuntimeProfile::Counter* _insert_time; + RuntimeProfile::Counter* _sort_time; + RuntimeProfile::Counter* _agg_time; + RuntimeProfile::Counter* _finalize_time; + + }; // class MemTable inline std::ostream& operator<<(std::ostream& os, const MemTable& table) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional 
commands, e-mail: commits-h...@doris.apache.org