This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5641c778fb0 [fix](compaction) fixing the inaccurate statistics of concurrent compaction tasks (#37318)
5641c778fb0 is described below

commit 5641c778fb0c7d9f4145e7a764eca5988efca18d
Author: Luwei <814383...@qq.com>
AuthorDate: Sat Jul 6 00:03:15 2024 +0800

    [fix](compaction) fixing the inaccurate statistics of concurrent compaction tasks (#37318)
    
    Track the stage of each compaction task (not scheduled, pending, or
    executing) and count only the tasks that are actually executing when
    computing the number of concurrent compaction tasks per disk.
---
 be/src/olap/base_tablet.h             |   4 ++
 be/src/olap/olap_server.cpp           |  51 ++++++++-----
 be/src/olap/storage_engine.cpp        |  12 ++--
 be/src/olap/storage_engine.h          |   8 ++-
 be/src/olap/tablet_manager.cpp        |   4 +-
 be/src/olap/tablet_manager.h          |   2 +-
 be/test/olap/compaction_task_test.cpp | 132 ++++++++++++++++++++++++++++++++++
 be/test/olap/tablet_mgr_test.cpp      |   2 +-
 8 files changed, 185 insertions(+), 30 deletions(-)

diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 4852a6cba9b..f625ecf4a0a 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -23,6 +23,7 @@
 
 #include "common/status.h"
 #include "olap/iterators.h"
+#include "olap/olap_common.h"
 #include "olap/partial_update_info.h"
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/tablet_fwd.h"
@@ -43,6 +44,8 @@ struct TabletWithVersion {
     int64_t version;
 };
 
+enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING };
+
 // Base class for all tablet classes
 class BaseTablet {
 public:
@@ -301,6 +304,7 @@ public:
     std::atomic<int64_t> write_count = 0;
     std::atomic<int64_t> compaction_count = 0;
 
+    CompactionStage compaction_stage = CompactionStage::NOT_SCHEDULED;
     std::mutex sample_info_lock;
     std::vector<CompactionSampleInfo> sample_infos;
     Status last_compaction_status = Status::OK();
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 91c13918818..07acf77eed9 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -44,6 +44,7 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "common/sync_point.h"
 #include "gen_cpp/BackendService.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/Types_constants.h"
@@ -51,6 +52,7 @@
 #include "gutil/ref_counted.h"
 #include "io/fs/file_writer.h" // IWYU pragma: keep
 #include "io/fs/path.h"
+#include "olap/base_tablet.h"
 #include "olap/cold_data_compaction.h"
 #include "olap/compaction_permit_limiter.h"
 #include "olap/cumulative_compaction_policy.h"
@@ -603,8 +605,8 @@ void StorageEngine::_adjust_compaction_thread_num() {
 void StorageEngine::_compaction_tasks_producer_callback() {
     LOG(INFO) << "try to start compaction producer process!";
 
-    std::unordered_set<TTabletId> tablet_submitted_cumu;
-    std::unordered_set<TTabletId> tablet_submitted_base;
+    std::unordered_set<TabletSharedPtr> tablet_submitted_cumu;
+    std::unordered_set<TabletSharedPtr> tablet_submitted_base;
     std::vector<DataDir*> data_dirs = get_stores();
     for (auto& data_dir : data_dirs) {
         _tablet_submitted_cumu_compaction[data_dir] = tablet_submitted_cumu;
@@ -884,6 +886,17 @@ int get_concurrent_per_disk(int max_score, int 
thread_per_disk) {
     return thread_per_disk;
 }
 
+int StorageEngine::_get_executing_compaction_num(
+        std::unordered_set<TabletSharedPtr>& compaction_tasks) {
+    int num = 0;
+    for (const auto& task : compaction_tasks) {
+        if (task->compaction_stage == CompactionStage::EXECUTING) {
+            num++;
+        }
+    }
+    return num;
+}
+
 std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
         CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool 
check_score) {
     _update_cumulative_compaction_policy();
@@ -896,8 +909,8 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
 
     // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex
     // when traversing the data dir
-    std::map<DataDir*, std::unordered_set<TTabletId>> copied_cumu_map;
-    std::map<DataDir*, std::unordered_set<TTabletId>> copied_base_map;
+    std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map;
+    std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map;
     {
         std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
         copied_cumu_map = _tablet_submitted_cumu_compaction;
@@ -910,7 +923,8 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
         // in the current submitted tasks.
         // If so, the last Slot can be assigned to Base compaction,
         // otherwise, this Slot needs to be reserved for cumulative compaction.
-        int count = copied_cumu_map[data_dir].size() + 
copied_base_map[data_dir].size();
+        int count = _get_executing_compaction_num(copied_cumu_map[data_dir]) +
+                    _get_executing_compaction_num(copied_base_map[data_dir]);
         int thread_per_disk = data_dir->is_ssd_disk() ? 
config::compaction_task_num_per_fast_disk
                                                       : 
config::compaction_task_num_per_disk;
 
@@ -973,19 +987,16 @@ bool 
StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table
     bool already_existed = false;
     switch (compaction_type) {
     case CompactionType::CUMULATIVE_COMPACTION:
-        already_existed = 
!(_tablet_submitted_cumu_compaction[tablet->data_dir()]
-                                    .insert(tablet->tablet_id())
-                                    .second);
+        already_existed =
+                
!(_tablet_submitted_cumu_compaction[tablet->data_dir()].insert(tablet).second);
         break;
     case CompactionType::BASE_COMPACTION:
-        already_existed = 
!(_tablet_submitted_base_compaction[tablet->data_dir()]
-                                    .insert(tablet->tablet_id())
-                                    .second);
+        already_existed =
+                
!(_tablet_submitted_base_compaction[tablet->data_dir()].insert(tablet).second);
         break;
     case CompactionType::FULL_COMPACTION:
-        already_existed = 
!(_tablet_submitted_full_compaction[tablet->data_dir()]
-                                    .insert(tablet->tablet_id())
-                                    .second);
+        already_existed =
+                
!(_tablet_submitted_full_compaction[tablet->data_dir()].insert(tablet).second);
         break;
     }
     return already_existed;
@@ -997,13 +1008,13 @@ void 
StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
     int removed = 0;
     switch (compaction_type) {
     case CompactionType::CUMULATIVE_COMPACTION:
-        removed = 
_tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+        removed = 
_tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet);
         break;
     case CompactionType::BASE_COMPACTION:
-        removed = 
_tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+        removed = 
_tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet);
         break;
     case CompactionType::FULL_COMPACTION:
-        removed = 
_tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+        removed = 
_tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet);
         break;
     }
 
@@ -1034,6 +1045,7 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                 "compaction task has already been submitted, tablet_id={}, 
compaction_type={}.",
                 tablet->tablet_id(), compaction_type);
     }
+    tablet->compaction_stage = CompactionStage::PENDING;
     std::shared_ptr<CompactionMixin> compaction;
     int64_t permits = 0;
     Status st = 
Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
@@ -1048,17 +1060,21 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                         : _base_compaction_thread_pool;
         auto st = thread_pool->submit_func([tablet, compaction = 
std::move(compaction),
                                             compaction_type, permits, force, 
this]() {
+            tablet->compaction_stage = CompactionStage::EXECUTING;
+            
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
             tablet->execute_compaction(*compaction);
             if (!force) {
                 _permit_limiter.release(permits);
             }
             _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
         });
         if (!st.ok()) {
             if (!force) {
                 _permit_limiter.release(permits);
             }
             _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
             return Status::InternalError(
                     "failed to submit compaction task to thread pool, "
                     "tablet_id={}, compaction_type={}.",
@@ -1067,6 +1083,7 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
         return Status::OK();
     } else {
         _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+        tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
         if (!st.ok()) {
             return Status::InternalError(
                     "failed to prepare compaction task and calculate permits, "
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 90093241ad2..43093d3183e 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1404,9 +1404,9 @@ Status 
StorageEngine::get_compaction_status_json(std::string* result) {
         rapidjson::Document arr;
         arr.SetArray();
 
-        for (auto& tablet_id : it.second) {
+        for (auto& tablet : it.second) {
             rapidjson::Value key;
-            const std::string& key_str = std::to_string(tablet_id);
+            const std::string& key_str = std::to_string(tablet->tablet_id());
             key.SetString(key_str.c_str(), key_str.length(), 
path_obj.GetAllocator());
             arr.PushBack(key, root.GetAllocator());
         }
@@ -1428,9 +1428,9 @@ Status 
StorageEngine::get_compaction_status_json(std::string* result) {
         rapidjson::Document arr;
         arr.SetArray();
 
-        for (auto& tablet_id : it.second) {
+        for (auto& tablet : it.second) {
             rapidjson::Value key;
-            const std::string& key_str = std::to_string(tablet_id);
+            const std::string& key_str = std::to_string(tablet->tablet_id());
             key.SetString(key_str.c_str(), key_str.length(), 
path_obj2.GetAllocator());
             arr.PushBack(key, root.GetAllocator());
         }
@@ -1452,9 +1452,9 @@ Status 
StorageEngine::get_compaction_status_json(std::string* result) {
         rapidjson::Document arr;
         arr.SetArray();
 
-        for (auto& tablet_id : it.second) {
+        for (auto& tablet : it.second) {
             rapidjson::Value key;
-            const std::string& key_str = std::to_string(tablet_id);
+            const std::string& key_str = std::to_string(tablet->tablet_id());
             key.SetString(key_str.c_str(), key_str.length(), 
path_obj3.GetAllocator());
             arr.PushBack(key, root.GetAllocator());
         }
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 5ddd888db6d..64312a2b2b7 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -389,6 +389,8 @@ private:
 
     int32_t _auto_get_interval_by_disk_capacity(DataDir* data_dir);
 
+    int _get_executing_compaction_num(std::unordered_set<TabletSharedPtr>& 
compaction_tasks);
+
 private:
     EngineOptions _options;
     std::mutex _store_lock;
@@ -451,9 +453,9 @@ private:
 
     std::mutex _tablet_submitted_compaction_mutex;
     // a tablet can do base and cumulative compaction at same time
-    std::map<DataDir*, std::unordered_set<TTabletId>> 
_tablet_submitted_cumu_compaction;
-    std::map<DataDir*, std::unordered_set<TTabletId>> 
_tablet_submitted_base_compaction;
-    std::map<DataDir*, std::unordered_set<TTabletId>> 
_tablet_submitted_full_compaction;
+    std::map<DataDir*, std::unordered_set<TabletSharedPtr>> 
_tablet_submitted_cumu_compaction;
+    std::map<DataDir*, std::unordered_set<TabletSharedPtr>> 
_tablet_submitted_base_compaction;
+    std::map<DataDir*, std::unordered_set<TabletSharedPtr>> 
_tablet_submitted_full_compaction;
 
     std::mutex _low_priority_task_nums_mutex;
     std::unordered_map<DataDir*, int32_t> _low_priority_task_nums;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 1f8017de44e..a234ab93a47 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -722,7 +722,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult* 
result) {
 
 std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
         CompactionType compaction_type, DataDir* data_dir,
-        const std::unordered_set<TTabletId>& tablet_submitted_compaction, 
uint32_t* score,
+        const std::unordered_set<TabletSharedPtr>& 
tablet_submitted_compaction, uint32_t* score,
         const std::unordered_map<std::string_view, 
std::shared_ptr<CumulativeCompactionPolicy>>&
                 all_cumulative_compaction_policies) {
     int64_t now_ms = UnixMillis();
@@ -749,7 +749,7 @@ std::vector<TabletSharedPtr> 
TabletManager::find_best_tablets_to_compaction(
             return;
         }
 
-        auto search = 
tablet_submitted_compaction.find(tablet_ptr->tablet_id());
+        auto search = tablet_submitted_compaction.find(tablet_ptr);
         if (search != tablet_submitted_compaction.end()) {
             return;
         }
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 51cc2af404a..809a2237356 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -82,7 +82,7 @@ public:
     // single compaction tasks for the tablet.
     std::vector<TabletSharedPtr> find_best_tablets_to_compaction(
             CompactionType compaction_type, DataDir* data_dir,
-            const std::unordered_set<TTabletId>& tablet_submitted_compaction, 
uint32_t* score,
+            const std::unordered_set<TabletSharedPtr>& 
tablet_submitted_compaction, uint32_t* score,
             const std::unordered_map<std::string_view, 
std::shared_ptr<CumulativeCompactionPolicy>>&
                     all_cumulative_compaction_policies);
 
diff --git a/be/test/olap/compaction_task_test.cpp 
b/be/test/olap/compaction_task_test.cpp
new file mode 100644
index 00000000000..1bb92af982b
--- /dev/null
+++ b/be/test/olap/compaction_task_test.cpp
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <memory>
+
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gtest/gtest_pred_impl.h"
+#include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/data_dir.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "util/threadpool.h"
+
+namespace doris {
+using namespace config;
+
+class CompactionTaskTest : public testing::Test {
+public:
+    virtual void SetUp() {
+        _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
+        auto st = 
io::global_local_filesystem()->delete_directory(_engine_data_path);
+        ASSERT_TRUE(st.ok()) << st;
+        st = 
io::global_local_filesystem()->create_directory(_engine_data_path);
+        ASSERT_TRUE(st.ok()) << st;
+        EXPECT_TRUE(
+                
io::global_local_filesystem()->create_directory(_engine_data_path + 
"/meta").ok());
+
+        EngineOptions options;
+        options.backend_uid = UniqueId::gen_uid();
+        _storage_engine = std::make_unique<StorageEngine>(options);
+        _data_dir = std::make_unique<DataDir>(*_storage_engine, 
_engine_data_path, 100000000);
+        static_cast<void>(_data_dir->init());
+    }
+
+    virtual void TearDown() {
+        
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
+    }
+
+    std::unique_ptr<StorageEngine> _storage_engine;
+    std::string _engine_data_path;
+    std::unique_ptr<DataDir> _data_dir;
+};
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool 
overlapping,
+                                     int data_size) {
+    auto rs_meta = std::make_shared<RowsetMeta>();
+    rs_meta->set_rowset_type(BETA_ROWSET); // important
+    rs_meta->_rowset_meta_pb.set_start_version(version.first);
+    rs_meta->_rowset_meta_pb.set_end_version(version.second);
+    rs_meta->set_num_segments(num_segments);
+    rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+    rs_meta->set_total_disk_size(data_size);
+    RowsetSharedPtr rowset;
+    Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), 
&rowset);
+    if (!st.ok()) {
+        return nullptr;
+    }
+    return rowset;
+}
+
+TEST_F(CompactionTaskTest, TestSubmitCompactionTask) {
+    auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                      .set_min_threads(2)
+                      .set_max_threads(2)
+                      .build(&_storage_engine->_base_compaction_thread_pool);
+    EXPECT_TRUE(st.OK());
+    st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                 .set_min_threads(2)
+                 .set_max_threads(2)
+                 .build(&_storage_engine->_cumu_compaction_thread_pool);
+    EXPECT_TRUE(st.OK());
+
+    auto* sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    sp->set_call_back("olap_server::execute_compaction", [](auto&& values) {
+        std::this_thread::sleep_for(std::chrono::seconds(10));
+        bool* pred = try_any_cast<bool*>(values.back());
+        *pred = true;
+    });
+
+    for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) {
+        TabletMetaSharedPtr tablet_meta;
+        tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, 
TTabletSchema(), 6, {{7, 8}},
+                                         UniqueId(9, 10), 
TTabletType::TABLET_TYPE_DISK,
+                                         TCompressionType::LZ4F));
+        TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), 
tablet_meta, _data_dir.get(),
+                                          CUMULATIVE_SIZE_BASED_POLICY));
+        st = tablet->init();
+        EXPECT_TRUE(st.OK());
+
+        for (int i = 2; i < 30; ++i) {
+            RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+            tablet->_rs_version_map.emplace(rs->version(), rs);
+        }
+        tablet->_cumulative_point = 2;
+
+        st = _storage_engine->_submit_compaction_task(tablet, 
CompactionType::CUMULATIVE_COMPACTION,
+                                                      false);
+        EXPECT_TRUE(st.OK());
+    }
+
+    int executing_task_num = _storage_engine->_get_executing_compaction_num(
+            
_storage_engine->_tablet_submitted_cumu_compaction[_data_dir.get()]);
+    EXPECT_EQ(executing_task_num, 2);
+}
+
+} // namespace doris
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 69f17ecccba..c3cd74b55ed 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -421,7 +421,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
         create_tablet(id, false, rowset_size++);
     }
 
-    std::unordered_set<TTabletId> cumu_set;
+    std::unordered_set<TabletSharedPtr> cumu_set;
     std::unordered_map<std::string_view, 
std::shared_ptr<CumulativeCompactionPolicy>>
             cumulative_compaction_policies;
     cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to