This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 1aa9ac4fe44 Prevent making snapshot on remote rowset in single replica compaction (#28716)
1aa9ac4fe44 is described below

commit 1aa9ac4fe44ee23db95a757d2d2ecf368d5774b9
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Wed Dec 27 23:43:43 2023 +0800

    Prevent making snapshot on remote rowset in single replica compaction (#28716)
---
 be/src/common/status.h                             |   2 -
 be/src/olap/base_compaction.cpp                    |   8 -
 be/src/olap/base_compaction.h                      |   2 -
 be/src/olap/compaction.h                           |   7 +-
 be/src/olap/cumulative_compaction.cpp              |   8 -
 be/src/olap/cumulative_compaction.h                |   2 -
 be/src/olap/full_compaction.cpp                    |  10 -
 be/src/olap/full_compaction.h                      |   2 -
 be/src/olap/olap_server.cpp                        |  61 +++--
 be/src/olap/single_replica_compaction.cpp          |   8 -
 be/src/olap/snapshot_manager.cpp                   |   4 +-
 be/src/olap/storage_engine.cpp                     |  22 --
 be/src/olap/storage_engine.h                       |  12 -
 be/src/olap/tablet.cpp                             | 289 ++++++++-------------
 be/src/olap/tablet.h                               |  37 +--
 be/src/olap/task/engine_clone_task.cpp             |   1 -
 be/src/olap/task/engine_storage_migration_task.cpp |   2 -
 17 files changed, 162 insertions(+), 315 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 14aec46cc22..c0d92152877 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -160,7 +160,6 @@ namespace ErrorCode {
     E(BE_INVALID_NEED_MERGED_VERSIONS, -810, true);          \
     E(BE_ERROR_DELETE_ACTION, -811, true);                   \
     E(BE_SEGMENTS_OVERLAPPING, -812, true);                  \
-    E(BE_CLONE_OCCURRED, -813, true);                        \
     E(PUSH_INIT_ERROR, -900, true);                          \
     E(PUSH_VERSION_INCORRECT, -902, true);                   \
     E(PUSH_SCHEMA_MISMATCH, -903, true);                     \
@@ -228,7 +227,6 @@ namespace ErrorCode {
     E(CUMULATIVE_INVALID_NEED_MERGED_VERSIONS, -2004, true); \
     E(CUMULATIVE_ERROR_DELETE_ACTION, -2005, true);          \
     E(CUMULATIVE_MISS_VERSION, -2006, true);                 \
-    E(CUMULATIVE_CLONE_OCCURRED, -2007, true);               \
     E(FULL_NO_SUITABLE_VERSION, -2008, false);               \
     E(FULL_MISS_VERSION, -2009, true);                       \
     E(META_INVALID_ARGUMENT, -3000, true);                   \
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 6ae006709a7..474909cbf45 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -56,7 +56,6 @@ Status BaseCompaction::prepare_compact() {
     // 1. pick rowsets to compact
     RETURN_IF_ERROR(pick_rowsets_to_compact());
     COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());
-    _tablet->set_clone_occurred(false);
 
     return Status::OK();
 }
@@ -73,13 +72,6 @@ Status BaseCompaction::execute_compact_impl() {
                 "another base compaction is running. tablet={}", 
_tablet->tablet_id());
     }
 
-    // Clone task may happen after compaction task is submitted to thread 
pool, and rowsets picked
-    // for compaction may change. In this case, current compaction task should 
not be executed.
-    if (_tablet->get_clone_occurred()) {
-        _tablet->set_clone_occurred(false);
-        return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred 
failed");
-    }
-
     SCOPED_ATTACH_TASK(_mem_tracker);
 
     // 2. do base compaction, merge rowsets
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index 73aca0d5e1f..e86cb3330a5 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -43,8 +43,6 @@ public:
     Status prepare_compact() override;
     Status execute_compact_impl() override;
 
-    std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
 protected:
     Status pick_rowsets_to_compact() override;
     std::string compaction_name() const override { return "base compaction"; }
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 2aff05ced83..a8e3b1a5c28 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -58,6 +58,8 @@ public:
     virtual Status prepare_compact() = 0;
     Status execute_compact();
     virtual Status execute_compact_impl() = 0;
+
+    const std::vector<RowsetSharedPtr>& input_rowsets() { return 
_input_rowsets; }
 #ifdef BE_TEST
     void set_input_rowset(const std::vector<RowsetSharedPtr>& rowsets);
     RowsetSharedPtr output_rowset();
@@ -65,10 +67,11 @@ public:
 
     RuntimeProfile* runtime_profile() const { return _profile.get(); }
 
+    virtual ReaderType compaction_type() const = 0;
+    virtual std::string compaction_name() const = 0;
+
 protected:
     virtual Status pick_rowsets_to_compact() = 0;
-    virtual std::string compaction_name() const = 0;
-    virtual ReaderType compaction_type() const = 0;
 
     Status do_compaction(int64_t permits);
     Status do_compaction_impl(int64_t permits);
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index 1c65df768d0..1f54c1f3285 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -61,7 +61,6 @@ Status CumulativeCompaction::prepare_compact() {
     // 2. pick rowsets to compact
     RETURN_IF_ERROR(pick_rowsets_to_compact());
     COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());
-    _tablet->set_clone_occurred(false);
 
     return Status::OK();
 }
@@ -73,13 +72,6 @@ Status CumulativeCompaction::execute_compact_impl() {
                 "The tablet is under cumulative compaction. tablet={}", 
_tablet->tablet_id());
     }
 
-    // Clone task may happen after compaction task is submitted to thread 
pool, and rowsets picked
-    // for compaction may change. In this case, current compaction task should 
not be executed.
-    if (_tablet->get_clone_occurred()) {
-        _tablet->set_clone_occurred(false);
-        return Status::Error<CUMULATIVE_CLONE_OCCURRED, 
false>("get_clone_occurred failed");
-    }
-
     SCOPED_ATTACH_TASK(_mem_tracker);
 
     // 3. do cumulative compaction, merge rowsets
diff --git a/be/src/olap/cumulative_compaction.h 
b/be/src/olap/cumulative_compaction.h
index d74542a2ffe..7ea7fb383f1 100644
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -39,8 +39,6 @@ public:
     Status prepare_compact() override;
     Status execute_compact_impl() override;
 
-    std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
 protected:
     Status pick_rowsets_to_compact() override;
 
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 01bd5e3dc61..927b4a33198 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -51,29 +51,19 @@ Status FullCompaction::prepare_compact() {
         return Status::Error<INVALID_ARGUMENT, false>("Full compaction init 
failed");
     }
 
-    std::unique_lock full_lock(_tablet->get_full_compaction_lock());
     std::unique_lock base_lock(_tablet->get_base_compaction_lock());
     std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());
 
     // 1. pick rowsets to compact
     RETURN_IF_ERROR(pick_rowsets_to_compact());
-    _tablet->set_clone_occurred(false);
 
     return Status::OK();
 }
 
 Status FullCompaction::execute_compact_impl() {
-    std::unique_lock full_lock(_tablet->get_full_compaction_lock());
     std::unique_lock base_lock(_tablet->get_base_compaction_lock());
     std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());
 
-    // Clone task may happen after compaction task is submitted to thread 
pool, and rowsets picked
-    // for compaction may change. In this case, current compaction task should 
not be executed.
-    if (_tablet->get_clone_occurred()) {
-        _tablet->set_clone_occurred(false);
-        return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred 
failed");
-    }
-
     SCOPED_ATTACH_TASK(_mem_tracker);
 
     // 2. do full compaction, merge rowsets
diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h
index bce9ac745b6..631d901e846 100644
--- a/be/src/olap/full_compaction.h
+++ b/be/src/olap/full_compaction.h
@@ -39,8 +39,6 @@ public:
     Status execute_compact_impl() override;
     Status modify_rowsets(const Merger::Statistics* stats = nullptr) override;
 
-    std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
 protected:
     Status pick_rowsets_to_compact() override;
     std::string compaction_name() const override { return "full compaction"; }
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index f7caa7d8d02..7268d7959f9 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -57,6 +57,7 @@
 #include "olap/olap_common.h"
 #include "olap/rowset/segcompaction.h"
 #include "olap/schema_change.h"
+#include "olap/single_replica_compaction.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
@@ -593,11 +594,6 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                 continue;
             }
 
-            /// Regardless of whether the tablet is submitted for compaction 
or not,
-            /// we need to call 'reset_compaction' to clean up the 
base_compaction or cumulative_compaction objects
-            /// in the tablet, because these two objects store the tablet's 
own shared_ptr.
-            /// If it is not cleaned up, the reference count of the tablet 
will always be greater than 1,
-            /// thus cannot be collected by the garbage collector. 
(TabletManager::start_trash_sweep)
             for (const auto& tablet : tablets_compaction) {
                 if (compaction_type == CompactionType::BASE_COMPACTION) {
                     
tablet->set_last_base_compaction_schedule_time(UnixMillis());
@@ -717,33 +713,39 @@ Status 
StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tab
         return Status::AlreadyExist<false>(
                 "compaction task has already been submitted, tablet_id={}", 
tablet->tablet_id());
     }
-    Status st = tablet->prepare_single_replica_compaction(tablet, 
compaction_type);
+
+    auto compaction = std::make_shared<SingleReplicaCompaction>(tablet, 
compaction_type);
+    auto st = compaction->prepare_compact();
+
     auto clean_single_replica_compaction = [tablet, this]() {
-        tablet->reset_single_replica_compaction();
         _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::CUMULATIVE_COMPACTION);
         _pop_tablet_from_submitted_compaction(tablet, 
CompactionType::BASE_COMPACTION);
     };
 
-    if (st.ok()) {
-        auto submit_st = _single_replica_compaction_thread_pool->submit_func(
-                [tablet, compaction_type, clean_single_replica_compaction]() {
-                    tablet->execute_single_replica_compaction(compaction_type);
-                    clean_single_replica_compaction();
-                });
-        if (!submit_st.ok()) {
-            clean_single_replica_compaction();
-            return Status::InternalError(
-                    "failed to submit single replica compaction task to thread 
pool, "
-                    "tablet_id={} ",
-                    tablet->tablet_id());
+    if (!st.ok()) {
+        clean_single_replica_compaction();
+        if (!st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
+            LOG(WARNING) << "failed to prepare single replica compaction, 
tablet_id="
+                         << tablet->tablet_id() << " : " << st;
+            return st;
         }
-        return Status::OK();
-    } else {
+        return Status::OK(); // No suitable version, regard as OK
+    }
+
+    auto submit_st = _single_replica_compaction_thread_pool->submit_func(
+            [tablet, compaction = std::move(compaction),
+             clean_single_replica_compaction]() mutable {
+                tablet->execute_single_replica_compaction(*compaction);
+                clean_single_replica_compaction();
+            });
+    if (!submit_st.ok()) {
         clean_single_replica_compaction();
         return Status::InternalError(
-                "failed to prepare single replica compaction task tablet_id={} 
",
+                "failed to submit single replica compaction task to thread 
pool, "
+                "tablet_id={}",
                 tablet->tablet_id());
     }
+    return Status::OK();
 }
 
 void StorageEngine::get_tablet_rowset_versions(const 
PGetTabletVersionsRequest* request,
@@ -917,8 +919,10 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                 "compaction task has already been submitted, tablet_id={}, 
compaction_type={}.",
                 tablet->tablet_id(), compaction_type);
     }
+    std::shared_ptr<Compaction> compaction;
     int64_t permits = 0;
-    Status st = 
tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, 
&permits);
+    Status st = 
Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
+                                                                 compaction, 
permits);
     bool is_low_priority_task = [&]() {
         // Can add more strategies to determine whether a task is a low 
priority task in the future
         if (!config::enable_compaction_priority_scheduling) {
@@ -938,27 +942,24 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                 (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                         ? _cumu_compaction_thread_pool
                         : _base_compaction_thread_pool;
-        auto st = thread_pool->submit_func([tablet, compaction_type, permits, 
is_low_priority_task,
+        auto st = thread_pool->submit_func([tablet, compaction = 
std::move(compaction),
+                                            compaction_type, permits, 
is_low_priority_task,
                                             this]() {
             if (is_low_priority_task && 
!_increase_low_priority_task_nums(tablet->data_dir())) {
                 VLOG_DEBUG << "skip low priority compaction task for tablet: "
                            << tablet->tablet_id();
                 // Todo: push task back
             } else {
-                tablet->execute_compaction(compaction_type);
+                tablet->execute_compaction(*compaction);
                 if (is_low_priority_task) {
                     _decrease_low_priority_task_nums(tablet->data_dir());
                 }
             }
             _permit_limiter.release(permits);
-            // reset compaction
-            tablet->reset_compaction(compaction_type);
             _pop_tablet_from_submitted_compaction(tablet, compaction_type);
         });
         if (!st.ok()) {
             _permit_limiter.release(permits);
-            // reset compaction
-            tablet->reset_compaction(compaction_type);
             _pop_tablet_from_submitted_compaction(tablet, compaction_type);
             return Status::InternalError(
                     "failed to submit compaction task to thread pool, "
@@ -967,8 +968,6 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
         }
         return Status::OK();
     } else {
-        // reset compaction
-        tablet->reset_compaction(compaction_type);
         _pop_tablet_from_submitted_compaction(tablet, compaction_type);
         if (!st.ok()) {
             return Status::InternalError(
diff --git a/be/src/olap/single_replica_compaction.cpp 
b/be/src/olap/single_replica_compaction.cpp
index a5e060147d6..2a8edb60471 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -70,7 +70,6 @@ Status SingleReplicaCompaction::prepare_compact() {
 
     // 1. pick rowsets to compact
     RETURN_IF_ERROR(pick_rowsets_to_compact());
-    _tablet->set_clone_occurred(false);
     if (_input_rowsets.size() == 1) {
         return 
Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets.size() is 1");
     }
@@ -105,13 +104,6 @@ Status SingleReplicaCompaction::execute_compact_impl() {
                 "another base compaction is running. tablet={}", 
_tablet->tablet_id());
     }
 
-    // Clone task may happen after compaction task is submitted to thread 
pool, and rowsets picked
-    // for compaction may change. In this case, current compaction task should 
not be executed.
-    if (_tablet->get_clone_occurred()) {
-        _tablet->set_clone_occurred(false);
-        return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred 
failed");
-    }
-
     SCOPED_ATTACH_TASK(_mem_tracker);
 
     // 2. do single replica compaction
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index fd865712c5f..b5090f0e280 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -426,10 +426,10 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
                           << ref_tablet->tablet_id();
                 Version version(request.start_version, request.end_version);
                 const RowsetSharedPtr rowset = 
ref_tablet->get_rowset_by_version(version, false);
-                if (rowset != nullptr) {
+                if (rowset && rowset->is_local()) {
                     consistent_rowsets.push_back(rowset);
                 } else {
-                    LOG(WARNING) << "failed to find version when do compaction 
snapshot. "
+                    LOG(WARNING) << "failed to find local version when do 
compaction snapshot. "
                                  << " tablet=" << request.tablet_id
                                  << " schema_hash=" << request.schema_hash
                                  << " version=" << version;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 4e0fb2eddf1..cf4687caf3e 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1256,28 +1256,6 @@ PendingRowsetGuard 
StorageEngine::add_pending_rowset(const RowsetWriterContext&
     return _pending_remote_rowsets.add(ctx.rowset_id);
 }
 
-void StorageEngine::create_cumulative_compaction(
-        TabletSharedPtr best_tablet, std::shared_ptr<CumulativeCompaction>& 
cumulative_compaction) {
-    cumulative_compaction.reset(new CumulativeCompaction(best_tablet));
-}
-
-void StorageEngine::create_base_compaction(TabletSharedPtr best_tablet,
-                                           std::shared_ptr<BaseCompaction>& 
base_compaction) {
-    base_compaction.reset(new BaseCompaction(best_tablet));
-}
-
-void StorageEngine::create_full_compaction(TabletSharedPtr best_tablet,
-                                           std::shared_ptr<FullCompaction>& 
full_compaction) {
-    full_compaction.reset(new FullCompaction(best_tablet));
-}
-
-void StorageEngine::create_single_replica_compaction(
-        TabletSharedPtr best_tablet,
-        std::shared_ptr<SingleReplicaCompaction>& single_replica_compaction,
-        CompactionType compaction_type) {
-    single_replica_compaction.reset(new SingleReplicaCompaction(best_tablet, 
compaction_type));
-}
-
 bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* 
replica,
                                           std::string* token) {
     TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6b028b01734..77d3efeaaf8 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -180,18 +180,6 @@ public:
     void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
                                     PGetTabletVersionsResponse* response);
 
-    void create_cumulative_compaction(TabletSharedPtr best_tablet,
-                                      std::shared_ptr<CumulativeCompaction>& 
cumulative_compaction);
-    void create_base_compaction(TabletSharedPtr best_tablet,
-                                std::shared_ptr<BaseCompaction>& 
base_compaction);
-
-    void create_full_compaction(TabletSharedPtr best_tablet,
-                                std::shared_ptr<FullCompaction>& 
full_compaction);
-
-    void create_single_replica_compaction(
-            TabletSharedPtr best_tablet,
-            std::shared_ptr<SingleReplicaCompaction>& 
single_replica_compaction,
-            CompactionType compaction_type);
     bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, 
std::string* token);
 
     bool should_fetch_from_peer(int64_t tablet_id);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5f915ad1de1..8cfce2d35ec 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -69,6 +69,7 @@
 #include "io/fs/file_writer.h"
 #include "io/fs/path.h"
 #include "io/fs/remote_file_system.h"
+#include "io/io_common.h"
 #include "olap/base_compaction.h"
 #include "olap/base_tablet.h"
 #include "olap/binlog.h"
@@ -143,21 +144,39 @@ using std::string;
 using std::vector;
 using io::FileSystemSPtr;
 
-static bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk", 
"tablet_lookup_rowkey");
-static bvar::LatencyRecorder 
g_tablet_commit_phase_update_delete_bitmap_latency(
+namespace {
+
+bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk", 
"tablet_lookup_rowkey");
+bvar::LatencyRecorder g_tablet_commit_phase_update_delete_bitmap_latency(
         "doris_pk", "commit_phase_update_delete_bitmap");
-static bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk",
-                                                                   
"update_delete_bitmap");
-static bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", 
"lookup_not_found");
-static bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
+bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", 
"update_delete_bitmap");
+bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found");
+bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
         "doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60);
 
-const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 1s;
-
 bvar::Adder<uint64_t> exceed_version_limit_counter;
 bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
         &exceed_version_limit_counter, 60);
 
+void set_last_failure_time(Tablet* tablet, const Compaction& compaction, 
int64_t ms) {
+    switch (compaction.compaction_type()) {
+    case ReaderType::READER_CUMULATIVE_COMPACTION:
+        tablet->set_last_cumu_compaction_failure_time(ms);
+        return;
+    case ReaderType::READER_BASE_COMPACTION:
+        tablet->set_last_base_compaction_failure_time(ms);
+        return;
+    case ReaderType::READER_FULL_COMPACTION:
+        tablet->set_last_full_compaction_failure_time(ms);
+        return;
+    default:
+        LOG(FATAL) << "invalid compaction type " << 
compaction.compaction_name()
+                   << " tablet_id: " << tablet->tablet_id();
+    }
+};
+
+} // namespace
+
 struct WriteCooldownMetaExecutors {
     WriteCooldownMetaExecutors(size_t executor_nums = 5);
 
@@ -251,7 +270,6 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* 
data_dir,
           _newly_created_rowset_num(0),
           _last_checkpoint_time(0),
           _cumulative_compaction_type(cumulative_compaction_type),
-          _is_clone_occurred(false),
           _is_tablet_path_exists(true),
           _last_missed_version(-1),
           _last_missed_time_s(0) {
@@ -475,6 +493,22 @@ Status 
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
         return Status::OK();
     }
 
+    if (check_delete) {
+        for (auto&& rs : to_delete) {
+            if (auto it = _rs_version_map.find(rs->version()); it == 
_rs_version_map.end()) {
+                return Status::Error<DELETE_VERSION_ERROR>(
+                        "try to delete not exist version {} from {}", 
rs->version().to_string(),
+                        tablet_id());
+            } else if (rs->rowset_id() != it->second->rowset_id()) {
+                return Status::Error<DELETE_VERSION_ERROR>(
+                        "try to delete version {} from {}, but rowset id 
changed, delete rowset id "
+                        "is {}, exists rowsetid is {}",
+                        rs->version().to_string(), tablet_id(), 
rs->rowset_id().to_string(),
+                        it->second->rowset_id().to_string());
+            }
+        }
+    }
+
     bool same_version = true;
     std::sort(to_add.begin(), to_add.end(), Rowset::comparator);
     std::sort(to_delete.begin(), to_delete.end(), Rowset::comparator);
@@ -489,23 +523,6 @@ Status 
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
         same_version = false;
     }
 
-    if (check_delete) {
-        for (auto& rs : to_delete) {
-            auto find_rs = _rs_version_map.find(rs->version());
-            if (find_rs == _rs_version_map.end()) {
-                return Status::Error<DELETE_VERSION_ERROR>(
-                        "try to delete not exist version {} from {}", 
rs->version().to_string(),
-                        tablet_id());
-            } else if (find_rs->second->rowset_id() != rs->rowset_id()) {
-                return Status::Error<DELETE_VERSION_ERROR>(
-                        "try to delete version {} from {}, but rowset id 
changed, delete rowset id "
-                        "is {}, exists rowsetid is {}",
-                        rs->version().to_string(), tablet_id(), 
rs->rowset_id().to_string(),
-                        find_rs->second->rowset_id().to_string());
-            }
-        }
-    }
-
     std::vector<RowsetMetaSharedPtr> rs_metas_to_delete;
     for (auto& rs : to_delete) {
         rs_metas_to_delete.push_back(rs->rowset_meta());
@@ -1755,138 +1772,100 @@ void 
Tablet::generate_tablet_meta_copy_unlocked(TabletMetaSharedPtr new_tablet_m
 }
 
 Status Tablet::prepare_compaction_and_calculate_permits(CompactionType 
compaction_type,
-                                                        TabletSharedPtr 
tablet, int64_t* permits) {
-    std::vector<RowsetSharedPtr> compaction_rowsets;
+                                                        const TabletSharedPtr& 
tablet,
+                                                        
std::shared_ptr<Compaction>& compaction,
+                                                        int64_t& permits) {
     if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
         MonotonicStopWatch watch;
         watch.start();
-        SCOPED_CLEANUP({
-            if (!config::disable_compaction_trace_log &&
-                watch.elapsed_time() / 1e9 > 
config::cumulative_compaction_trace_threshold) {
-                std::stringstream ss;
-                _cumulative_compaction->runtime_profile()->pretty_print(&ss);
-                LOG(WARNING) << "prepare cumulative compaction cost " << 
watch.elapsed_time() / 1e9
-                             << std::endl
-                             << ss.str();
-            }
-        });
 
-        StorageEngine::instance()->create_cumulative_compaction(tablet, 
_cumulative_compaction);
+        compaction = std::make_shared<CumulativeCompaction>(tablet);
         
DorisMetrics::instance()->cumulative_compaction_request_total->increment(1);
-        Status res = _cumulative_compaction->prepare_compact();
+        Status res = compaction->prepare_compact();
+        if (!config::disable_compaction_trace_log &&
+            watch.elapsed_time() / 1e9 > 
config::cumulative_compaction_trace_threshold) {
+            std::stringstream ss;
+            compaction->runtime_profile()->pretty_print(&ss);
+            LOG(WARNING) << "prepare cumulative compaction cost " << 
watch.elapsed_time() / 1e9
+                         << std::endl
+                         << ss.str();
+        }
+
         if (!res.ok()) {
-            set_last_cumu_compaction_failure_time(UnixMillis());
-            *permits = 0;
+            tablet->set_last_cumu_compaction_failure_time(UnixMillis());
+            permits = 0;
             if (!res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
                 
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
-                return Status::InternalError("prepare cumulative compaction 
with err: {}",
-                                             res.to_string());
+                return Status::InternalError("prepare cumulative compaction 
with err: {}", res);
             }
             // return OK if OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION, so that 
we don't need to
             // print too much useless logs.
             // And because we set permits to 0, so even if we return OK here, 
nothing will be done.
             return Status::OK();
         }
-        compaction_rowsets = _cumulative_compaction->get_input_rowsets();
     } else if (compaction_type == CompactionType::BASE_COMPACTION) {
-        DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
         MonotonicStopWatch watch;
         watch.start();
-        SCOPED_CLEANUP({
-            if (!config::disable_compaction_trace_log &&
-                watch.elapsed_time() / 1e9 > 
config::base_compaction_trace_threshold) {
-                std::stringstream ss;
-                _base_compaction->runtime_profile()->pretty_print(&ss);
-                LOG(WARNING) << "prepare base compaction cost " << 
watch.elapsed_time() / 1e9
-                             << std::endl
-                             << ss.str();
-            }
-        });
 
-        StorageEngine::instance()->create_base_compaction(tablet, 
_base_compaction);
+        compaction = std::make_shared<BaseCompaction>(tablet);
         DorisMetrics::instance()->base_compaction_request_total->increment(1);
-        Status res = _base_compaction->prepare_compact();
-        set_last_base_compaction_status(res.to_string());
+        Status res = compaction->prepare_compact();
+        if (!config::disable_compaction_trace_log &&
+            watch.elapsed_time() / 1e9 > 
config::base_compaction_trace_threshold) {
+            std::stringstream ss;
+            compaction->runtime_profile()->pretty_print(&ss);
+            LOG(WARNING) << "prepare base compaction cost " << 
watch.elapsed_time() / 1e9
+                         << std::endl
+                         << ss.str();
+        }
+
+        tablet->set_last_base_compaction_status(res.to_string());
         if (!res.ok()) {
-            set_last_base_compaction_failure_time(UnixMillis());
-            *permits = 0;
+            tablet->set_last_base_compaction_failure_time(UnixMillis());
+            permits = 0;
             if (!res.is<BE_NO_SUITABLE_VERSION>()) {
                 
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
-                return Status::InternalError("prepare base compaction with 
err: {}",
-                                             res.to_string());
+                return Status::InternalError("prepare base compaction with 
err: {}", res);
             }
             // return OK if OLAP_ERR_BE_NO_SUITABLE_VERSION, so that we don't 
need to
             // print too much useless logs.
             // And because we set permits to 0, so even if we return OK here, 
nothing will be done.
             return Status::OK();
         }
-        compaction_rowsets = _base_compaction->get_input_rowsets();
     } else {
         DCHECK_EQ(compaction_type, CompactionType::FULL_COMPACTION);
-        MonotonicStopWatch watch;
-        watch.start();
-        StorageEngine::instance()->create_full_compaction(tablet, 
_full_compaction);
-        Status res = _full_compaction->prepare_compact();
+
+        compaction = std::make_shared<FullCompaction>(tablet);
+        Status res = compaction->prepare_compact();
         if (!res.ok()) {
-            set_last_full_compaction_failure_time(UnixMillis());
-            *permits = 0;
+            tablet->set_last_full_compaction_failure_time(UnixMillis());
+            permits = 0;
             if (!res.is<FULL_NO_SUITABLE_VERSION>()) {
-                return Status::InternalError("prepare full compaction with 
err: {}",
-                                             res.to_string());
+                return Status::InternalError("prepare full compaction with 
err: {}", res);
             }
             // return OK if OLAP_ERR_BE_NO_SUITABLE_VERSION, so that we don't 
need to
             // print too much useless logs.
             // And because we set permits to 0, so even if we return OK here, 
nothing will be done.
             return Status::OK();
         }
-        compaction_rowsets = _full_compaction->get_input_rowsets();
-    }
-    *permits = 0;
-    for (auto rowset : compaction_rowsets) {
-        *permits += rowset->rowset_meta()->get_compaction_score();
     }
-    return Status::OK();
-}
 
-Status Tablet::prepare_single_replica_compaction(TabletSharedPtr tablet,
-                                                 CompactionType 
compaction_type) {
-    StorageEngine::instance()->create_single_replica_compaction(tablet, 
_single_replica_compaction,
-                                                                
compaction_type);
-    Status res = _single_replica_compaction->prepare_compact();
-    if (!res.ok()) {
-        if (!res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
-            return Status::InternalError("prepare single replica compaction 
with err: {}",
-                                         res.to_string());
-        }
+    permits = 0;
+    for (auto&& rowset : compaction->input_rowsets()) {
+        permits += rowset->rowset_meta()->get_compaction_score();
     }
     return Status::OK();
 }
 
-void Tablet::execute_single_replica_compaction(CompactionType compaction_type) 
{
-    Status res = _single_replica_compaction->execute_compact();
+void Tablet::execute_single_replica_compaction(SingleReplicaCompaction& 
compaction) {
+    Status res = compaction.execute_compact();
     if (!res.ok()) {
-        if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
-            set_last_cumu_compaction_failure_time(UnixMillis());
-        } else if (compaction_type == CompactionType::BASE_COMPACTION) {
-            set_last_base_compaction_failure_time(UnixMillis());
-        } else if (compaction_type == CompactionType::FULL_COMPACTION) {
-            set_last_full_compaction_failure_time(UnixMillis());
-        }
+        set_last_failure_time(this, compaction, UnixMillis());
         LOG(WARNING) << "failed to do single replica compaction. res=" << res
                      << ", tablet=" << tablet_id();
         return;
     }
-    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
-        set_last_cumu_compaction_failure_time(0);
-    } else if (compaction_type == CompactionType::BASE_COMPACTION) {
-        set_last_base_compaction_failure_time(0);
-    } else if (compaction_type == CompactionType::FULL_COMPACTION) {
-        set_last_full_compaction_failure_time(0);
-    }
-}
-
-void Tablet::reset_single_replica_compaction() {
-    _single_replica_compaction.reset();
+    set_last_failure_time(this, compaction, 0);
 }
 
 std::vector<Version> Tablet::get_all_versions() {
@@ -1903,78 +1882,38 @@ std::vector<Version> Tablet::get_all_versions() {
     return local_versions;
 }
 
-void Tablet::execute_compaction(CompactionType compaction_type) {
+void Tablet::execute_compaction(Compaction& compaction) {
     signal::tablet_id = tablet_id();
-    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
-        MonotonicStopWatch watch;
-        watch.start();
-        SCOPED_CLEANUP({
-            if (!config::disable_compaction_trace_log &&
-                watch.elapsed_time() / 1e9 > 
config::cumulative_compaction_trace_threshold) {
-                std::stringstream ss;
-                _cumulative_compaction->runtime_profile()->pretty_print(&ss);
-                LOG(WARNING) << "execute cumulative compaction cost " << 
watch.elapsed_time() / 1e9
-                             << std::endl
-                             << ss.str();
-            }
-        });
 
-        Status res = _cumulative_compaction->execute_compact();
-        if (!res.ok()) {
-            set_last_cumu_compaction_failure_time(UnixMillis());
-            
DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
-            LOG(WARNING) << "failed to do cumulative compaction. res=" << res
-                         << ", tablet=" << tablet_id();
-            return;
-        }
-        set_last_cumu_compaction_failure_time(0);
-    } else if (compaction_type == CompactionType::BASE_COMPACTION) {
-        DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
-        MonotonicStopWatch watch;
-        watch.start();
-        SCOPED_CLEANUP({
-            if (!config::disable_compaction_trace_log &&
-                watch.elapsed_time() / 1e9 > 
config::base_compaction_trace_threshold) {
-                std::stringstream ss;
-                _base_compaction->runtime_profile()->pretty_print(&ss);
-                LOG(WARNING) << "execute base compaction cost " << 
watch.elapsed_time() / 1e9
-                             << std::endl
-                             << ss.str();
-            }
-        });
+    MonotonicStopWatch watch;
+    watch.start();
 
-        Status res = _base_compaction->execute_compact();
-        set_last_base_compaction_status(res.to_string());
-        if (!res.ok()) {
-            set_last_base_compaction_failure_time(UnixMillis());
-            
DorisMetrics::instance()->base_compaction_request_failed->increment(1);
-            LOG(WARNING) << "failed to do base compaction. res=" << res
-                         << ", tablet=" << tablet_id();
-            return;
-        }
-        set_last_base_compaction_failure_time(0);
+    Status res = compaction.execute_compact();
+
+    if (!res.ok()) [[unlikely]] {
+        set_last_failure_time(this, compaction, UnixMillis());
+        LOG(WARNING) << "failed to do " << compaction.compaction_name()
+                     << ", tablet=" << tablet_id() << " : " << res;
     } else {
-        DCHECK_EQ(compaction_type, CompactionType::FULL_COMPACTION);
-        MonotonicStopWatch watch;
-        watch.start();
-        Status res = _full_compaction->execute_compact();
-        if (!res.ok()) {
-            set_last_full_compaction_failure_time(UnixMillis());
-            LOG(WARNING) << "failed to do full compaction. res=" << res
-                         << ", tablet=" << tablet_id();
-            return;
-        }
-        set_last_full_compaction_failure_time(0);
+        set_last_failure_time(this, compaction, 0);
     }
-}
 
-void Tablet::reset_compaction(CompactionType compaction_type) {
-    if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
-        _cumulative_compaction.reset();
-    } else if (compaction_type == CompactionType::BASE_COMPACTION) {
-        _base_compaction.reset();
-    } else {
-        _full_compaction.reset();
+    if (!config::disable_compaction_trace_log) {
+        auto need_trace = [&compaction, &watch] {
+            return compaction.compaction_type() == 
ReaderType::READER_CUMULATIVE_COMPACTION
+                           ? watch.elapsed_time() / 1e9 >
+                                     
config::cumulative_compaction_trace_threshold
+                   : compaction.compaction_type() == 
ReaderType::READER_BASE_COMPACTION
+                           ? watch.elapsed_time() / 1e9 > 
config::base_compaction_trace_threshold
+                           : false;
+        };
+        if (need_trace()) {
+            std::stringstream ss;
+            compaction.runtime_profile()->pretty_print(&ss);
+            LOG(WARNING) << "execute " << compaction.compaction_name() << " 
cost "
+                         << watch.elapsed_time() / 1e9 << std::endl
+                         << ss.str();
+        }
     }
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 4174a0ec26d..614288921ed 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -59,9 +59,7 @@ namespace doris {
 
 class Tablet;
 class CumulativeCompactionPolicy;
-class CumulativeCompaction;
-class BaseCompaction;
-class FullCompaction;
+class Compaction;
 class SingleReplicaCompaction;
 class RowsetWriter;
 struct RowsetWriterContext;
@@ -85,7 +83,7 @@ enum SortType : int;
 
 enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, 
STORAGE_TYPE_REMOTE_AND_LOCAL };
 
-extern const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD;
+static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD = 
std::chrono::seconds(1);
 
 class Tablet final : public BaseTablet {
 public:
@@ -146,6 +144,8 @@ public:
     // operation in rowsets
     Status add_rowset(RowsetSharedPtr rowset);
     Status create_initial_rowset(const int64_t version);
+
+    // MUST hold EXCLUSIVE `_meta_lock`.
     Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
                           std::vector<RowsetSharedPtr>& to_delete, bool 
check_delete = false);
 
@@ -194,7 +194,6 @@ public:
     std::mutex& get_push_lock() { return _ingest_lock; }
     std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
     std::mutex& get_cumulative_compaction_lock() { return 
_cumulative_compaction_lock; }
-    std::mutex& get_full_compaction_lock() { return _full_compaction_lock; }
 
     std::shared_mutex& get_migration_lock() { return _migration_lock; }
 
@@ -296,18 +295,13 @@ public:
     // return a json string to show the compaction status of this tablet
     void get_compaction_status(std::string* json_result);
 
-    Status prepare_compaction_and_calculate_permits(CompactionType 
compaction_type,
-                                                    TabletSharedPtr tablet, 
int64_t* permits);
-
-    Status prepare_single_replica_compaction(TabletSharedPtr tablet,
-                                             CompactionType compaction_type);
-    void execute_compaction(CompactionType compaction_type);
-    void reset_compaction(CompactionType compaction_type);
-    void execute_single_replica_compaction(CompactionType compaction_type);
-    void reset_single_replica_compaction();
+    static Status prepare_compaction_and_calculate_permits(CompactionType 
compaction_type,
+                                                           const 
TabletSharedPtr& tablet,
+                                                           
std::shared_ptr<Compaction>& compaction,
+                                                           int64_t& permits);
 
-    void set_clone_occurred(bool clone_occurred) { _is_clone_occurred = 
clone_occurred; }
-    bool get_clone_occurred() { return _is_clone_occurred; }
+    void execute_compaction(Compaction& compaction);
+    void execute_single_replica_compaction(SingleReplicaCompaction& 
compaction);
 
     void set_cumulative_compaction_policy(
             std::shared_ptr<CumulativeCompactionPolicy> 
cumulative_compaction_policy) {
@@ -319,7 +313,7 @@ public:
     }
 
     void set_last_base_compaction_status(std::string status) {
-        _last_base_compaction_status = status;
+        _last_base_compaction_status = std::move(status);
     }
 
     std::string get_last_base_compaction_status() { return 
_last_base_compaction_status; }
@@ -622,7 +616,6 @@ private:
     std::mutex _ingest_lock;
     std::mutex _base_compaction_lock;
     std::mutex _cumulative_compaction_lock;
-    std::mutex _full_compaction_lock;
     std::mutex _schema_change_lock;
     std::shared_mutex _migration_lock;
     std::mutex _build_inverted_index_lock;
@@ -666,14 +659,6 @@ private:
     std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
     std::string_view _cumulative_compaction_type;
 
-    std::shared_ptr<CumulativeCompaction> _cumulative_compaction;
-    std::shared_ptr<BaseCompaction> _base_compaction;
-    std::shared_ptr<FullCompaction> _full_compaction;
-    std::shared_ptr<SingleReplicaCompaction> _single_replica_compaction;
-
-    // whether clone task occurred during the tablet is in thread pool queue 
to wait for compaction
-    std::atomic<bool> _is_clone_occurred;
-
     // use a seperate thread to check all tablets paths existance
     std::atomic<bool> _is_tablet_path_exists;
 
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 068e8723354..e27db6188bd 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -702,7 +702,6 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const 
std::string& clone_d
     std::lock_guard 
cumulative_compaction_lock(tablet->get_cumulative_compaction_lock());
     std::lock_guard cold_compaction_lock(tablet->get_cold_compaction_lock());
     std::lock_guard 
build_inverted_index_lock(tablet->get_build_inverted_index_lock());
-    tablet->set_clone_occurred(true);
     std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
     std::lock_guard<std::mutex> rwlock(tablet->get_rowset_update_lock());
     std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index 8fba16c67df..9e8fc39abde 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -201,12 +201,10 @@ Status EngineStorageMigrationTask::_migrate() {
     // compaction will be prohibited for the mow table when migration. 
Moreover, it is useless
     // to perform a compaction operation on the migration data, as the 
migration still migrates
     // the data of rowsets before the compaction operation.
-    std::unique_lock full_compaction_lock(_tablet->get_full_compaction_lock(), 
std::defer_lock);
     std::unique_lock base_compaction_lock(_tablet->get_base_compaction_lock(), 
std::defer_lock);
     std::unique_lock 
cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
                                           std::defer_lock);
     if (_tablet->enable_unique_key_merge_on_write()) {
-        full_compaction_lock.lock();
         base_compaction_lock.lock();
         cumu_compaction_lock.lock();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to