This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c9bfdbb272238a73e95e483c823f6b54f022de0d
Author: Yida Wu <[email protected]>
AuthorDate: Wed Jan 14 03:07:27 2026 -0800

    IMPALA-14682: Use centralized async cleanup for admission state cleanup
    
    In IMPALA-14605, we added a mechanism to clean up the admission state
    asynchronously. This patch refactors all admission state deletions
    to use this centralized async method, which makes it easier to reason
    about when admission state is removed and to detect cases where a
    query's admission state is not properly cleared.
    
    Additionally, this refactoring is a necessary step for future
    improvements, such as implementing time-based deletion.
    
    Also updated test_admission_state_map_mem_leak to verify the number
    of admission states using the new global metric
    admission-control-service.num-queries, which is more stable than
    checking the log.
    
    Tests:
    Passed core tests.
    Passed the exhaustive custom_cluster/test_admission_controller.py tests.
    
    Change-Id: I04f46f2e42ec5e50f4dcccb6b73a34a376615ab0
    Reviewed-on: http://gerrit.cloudera.org:8080/23873
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-control-service.cc    | 28 +++++++++++------------
 be/src/scheduling/admission-control-service.h     |  6 ++---
 be/src/scheduling/admission-controller.cc         |  2 +-
 be/src/scheduling/admission-controller.h          |  3 ++-
 be/src/scheduling/admissiond-env.cc               |  4 ++--
 tests/custom_cluster/test_admission_controller.py |  6 ++---
 6 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc
index a94eb56de..c5b7aaa9b 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -241,7 +241,7 @@ void AdmissionControlService::GetQueryStatus(const GetQueryStatusRequestPB* req,
     // a retry may fail with an "Invalid handle" error because the entry is 
gone.
     // This is okay and doesn't cause any real problem.
     // To make it more robust, we may delay the removal using a time-based 
approach.
-    discard_result(admission_state_map_.Delete(req->query_id()));
+    CleanupAdmissionStateMapAsync(req->query_id(), __func__);
     VLOG(3) << "Current admission state map size: " << 
admission_state_map_.Count();
   }
   RespondAndReleaseRpc(status, resp, rpc_context);
@@ -265,7 +265,9 @@ void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req,
     }
   }
 
-  RESPOND_IF_ERROR(admission_state_map_.Delete(req->query_id()));
+  // Use async cleanup as the centralized way for admission state deletion, the
+  // client should not need to handle deletion errors of this internal map.
+  CleanupAdmissionStateMapAsync(req->query_id(), __func__);
   RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
 }
 
@@ -326,9 +328,7 @@ void AdmissionControlService::AdmissionHeartbeat(const AdmissionHeartbeatRequest
           req->host_id(), query_ids);
 
   for (const UniqueIdPB& query_id : cleaned_up) {
-    // ShardedQueryMap::Delete will log an error already if anything goes 
wrong, so just
-    // ignore the return value.
-    discard_result(admission_state_map_.Delete(query_id));
+    CleanupAdmissionStateMapAsync(query_id, __func__);
   }
 
   RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
@@ -343,9 +343,7 @@ void AdmissionControlService::CancelQueriesOnFailedCoordinators(
 
   for (const auto& entry : cleaned_up) {
     for (const UniqueIdPB& query_id : entry.second) {
-      // ShardedQueryMap::Delete will log an error already if anything goes 
wrong, so just
-      // ignore the return value.
-      discard_result(admission_state_map_.Delete(query_id));
+      CleanupAdmissionStateMapAsync(query_id, __func__);
     }
   }
 }
@@ -404,10 +402,11 @@ bool AdmissionControlService::CheckAndUpdateHeartbeat(
   return false;
 }
 
-void AdmissionControlService::CleanupAdmissionStateMapAsync(const UniqueIdPB& 
query_id) {
+void AdmissionControlService::CleanupAdmissionStateMapAsync(
+    const UniqueIdPB& query_id, const char* caller_func) {
   {
     std::lock_guard<std::mutex> lock(cleanup_queue_lock_);
-    admission_state_cleanup_queue_.push_back(query_id);
+    admission_state_cleanup_queue_.emplace_back(query_id, caller_func);
   }
   cleanup_queue_cv_.notify_all();
 }
@@ -418,12 +417,13 @@ void AdmissionControlService::AdmissionStateMapCleanupLoop() {
     cleanup_queue_cv_.wait(lock,
         [&] { return !admission_state_cleanup_queue_.empty() || 
shutdown_.load(); });
     if (admission_state_cleanup_queue_.empty() && shutdown_.load()) return;
-    std::deque<UniqueIdPB> local_queue;
+    std::deque<std::pair<UniqueIdPB, const char*>> local_queue;
     std::swap(local_queue, admission_state_cleanup_queue_);
     lock.unlock();
-    for (const UniqueIdPB& query_id : local_queue) {
-      discard_result(admission_state_map_.Delete(query_id));
-      VLOG_QUERY << "Cleaned up admission state map for query=" << 
PrintId(query_id);
+    for (const auto& entry : local_queue) {
+      discard_result(admission_state_map_.Delete(entry.first));
+      VLOG_QUERY << "Cleaned up admission state map for query=" << 
PrintId(entry.first)
+                 << ", triggered by function: " << entry.second;
     }
     lock.lock();
   }
diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h
index 57afc90d4..762bc49a2 100644
--- a/be/src/scheduling/admission-control-service.h
+++ b/be/src/scheduling/admission-control-service.h
@@ -81,7 +81,7 @@ class AdmissionControlService : public AdmissionControlServiceIf,
 
   /// Asyncly queues a request to remove the query from admission_state_map_.
   /// This is non-blocking, thread-safe, and avoids deadlocks with the caller.
-  void CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id);
+  void CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id, const char* 
caller_func);
 
  private:
   friend class ImpalaHttpHandler;
@@ -183,8 +183,8 @@ class AdmissionControlService : public AdmissionControlServiceIf,
   /// Condition variable to wake up the cleanup thread.
   std::condition_variable cleanup_queue_cv_;
 
-  /// Queue of query ids waiting to be removed from the map.
-  std::deque<UniqueIdPB> admission_state_cleanup_queue_;
+  /// Queue of query ids and the caller function name waiting to be removed 
from the map.
+  std::deque<std::pair<UniqueIdPB, const char*>> 
admission_state_cleanup_queue_;
 };
 
 } // namespace impala
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 4fa8f4f37..75b9ea3ec 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -2656,7 +2656,7 @@ void AdmissionController::TryDequeue() {
       if (is_cancelled) {
         VLOG_QUERY << "Dequeued cancelled query=" << PrintId(query_id);
         if (admission_map_cleanup_cb_) {
-          admission_map_cleanup_cb_(query_id);
+          admission_map_cleanup_cb_(query_id, __func__);
         }
         return; // next query
       }
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 1efd3750e..6cf60c411 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -485,7 +485,8 @@ class AdmissionController {
   };
 
   // Callback type for cleaning up the admission map.
-  using AdmissionMapCleanupCb = std::function<void(const UniqueIdPB&)>;
+  using AdmissionMapCleanupCb =
+      std::function<void(const UniqueIdPB&, const char* caller_func)>;
 
   // Register the callback function for admission map cleanup.
   void RegisterAdmissionMapCleanupCallback(AdmissionMapCleanupCb cb) {
diff --git a/be/src/scheduling/admissiond-env.cc b/be/src/scheduling/admissiond-env.cc
index 895f052f5..c8398bc60 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -137,8 +137,8 @@ Status AdmissiondEnv::Init() {
   DCHECK(admission_control_svc_);
   DCHECK(admission_controller_);
   admission_controller_->RegisterAdmissionMapCleanupCallback(
-      [&](const UniqueIdPB& query_id) {
-        admission_control_svc_->CleanupAdmissionStateMapAsync(query_id);
+      [&](const UniqueIdPB& query_id, const char* caller_func) {
+        admission_control_svc_->CleanupAdmissionStateMapAsync(query_id, 
caller_func);
       });
 
   RETURN_IF_ERROR(cluster_membership_mgr_->Init());
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 8b033085d..24c063a4a 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -2406,10 +2406,8 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     assert new_total_bytes < old_total_bytes * 1.1
     # Check if the admission state map size stays 1 all the time, which is
     # the long running query.
-    admissiond_log = self.get_ac_log_name()
-    self.assert_log_contains(admissiond_log, 'INFO',
-      "Current admission state map size: {}".format(1),
-      expected_count=number_of_iterations)
+    admission_state_size = 
ac.get_metric_value("admission-control-service.num-queries")
+    assert admission_state_size == 1
 
     # Cleanup clients.
     client1.close()

Reply via email to