This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 95b05928fd7 [fix](compaction) fix time series compaction merge empty rowsets priority #34562 (#34765) 95b05928fd7 is described below commit 95b05928fd7f920255f9ca34c4fff1a5159d7b94 Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Tue May 14 09:10:09 2024 +0800 [fix](compaction) fix time series compaction merge empty rowsets priority #34562 (#34765) --- .../cumulative_compaction_time_series_policy.cpp | 39 +++++------ be/src/olap/delta_writer.cpp | 3 +- ...mulative_compaction_time_series_policy_test.cpp | 80 ++++++++++++++++++++++ 3 files changed, 101 insertions(+), 21 deletions(-) diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp index 6c3f949723a..3134364a4dd 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.cpp +++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp @@ -73,13 +73,6 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return 0; } - // If there is a continuous set of empty rowsets, prioritize merging. - auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets( - tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); - if (!consecutive_empty_rowsets.empty()) { - return score; - } - // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size int64_t compaction_goal_size_mbytes = tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); @@ -126,6 +119,13 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( tablet->set_last_cumu_compaction_success_time(now); } + // Condition 5: If there is a continuous set of empty rowsets, prioritize merging. + auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets( + tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); + if (!consecutive_empty_rowsets.empty()) { + return score; + } + return 0; } @@ -215,19 +215,6 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( return 0; } - // If their are many empty rowsets, maybe should be compacted - auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets( - tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); - if (!consecutive_empty_rowsets.empty()) { - VLOG_NOTICE << "tablet is " << tablet->tablet_id() - << ", there are too many consecutive empty rowsets, size is " - << consecutive_empty_rowsets.size(); - input_rowsets->clear(); - input_rowsets->insert(input_rowsets->end(), consecutive_empty_rowsets.begin(), - consecutive_empty_rowsets.end()); - return 0; - } - int64_t compaction_goal_size_mbytes = tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); @@ -338,6 +325,18 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( } input_rowsets->clear(); + // Condition 5: If their are many empty rowsets, maybe should be compacted + auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets( + tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); + if (!consecutive_empty_rowsets.empty()) { + VLOG_NOTICE << "tablet is " << tablet->tablet_id() + << ", there are too many consecutive empty rowsets, size is " + << consecutive_empty_rowsets.size(); + input_rowsets->clear(); + input_rowsets->insert(input_rowsets->end(), consecutive_empty_rowsets.begin(), + consecutive_empty_rowsets.end()); + return 0; + } *compaction_score = 0; return 0; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1ec4dd17313..bf7adadb943 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -112,7 +112,8 @@ Status BaseDeltaWriter::init() { return Status::OK(); } -Status BaseDeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) { +Status BaseDeltaWriter::write(const vectorized::Block* block, + const std::vector<uint32_t>& row_idxs) { if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp index 1390f1deb14..57872083edf 100644 --- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp @@ -214,6 +214,58 @@ public: rs_metas->push_back(ptr5); } + void init_all_rs_meta_empty_nonoverlapping(std::vector<RowsetMetaSharedPtr>* rs_metas) { + RowsetMetaSharedPtr ptr1(new RowsetMeta()); + init_rs_meta(ptr1, 0, 1); + ptr1->set_total_disk_size(1 * 1024); + rs_metas->push_back(ptr1); + + RowsetMetaSharedPtr ptr2(new RowsetMeta()); + init_rs_meta(ptr2, 2, 3); + ptr2->set_total_disk_size(2 * 1024); + rs_metas->push_back(ptr2); + + RowsetMetaSharedPtr ptr3(new RowsetMeta()); + init_rs_meta(ptr3, 4, 4); + ptr3->set_num_segments(0); + rs_metas->push_back(ptr3); + + RowsetMetaSharedPtr ptr4(new RowsetMeta()); + init_rs_meta(ptr4, 5, 5); + ptr4->set_num_segments(0); + rs_metas->push_back(ptr4); + + RowsetMetaSharedPtr ptr5(new RowsetMeta()); + init_rs_meta(ptr5, 6, 6); + ptr5->set_num_segments(0); + rs_metas->push_back(ptr5); + + RowsetMetaSharedPtr ptr6(new RowsetMeta()); + init_rs_meta(ptr6, 7, 7); + ptr6->set_num_segments(0); + rs_metas->push_back(ptr6); + + RowsetMetaSharedPtr ptr7(new RowsetMeta()); + init_rs_meta(ptr7, 8, 8); + ptr7->set_num_segments(0); + rs_metas->push_back(ptr7); + + RowsetMetaSharedPtr ptr8(new RowsetMeta()); + init_rs_meta(ptr8, 9, 9); + ptr8->set_num_segments(0); + rs_metas->push_back(ptr8); + + RowsetMetaSharedPtr ptr9(new RowsetMeta()); + init_rs_meta(ptr9, 10, 10); + ptr9->set_num_segments(0); + rs_metas->push_back(ptr9); + + RowsetMetaSharedPtr ptr10(new RowsetMeta()); + init_rs_meta(ptr10, 11, 11); + ptr10->set_total_disk_size(2 * 1024); + rs_metas->push_back(ptr10); + } + void init_rs_meta_pick_empty(std::vector<RowsetMetaSharedPtr>* rs_metas) { RowsetMetaSharedPtr ptr1(new RowsetMeta()); init_rs_meta(ptr1, 0, 1); @@ -597,6 +649,34 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, _pick_missing_version_cumulativ static_cast<void>(compaction.find_longest_consecutive_version(&rowsets3, nullptr)); EXPECT_EQ(0, rowsets3.size()); } + +TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_empty_rowsets) { + std::vector<RowsetMetaSharedPtr> rs_metas; + init_all_rs_meta_empty_nonoverlapping(&rs_metas); + + for (auto& rowset : rs_metas) { + static_cast<void>(_tablet_meta->add_rs_meta(rowset)); + } + + TabletSharedPtr _tablet( + new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY)); + static_cast<void>(_tablet->init()); + _tablet->calculate_cumulative_point(); + + auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction(); + + std::vector<RowsetSharedPtr> input_rowsets; + Version last_delete_version {-1, -1}; + size_t compaction_score = 0; + + _tablet->_cumulative_compaction_policy->pick_input_rowsets( + _tablet.get(), candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, + &compaction_score, config::enable_delete_when_cumu_compaction); + + EXPECT_EQ(7, input_rowsets.size()); + EXPECT_EQ(-1, last_delete_version.first); + EXPECT_EQ(-1, last_delete_version.second); +} } // namespace doris // @brief Test Stub --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org