This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 859c9c1f6666e3a62d827661a03d65700d11fc48
Author: Yida Wu <[email protected]>
AuthorDate: Sat Aug 2 05:28:00 2025 -0700

    IMPALA-14276: Fix memory leak by removing AdmissionState on rejection
    
    Normally, AdmissionState entries in admissiond are cleaned up when
    a query is released. However, for requests that are rejected,
    ReleaseQuery is never called, so their AdmissionState was not
    removed from admission_state_map_, resulting in a memory leak over
    time.
    
    This leak was less noticeable because AdmissionState entries were
    relatively small. However, when admissiond is run as a standalone
    process, each AdmissionState includes a profile sidecar, which
    can be large, making the leak much more severe.
    
    This change adds logic to remove AdmissionState entries when the
    admission request is rejected.
    
    Testing:
    Added test_admission_state_map_mem_leak as a regression test.
    
    Change-Id: I9fba4f176c648ed7811225f7f94c91342a724d10
    Reviewed-on: http://gerrit.cloudera.org:8080/23257
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Abhishek Rawat <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-control-service.cc    | 10 ++++
 tests/custom_cluster/test_admission_controller.py | 70 +++++++++++++++++++++++
 2 files changed, 80 insertions(+)

diff --git a/be/src/scheduling/admission-control-service.cc 
b/be/src/scheduling/admission-control-service.cc
index f94e1e039..be6c33bd0 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -212,6 +212,16 @@ void AdmissionControlService::GetQueryStatus(const 
GetQueryStatusRequestPB* req,
   }
 
   RespondAndReleaseRpc(status, resp, rpc_context);
+  if (admission_state->admission_done && !admission_state->admit_status.ok()) {
+    LOG(INFO) << "Query " << req->query_id()
+              << " was rejected. Removing admission state to free resources.";
+    // If this RPC fails and the admission state is already removed,
+    // a retry may fail with an "Invalid handle" error because the entry is 
gone.
+    // This is okay and doesn't cause any real problem.
+    // To make it more robust, we may delay the removal using a time-based 
approach.
+    discard_result(admission_state_map_.Delete(req->query_id()));
+    VLOG(3) << "Current admission state map size: " << 
admission_state_map_.Count();
+  }
 }
 
 void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req,
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 77ea46415..bd7c1c44b 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -2327,6 +2327,76 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     except subprocess.CalledProcessError as e:
       assert "cluster_membership_retained_removed_coords" in str(e)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=("--vmodule=admission-control-service=3 
--default_pool_max_requests=1 "
+      "--queue_wait_timeout_ms=1"),
+      disable_log_buffering=True)
+  def test_admission_state_map_mem_leak(self):
+    """
+    Regression test to reproduce IMPALA-14276.
+    Steps:
+    1. Submit a long-running query to coord1 and let it run.
+    2. Repeatedly submit short queries to coord2 that get queued and time out 
due to
+       admission limits.
+    3. Get memory usage before and after to check for possible memory leak in
+       admissiond.
+    """
+
+    # Long-running query that blocks a request slot.
+    long_query = "select count(*) from functional.alltypes where int_col = 
sleep(10000)"
+    # Simple short query used to trigger queuing and timeout.
+    short_query = "select count(*) from functional.alltypes limit 1"
+
+    # Max timeout for waiting on query state transitions.
+    timeout_s = 10
+
+    ac = self.cluster.admissiond
+    all_coords = self.cluster.get_all_coordinators()
+    assert len(all_coords) >= 2, "Test requires at least two coordinators"
+
+    coord1, coord2 = all_coords[0], all_coords[1]
+
+    # Submit long query to coord1 to occupy the admission slot.
+    client1 = coord1.service.create_hs2_client()
+    handle1 = client1.execute_async(long_query)
+    client1.wait_for_impala_state(handle1, RUNNING, timeout_s)
+
+    # Allow some time for the system to stabilize.
+    sleep(5)
+    # Capture memory usage before stressing the system.
+    old_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use")
+    assert old_total_bytes != 0
+
+    # Submit short queries to coord2 which will be queued and time out.
+    client2 = coord2.service.create_hs2_client()
+    number_of_iterations = 500
+    for i in range(number_of_iterations):
+        handle2 = client2.execute_async(short_query)
+        self._wait_for_change_to_profile(
+            handle2,
+            "Query Status: Admission for query exceeded timeout",
+            client=client2,
+            timeout=timeout_s)
+        client2.close_query(handle2)
+
+    # Capture memory usage after the test.
+    new_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use")
+
+    # Ensure memory usage has not grown more than 10%, indicating no leak.
+    assert new_total_bytes < old_total_bytes * 1.1
+    # Check if the admission state map size stays 1 all the time, which is
+    # the long running query.
+    admissiond_log = self.get_ac_log_name()
+    self.assert_log_contains(admissiond_log, 'INFO',
+      "Current admission state map size: {}".format(1),
+      expected_count=number_of_iterations)
+
+    # Cleanup clients.
+    client1.close()
+    client2.close()
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(

Reply via email to