This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6a26435e8d [bugfix](compaction) fix promotion size bug (#14836) 6a26435e8d is described below commit 6a26435e8dd1ec98f14f3001fbf010b8f7221e68 Author: yixiutt <102007456+yixi...@users.noreply.github.com> AuthorDate: Wed Dec 7 18:54:30 2022 +0800 [bugfix](compaction) fix promotion size bug (#14836) --- be/src/common/config.h | 2 +- be/src/olap/cumulative_compaction_policy.cpp | 20 +++++++++++--------- be/src/olap/cumulative_compaction_policy.h | 11 +++++------ be/src/olap/tablet.cpp | 2 +- be/src/olap/tablet.h | 11 +++++++++++ be/test/olap/cumulative_compaction_policy_test.cpp | 11 ++--------- 6 files changed, 31 insertions(+), 26 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index aeb9dc2fad..9b88c7d1aa 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -298,7 +298,7 @@ CONF_mInt64(compaction_min_size_mbytes, "64"); // cumulative compaction policy: min and max delta file's number CONF_mInt64(cumulative_compaction_min_deltas, "5"); -CONF_mInt64(cumulative_compaction_max_deltas, "1000"); +CONF_mInt64(cumulative_compaction_max_deltas, "100"); // This config can be set to limit thread number in segcompaction thread pool. 
CONF_mInt32(seg_compaction_max_threads, "10"); diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 35d65be797..3ccbe0eea7 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -76,7 +76,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point( CHECK((*base_rowset_meta)->start_version() == 0); int64_t promotion_size = 0; - _calc_promotion_size(*base_rowset_meta, &promotion_size); + _calc_promotion_size(tablet, *base_rowset_meta, &promotion_size); int64_t prev_version = -1; for (const RowsetMetaSharedPtr& rs : existing_rss) { @@ -119,7 +119,8 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point( } } -void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedPtr base_rowset_meta, +void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(Tablet* tablet, + RowsetMetaSharedPtr base_rowset_meta, int64_t* promotion_size) { int64_t base_size = base_rowset_meta->total_disk_size(); *promotion_size = base_size * _promotion_ratio; @@ -130,11 +131,12 @@ void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedP } else if (*promotion_size <= _promotion_min_size) { *promotion_size = _promotion_min_size; } - _refresh_tablet_promotion_size(*promotion_size); + _refresh_tablet_promotion_size(tablet, *promotion_size); } -void SizeBasedCumulativeCompactionPolicy::_refresh_tablet_promotion_size(int64_t promotion_size) { - _tablet_promotion_size = promotion_size; +void SizeBasedCumulativeCompactionPolicy::_refresh_tablet_promotion_size(Tablet* tablet, + int64_t promotion_size) { + tablet->set_cumulative_promotion_size(promotion_size); } void SizeBasedCumulativeCompactionPolicy::update_cumulative_point( @@ -151,14 +153,14 @@ void SizeBasedCumulativeCompactionPolicy::update_cumulative_point( // if rowsets have no delete version, check output_rowset total disk size // satisfies promotion size. 
size_t total_size = output_rowset->rowset_meta()->total_disk_size(); - if (total_size >= _tablet_promotion_size) { + if (total_size >= tablet->cumulative_promotion_size()) { tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); } } } void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score( - TabletState state, const std::vector<RowsetMetaSharedPtr>& all_metas, + Tablet* tablet, TabletState state, const std::vector<RowsetMetaSharedPtr>& all_metas, int64_t current_cumulative_point, uint32_t* score) { bool base_rowset_exist = false; const int64_t point = current_cumulative_point; @@ -199,7 +201,7 @@ void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score( // Use "first"(not base) version to calc promotion size // because some tablet do not have base version(under alter operation) - _calc_promotion_size(first_meta, &promotion_size); + _calc_promotion_size(tablet, first_meta, &promotion_size); // If base version does not exist, but its state is RUNNING. // It is abnormal, do not select it and set *score = 0 @@ -239,7 +241,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, size_t* compaction_score) { - size_t promotion_size = _tablet_promotion_size; + size_t promotion_size = tablet->cumulative_promotion_size(); int transient_size = 0; *compaction_score = 0; int64_t total_size = 0; diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h index 15454c1892..fabd9344c1 100644 --- a/be/src/olap/cumulative_compaction_policy.h +++ b/be/src/olap/cumulative_compaction_policy.h @@ -55,7 +55,7 @@ public: /// param current_cumulative_point, current cumulative point value. /// return score, the result score after calculate. 
virtual void calc_cumulative_compaction_score( - TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets, + Tablet* tablet, TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t current_cumulative_point, uint32_t* score) = 0; /// This function implements the policy which represents how to pick the candidate rowsets for compaction. @@ -151,7 +151,7 @@ public: /// Num based cumulative compaction policy implements calc cumulative compaction score function. /// Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet. - void calc_cumulative_compaction_score(TabletState state, + void calc_cumulative_compaction_score(Tablet* tablet, TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t current_cumulative_point, uint32_t* score) override; @@ -160,7 +160,8 @@ public: private: /// calculate promotion size using current base rowset meta size and promotion configs - void _calc_promotion_size(RowsetMetaSharedPtr base_rowset_meta, int64_t* promotion_size); + void _calc_promotion_size(Tablet* tablet, RowsetMetaSharedPtr base_rowset_meta, + int64_t* promotion_size); /// calculate the disk size belong to which level, the level is divide by power of 2 /// between compaction_promotion_min_size_mbytes @@ -168,7 +169,7 @@ private: int _level_size(const int64_t size); /// when policy calculate cumulative_compaction_score, update promotion size at the same time - void _refresh_tablet_promotion_size(int64_t promotion_size); + void _refresh_tablet_promotion_size(Tablet* tablet, int64_t promotion_size); private: /// cumulative compaction promotion size, unit is byte. @@ -179,8 +180,6 @@ private: int64_t _promotion_min_size; /// lower bound size to do compaction compaction. 
int64_t _compaction_min_size; - /// record tablet promotion size, it is updated each time when calculate cumulative_compaction_score - int64_t _tablet_promotion_size; /// levels division of disk size, same level rowsets can do compaction std::vector<int64_t> _levels; }; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 05d42ff607..93b4e67525 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -838,7 +838,7 @@ const uint32_t Tablet::_calc_cumulative_compaction_score( #endif uint32_t score = 0; _cumulative_compaction_policy->calc_cumulative_compaction_score( - tablet_state(), _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score); + this, tablet_state(), _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score); return score; } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index c64803a62e..688f984502 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -83,6 +83,8 @@ public: const int64_t cumulative_layer_point() const; void set_cumulative_layer_point(int64_t new_point); + inline const int64_t cumulative_promotion_size() const; + inline void set_cumulative_promotion_size(int64_t new_size); // Disk space occupied by tablet, contain local and remote. 
size_t tablet_footprint(); @@ -445,6 +447,7 @@ private: // timestamp of last base compaction success std::atomic<int64_t> _last_base_compaction_success_millis; std::atomic<int64_t> _cumulative_point; + std::atomic<int64_t> _cumulative_promotion_size; std::atomic<int32_t> _newly_created_rowset_num; std::atomic<int64_t> _last_checkpoint_time; @@ -512,6 +515,14 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) { _cumulative_point = new_point; } +inline const int64_t Tablet::cumulative_promotion_size() const { + return _cumulative_promotion_size; +} + +inline void Tablet::set_cumulative_promotion_size(int64_t new_size) { + _cumulative_promotion_size = new_size; +} + inline bool Tablet::enable_unique_key_merge_on_write() const { #ifdef BE_TEST if (_tablet_meta == nullptr) { diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index 3f46c508d1..5be78e9190 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -649,11 +649,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_big) { _tablet->init(); _tablet->calculate_cumulative_point(); - SizeBasedCumulativeCompactionPolicy* policy = - dynamic_cast<SizeBasedCumulativeCompactionPolicy*>( - _tablet->_cumulative_compaction_policy.get()); - - EXPECT_EQ(1073741824, policy->_tablet_promotion_size); + EXPECT_EQ(1073741824, _tablet->cumulative_promotion_size()); } TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) { @@ -668,10 +664,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) { _tablet->init(); _tablet->calculate_cumulative_point(); - SizeBasedCumulativeCompactionPolicy* policy = - dynamic_cast<SizeBasedCumulativeCompactionPolicy*>( - _tablet->_cumulative_compaction_policy.get()); - EXPECT_EQ(67108864, policy->_tablet_promotion_size); + EXPECT_EQ(67108864, _tablet->cumulative_promotion_size()); } 
 TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org