This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dc3ed1c  [Compaction]Compaction rules optimization (#4212)
dc3ed1c is described below

commit dc3ed1c525e08f9bd8acfb01b3507c6c7d230164
Author: ZhangYu0123 <67053339+zhangyu0...@users.noreply.github.com>
AuthorDate: Wed Aug 19 09:34:13 2020 +0800

    [Compaction]Compaction rules optimization (#4212)
    
    Compaction rules optimization; for the detailed problem description and 
design, see #4164.
    This PR makes two changes:
    (1) make the cumulative compaction policy configurable, and implement the original policy.
    (2) implement the universal policy, the optimized version described in #4164.
---
 be/src/common/config.h                             |   21 +
 be/src/olap/CMakeLists.txt                         |    1 +
 be/src/olap/cumulative_compaction.cpp              |   63 +-
 be/src/olap/cumulative_compaction.h                |    4 +-
 be/src/olap/cumulative_compaction_policy.cpp       |  468 +++++++++
 be/src/olap/cumulative_compaction_policy.h         |  263 +++++
 be/src/olap/olap_server.cpp                        |   34 +-
 be/src/olap/rowset/rowset_meta.h                   |    4 +
 be/src/olap/storage_engine.h                       |    2 +
 be/src/olap/tablet.cpp                             |  116 +--
 be/src/olap/tablet.h                               |   23 +-
 be/src/olap/version_graph.cpp                      |    3 +-
 be/test/olap/cumulative_compaction_policy_test.cpp | 1022 ++++++++++++++++++++
 docs/en/administrator-guide/config/be_config.md    |   42 +-
 docs/zh-CN/administrator-guide/config/be_config.md |   40 +
 15 files changed, 1976 insertions(+), 130 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 145d9b3..08151ad 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -268,6 +268,27 @@ namespace config {
     CONF_mInt64(base_compaction_interval_seconds_since_last_operation, 
"86400");
     CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
 
+    // Configure the cumulative compaction policy.
+    // Valid configs: num_based, size_based
+    // num_based policy: the original version of cumulative compaction, in 
which each version goes through cumulative compaction only once.
+    // size_based policy: an optimized version of cumulative compaction, 
targeting use cases that require 
+    // lower write amplification, trading off read amplification and space 
amplification.
+    CONF_String(cumulative_compaction_policy, "num_based");
+
+    // In size_based policy, when the total disk size of the output rowset of 
cumulative compaction exceeds this config size,
+    // the rowset will be given to base compaction. The unit is MB.
+    CONF_mInt64(cumulative_size_based_promotion_size_mbytes, "1024");
+    // In size_based policy, when the total disk size of the output rowset of 
cumulative compaction exceeds this config ratio of
+    // the base rowset's total disk size, the rowset will be given to base 
compaction. The value must be between 
+    // 0 and 1.
+    CONF_mDouble(cumulative_size_based_promotion_ratio, "0.05");
+    // In size_based policy, the smallest promotion size. When the 
rowset is smaller than this config, the 
+    // rowset will not be given to base compaction. The unit is MB.
+    CONF_mInt64(cumulative_size_based_promotion_min_size_mbytes, "64");
+    // The lower bound size to do cumulative compaction. When the total disk size 
of the candidate rowsets is less than 
+    // this size, the size_based policy still does cumulative compaction if the 
compaction score reaches min_cumulative_compaction_num_singleton_deltas. The unit is MB.
+    CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");
+
     // cumulative compaction policy: max delta file's size unit:B
     CONF_mInt32(cumulative_compaction_check_interval_seconds, "10");
     CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 884c045..13c11a0 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(Olap STATIC
     comparison_predicate.cpp
     compress.cpp
     cumulative_compaction.cpp
+    cumulative_compaction_policy.cpp
     delete_handler.cpp
     delta_writer.cpp
     file_helper.cpp
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index a5f1358..c6bf9f8 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -27,7 +27,7 @@ CumulativeCompaction::CumulativeCompaction(TabletSharedPtr 
tablet, const std::st
         : Compaction(tablet, label, parent_tracker),
           
_cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes) 
{}
 
-CumulativeCompaction::~CumulativeCompaction() { }
+CumulativeCompaction::~CumulativeCompaction() {}
 
 OLAPStatus CumulativeCompaction::compact() {
     if (!_tablet->init_succeeded()) {
@@ -41,7 +41,7 @@ OLAPStatus CumulativeCompaction::compact() {
     }
     TRACE("got cumulative compaction lock");
 
-    // 1.calculate cumulative point 
+    // 1.calculate cumulative point
     _tablet->calculate_cumulative_point();
     TRACE("calculated cumulative point");
     LOG(INFO) << "after calculate, current cumulative point is " << 
_tablet->cumulative_layer_point() 
@@ -60,9 +60,10 @@ OLAPStatus CumulativeCompaction::compact() {
     _state = CompactionState::SUCCESS;
 
     // 5. set cumulative point
-    _tablet->set_cumulative_layer_point(_input_rowsets.back()->end_version() + 
1);
-    LOG(INFO) << "after cumulative compaction, current cumulative point is " 
-        << _tablet->cumulative_layer_point()  << ", tablet=" << 
_tablet->full_name() ;
+    
_tablet->cumulative_compaction_policy()->update_cumulative_point(_input_rowsets,
 _output_rowset,
+                                                                     
_last_delete_version);
+    LOG(INFO) << "after cumulative compaction, current cumulative point is "
+              << _tablet->cumulative_layer_point() << ", tablet=" << 
_tablet->full_name();
 
     // 6. add metric to cumulative compaction
     
DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(_input_rowsets.size());
@@ -74,62 +75,30 @@ OLAPStatus CumulativeCompaction::compact() {
 
 OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
     std::vector<RowsetSharedPtr> candidate_rowsets;
+
     _tablet->pick_candicate_rowsets_to_cumulative_compaction(
-        config::cumulative_compaction_skip_window_seconds, &candidate_rowsets);
+            config::cumulative_compaction_skip_window_seconds, 
&candidate_rowsets);
 
     if (candidate_rowsets.empty()) {
         return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
     }
 
-    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
Rowset::comparator);
     RETURN_NOT_OK(check_version_continuity(candidate_rowsets));
 
-    std::vector<RowsetSharedPtr> transient_rowsets;
     size_t compaction_score = 0;
-    // the last delete version we meet when traversing candidate_rowsets
-    Version last_delete_version { -1, -1 };
-
-    for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
-        RowsetSharedPtr rowset = candidate_rowsets[i];
-        if (_tablet->version_for_delete_predicate(rowset->version())) {
-            last_delete_version = rowset->version();
-            if (!transient_rowsets.empty()) {
-                // we meet a delete version, and there were other versions 
before.
-                // we should compact those version before handling them over 
to base compaction
-                _input_rowsets = transient_rowsets;
-                break;
-            }
-
-            // we meet a delete version, and no other versions before, skip it 
and continue
-            transient_rowsets.clear();
-            compaction_score = 0;
-            continue;
-        }
-
-        if (compaction_score >= 
config::max_cumulative_compaction_num_singleton_deltas) {
-            // got enough segments
-            break;
-        }
-
-        compaction_score += rowset->rowset_meta()->get_compaction_score();
-        transient_rowsets.push_back(rowset); 
-    }
-
-    // if we have a sufficient number of segments,
-    // or have other versions before encountering the delete version, we 
should process the compaction.
-    if (compaction_score >= 
config::min_cumulative_compaction_num_singleton_deltas
-        || (last_delete_version.first != -1 && !transient_rowsets.empty())) {
-        _input_rowsets = transient_rowsets;
-    }
+    int transient_size = 
_tablet->cumulative_compaction_policy()->pick_input_rowsets(
+            candidate_rowsets, 
config::max_cumulative_compaction_num_singleton_deltas,
+            config::min_cumulative_compaction_num_singleton_deltas, 
&_input_rowsets,
+            &_last_delete_version, &compaction_score);
 
     // Cumulative compaction will process with at least 1 rowset.
     // So when there is no rowset being chosen, we should return 
OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS:
     if (_input_rowsets.empty()) {
-        if (last_delete_version.first != -1) {
+        if (_last_delete_version.first != -1) {
             // we meet a delete version, should increase the cumulative point 
to let base compaction handle the delete version.
             // plus 1 to skip the delete version.
             // NOTICE: after that, the cumulative point may be larger than max 
version of this tablet, but it doen't matter.
-            _tablet->set_cumulative_layer_point(last_delete_version.first + 1);
+            _tablet->set_cumulative_layer_point(_last_delete_version.first + 
1);
             return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
         }
 
@@ -149,8 +118,8 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
             if (cumu_interval > interval_threshold && base_interval > 
interval_threshold) {
                 // before increasing cumulative point, we should make sure all 
rowsets are non-overlapping.
                 // if at least one rowset is overlapping, we should compact 
them first.
-                CHECK(candidate_rowsets.size() == transient_rowsets.size())
-                    << "tablet: " << _tablet->full_name() << ", "<<  
candidate_rowsets.size() << " vs. " << transient_rowsets.size();
+                CHECK(candidate_rowsets.size() == transient_size)
+                    << "tablet: " << _tablet->full_name() << ", "<<  
candidate_rowsets.size() << " vs. " << transient_size;
                 for (auto& rs : candidate_rowsets) {
                     if (rs->rowset_meta()->is_segments_overlapping()) {
                         _input_rowsets = candidate_rowsets;
diff --git a/be/src/olap/cumulative_compaction.h 
b/be/src/olap/cumulative_compaction.h
index f32268d..93c0399 100755
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -19,8 +19,8 @@
 #define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_H
 
 #include <string>
-
 #include "olap/compaction.h"
+#include "olap/cumulative_compaction_policy.h"
 
 namespace doris {
 
@@ -46,6 +46,8 @@ protected:
 private:
     int64_t _cumulative_rowset_size_threshold;
 
+    Version _last_delete_version { -1, -1 };
+
     DISALLOW_COPY_AND_ASSIGN(CumulativeCompaction);
 };
 
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
new file mode 100644
index 0000000..33fef23
--- /dev/null
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
+        std::shared_ptr<Tablet> tablet, int64_t size_based_promotion_size, 
double size_based_promotion_ratio,
+        int64_t size_based_promotion_min_size, int64_t 
size_based_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _size_based_promotion_size(size_based_promotion_size),
+          _size_based_promotion_ratio(size_based_promotion_ratio),
+          _size_based_promotion_min_size(size_based_promotion_min_size),
+          
_size_based_compaction_lower_bound_size(size_based_compaction_lower_bound_size) 
{
+
+    // init _levels by divide 2 between size_based_promotion_size and 
size_based_compaction_lower_bound_size
+    int64_t i_size = size_based_promotion_size / 2;
+
+    while (i_size >= size_based_compaction_lower_bound_size) {
+        _levels.push_back(i_size);
+        i_size /= 2;
+    }
+}
+
+void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, int64_t 
current_cumulative_point,
+        int64_t* ret_cumulative_point) {
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction 
process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+    // check that the base rowset's first version is zero
+    CHECK((*base_rowset_meta)->start_version() == 0);
+
+    int64_t promotion_size = 0;
+    _calc_promotion_size(*base_rowset_meta, &promotion_size);
+
+    int64_t prev_version = -1;
+    for (const RowsetMetaSharedPtr& rs : existing_rss) {
+        if (rs->version().first > prev_version + 1) {
+            // There is a hole, do not continue
+            break;
+        }
+
+        bool is_delete = _tablet->version_for_delete_predicate(rs->version());
+
+        // break the loop if segments in this rowset is overlapping, or is a 
singleton.
+        if (!is_delete && (rs->is_segments_overlapping() || 
rs->is_singleton_delta())) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        // check the rowset is whether less than promotion size
+        if (!is_delete && rs->version().first != 0 && rs->total_disk_size() < 
promotion_size) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        prev_version = rs->version().second;
+        *ret_cumulative_point = prev_version + 1;
+    }
+    VLOG(3) << "cumulative compaction size_based policy, calculate cumulative 
point value = "
+              << *ret_cumulative_point << ", calc promotion size value = " << 
promotion_size
+              << " tablet = " << _tablet->full_name();
+}
+
+void 
SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedPtr 
base_rowset_meta,
+                                                               int64_t* 
promotion_size) {
+    int64_t base_size = base_rowset_meta->total_disk_size();
+    *promotion_size = base_size * _size_based_promotion_ratio;
+
+    // promotion_size is between _size_based_promotion_size and 
_size_based_promotion_min_size
+    if (*promotion_size >= _size_based_promotion_size) {
+        *promotion_size = _size_based_promotion_size;
+    } else if (*promotion_size <= _size_based_promotion_min_size) {
+        *promotion_size = _size_based_promotion_min_size;
+    }
+    _refresh_tablet_size_based_promotion_size(*promotion_size);
+}
+
+void 
SizeBasedCumulativeCompactionPolicy::_refresh_tablet_size_based_promotion_size(
+        int64_t promotion_size) {
+    _tablet_size_based_promotion_size = promotion_size;
+}
+
+void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
+        const std::vector<RowsetSharedPtr>& input_rowsets, RowsetSharedPtr 
output_rowset,
+        Version& last_delete_version) {
+
+    // if rowsets have delete version, move to the last directly
+    if (last_delete_version.first != -1) {
+        _tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
+    } else {
+        // if rowsets have not delete version, check output_rowset total disk 
size 
+        // satisfies promotion size.
+        size_t total_size = output_rowset->rowset_meta()->total_disk_size();
+        if (total_size >= _tablet_size_based_promotion_size) {
+            _tablet->set_cumulative_layer_point(output_rowset->end_version() + 
1);
+        }
+    }
+}
+
+void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, int64_t 
current_cumulative_point,
+        uint32_t* score) {
+
+    bool base_rowset_exist = false;
+    const int64_t point = current_cumulative_point;
+    int64_t promotion_size = 0;
+    
+    std::vector<RowsetMetaSharedPtr> rowset_to_compact;
+    int64_t total_size = 0;
+
+    // check the base rowset and collect the rowsets of cumulative part 
+    auto rs_meta_iter = all_metas.begin();
+    for (; rs_meta_iter != all_metas.end(); rs_meta_iter++) {
+        auto rs_meta = *rs_meta_iter;
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+            _calc_promotion_size(rs_meta, &promotion_size);
+        }
+        if (rs_meta->end_version() < point) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than 
_break_ here.
+            continue;
+        } else {
+            // collect the rowsets of cumulative part 
+            total_size += rs_meta->total_disk_size();
+            *score += rs_meta->get_compaction_score();
+            rowset_to_compact.push_back(rs_meta);
+        }
+    }
+
+    // If base version does not exist, it may be that tablet is doing alter 
table. 
+    // Do not select it and set *score = 0
+    if (!base_rowset_exist) {
+        *score = 0;
+        return;
+    }
+
+    // if total_size is greater than promotion_size, return total score
+    if (total_size >= promotion_size) {
+        return;
+    }
+
+    // sort the rowsets of cumulative part 
+    std::sort(rowset_to_compact.begin(), rowset_to_compact.end(), 
RowsetMeta::comparator);
+
+    // calculate the rowsets to do cumulative compaction
+    for (auto &rs_meta : rowset_to_compact) {
+        int current_level = _level_size(rs_meta->total_disk_size());
+        int remain_level = _level_size(total_size - 
rs_meta->total_disk_size());
+        // if the current rowset's level is not greater than the level of the 
remaining rowsets,
+        // the score keeps the current rowset and we return; otherwise, the 
current rowset is excluded from the score.
+        if (current_level <= remain_level) {
+            return;
+        }
+        total_size -= rs_meta->total_disk_size();
+        *score -= rs_meta->get_compaction_score();
+    }
+}
+
+int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
+        const std::vector<RowsetSharedPtr>& candidate_rowsets, const int64_t 
max_compaction_score,
+        const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* 
input_rowsets,
+        Version* last_delete_version, size_t* compaction_score) {
+    size_t promotion_size = _tablet_size_based_promotion_size;
+    int transient_size = 0;
+    *compaction_score = 0;
+    int64_t total_size = 0;
+    for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
+        RowsetSharedPtr rowset = candidate_rowsets[i];
+        // check whether this rowset is delete version
+        if (_tablet->version_for_delete_predicate(rowset->version())) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions 
before.
+                // we should compact those version before handling them over 
to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, 
skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                continue;
+            }
+        }
+        if (*compaction_score >= max_compaction_score) {
+            // got enough segments
+            break;
+        }
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+    }
+
+    if (total_size >= promotion_size) {
+        return transient_size;
+    }
+
+    // if there is delete version, do compaction directly
+    if (last_delete_version->first != -1) {
+        if(input_rowsets->size() == 1) {
+            auto rs_meta = input_rowsets->front()->rowset_meta();
+            // if there is only one rowset and not overlapping, 
+            // we do not need to do cumulative compaction
+            if (!rs_meta->is_segments_overlapping()) {
+                input_rowsets->clear();
+                *compaction_score = 0;
+            }
+        }
+        return transient_size;
+    }
+
+    auto rs_iter = input_rowsets->begin();
+    while (rs_iter != input_rowsets->end()) {
+        auto rs_meta = (*rs_iter)->rowset_meta();
+        int current_level = _level_size(rs_meta->total_disk_size());
+        int remain_level = _level_size(total_size - 
rs_meta->total_disk_size());
+         // if the current rowset's level is not greater than the level of the 
remaining rowsets,
+        // the input rowsets keep the current rowset and we stop; otherwise, the 
current rowset is removed from the input rowsets.
+        if (current_level <= remain_level) {
+            break;
+        }
+        total_size -= rs_meta->total_disk_size();
+        *compaction_score -= rs_meta->get_compaction_score();
+
+        rs_iter = input_rowsets->erase(rs_iter);
+    }
+
+    LOG(INFO) << "cumulative compaction size_based policy, compaction_score = 
" << *compaction_score
+              << ", total_size = " << total_size
+              << ", calc promotion size value = " << promotion_size
+              << ", tablet = " << _tablet->full_name() << ", input_rowset size 
"
+              << input_rowsets->size();
+
+    // empty return
+    if (input_rowsets->empty()) {
+        return transient_size;
+    }
+
+    // if we have a sufficient number of segments, we should process the 
compaction.
+    // otherwise, we check number of segments and total_size whether can do 
compaction.
+    if (total_size < _size_based_compaction_lower_bound_size && 
*compaction_score < min_compaction_score) {
+        input_rowsets->clear();
+        *compaction_score = 0;
+    } else if (total_size >= _size_based_compaction_lower_bound_size &&
+                   input_rowsets->size() == 1) {
+        auto rs_meta = input_rowsets->front()->rowset_meta();
+        // if there is only one rowset and not overlapping, 
+        // we do not need to do compaction
+        if (!rs_meta->is_segments_overlapping()) {
+            input_rowsets->clear();
+            *compaction_score = 0;
+        }
+    }
+    return transient_size;
+}
+
+int SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
+
+    for (auto &i : _levels) {
+        if (size >= i) {
+            return i;
+        }
+    }
+    return 0;
+}
+
+void NumBasedCumulativeCompactionPolicy::update_cumulative_point(
+        const std::vector<RowsetSharedPtr>& input_rowsets, RowsetSharedPtr 
_output_rowset,
+        Version& last_delete_version) {
+    // use the version after end version of the last input rowsets to update 
cumulative point
+    int64_t cumulative_point = input_rowsets.back()->end_version() + 1;
+    _tablet->set_cumulative_layer_point(cumulative_point);
+}
+
+int NumBasedCumulativeCompactionPolicy::pick_input_rowsets(
+        const std::vector<RowsetSharedPtr>& candidate_rowsets, const int64_t 
max_compaction_score,
+        const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* 
input_rowsets,
+        Version* last_delete_version, size_t* compaction_score) {
+    *compaction_score = 0;
+    int transient_size = 0;
+    for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
+        RowsetSharedPtr rowset = candidate_rowsets[i];
+        // check whether this rowset is delete version
+        if (_tablet->version_for_delete_predicate(rowset->version())) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions 
before.
+                // we should compact those version before handling them over 
to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, 
skip it and continue
+                input_rowsets->clear();
+                transient_size = 0;
+                *compaction_score = 0;
+                continue;
+            }
+        }
+        if (*compaction_score >= max_compaction_score) {
+            // got enough segments
+            break;
+        }
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        input_rowsets->push_back(rowset);
+        transient_size += 1;
+    }
+
+    if (input_rowsets->empty()) {
+        return transient_size;
+    }
+
+    // if we have a sufficient number of segments,
+    // or have other versions before encountering the delete version, we 
should process the compaction.
+    if (last_delete_version->first == -1 && *compaction_score < 
min_compaction_score) {
+        input_rowsets->clear();
+    }
+    return transient_size;
+}
+
+void 
NumBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                          const int64_t 
current_cumulative_point,
+                                          uint32_t* score) {
+    bool base_rowset_exist = false;
+    const int64_t point = current_cumulative_point;
+    for (auto& rs_meta : all_rowsets) {
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+        }
+        if (rs_meta->start_version() < point) {
+            // all_rs_metas() is not sorted, so we use _continue_ other than 
_break_ here.
+            continue;
+        }
+        *score += rs_meta->get_compaction_score();
+    }
+
+    // If base version does not exist, it may be that tablet is doing alter 
table. 
+    // Do not select it and set *score = 0
+    if (!base_rowset_exist) {
+        *score = 0;
+    }
+}
+
+void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(const 
std::vector<RowsetMetaSharedPtr>& all_metas, 
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+
+    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
+    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction 
process.
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    int64_t prev_version = -1;
+    for (const RowsetMetaSharedPtr& rs : existing_rss) {
+        if (rs->version().first > prev_version + 1) {
+            // There is a hole, do not continue
+            break;
+        }
+        // break the loop if segments in this rowset is overlapping, or is a 
singleton.
+        if (rs->is_segments_overlapping() || rs->is_singleton_delta()) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        prev_version = rs->version().second;
+        *ret_cumulative_point = prev_version + 1;
+    }
+}
+
+void CumulativeCompactionPolicy::pick_candicate_rowsets(int64_t 
skip_window_sec, 
+        const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
+        int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets) {
+
+    int64_t now = UnixSeconds();
+    for (auto& it : rs_version_map) {
+        // find all rowset version greater than cumulative_point and skip the 
create time in skip_window_sec
+        if (it.first.first >= cumulative_point && (it.second->creation_time() 
+ skip_window_sec < now)) {
+            candidate_rowsets->push_back(it.second);
+        }
+    }
+    std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), 
Rowset::comparator);
+
+}
+
+std::unique_ptr<CumulativeCompactionPolicy> 
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string
 type,
+                            std::shared_ptr<Tablet> tablet) {
+
+    CompactionPolicy policy_type;
+    _parse_cumulative_compaction_policy(type, &policy_type);
+
+    if (policy_type == NUM_BASED_POLICY) {
+        return std::unique_ptr<CumulativeCompactionPolicy>(new 
NumBasedCumulativeCompactionPolicy(tablet));
+    }
+    else if(policy_type == SIZE_BASED_POLICY) {
+        return std::unique_ptr<CumulativeCompactionPolicy>(new 
SizeBasedCumulativeCompactionPolicy(tablet));
+    }
+
+    return std::unique_ptr<CumulativeCompactionPolicy>(new 
NumBasedCumulativeCompactionPolicy(tablet));
+}
+
+void 
CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std::string
 type, CompactionPolicy *policy_type) {
+
+    boost::to_upper(type);
+    if (type == CUMULATIVE_NUM_BASED_POLICY) {
+        *policy_type = NUM_BASED_POLICY;
+    }
+    else if (type == CUMULATIVE_SIZE_BASED_POLICY) {
+        *policy_type = SIZE_BASED_POLICY;
+    } else {
+        LOG(FATAL) << "parse cumulative compaction policy error " << type;
+    }
+}
+}
\ No newline at end of file
diff --git a/be/src/olap/cumulative_compaction_policy.h 
b/be/src/olap/cumulative_compaction_policy.h
new file mode 100644
index 0000000..6103cc9
--- /dev/null
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicy enum is used to represent the type of compaction 
policy.
+/// Now it has two values, NUM_BASED_POLICY and SIZE_BASED_POLICY.
+/// NUM_BASED_POLICY means the compaction policy is implemented by the num based 
policy.
+/// SIZE_BASED_POLICY means the compaction policy is implemented by the 
size based policy.
+enum CompactionPolicy {
+    NUM_BASED_POLICY = 0,
+    SIZE_BASED_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_NUM_BASED_POLICY = "NUM_BASED";
+const static std::string CUMULATIVE_SIZE_BASED_POLICY = "SIZE_BASED";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implement 
+/// concrete cumulative compaction algorithm. The policy is configured by 
config::cumulative_compaction_policy.
+/// The policy functions are the main steps to do cumulative compaction. For 
example, how to pick candidate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compaction score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(std::shared_ptr<Tablet> tablet) : 
_tablet(tablet){}
+
+    /// Destructor function of CumulativeCompactionPolicy.
+    virtual ~CumulativeCompactionPolicy() {}
+
+    /// Calculate the cumulative compaction score of the tablet. This function 
uses rowsets meta and current 
+    /// cumulative point to calculate the score of tablet. The score depends 
on the concrete algorithm of policy.
+    /// In general, the score represents the segments nums to do cumulative 
compaction in total rowsets. The more
+    /// score tablet gets, the earlier it can do cumulative compaction.
+    /// param all_rowsets, all rowsets in tablet.
+    /// param current_cumulative_point, current cumulative point value.
+    /// return score, the result score after calculate.
+    virtual void calc_cumulative_compaction_score(
+            const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t 
current_cumulative_point,
+            uint32_t* score) = 0;
+
+    /// This function implements the policy which represents how to pick the 
candidate rowsets for compaction. 
+    /// This base class gives a unified implementation. Its derived classes 
can also override this function.
+    /// param skip_window_sec, it means skipping the rowsets whose create 
time plus skip_window_sec is not less than now.
+    /// param rs_version_map, mapping from version to rowset
+    /// param cumulative_point,  current cumulative point of tablet
+    /// return candidate_rowsets, the container of candidate rowsets 
+    virtual void pick_candicate_rowsets(
+            int64_t skip_window_sec,
+            const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
+            int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
+    
+    /// Pick input rowsets from candidate rowsets for compaction. This 
function is pure virtual function. 
+    /// Its implementation depends on concrete compaction policy.
+    /// param candidate_rowsets, the candidate_rowsets vector container to 
pick input rowsets
+    /// return input_rowsets, the vector container as return
+    /// return last_delete_version, if has delete rowset, record the delete 
version from input_rowsets
+    /// return compaction_score, calculate the compaction score of picked 
input rowset
+    virtual int pick_input_rowsets(const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                                    const int64_t max_compaction_score,
+                                    const int64_t min_compaction_score,
+                                    std::vector<RowsetSharedPtr>* 
input_rowsets,
+                                    Version* last_delete_version, size_t* 
compaction_score) = 0;
+
+    /// Update tablet's cumulative point after cumulative compaction finished. 
This function is pure virtual function.
+    /// Each derived class has its own update policy which depends on its concrete 
algorithm. When the cumulative point moves 
+    /// after output rowset, then output rowset will do base compaction next 
time.
+    /// param input_rowsets, the picked input rowset to do compaction just now
+    /// param output_rowset, the result rowset after compaction
+    virtual void update_cumulative_point(const std::vector<RowsetSharedPtr>& 
input_rowsets,
+                                         RowsetSharedPtr output_rowset,
+                                         Version& last_delete_version) = 0;
+
+    /// Calculate tablet's cumulative point before compaction. This 
calculation just executes once when the tablet compacts
+    /// first time after BE initialization and then motion of cumulative point 
depends on update_cumulative_point policy.
+    /// This function is pure virtual function. In general, the cumulative 
point splits the rowsets into two parts:
+    /// base rowsets, cumulative rowsets.
+    /// param all_rowsets, all rowsets in the tablet
+    /// param current_cumulative_point, current cumulative position
+    /// return cumulative_point, the result of calculating cumulative point 
position
+    virtual void calculate_cumulative_point(const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                            int64_t current_cumulative_point,
+                                            int64_t* cumulative_point) = 0;
+
+protected:
+    /// tablet pointer. NOTE(review): Tablet keeps this policy in a unique_ptr while the policy holds a shared_ptr to the Tablet; looks like an ownership cycle, confirm.
+    std::shared_ptr<Tablet> _tablet;
+};
+
+/// Num based cumulative compaction policy implementation. Num based policy 
which derives CumulativeCompactionPolicy is the early 
+/// basic algorithm. This policy uses linear structure to compact rowsets. The 
cumulative rowsets compact only once and 
+/// then the output will do base compaction. It can make segments of rowsets 
in order and compact small rowsets to a bigger one.
+class NumBasedCumulativeCompactionPolicy final : public 
CumulativeCompactionPolicy {
+    
+public:
+    /// Constructor function of NumBasedCumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    NumBasedCumulativeCompactionPolicy(std::shared_ptr<Tablet> tablet)
+            : CumulativeCompactionPolicy(tablet){}
+
+    /// Destructor function of NumBasedCumulativeCompactionPolicy.
+    ~NumBasedCumulativeCompactionPolicy() {}
+
+    /// Num based cumulative compaction policy implements pick input rowsets 
function.
+    /// Its main policy is picking rowsets from candidate rowsets by comparing 
accumulative compaction_score and
+    /// max_cumulative_compaction_num_singleton_deltas or checking whether 
there is delete version rowset.
+    int pick_input_rowsets(const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                            const int64_t max_compaction_score,
+                            const int64_t min_compaction_score,
+                            std::vector<RowsetSharedPtr>* input_rowsets,
+                            Version* last_delete_version, size_t* 
compaction_score) override;
+
+    /// Num based cumulative compaction policy implements update cumulative 
point function.
+    /// Its main policy is using the last input version to update the 
cumulative point. It aims that every rowset is 
+    /// compacted only once.
+    void update_cumulative_point(const std::vector<RowsetSharedPtr>& 
input_rowsets,
+                                 RowsetSharedPtr _output_rowset,
+                                 Version& last_delete_version) override;
+
+    /// Num based cumulative compaction policy implements calculate cumulative 
point function.
+    /// When the first time the tablet does compact, this calculation is 
executed. Its main policy is to find first rowset
+    /// which is segments_overlapping type, it represents that this rowset is not 
compacted, and use this version as cumulative point. 
+    void calculate_cumulative_point(const std::vector<RowsetMetaSharedPtr>& 
all_rowsets,
+                                     int64_t current_cumulative_point,
+                                     int64_t* cumulative_point) override;
+
+    /// Num based cumulative compaction policy implements calc cumulative 
compaction score function.
+    /// Its main policy is calculating the accumulative compaction score after 
current cumulative_point in tablet.
+    void calc_cumulative_compaction_score(const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                          int64_t current_cumulative_point,
+                                          uint32_t* score) override;
+
+};
+
+/// SizeBased cumulative compaction policy implementation. SizeBased policy 
which derives CumulativeCompactionPolicy is an optimized
+/// version of num based cumulative compaction policy. This policy also uses 
linear structure to compact rowsets. The cumulative rowsets 
+/// can do compaction when they are in same level size. And when output rowset 
exceeds the promotion ratio of base size or min promotion
+/// size, it will do base compaction. This policy is targeting the use cases 
requiring lower write amplification, trading off read 
+/// amplification and space amplification.
+class SizeBasedCumulativeCompactionPolicy final : public 
CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of SizeBasedCumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method.
+    /// param tablet, the shared pointer of tablet 
+    SizeBasedCumulativeCompactionPolicy(
+            std::shared_ptr<Tablet> tablet,
+            int64_t size_based_promotion_size =
+                    config::cumulative_size_based_promotion_size_mbytes * 1024 
* 1024,
+            double size_based_promotion_ratio =
+                    config::cumulative_size_based_promotion_ratio,
+            int64_t size_based_promotion_min_size =
+                    config::cumulative_size_based_promotion_min_size_mbytes * 
1024 * 1024,
+            int64_t size_based_compaction_lower_bound_size =
+                    config::cumulative_size_based_compaction_lower_size_mbytes 
* 1024 * 1024);
+    
+    /// Destructor function of SizeBasedCumulativeCompactionPolicy.
+    ~SizeBasedCumulativeCompactionPolicy() {}
+
+    /// SizeBased cumulative compaction policy implements calculate cumulative 
point function.
+    /// When the first time the tablet does compact, this calculation is 
executed. Its main policy is to find first rowset
+    /// which does not satisfy the promotion conditions. 
+    void calculate_cumulative_point(const std::vector<RowsetMetaSharedPtr>& 
all_rowsets,
+                                    int64_t current_cumulative_point,
+                                    int64_t* cumulative_point) override;
+
+    /// SizeBased cumulative compaction policy implements pick input rowsets 
function.
+    /// Its main policy is picking rowsets from candidate rowsets by comparing 
accumulative compaction_score,
+    /// max_cumulative_compaction_num_singleton_deltas or checking whether 
there is delete version rowset,
+    /// and choose those rowset in the same level to do cumulative compaction.
+    int pick_input_rowsets(const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                            const int64_t max_compaction_score,
+                            const int64_t min_compaction_score,
+                            std::vector<RowsetSharedPtr>* input_rowsets,
+                            Version* last_delete_version, size_t* 
compaction_score) override;
+
+    /// SizeBased cumulative compaction policy implements update cumulative 
point function.
+    /// Its main policy is judging whether the output rowset size satisfies the 
promotion size.
+    /// If it is satisfied, this policy will update the cumulative point.
+    void update_cumulative_point(const std::vector<RowsetSharedPtr>& 
input_rowsets,
+                                 RowsetSharedPtr _output_rowset, Version& 
last_delete_version) override;
+
+    /// SizeBased cumulative compaction policy implements calc cumulative 
compaction score function.
+    /// Its main policy is calculating the accumulative compaction score after 
current cumulative_point in tablet.
+    void calc_cumulative_compaction_score(const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                          int64_t current_cumulative_point,
+                                          uint32_t* score) override;
+
+private:
+    /// calculate promotion size using current base rowset meta size and 
promotion configs
+    void _calc_promotion_size(RowsetMetaSharedPtr base_rowset_meta, int64_t* 
promotion_size);
+
+    /// calculate the disk size belong to which level, the level is divide by 
power of 2
+    /// between cumulative_size_based_promotion_min_size_mbytes
+    /// and cumulative_size_based_promotion_size_mbytes
+    int _level_size(const int64_t size);
+
+    /// when policy calculates cumulative_compaction_score, update promotion 
size at the same time
+    void _refresh_tablet_size_based_promotion_size(int64_t promotion_size);
+
+private:
+    /// cumulative compaction promotion size, unit is byte.
+    int64_t _size_based_promotion_size;
+    /// cumulative compaction promotion ratio of base rowset total disk size.
+    double _size_based_promotion_ratio;
+    /// cumulative compaction promotion min size, unit is byte.
+    int64_t _size_based_promotion_min_size;
+    /// lower bound size to do compaction.
+    int64_t _size_based_compaction_lower_bound_size;
+    /// record tablet promotion size, it is updated each time when calculate 
cumulative_compaction_score 
+    int64_t _tablet_size_based_promotion_size;
+    /// levels division of disk size, same level rowsets can do compaction
+    std::vector<int64_t> _levels;
+};
+
+/// The factory of CumulativeCompactionPolicy, it can produce different policy 
according to the `policy` parameter.
+class CumulativeCompactionPolicyFactory {
+
+public:
+    /// Static factory function. It can produce different policy according to 
the `policy` parameter and use tablet ptr 
+    /// to construct the policy. Now it can produce size based and num based 
policies.
+    static std::unique_ptr<CumulativeCompactionPolicy> 
create_cumulative_compaction_policy(
+            std::string policy, std::shared_ptr<Tablet> tablet);
+
+private:
+    /// It is a static function to help to check the policy config and convert 
to CompactionPolicy enum variable
+    static void _parse_cumulative_compaction_policy(std::string policy,
+                                                    CompactionPolicy* 
policy_type);
+};
+
+}
+#endif // OLAP_CUMULATIVE_COMPACTION_POLICY_H
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 760a751..dac000a 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -25,6 +25,7 @@
 #include <string>
 
 #include <gperftools/profiler.h>
+#include <boost/algorithm/string.hpp>
 
 #include "common/status.h"
 #include "olap/cumulative_compaction.h"
@@ -76,6 +77,7 @@ Status StorageEngine::start_bg_threads() {
     _disk_stat_monitor_thread.detach();
     LOG(INFO) << "disk stat monitor thread started";
 
+    
     // convert store map to vector
     std::vector<DataDir*> data_dirs;
     for (auto& tmp_store : _store_map) {
@@ -83,7 +85,8 @@ Status StorageEngine::start_bg_threads() {
     }
     int32_t data_dir_num = data_dirs.size();
 
-    
+    // check cumulative compaction config
+    _check_cumulative_compaction_config();
     // base and cumulative compaction threads
     int32_t base_compaction_num_threads_per_disk = std::max<int32_t>(1, 
config::base_compaction_num_threads_per_disk);
     int32_t cumulative_compaction_num_threads_per_disk = std::max<int32_t>(1, 
config::cumulative_compaction_num_threads_per_disk);
@@ -283,6 +286,34 @@ void* 
StorageEngine::_disk_stat_monitor_thread_callback(void* arg) {
     return nullptr;
 }
 
+// Startup check: under the size_based policy, raise the promotion size config
+// to max(promotion_min_size, 2 * compaction_lower_size) when it is smaller.
+void StorageEngine::_check_cumulative_compaction_config() {
+    std::string policy_name = config::cumulative_compaction_policy;
+    boost::to_upper(policy_name);
+    if (policy_name != CUMULATIVE_SIZE_BASED_POLICY) {
+        return;
+    }
+    const int64_t size_based_promotion_min_size =
+            config::cumulative_size_based_promotion_min_size_mbytes;
+    const int64_t size_based_compaction_lower_bound_size =
+            config::cumulative_size_based_compaction_lower_size_mbytes;
+    const int64_t should_min_size_based_promotion_size = std::max(
+            size_based_promotion_min_size,
+            2 * size_based_compaction_lower_bound_size);
+    if (config::cumulative_size_based_promotion_size_mbytes <
+        should_min_size_based_promotion_size) {
+        // assign the config itself; changing a local copy has no effect
+        config::cumulative_size_based_promotion_size_mbytes =
+                should_min_size_based_promotion_size;
+        LOG(WARNING) << "the config size_based_promotion_size is adjusted to "
+                        "size_based_promotion_min_size or  2 * "
+                        "size_based_compaction_lower_bound_size "
+                     << should_min_size_based_promotion_size
+                     << ", because size_based_promotion_size is small";
+    }
+}
+
 void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, 
DataDir* data_dir) {
 #ifdef GOOGLE_PROFILER
     ProfilerRegisterThread();
@@ -290,7 +321,6 @@ void* 
StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir*
     LOG(INFO) << "try to start cumulative compaction process!";
 
     while (!_stop_bg_worker) {
-
         if (!config::disable_auto_compaction) {
             // must be here, because this thread is start on start and
             // cgroup is not initialized at this time
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 277deab..882f466 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -317,6 +317,10 @@ public:
         _rowset_meta_pb.set_segments_overlap_pb(segments_overlap);
     }
 
+    static bool comparator(const RowsetMetaSharedPtr& left, const 
RowsetMetaSharedPtr& right) {
+        return left->end_version() < right->end_version(); // true when left's end version is smaller
+    }
+
     // return true if segments in this rowset has overlapping data.
     // this is not same as `segments_overlap()` method.
     // `segments_overlap()` only return the value of "segments_overlap" field 
in rowset meta,
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index f7e9aa3..ed7bb5d 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -230,6 +230,8 @@ private:
     void* _base_compaction_thread_callback(void* arg, DataDir* data_dir);
     // cumulative process function
     void* _cumulative_compaction_thread_callback(void* arg, DataDir* data_dir);
+    // check cumulative compaction config
+    void _check_cumulative_compaction_config();
 
     // garbage sweep thread process function. clear snapshot and trash folder
     void* _garbage_sweeper_thread_callback(void* arg);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 2f54b96..24e42c4 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -56,14 +56,16 @@ TabletSharedPtr 
Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
     return std::make_shared<Tablet>(tablet_meta, data_dir);
 }
 
-Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
+Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
+               const std::string& cumulative_compaction_type) :
         BaseTablet(tablet_meta, data_dir),
         _is_bad(false),
         _last_cumu_compaction_failure_millis(0),
         _last_base_compaction_failure_millis(0),
         _last_cumu_compaction_success_millis(0),
         _last_base_compaction_success_millis(0),
-        _cumulative_point(kInvalidCumulativePoint) {
+        _cumulative_point(K_INVALID_CUMULATIVE_POINT),
+        _cumulative_compaction_type(cumulative_compaction_type) {
     // change _rs_graph to _timestamped_version_tracker
     
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());
 
@@ -75,6 +77,13 @@ OLAPStatus Tablet::_init_once_action() {
     OLAPStatus res = OLAP_SUCCESS;
     VLOG(3) << "begin to load tablet. tablet=" << full_name()
             << ", version_size=" << _tablet_meta->version_count();
+
+    std::shared_ptr<Tablet> this_ptr = 
std::dynamic_pointer_cast<Tablet>(shared_from_this());
+    // init cumulative compaction policy by type
+    _cumulative_compaction_policy =
+            
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
+                    _cumulative_compaction_type, this_ptr);
+
     for (const auto& rs_meta :  _tablet_meta->all_rs_metas()) {
         Version version = rs_meta->version();
         RowsetSharedPtr rowset;
@@ -421,7 +430,7 @@ void Tablet::delete_expired_stale_rowset() {
         // When there is no consistent versions, we must reconstruct the 
tracker.
         if (status != OLAP_SUCCESS) {
             LOG(WARNING) << "The consistent version check fails, there are 
bugs. "
-                << "Reconstruct the tracker to recover versions in tablet=" << 
tablet_id();
+                         << "Reconstruct the tracker to recover versions in 
tablet=" << tablet_id();
 
             
_timestamped_version_tracker.recover_versioned_tracker(stale_version_path_map);
 
@@ -454,25 +463,27 @@ void Tablet::delete_expired_stale_rowset() {
                 // delete rowset
                 StorageEngine::instance()->add_unused_rowset(it->second);
                 _stale_rs_version_map.erase(it);
-                LOG(INFO) << "delete stale rowset tablet=" << full_name() 
-                    <<" version[" << timestampedVersion->version().first << 
"," << timestampedVersion->version().second 
-                    << "] move to unused_rowset success " << std::fixed << 
expired_stale_sweep_endtime;
+                LOG(INFO) << "delete stale rowset tablet=" << full_name() << " 
version["
+                          << timestampedVersion->version().first << ","
+                          << timestampedVersion->version().second
+                          << "] move to unused_rowset success " << std::fixed
+                          << expired_stale_sweep_endtime;
             } else {
-                LOG(WARNING) << "delete stale rowset tablet=" << full_name() 
-                    <<" version[" << timestampedVersion->version().first << 
"," << timestampedVersion->version().second 
-                    << "] not find in stale rs version map";
+                LOG(WARNING) << "delete stale rowset tablet=" << full_name() 
<< " version["
+                             << timestampedVersion->version().first << ","
+                             << timestampedVersion->version().second
+                             << "] not find in stale rs version map";
             }
             _delete_stale_rowset_by_version(timestampedVersion->version());
         }
         to_delete_iter++;
     }
-    LOG(INFO) << "delete stale rowset _stale_rs_version_map tablet="
-        << full_name() << " current_size=" << _stale_rs_version_map.size()
-        << " old_size=" << old_size
-        << " current_meta_size="  <<  _tablet_meta->all_stale_rs_metas().size()
-        << " old_meta_size=" << old_meta_size
-        << " sweep endtime " << std::fixed << expired_stale_sweep_endtime;
-        
+    LOG(INFO) << "delete stale rowset _stale_rs_version_map tablet=" << 
full_name()
+              << " current_size=" << _stale_rs_version_map.size() << " 
old_size=" << old_size
+              << " current_meta_size=" << 
_tablet_meta->all_stale_rs_metas().size()
+              << " old_meta_size=" << old_meta_size << " sweep endtime " << 
std::fixed
+              << expired_stale_sweep_endtime;
+
 #ifndef BE_TEST
     save_meta();
 #endif
@@ -665,22 +676,9 @@ bool Tablet::can_do_compaction() {
 
 const uint32_t Tablet::calc_cumulative_compaction_score() const {
     uint32_t score = 0;
-    bool base_rowset_exist = false;
-    const int64_t point = cumulative_layer_point();
-    for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
-        if (rs_meta->start_version() == 0) {
-            base_rowset_exist = true;
-        }
-        if (rs_meta->start_version() < point) {
-            // all_rs_metas() is not sorted, so we use _continue_ other than 
_break_ here.
-            continue;
-        }
-
-        score += rs_meta->get_compaction_score();
-    }
-
-    // base不存在可能是tablet正在做alter table,先不选它,设score=0
-    return base_rowset_exist ? score : 0;
+    _cumulative_compaction_policy->calc_cumulative_compaction_score(
+            _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score);
+    return score;
 }
 
 const uint32_t Tablet::calc_base_compaction_score() const {
@@ -790,40 +788,16 @@ void 
Tablet::_max_continuous_version_from_begining_unlocked(Version* version,
 
 void Tablet::calculate_cumulative_point() {
     WriteLock wrlock(&_meta_lock);
-    if (_cumulative_point != kInvalidCumulativePoint) {
-        // only calculate the point once.
-        // after that, cumulative point will be updated along with compaction 
process.
-        return;
-    }
-
-    std::list<RowsetMetaSharedPtr> existing_rss;
-    for (auto& rs : _tablet_meta->all_rs_metas()) {
-        existing_rss.emplace_back(rs);
-    }
-
-    // sort the existing rowsets by version in ascending order
-    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
-        // simple because 2 versions are certainly not overlapping
-        return a->version().first < b->version().first;
-    });
 
-    int64_t prev_version = -1;
-    for (const RowsetMetaSharedPtr& rs : existing_rss) {
-        if (rs->version().first > prev_version + 1) {
-            // There is a hole, do not continue
-            break;
-        }
+    int64_t ret_cumulative_point;
+    _cumulative_compaction_policy->calculate_cumulative_point(
+            _tablet_meta->all_rs_metas(), _cumulative_point, 
&ret_cumulative_point);
 
-        bool is_delete = version_for_delete_predicate(rs->version());
-        // break the loop if segments in this rowset is overlapping, or is a 
singleton and not delete rowset.
-        if (rs->is_segments_overlapping() || (rs->is_singleton_delta() && 
!is_delete)) {
-            _cumulative_point = rs->version().first;
-            break;
-        }
-
-        prev_version = rs->version().second;
-        _cumulative_point = prev_version + 1;
+    if(ret_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
+        return;
     }
+
+    _cumulative_point = ret_cumulative_point;
 }
 
 OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings,
@@ -994,15 +968,10 @@ TabletInfo Tablet::get_tablet_info() const {
     return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
 }
 
-void Tablet::pick_candicate_rowsets_to_cumulative_compaction(int64_t 
skip_window_sec,
-                                                             
std::vector<RowsetSharedPtr>* candidate_rowsets) {
-    int64_t now = UnixSeconds();
+void Tablet::pick_candicate_rowsets_to_cumulative_compaction(
+        int64_t skip_window_sec, std::vector<RowsetSharedPtr>* 
candidate_rowsets) {
     ReadLock rdlock(&_meta_lock);
-    for (auto& it : _rs_version_map) {
-        if (it.first.first >= _cumulative_point && (it.second->creation_time() 
+ skip_window_sec < now)) {
-            candidate_rowsets->push_back(it.second);
-        }
-    }
+    _cumulative_compaction_policy->pick_candicate_rowsets(skip_window_sec, 
_rs_version_map, _cumulative_point, candidate_rowsets);
 }
 
 void 
Tablet::pick_candicate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* 
candidate_rowsets) {
@@ -1064,9 +1033,10 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
     for (int i = 0; i < rowsets.size(); ++i) {
         const Version& ver = rowsets[i]->version();
         rapidjson::Value value;
-        std::string version_str = strings::Substitute("[$0-$1] $2 $3 $4",
+        std::string version_str = strings::Substitute("[$0-$1] $2 $3 $4 $5",
             ver.first, ver.second, rowsets[i]->num_segments(), 
(delete_flags[i] ? "DELETE" : "DATA"),
-            
SegmentsOverlapPB_Name(rowsets[i]->rowset_meta()->segments_overlap()));
+            
SegmentsOverlapPB_Name(rowsets[i]->rowset_meta()->segments_overlap()), 
+            rowsets[i]->rowset_meta()->total_disk_size());
         value.SetString(version_str.c_str(), version_str.length(), 
versions_arr.GetAllocator());
         versions_arr.PushBack(value, versions_arr.GetAllocator());
     }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 83deaa0..3da8ae8 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -36,6 +36,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/utils.h"
 #include "olap/base_tablet.h"
+#include "olap/cumulative_compaction_policy.h"
 #include "util/once.h"
 
 namespace doris {
@@ -43,6 +44,7 @@ namespace doris {
 class DataDir;
 class Tablet;
 class TabletMeta;
+class CumulativeCompactionPolicy;
 
 using TabletSharedPtr = std::shared_ptr<Tablet>;
 
@@ -51,7 +53,8 @@ public:
     static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr 
tablet_meta,
                                                    DataDir* data_dir = 
nullptr);
 
-    Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
+    Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
+           const std::string& cumulative_compaction_type = 
config::cumulative_compaction_policy);
 
     OLAPStatus init();
     inline bool init_succeeded();
@@ -73,6 +76,7 @@ public:
     inline size_t num_rows();
     inline int version_count() const;
     inline Version max_version() const;
+    inline CumulativeCompactionPolicy* cumulative_compaction_policy();
 
     // propreties encapsulated in TabletSchema
     inline KeysType keys_type() const;
@@ -202,8 +206,8 @@ public:
 
     TabletInfo get_tablet_info() const;
 
-    void pick_candicate_rowsets_to_cumulative_compaction(int64_t 
skip_window_sec,
-                                                         
std::vector<RowsetSharedPtr>* candidate_rowsets);
+    void pick_candicate_rowsets_to_cumulative_compaction(
+            int64_t skip_window_sec, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
     void 
pick_candicate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* 
candidate_rowsets);
 
     void calculate_cumulative_point();
@@ -243,9 +247,10 @@ private:
     OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>& 
version_path,
                                                     vector<RowsetSharedPtr>* 
rowsets) const;
 
-private:
-    static const int64_t kInvalidCumulativePoint = -1;
+public:
+    static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
 
+private:
     TimestampedVersionTracker _timestamped_version_tracker;
     
     DorisCallOnce<OLAPStatus> _init_once;
@@ -290,6 +295,10 @@ private:
     std::atomic<int64_t> _cumulative_point;
     std::atomic<int32_t> _newly_created_rowset_num;
     std::atomic<int64_t> _last_checkpoint_time;
+
+    // cumulative compaction policy
+    std::unique_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
+    std::string _cumulative_compaction_type;
     DISALLOW_COPY_AND_ASSIGN(Tablet);
 
 public:
@@ -297,6 +306,10 @@ public:
     IntCounter flush_count;
 };
 
+// Returns a non-owning pointer to this tablet's cumulative compaction policy.
+// Ownership stays with the _cumulative_compaction_policy unique_ptr member.
+inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
+    return _cumulative_compaction_policy.get();
+}
+
 inline bool Tablet::init_succeeded() {
     return _init_once.has_called() && _init_once.stored_result() == 
OLAP_SUCCESS;
 }
diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp
index ad57f71..a64153c 100644
--- a/be/src/olap/version_graph.cpp
+++ b/be/src/olap/version_graph.cpp
@@ -167,6 +167,7 @@ void TimestampedVersionTracker::capture_expired_paths(
         }
         iter++;
     }
+    
 }
 
 PathVersionListSharedPtr 
TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) {
@@ -417,11 +418,11 @@ OLAPStatus 
VersionGraph::capture_consistent_versions(const Version& spec_version
                 if (_version_graph[top_vertex_index].value > 
_version_graph[it].value) {
                     continue;
                 }
-
                 visited[it] = true;
                 predecessor[it] = top_vertex_index;
                 bfs_queue.push(it);
             }
+            
         }
     }
 
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp 
b/be/test/olap/cumulative_compaction_policy_test.cpp
new file mode 100644
index 0000000..3792a1d
--- /dev/null
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -0,0 +1,1022 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <sstream>
+
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/cumulative_compaction_policy.h"
+
+namespace doris {
+
+class TestNumBasedCumulativeCompactionPolicy : public testing::Test {
+
+public:
+    // Nothing to construct eagerly; the fixture state is built in SetUp().
+    TestNumBasedCumulativeCompactionPolicy() = default;
+    // Builds the fixture state: a fresh TabletMeta and the JSON template that
+    // init_rs_meta() uses to populate every rowset meta.
+    void SetUp() override {
+        _tablet_meta.reset(new TabletMeta(1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                          UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK));
+
+        // Template rowset meta; version range and creation time are overwritten per rowset
+        // by init_rs_meta().
+        _json_rowset_meta = R"({
+            "rowset_id": 540081,
+            "tablet_id": 15673,
+            "txn_id": 4042,
+            "tablet_schema_hash": 567997577,
+            "rowset_type": "BETA_ROWSET",
+            "rowset_state": "VISIBLE",
+            "start_version": 2,
+            "end_version": 2,
+            "version_hash": 8391828013814912580,
+            "num_rows": 3929,
+            "total_disk_size": 84699,
+            "data_disk_size": 84464,
+            "index_disk_size": 235,
+            "empty": false,
+            "load_id": {
+                "hi": -5350970832824939812,
+                "lo": -6717994719194512122
+            },
+            "creation_time": 1553765670,
+            "alpha_rowset_extra_meta_pb": {
+                "segment_groups": [
+                {
+                    "segment_group_id": 0,
+                    "num_segments": 2,
+                    "index_size": 132,
+                    "data_size": 576,
+                    "num_rows": 5,
+                    "zone_maps": [
+                    {
+                        "min": "MQ==",
+                        "max": "NQ==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "MQ==",
+                        "max": "Mw==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "J2J1c2gn",
+                        "max": "J3RvbSc=",
+                        "null_flag": false
+                    }
+                    ],
+                    "empty": false
+                },
+                {
+                    "segment_group_id": 1,
+                    "num_segments": 1,
+                    "index_size": 132,
+                    "data_size": 576,
+                    "num_rows": 5,
+                    "zone_maps": [
+                    {
+                        "min": "MQ==",
+                        "max": "NQ==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "MQ==",
+                        "max": "Mw==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "J2J1c2gn",
+                        "max": "J3RvbSc=",
+                        "null_flag": false
+                    }
+                    ],
+                    "empty": false
+                }
+                ]
+            }
+        })";
+    }
+    // Nothing to release: both fixture members clean themselves up.
+    void TearDown() override {}
+
+    // Populates `pb1` from the JSON template, then overrides its version range with
+    // [start, end] and its creation time with the fixed value 10000.
+    void init_rs_meta(RowsetMetaSharedPtr &pb1, int64_t start, int64_t end) {
+        RowsetMeta& meta = *pb1;
+        meta.init_from_json(_json_rowset_meta);
+        meta.set_start_version(start);
+        meta.set_end_version(end);
+        meta.set_creation_time(10000);
+    }
+
+    // Appends five single-version rowset metas, [0-0] through [4-4], to `rs_metas`.
+    void init_all_rs_meta(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        for (int64_t version = 0; version <= 4; ++version) {
+            RowsetMetaSharedPtr meta(new RowsetMeta());
+            init_rs_meta(meta, version, version);
+            rs_metas->push_back(meta);
+        }
+    }
+
+    // Appends four rowset metas to `rs_metas`: [0-1] and [2-3] flagged NONOVERLAPPING,
+    // then [4-4] and [5-5] flagged OVERLAPPING.
+    void init_all_rs_meta_cal_point(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_4(new RowsetMeta());
+        init_rs_meta(rs_4_4, 4, 4);
+        rs_4_4->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_4_4);
+
+        RowsetMetaSharedPtr rs_5_5(new RowsetMeta());
+        init_rs_meta(rs_5_5, 5, 5);
+        rs_5_5->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_5_5);
+    }
+
+    // Appends five rowset metas to `rs_metas`: [0-1], [2-3] (NONOVERLAPPING), [4-4]
+    // (OVERLAPPING), [5-5] carrying the delete predicate "a = 1" (OVERLAP_UNKNOWN),
+    // and [6-6] (OVERLAPPING).
+    void init_all_rs_meta_delete(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_4(new RowsetMeta());
+        init_rs_meta(rs_4_4, 4, 4);
+        rs_4_4->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_4_4);
+
+        // Version 5 is the delete rowset.
+        RowsetMetaSharedPtr delete_rs(new RowsetMeta());
+        init_rs_meta(delete_rs, 5, 5);
+        DeletePredicatePB delete_predicate;
+        delete_predicate.add_sub_predicates("a = 1");
+        delete_predicate.set_version(5);
+        delete_rs->set_delete_predicate(delete_predicate);
+        delete_rs->set_segments_overlap(OVERLAP_UNKNOWN);
+        rs_metas->push_back(delete_rs);
+
+        RowsetMetaSharedPtr rs_6_6(new RowsetMeta());
+        init_rs_meta(rs_6_6, 6, 6);
+        rs_6_6->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_6_6);
+    }
+
+protected:
+    std::string _json_rowset_meta;
+    TabletMetaSharedPtr _tablet_meta;
+};
+
+// Five single-version rowsets ([0-0]..[4-4]) under the num-based policy are expected to
+// produce a cumulative compaction score of 15.
+TEST_F(TestNumBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_all_rs_meta(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
+    _tablet->init();
+
+    const uint32_t score = _tablet->calc_cumulative_compaction_score();
+    ASSERT_EQ(15, score);
+}
+
+// With rowsets [0-1], [2-3] (non-overlapping) and [4-4], [5-5] (overlapping), the
+// num-based policy is expected to place the cumulative point at version 4.
+TEST_F(TestNumBasedCumulativeCompactionPolicy, calculate_cumulative_point) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_all_rs_meta_cal_point(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    ASSERT_EQ(4, _tablet->cumulative_layer_point());
+}
+
+// After the cumulative point is calculated for the cal_point rowsets, two rowsets are
+// expected to be offered as cumulative compaction candidates (skip window: 1000 s).
+TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_candicate_rowsets) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_all_rs_meta_cal_point(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    ASSERT_EQ(2, candidate_rowsets.size());
+}
+
+// No delete rowset among the candidates: a stand-alone num-based policy (max score 10,
+// min score 5) is expected to pick both candidates, report a score of 6 and leave
+// last_delete_version at its initial {-1, -1}.
+TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_all_rs_meta_cal_point(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    // The policy under test is built directly on top of the tablet.
+    NumBasedCumulativeCompactionPolicy policy(_tablet.get());
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    policy.pick_input_rowsets(candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version,
+                              &compaction_score);
+
+    ASSERT_EQ(2, input_rowsets.size());
+    ASSERT_EQ(6, compaction_score);
+    ASSERT_EQ(-1, last_delete_version.first);
+    ASSERT_EQ(-1, last_delete_version.second);
+}
+
+// Candidates include the delete rowset [5-5]: the policy is expected to pick a single
+// input rowset, report a score of 3 and record [5-5] as last_delete_version.
+TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_all_rs_meta_delete(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    // The policy under test is built directly on top of the tablet.
+    NumBasedCumulativeCompactionPolicy policy(_tablet.get());
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    policy.pick_input_rowsets(candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version,
+                              &compaction_score);
+
+    ASSERT_EQ(1, input_rowsets.size());
+    ASSERT_EQ(3, compaction_score);
+    ASSERT_EQ(5, last_delete_version.first);
+    ASSERT_EQ(5, last_delete_version.second);
+}
+
+class TestSizeBasedCumulativeCompactionPolicy : public testing::Test {
+
+public:
+    // Nothing to construct eagerly; the fixture state is built in SetUp().
+    TestSizeBasedCumulativeCompactionPolicy() = default;
+    // Pins the size-based policy configuration to known values, then builds a fresh
+    // TabletMeta and the JSON template used by init_rs_meta().
+    // NOTE(review): the config values are process-wide globals and are not restored in
+    // TearDown(); confirm no other test in the same binary relies on their defaults.
+    void SetUp() override {
+        config::cumulative_size_based_promotion_size_mbytes = 1024;
+        config::cumulative_size_based_promotion_ratio = 0.05;
+        config::cumulative_size_based_promotion_min_size_mbytes = 64;
+        config::cumulative_size_based_compaction_lower_size_mbytes = 64;
+
+        _tablet_meta.reset(new TabletMeta(1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                          UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK));
+
+        // Template rowset meta; version range, disk size and creation time are overwritten
+        // per rowset by init_rs_meta().
+        _json_rowset_meta = R"({
+            "rowset_id": 540081,
+            "tablet_id": 15673,
+            "txn_id": 4042,
+            "tablet_schema_hash": 567997577,
+            "rowset_type": "BETA_ROWSET",
+            "rowset_state": "VISIBLE",
+            "start_version": 2,
+            "end_version": 2,
+            "version_hash": 8391828013814912580,
+            "num_rows": 3929,
+            "total_disk_size": 41,
+            "data_disk_size": 41,
+            "index_disk_size": 235,
+            "empty": false,
+            "load_id": {
+                "hi": -5350970832824939812,
+                "lo": -6717994719194512122
+            },
+            "creation_time": 1553765670,
+            "alpha_rowset_extra_meta_pb": {
+                "segment_groups": [
+                {
+                    "segment_group_id": 0,
+                    "num_segments": 2,
+                    "index_size": 132,
+                    "data_size": 576,
+                    "num_rows": 5,
+                    "zone_maps": [
+                    {
+                        "min": "MQ==",
+                        "max": "NQ==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "MQ==",
+                        "max": "Mw==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "J2J1c2gn",
+                        "max": "J3RvbSc=",
+                        "null_flag": false
+                    }
+                    ],
+                    "empty": false
+                },
+                {
+                    "segment_group_id": 1,
+                    "num_segments": 1,
+                    "index_size": 132,
+                    "data_size": 576,
+                    "num_rows": 5,
+                    "zone_maps": [
+                    {
+                        "min": "MQ==",
+                        "max": "NQ==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "MQ==",
+                        "max": "Mw==",
+                        "null_flag": false
+                    },
+                    {
+                        "min": "J2J1c2gn",
+                        "max": "J3RvbSc=",
+                        "null_flag": false
+                    }
+                    ],
+                    "empty": false
+                }
+                ]
+            }
+        })";
+    }
+    // Nothing to release: both fixture members clean themselves up.
+    void TearDown() override {}
+
+    // Populates `pb1` from the JSON template, then overrides its version range with
+    // [start, end], its total disk size with 41 and its creation time with 10000.
+    // Callers that need another size must set it after this call.
+    void init_rs_meta(RowsetMetaSharedPtr &pb1, int64_t start, int64_t end) {
+        RowsetMeta& meta = *pb1;
+        meta.init_from_json(_json_rowset_meta);
+        meta.set_start_version(start);
+        meta.set_end_version(end);
+        meta.set_total_disk_size(41);
+        meta.set_creation_time(10000);
+    }
+
+    // Appends five single-version rowset metas, [0-0] through [4-4], each keeping the
+    // default 41-byte disk size set by init_rs_meta().
+    void init_rs_meta_small_base(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        for (int64_t version = 0; version <= 4; ++version) {
+            RowsetMetaSharedPtr meta(new RowsetMeta());
+            init_rs_meta(meta, version, version);
+            rs_metas->push_back(meta);
+        }
+    }
+
+    // Appends five rowset metas: a 1 GiB [0-1], a 65 MiB [2-3], then default-sized
+    // [4-5] (NONOVERLAPPING), [6-6] (OVERLAPPING) and [7-7] (overlap flag left as loaded
+    // from the JSON template).
+    void init_rs_meta_big_base(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_total_disk_size(1024 * 1024 * 1024);
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_total_disk_size(65 * 1024 * 1024);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_5(new RowsetMeta());
+        init_rs_meta(rs_4_5, 4, 5);
+        rs_4_5->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_4_5);
+
+        RowsetMetaSharedPtr rs_6_6(new RowsetMeta());
+        init_rs_meta(rs_6_6, 6, 6);
+        rs_6_6->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_6_6);
+
+        RowsetMetaSharedPtr rs_7_7(new RowsetMeta());
+        init_rs_meta(rs_7_7, 7, 7);
+        rs_metas->push_back(rs_7_7);
+    }
+
+    // Appends four rowset metas: a 1 GiB [0-1], a 65 MiB [2-3], a default-sized [4-5]
+    // (all NONOVERLAPPING) and a 65 MiB OVERLAPPING [6-6].
+    void init_rs_meta_pick_promotion(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_total_disk_size(1024 * 1024 * 1024);
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_total_disk_size(65 * 1024 * 1024);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_5(new RowsetMeta());
+        init_rs_meta(rs_4_5, 4, 5);
+        rs_4_5->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_4_5);
+
+        RowsetMetaSharedPtr rs_6_6(new RowsetMeta());
+        init_rs_meta(rs_6_6, 6, 6);
+        rs_6_6->set_total_disk_size(65 * 1024 * 1024);
+        rs_6_6->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_6_6);
+    }
+
+    // Appends six rowset metas: a 20 GiB [0-1], a 129 MiB [2-3], a 12 MiB [4-5] (all
+    // NONOVERLAPPING), a 12 MiB OVERLAPPING [6-6], and default-sized [7-7] and [8-8].
+    void init_rs_meta_pick_not_same_level(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_total_disk_size(21474836480L); // 20G
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_total_disk_size(129 * 1024 * 1024);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_5(new RowsetMeta());
+        init_rs_meta(rs_4_5, 4, 5);
+        rs_4_5->set_total_disk_size(12 * 1024 * 1024);
+        rs_4_5->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_4_5);
+
+        RowsetMetaSharedPtr rs_6_6(new RowsetMeta());
+        init_rs_meta(rs_6_6, 6, 6);
+        rs_6_6->set_segments_overlap(OVERLAPPING);
+        rs_6_6->set_total_disk_size(12 * 1024 * 1024);
+        rs_metas->push_back(rs_6_6);
+
+        RowsetMetaSharedPtr rs_7_7(new RowsetMeta());
+        init_rs_meta(rs_7_7, 7, 7);
+        rs_metas->push_back(rs_7_7);
+
+        RowsetMetaSharedPtr rs_8_8(new RowsetMeta());
+        init_rs_meta(rs_8_8, 8, 8);
+        rs_metas->push_back(rs_8_8);
+    }
+
+    // Appends four rowset metas with strictly decreasing sizes: a 20 GiB [0-1], a
+    // 257 MiB [2-3], a 129 MiB [4-5] (all NONOVERLAPPING) and a 65 MiB OVERLAPPING [6-6].
+    void init_rs_meta_pick_empty(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr ptr1(new RowsetMeta());
+        init_rs_meta(ptr1, 0, 1);
+        ptr1->set_total_disk_size(21474836480L); // 20G
+        ptr1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(ptr1);
+
+        RowsetMetaSharedPtr ptr2(new RowsetMeta());
+        init_rs_meta(ptr2, 2, 3);
+        ptr2->set_total_disk_size(257*1024*1024);
+        ptr2->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(ptr2);
+
+        RowsetMetaSharedPtr ptr3(new RowsetMeta());
+        init_rs_meta(ptr3, 4, 5);
+        ptr3->set_total_disk_size(129*1024*1024);
+        ptr3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(ptr3);
+
+        RowsetMetaSharedPtr ptr4(new RowsetMeta());
+        // The size must be set after init_rs_meta(): that helper resets
+        // total_disk_size to 41, which would otherwise discard the 65 MiB value.
+        init_rs_meta(ptr4, 6, 6);
+        ptr4->set_total_disk_size(65*1024*1024);
+        ptr4->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(ptr4);
+
+    }
+
+    // Appends five rowset metas: a 20 GiB [0-1], a 257 MiB [2-3] (NONOVERLAPPING), then
+    // three single-segment 1 MiB rowsets: [4-5] (NONOVERLAPPING), [6-6] and [7-7]
+    // (OVERLAPPING).
+    void init_rs_meta_pick_empty_not_reach_min_limit(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_total_disk_size(21474836480L); // 20G
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_total_disk_size(257 * 1024 * 1024);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_5(new RowsetMeta());
+        init_rs_meta(rs_4_5, 4, 5);
+        rs_4_5->set_total_disk_size(1 * 1024 * 1024);
+        rs_4_5->set_num_segments(1);
+        rs_4_5->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_4_5);
+
+        RowsetMetaSharedPtr rs_6_6(new RowsetMeta());
+        init_rs_meta(rs_6_6, 6, 6);
+        rs_6_6->set_total_disk_size(1 * 1024 * 1024);
+        rs_6_6->set_num_segments(1);
+        rs_6_6->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_6_6);
+
+        RowsetMetaSharedPtr rs_7_7(new RowsetMeta());
+        init_rs_meta(rs_7_7, 7, 7);
+        rs_7_7->set_total_disk_size(1 * 1024 * 1024);
+        rs_7_7->set_num_segments(1);
+        rs_7_7->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_7_7);
+    }
+
+    // Appends four default-sized rowset metas: [0-1] and [2-3] flagged NONOVERLAPPING,
+    // then [4-4] and [5-5] flagged OVERLAPPING.
+    void init_all_rs_meta_cal_point(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_4(new RowsetMeta());
+        init_rs_meta(rs_4_4, 4, 4);
+        rs_4_4->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_4_4);
+
+        RowsetMetaSharedPtr rs_5_5(new RowsetMeta());
+        init_rs_meta(rs_5_5, 5, 5);
+        rs_5_5->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_5_5);
+    }
+
+    // Appends five default-sized rowset metas: [0-1], [2-3] (NONOVERLAPPING), [4-4]
+    // (OVERLAPPING), [5-5] carrying the delete predicate "a = 1" (OVERLAP_UNKNOWN),
+    // and [6-6] (OVERLAPPING).
+    void init_all_rs_meta_delete(std::vector<RowsetMetaSharedPtr>* rs_metas) {
+        RowsetMetaSharedPtr rs_0_1(new RowsetMeta());
+        init_rs_meta(rs_0_1, 0, 1);
+        rs_0_1->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_0_1);
+
+        RowsetMetaSharedPtr rs_2_3(new RowsetMeta());
+        init_rs_meta(rs_2_3, 2, 3);
+        rs_2_3->set_segments_overlap(NONOVERLAPPING);
+        rs_metas->push_back(rs_2_3);
+
+        RowsetMetaSharedPtr rs_4_4(new RowsetMeta());
+        init_rs_meta(rs_4_4, 4, 4);
+        rs_4_4->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_4_4);
+
+        // Version 5 is the delete rowset.
+        RowsetMetaSharedPtr delete_rs(new RowsetMeta());
+        init_rs_meta(delete_rs, 5, 5);
+        DeletePredicatePB delete_predicate;
+        delete_predicate.add_sub_predicates("a = 1");
+        delete_predicate.set_version(5);
+        delete_rs->set_delete_predicate(delete_predicate);
+        delete_rs->set_segments_overlap(OVERLAP_UNKNOWN);
+        rs_metas->push_back(delete_rs);
+
+        RowsetMetaSharedPtr rs_6_6(new RowsetMeta());
+        init_rs_meta(rs_6_6, 6, 6);
+        rs_6_6->set_segments_overlap(OVERLAPPING);
+        rs_metas->push_back(rs_6_6);
+    }
+
+protected:
+    std::string _json_rowset_meta;
+    TabletMetaSharedPtr _tablet_meta;
+};
+
+// Five tiny single-version rowsets under the size-based policy are expected to produce
+// a cumulative compaction score of 15.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_rs_meta_small_base(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    const uint32_t score = _tablet->calc_cumulative_compaction_score();
+    ASSERT_EQ(15, score);
+}
+
+// With the big-base rowset layout (1 GiB base, 65 MiB second rowset) the cumulative
+// compaction score is expected to be 7.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score_big_base) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_rs_meta_big_base(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    const uint32_t score = _tablet->calc_cumulative_compaction_score();
+    ASSERT_EQ(7, score);
+}
+
+// With the big-base rowset layout the cumulative point is expected to land on
+// version 4.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, calculate_cumulative_point_big_base) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_rs_meta_big_base(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    ASSERT_EQ(4, _tablet->cumulative_layer_point());
+}
+
+// With four default-sized rowsets ([0-1], [2-3], [4-4], [5-5]) the size-based policy is
+// expected to place the cumulative point at version 2.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, calculate_cumulative_point_overlap) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_all_rs_meta_cal_point(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    ASSERT_EQ(2, _tablet->cumulative_layer_point());
+}
+
+// For the cal_point rowset layout, three rowsets are expected to be offered as
+// cumulative compaction candidates (skip window: 1000 s).
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candicate_rowsets) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_all_rs_meta_cal_point(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    ASSERT_EQ(3, candidate_rowsets.size());
+}
+
+// For the big-base rowset layout, three rowsets are expected to be offered as
+// cumulative compaction candidates (skip window: 1000 s).
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candicate_rowsets_big_base) {
+    std::vector<RowsetMetaSharedPtr> metas;
+    init_rs_meta_big_base(&metas);
+    for (auto& meta : metas) {
+        _tablet_meta->add_rs_meta(meta);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    ASSERT_EQ(3, candidate_rowsets.size());
+}
+
+// Small-base layout: the tablet's own size-based policy (max score 10, min score 5) is
+// expected to pick four input rowsets with a score of 12 and no delete version.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
+
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_small_base(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    // Go through the public accessor instead of the private
+    // _cumulative_compaction_policy member.
+    _tablet->cumulative_compaction_policy()->pick_input_rowsets(
+            candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, &compaction_score);
+
+    ASSERT_EQ(4, input_rowsets.size());
+    ASSERT_EQ(12, compaction_score);
+    ASSERT_EQ(-1, last_delete_version.first);
+    ASSERT_EQ(-1, last_delete_version.second);
+}
+
+// Big-base layout: the tablet's own size-based policy (max score 10, min score 5) is
+// expected to pick three input rowsets with a score of 7 and no delete version.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) {
+
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_big_base(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    // Go through the public accessor instead of the private
+    // _cumulative_compaction_policy member.
+    _tablet->cumulative_compaction_policy()->pick_input_rowsets(
+            candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, &compaction_score);
+
+    ASSERT_EQ(3, input_rowsets.size());
+    ASSERT_EQ(7, compaction_score);
+    ASSERT_EQ(-1, last_delete_version.first);
+    ASSERT_EQ(-1, last_delete_version.second);
+}
+
+// Promotion layout (65 MiB trailing overlapping rowset): the tablet's own size-based
+// policy is expected to pick two input rowsets with a score of 4 and no delete version.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) {
+
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_pick_promotion(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    // Go through the public accessor instead of the private
+    // _cumulative_compaction_policy member.
+    _tablet->cumulative_compaction_policy()->pick_input_rowsets(
+            candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, &compaction_score);
+
+    ASSERT_EQ(2, input_rowsets.size());
+    ASSERT_EQ(4, compaction_score);
+    ASSERT_EQ(-1, last_delete_version.first);
+    ASSERT_EQ(-1, last_delete_version.second);
+}
+
+// Mixed-size layout (129 MiB, 12 MiB, 12 MiB, then two tiny rowsets): the tablet's own
+// size-based policy is expected to pick four input rowsets with a score of 10 and no
+// delete version.
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_level) {
+
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_pick_not_same_level(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    // Go through the public accessor instead of the private
+    // _cumulative_compaction_policy member.
+    _tablet->cumulative_compaction_policy()->pick_input_rowsets(
+            candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, &compaction_score);
+
+    ASSERT_EQ(4, input_rowsets.size());
+    ASSERT_EQ(10, compaction_score);
+    ASSERT_EQ(-1, last_delete_version.first);
+    ASSERT_EQ(-1, last_delete_version.second);
+}
+
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) {
+    // Checks that pick_input_rowsets() selects nothing for the metas from init_rs_meta_pick_empty().
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_pick_empty(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+    // Create the tablet with the size-based cumulative policy, then compute its cumulative point.
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    // NOTE(review): the meaning of the 1000 argument is not visible here; confirm in tablet.h.
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    // NOTE(review): 10 and 5 below are presumably the max/min compaction score bounds; confirm.
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, 
&compaction_score);
+    // Expect no picked rowsets, score 0, and last_delete_version left at its initial {-1, -1}.
+    ASSERT_EQ(0, input_rowsets.size());
+    ASSERT_EQ(0, compaction_score);
+    ASSERT_EQ(-1, last_delete_version.first);
+    ASSERT_EQ(-1, last_delete_version.second);
+}
+
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
pick_input_rowsets_not_reach_min_limit) {
+    // Checks that nothing is picked for the metas from init_rs_meta_pick_empty_not_reach_min_limit().
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_pick_empty_not_reach_min_limit(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+    // Create the tablet with the size-based cumulative policy, then compute its cumulative point.
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    // NOTE(review): the meaning of the 1000 argument is not visible here; confirm in tablet.h.
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    // NOTE(review): 10 and 5 below are presumably the max/min compaction score bounds; confirm.
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, 
&compaction_score);
+    // Expect no picked rowsets, score 0, and last_delete_version left at its initial {-1, -1}.
+    ASSERT_EQ(0, input_rowsets.size());
+    ASSERT_EQ(0, compaction_score);
+    ASSERT_EQ(-1, last_delete_version.first);
+    ASSERT_EQ(-1, last_delete_version.second);
+}
+
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
+    // Checks pick_input_rowsets() on the rowset metas built by init_all_rs_meta_delete().
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_all_rs_meta_delete(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+    // Create the tablet with the size-based cumulative policy, then compute its cumulative point.
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    // NOTE(review): the meaning of the 1000 argument is not visible here; confirm in tablet.h.
+    _tablet->pick_candicate_rowsets_to_cumulative_compaction(1000, 
&candidate_rowsets);
+    // last_delete_version starts at {-1, -1}; this test expects the policy to overwrite it.
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version{-1, -1};
+    size_t compaction_score = 0;
+    // NOTE(review): 10 and 5 below are presumably the max/min compaction score bounds; confirm.
+    _tablet->_cumulative_compaction_policy->pick_input_rowsets(
+            candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version, 
&compaction_score);
+    // Expect 2 picked rowsets, score 4, and last_delete_version reported as {5, 5}.
+    ASSERT_EQ(2, input_rowsets.size());
+    ASSERT_EQ(4, compaction_score);
+    ASSERT_EQ(5, last_delete_version.first);
+    ASSERT_EQ(5, last_delete_version.second);
+}
+
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_big) {
+    // Checks the policy's promotion size for the metas from init_rs_meta_pick_not_same_level().
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_pick_not_same_level(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+    // Create the tablet with the size-based cumulative policy, then compute its cumulative point.
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+    // NOTE(review): the dynamic_cast result is dereferenced below without a null check.
+    SizeBasedCumulativeCompactionPolicy* policy =
+            dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
+                    _tablet->_cumulative_compaction_policy.get());
+    // 1073741824 == 1024 * 1024 * 1024; presumably a byte count (confirm the field's unit).
+    ASSERT_EQ(1073741824, policy->_tablet_size_based_promotion_size);
+}
+
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) {
+    // Checks the policy's promotion size for the metas from init_rs_meta_small_base().
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_small_base(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+    // Create the tablet with the size-based cumulative policy, then compute its cumulative point.
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    _tablet->calculate_cumulative_point();
+    // NOTE(review): the dynamic_cast result is dereferenced below without a null check.
+    SizeBasedCumulativeCompactionPolicy* policy =
+            dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
+                    _tablet->_cumulative_compaction_policy.get());
+    ASSERT_EQ(67108864, policy->_tablet_size_based_promotion_size);  // 67108864 == 64 * 1024 * 1024
+}
+
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
+    // Checks the contents of the policy's _levels for the metas from init_rs_meta_small_base().
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_small_base(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+    // Only init() is called here; calculate_cumulative_point() is not invoked in this test.
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+    // NOTE(review): the dynamic_cast result is dereferenced below without a null check.
+    SizeBasedCumulativeCompactionPolicy* policy =
+            dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
+                    _tablet->_cumulative_compaction_policy.get());
+    // Expect four levels in descending order, each half of the previous one (2^29 down to 2^26).
+    ASSERT_EQ(4, policy->_levels.size());
+    ASSERT_EQ(536870912, policy->_levels[0]);
+    ASSERT_EQ(268435456, policy->_levels[1]);
+    ASSERT_EQ(134217728, policy->_levels[2]);
+    ASSERT_EQ(67108864, policy->_levels[3]);
+}
+}
+
+// @brief Test entry point: initializes Google Test with the command-line arguments and returns the result of RUN_ALL_TESTS().
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS(); 
+}
\ No newline at end of file
diff --git a/docs/en/administrator-guide/config/be_config.md 
b/docs/en/administrator-guide/config/be_config.md
index f9f30e3..1beaa46 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -156,6 +156,46 @@ Generally it needs to be turned off. When you want to 
manually operate the compa
 
 Similar to `base_compaction_trace_threshold`.
 
+### `cumulative_compaction_policy`
+
+* Type: string
+* Description: Configure the merge policy of the cumulative compaction stage. 
Currently, two merge policies have been implemented: num_based and size_based.
+* Default value: size_based
+
+In detail, num_based (the ordinary policy) is the initial version of the cumulative compaction merge 
policy. After a cumulative compaction, the base compaction process is directly 
performed. The size_based policy is an optimized version of the ordinary 
strategy. Versions are merged only when the disk volume of the rowset is of the 
same order of magnitude. After the compaction, the output rowset which satisfies 
the conditions is promoted to the base compaction stage. In the case of a large 
number of small batch  [...]
+
+### `cumulative_size_based_promotion_size_mbytes`
+
+* Type: int64
+* Description: Under the size_based policy, when the total disk size of the output 
rowset of cumulative compaction exceeds this configuration size, the rowset 
will be used for base compaction. The unit is megabytes.
+* Default value: 1024
+
+In general, keep this configuration below 2G to prevent cumulative 
compaction from taking too long, which would result in a version 
backlog.
+
+### `cumulative_size_based_promotion_ratio`
+
+* Type: double
+* Description: Under the size_based policy, when the total disk size of the 
cumulative compaction output rowset exceeds the configuration ratio of the base 
version rowset, the rowset will be used for base compaction.
+* Default value: 0.05
+
+Generally, it is recommended that the configuration be no higher than 
0.1 and no lower than 0.02.
+
+### `cumulative_size_based_promotion_min_size_mbytes`
+
+* Type: int64
+* Description: Under the size_based strategy, if the total disk size of the 
output rowset of the cumulative compaction is lower than this configuration 
size, the rowset will not undergo base compaction and is still in the 
cumulative compaction process. The unit is m bytes.
+* Default value: 64
+
+Generally, keep the configuration within 512m. If the configuration is too 
large, the early base version stays too small and base compaction is never 
performed.
+
+### `cumulative_size_based_compaction_lower_size_mbytes`
+
+* Type: int64
+* Description: Under the size_based strategy, when cumulative compaction 
selects rowsets to merge, they are divided and merged according to the level 
policy only if their total disk size is larger than this configuration. 
When the total is smaller than this configuration, they are merged directly. The unit is megabytes.
+* Default value: 64
+
+Generally, keep the configuration within 128m. Setting it too large will cause 
more cumulative compaction write amplification.
+
 ### `default_num_rows_per_column_file_block`
 
 ### `default_query_options`
@@ -256,7 +296,7 @@ The default value is `false`.
 ### ignore_rowset_stale_unconsistent_delete
 
 * Type: boolean
-* Description:It is used to decide whether to delete the outdated merged 
rowset if it cannot form a consistent version path.
+* Description:It is used to decide whether to delete the outdated merged 
rowset if it cannot form a consistent version path.
 * Default: false
 
 The merged expired rowset version path will be deleted after half an hour. In 
abnormal situations, deleting these versions will result in the problem that 
the consistent path of the query cannot be constructed. When the configuration 
is false, the program check is strict and the program will directly report an 
error and exit.
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md 
b/docs/zh-CN/administrator-guide/config/be_config.md
index 98269cf..a5dd8a9 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -154,6 +154,46 @@ Metrics: 
{"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 
 与base_compaction_trace_threshold类似。
 
+### `cumulative_compaction_policy`
+
+* 类型:string
+* 描述:配置 cumulative compaction 阶段的合并策略,目前实现了两种合并策略,num_based和size_based
+* 默认值:size_based
+
+详细说明,ordinary,是最初版本的cumulative compaction合并策略,做一次cumulative compaction之后直接base 
compaction流程。size_based,通用策略是ordinary策略的优化版本,仅当rowset的磁盘体积在相同数量级时才进行版本合并。合并之后满足条件的rowset进行晋升到base
 compaction阶段。能够做到在大量小批量导入的情况下:降低base 
compact的写入放大率,并在读取放大率和空间放大率之间进行权衡,同时减少了文件版本的数据。
+
+### `cumulative_size_based_promotion_size_mbytes`
+
+* 类型:int64
+* 描述:在size_based策略下,cumulative compaction的输出rowset总磁盘大小超过了此配置大小,该rowset将用于base 
compaction。单位是m字节。
+* 默认值:1024
+
+一般情况下,配置在2G以内,为了防止cumulative compaction时间过长,导致版本积压。
+
+### `cumulative_size_based_promotion_ratio`
+
+* 类型:double
+* 描述:在size_based策略下,cumulative 
compaction的输出rowset总磁盘大小超过base版本rowset的配置比例时,该rowset将用于base compaction。
+* 默认值:0.05
+
+一般情况下,建议配置不要高于0.1,低于0.02。
+
+### `cumulative_size_based_promotion_min_size_mbytes`
+
+* 类型:int64
+* 描述:在size_based策略下,cumulative compaction的输出rowset总磁盘大小低于此配置大小,该rowset将不进行base 
compaction,仍然处于cumulative compaction流程中。单位是m字节。
+* 默认值:64
+
+一般情况下,配置在512m以内,配置过大会导致base版本早期的大小过小,一直不进行base compaction。
+
+### `cumulative_size_based_compaction_lower_size_mbytes`
+
+* 类型:int64
+* 描述:在size_based策略下,cumulative 
compaction进行合并时,选出的要进行合并的rowset的总磁盘大小大于此配置时,才按级别策略划分合并。小于这个配置时,直接执行合并。单位是m字节。
+* 默认值:64
+
+一般情况下,配置在128m以内,配置过大会导致cumulative compaction写放大较多。
+
 ### `default_num_rows_per_column_file_block`
 
 ### `default_query_options`


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to