This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 46109a77001 [fix](compaction) fix compaction producer hold for permits 
leak (#45664)
46109a77001 is described below

commit 46109a77001ab0472a33910f826b72a3334c2f37
Author: shee <[email protected]>
AuthorDate: Fri Jan 10 12:49:21 2025 +0800

    [fix](compaction) fix compaction producer hold for permits leak (#45664)
    
    The compaction producer thread is stuck waiting for permits:
    <img width="1083" alt="Clipboard_Screenshot_1734609076"
    src="https://github.com/user-attachments/assets/0e675cf3-3235-47bc-ba07-97b1a90fa2c6"
    />
    
    The compaction threads are idle:
    <img width="1044" alt="Clipboard_Screenshot_1734609095"
    src="https://github.com/user-attachments/assets/147fccc1-b563-4160-9062-456bdca980c2"
    />
    So we conclude that the permits have already leaked.
    
    Co-authored-by: garenshi <[email protected]>
    Co-authored-by: camby <[email protected]>
---
 be/src/olap/olap_server.cpp | 36 ++++++++++++++++--------------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 90d0883984e..2b77dc7e7a6 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -698,20 +698,6 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                     last_base_score_update_time = cur_time;
                 }
             }
-            std::unique_ptr<ThreadPool>& thread_pool =
-                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
-                            ? _cumu_compaction_thread_pool
-                            : _base_compaction_thread_pool;
-            VLOG_CRITICAL << "compaction thread pool. type: "
-                          << (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
-                                                                               
        : "BASE")
-                          << ", num_threads: " << thread_pool->num_threads()
-                          << ", num_threads_pending_start: "
-                          << thread_pool->num_threads_pending_start()
-                          << ", num_active_threads: " << 
thread_pool->num_active_threads()
-                          << ", max_threads: " << thread_pool->max_threads()
-                          << ", min_threads: " << thread_pool->min_threads()
-                          << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
             std::vector<TabletSharedPtr> tablets_compaction =
                     _generate_compaction_tasks(compaction_type, data_dirs, 
check_score);
             if (tablets_compaction.size() == 0) {
@@ -1066,23 +1052,33 @@ Status 
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
                 (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
                         ? _cumu_compaction_thread_pool
                         : _base_compaction_thread_pool;
+        VLOG_CRITICAL << "compaction thread pool. type: "
+                      << (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+                                                                               
    : "BASE")
+                      << ", num_threads: " << thread_pool->num_threads()
+                      << ", num_threads_pending_start: " << 
thread_pool->num_threads_pending_start()
+                      << ", num_active_threads: " << 
thread_pool->num_active_threads()
+                      << ", max_threads: " << thread_pool->max_threads()
+                      << ", min_threads: " << thread_pool->min_threads()
+                      << ", num_total_queued_tasks: " << 
thread_pool->get_queue_size();
         auto st = thread_pool->submit_func([tablet, compaction = 
std::move(compaction),
                                             compaction_type, permits, force, 
this]() {
+            Defer defer {[&]() {
+                if (!force) {
+                    _permit_limiter.release(permits);
+                }
+                _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+                tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
+            }};
             if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), 
compaction_type)) {
                 LOG(INFO) << "Tablet state has been changed, no need to begin 
this compaction "
                              "task, tablet_id="
                           << tablet->tablet_id() << ", tablet_state=" << 
tablet->tablet_state();
-                _pop_tablet_from_submitted_compaction(tablet, compaction_type);
                 return;
             }
             tablet->compaction_stage = CompactionStage::EXECUTING;
             
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
             tablet->execute_compaction(*compaction);
-            if (!force) {
-                _permit_limiter.release(permits);
-            }
-            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
-            tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
         });
         if (!st.ok()) {
             if (!force) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to