This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 859c9c1f6666e3a62d827661a03d65700d11fc48 Author: Yida Wu <[email protected]> AuthorDate: Sat Aug 2 05:28:00 2025 -0700 IMPALA-14276: Fix memory leak by removing AdmissionState on rejection Normally, AdmissionState entries in admissiond are cleaned up when a query is released. However, for requests that are rejected, ReleaseQuery is not called, and their AdmissionState was not removed from admission_state_map_, resulting in a memory leak over time. This leak was less noticeable because AdmissionState entries were relatively small. However, when admissiond is run as a standalone process, each AdmissionState includes a profile sidecar, which can be large, making the leak much more significant. This change adds logic to remove AdmissionState entries when the admission request is rejected. Testing: Add test_admission_state_map_mem_leak as a regression test. Change-Id: I9fba4f176c648ed7811225f7f94c91342a724d10 Reviewed-on: http://gerrit.cloudera.org:8080/23257 Reviewed-by: Riza Suminto <[email protected]> Reviewed-by: Abhishek Rawat <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/admission-control-service.cc | 10 ++++ tests/custom_cluster/test_admission_controller.py | 70 +++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc index f94e1e039..be6c33bd0 100644 --- a/be/src/scheduling/admission-control-service.cc +++ b/be/src/scheduling/admission-control-service.cc @@ -212,6 +212,16 @@ void AdmissionControlService::GetQueryStatus(const GetQueryStatusRequestPB* req, } RespondAndReleaseRpc(status, resp, rpc_context); + if (admission_state->admission_done && !admission_state->admit_status.ok()) { + LOG(INFO) << "Query " << req->query_id() + << " was rejected. 
Removing admission state to free resources."; + // If this RPC fails and the admission state is already removed, + // a retry may fail with an "Invalid handle" error because the entry is gone. + // This is okay and doesn't cause any real problem. + // To make it more robust, we may delay the removal using a time-based approach. + discard_result(admission_state_map_.Delete(req->query_id())); + VLOG(3) << "Current admission state map size: " << admission_state_map_.Count(); + } } void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req, diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 77ea46415..bd7c1c44b 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -2327,6 +2327,76 @@ class TestAdmissionControllerWithACService(TestAdmissionController): except subprocess.CalledProcessError as e: assert "cluster_membership_retained_removed_coords" in str(e) + @SkipIfNotHdfsMinicluster.tuned_for_minicluster + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=("--vmodule=admission-control-service=3 --default_pool_max_requests=1 " + "--queue_wait_timeout_ms=1"), + disable_log_buffering=True) + def test_admission_state_map_mem_leak(self): + """ + Regression test to reproduce IMPALA-14276. + Steps: + 1. Submit a long-running query to coord1 and let it run. + 2. Repeatedly submit short queries to coord2 that get queued and time out due to + admission limits. + 3. Get memory usage before and after to check for possible memory leak in + admissiond. + """ + + # Long-running query that blocks a request slot. + long_query = "select count(*) from functional.alltypes where int_col = sleep(10000)" + # Simple short query used to trigger queuing and timeout. + short_query = "select count(*) from functional.alltypes limit 1" + + # Max timeout for waiting on query state transitions. 
+ timeout_s = 10 + + ac = self.cluster.admissiond + all_coords = self.cluster.get_all_coordinators() + assert len(all_coords) >= 2, "Test requires at least two coordinators" + + coord1, coord2 = all_coords[0], all_coords[1] + + # Submit long query to coord1 to occupy the admission slot. + client1 = coord1.service.create_hs2_client() + handle1 = client1.execute_async(long_query) + client1.wait_for_impala_state(handle1, RUNNING, timeout_s) + + # Allow some time for the system to stabilize. + sleep(5) + # Capture memory usage before stressing the system. + old_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use") + assert old_total_bytes != 0 + + # Submit short queries to coord2 which will be queued and time out. + client2 = coord2.service.create_hs2_client() + number_of_iterations = 500 + for i in range(number_of_iterations): + handle2 = client2.execute_async(short_query) + self._wait_for_change_to_profile( + handle2, + "Query Status: Admission for query exceeded timeout", + client=client2, + timeout=timeout_s) + client2.close_query(handle2) + + # Capture memory usage after the test. + new_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use") + + # Ensure memory usage has not grown more than 10%, indicating no leak. + assert new_total_bytes < old_total_bytes * 1.1 + # Check if the admission state map size stays 1 all the time, which is + # the long running query. + admissiond_log = self.get_ac_log_name() + self.assert_log_contains(admissiond_log, 'INFO', + "Current admission state map size: {}".format(1), + expected_count=number_of_iterations) + + # Cleanup clients. + client1.close() + client2.close() + @SkipIfNotHdfsMinicluster.tuned_for_minicluster @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(
