This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 95b05928fd7 [fix](compaction) fix time series compaction merge empty rowsets priority #34562 (#34765)
95b05928fd7 is described below

commit 95b05928fd7f920255f9ca34c4fff1a5159d7b94
Author: Sun Chenyang <csun5...@gmail.com>
AuthorDate: Tue May 14 09:10:09 2024 +0800

    [fix](compaction) fix time series compaction merge empty rowsets priority #34562 (#34765)
---
 .../cumulative_compaction_time_series_policy.cpp   | 39 +++++------
 be/src/olap/delta_writer.cpp                       |  3 +-
 ...mulative_compaction_time_series_policy_test.cpp | 80 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 21 deletions(-)

diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index 6c3f949723a..3134364a4dd 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -73,13 +73,6 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         return 0;
     }
 
-    // If there is a continuous set of empty rowsets, prioritize merging.
-    auto consecutive_empty_rowsets = 
tablet->pick_first_consecutive_empty_rowsets(
-            
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
-    if (!consecutive_empty_rowsets.empty()) {
-        return score;
-    }
-
     // Condition 1: the size of input files for compaction meets the 
requirement of parameter compaction_goal_size
     int64_t compaction_goal_size_mbytes =
             tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
@@ -126,6 +119,13 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         tablet->set_last_cumu_compaction_success_time(now);
     }
 
+    // Condition 5: If there is a continuous set of empty rowsets, prioritize 
merging.
+    auto consecutive_empty_rowsets = 
tablet->pick_first_consecutive_empty_rowsets(
+            
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
+    if (!consecutive_empty_rowsets.empty()) {
+        return score;
+    }
+
     return 0;
 }
 
@@ -215,19 +215,6 @@ int 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
         return 0;
     }
 
-    // If their are many empty rowsets, maybe should be compacted
-    auto consecutive_empty_rowsets = 
tablet->pick_first_consecutive_empty_rowsets(
-            
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
-    if (!consecutive_empty_rowsets.empty()) {
-        VLOG_NOTICE << "tablet is " << tablet->tablet_id()
-                    << ", there are too many consecutive empty rowsets, size 
is "
-                    << consecutive_empty_rowsets.size();
-        input_rowsets->clear();
-        input_rowsets->insert(input_rowsets->end(), 
consecutive_empty_rowsets.begin(),
-                              consecutive_empty_rowsets.end());
-        return 0;
-    }
-
     int64_t compaction_goal_size_mbytes =
             tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
 
@@ -338,6 +325,18 @@ int 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
     }
 
     input_rowsets->clear();
+    // Condition 5: If their are many empty rowsets, maybe should be compacted
+    auto consecutive_empty_rowsets = 
tablet->pick_first_consecutive_empty_rowsets(
+            
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
+    if (!consecutive_empty_rowsets.empty()) {
+        VLOG_NOTICE << "tablet is " << tablet->tablet_id()
+                    << ", there are too many consecutive empty rowsets, size 
is "
+                    << consecutive_empty_rowsets.size();
+        input_rowsets->clear();
+        input_rowsets->insert(input_rowsets->end(), 
consecutive_empty_rowsets.begin(),
+                              consecutive_empty_rowsets.end());
+        return 0;
+    }
     *compaction_score = 0;
 
     return 0;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 1ec4dd17313..bf7adadb943 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -112,7 +112,8 @@ Status BaseDeltaWriter::init() {
     return Status::OK();
 }
 
-Status BaseDeltaWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs) {
+Status BaseDeltaWriter::write(const vectorized::Block* block,
+                              const std::vector<uint32_t>& row_idxs) {
     if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 1390f1deb14..57872083edf 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -214,6 +214,58 @@ public:
         rs_metas->push_back(ptr5);
     }
 
+    void 
init_all_rs_meta_empty_nonoverlapping(std::vector<RowsetMetaSharedPtr>* 
rs_metas) {
+        RowsetMetaSharedPtr ptr1(new RowsetMeta());
+        init_rs_meta(ptr1, 0, 1);
+        ptr1->set_total_disk_size(1 * 1024);
+        rs_metas->push_back(ptr1);
+
+        RowsetMetaSharedPtr ptr2(new RowsetMeta());
+        init_rs_meta(ptr2, 2, 3);
+        ptr2->set_total_disk_size(2 * 1024);
+        rs_metas->push_back(ptr2);
+
+        RowsetMetaSharedPtr ptr3(new RowsetMeta());
+        init_rs_meta(ptr3, 4, 4);
+        ptr3->set_num_segments(0);
+        rs_metas->push_back(ptr3);
+
+        RowsetMetaSharedPtr ptr4(new RowsetMeta());
+        init_rs_meta(ptr4, 5, 5);
+        ptr4->set_num_segments(0);
+        rs_metas->push_back(ptr4);
+
+        RowsetMetaSharedPtr ptr5(new RowsetMeta());
+        init_rs_meta(ptr5, 6, 6);
+        ptr5->set_num_segments(0);
+        rs_metas->push_back(ptr5);
+
+        RowsetMetaSharedPtr ptr6(new RowsetMeta());
+        init_rs_meta(ptr6, 7, 7);
+        ptr6->set_num_segments(0);
+        rs_metas->push_back(ptr6);
+
+        RowsetMetaSharedPtr ptr7(new RowsetMeta());
+        init_rs_meta(ptr7, 8, 8);
+        ptr7->set_num_segments(0);
+        rs_metas->push_back(ptr7);
+
+        RowsetMetaSharedPtr ptr8(new RowsetMeta());
+        init_rs_meta(ptr8, 9, 9);
+        ptr8->set_num_segments(0);
+        rs_metas->push_back(ptr8);
+
+        RowsetMetaSharedPtr ptr9(new RowsetMeta());
+        init_rs_meta(ptr9, 10, 10);
+        ptr9->set_num_segments(0);
+        rs_metas->push_back(ptr9);
+
+        RowsetMetaSharedPtr ptr10(new RowsetMeta());
+        init_rs_meta(ptr10, 11, 11);
+        ptr10->set_total_disk_size(2 * 1024);
+        rs_metas->push_back(ptr10);
+    }
+
     void init_rs_meta_pick_empty(std::vector<RowsetMetaSharedPtr>* rs_metas) {
         RowsetMetaSharedPtr ptr1(new RowsetMeta());
         init_rs_meta(ptr1, 0, 1);
@@ -597,6 +649,34 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, 
_pick_missing_version_cumulativ
     static_cast<void>(compaction.find_longest_consecutive_version(&rowsets3, 
nullptr));
     EXPECT_EQ(0, rowsets3.size());
 }
+
+TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_empty_rowsets) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_all_rs_meta_empty_nonoverlapping(&rs_metas);
+
+    for (auto& rowset : rs_metas) {
+        static_cast<void>(_tablet_meta->add_rs_meta(rowset));
+    }
+
+    TabletSharedPtr _tablet(
+            new Tablet(_engine, _tablet_meta, nullptr, 
CUMULATIVE_TIME_SERIES_POLICY));
+    static_cast<void>(_tablet->init());
+    _tablet->calculate_cumulative_point();
+
+    auto candidate_rowsets = 
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            _tablet.get(), candidate_rowsets, 10, 5, &input_rowsets, 
&last_delete_version,
+            &compaction_score, config::enable_delete_when_cumu_compaction);
+
+    EXPECT_EQ(7, input_rowsets.size());
+    EXPECT_EQ(-1, last_delete_version.first);
+    EXPECT_EQ(-1, last_delete_version.second);
+}
 } // namespace doris
 
 // @brief Test Stub


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to