This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 22cb7b8fcbb [improvement](compaction) be do not compact invisible 
version to avoid query error -230 #28082 (#36222)
22cb7b8fcbb is described below

commit 22cb7b8fcbb938e9342361c2d5bc1ec1fe23a579
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jun 27 13:45:21 2024 +0800

    [improvement](compaction) be do not compact invisible version to avoid 
query error -230 #28082 (#36222)
    
    cherry pick from #28082
---
 be/src/agent/agent_server.cpp                      |  13 +
 be/src/agent/agent_server.h                        |   1 +
 be/src/agent/task_worker_pool.cpp                  |  13 +
 be/src/agent/task_worker_pool.h                    |   2 +
 be/src/common/config.cpp                           |   7 +
 be/src/common/config.h                             |   7 +
 be/src/olap/full_compaction.cpp                    |   2 +-
 be/src/olap/olap_common.h                          |  21 +
 be/src/olap/tablet.cpp                             |  74 +++-
 be/src/olap/tablet.h                               |  13 +
 be/src/olap/tablet_manager.cpp                     |  72 +++-
 be/src/olap/tablet_manager.h                       |  19 +-
 be/src/olap/tablet_meta.cpp                        |  10 +
 be/src/olap/tablet_meta.h                          |   1 +
 be/src/olap/task/engine_clone_task.cpp             |   2 +
 .../main/java/org/apache/doris/common/Config.java  |   6 +-
 .../java/org/apache/doris/alter/AlterHandler.java  |   3 +-
 .../java/org/apache/doris/backup/RestoreJob.java   |   3 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   8 +-
 .../org/apache/doris/catalog/MetadataViewer.java   |   2 +-
 .../java/org/apache/doris/catalog/Replica.java     |  75 ++--
 .../main/java/org/apache/doris/catalog/Tablet.java |  22 +-
 .../apache/doris/catalog/TabletInvertedIndex.java  |  50 ++-
 .../org/apache/doris/catalog/TabletStatMgr.java    |  11 +-
 .../org/apache/doris/clone/TabletSchedCtx.java     |  13 +-
 .../org/apache/doris/clone/TabletScheduler.java    |   6 +-
 .../apache/doris/common/proc/ReplicasProcNode.java |   5 +-
 .../doris/common/proc/TabletHealthProcDir.java     |   3 +-
 .../apache/doris/common/proc/TabletsProcDir.java   |  11 +-
 .../apache/doris/datasource/InternalCatalog.java   |   5 +-
 ...oCollector.java => PartitionInfoCollector.java} |  47 ++-
 .../org/apache/doris/master/ReportHandler.java     |  67 +--
 .../java/org/apache/doris/system/Diagnoser.java    |   5 +-
 .../java/org/apache/doris/task/AgentBatchTask.java |  10 +
 .../doris/task/UpdateVisibleVersionTask.java       |  40 ++
 .../doris/transaction/DatabaseTransactionMgr.java  |  24 +-
 .../doris/transaction/GlobalTransactionMgr.java    |   5 +-
 .../doris/transaction/PublishVersionDaemon.java    |  30 +-
 .../org/apache/doris/alter/RollupJobV2Test.java    |  10 +-
 .../apache/doris/alter/SchemaChangeJobV2Test.java  |   6 +-
 .../org/apache/doris/analysis/ShowReplicaTest.java |   3 +-
 .../java/org/apache/doris/catalog/ReplicaTest.java |  21 +-
 .../doris/clone/DiskReblanceWhenSchedulerIdle.java |   3 +-
 .../org/apache/doris/clone/RebalancerTestUtil.java |   5 +-
 .../org/apache/doris/clone/RepairVersionTest.java  |   8 +-
 .../doris/clone/TabletReplicaTooSlowTest.java      |   4 +-
 .../org/apache/doris/clone/TabletSchedCtxTest.java |  16 +-
 .../org/apache/doris/planner/QueryPlanTest.java    |  12 +-
 .../transaction/DatabaseTransactionMgrTest.java    |   6 +-
 .../transaction/GlobalTransactionMgrTest.java      |  29 +-
 gensrc/thrift/AgentService.thrift                  |   5 +
 gensrc/thrift/BackendService.thrift                |   5 +-
 gensrc/thrift/MasterService.thrift                 |   4 +-
 gensrc/thrift/Types.thrift                         |   3 +-
 .../test_compaction_with_visible_version.out       | 448 +++++++++++++++++++++
 .../doris/regression/suite/SuiteCluster.groovy     |   3 +
 .../test_compaction_with_visible_version.groovy    | 275 +++++++++++++
 57 files changed, 1333 insertions(+), 241 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index e7217cdcff0..5355c037b19 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -163,6 +163,10 @@ void AgentServer::start_workers(ExecEnv* exec_env) {
 
     _clean_trash_workers = std::make_unique<TaskWorkerPool>(
             "CLEAN_TRASH", 1, [&engine](auto&& task) {return 
clean_trash_callback(engine, task); });
+
+    _update_visible_version_workers = std::make_unique<TaskWorkerPool>(
+            "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return 
visible_version_callback(engine, task); });
+
     // clang-format on
 }
 
@@ -278,6 +282,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
                         "task(signature={}) has wrong request member = 
clean_trash_req", signature);
             }
             break;
+        case TTaskType::UPDATE_VISIBLE_VERSION:
+            if (task.__isset.visible_version_req) {
+                _update_visible_version_workers->submit_task(task);
+            } else {
+                ret_st = Status::InvalidArgument(
+                        "task(signature={}) has wrong request member = 
visible_version_req",
+                        signature);
+            }
+            break;
         default:
             ret_st = Status::InvalidArgument("task(signature={}, type={}) has 
wrong task type",
                                              signature, task_type);
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index b789bbe98de..9f3d91d5621 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -97,6 +97,7 @@ private:
     std::unique_ptr<TopicSubscriber> _topic_subscriber;
     std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
     std::unique_ptr<TaskWorkerPool> _clean_trash_workers;
+    std::unique_ptr<TaskWorkerPool> _update_visible_version_workers;
 };
 
 } // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 686de72e1b9..c9d222114e0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -355,6 +355,7 @@ bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
 bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
 bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", 
"STORAGE_MEDIUM_MIGRATE");
 bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
+bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", 
"UPDATE_VISIBLE_VERSION");
 
 void add_task_count(const TAgentTaskRequest& task, int n) {
     // clang-format off
@@ -382,6 +383,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
     ADD_TASK_COUNT(CLONE)
     ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
     ADD_TASK_COUNT(GC_BINLOG)
+    ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
     #undef ADD_TASK_COUNT
     case TTaskType::REALTIME_PUSH:
     case TTaskType::PUSH:
@@ -970,6 +972,11 @@ void report_tablet_callback(StorageEngine& engine, const 
TMasterInfo& master_inf
         
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
         return;
     }
+
+    std::map<int64_t, int64_t> partitions_version;
+    
engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
+    request.__set_partitions_version(std::move(partitions_version));
+
     int64_t max_compaction_score =
             
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
                      
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
@@ -1699,6 +1706,12 @@ void gc_binlog_callback(StorageEngine& engine, const 
TAgentTaskRequest& req) {
     engine.gc_binlogs(gc_tablet_infos);
 }
 
+void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& 
req) {
+    const TVisibleVersionReq& visible_version_req = req.visible_version_req;
+    engine.tablet_manager()->update_partitions_visible_version(
+            visible_version_req.partition_version);
+}
+
 void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
                     const TAgentTaskRequest& req) {
     const auto& clone_req = req.clone_req;
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index f95a866a57a..14d9ff32686 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -163,6 +163,8 @@ void gc_binlog_callback(StorageEngine& engine, const 
TAgentTaskRequest& req);
 
 void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);
 
+void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& 
req);
+
 void report_task_callback(const TMasterInfo& master_info);
 
 void report_disk_callback(StorageEngine& engine, const TMasterInfo& 
master_info);
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ba173b0d03f..493ad699aac 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -430,6 +430,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk,
 // How many rounds of cumulative compaction for each round of base compaction 
when compaction tasks generation.
 DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, 
"9");
 
+// Not compact the invisible versions, but with some limitations:
+// if not timeout, keep no more than 
compaction_keep_invisible_version_max_count versions;
+// if timeout, keep no more than compaction_keep_invisible_version_min_count 
versions.
+DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800");
+DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50");
+DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500");
+
 // Threshold to logging compaction trace, in seconds.
 DEFINE_mInt32(base_compaction_trace_threshold, "60");
 DEFINE_mInt32(cumulative_compaction_trace_threshold, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5c60ffae258..177bb03e02b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -477,6 +477,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk);
 // How many rounds of cumulative compaction for each round of base compaction 
when compaction tasks generation.
 DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round);
 
+// Not compact the invisible versions, but with some limitations:
+// if not timeout, keep no more than 
compaction_keep_invisible_version_max_count versions;
+// if timeout, keep no more than compaction_keep_invisible_version_min_count 
versions.
+DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec);
+DECLARE_mInt32(compaction_keep_invisible_version_min_count);
+DECLARE_mInt32(compaction_keep_invisible_version_max_count);
+
 // Threshold to logging compaction trace, in seconds.
 DECLARE_mInt32(base_compaction_trace_threshold);
 DECLARE_mInt32(cumulative_compaction_trace_threshold);
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index ba45eacb2c4..c11b87c06ad 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -136,7 +136,7 @@ Status FullCompaction::_check_all_version(const 
std::vector<RowsetSharedPtr>& ro
                 "Full compaction rowsets' versions not equal to all exist 
rowsets' versions. "
                 "full compaction rowsets max version={}-{}"
                 ", current rowsets max version={}-{}"
-                "full compaction rowsets min version={}-{}, current rowsets 
min version=0-1",
+                ", full compaction rowsets min version={}-{}, current rowsets 
min version=0-1",
                 last_rowset->start_version(), last_rowset->end_version(),
                 _tablet->max_version().first, _tablet->max_version().second,
                 first_rowset->start_version(), first_rowset->end_version());
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 80bfab6f4b9..c1a2e3c18b5 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -20,6 +20,7 @@
 #include <gen_cpp/Types_types.h>
 #include <netinet/in.h>
 
+#include <atomic>
 #include <charconv>
 #include <cstdint>
 #include <functional>
@@ -37,6 +38,7 @@
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset_fwd.h"
 #include "util/hash_util.hpp"
+#include "util/time.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -517,6 +519,25 @@ struct RidAndPos {
 
 using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, 
std::vector<RidAndPos>>>;
 
+// used for controll compaction
+struct VersionWithTime {
+    std::atomic<int64_t> version;
+    int64_t update_ts;
+
+    VersionWithTime() : version(0), update_ts(MonotonicMillis()) {}
+
+    void update_version_monoto(int64_t new_version) {
+        int64_t cur_version = version.load(std::memory_order_relaxed);
+        while (cur_version < new_version) {
+            if (version.compare_exchange_strong(cur_version, new_version, 
std::memory_order_relaxed,
+                                                std::memory_order_relaxed)) {
+                update_ts = MonotonicMillis();
+                break;
+            }
+        }
+    }
+};
+
 } // namespace doris
 
 // This intended to be a "good" hash function.  It may change from time to 
time.
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 27b1f94530d..4cf8fae0ded 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1349,25 +1349,41 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_cumulative_compac
     if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
         return candidate_rowsets;
     }
-    {
-        std::shared_lock rlock(_meta_lock);
-        for (const auto& [version, rs] : _rs_version_map) {
-            if (version.first >= _cumulative_point && rs->is_local()) {
-                candidate_rowsets.push_back(rs);
-            }
-        }
-    }
-    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
Rowset::comparator);
-    return candidate_rowsets;
+    return _pick_visible_rowsets_to_compaction(_cumulative_point,
+                                               
std::numeric_limits<int64_t>::max());
 }
 
 std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_base_compaction() {
+    return 
_pick_visible_rowsets_to_compaction(std::numeric_limits<int64_t>::min(),
+                                               _cumulative_point - 1);
+}
+
+std::vector<RowsetSharedPtr> Tablet::_pick_visible_rowsets_to_compaction(
+        int64_t min_start_version, int64_t max_start_version) {
+    auto [visible_version, update_ts] = get_visible_version_and_time();
+    bool update_time_long = MonotonicMillis() - update_ts >
+                            
config::compaction_keep_invisible_version_timeout_sec * 1000L;
+    int32_t keep_invisible_version_limit =
+            update_time_long ? 
config::compaction_keep_invisible_version_min_count
+                             : 
config::compaction_keep_invisible_version_max_count;
+
     std::vector<RowsetSharedPtr> candidate_rowsets;
     {
         std::shared_lock rlock(_meta_lock);
         for (const auto& [version, rs] : _rs_version_map) {
-            // Do compaction on local rowsets only.
-            if (version.first < _cumulative_point && rs->is_local()) {
+            int64_t version_start = version.first;
+            // rowset is remote or rowset is not in given range
+            if (!rs->is_local() || version_start < min_start_version ||
+                version_start > max_start_version) {
+                continue;
+            }
+
+            // can compact, met one of the conditions:
+            // 1. had been visible;
+            // 2. exceeds the limit of keep invisible versions.
+            int64_t version_end = version.second;
+            if (version_end <= visible_version ||
+                version_end > visible_version + keep_invisible_version_limit) {
                 candidate_rowsets.push_back(rs);
             }
         }
@@ -1390,13 +1406,8 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_full_compaction()
 
 std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int 
limit) {
     std::vector<RowsetSharedPtr> consecutive_empty_rowsets;
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-    traverse_rowsets([&candidate_rowsets, this](const auto& rs) {
-        if (rs->is_local() && rs->start_version() >= _cumulative_point) {
-            candidate_rowsets.emplace_back(rs);
-        }
-    });
-    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), 
Rowset::comparator);
+    std::vector<RowsetSharedPtr> candidate_rowsets =
+            pick_candidate_rowsets_to_cumulative_compaction();
     int len = candidate_rowsets.size();
     for (int i = 0; i < len - 1; ++i) {
         auto rowset = candidate_rowsets[i];
@@ -1475,6 +1486,19 @@ std::string Tablet::_get_rowset_info_str(RowsetSharedPtr 
rowset, bool delete_fla
                                rowset->rowset_id().to_string(), disk_size);
 }
 
+std::tuple<int64_t, int64_t> Tablet::get_visible_version_and_time() const {
+    // some old tablet has bug, its partition_id is 0, fe couldn't update its 
visible version.
+    // so let this tablet's visible version become int64 max.
+    auto version_info = std::atomic_load_explicit(&_visible_version, 
std::memory_order_relaxed);
+    if (version_info != nullptr && partition_id() != 0) {
+        return 
std::make_tuple(version_info->version.load(std::memory_order_relaxed),
+                               version_info->update_ts);
+    } else {
+        return std::make_tuple(std::numeric_limits<int64_t>::max(),
+                               std::numeric_limits<int64_t>::max());
+    }
+}
+
 // For http compaction action
 void Tablet::get_compaction_status(std::string* json_result) {
     rapidjson::Document root;
@@ -1837,13 +1861,23 @@ void Tablet::build_tablet_report_info(TTabletInfo* 
tablet_info,
         }
     });
 
+    int64_t total_version_count = _tablet_meta->version_count();
+
+    // For compatibility.
+    // For old fe, it wouldn't send visible version request to be, then be's 
visible version is always 0.
+    // Let visible_version_count set to total_version_count in be's report.
+    int64_t visible_version_count = total_version_count;
+    if (auto [visible_version, _] = get_visible_version_and_time(); 
visible_version > 0) {
+        visible_version_count = 
_tablet_meta->version_count_cross_with_range({0, visible_version});
+    }
     // the report version is the largest continuous version, same logic as in 
FE side
     tablet_info->__set_version(cversion.second);
     // Useless but it is a required filed in TTabletInfo
     tablet_info->__set_version_hash(0);
     tablet_info->__set_partition_id(_tablet_meta->partition_id());
     tablet_info->__set_storage_medium(_data_dir->storage_medium());
-    tablet_info->__set_version_count(_tablet_meta->version_count());
+    tablet_info->__set_total_version_count(total_version_count);
+    tablet_info->__set_visible_version_count(visible_version_count);
     tablet_info->__set_path_hash(_data_dir->path_hash());
     
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
     tablet_info->__set_replica_id(replica_id());
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 46bd6802495..4de6a05d745 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -347,6 +347,12 @@ public:
 
     bool should_fetch_from_peer();
 
+    std::tuple<int64_t, int64_t> get_visible_version_and_time() const;
+
+    void set_visible_version(const std::shared_ptr<const VersionWithTime>& 
visible_version) {
+        std::atomic_store_explicit(&_visible_version, visible_version, 
std::memory_order_relaxed);
+    }
+
     inline bool all_beta() const {
         std::shared_lock rdlock(_meta_lock);
         return _tablet_meta->all_beta();
@@ -625,6 +631,10 @@ private:
     // When the proportion of empty edges in the adjacency matrix used to 
represent the version graph
     // in the version tracker is greater than the threshold, rebuild the 
version tracker
     bool _reconstruct_version_tracker_if_necessary();
+
+    std::vector<RowsetSharedPtr> _pick_visible_rowsets_to_compaction(int64_t 
min_start_version,
+                                                                     int64_t 
max_start_version);
+
     void _init_context_common_fields(RowsetWriterContext& context);
 
     void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const 
RowsetIdUnorderedSet& pre,
@@ -740,6 +750,9 @@ private:
 
     int64_t _io_error_times = 0;
 
+    // partition's visible version. it sync from fe, but not real-time.
+    std::shared_ptr<const VersionWithTime> _visible_version;
+
     std::atomic_bool _is_full_compaction_running = false;
 };
 
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 196b6dc9543..43cbba99087 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1024,13 +1024,14 @@ void 
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>*
             tablet_info.__set_transaction_ids(find->second);
             expire_txn_map.erase(find);
         }
-        tablet_version_num_hist.add(tablet->version_count());
+        tablet_version_num_hist.add(tablet_info.total_version_count);
         auto& t_tablet_stat = local_cache->emplace_back();
         t_tablet_stat.__set_tablet_id(tablet_info.tablet_id);
         t_tablet_stat.__set_data_size(tablet_info.data_size);
         t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size);
-        t_tablet_stat.__set_row_num(tablet_info.row_count);
-        t_tablet_stat.__set_version_count(tablet_info.version_count);
+        t_tablet_stat.__set_row_count(tablet_info.row_count);
+        
t_tablet_stat.__set_total_version_count(tablet_info.total_version_count);
+        
t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count);
     };
     for_each_tablet(handler, filter_all_tablets);
 
@@ -1265,9 +1266,29 @@ void 
TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
 
 void TabletManager::get_partition_related_tablets(int64_t partition_id,
                                                   std::set<TabletInfo>* 
tablet_infos) {
-    std::shared_lock rdlock(_partition_tablet_map_lock);
-    if (_partition_tablet_map.find(partition_id) != 
_partition_tablet_map.end()) {
-        *tablet_infos = _partition_tablet_map[partition_id];
+    std::shared_lock rdlock(_partitions_lock);
+    auto it = _partitions.find(partition_id);
+    if (it != _partitions.end()) {
+        *tablet_infos = it->second.tablets;
+    }
+}
+
+void TabletManager::get_partitions_visible_version(std::map<int64_t, int64_t>* 
partitions_version) {
+    std::shared_lock rdlock(_partitions_lock);
+    for (const auto& [partition_id, partition] : _partitions) {
+        partitions_version->insert(
+                {partition_id, 
partition.visible_version->version.load(std::memory_order_relaxed)});
+    }
+}
+
+void TabletManager::update_partitions_visible_version(
+        const std::map<int64_t, int64_t>& partitions_version) {
+    std::shared_lock rdlock(_partitions_lock);
+    for (auto [partition_id, version] : partitions_version) {
+        auto it = _partitions.find(partition_id);
+        if (it != _partitions.end()) {
+            it->second.visible_version->update_version_monoto(version);
+        }
     }
 }
 
@@ -1356,15 +1377,25 @@ TabletSharedPtr 
TabletManager::_get_tablet_unlocked(TTabletId tablet_id) {
 }
 
 void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) {
-    std::lock_guard<std::shared_mutex> wrlock(_partition_tablet_map_lock);
-    
_partition_tablet_map[tablet->partition_id()].insert(tablet->get_tablet_info());
+    std::lock_guard<std::shared_mutex> wrlock(_partitions_lock);
+    auto& partition = _partitions[tablet->partition_id()];
+    partition.tablets.insert(tablet->get_tablet_info());
+    tablet->set_visible_version(
+            std::static_pointer_cast<const 
VersionWithTime>(partition.visible_version));
 }
 
 void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& 
tablet) {
-    std::lock_guard<std::shared_mutex> wrlock(_partition_tablet_map_lock);
-    
_partition_tablet_map[tablet->partition_id()].erase(tablet->get_tablet_info());
-    if (_partition_tablet_map[tablet->partition_id()].empty()) {
-        _partition_tablet_map.erase(tablet->partition_id());
+    tablet->set_visible_version(nullptr);
+    std::lock_guard<std::shared_mutex> wrlock(_partitions_lock);
+    auto it = _partitions.find(tablet->partition_id());
+    if (it == _partitions.end()) {
+        return;
+    }
+
+    auto& tablets = it->second.tablets;
+    tablets.erase(tablet->get_tablet_info());
+    if (tablets.empty()) {
+        _partitions.erase(it);
     }
 }
 
@@ -1401,22 +1432,23 @@ void 
TabletManager::get_tablets_distribution_on_different_disks(
         std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk,
         std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& 
tablets_info_on_disk) {
     std::vector<DataDir*> data_dirs = _engine.get_stores();
-    std::map<int64_t, std::set<TabletInfo>> partition_tablet_map;
+    std::map<int64_t, Partition> partitions;
     {
-        // When drop tablet, '_partition_tablet_map_lock' is locked in 
'tablet_shard_lock'.
-        // To avoid locking 'tablet_shard_lock' in 
'_partition_tablet_map_lock', we lock and
-        // copy _partition_tablet_map here.
-        std::shared_lock rdlock(_partition_tablet_map_lock);
-        partition_tablet_map = _partition_tablet_map;
+        // When drop tablet, '_partitions_lock' is locked in 
'tablet_shard_lock'.
+        // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock 
and
+        // copy _partitions here.
+        std::shared_lock rdlock(_partitions_lock);
+        partitions = _partitions;
     }
-    for (auto& [partition_id, tablet_infos] : partition_tablet_map) {
+
+    for (const auto& [partition_id, partition] : partitions) {
         std::map<DataDir*, int64_t> tablets_num;
         std::map<DataDir*, std::vector<TabletSize>> tablets_info;
         for (auto* data_dir : data_dirs) {
             tablets_num[data_dir] = 0;
         }
 
-        for (const auto& tablet_info : tablet_infos) {
+        for (const auto& tablet_info : partition.tablets) {
             // get_tablet() will hold 'tablet_shard_lock'
             TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id);
             if (tablet == nullptr) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 9f8164b853f..b090277677b 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -144,6 +144,10 @@ public:
 
     void get_partition_related_tablets(int64_t partition_id, 
std::set<TabletInfo>* tablet_infos);
 
+    void get_partitions_visible_version(std::map<int64_t, int64_t>* 
partitions_version);
+
+    void update_partitions_visible_version(const std::map<int64_t, int64_t>& 
partitions_version);
+
     void do_tablet_meta_checkpoint(DataDir* data_dir);
 
     void obtain_specific_quantity_tablets(std::vector<TabletInfo>& 
tablets_info, int64_t num);
@@ -229,22 +233,27 @@ private:
         std::set<int64_t> tablets_under_clone;
     };
 
+    struct Partition {
+        std::set<TabletInfo> tablets;
+        std::shared_ptr<VersionWithTime> visible_version {new VersionWithTime};
+    };
+
     StorageEngine& _engine;
 
     // TODO: memory size of TabletSchema cannot be accurately tracked.
-    // trace the memory use by meta of tablet
     std::shared_ptr<MemTracker> _tablet_meta_mem_tracker;
 
     const int32_t _tablets_shards_size;
     const int32_t _tablets_shards_mask;
     std::vector<tablets_shard> _tablets_shards;
 
-    // Protect _partition_tablet_map, should not be obtained before 
_tablet_map_lock to avoid dead lock
-    std::shared_mutex _partition_tablet_map_lock;
+    // Protect _partitions, should not be obtained before _tablet_map_lock to 
avoid dead lock
+    std::shared_mutex _partitions_lock;
+    // partition_id => partition
+    std::map<int64_t, Partition> _partitions;
+
     // Protect _shutdown_tablets, should not be obtained before 
_tablet_map_lock to avoid dead lock
     std::shared_mutex _shutdown_tablets_lock;
-    // partition_id => tablet_info
-    std::map<int64_t, std::set<TabletInfo>> _partition_tablet_map;
     // the delete tablets. notice only allow function `start_trash_sweep` can 
erase tablets in _shutdown_tablets
     std::list<TabletSharedPtr> _shutdown_tablets;
     std::mutex _gc_tablets_lock;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 4414f7187cd..72bf0f0ee39 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -745,6 +745,16 @@ Version TabletMeta::max_version() const {
     return max_version;
 }
 
+size_t TabletMeta::version_count_cross_with_range(const Version& range) const {
+    size_t count = 0;
+    for (const auto& rs_meta : _rs_metas) {
+        if (!(range.first > rs_meta->version().second || range.second < 
rs_meta->version().first)) {
+            count++;
+        }
+    }
+    return count;
+}
+
 Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) {
     // check RowsetMeta is valid
     for (auto& rs : _rs_metas) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 460851bc772..ae038fa2c92 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -166,6 +166,7 @@ public:
     // Remote disk space occupied by tablet.
     size_t tablet_remote_size() const;
     size_t version_count() const;
+    size_t version_count_cross_with_range(const Version& range) const;
     Version max_version() const;
 
     TabletState tablet_state() const;
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 81c9973fcf2..ae5fc4c18dc 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -161,6 +161,8 @@ Status EngineCloneTask::execute() {
     }
     Status st = _do_clone();
     
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
+    
StorageEngine::instance()->tablet_manager()->update_partitions_visible_version(
+            {{_clone_req.partition_id, _clone_req.version}});
     return st;
 }
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6adf03c56cd..e1b29b7f849 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1508,11 +1508,11 @@ public class Config extends ConfigBase {
     public static int default_max_query_instances = -1;
 
     /*
-     * One master daemon thread will update global partition in memory
-     * info every partition_in_memory_update_interval_secs
+     * One master daemon thread will update global partition info, include in 
memory and visible version
+     * info every partition_info_update_interval_secs
      */
     @ConfField(mutable = false, masterOnly = true)
-    public static int partition_in_memory_update_interval_secs = 300;
+    public static int partition_info_update_interval_secs = 60;
 
     @ConfField(masterOnly = true)
     public static boolean enable_concurrent_update = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 030fd17452d..4231b11c987 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -241,8 +241,7 @@ public abstract class AlterHandler extends MasterDaemon {
                     task.getSignature(), replica, task.getVersion());
             boolean versionChanged = false;
             if (replica.getVersion() < task.getVersion()) {
-                replica.updateVersionInfo(task.getVersion(), 
replica.getDataSize(), replica.getRemoteDataSize(),
-                        replica.getRowCount());
+                replica.updateVersion(task.getVersion());
                 versionChanged = true;
             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 12c42aee737..63459e5da53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1785,8 +1785,7 @@ public class RestoreJob extends AbstractJob {
                         for (Tablet tablet : idx.getTablets()) {
                             for (Replica replica : tablet.getReplicas()) {
                                 if 
(!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) {
-                                    
replica.updateVersionInfo(part.getVisibleVersion(), replica.getDataSize(),
-                                            replica.getRemoteDataSize(), 
replica.getRowCount());
+                                    
replica.updateVersion(part.getVisibleVersion());
                                 }
                             }
                         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 09cb46fdf27..380eb361c00 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -178,7 +178,7 @@ import org.apache.doris.load.sync.SyncChecker;
 import org.apache.doris.load.sync.SyncJobManager;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.master.MetaHelper;
-import org.apache.doris.master.PartitionInMemoryInfoCollector;
+import org.apache.doris.master.PartitionInfoCollector;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mtmv.MTMVAlterOpType;
@@ -370,7 +370,7 @@ public class Env {
     private PublishVersionDaemon publishVersionDaemon;
     private DeleteHandler deleteHandler;
     private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
-    private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
+    private PartitionInfoCollector partitionInfoCollector;
     private CooldownConfHandler cooldownConfHandler;
     private ExternalMetaIdMgr externalMetaIdMgr;
     private MetastoreEventsProcessor metastoreEventsProcessor;
@@ -665,7 +665,7 @@ public class Env {
         this.publishVersionDaemon = new PublishVersionDaemon();
         this.deleteHandler = new DeleteHandler();
         this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
-        this.partitionInMemoryInfoCollector = new 
PartitionInMemoryInfoCollector();
+        this.partitionInfoCollector = new PartitionInfoCollector();
         if (Config.enable_storage_policy) {
             this.cooldownConfHandler = new CooldownConfHandler();
         }
@@ -1691,7 +1691,7 @@ public class Env {
         // start daemon thread to update db used data quota for db txn manager 
periodically
         dbUsedDataQuotaInfoCollector.start();
         // start daemon thread to update global partition in memory 
information periodically
-        partitionInMemoryInfoCollector.start();
+        partitionInfoCollector.start();
         if (Config.enable_storage_policy) {
             cooldownConfHandler.start();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
index 333b7b146ac..1f1e2599d9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java
@@ -110,7 +110,7 @@ public class MetadataViewer {
                             
row.add(String.valueOf(replica.getLastSuccessVersion()));
                             row.add(String.valueOf(visibleVersion));
                             row.add(String.valueOf(replica.getSchemaHash()));
-                            row.add(String.valueOf(replica.getVersionCount()));
+                            
row.add(String.valueOf(replica.getTotalVersionCount()));
                             row.add(String.valueOf(replica.isBad()));
                             row.add(String.valueOf(replica.isUserDrop()));
                             row.add(replica.getState().name());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index ae8f4f37eed..f6b2944d754 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TTabletInfo;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.gson.annotations.SerializedName;
@@ -109,7 +110,8 @@ public class Replica implements Writable {
     @SerializedName(value = "lastSuccessVersionHash")
     private long lastSuccessVersionHash = 0L;
 
-    private volatile long versionCount = -1;
+    private volatile long totalVersionCount = -1;
+    private volatile long visibleVersionCount = -1;
 
     private long pathHash = -1;
 
@@ -233,14 +235,26 @@ public class Replica implements Writable {
         return dataSize;
     }
 
+    public void setDataSize(long dataSize) {
+        this.dataSize = dataSize;
+    }
+
     public long getRemoteDataSize() {
         return remoteDataSize;
     }
 
+    public void setRemoteDataSize(long remoteDataSize) {
+        this.remoteDataSize = remoteDataSize;
+    }
+
     public long getRowCount() {
         return rowCount;
     }
 
+    public void setRowCount(long rowCount) {
+        this.rowCount = rowCount;
+    }
+
     public long getLastFailedVersion() {
         return lastFailedVersion;
     }
@@ -321,28 +335,24 @@ public class Replica implements Writable {
         this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd;
     }
 
-    // for compatibility
-    public synchronized void updateStat(long dataSize, long rowNum) {
-        this.dataSize = dataSize;
-        this.rowCount = rowNum;
+    public void updateWithReport(TTabletInfo backendReplica) {
+        updateVersion(backendReplica.getVersion());
+        setDataSize(backendReplica.getDataSize());
+        setRemoteDataSize(backendReplica.getRemoteDataSize());
+        setRowCount(backendReplica.getRowCount());
+        setTotalVersionCount(backendReplica.getTotalVersionCount());
+        setVisibleVersionCount(
+                backendReplica.isSetVisibleVersionCount() ? 
backendReplica.getVisibleVersionCount()
+                        : backendReplica.getTotalVersionCount());
     }
 
-    public synchronized void updateStat(long dataSize, long remoteDataSize, 
long rowNum, long versionCount) {
-        this.dataSize = dataSize;
-        this.remoteDataSize = remoteDataSize;
-        this.rowCount = rowNum;
-        this.versionCount = versionCount;
+    public synchronized void updateVersion(long newVersion) {
+        updateReplicaVersion(newVersion, this.lastFailedVersion, 
this.lastSuccessVersion);
     }
 
-    public synchronized void updateVersionInfo(long newVersion, long 
newDataSize, long newRemoteDataSize,
-                                               long newRowCount) {
-        updateReplicaInfo(newVersion, this.lastFailedVersion, 
this.lastSuccessVersion, newDataSize, newRemoteDataSize,
-                newRowCount);
-    }
-
-    public synchronized void updateVersionWithFailedInfo(
+    public synchronized void updateVersionWithFailed(
             long newVersion, long lastFailedVersion, long lastSuccessVersion) {
-        updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, 
dataSize, remoteDataSize, rowCount);
+        updateReplicaVersion(newVersion, lastFailedVersion, 
lastSuccessVersion);
     }
 
     public synchronized void adminUpdateVersionInfo(Long version, Long 
lastFailedVersion, Long lastSuccessVersion,
@@ -405,9 +415,7 @@ public class Replica implements Writable {
      *      the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash 
is 0 or some unknown number.
      *      We just reset the LFV(hash) to recovery this replica.
      */
-    private void updateReplicaInfo(long newVersion,
-            long lastFailedVersion, long lastSuccessVersion,
-            long newDataSize, long newRemoteDataSize, long newRowCount) {
+    private void updateReplicaVersion(long newVersion, long lastFailedVersion, 
long lastSuccessVersion) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("before update: {}", this.toString());
         }
@@ -432,9 +440,6 @@ public class Replica implements Writable {
         long oldLastFailedVersion = this.lastFailedVersion;
 
         this.version = newVersion;
-        this.dataSize = newDataSize;
-        this.remoteDataSize = newRemoteDataSize;
-        this.rowCount = newRowCount;
 
         // just check it
         if (lastSuccessVersion <= this.version) {
@@ -497,7 +502,7 @@ public class Replica implements Writable {
     }
 
     public synchronized void updateLastFailedVersion(long lastFailedVersion) {
-        updateReplicaInfo(this.version, lastFailedVersion, 
this.lastSuccessVersion, dataSize, remoteDataSize, rowCount);
+        updateReplicaVersion(this.version, lastFailedVersion, 
this.lastSuccessVersion);
     }
 
     /*
@@ -542,16 +547,28 @@ public class Replica implements Writable {
         return state == ReplicaState.COMPACTION_TOO_SLOW;
     }
 
+    public boolean tooBigVersionCount() {
+        return visibleVersionCount >= 
Config.min_version_count_indicate_replica_compaction_too_slow;
+    }
+
     public boolean isNormal() {
         return state == ReplicaState.NORMAL;
     }
 
-    public long getVersionCount() {
-        return versionCount;
+    public long getTotalVersionCount() {
+        return totalVersionCount;
+    }
+
+    public void setTotalVersionCount(long totalVersionCount) {
+        this.totalVersionCount = totalVersionCount;
+    }
+
+    public long getVisibleVersionCount() {
+        return visibleVersionCount;
     }
 
-    public void setVersionCount(long versionCount) {
-        this.versionCount = versionCount;
+    public void setVisibleVersionCount(long visibleVersionCount) {
+        this.visibleVersionCount = visibleVersionCount;
     }
 
     public boolean checkVersionRegressive(long newVersion) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 708334bb353..7a18da60ad4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -269,16 +269,16 @@ public class Tablet extends MetaObject implements 
Writable {
         }
 
         if (Config.skip_compaction_slower_replica && 
allQueryableReplica.size() > 1) {
-            long minVersionCount = Long.MAX_VALUE;
-            for (Replica replica : allQueryableReplica) {
-                if (replica.getVersionCount() != -1 && 
replica.getVersionCount() < minVersionCount) {
-                    minVersionCount = replica.getVersionCount();
-                }
+            long minVersionCount = 
allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)
+                    .filter(count -> count != -1).min().orElse(Long.MAX_VALUE);
+            long maxVersionCount = 
Config.min_version_count_indicate_replica_compaction_too_slow;
+            if (minVersionCount != Long.MAX_VALUE) {
+                maxVersionCount = Math.max(maxVersionCount, minVersionCount * 
QUERYABLE_TIMES_OF_MIN_VERSION_COUNT);
             }
-            final long finalMinVersionCount = minVersionCount;
-            return allQueryableReplica.stream().filter(replica -> 
replica.getVersionCount() == -1
-                            || replica.getVersionCount() < 
Config.min_version_count_indicate_replica_compaction_too_slow
-                            || replica.getVersionCount() < 
finalMinVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT)
+
+            final long finalMaxVersionCount = maxVersionCount;
+            return allQueryableReplica.stream()
+                    .filter(replica -> replica.getVisibleVersionCount() < 
finalMaxVersionCount)
                     .collect(Collectors.toList());
         }
         return allQueryableReplica;
@@ -508,7 +508,7 @@ public class Tablet extends MetaObject implements Writable {
 
                 if (versionCompleted) {
                     stable++;
-                    versions.add(replica.getVersionCount());
+                    versions.add(replica.getVisibleVersionCount());
 
                     allocNum = 
stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0);
                     
stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum + 
1));
@@ -599,7 +599,7 @@ public class Tablet extends MetaObject implements Writable {
             // get the max version diff
             long delta = versions.get(versions.size() - 1) - versions.get(0);
             double ratio = (double) delta / versions.get(versions.size() - 1);
-            if (versions.get(versions.size() - 1) > 
Config.min_version_count_indicate_replica_compaction_too_slow
+            if (versions.get(versions.size() - 1) >= 
Config.min_version_count_indicate_replica_compaction_too_slow
                     && ratio > 
Config.valid_version_count_delta_ratio_between_replicas) {
                 return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW, 
Priority.HIGH);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 5fd7ae721f2..c46d2a9b99f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.cooldown.CooldownConf;
+import org.apache.doris.master.PartitionInfoCollector.PartitionCollectInfo;
 import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TStorageMedium;
@@ -37,7 +38,7 @@ import org.apache.doris.transaction.TransactionStatus;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -96,7 +97,10 @@ public class TabletInvertedIndex {
     // backend id -> (tablet id -> replica)
     private Table<Long, Long, Replica> backingReplicaMetaTable = 
HashBasedTable.create();
 
-    private volatile ImmutableSet<Long> partitionIdInMemorySet = 
ImmutableSet.of();
+    // partition id -> partition info.
+    // notice partition info update every 
Config.partition_info_update_interval_secs seconds,
+    // so it may be stale.
+    private volatile ImmutableMap<Long, PartitionCollectInfo> 
partitionCollectInfoMap = ImmutableMap.of();
 
     private ForkJoinPool taskPool = new 
ForkJoinPool(Runtime.getRuntime().availableProcessors());
 
@@ -120,11 +124,13 @@ public class TabletInvertedIndex {
     }
 
     public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
+                             Map<Long, Long> backendPartitionsVersion,
                              final HashMap<Long, TStorageMedium> 
storageMediumMap,
                              ListMultimap<Long, Long> tabletSyncMap,
                              ListMultimap<Long, Long> tabletDeleteFromMeta,
                              Set<Long> tabletFoundInMeta,
                              ListMultimap<TStorageMedium, Long> 
tabletMigrationMap,
+                             Map<Long, Long> partitionVersionSyncMap,
                              Map<Long, ListMultimap<Long, 
TPartitionVersionInfo>> transactionsToPublish,
                              ListMultimap<Long, Long> transactionsToClear,
                              ListMultimap<Long, Long> tabletRecoveryMap,
@@ -132,6 +138,7 @@ public class TabletInvertedIndex {
                              List<CooldownConf> cooldownConfToPush,
                              List<CooldownConf> cooldownConfToUpdate) {
         List<Pair<TabletMeta, TTabletInfo>> cooldownTablets = new 
ArrayList<>();
+        long feTabletNum = 0;
         long stamp = readLock();
         long start = System.currentTimeMillis();
         try {
@@ -140,6 +147,7 @@ public class TabletInvertedIndex {
             }
             Map<Long, Replica> replicaMetaWithBackend = 
backingReplicaMetaTable.row(backendId);
             if (replicaMetaWithBackend != null) {
+                feTabletNum = replicaMetaWithBackend.size();
                 taskPool.submit(() -> {
                     // traverse replicas in meta with this backend
                     
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
@@ -160,11 +168,13 @@ public class TabletInvertedIndex {
                                 tabletMetaInfo = new TTabletMetaInfo();
                                 tabletMetaInfo.setReplicaId(replica.getId());
                             }
-                            if (partitionIdInMemorySet.contains(
-                                    backendTabletInfo.getPartitionId()) != 
backendTabletInfo.isIsInMemory()) {
+                            PartitionCollectInfo partitionCollectInfo =
+                                    
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
+                            boolean isInMemory = partitionCollectInfo != null 
&& partitionCollectInfo.isInMemory();
+                            if (isInMemory != 
backendTabletInfo.isIsInMemory()) {
                                 if (tabletMetaInfo == null) {
                                     tabletMetaInfo = new TTabletMetaInfo();
-                                    
tabletMetaInfo.setIsInMemory(!backendTabletInfo.isIsInMemory());
+                                    tabletMetaInfo.setIsInMemory(isInMemory);
                                 }
                             }
                             if (Config.fix_tablet_partition_id_eq_0
@@ -323,8 +333,11 @@ public class TabletInvertedIndex {
 
                             // update replicase's version count
                             // no need to write log, and no need to get db 
lock.
-                            if (backendTabletInfo.isSetVersionCount()) {
-                                
replica.setVersionCount(backendTabletInfo.getVersionCount());
+                            if (backendTabletInfo.isSetTotalVersionCount()) {
+                                
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
+                                
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
+                                        ? 
backendTabletInfo.getVisibleVersionCount()
+                                                : 
backendTabletInfo.getTotalVersionCount());
                             }
                             if (tabletMetaInfo != null) {
                                 tabletMetaInfo.setTabletId(tabletId);
@@ -343,6 +356,15 @@ public class TabletInvertedIndex {
                             }
                         }
                     });
+
+                    
backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> {
+                        long partitionId = entry.getKey();
+                        long backendVersion = entry.getValue();
+                        PartitionCollectInfo partitionInfo = 
partitionCollectInfoMap.get(partitionId);
+                        if (partitionInfo != null && 
partitionInfo.getVisibleVersion() > backendVersion) {
+                            partitionVersionSyncMap.put(partitionId, 
partitionInfo.getVisibleVersion());
+                        }
+                    });
                 }).join();
             }
         } finally {
@@ -351,11 +373,13 @@ public class TabletInvertedIndex {
         cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, 
cooldownConfToPush, cooldownConfToUpdate));
 
         long end = System.currentTimeMillis();
-        LOG.info("finished to do tablet diff with backend[{}]. sync: {}."
-                        + " metaDel: {}. foundInMeta: {}. migration: {}. "
-                        + "found invalid transactions {}. found republish 
transactions {}. tabletToUpdate: {}."
-                        + " need recovery: {}. cost: {} ms", backendId, 
tabletSyncMap.size(),
+        LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: 
{}, backend tablet num: {}. sync: {}."
+                        + " metaDel: {}. foundInMeta: {}. migration: {}. 
backend partition num: {}, backend need "
+                        + "update: {}. found invalid transactions {}. found 
republish "
+                        + "transactions {}. tabletToUpdate: {}. need recovery: 
{}. cost: {} ms",
+                backendId, feTabletNum, backendTablets.size(), 
tabletSyncMap.size(),
                 tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), 
tabletMigrationMap.size(),
+                backendPartitionsVersion.size(), 
partitionVersionSyncMap.size(),
                 transactionsToClear.size(), transactionsToPublish.size(), 
tabletToUpdate.size(),
                 tabletRecoveryMap.size(), (end - start));
     }
@@ -757,8 +781,8 @@ public class TabletInvertedIndex {
         }
     }
 
-    public void setPartitionIdInMemorySet(ImmutableSet<Long> 
partitionIdInMemorySet) {
-        this.partitionIdInMemorySet = partitionIdInMemorySet;
+    public void setPartitionCollectInfoMap(ImmutableMap<Long, 
PartitionCollectInfo> partitionCollectInfoMap) {
+        this.partitionCollectInfoMap = partitionCollectInfoMap;
     }
 
     public Map<Long, Long> getReplicaToTabletMap() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index e2b65a95f6e..896ecac6f8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -151,8 +151,12 @@ public class TabletStatMgr extends MasterDaemon {
                 if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) {
                     Replica replica = 
invertedIndex.getReplica(stat.getTabletId(), beId);
                     if (replica != null) {
-                        replica.updateStat(stat.getDataSize(), 
stat.getRemoteDataSize(), stat.getRowNum(),
-                                stat.getVersionCount());
+                        replica.setDataSize(stat.getDataSize());
+                        replica.setRemoteDataSize(stat.getRemoteDataSize());
+                        replica.setRowCount(stat.getRowCount());
+                        
replica.setTotalVersionCount(stat.getTotalVersionCount());
+                        
replica.setVisibleVersionCount(stat.isSetVisibleVersionCount() ? 
stat.getVisibleVersionCount()
+                                : stat.getTotalVersionCount());
                     }
                 }
             }
@@ -168,7 +172,8 @@ public class TabletStatMgr extends MasterDaemon {
                     continue;
                 }
                 // TODO(cmy) no db lock protected. I think it is ok even we 
get wrong row num
-                replica.updateStat(entry.getValue().getDataSize(), 
entry.getValue().getRowNum());
+                replica.setDataSize(entry.getValue().getDataSize());
+                replica.setRowCount(entry.getValue().getRowCount());
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 8401ec17bbd..e676774afcf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -574,15 +574,15 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
         Replica chosenReplica = null;
         long maxVersionCount = Integer.MIN_VALUE;
         for (Replica replica : tablet.getReplicas()) {
-            if (replica.getVersionCount() > maxVersionCount) {
-                maxVersionCount = replica.getVersionCount();
+            if (replica.getVisibleVersionCount() > maxVersionCount) {
+                maxVersionCount = replica.getVisibleVersionCount();
                 chosenReplica = replica;
             }
         }
         boolean recovered = false;
         for (Replica replica : tablet.getReplicas()) {
             if (replica.isAlive() && replica.tooSlow() && 
(!replica.equals(chosenReplica)
-                    || replica.getVersionCount() < 
Config.min_version_count_indicate_replica_compaction_too_slow)) {
+                    || !replica.tooBigVersionCount())) {
                 if (chosenReplica != null) {
                     chosenReplica.setState(ReplicaState.NORMAL);
                     recovered = true;
@@ -1177,8 +1177,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                         "replica does not exist. backend id: " + 
destBackendId);
             }
 
-            replica.updateVersionInfo(reportedTablet.getVersion(), 
reportedTablet.getDataSize(),
-                    reportedTablet.getRemoteDataSize(), 
reportedTablet.getRowCount());
+            replica.updateWithReport(reportedTablet);
             if (replica.getLastFailedVersion() > 
partition.getCommittedVersion()
                     && reportedTablet.getVersion() >= 
partition.getCommittedVersion()
                     //&& !(reportedTablet.isSetVersionMiss() && 
reportedTablet.isVersionMiss()
@@ -1365,8 +1364,8 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
     public static class VersionCountComparator implements Comparator<Replica> {
         @Override
         public int compare(Replica r1, Replica r2) {
-            long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE : 
r1.getVersionCount();
-            long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE : 
r2.getVersionCount();
+            long verCount1 = r1.getVisibleVersionCount() == -1 ? 
Long.MAX_VALUE : r1.getVisibleVersionCount();
+            long verCount2 = r2.getVisibleVersionCount() == -1 ? 
Long.MAX_VALUE : r2.getVisibleVersionCount();
             if (verCount1 < verCount2) {
                 return -1;
             } else if (verCount1 > verCount2) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 357d63f0d56..3d4110054ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1110,13 +1110,13 @@ public class TabletScheduler extends MasterDaemon {
             if (replica.isAlive() && !replica.tooSlow()) {
                 normalReplicaCount++;
             }
-            if (replica.getVersionCount() > maxVersionCount) {
-                maxVersionCount = replica.getVersionCount();
+            if (replica.getVisibleVersionCount() > maxVersionCount) {
+                maxVersionCount = replica.getVisibleVersionCount();
                 chosenReplica = replica;
             }
         }
         if (chosenReplica != null && chosenReplica.isAlive() && 
!chosenReplica.tooSlow()
-                && chosenReplica.getVersionCount() > 
Config.min_version_count_indicate_replica_compaction_too_slow
+                && chosenReplica.tooBigVersionCount()
                 && normalReplicaCount - 1 >= tabletCtx.getReplicas().size() / 
2 + 1) {
             chosenReplica.setState(ReplicaState.COMPACTION_TOO_SLOW);
             LOG.info("set replica id :{} tablet id: {}, backend id: {} to 
COMPACTION_TOO_SLOW",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
index 9b2a8f6d8f7..edf2a9d3517 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
@@ -44,7 +44,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
             
.add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
             
.add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad")
             .add("IsUserDrop")
-            .add("VersionCount").add("PathHash").add("Path")
+            
.add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
             .add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
             .add("CooldownMetaId").add("QueryHits").build();
 
@@ -117,7 +117,8 @@ public class ReplicasProcNode implements ProcNodeInterface {
                                         String.valueOf(replica.getState()),
                                         String.valueOf(replica.isBad()),
                                         String.valueOf(replica.isUserDrop()),
-                                        
String.valueOf(replica.getVersionCount()),
+                                        
String.valueOf(replica.getVisibleVersionCount()),
+                                        
String.valueOf(replica.getTotalVersionCount()),
                                         String.valueOf(replica.getPathHash()),
                                         path,
                                         metaUrl,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index ca50c418305..e28c74c327e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -261,8 +261,7 @@ public class TabletHealthProcDir implements 
ProcDirInterface {
                                     oversizeTabletIds.add(tablet.getId());
                                 }
                                 for (Replica replica : tablet.getReplicas()) {
-                                    if (replica.getVersionCount()
-                                            > 
Config.min_version_count_indicate_replica_compaction_too_slow) {
+                                    if (replica.tooBigVersionCount()) {
                                         replicaCompactionTooSlowNum++;
                                         
replicaCompactionTooSlowTabletIds.add(tablet.getId());
                                         break;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index c82a55bd380..46c89eb3253 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -52,9 +52,10 @@ public class TabletsProcDir implements ProcDirInterface {
             
.add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
             
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
             .add("LstConsistencyCheckTime").add("CheckVersion")
-            .add("VersionCount").add("QueryHits").add("PathHash").add("Path")
+            
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
             .add("MetaUrl").add("CompactionStatus")
-            .add("CooldownReplicaId").add("CooldownMetaId").build();
+            .add("CooldownReplicaId").add("CooldownMetaId")
+            .build();
 
     private Table table;
     private MaterializedIndex index;
@@ -113,7 +114,8 @@ public class TabletsProcDir implements ProcDirInterface {
                     tabletInfo.add(-1); // lst consistency check time
                     tabletInfo.add(-1); // check version
                     tabletInfo.add(-1); // check version hash
-                    tabletInfo.add(-1); // version count
+                    tabletInfo.add(-1); // visible version count
+                    tabletInfo.add(-1); // total version count
                     tabletInfo.add(0L); // query hits
                     tabletInfo.add(-1); // path hash
                     tabletInfo.add(FeConstants.null_string); // path
@@ -147,7 +149,8 @@ public class TabletsProcDir implements ProcDirInterface {
 
                         
tabletInfo.add(TimeUtils.longToTimeString(tablet.getLastCheckTime()));
                         tabletInfo.add(tablet.getCheckedVersion());
-                        tabletInfo.add(replica.getVersionCount());
+                        tabletInfo.add(replica.getVisibleVersionCount());
+                        tabletInfo.add(replica.getTotalVersionCount());
                         
tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L));
                         tabletInfo.add(replica.getPathHash());
                         
tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), ""));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index c8f52d5b4b2..b6c0c73eae2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1036,7 +1036,10 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         Tablet tablet = materializedIndex.getTablet(info.getTabletId());
         Replica replica = tablet.getReplicaById(info.getReplicaId());
         Preconditions.checkNotNull(replica, info);
-        replica.updateVersionInfo(info.getVersion(), info.getDataSize(), 
info.getRemoteDataSize(), info.getRowCount());
+        replica.updateVersion(info.getVersion());
+        replica.setDataSize(info.getDataSize());
+        replica.setRemoteDataSize(info.getRemoteDataSize());
+        replica.setRowCount(info.getRowCount());
         replica.setBad(false);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java
 b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java
similarity index 68%
rename from 
fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java
index 77ed5829799..f4bf87ad109 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java
@@ -27,29 +27,49 @@ import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.MasterDaemon;
 
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 
-public class PartitionInMemoryInfoCollector extends MasterDaemon {
+public class PartitionInfoCollector extends MasterDaemon {
 
-    private static final Logger LOG = 
LogManager.getLogger(PartitionInMemoryInfoCollector.class);
+    // notice since collect partition info every 
Config.partition_info_update_interval_secs seconds,
+    // so partition collect info may be stale
+    public static class PartitionCollectInfo {
+        private long visibleVersion;
+        private boolean isInMemory;
 
-    public PartitionInMemoryInfoCollector() {
-        super("PartitionInMemoryInfoCollector", 
Config.partition_in_memory_update_interval_secs * 1000);
+        PartitionCollectInfo(long visibleVersion, boolean isInMemory) {
+            this.visibleVersion = visibleVersion;
+            this.isInMemory = isInMemory;
+        }
+
+        public long getVisibleVersion() {
+            return this.visibleVersion;
+        }
+
+        public boolean isInMemory() {
+            return this.isInMemory;
+        }
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(PartitionInfoCollector.class);
+
+    public PartitionInfoCollector() {
+        super("PartitionInfoCollector", 
Config.partition_info_update_interval_secs * 1000);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        updatePartitionInMemoryInfo();
+        updatePartitionCollectInfo();
     }
 
-    private void updatePartitionInMemoryInfo() {
+    private void updatePartitionCollectInfo() {
         Env env = Env.getCurrentEnv();
         TabletInvertedIndex tabletInvertedIndex = env.getTabletInvertedIndex();
-        ImmutableSet.Builder builder = ImmutableSet.builder();
+        ImmutableMap.Builder builder = ImmutableMap.builder();
         List<Long> dbIdList = env.getInternalCatalog().getDbIds();
         for (Long dbId : dbIdList) {
             Database db = env.getInternalCatalog().getDbNullable(dbId);
@@ -70,10 +90,12 @@ public class PartitionInMemoryInfoCollector extends 
MasterDaemon {
                     try {
                         OlapTable olapTable = (OlapTable) table;
                         for (Partition partition : 
olapTable.getAllPartitions()) {
-                            if 
(olapTable.getPartitionInfo().getIsInMemory(partition.getId())) {
+                            boolean isInMemory = 
olapTable.getPartitionInfo().getIsInMemory(partition.getId());
+                            if (isInMemory) {
                                 partitionInMemoryCount++;
-                                builder.add(partition.getId());
                             }
+                            builder.put(partition.getId(),
+                                    new 
PartitionCollectInfo(partition.getVisibleVersion(), isInMemory));
                         }
                     } finally {
                         table.readUnlock();
@@ -87,9 +109,6 @@ public class PartitionInMemoryInfoCollector extends 
MasterDaemon {
                 LOG.warn("Update database[" + db.getFullName() + "] partition 
in memory info failed", e);
             }
         }
-        ImmutableSet<Long> partitionIdInMemorySet = builder.build();
-        tabletInvertedIndex.setPartitionIdInMemorySet(partitionIdInMemorySet);
+        tabletInvertedIndex.setPartitionCollectInfoMap(builder.build());
     }
-
-
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 7c77c1dfad3..635a8bb675f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -71,6 +71,7 @@ import org.apache.doris.task.PushCooldownConfTask;
 import org.apache.doris.task.PushStoragePolicyTask;
 import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.task.UpdateTabletMetaInfoTask;
+import org.apache.doris.task.UpdateVisibleVersionTask;
 import org.apache.doris.thrift.TBackend;
 import org.apache.doris.thrift.TDisk;
 import org.apache.doris.thrift.TMasterResult;
@@ -155,6 +156,7 @@ public class ReportHandler extends Daemon {
         Map<TTaskType, Set<Long>> tasks = null;
         Map<String, TDisk> disks = null;
         Map<Long, TTablet> tablets = null;
+        Map<Long, Long> partitionsVersion = null;
         long reportVersion = -1;
 
         ReportType reportType = ReportType.UNKNOWN;
@@ -180,11 +182,15 @@ public class ReportHandler extends Daemon {
             reportType = ReportType.TABLET;
         }
 
+        if (request.isSetPartitionsVersion()) {
+            partitionsVersion = request.getPartitionsVersion();
+        }
+
         if (request.isSetTabletMaxCompactionScore()) {
             
backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore());
         }
 
-        ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, 
reportVersion,
+        ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, 
partitionsVersion, reportVersion,
                 request.getStoragePolicy(), request.getResource(), 
request.getNumCores(),
                 request.getPipelineExecutorSize());
         try {
@@ -231,6 +237,7 @@ public class ReportHandler extends Daemon {
         private Map<TTaskType, Set<Long>> tasks;
         private Map<String, TDisk> disks;
         private Map<Long, TTablet> tablets;
+        private Map<Long, Long> partitionsVersion;
         private long reportVersion;
 
         private List<TStoragePolicy> storagePolicies;
@@ -239,14 +246,15 @@ public class ReportHandler extends Daemon {
         private int pipelineExecutorSize;
 
         public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
-                Map<String, TDisk> disks,
-                Map<Long, TTablet> tablets, long reportVersion,
+                Map<String, TDisk> disks, Map<Long, TTablet> tablets,
+                Map<Long, Long> partitionsVersion, long reportVersion,
                 List<TStoragePolicy> storagePolicies, List<TStorageResource> 
storageResources, int cpuCores,
                 int pipelineExecutorSize) {
             this.beId = beId;
             this.tasks = tasks;
             this.disks = disks;
             this.tablets = tablets;
+            this.partitionsVersion = partitionsVersion;
             this.reportVersion = reportVersion;
             this.storagePolicies = storagePolicies;
             this.storageResources = storageResources;
@@ -273,7 +281,11 @@ public class ReportHandler extends Daemon {
                     LOG.warn("out of date report version {} from backend[{}]. 
current report version[{}]",
                             reportVersion, beId, backendReportVersion);
                 } else {
-                    ReportHandler.tabletReport(beId, tablets, reportVersion);
+                    Map<Long, Long> partitions = this.partitionsVersion;
+                    if (partitions == null) {
+                        partitions = Maps.newHashMap();
+                    }
+                    ReportHandler.tabletReport(beId, tablets, partitions, 
reportVersion);
                 }
             }
         }
@@ -408,7 +420,8 @@ public class ReportHandler extends Daemon {
     }
 
     // public for fe ut
-    public static void tabletReport(long backendId, Map<Long, TTablet> 
backendTablets, long backendReportVersion) {
+    public static void tabletReport(long backendId, Map<Long, TTablet> 
backendTablets,
+            Map<Long, Long> backendPartitionsVersion, long 
backendReportVersion) {
         long start = System.currentTimeMillis();
         LOG.info("backend[{}] reports {} tablet(s). report version: {}",
                 backendId, backendTablets.size(), backendReportVersion);
@@ -426,6 +439,9 @@ public class ReportHandler extends Daemon {
         // storage medium -> tablet id
         ListMultimap<TStorageMedium, Long> tabletMigrationMap = 
LinkedListMultimap.create();
 
+        // partition id -> visible version
+        Map<Long, Long> partitionVersionSyncMap = Maps.newConcurrentMap();
+
         // dbid -> txn id -> [partition info]
         Map<Long, ListMultimap<Long, TPartitionVersionInfo>> 
transactionsToPublish = Maps.newHashMap();
         ListMultimap<Long, Long> transactionsToClear = 
LinkedListMultimap.create();
@@ -439,11 +455,13 @@ public class ReportHandler extends Daemon {
         List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();
 
         // 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
-        Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, 
storageMediumMap,
+        Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, 
backendPartitionsVersion,
+                storageMediumMap,
                 tabletSyncMap,
                 tabletDeleteFromMeta,
                 tabletFoundInMeta,
                 tabletMigrationMap,
+                partitionVersionSyncMap,
                 transactionsToPublish,
                 transactionsToClear,
                 tabletRecoveryMap,
@@ -499,6 +517,9 @@ public class ReportHandler extends Daemon {
         if (!cooldownConfToUpdate.isEmpty()) {
             
Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate);
         }
+        if (!partitionVersionSyncMap.isEmpty()) {
+            handleUpdatePartitionVersion(partitionVersionSyncMap, backendId);
+        }
 
         final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
         Backend reportBackend = currentSystemInfo.getBackend(backendId);
@@ -659,27 +680,17 @@ public class ReportHandler extends Daemon {
                     // if the last failed version is changed, then fe will 
think schema change successfully.
                     // this is an fatal error.
                     if (replica.getState() == ReplicaState.NORMAL) {
-                        long metaVersion = replica.getVersion();
-                        long backendVersion = -1L;
-                        long rowCount = -1L;
-                        long dataSize = -1L;
-                        long remoteDataSize = -1L;
                         // schema change maybe successfully in fe, but not 
inform be,
                         // then be will report two schema hash
                         // just select the dest schema hash
-                        for (TTabletInfo tabletInfo : 
backendTablets.get(tabletId).getTabletInfos()) {
-                            if (tabletInfo.getSchemaHash() == schemaHash) {
-                                backendVersion = tabletInfo.getVersion();
-                                rowCount = tabletInfo.getRowCount();
-                                dataSize = tabletInfo.getDataSize();
-                                remoteDataSize = 
tabletInfo.getRemoteDataSize();
-                                break;
-                            }
-                        }
-                        if (backendVersion == -1L) {
+                        TTabletInfo tabletInfo = 
backendTablets.get(tabletId).getTabletInfos().stream()
+                                .filter(t -> t.getSchemaHash() == 
schemaHash).findFirst().orElse(null);
+                        if (tabletInfo == null) {
                             continue;
                         }
 
+                        long metaVersion = replica.getVersion();
+                        long backendVersion = tabletInfo.getVersion();
                         boolean needSync = false;
                         if (metaVersion < backendVersion) {
                             needSync = true;
@@ -702,7 +713,7 @@ public class ReportHandler extends Daemon {
                             // happens when
                             // 1. PUSH finished in BE but failed or not yet 
report to FE
                             // 2. repair for VERSION_INCOMPLETE finished in 
BE, but failed or not yet report to FE
-                            replica.updateVersionInfo(backendVersion, 
dataSize, remoteDataSize, rowCount);
+                            replica.updateWithReport(tabletInfo);
 
                             if (replica.getLastFailedVersion() < 0) {
                                 if (replica.setBad(false)) {
@@ -716,7 +727,9 @@ public class ReportHandler extends Daemon {
                                 ReplicaPersistInfo info = 
ReplicaPersistInfo.createForClone(dbId, tableId,
                                         partitionId, indexId, tabletId, 
backendId, replica.getId(),
                                         replica.getVersion(), schemaHash,
-                                        dataSize, remoteDataSize, rowCount,
+                                        tabletInfo.getDataSize(),
+                                        tabletInfo.getRemoteDataSize(),
+                                        tabletInfo.getRowCount(),
                                         replica.getLastFailedVersion(),
                                         replica.getLastSuccessVersion());
                                 
Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
@@ -1033,6 +1046,14 @@ public class ReportHandler extends Daemon {
         AgentTaskExecutor.submit(batchTask);
     }
 
+    private static void handleUpdatePartitionVersion(Map<Long, Long> 
partitionVersionSyncMap, long backendId) {
+        AgentBatchTask batchTask = new AgentBatchTask();
+        UpdateVisibleVersionTask task = new 
UpdateVisibleVersionTask(backendId, partitionVersionSyncMap,
+                System.currentTimeMillis());
+        batchTask.addTask(task);
+        AgentTaskExecutor.submit(batchTask);
+    }
+
     private static void handleRecoverTablet(ListMultimap<Long, Long> 
tabletRecoveryMap,
                                             Map<Long, TTablet> backendTablets, 
long backendId) {
         // print a warn log here to indicate the exceptions on the backend
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
index 2b084fcfa23..5e7748a3524 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
@@ -26,7 +26,6 @@ import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
-import org.apache.doris.common.Config;
 
 import com.google.common.collect.Lists;
 import org.json.simple.JSONObject;
@@ -154,9 +153,9 @@ public class Diagnoser {
                         + ", and is bad: " + (replica.isBad() ? "Yes" : "No")
                         + ", and is going to drop: " + (replica.isUserDrop() ? 
"Yes" : "No"));
             }
-            if (replica.getVersionCount() > 
Config.min_version_count_indicate_replica_compaction_too_slow) {
+            if (replica.tooBigVersionCount()) {
                 compactionErr.append("Replica on backend " + 
replica.getBackendId() + "'s version count is too high: "
-                        + replica.getVersionCount());
+                        + replica.getVisibleVersionCount());
             }
         }
         results.add(Lists.newArrayList("ReplicaBackendStatus", 
(backendErr.length() == 0
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 03a82cbb56b..360d24c573c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -48,6 +48,7 @@ import org.apache.doris.thrift.TStorageMediumMigrateReq;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUpdateTabletMetaInfoReq;
 import org.apache.doris.thrift.TUploadReq;
+import org.apache.doris.thrift.TVisibleVersionReq;
 
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
@@ -402,6 +403,15 @@ public class AgentBatchTask implements Runnable {
                 tAgentTaskRequest.setCleanTrashReq(request);
                 return tAgentTaskRequest;
             }
+            case UPDATE_VISIBLE_VERSION: {
+                UpdateVisibleVersionTask visibleTask = 
(UpdateVisibleVersionTask) task;
+                TVisibleVersionReq request = visibleTask.toThrift();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(request.toString());
+                }
+                tAgentTaskRequest.setVisibleVersionReq(request);
+                return tAgentTaskRequest;
+            }
             default:
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("could not find task type for task [{}]", task);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java
new file mode 100644
index 00000000000..52ed3b1c490
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TTaskType;
+import org.apache.doris.thrift.TVisibleVersionReq;
+
+import java.util.Map;
+
+public class UpdateVisibleVersionTask extends AgentTask {
+    private Map<Long, Long> partitionVisibleVersions;
+
+    public UpdateVisibleVersionTask(long backendId, Map<Long, Long> 
partitionVisibleVersions, long createTime) {
+        super(null, backendId, TTaskType.UPDATE_VISIBLE_VERSION, -1L, -1L, 
-1L, -1L, -1L, -1L, createTime);
+        this.partitionVisibleVersions = partitionVisibleVersions;
+    }
+
+    public TVisibleVersionReq toThrift() {
+        TVisibleVersionReq request = new TVisibleVersionReq();
+        partitionVisibleVersions.forEach((partitionId, version) -> {
+            request.putToPartitionVersion(partitionId, version);
+        });
+        return request;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index efcc760f0d2..3996664708a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -44,6 +44,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.InternalDatabaseUtil;
 import org.apache.doris.common.util.MetaLockUtils;
@@ -986,7 +987,12 @@ public class DatabaseTransactionMgr {
         }
     }
 
-    public void finishTransaction(long transactionId) throws UserException {
+    public void finishTransaction(long transactionId, Map<Long, Long> 
partitionVisibleVersions,
+            Map<Long, Set<Long>> backendPartitions) throws UserException {
+        if 
(DebugPointUtil.isEnable("DatabaseTransactionMgr.stop_finish_transaction")) {
+            return;
+        }
+
         TransactionState transactionState = null;
         readLock();
         try {
@@ -1049,7 +1055,7 @@ public class DatabaseTransactionMgr {
                     LOG.warn("afterStateTransform txn {} failed. exception: ", 
transactionState, e);
                 }
             }
-            updateCatalogAfterVisible(transactionState, db);
+            updateCatalogAfterVisible(transactionState, db, 
partitionVisibleVersions, backendPartitions);
         } finally {
             MetaLockUtils.writeUnlockTables(tableList);
         }
@@ -1964,7 +1970,8 @@ public class DatabaseTransactionMgr {
         }
     }
 
-    private boolean updateCatalogAfterVisible(TransactionState 
transactionState, Database db) {
+    private boolean updateCatalogAfterVisible(TransactionState 
transactionState, Database db,
+            Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> 
backendPartitions) {
         Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
         AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
         List<Long> newPartitionLoadedTableIds = new ArrayList<>();
@@ -2021,7 +2028,13 @@ public class DatabaseTransactionMgr {
                                     lastFailedVersion = newCommitVersion;
                                 }
                             }
-                            replica.updateVersionWithFailedInfo(newVersion, 
lastFailedVersion, lastSuccessVersion);
+                            replica.updateVersionWithFailed(newVersion, 
lastFailedVersion, lastSuccessVersion);
+                            Set<Long> partitionIds = 
backendPartitions.get(replica.getBackendId());
+                            if (partitionIds == null) {
+                                partitionIds = Sets.newHashSet();
+                                backendPartitions.put(replica.getBackendId(), 
partitionIds);
+                            }
+                            partitionIds.add(partitionId);
                         }
                     }
                 } // end for indices
@@ -2032,6 +2045,7 @@ public class DatabaseTransactionMgr {
                     newPartitionLoadedTableIds.add(tableId);
                 }
                 partition.updateVisibleVersionAndTime(version, versionTime);
+                partitionVisibleVersions.put(partition.getId(), version);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("transaction state {} set partition {}'s version 
to [{}]",
                             transactionState, partition.getId(), version);
@@ -2180,7 +2194,7 @@ public class DatabaseTransactionMgr {
             if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
                 updateCatalogAfterCommitted(transactionState, db, true);
             } else if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-                updateCatalogAfterVisible(transactionState, db);
+                updateCatalogAfterVisible(transactionState, db, 
Maps.newHashMap(), Maps.newHashMap());
             }
             unprotectUpsertTransactionState(transactionState, true);
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index dd62c02f408..24d92849b49 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -414,9 +414,10 @@ public class GlobalTransactionMgr implements Writable {
      * @param transactionId
      * @return
      */
-    public void finishTransaction(long dbId, long transactionId) throws 
UserException {
+    public void finishTransaction(long dbId, long transactionId, Map<Long, 
Long> partitionVisibleVersions,
+            Map<Long, Set<Long>> backendPartitions) throws UserException {
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
-        dbTransactionMgr.finishTransaction(transactionId);
+        dbTransactionMgr.finishTransaction(transactionId, 
partitionVisibleVersions, backendPartitions);
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index a6665c2e220..22ca57f2399 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -32,6 +32,7 @@ import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.task.UpdateVisibleVersionTask;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TTaskType;
 
@@ -61,14 +62,18 @@ public class PublishVersionDaemon extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
+        Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+        Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+
         try {
-            publishVersion();
+            publishVersion(partitionVisibleVersions, backendPartitions);
+            sendBackendVisibleVersion(partitionVisibleVersions, 
backendPartitions);
         } catch (Throwable t) {
             LOG.error("errors while publish version to all backends", t);
         }
     }
 
-    private void publishVersion() {
+    private void publishVersion(Map<Long, Long> partitionVisibleVersions, 
Map<Long, Set<Long>> backendPartitions) {
         if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) {
             return;
         }
@@ -177,7 +182,7 @@ public class PublishVersionDaemon extends MasterDaemon {
                 try {
                     // one transaction exception should not affect other 
transaction
                     
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
-                            transactionState.getTransactionId());
+                            transactionState.getTransactionId(), 
partitionVisibleVersions, backendPartitions);
                 } catch (Exception e) {
                     LOG.warn("error happens when finish transaction {}", 
transactionState.getTransactionId(), e);
                 }
@@ -232,4 +237,23 @@ public class PublishVersionDaemon extends MasterDaemon {
                 .collect(Collectors.groupingBy(p -> p.first,
                         Collectors.mapping(p -> p.second, 
Collectors.toSet())));
     }
+
+    private void sendBackendVisibleVersion(Map<Long, Long> 
partitionVisibleVersions,
+            Map<Long, Set<Long>> backendPartitions) {
+        if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) 
{
+            return;
+        }
+
+        long createTime = System.currentTimeMillis();
+        AgentBatchTask batchTask = new AgentBatchTask();
+        backendPartitions.forEach((backendId, partitionIds) -> {
+            Map<Long, Long> backendPartitionVersions = 
partitionVisibleVersions.entrySet().stream()
+                    .filter(entry -> partitionIds.contains(entry.getKey()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+            UpdateVisibleVersionTask task = new 
UpdateVisibleVersionTask(backendId, backendPartitionVersions,
+                    createTime);
+            batchTask.addTask(task);
+        });
+        AgentTaskExecutor.submit(batchTask);
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java 
b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
index e93f524a919..143dd920397 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
@@ -221,10 +221,7 @@ public class RollupJobV2Test {
         MaterializedIndex shadowIndex = 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
         for (Tablet shadowTablet : shadowIndex.getTablets()) {
             for (Replica shadowReplica : shadowTablet.getReplicas()) {
-                
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
-                        shadowReplica.getDataSize(),
-                        shadowReplica.getRemoteDataSize(),
-                        shadowReplica.getRowCount());
+                shadowReplica.updateVersion(testPartition.getVisibleVersion());
             }
         }
 
@@ -301,10 +298,7 @@ public class RollupJobV2Test {
         MaterializedIndex shadowIndex = 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
         for (Tablet shadowTablet : shadowIndex.getTablets()) {
             for (Replica shadowReplica : shadowTablet.getReplicas()) {
-                
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
-                        shadowReplica.getDataSize(),
-                        shadowReplica.getRemoteDataSize(),
-                        shadowReplica.getRowCount());
+                shadowReplica.updateVersion(testPartition.getVisibleVersion());
             }
         }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java 
b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index 60020d08a8c..7a49a413750 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -213,8 +213,7 @@ public class SchemaChangeJobV2Test {
         MaterializedIndex shadowIndex = 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
         for (Tablet shadowTablet : shadowIndex.getTablets()) {
             for (Replica shadowReplica : shadowTablet.getReplicas()) {
-                
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), 
shadowReplica.getDataSize(),
-                        shadowReplica.getRemoteDataSize(), 
shadowReplica.getRowCount());
+                shadowReplica.updateVersion(testPartition.getVisibleVersion());
             }
         }
 
@@ -296,8 +295,7 @@ public class SchemaChangeJobV2Test {
         MaterializedIndex shadowIndex = 
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
         for (Tablet shadowTablet : shadowIndex.getTablets()) {
             for (Replica shadowReplica : shadowTablet.getReplicas()) {
-                
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), 
shadowReplica.getDataSize(),
-                        shadowReplica.getRemoteDataSize(), 
shadowReplica.getRowCount());
+                shadowReplica.updateVersion(testPartition.getVisibleVersion());
             }
         }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java
index 21fe967a96d..54debab9a63 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java
@@ -73,7 +73,8 @@ public class ShowReplicaTest extends TestWithFeService {
             for (MaterializedIndex index : 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
                 for (Tablet tablet : index.getTablets()) {
                     for (Replica replica : tablet.getReplicas()) {
-                        replica.updateStat(1024, 2);
+                        replica.setDataSize(1024L);
+                        replica.setRowCount(2L);
                     }
                 }
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
index d6a81cdd883..eb7dbca0775 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java
@@ -65,12 +65,7 @@ public class ReplicaTest {
 
         // update new version
         long newVersion = version + 1;
-        long newDataSize = dataSize + 100;
-        long newRowCount = rowCount + 10;
-        replica.updateVersionInfo(newVersion, newDataSize, 0, newRowCount);
-        Assert.assertEquals(newVersion, replica.getVersion());
-        Assert.assertEquals(newDataSize, replica.getDataSize());
-        Assert.assertEquals(newRowCount, replica.getRowCount());
+        replica.updateVersion(newVersion);
 
         // check version catch up
         Assert.assertFalse(replica.checkVersionCatchUp(5, false));
@@ -132,14 +127,14 @@ public class ReplicaTest {
     public void testUpdateVersion1() {
         Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, 
ReplicaState.NORMAL, 0, 3);
         // new version is little than original version, it is invalid the 
version will not update
-        originalReplica.updateVersionInfo(2, 100, 0, 78);
+        originalReplica.updateVersion(2);
         Assert.assertEquals(3, originalReplica.getVersion());
     }
 
     @Test
     public void testUpdateVersion2() {
         Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, 
ReplicaState.NORMAL, 0, 0);
-        originalReplica.updateVersionInfo(3, 100, 0, 78);
+        originalReplica.updateVersion(3);
         // if new version >= current version and last success version <= new 
version, then last success version should be updated
         Assert.assertEquals(3, originalReplica.getLastSuccessVersion());
         Assert.assertEquals(3, originalReplica.getVersion());
@@ -155,7 +150,7 @@ public class ReplicaTest {
         Assert.assertEquals(8, originalReplica.getLastFailedVersion());
 
         // update last success version 10
-        
originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(),
+        originalReplica.updateVersionWithFailed(originalReplica.getVersion(),
                 originalReplica.getLastFailedVersion(),
                 10);
         Assert.assertEquals(10, originalReplica.getLastSuccessVersion());
@@ -163,7 +158,7 @@ public class ReplicaTest {
         Assert.assertEquals(8, originalReplica.getLastFailedVersion());
 
         // update version to 8, the last success version and version should be 
10
-        originalReplica.updateVersionInfo(8, 100, 0, 78);
+        originalReplica.updateVersion(8);
         Assert.assertEquals(10, originalReplica.getLastSuccessVersion());
         Assert.assertEquals(10, originalReplica.getVersion());
         Assert.assertEquals(-1, originalReplica.getLastFailedVersion());
@@ -175,7 +170,7 @@ public class ReplicaTest {
         Assert.assertEquals(12, originalReplica.getLastFailedVersion());
 
         // update last success version to 15
-        
originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(),
+        originalReplica.updateVersionWithFailed(originalReplica.getVersion(),
                 originalReplica.getLastFailedVersion(),
                 15);
         Assert.assertEquals(15, originalReplica.getLastSuccessVersion());
@@ -189,13 +184,13 @@ public class ReplicaTest {
         Assert.assertEquals(18, originalReplica.getLastFailedVersion());
 
         // update version to 17 then version and success version is 17
-        originalReplica.updateVersionInfo(17, 100, 0, 78);
+        originalReplica.updateVersion(17);
         Assert.assertEquals(17, originalReplica.getLastSuccessVersion());
         Assert.assertEquals(17, originalReplica.getVersion());
         Assert.assertEquals(18, originalReplica.getLastFailedVersion());
 
         // update version to 18, then version and last success version should 
be 18 and failed version should be -1
-        originalReplica.updateVersionInfo(18, 100, 0, 78);
+        originalReplica.updateVersion(18);
         Assert.assertEquals(18, originalReplica.getLastSuccessVersion());
         Assert.assertEquals(18, originalReplica.getVersion());
         Assert.assertEquals(-1, originalReplica.getLastFailedVersion());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
index f563b846ed7..40b6683da3d 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
@@ -110,7 +110,8 @@ public class DiskReblanceWhenSchedulerIdle extends 
TestWithFeService {
             Lists.newArrayList(tablet.getReplicas()).forEach(
                     replica -> {
                     if (replica.getBackendId() == backends.get(1).getId()) {
-                        replica.updateStat(totalCapacity / 4, 1);
+                        replica.setDataSize(totalCapacity / 4);
+                        replica.setRowCount(1);
                         tablet.deleteReplica(replica);
                         replica.setBackendId(backends.get(0).getId());
                         replica.setPathHash(diskInfo0.getPathHash());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 063faf2d3b2..1e6af5c7324 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -104,7 +104,7 @@ public class RebalancerTestUtil {
             replica.setPathHash(beIds.get(i));
             if (replicaSizes != null) {
                 // for disk rebalancer, every beId corresponding to a 
replicaSize
-                replica.updateStat(replicaSizes.get(i), 0);
+                replica.setDataSize(replicaSizes.get(i));
             }
             // isRestore set true, to avoid modifying 
Catalog.getCurrentInvertedIndex
             tablet.addReplica(replica, true);
@@ -165,7 +165,8 @@ public class RebalancerTestUtil {
                             for (Tablet tablet : idx.getTablets()) {
                                 long tabletSize = tableBaseSize * (1 + 
random.nextInt(tabletSkew));
                                 for (Replica replica : tablet.getReplicas()) {
-                                    replica.updateStat(tabletSize, 1000L);
+                                    replica.setDataSize(tabletSize);
+                                    replica.setRowCount(1000L);
                                 }
                             }
                         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
index 1150b32192f..1ac497dbebe 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
@@ -106,7 +106,7 @@ public class RepairVersionTest extends TestWithFeService {
         tablets.put(tablet.getId(), tTablet);
         Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
 
-        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L);
 
         Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
         Assertions.assertEquals(-1L, replica.getLastFailedVersion());
@@ -136,11 +136,11 @@ public class RepairVersionTest extends TestWithFeService {
         Map<Long, TTablet> tablets = Maps.newHashMap();
         tablets.put(tablet.getId(), tTablet);
 
-        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L);
         Assertions.assertEquals(-1L, replica.getLastFailedVersion());
 
         DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", 
new DebugPoint());
-        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 
Maps.newHashMap(), 100L);
         Assertions.assertEquals(replica.getVersion() + 1, 
replica.getLastFailedVersion());
 
         Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
@@ -160,7 +160,7 @@ public class RepairVersionTest extends TestWithFeService {
         long visibleVersion = 2L;
         partition.updateVisibleVersion(visibleVersion);
         partition.setNextVersion(visibleVersion + 1);
-        tablet.getReplicas().forEach(replica -> 
replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L));
+        tablet.getReplicas().forEach(replica -> 
replica.updateVersion(visibleVersion));
 
         Replica replica = tablet.getReplicas().iterator().next();
         Assertions.assertEquals(visibleVersion, replica.getVersion());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
index 6a38985b73f..7d918ef7db5 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java
@@ -141,7 +141,7 @@ public class TabletReplicaTooSlowTest {
             List<Long> pathHashes = be.getDisks().values().stream()
                     .map(DiskInfo::getPathHash).collect(Collectors.toList());
             Replica replica = cell.getValue();
-            replica.setVersionCount(versionCount);
+            replica.setVisibleVersionCount(versionCount);
             versionCount = versionCount + 200;
 
             replica.setPathHash(pathHashes.get(0));
@@ -171,7 +171,7 @@ public class TabletReplicaTooSlowTest {
             boolean found = false;
             for (Table.Cell<Long, Long, Replica> cell : 
replicaMetaTable.cellSet()) {
                 Replica replica = cell.getValue();
-                if (replica.getVersionCount() == 401) {
+                if (replica.getVisibleVersionCount() == 401) {
                     if (replica.tooSlow()) {
                         LOG.info("set to TOO_SLOW.");
                     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index d4578e17d7f..852f072eca1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -84,19 +84,19 @@ public class TabletSchedCtxTest {
         TabletSchedCtx.VersionCountComparator countComparator = new 
TabletSchedCtx.VersionCountComparator();
         List<Replica> replicaList = Lists.newArrayList();
         Replica replica1 = new Replica();
-        replica1.setVersionCount(100);
+        replica1.setVisibleVersionCount(100);
         replica1.setState(Replica.ReplicaState.NORMAL);
 
         Replica replica2 = new Replica();
-        replica2.setVersionCount(50);
+        replica2.setVisibleVersionCount(50);
         replica2.setState(Replica.ReplicaState.NORMAL);
 
         Replica replica3 = new Replica();
-        replica3.setVersionCount(-1);
+        replica3.setVisibleVersionCount(-1);
         replica3.setState(Replica.ReplicaState.NORMAL);
 
         Replica replica4 = new Replica();
-        replica4.setVersionCount(200);
+        replica4.setVisibleVersionCount(200);
         replica4.setState(Replica.ReplicaState.NORMAL);
 
         replicaList.add(replica1);
@@ -105,10 +105,10 @@ public class TabletSchedCtxTest {
         replicaList.add(replica4);
 
         Collections.sort(replicaList, countComparator);
-        Assert.assertEquals(50, replicaList.get(0).getVersionCount());
-        Assert.assertEquals(100, replicaList.get(1).getVersionCount());
-        Assert.assertEquals(200, replicaList.get(2).getVersionCount());
-        Assert.assertEquals(-1, replicaList.get(3).getVersionCount());
+        Assert.assertEquals(50, replicaList.get(0).getVisibleVersionCount());
+        Assert.assertEquals(100, replicaList.get(1).getVisibleVersionCount());
+        Assert.assertEquals(200, replicaList.get(2).getVisibleVersionCount());
+        Assert.assertEquals(-1, replicaList.get(3).getVisibleVersionCount());
     }
 
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 76fe82e6599..9ff6e7ac3ca 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1101,7 +1101,7 @@ public class QueryPlanTest extends TestWithFeService {
                 mIndex.setRowCount(10000);
                 for (Tablet tablet : mIndex.getTablets()) {
                     for (Replica replica : tablet.getReplicas()) {
-                        replica.updateVersionInfo(2, 200000, 0, 10000);
+                        replica.updateVersion(2);
                     }
                 }
             }
@@ -1115,7 +1115,7 @@ public class QueryPlanTest extends TestWithFeService {
                 mIndex.setRowCount(10000);
                 for (Tablet tablet : mIndex.getTablets()) {
                     for (Replica replica : tablet.getReplicas()) {
-                        replica.updateVersionInfo(2, 200000, 0, 10000);
+                        replica.updateVersion(2);
                     }
                 }
             }
@@ -1199,7 +1199,7 @@ public class QueryPlanTest extends TestWithFeService {
                 mIndex.setRowCount(10000);
                 for (Tablet tablet : mIndex.getTablets()) {
                     for (Replica replica : tablet.getReplicas()) {
-                        replica.updateVersionInfo(2, 200000, 0, 10000);
+                        replica.updateVersion(2);
                     }
                 }
             }
@@ -1229,7 +1229,7 @@ public class QueryPlanTest extends TestWithFeService {
                 mIndex.setRowCount(0);
                 for (Tablet tablet : mIndex.getTablets()) {
                     for (Replica replica : tablet.getReplicas()) {
-                        replica.updateVersionInfo(2, 0, 0, 0);
+                        replica.updateVersion(2);
                     }
                 }
             }
@@ -1249,7 +1249,7 @@ public class QueryPlanTest extends TestWithFeService {
                 mIndex.setRowCount(10000);
                 for (Tablet tablet : mIndex.getTablets()) {
                     for (Replica replica : tablet.getReplicas()) {
-                        replica.updateVersionInfo(2, 200000, 0, 10000);
+                        replica.updateVersion(2);
                     }
                 }
             }
@@ -1278,7 +1278,7 @@ public class QueryPlanTest extends TestWithFeService {
                 mIndex.setRowCount(0);
                 for (Tablet tablet : mIndex.getTablets()) {
                     for (Replica replica : tablet.getReplicas()) {
-                        replica.updateVersionInfo(2, 0, 0, 0);
+                        replica.updateVersion(2);
                     }
                 }
             }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index ea63a5e18b1..437f1bcb209 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -42,6 +42,7 @@ import org.junit.rules.ExpectedException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class DatabaseTransactionMgrTest {
 
@@ -115,7 +116,10 @@ public class DatabaseTransactionMgrTest {
         setTransactionFinishPublish(transactionState1,
                 Lists.newArrayList(CatalogTestUtil.testBackendId1,
                         CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3));
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId1);
+        Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+        Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId1,
+                partitionVisibleVersions, backendPartitions);
         labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
 
         // txn 2, 3, 4
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 4f22d95c60b..5c8f72723ad 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -53,8 +53,10 @@ import 
org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import mockit.Injectable;
 import mockit.Mocked;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -65,6 +67,7 @@ import org.junit.Test;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 public class GlobalTransactionMgrTest {
@@ -474,7 +477,10 @@ public class GlobalTransactionMgrTest {
                         CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3));
         transactionState.getPublishVersionTasks()
                 
.get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1);
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId);
+        Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+        Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId,
+                partitionVisibleVersions, backendPartitions);
         transactionState = fakeEditLog.getTransaction(transactionId);
         Assert.assertEquals(TransactionStatus.VISIBLE, 
transactionState.getTransactionStatus());
         // check replica version
@@ -493,6 +499,14 @@ public class GlobalTransactionMgrTest {
             }
 
         }
+
+        Assert.assertEquals(ImmutableMap.of(testPartition.getId(), 
CatalogTestUtil.testStartVersion + 1),
+                partitionVisibleVersions);
+        Set<Long> partitionIds = Sets.newHashSet(testPartition.getId());
+        Assert.assertEquals(partitionIds, 
backendPartitions.get(CatalogTestUtil.testBackendId1));
+        Assert.assertEquals(partitionIds, 
backendPartitions.get(CatalogTestUtil.testBackendId2));
+        Assert.assertEquals(partitionIds, 
backendPartitions.get(CatalogTestUtil.testBackendId3));
+
         // slave replay new state and compare catalog
         slaveTransMgr.replayUpsertTransactionState(transactionState);
         Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv));
@@ -535,8 +549,13 @@ public class GlobalTransactionMgrTest {
         // backend2 publish failed
         transactionState.getPublishVersionTasks()
                 
.get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1);
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId);
+        Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
+        Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId,
+                partitionVisibleVersions, backendPartitions);
         Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
+        Assert.assertTrue(partitionVisibleVersions.isEmpty());
+        Assert.assertTrue(backendPartitions.isEmpty());
         Replica replica1 = 
tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
         Replica replica2 = 
tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
         Replica replica3 = 
tablet.getReplicaById(CatalogTestUtil.testReplicaId3);
@@ -554,7 +573,8 @@ public class GlobalTransactionMgrTest {
         backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L);
         transactionState.getPublishVersionTasks()
                 
.get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets);
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId);
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId,
+                partitionVisibleVersions, backendPartitions);
         Assert.assertEquals(TransactionStatus.VISIBLE, 
transactionState.getTransactionStatus());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, 
replica1.getVersion());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, 
replica2.getVersion());
@@ -619,7 +639,8 @@ public class GlobalTransactionMgrTest {
         
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
                 Lists.newArrayList(CatalogTestUtil.testBackendId1,
                         CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3));
-        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId2);
+        masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, 
transactionId2,
+                partitionVisibleVersions, backendPartitions);
         Assert.assertEquals(TransactionStatus.VISIBLE, 
transactionState.getTransactionStatus());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, 
replica1.getVersion());
         Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, 
replica2.getVersion());
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index 9eb4ece8d72..6da55c66c5a 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -395,6 +395,10 @@ struct TPublishVersionRequest {
     4: optional set<Types.TTabletId> base_tablet_ids
 }
 
+struct TVisibleVersionReq {
+    1: required map<Types.TPartitionId, Types.TVersion> partition_version
+}
+
 struct TClearAlterTaskRequest {
     1: required Types.TTabletId tablet_id
     2: required Types.TSchemaHash schema_hash
@@ -496,6 +500,7 @@ struct TAgentTaskRequest {
     32: optional TAlterInvertedIndexReq alter_inverted_index_req
     33: optional TGcBinlogReq gc_binlog_req
     34: optional TCleanTrashReq clean_trash_req
+    35: optional TVisibleVersionReq visible_version_req
 }
 
 struct TAgentResult {
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 0a2edb8ccbf..12036ef93ce 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -33,9 +33,10 @@ struct TTabletStat {
     1: required i64 tablet_id
     // local data size
     2: optional i64 data_size
-    3: optional i64 row_num
-    4: optional i64 version_count
+    3: optional i64 row_count
+    4: optional i64 total_version_count
     5: optional i64 remote_data_size
+    6: optional i64 visible_version_count
 }
 
 struct TTabletStatResult {
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index 9acd3f85f7b..7442a86a990 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -33,7 +33,7 @@ struct TTabletInfo {
     6: required Types.TSize data_size
     7: optional Types.TStorageMedium storage_medium
     8: optional list<Types.TTransactionId> transaction_ids
-    9: optional i64 version_count
+    9: optional i64 total_version_count
     10: optional i64 path_hash
     11: optional bool version_miss
     12: optional bool used
@@ -46,6 +46,7 @@ struct TTabletInfo {
     // 18: optional bool is_cooldown
     19: optional i64 cooldown_term
     20: optional Types.TUniqueId cooldown_meta_id
+    21: optional i64 visible_version_count
 }
 
 struct TFinishTaskRequest {
@@ -106,6 +107,7 @@ struct TReportRequest {
     10: optional list<AgentService.TStorageResource> resource // only id and 
version
     11: i32 num_cores
     12: i32 pipeline_executor_size
+    13: optional map<Types.TPartitionId, Types.TVersion> partitions_version
 }
 
 struct TMasterResult {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 3c2fcfbd6fa..3cebb5a81ad 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -223,7 +223,8 @@ enum TTaskType {
     PUSH_STORAGE_POLICY,
     ALTER_INVERTED_INDEX,
     GC_BINLOG,
-    CLEAN_TRASH
+    CLEAN_TRASH,
+    UPDATE_VISIBLE_VERSION
 }
 
 enum TStmtType {
diff --git 
a/regression-test/data/compaction/test_compaction_with_visible_version.out 
b/regression-test/data/compaction/test_compaction_with_visible_version.out
new file mode 100644
index 00000000000..de90dd5fa2a
--- /dev/null
+++ b/regression-test/data/compaction/test_compaction_with_visible_version.out
@@ -0,0 +1,448 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_1 --
+0      0
+1      10
+2      20
+3      30
+4      40
+5      50
+6      60
+7      70
+8      80
+9      90
+10     100
+11     110
+12     120
+13     130
+14     140
+15     150
+16     160
+17     170
+18     180
+19     190
+20     200
+
+-- !select_2 --
+0      0
+1      10
+2      20
+3      30
+4      40
+5      50
+6      60
+7      70
+8      80
+9      90
+10     100
+11     110
+12     120
+13     130
+14     140
+15     150
+16     160
+17     170
+18     180
+19     190
+20     200
+
+-- !select_3 --
+0      0
+1      10
+2      20
+3      30
+4      40
+5      50
+6      60
+7      70
+8      80
+9      90
+10     100
+11     110
+12     120
+13     130
+14     140
+15     150
+16     160
+17     170
+18     180
+19     190
+20     200
+21     210
+22     220
+23     230
+24     240
+25     250
+26     260
+27     270
+28     280
+29     290
+30     300
+31     310
+32     320
+33     330
+34     340
+35     350
+36     360
+37     370
+38     380
+39     390
+40     400
+41     410
+
+-- !select_4 --
+0      0
+1      10
+2      20
+3      30
+4      40
+5      50
+6      60
+7      70
+8      80
+9      90
+10     100
+11     110
+12     120
+13     130
+14     140
+15     150
+16     160
+17     170
+18     180
+19     190
+20     200
+21     210
+22     220
+23     230
+24     240
+25     250
+26     260
+27     270
+28     280
+29     290
+30     300
+31     310
+32     320
+33     330
+34     340
+35     350
+36     360
+37     370
+38     380
+39     390
+40     400
+41     410
+
+-- !select_5 --
+0      0
+1      10
+2      20
+3      30
+4      40
+5      50
+6      60
+7      70
+8      80
+9      90
+10     100
+11     110
+12     120
+13     130
+14     140
+15     150
+16     160
+17     170
+18     180
+19     190
+20     200
+21     210
+22     220
+23     230
+24     240
+25     250
+26     260
+27     270
+28     280
+29     290
+30     300
+31     310
+32     320
+33     330
+34     340
+35     350
+36     360
+37     370
+38     380
+39     390
+40     400
+41     410
+
+-- !select_6 --
+0      0
+1      10
+2      20
+3      30
+4      40
+5      50
+6      60
+7      70
+8      80
+9      90
+10     100
+11     110
+12     120
+13     130
+14     140
+15     150
+16     160
+17     170
+18     180
+19     190
+20     200
+21     210
+22     220
+23     230
+24     240
+25     250
+26     260
+27     270
+28     280
+29     290
+30     300
+31     310
+32     320
+33     330
+34     340
+35     350
+36     360
+37     370
+38     380
+39     390
+40     400
+41     410
+42     420
+43     430
+44     440
+45     450
+46     460
+47     470
+48     480
+49     490
+50     500
+51     510
+52     520
+53     530
+54     540
+55     550
+56     560
+57     570
+58     580
+59     590
+60     600
+61     610
+62     620
+63     630
+64     640
+65     650
+66     660
+67     670
+68     680
+69     690
+70     700
+71     710
+72     720
+73     730
+74     740
+75     750
+76     760
+77     770
+78     780
+79     790
+80     800
+81     810
+82     820
+83     830
+84     840
+85     850
+86     860
+87     870
+88     880
+89     890
+90     900
+91     910
+92     920
+93     930
+94     940
+95     950
+96     960
+97     970
+98     980
+99     990
+100    1000
+101    1010
+102    1020
+103    1030
+104    1040
+105    1050
+106    1060
+107    1070
+108    1080
+109    1090
+110    1100
+111    1110
+112    1120
+113    1130
+114    1140
+115    1150
+116    1160
+117    1170
+118    1180
+119    1190
+120    1200
+121    1210
+
+-- !select_7 --
+0      0
+1      10
+2      20
+3      30
+4      40
+5      50
+6      60
+7      70
+8      80
+9      90
+10     100
+11     110
+12     120
+13     130
+14     140
+15     150
+16     160
+17     170
+18     180
+19     190
+20     200
+21     210
+22     220
+23     230
+24     240
+25     250
+26     260
+27     270
+28     280
+29     290
+30     300
+31     310
+32     320
+33     330
+34     340
+35     350
+36     360
+37     370
+38     380
+39     390
+40     400
+41     410
+42     420
+43     430
+44     440
+45     450
+46     460
+47     470
+48     480
+49     490
+50     500
+51     510
+52     520
+53     530
+54     540
+55     550
+56     560
+57     570
+58     580
+59     590
+60     600
+61     610
+62     620
+63     630
+64     640
+65     650
+66     660
+67     670
+68     680
+69     690
+70     700
+71     710
+72     720
+73     730
+74     740
+75     750
+76     760
+77     770
+78     780
+79     790
+80     800
+81     810
+82     820
+83     830
+84     840
+85     850
+86     860
+87     870
+88     880
+89     890
+90     900
+91     910
+92     920
+93     930
+94     940
+95     950
+96     960
+97     970
+98     980
+99     990
+100    1000
+101    1010
+102    1020
+103    1030
+104    1040
+105    1050
+106    1060
+107    1070
+108    1080
+109    1090
+110    1100
+111    1110
+112    1120
+113    1130
+114    1140
+115    1150
+116    1160
+117    1170
+118    1180
+119    1190
+120    1200
+121    1210
+122    1220
+123    1230
+124    1240
+125    1250
+126    1260
+127    1270
+128    1280
+129    1290
+130    1300
+131    1310
+132    1320
+133    1330
+134    1340
+135    1350
+136    1360
+137    1370
+138    1380
+139    1390
+140    1400
+141    1410
+142    1420
+
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index a915851b938..49bfbc18792 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -34,13 +34,16 @@ class ClusterOptions {
 
     int feNum = 1
     int beNum = 3
+
     List<String> feConfigs = [
         'heartbeat_interval_second=5',
     ]
 
     List<String> beConfigs = [
+        'report_disk_state_interval_seconds=2',
         'report_random_wait=false',
     ]
+
     boolean connectToFollower = false
 
     // 1. cloudMode = true, only create cloud cluster.
diff --git 
a/regression-test/suites/compaction/test_compaction_with_visible_version.groovy 
b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy
new file mode 100644
index 00000000000..194a1b67566
--- /dev/null
+++ 
b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.NodeType
+
+suite('test_compaction_with_visible_version') {
+    def options = new ClusterOptions()
+    def compaction_keep_invisible_version_min_count = 50L
+    options.feConfigs += [
+        'partition_info_update_interval_secs=5',
+    ]
+    options.beConfigs += [
+        'disable_auto_compaction=true',
+        'report_tablet_interval_seconds=1',
+        'tablet_rowset_stale_sweep_by_size=true',
+        'tablet_rowset_stale_sweep_threshold_size=0',
+        'compaction_keep_invisible_version_timeout_sec=6000',
+        
"compaction_keep_invisible_version_min_count=${compaction_keep_invisible_version_min_count}".toString(),
+        'compaction_keep_invisible_version_max_count=500',
+    ]
+    options.enableDebugPoints()
+
+    docker(options) {
+        def E_CUMULATIVE_NO_SUITABLE_VERSION = 'E-2000'
+        def E_FULL_MISS_VERSION = 'E-2009'
+
+        sql 'SET GLOBAL insert_visible_timeout_ms = 3000'
+
+        def tableName = 'test_compaction_with_visible_version'
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort)
+
+        def triggerCompaction = { tablet, isCompactSucc, compaction_type ->
+            def tabletId = tablet.TabletId
+            def backendId = tablet.BackendId
+            def backendIp = backendId_to_backendIP.get(backendId)
+            def backendHttpPort = backendId_to_backendHttpPort.get(backendId)
+            def code
+            def out
+            def err
+            if (compaction_type == 'base') {
+                (code, out, err) = be_run_base_compaction(backendIp, 
backendHttpPort, tabletId)
+            } else {
+                (code, out, err) = be_run_cumulative_compaction(backendIp, 
backendHttpPort, tabletId)
+            }
+            logger.info("Run compaction: code=${code}, out=${out}, err=${err}")
+            assertEquals(0, code)
+            def compactJson = parseJson(out.trim())
+            if (isCompactSucc) {
+                assertEquals('success', compactJson.status.toLowerCase())
+            } else {
+                if (compaction_type == 'base') {
+                    assertEquals(E_FULL_MISS_VERSION, compactJson.status)
+                } else {
+                    assertEquals(E_CUMULATIVE_NO_SUITABLE_VERSION, 
compactJson.status)
+                }
+            }
+        }
+
+        def waitCompaction = { tablet, startTs ->
+            def tabletId = tablet.TabletId
+            def backendId = tablet.BackendId
+            def backendIp = backendId_to_backendIP.get(backendId)
+            def backendHttpPort = backendId_to_backendHttpPort.get(backendId)
+            def running = true
+            while (running) {
+                assertTrue(System.currentTimeMillis() - startTs < 60 * 1000)
+                Thread.sleep(1000)
+                def (code, out, err) = be_get_compaction_status(backendIp, 
backendHttpPort, tabletId)
+                logger.info("Get compaction: code=${code}, out=${out}, 
err=${err}")
+                assertEquals(0, code)
+
+                def compactionStatus = parseJson(out.trim())
+                assertEquals('success', compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            }
+        }
+
+        def checkCompact = { isCumuCompactSucc, runBaseCompact, 
isInvisibleTimeout, version, visibleVersion ->
+            def partition = sql_return_maparray("SHOW PARTITIONS FROM 
${tableName}")[0]
+            assertEquals(visibleVersion, partition.VisibleVersion as long)
+
+            // wait be report version count
+            Thread.sleep(3 * 1000)
+            def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}"
+            def lastInvisibleVersionCountMap = [:]
+            def lastVisibleVersionCountMap = [:]
+            tablets.each {
+                lastVisibleVersionCountMap[it.BackendId] = 
it.VisibleVersionCount as long
+                lastInvisibleVersionCountMap[it.BackendId] =
+                        (it.VersionCount as long) - (it.VisibleVersionCount as 
long)
+                triggerCompaction it, isCumuCompactSucc, 'cumulative'
+            }
+
+            if (isCumuCompactSucc) {
+                // wait compaction done
+                def startTs = System.currentTimeMillis()
+                tablets.each {
+                    waitCompaction it, startTs
+                }
+            }
+
+            if (runBaseCompact) {
+                tablets.each {
+                    triggerCompaction it, true, 'base'
+                }
+
+                def startTs = System.currentTimeMillis()
+                tablets.each {
+                    waitCompaction it, startTs
+                }
+            }
+
+            // wait report
+            Thread.sleep(3 * 1000)
+
+            tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}"
+            tablets.each {
+                def backendId = it.BackendId
+                def visibleVersionCount = it.VisibleVersionCount as long
+                def totalVersionCount = it.VersionCount as long
+                def invisibleVersionCount = totalVersionCount - 
visibleVersionCount
+                assertEquals(version, it.Version as long)
+
+                if (isInvisibleTimeout) {
+                    def values = [Math.min(version - visibleVersion, 
compaction_keep_invisible_version_min_count),
+                    Math.min(version - visibleVersion, 
compaction_keep_invisible_version_min_count + 1)]
+
+                    // part of invisible version was compact
+                    assertTrue(invisibleVersionCount in values,
+                            "not match, invisibleVersionCount: 
${invisibleVersionCount}, candidate values: ${values}")
+                } else {
+                    // invisible version couldn't compact
+                    assertEquals(version - visibleVersion, 
invisibleVersionCount)
+                }
+
+                def lastVisibleVersionCount = 
lastVisibleVersionCountMap.get(backendId)
+                def lastInvisibleVersionCount = 
lastInvisibleVersionCountMap.get(backendId)
+                if (isCumuCompactSucc) {
+                    assertTrue(lastInvisibleVersionCount > 
invisibleVersionCount || lastInvisibleVersionCount <= 1,
+                            "not met with: lastInvisibleVersionCount 
${lastInvisibleVersionCount} > "
+                            + "invisibleVersionCount ${invisibleVersionCount}")
+                    if (runBaseCompact) {
+                        assertEquals(1L, visibleVersionCount)
+                    }
+                } else {
+                    assertEquals(lastVisibleVersionCount, visibleVersionCount)
+                }
+            }
+        }
+
+        sql " CREATE TABLE ${tableName} (k1 int, k2 int) DISTRIBUTED BY 
HASH(k1) BUCKETS 1 "
+
+        // normal
+        def rowNum = 0L
+        def insertNRecords = { num ->
+            // if enable debug point 
DatabaseTransactionMgr.stop_finish_transaction,
+            // insert will need to wait insert_visible_timeout_ms.
+            // so use multiple threads to reduce the wait time.
+            def futures = []
+            for (def i = 0; i < num; i++, rowNum++) {
+                def index = rowNum
+                futures.add(thread {
+                    sql " INSERT INTO ${tableName} VALUES (${index}, ${index * 
10}) "
+                })
+            }
+            futures.each { it.get() }
+        }
+        insertNRecords(21)
+        // after insert 21 rows, be can run compact ok.
+        checkCompact(true, false, false, rowNum + 1, rowNum + 1)
+        qt_select_1 "SELECT * FROM ${tableName} ORDER BY k1"
+
+        // publish but not visible
+        def lastRowNum = rowNum
+        cluster.injectDebugPoints(NodeType.FE, 
['DatabaseTransactionMgr.stop_finish_transaction':null])
+        insertNRecords(21)
+        // after enable debugpoint, be will add rowsets, but visible version 
will not increase.
+        // then no rowsets can pick to compact.
+        // so expect compact failed.
+        checkCompact(false, false, false, rowNum + 1, lastRowNum + 1)
+        qt_select_2 "SELECT * FROM ${tableName} ORDER BY k1"
+
+        cluster.clearFrontendDebugPoints()
+        Thread.sleep(5000)
+        // after clear debug point, visible version will increase.
+        // then some rowsets can pick to compact.
+        // so expect compact succ.
+        checkCompact(true, true, false, rowNum + 1, rowNum + 1)
+        qt_select_3 "SELECT * FROM ${tableName} ORDER BY k1"
+
+        lastRowNum = rowNum
+        cluster.injectDebugPoints(NodeType.FE, 
['DatabaseTransactionMgr.stop_finish_transaction':null])
+        insertNRecords(80)
+        // 80 versions are not invisible yet,  BE will not compact them.
+        // if we send http to compact them,  BE will reply no rowsets can 
compact now
+        checkCompact(false, false, false, rowNum + 1, lastRowNum + 1)
+        // Because BE not compact, so query should be ok.
+        qt_select_4 "SELECT * FROM ${tableName} ORDER BY k1"
+
+        update_all_be_config('compaction_keep_invisible_version_timeout_sec', 
1)
+        checkCompact(true, false, true, rowNum + 1, lastRowNum + 1)
+        qt_select_5 "SELECT * FROM ${tableName} ORDER BY k1"
+
+        def getVersionCountMap = { ->
+            def versionCountMap = [:]
+            def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}"
+            tablets.each {
+                versionCountMap.put(it.BackendId as long, 
[it.VisibleVersionCount as long, it.VersionCount as long])
+            }
+            return versionCountMap
+        }
+
+        // after backend restart, it should update its visible version from FE
+        // and then it report its visible version count and total version count
+        def oldVersionCountMap = getVersionCountMap()
+        cluster.restartBackends()
+        Thread.sleep(20000)
+        def newVersionCountMap = getVersionCountMap()
+        assertEquals(oldVersionCountMap, newVersionCountMap)
+
+        cluster.clearFrontendDebugPoints()
+        Thread.sleep(5000)
+        // after clear fe's debug point, the 80 version are visible now.
+        // so compact is ok
+        checkCompact(true, false, false, rowNum + 1, rowNum + 1)
+        qt_select_6 "SELECT * FROM ${tableName} ORDER BY k1"
+
+        cluster.injectDebugPoints(NodeType.FE, 
['DatabaseTransactionMgr.stop_finish_transaction':null])
+        def compaction_keep_invisible_version_timeout_sec = 1
+        compaction_keep_invisible_version_min_count = 0L
+        update_all_be_config('compaction_keep_invisible_version_timeout_sec', 
compaction_keep_invisible_version_timeout_sec)
+        update_all_be_config('compaction_keep_invisible_version_min_count', 
compaction_keep_invisible_version_min_count)
+
+        lastRowNum = rowNum
+        insertNRecords(21)
+
+        Thread.sleep((compaction_keep_invisible_version_timeout_sec + 1) * 
1000)
+
+        // after compaction_keep_invisible_version_timeout_sec,
+        // all version had been compact 
(compaction_keep_invisible_version_min_count=0),
+        checkCompact(true, false, true, rowNum + 1, lastRowNum + 1)
+
+        // visible version had been compact
+        test {
+            sql "SELECT * FROM ${tableName} ORDER BY k1"
+
+            // E-230:
+            //(1105, 'errCode = 2, detailMessage = 
(128.2.51.2)[CANCELLED]missed_versions is empty, spec_version 43,
+            // max_version 123, tablet_id 10062')
+            exception 'missed_versions is empty'
+        }
+
+        cluster.clearFrontendDebugPoints()
+        Thread.sleep(5000)
+        qt_select_7 "SELECT * FROM ${tableName} ORDER BY k1"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to