This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 88170a94e8737e19696f614b1ea21ca5d0f407d3
Author: zzzxl1993 <474696...@qq.com>
AuthorDate: Tue Mar 19 18:49:43 2024 +0800

    (invert index) modify of time series compaction policy
---
 be/src/agent/task_worker_pool.cpp                  |  23 +++--
 be/src/olap/cumulative_compaction.cpp              |   8 +-
 be/src/olap/cumulative_compaction_policy.h         |   8 ++
 .../cumulative_compaction_time_series_policy.cpp   | 108 +++++++++++++++++++--
 .../cumulative_compaction_time_series_policy.h     |   4 +
 be/src/olap/full_compaction.cpp                    |   6 +-
 be/src/olap/rowset/rowset_meta.h                   |   6 ++
 be/src/olap/tablet_meta.cpp                        |  17 +++-
 be/src/olap/tablet_meta.h                          |  10 +-
 .../Create/CREATE-TABLE.md                         |   9 ++
 .../Create/CREATE-TABLE.md                         |   8 ++
 .../main/java/org/apache/doris/alter/Alter.java    |   4 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |   1 +
 .../apache/doris/alter/SchemaChangeHandler.java    |   7 ++
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   1 +
 .../analysis/ModifyTablePropertiesClause.java      |  17 ++++
 .../java/org/apache/doris/backup/RestoreJob.java   |   1 +
 .../main/java/org/apache/doris/catalog/Env.java    |  11 ++-
 .../java/org/apache/doris/catalog/OlapTable.java   |  14 +++
 .../org/apache/doris/catalog/TableProperty.java    |  18 +++-
 .../apache/doris/common/util/PropertyAnalyzer.java |  29 +++++-
 .../apache/doris/datasource/InternalCatalog.java   |  20 +++-
 .../org/apache/doris/master/ReportHandler.java     |   1 +
 .../org/apache/doris/task/CreateReplicaTask.java   |   5 +
 .../doris/task/UpdateTabletMetaInfoTask.java       |   5 +
 .../java/org/apache/doris/task/AgentTaskTest.java  |   2 +-
 gensrc/proto/olap_file.proto                       |   2 +
 gensrc/thrift/AgentService.thrift                  |   2 +
 28 files changed, 320 insertions(+), 27 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 00c16cfa12d..34918a5b0a2 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -52,6 +52,7 @@
 #include "io/fs/local_file_system.h"
 #include "io/fs/path.h"
 #include "io/fs/s3_file_system.h"
+#include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset_meta.h"
@@ -691,8 +692,8 @@ void update_tablet_meta_callback(StorageEngine& engine, 
const TAgentTaskRequest&
             need_to_save = true;
         }
         if (tablet_meta_info.__isset.compaction_policy) {
-            if (tablet_meta_info.compaction_policy != "size_based" &&
-                tablet_meta_info.compaction_policy != "time_series") {
+            if (tablet_meta_info.compaction_policy != 
CUMULATIVE_SIZE_BASED_POLICY &&
+                tablet_meta_info.compaction_policy != 
CUMULATIVE_TIME_SERIES_POLICY) {
                 status = Status::InvalidArgument(
                         "invalid compaction policy, only support for 
size_based or "
                         "time_series");
@@ -702,7 +703,7 @@ void update_tablet_meta_callback(StorageEngine& engine, 
const TAgentTaskRequest&
             need_to_save = true;
         }
         if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
-            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+            if (tablet->tablet_meta()->compaction_policy() != 
CUMULATIVE_TIME_SERIES_POLICY) {
                 status = Status::InvalidArgument(
                         "only time series compaction policy support time 
series config");
                 continue;
@@ -712,7 +713,7 @@ void update_tablet_meta_callback(StorageEngine& engine, 
const TAgentTaskRequest&
             need_to_save = true;
         }
         if 
(tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
-            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+            if (tablet->tablet_meta()->compaction_policy() != 
CUMULATIVE_TIME_SERIES_POLICY) {
                 status = Status::InvalidArgument(
                         "only time series compaction policy support time 
series config");
                 continue;
@@ -722,7 +723,7 @@ void update_tablet_meta_callback(StorageEngine& engine, 
const TAgentTaskRequest&
             need_to_save = true;
         }
         if 
(tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
-            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+            if (tablet->tablet_meta()->compaction_policy() != 
CUMULATIVE_TIME_SERIES_POLICY) {
                 status = Status::InvalidArgument(
                         "only time series compaction policy support time 
series config");
                 continue;
@@ -732,7 +733,7 @@ void update_tablet_meta_callback(StorageEngine& engine, 
const TAgentTaskRequest&
             need_to_save = true;
         }
         if 
(tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
-            if (tablet->tablet_meta()->compaction_policy() != "time_series") {
+            if (tablet->tablet_meta()->compaction_policy() != 
CUMULATIVE_TIME_SERIES_POLICY) {
                 status = Status::InvalidArgument(
                         "only time series compaction policy support time 
series config");
                 continue;
@@ -741,6 +742,16 @@ void update_tablet_meta_callback(StorageEngine& engine, 
const TAgentTaskRequest&
                     
tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
             need_to_save = true;
         }
+        if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
+            if (tablet->tablet_meta()->compaction_policy() != 
CUMULATIVE_TIME_SERIES_POLICY) {
+                status = Status::InvalidArgument(
+                        "only time series compaction policy support time 
series config");
+                continue;
+            }
+            tablet->tablet_meta()->set_time_series_compaction_level_threshold(
+                    tablet_meta_info.time_series_compaction_level_threshold);
+            need_to_save = true;
+        }
         if (tablet_meta_info.__isset.replica_id) {
             tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
         }
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index 1f54c1f3285..42748012cab 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -81,13 +81,17 @@ Status CumulativeCompaction::execute_compact_impl() {
     // 4. set state to success
     _state = CompactionState::SUCCESS;
 
-    // 5. set cumulative point
+    // 5. set cumulative level
+    
_tablet->cumulative_compaction_policy()->update_compaction_level(_tablet.get(), 
_input_rowsets,
+                                                                     
_output_rowset);
+
+    // 6. set cumulative point
     _tablet->cumulative_compaction_policy()->update_cumulative_point(
             _tablet.get(), _input_rowsets, _output_rowset, 
_last_delete_version);
     VLOG_CRITICAL << "after cumulative compaction, current cumulative point is 
"
                   << _tablet->cumulative_layer_point() << ", tablet=" << 
_tablet->tablet_id();
 
-    // 6. add metric to cumulative compaction
+    // 7. add metric to cumulative compaction
     
DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
     
DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size);
 
diff --git a/be/src/olap/cumulative_compaction_policy.h 
b/be/src/olap/cumulative_compaction_policy.h
index b9b3bb46c0f..e2e5ca5460d 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -96,6 +96,11 @@ public:
                                             int64_t current_cumulative_point,
                                             int64_t* cumulative_point) = 0;
 
+    // Updates the compaction level of a tablet after a compaction operation.
+    virtual void update_compaction_level(Tablet* tablet,
+                                         const std::vector<RowsetSharedPtr>& 
input_rowsets,
+                                         RowsetSharedPtr output_rowset) = 0;
+
     /// Fetch cumulative policy name
     virtual std::string_view name() = 0;
 };
@@ -149,6 +154,9 @@ public:
     /// Its main policy is calculating the accumulative compaction score after 
current cumulative_point in tablet.
     uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
 
+    void update_compaction_level(Tablet* tablet, const 
std::vector<RowsetSharedPtr>& input_rowsets,
+                                 RowsetSharedPtr output_rowset) override {}
+
     std::string_view name() override { return CUMULATIVE_SIZE_BASED_POLICY; }
 
 private:
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp 
b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index 6f7cb8e2e35..6c3f949723a 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -17,19 +17,24 @@
 
 #include "olap/cumulative_compaction_time_series_policy.h"
 
+#include <algorithm>
+
 #include "common/logging.h"
 #include "olap/tablet.h"
+#include "olap/tablet_meta.h"
 #include "util/time.h"
 namespace doris {
 
 uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* 
tablet) {
     uint32_t score = 0;
+    uint32_t level0_score = 0;
     bool base_rowset_exist = false;
     const int64_t point = tablet->cumulative_layer_point();
 
-    int64_t total_size = 0;
+    int64_t level0_total_size = 0;
     RowsetMetaSharedPtr first_meta;
     int64_t first_version = INT64_MAX;
+    std::list<RowsetMetaSharedPtr> checked_rs_metas;
     // NOTE: tablet._meta_lock is hold
     auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
     // check the base rowset and collect the rowsets of cumulative part
@@ -47,8 +52,13 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
             continue;
         } else {
             // collect the rowsets of cumulative part
-            total_size += rs_meta->total_disk_size();
             score += rs_meta->get_compaction_score();
+            if (rs_meta->compaction_level() == 0) {
+                level0_total_size += rs_meta->total_disk_size();
+                level0_score += rs_meta->get_compaction_score();
+            } else {
+                checked_rs_metas.push_back(rs_meta);
+            }
         }
     }
 
@@ -73,21 +83,39 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
     // Condition 1: the size of input files for compaction meets the 
requirement of parameter compaction_goal_size
     int64_t compaction_goal_size_mbytes =
             tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
-    if (total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
+    if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
         return score;
     }
 
     // Condition 2: the number of input files reaches the threshold specified 
by parameter compaction_file_count_threshold
-    if (score >= 
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
+    if (level0_score >= 
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
         return score;
     }
 
+    // Condition 3: level1 achieve compaction_goal_size
+    if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+        checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+            return a->version().first < b->version().first;
+        });
+        int32_t rs_meta_count = 0;
+        int64_t continuous_size = 0;
+        for (const auto& rs_meta : checked_rs_metas) {
+            rs_meta_count++;
+            continuous_size += rs_meta->total_disk_size();
+            if (rs_meta_count >= 2) {
+                if (continuous_size >= compaction_goal_size_mbytes * 1024 * 
1024) {
+                    return score;
+                }
+            }
+        }
+    }
+
     int64_t now = UnixMillis();
     int64_t last_cumu = tablet->last_cumu_compaction_success_time();
     if (last_cumu != 0) {
         int64_t cumu_interval = now - last_cumu;
 
-        // Condition 3: the time interval between compactions exceeds the 
value specified by parameter _compaction_time_threshold_second
+        // Condition 4: the time interval between compactions exceeds the 
value specified by parameter _compaction_time_threshold_second
         if (cumu_interval >
             
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 
1000)) {
             return score;
@@ -163,6 +191,13 @@ void 
TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
             break;
         }
 
+        // upgrade: [0 0 2 1 1 0 0]
+        if (!is_delete && 
tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+            rs->compaction_level() == 1) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
         // include one situation: When the segment is not deleted, and is 
singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
         prev_version = rs->version().second;
         *ret_cumulative_point = prev_version + 1;
@@ -193,6 +228,9 @@ int 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
         return 0;
     }
 
+    int64_t compaction_goal_size_mbytes =
+            tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+
     int transient_size = 0;
     *compaction_score = 0;
     input_rowsets->clear();
@@ -231,8 +269,7 @@ int 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
         input_rowsets->push_back(rowset);
 
         // Condition 1: the size of input files for compaction meets the 
requirement of parameter compaction_goal_size
-        if (total_size >=
-            (tablet->tablet_meta()->time_series_compaction_goal_size_mbytes() 
* 1024 * 1024)) {
+        if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
             if (input_rowsets->size() == 1 &&
                 
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
                 // Only 1 non-overlapping rowset, skip it
@@ -262,20 +299,47 @@ int 
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
         return transient_size;
     }
 
+    // Condition 3: level1 achieve compaction_goal_size
+    std::vector<RowsetSharedPtr> level1_rowsets;
+    if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+        int64_t continuous_size = 0;
+        for (const auto& rowset : candidate_rowsets) {
+            const auto& rs_meta = rowset->rowset_meta();
+            if (rs_meta->compaction_level() == 0) {
+                break;
+            }
+            level1_rowsets.push_back(rowset);
+            continuous_size += rs_meta->total_disk_size();
+            if (level1_rowsets.size() >= 2) {
+                if (continuous_size >= compaction_goal_size_mbytes * 1024 * 
1024) {
+                    input_rowsets->swap(level1_rowsets);
+                    return input_rowsets->size();
+                }
+            }
+        }
+    }
+
     int64_t now = UnixMillis();
     int64_t last_cumu = tablet->last_cumu_compaction_success_time();
     if (last_cumu != 0) {
         int64_t cumu_interval = now - last_cumu;
 
-        // Condition 3: the time interval between compactions exceeds the 
value specified by parameter compaction_time_threshold_second
+        // Condition 4: the time interval between compactions exceeds the 
value specified by parameter compaction_time_threshold_second
         if (cumu_interval >
             
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 
1000)) {
+            if 
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+                if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
+                    input_rowsets->swap(level1_rowsets);
+                    return input_rowsets->size();
+                }
+            }
             return transient_size;
         }
     }
 
     input_rowsets->clear();
     *compaction_score = 0;
+
     return 0;
 }
 
@@ -288,7 +352,35 @@ void 
TimeSeriesCumulativeCompactionPolicy::update_cumulative_point(
         return;
     }
 
+    if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
+        output_rowset->rowset_meta()->compaction_level() < 2) {
+        return;
+    }
+
     tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
 }
 
+void TimeSeriesCumulativeCompactionPolicy::update_compaction_level(
+        Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
+        RowsetSharedPtr output_rowset) {
+    if (tablet->tablet_state() != TABLET_RUNNING || 
output_rowset->num_segments() == 0) {
+        return;
+    }
+
+    int64_t first_level = 0;
+    for (size_t i = 0; i < input_rowsets.size(); i++) {
+        int64_t cur_level = 
input_rowsets[i]->rowset_meta()->compaction_level();
+        if (i == 0) {
+            first_level = cur_level;
+        } else {
+            if (first_level != cur_level) {
+                LOG(ERROR) << "Failed to check compaction level, first_level: 
" << first_level
+                           << ", cur_level: " << cur_level;
+            }
+        }
+    }
+
+    output_rowset->rowset_meta()->set_compaction_level(first_level + 1);
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.h 
b/be/src/olap/cumulative_compaction_time_series_policy.h
index 015dce055e9..4c134202e1d 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.h
+++ b/be/src/olap/cumulative_compaction_time_series_policy.h
@@ -59,6 +59,10 @@ public:
     void update_cumulative_point(Tablet* tablet, const 
std::vector<RowsetSharedPtr>& input_rowsets,
                                  RowsetSharedPtr _output_rowset,
                                  Version& last_delete_version) override;
+
+    void update_compaction_level(Tablet* tablet, const 
std::vector<RowsetSharedPtr>& input_rowsets,
+                                 RowsetSharedPtr output_rowset) override;
+
     std::string_view name() override { return CUMULATIVE_TIME_SERIES_POLICY; }
 };
 
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 927b4a33198..ba336c7cbe9 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -73,7 +73,11 @@ Status FullCompaction::execute_compact_impl() {
     // 3. set state to success
     _state = CompactionState::SUCCESS;
 
-    // 4. set cumulative point
+    // 4. set cumulative level
+    
_tablet->cumulative_compaction_policy()->update_compaction_level(_tablet.get(), 
_input_rowsets,
+                                                                     
_output_rowset);
+
+    // 5. set cumulative point
     Version last_version = _input_rowsets.back()->version();
     
_tablet->cumulative_compaction_policy()->update_cumulative_point(_tablet.get(), 
_input_rowsets,
                                                                      
_output_rowset, last_version);
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 7e1dfaa57c3..30457d30bc6 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -304,6 +304,12 @@ public:
 
     const TabletSchemaSPtr& tablet_schema() { return _schema; }
 
+    void set_compaction_level(int64_t compaction_level) {
+        _rowset_meta_pb.set_compaction_level(compaction_level);
+    }
+
+    int64_t compaction_level() { return _rowset_meta_pb.compaction_level(); }
+
     // Because the member field '_handle' is a raw pointer, use member func 
'init' to replace copy ctor
     RowsetMeta(const RowsetMeta&) = delete;
     RowsetMeta operator=(const RowsetMeta&) = delete;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 07cc07bc873..23f4428f747 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -71,7 +71,8 @@ TabletMetaSharedPtr TabletMeta::create(
             request.time_series_compaction_file_count_threshold,
             request.time_series_compaction_time_threshold_seconds,
             request.time_series_compaction_empty_rowsets_threshold,
-            request.inverted_index_storage_format);
+            request.inverted_index_storage_format,
+            request.time_series_compaction_level_threshold);
 }
 
 TabletMeta::TabletMeta()
@@ -91,7 +92,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t 
partition_id, int64_t tablet_id
                        int64_t time_series_compaction_file_count_threshold,
                        int64_t time_series_compaction_time_threshold_seconds,
                        int64_t time_series_compaction_empty_rowsets_threshold,
-                       TInvertedIndexStorageFormat::type 
inverted_index_storage_format)
+                       TInvertedIndexStorageFormat::type 
inverted_index_storage_format,
+                       int64_t time_series_compaction_level_threshold)
         : _tablet_uid(0, 0),
           _schema(new TabletSchema),
           _delete_bitmap(new DeleteBitmap(tablet_id)) {
@@ -121,6 +123,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t 
partition_id, int64_t tablet_id
             time_series_compaction_time_threshold_seconds);
     tablet_meta_pb.set_time_series_compaction_empty_rowsets_threshold(
             time_series_compaction_empty_rowsets_threshold);
+    tablet_meta_pb.set_time_series_compaction_level_threshold(
+            time_series_compaction_level_threshold);
     TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
     schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
     
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
@@ -334,7 +338,8 @@ TabletMeta::TabletMeta(const TabletMeta& b)
           _time_series_compaction_time_threshold_seconds(
                   b._time_series_compaction_time_threshold_seconds),
           _time_series_compaction_empty_rowsets_threshold(
-                  b._time_series_compaction_empty_rowsets_threshold) {};
+                  b._time_series_compaction_empty_rowsets_threshold),
+          
_time_series_compaction_level_threshold(b._time_series_compaction_level_threshold)
 {};
 
 void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& 
tcolumn,
                                           ColumnPB* column) {
@@ -635,6 +640,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
             tablet_meta_pb.time_series_compaction_time_threshold_seconds();
     _time_series_compaction_empty_rowsets_threshold =
             tablet_meta_pb.time_series_compaction_empty_rowsets_threshold();
+    _time_series_compaction_level_threshold =
+            tablet_meta_pb.time_series_compaction_level_threshold();
 }
 
 void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -718,6 +725,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
             time_series_compaction_time_threshold_seconds());
     tablet_meta_pb->set_time_series_compaction_empty_rowsets_threshold(
             time_series_compaction_empty_rowsets_threshold());
+    tablet_meta_pb->set_time_series_compaction_level_threshold(
+            time_series_compaction_level_threshold());
 }
 
 int64_t TabletMeta::mem_size() const {
@@ -906,6 +915,8 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
     if (a._time_series_compaction_empty_rowsets_threshold !=
         b._time_series_compaction_empty_rowsets_threshold)
         return false;
+    if (a._time_series_compaction_level_threshold != 
b._time_series_compaction_level_threshold)
+        return false;
     return true;
 }
 
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 4400be42af9..d21354366a9 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -112,7 +112,8 @@ public:
                int64_t time_series_compaction_time_threshold_seconds = 3600,
                int64_t time_series_compaction_empty_rowsets_threshold = 5,
                TInvertedIndexStorageFormat::type inverted_index_storage_format 
=
-                       TInvertedIndexStorageFormat::V1);
+                       TInvertedIndexStorageFormat::V1,
+               int64_t time_series_compaction_level_threshold = 1);
     // If need add a filed in TableMeta, filed init copy in copy construct 
function
     TabletMeta(const TabletMeta& tablet_meta);
     TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -260,6 +261,12 @@ public:
     int64_t time_series_compaction_empty_rowsets_threshold() const {
         return _time_series_compaction_empty_rowsets_threshold;
     }
+    void set_time_series_compaction_level_threshold(int64_t level_threshold) {
+        _time_series_compaction_level_threshold = level_threshold;
+    }
+    int64_t time_series_compaction_level_threshold() const {
+        return _time_series_compaction_level_threshold;
+    }
 
 private:
     Status _save_meta(DataDir* data_dir);
@@ -313,6 +320,7 @@ private:
     int64_t _time_series_compaction_file_count_threshold = 0;
     int64_t _time_series_compaction_time_threshold_seconds = 0;
     int64_t _time_series_compaction_empty_rowsets_threshold = 0;
+    int64_t _time_series_compaction_level_threshold = 0;
 
     mutable std::shared_mutex _meta_lock;
 };
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index 44dbf5cceb8..0bf9590ebf1 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -461,6 +461,15 @@ Set table properties. The following attributes are 
currently supported:
 
     `"time_series_compaction_time_threshold_seconds" = "3600"`
 
+* `time_series_compaction_level_threshold`
+
+    When time series compaction policy is applied, This parameter defaults to 
1.  When set to 2, it is used to control the re-merging of segments that have 
been 
+    
+    merged once, ensuring that the segment size reaches the 
time_series_compaction_goal_size_mbytes, which can achieve the effect of 
reducing the number of 
+    
+    segments.
+
+    `"time_series_compaction_level_threshold" = "2"`
 
 * Dynamic partition related
 
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index e8ac8e69f0a..657adea881d 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -444,6 +444,14 @@ UNIQUE KEY(k1, k2)
 
     `"time_series_compaction_time_threshold_seconds" = "3600"`
 
+* `time_series_compaction_level_threshold`
+
+    compaction 的合并策略为 time_series 
时,此参数默认为1,当设置为2时用来控住对于合并过一次的段再合并一层,保证段大小达到time_series_compaction_goal_size_mbytes,
+    
+    能达到段数量减少的效果。
+
+    `"time_series_compaction_level_threshold" = "2"`
+
 * 动态分区相关
 
     动态分区相关参数如下:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 63759885294..1d408b6799f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -509,7 +509,9 @@ public class Alter {
                         || properties
                             
.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
                         || properties
-                            
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
+                            
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+                        || properties
+                            
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
                 ((SchemaChangeHandler) 
schemaChangeHandler).updateTableProperties(db, tableName, properties);
             } else {
                 throw new DdlException("Invalid alter operation: " + 
alterClause.getOpType());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index f60a47632ab..61e4496a358 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -276,6 +276,7 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
                                 
tbl.getTimeSeriesCompactionFileCountThreshold(),
                                 
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                                 
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                                tbl.getTimeSeriesCompactionLevelThreshold(),
                                 tbl.storeRowColumn(),
                                 binlogConfig);
                         
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), 
baseSchemaHash);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 55c939a4669..b4d032dc78c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2234,6 +2234,13 @@ public class SchemaChangeHandler extends AlterHandler {
                         
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)));
         }
 
+        if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
 {
+            timeSeriesCompactionConfig
+                        
.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+                        Long.parseLong(properties
+                        
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)));
+        }
+
         if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null 
&& timeSeriesCompactionConfig.isEmpty()
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
                 && 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 4a8588a0ff2..011cc53979f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -281,6 +281,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     
tbl.getTimeSeriesCompactionFileCountThreshold(),
                                     
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                                     
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                                    
tbl.getTimeSeriesCompactionLevelThreshold(),
                                     tbl.storeRowColumn(),
                                     binlogConfig);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index cc4dede8f12..c2bc7bc7d0d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -222,6 +222,23 @@ public class ModifyTablePropertiesClause extends 
AlterTableClause {
             }
             this.needTableStable = false;
             this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
 {
+            long levelThreshold;
+            String levelThresholdStr = properties
+                                
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+            try {
+                levelThreshold = Long.parseLong(levelThresholdStr);
+                if (levelThreshold < 1 || levelThreshold > 2) {
+                    throw new AnalysisException(
+                        "time_series_compaction_level_threshold can not be 
less than 1 or greater than 2:"
+                        + levelThresholdStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid 
time_series_compaction_level_threshold format: "
+                        + levelThresholdStr);
+            }
+            this.needTableStable = false;
+            this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
         } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
             if 
(!properties.get(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD).equalsIgnoreCase("true")
                     && !properties.get(PropertyAnalyzer
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index d4470a1029e..2e6bbb162ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1084,6 +1084,7 @@ public class RestoreJob extends AbstractJob {
                             
localTbl.getTimeSeriesCompactionFileCountThreshold(),
                             
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                             
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                            localTbl.getTimeSeriesCompactionLevelThreshold(),
                             localTbl.storeRowColumn(),
                             binlogConfig);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5bf2f3eb1b5..632597bf6df 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3445,6 +3445,14 @@ public class Env {
                 
sb.append(olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()).append("\"");
             }
 
+            // time series compaction level threshold
+            if (olapTable.getCompactionPolicy() != null && 
olapTable.getCompactionPolicy()
+                                                            
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
+                sb.append(",\n\"").append(PropertyAnalyzer
+                                    
.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD).append("\" = \"");
+                
sb.append(olapTable.getTimeSeriesCompactionLevelThreshold()).append("\"");
+            }
+
             // disable auto compaction
             
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION).append("\"
 = \"");
             sb.append(olapTable.disableAutoCompaction()).append("\"");
@@ -4874,7 +4882,8 @@ public class Env {
                 .buildSkipWriteIndexOnLoad()
                 .buildDisableAutoCompaction()
                 .buildEnableSingleReplicaCompaction()
-                .buildTimeSeriesCompactionEmptyRowsetsThreshold();
+                .buildTimeSeriesCompactionEmptyRowsetsThreshold()
+                .buildTimeSeriesCompactionLevelThreshold();
 
         // need to update partition info meta
         for (Partition partition : table.getPartitions()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 0badc90fd3f..78e463a8a23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2162,6 +2162,20 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         return null;
     }
 
+    public void setTimeSeriesCompactionLevelThreshold(long 
timeSeriesCompactionLevelThreshold) {
+        TableProperty tableProperty = getOrCreatTableProperty();
+        
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+                                                
Long.valueOf(timeSeriesCompactionLevelThreshold).toString());
+        tableProperty.buildTimeSeriesCompactionLevelThreshold();
+    }
+
+    public Long getTimeSeriesCompactionLevelThreshold() {
+        if (tableProperty != null) {
+            return tableProperty.timeSeriesCompactionLevelThreshold();
+        }
+        return null;
+    }
+
     public int getBaseSchemaVersion() {
         MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId);
         return baseIndexMeta.getSchemaVersion();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index dd3dd165d77..b41e8b4c37b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -106,6 +106,9 @@ public class TableProperty implements Writable {
     private long timeSeriesCompactionEmptyRowsetsThreshold
                                     = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE;
 
+    private long timeSeriesCompactionLevelThreshold
+                                    = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+
     private DataSortInfo dataSortInfo = new DataSortInfo();
 
     public TableProperty(Map<String, String> properties) {
@@ -143,6 +146,7 @@ public class TableProperty implements Writable {
                 buildEnableSingleReplicaCompaction();
                 buildDisableAutoCompaction();
                 buildTimeSeriesCompactionEmptyRowsetsThreshold();
+                buildTimeSeriesCompactionLevelThreshold();
                 break;
             default:
                 break;
@@ -300,6 +304,17 @@ public class TableProperty implements Writable {
         return timeSeriesCompactionEmptyRowsetsThreshold;
     }
 
+    public TableProperty buildTimeSeriesCompactionLevelThreshold() {
+        timeSeriesCompactionLevelThreshold = Long.parseLong(properties
+                    
.getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+                    
String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE)));
+        return this;
+    }
+
+    public long timeSeriesCompactionLevelThreshold() {
+        return timeSeriesCompactionLevelThreshold;
+    }
+
     public TableProperty buildMinLoadReplicaNum() {
         minLoadReplicaNum = Short.parseShort(
                 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, 
"-1"));
@@ -587,7 +602,8 @@ public class TableProperty implements Writable {
                 .buildTimeSeriesCompactionTimeThresholdSeconds()
                 .buildDisableAutoCompaction()
                 .buildEnableSingleReplicaCompaction()
-                .buildTimeSeriesCompactionEmptyRowsetsThreshold();
+                .buildTimeSeriesCompactionEmptyRowsetsThreshold()
+                .buildTimeSeriesCompactionLevelThreshold();
         if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
             // get replica num from property map and create replica allocation
             String repNum = 
tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 9bacb50db32..9737d95e2e6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -151,6 +151,9 @@ public class PropertyAnalyzer {
     public static final String 
PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD =
             "time_series_compaction_empty_rowsets_threshold";
 
+    public static final String 
PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD =
+            "time_series_compaction_level_threshold";
+
     public static final String PROPERTIES_MUTABLE = "mutable";
 
     public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced";
@@ -198,7 +201,7 @@ public class PropertyAnalyzer {
     public static final long 
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000;
     public static final long 
TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600;
     public static final long 
TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE = 5;
-
+    public static final long 
TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE = 1;
 
     /**
      * check and replace members of DataProperty by properties.
@@ -751,6 +754,30 @@ public class PropertyAnalyzer {
         return emptyRowsetsThreshold;
     }
 
+    public static long analyzeTimeSeriesCompactionLevelThreshold(Map<String, 
String> properties)
+            throws AnalysisException {
+        long levelThreshold = 
TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+        if (properties == null || properties.isEmpty()) {
+            return levelThreshold;
+        }
+        if 
(properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) {
+            String levelThresholdStr = properties
+                                    
.get(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+            
properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
+            try {
+                levelThreshold = Long.parseLong(levelThresholdStr);
+                if (levelThreshold < 1 || levelThreshold > 2) {
+                    throw new 
AnalysisException("time_series_compaction_level_threshold can not"
+                            + " less than 1 or greater than 2: " + 
levelThreshold);
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid 
time_series_compaction_level_threshold: "
+                        + levelThreshold);
+            }
+        }
+        return levelThreshold;
+    }
+
     public static long 
analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties)
             throws AnalysisException {
         long fileCountThreshold = 
TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index b6b89c0ad7d..4e306a246c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1482,6 +1482,10 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD,
                                                 
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString());
             }
+            if 
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
 {
+                
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
+                                                
olapTable.getTimeSeriesCompactionLevelThreshold().toString());
+            }
             if 
(!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) {
                 properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, 
olapTable.getStoragePolicy());
             }
@@ -1909,6 +1913,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             tbl.getTimeSeriesCompactionFileCountThreshold(),
                             tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                             tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                            tbl.getTimeSeriesCompactionLevelThreshold(),
                             tbl.storeRowColumn(), binlogConfig);
 
                     task.setStorageFormat(tbl.getStorageFormat());
@@ -2176,7 +2181,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 || properties
                         
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
                 || properties
-                        
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)))
 {
+                        
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
+                || properties
+                        
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)))
 {
             throw new DdlException("only time series compaction policy support 
for time series config");
         }
 
@@ -2224,6 +2231,17 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         }
         
olapTable.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
 
+        // set time series compaction level threshold
+        long timeSeriesCompactionLevelThreshold
+                                     = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
+        try {
+            timeSeriesCompactionLevelThreshold = PropertyAnalyzer
+                                    
.analyzeTimeSeriesCompactionLevelThreshold(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        
olapTable.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
+
         // get storage format
         TStorageFormat storageFormat = TStorageFormat.V2; // default is 
segment v2
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 6950dc83e9a..6b24bea1ed5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -849,6 +849,7 @@ public class ReportHandler extends Daemon {
                                             
olapTable.getTimeSeriesCompactionFileCountThreshold(),
                                             
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                                             
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+                                            
olapTable.getTimeSeriesCompactionLevelThreshold(),
                                             olapTable.storeRowColumn(),
                                             binlogConfig);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 262ac8e84fe..7a5262ba0ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -116,6 +116,8 @@ public class CreateReplicaTask extends AgentTask {
 
     private long timeSeriesCompactionEmptyRowsetsThreshold;
 
+    private long timeSeriesCompactionLevelThreshold;
+
     private boolean storeRowColumn;
 
     private BinlogConfig binlogConfig;
@@ -140,6 +142,7 @@ public class CreateReplicaTask extends AgentTask {
                              long timeSeriesCompactionFileCountThreshold,
                              long timeSeriesCompactionTimeThresholdSeconds,
                              long timeSeriesCompactionEmptyRowsetsThreshold,
+                             long timeSeriesCompactionLevelThreshold,
                              boolean storeRowColumn,
                              BinlogConfig binlogConfig) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, 
indexId, tabletId);
@@ -182,6 +185,7 @@ public class CreateReplicaTask extends AgentTask {
         this.timeSeriesCompactionFileCountThreshold = 
timeSeriesCompactionFileCountThreshold;
         this.timeSeriesCompactionTimeThresholdSeconds = 
timeSeriesCompactionTimeThresholdSeconds;
         this.timeSeriesCompactionEmptyRowsetsThreshold = 
timeSeriesCompactionEmptyRowsetsThreshold;
+        this.timeSeriesCompactionLevelThreshold = 
timeSeriesCompactionLevelThreshold;
         this.storeRowColumn = storeRowColumn;
         this.binlogConfig = binlogConfig;
     }
@@ -343,6 +347,7 @@ public class CreateReplicaTask extends AgentTask {
         
createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
         
createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
         
createTabletReq.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
+        
createTabletReq.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
 
         if (binlogConfig != null) {
             createTabletReq.setBinlogConfig(binlogConfig.toThrift());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
index ad20b7b918d..7d4c6a3d022 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
@@ -164,6 +164,11 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
                         
metaInfo.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionConfig
                                     
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD));
                     }
+                    if (timeSeriesCompactionConfig
+                            
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))
 {
+                        
metaInfo.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionConfig
+                                    
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD));
+                    }
                 }
                 if (enableSingleReplicaCompaction >= 0) {
                     
metaInfo.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction > 0);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 60ec442bedf..3dc4bc4695c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -107,7 +107,7 @@ public class AgentTaskTest {
         createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, 
partitionId,
                 indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, 
version, KeysType.AGG_KEYS, storageType,
                 TStorageMedium.SSD, columns, null, 0, latch, null, false, 
TTabletType.TABLET_TYPE_DISK, null,
-                TCompressionType.LZ4F, false, "", false, false, false, "", 0, 
0, 0, 0, false, null);
+                TCompressionType.LZ4F, false, "", false, false, false, "", 0, 
0, 0, 0, 0, false, null);
 
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, 
schemaHash1, false);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 5eedd8af079..bea90d86006 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -113,6 +113,7 @@ message RowsetMetaPB {
     reserved 50;
     // to indicate whether the data between the segments overlap
     optional SegmentsOverlapPB segments_overlap_pb = 51 [default = 
OVERLAP_UNKNOWN];
+    optional int64 compaction_level = 52 [default = 0];
 }
 
 message SegmentStatisticsPB {
@@ -345,6 +346,7 @@ message TabletMetaPB {
     optional int64 time_series_compaction_file_count_threshold = 30 [default = 
2000];
     optional int64 time_series_compaction_time_threshold_seconds = 31 [default 
= 3600];
     optional int64 time_series_compaction_empty_rowsets_threshold = 32 
[default = 5];
+    optional int64 time_series_compaction_level_threshold = 33 [default = 1];
 }
 
 message OLAPRawDeltaHeaderMessage {
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index 99196c2e024..645d38eed72 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -160,6 +160,7 @@ struct TCreateTabletReq {
     25: optional i64 time_series_compaction_time_threshold_seconds = 3600
     26: optional i64 time_series_compaction_empty_rowsets_threshold = 5
     27: optional TInvertedIndexStorageFormat inverted_index_storage_format = 
TInvertedIndexStorageFormat.V1
+    28: optional i64 time_series_compaction_level_threshold = 1
 }
 
 struct TDropTabletReq {
@@ -430,6 +431,7 @@ struct TTabletMetaInfo {
     15: optional bool skip_write_index_on_load
     16: optional bool disable_auto_compaction
     17: optional i64 time_series_compaction_empty_rowsets_threshold
+    18: optional i64 time_series_compaction_level_threshold
 }
 
 struct TUpdateTabletMetaInfoReq {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to