This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0c9fe293c317f9135427f6a68f64c92abdfe0231
Author: Yida Wu <[email protected]>
AuthorDate: Tue Dec 9 00:51:28 2025 -0800

IMPALA-14612: Add global metrics for admission state map size

We need better observability into the admission state map so we can detect potential memory leaks. The admission state map tracks queries that are currently being processed or queued. An entry is added when a query is submitted for admission, and it is removed when the query finishes execution, is rejected by admission control, times out while queued, or is cancelled. If a bug causes the removal logic to be skipped, the map grows indefinitely and leaks memory. We have observed cases where admission state entries were never released, causing memory leaks in admissiond.

This patch adds the metric admission-control-service.num-queries and its high-water mark (admission-control-service.num-queries-high-water-mark) to track the number of active entries. It also updates GenericShardedQueryMap to support an optional AtomicHighWaterMarkGauge. When the gauge is set, the map automatically increments it in Add and decrements it in Delete. As a result, the metric reflects the map size without manual updates at every call site.

Tests: Updated test_admission_state_map_mem_leak to verify the new metrics; the test passes.
Change-Id: Ie803aabf8d91b6381c5d0d7534cd9c9fc2166a73 Reviewed-on: http://gerrit.cloudera.org:8080/23760 Reviewed-by: Riza Suminto <[email protected]> Reviewed-by: Jason Fehr <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/admission-control-service.cc | 13 ++++++++++--- be/src/util/sharded-query-map-util.cc | 3 +++ be/src/util/sharded-query-map-util.h | 10 ++++++++++ common/thrift/metrics.json | 20 ++++++++++++++++++++ tests/custom_cluster/test_admission_controller.py | 16 +++++++++++----- 5 files changed, 54 insertions(+), 8 deletions(-) diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc index 4489f7f36..127231507 100644 --- a/be/src/scheduling/admission-control-service.cc +++ b/be/src/scheduling/admission-control-service.cc @@ -18,6 +18,7 @@ #include "scheduling/admission-control-service.h" #include "common/constant-strings.h" +#include "common/names.h" #include "gen-cpp/admission_control_service.pb.h" #include "gutil/strings/substitute.h" #include "kudu/rpc/rpc_context.h" @@ -35,13 +36,16 @@ #include "util/parse-util.h" #include "util/promise.h" -#include "common/names.h" - using kudu::rpc::RpcContext; static const string QUEUE_LIMIT_MSG = "(Advanced) Limit on RPC payloads consumption for " "AdmissionControlService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit"); +static const string ADMISSION_MAP_SIZE_METRIC_KEY = + "admission-control-service.num-queries"; +static const string ADMISSION_MAP_SIZE_HWM_METRIC_KEY = + "admission-control-service.num-queries-high-water-mark"; + DEFINE_string(admission_control_service_queue_mem_limit, "50MB", QUEUE_LIMIT_MSG.c_str()); DEFINE_int32(admission_control_service_num_svc_threads, 0, "Number of threads for processing admission control service's RPCs. 
if left at " @@ -73,7 +77,7 @@ namespace impala { AdmissionControlService::AdmissionControlService(MetricGroup* metric_group) : AdmissionControlServiceIf(AdmissiondEnv::GetInstance()->rpc_mgr()->metric_entity(), - AdmissiondEnv::GetInstance()->rpc_mgr()->result_tracker()) { + AdmissiondEnv::GetInstance()->rpc_mgr()->result_tracker()) { MemTracker* process_mem_tracker = AdmissiondEnv::GetInstance()->process_mem_tracker(); bool is_percent; // not used int64_t bytes_limit = @@ -89,6 +93,9 @@ AdmissionControlService::AdmissionControlService(MetricGroup* metric_group) bytes_limit, "Admission Control Service Queue", process_mem_tracker)); MemTrackerMetric::CreateMetrics( metric_group, mem_tracker_.get(), "AdmissionControlService"); + AtomicHighWaterMarkGauge* map_size_metric = metric_group->AddHWMGauge( + ADMISSION_MAP_SIZE_HWM_METRIC_KEY, ADMISSION_MAP_SIZE_METRIC_KEY, 0); + admission_state_map_.SetSizeMetric(map_size_metric); } Status AdmissionControlService::Init() { diff --git a/be/src/util/sharded-query-map-util.cc b/be/src/util/sharded-query-map-util.cc index a723b27e5..ded7e9d1d 100644 --- a/be/src/util/sharded-query-map-util.cc +++ b/be/src/util/sharded-query-map-util.cc @@ -20,6 +20,7 @@ #include "runtime/query-driver.h" #include "scheduling/admission-control-service.h" #include "util/debug-util.h" +#include "util/metrics.h" namespace impala { @@ -36,6 +37,7 @@ Status GenericShardedQueryMap<K, V>::Add(const K& query_id, const V& obj) { strings::Substitute("query id $0 already exists", PrintId(query_id)))); } map_ref->insert(make_pair(query_id, obj)); + if (size_metric_ != nullptr) size_metric_->Increment(1); return Status::OK(); } @@ -65,6 +67,7 @@ Status GenericShardedQueryMap<K, V>::Delete(const K& query_id) { return err; } map_ref->erase(entry); + if (size_metric_ != nullptr) size_metric_->Increment(-1); return Status::OK(); } diff --git a/be/src/util/sharded-query-map-util.h b/be/src/util/sharded-query-map-util.h index 217090dc4..1f7d63f9d 100644 --- 
a/be/src/util/sharded-query-map-util.h +++ b/be/src/util/sharded-query-map-util.h @@ -28,6 +28,8 @@ namespace impala { +class AtomicHighWaterMarkGauge; + /// This is a template that can be used for any map that maps from a query ID (TUniqueId /// or UniqueIdPB) to some object, and that needs to be sharded. It provides a SpinLock /// per shard to synchronize access to each shard of the map. The underlying shard is @@ -65,6 +67,11 @@ class GenericShardedQueryMap { return count; } + void SetSizeMetric(AtomicHighWaterMarkGauge* metric) { + DCHECK(size_metric_ == nullptr); + size_metric_ = metric; + } + // Adds ('key', 'value') to the map, returning an error if 'key' already exists. Status Add(const K& key, const V& value); @@ -90,6 +97,9 @@ class GenericShardedQueryMap { SpinLock map_lock_; }; struct MapShard shards_[NUM_QUERY_BUCKETS]; + + // Metric for tracking the map size. + AtomicHighWaterMarkGauge* size_metric_ = nullptr; }; template <typename T> diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index 310735693..182919de8 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -3948,6 +3948,26 @@ "kind": "COUNTER", "key": "admission-controller.total-dequeue-failed-coordinator-limited" }, + { + "key": "admission-control-service.num-queries-high-water-mark", + "label": "HWM Num Queries in Admission Control Service", + "units": "NONE", + "kind": "GAUGE", + "description": "The high water mark of queries registered in Admission Control Service (queuing or running).", + "contexts": [ + "ADMISSIOND" + ] + }, + { + "key": "admission-control-service.num-queries", + "label": "Num Queries in Admission Control Service", + "units": "NONE", + "kind": "GAUGE", + "description": "The number of queries currently registered in Admission Control Service (queuing or running) and not yet fully released.", + "contexts": [ + "ADMISSIOND" + ] + }, { "description": "The full version string of the Admission Control Server.", "contexts": [ diff --git 
a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index f4254a95b..e085df6ed 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -2371,7 +2371,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController): # Max timeout for waiting on query state transitions. timeout_s = 10 - ac = self.cluster.admissiond + ac = self.cluster.admissiond.service all_coords = self.cluster.get_all_coordinators() assert len(all_coords) >= 2, "Test requires at least two coordinators" @@ -2382,10 +2382,9 @@ class TestAdmissionControllerWithACService(TestAdmissionController): handle1 = client1.execute_async(long_query) client1.wait_for_impala_state(handle1, RUNNING, timeout_s) - # Allow some time for the system to stabilize. - sleep(5) + ac.wait_for_metric_value("admission-control-service.num-queries", 1) # Capture memory usage before stressing the system. - old_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use") + old_total_bytes = ac.get_metric_value("tcmalloc.bytes-in-use") assert old_total_bytes != 0 # Submit short queries to coord2 which will be queued and time out. @@ -2401,7 +2400,7 @@ class TestAdmissionControllerWithACService(TestAdmissionController): client2.close_query(handle2) # Capture memory usage after the test. - new_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use") + new_total_bytes = ac.get_metric_value("tcmalloc.bytes-in-use") # Ensure memory usage has not grown more than 10%, indicating no leak. assert new_total_bytes < old_total_bytes * 1.1 @@ -2416,6 +2415,13 @@ class TestAdmissionControllerWithACService(TestAdmissionController): client1.close() client2.close() + # Verify num queries return to 0. 
+ ac.wait_for_metric_value( + "admission-control-service.num-queries", 0) + num_queries_hwm = \ + ac.get_metric_value("admission-control-service.num-queries-high-water-mark") + assert num_queries_hwm > 1 + @SkipIfNotHdfsMinicluster.tuned_for_minicluster @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(
