This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new fe5e5d2bf4 [refactor] separate agg and flush in memtable (#15713)
fe5e5d2bf4 is described below

commit fe5e5d2bf427e7c29f47692e3f7f86212c74b375
Author: zbtzbtzbt <bit_pku_...@163.com>
AuthorDate: Wed Jan 11 10:07:34 2023 +0800

    [refactor] separate agg and flush in memtable (#15713)
---
 be/src/common/config.h       |  9 ++++-----
 be/src/olap/delta_writer.cpp | 14 +++++++-------
 be/src/olap/memtable.cpp     | 11 +++++++----
 be/src/olap/memtable.h       |  4 ++--
 be/src/olap/push_handler.cpp |  2 +-
 5 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 91b9eecc44..394ffd71c4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -490,13 +490,12 @@ CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
 // Alignment
 CONF_Int32(memory_max_alignment, "16");

-// write buffer size before flush
+// max write buffer size before flush, default 200MB
 CONF_mInt64(write_buffer_size, "209715200");
-
-// max buffer size used in memtable for the aggregated table
-CONF_mInt64(memtable_max_buffer_size, "419430400");
+// max buffer size used in memtable for the aggregated table, default 400MB
+CONF_mInt64(write_buffer_size_for_agg, "419430400");
 // write buffer size in push task for sparkload, default 1GB
-CONF_mInt64(flush_size_for_sparkload, "1073741824");
+CONF_mInt64(write_buffer_size_for_sparkload, "1073741824");

 // following 2 configs limit the memory consumption of load process on a Backend.
 // eg: memory limit to 80% of mem limit config but up to 100GB(default)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ee17286b6b..1dd22c6e6c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -169,14 +169,14 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
     _total_received_rows += row_idxs.size();
     _mem_table->insert(block, row_idxs);

-    if (_mem_table->need_to_agg()) {
+    if (UNLIKELY(_mem_table->need_agg())) {
         _mem_table->shrink_memtable_by_agg();
-        if (_mem_table->is_flush()) {
-            auto s = _flush_memtable_async();
-            _reset_mem_table();
-            if (UNLIKELY(!s.ok())) {
-                return s;
-            }
+    }
+    if (UNLIKELY(_mem_table->need_flush())) {
+        auto s = _flush_memtable_async();
+        _reset_mem_table();
+        if (UNLIKELY(!s.ok())) {
+            return s;
         }
     }

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 48c4e605cf..ea1188e874 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -296,13 +296,16 @@ void MemTable::shrink_memtable_by_agg() {
     _collect_vskiplist_results<false>();
 }

-bool MemTable::is_flush() const {
+bool MemTable::need_flush() const {
     return memory_usage() >= config::write_buffer_size;
 }

-bool MemTable::need_to_agg() {
-    return _keys_type == KeysType::DUP_KEYS ? is_flush()
-                                            : memory_usage() >= config::memtable_max_buffer_size;
+bool MemTable::need_agg() const {
+    if (_keys_type == KeysType::AGG_KEYS) {
+        return memory_usage() >= config::write_buffer_size_for_agg;
+    }
+
+    return false;
 }

 Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index a7a8af3efa..a908fa358a 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -58,9 +58,9 @@ public:

     void shrink_memtable_by_agg();

-    bool is_flush() const;
+    bool need_flush() const;

-    bool need_to_agg();
+    bool need_agg() const;

     /// Flush
     Status flush();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 55472c6d82..155b322c85 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -250,7 +250,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
         VLOG_NOTICE << "start to convert etl file to delta.";
         while (!reader->eof()) {
             if (reader->mem_pool()->mem_tracker()->consumption() >
-                config::flush_size_for_sparkload) {
+                config::write_buffer_size_for_sparkload) {
                 RETURN_NOT_OK(rowset_writer->flush());
                 reader->mem_pool()->free_all();
             }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org