This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1bc7cdbff6bfe468254ba0c507e0a287ae46e6c8
Author: Yida Wu <[email protected]>
AuthorDate: Mon Oct 13 16:36:28 2025 -0700

    IMPALA-14493: Cap memory usage of global admission service
    
    The global admission service can experience OOM errors under
    high concurrency because its process memory tracker is inaccurate
    and doesn't account for all memory allocations.
    
    Because making the memory tracker account for every allocation
    would be difficult, this patch takes a simpler approach: it
    introduces a hard memory cap based on tcmalloc statistics, which
    more closely reflect the true process memory usage. If a new
    non-trivial query is submitted while tcmalloc's in-use bytes exceed
    the process memory limit, the query is rejected immediately to
    protect the service from OOM.
    
    Adds a new flag, enable_admission_service_mem_safeguard, that
    allows this feature to be enabled or disabled. The feature is
    enabled by default.
    
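    Tests:
    - Added test_admission_service_low_mem_limit, which verifies that a
      normal query is rejected under a low memory limit while trivial
      queries still succeed.
    - Disabled the safeguard in test_timeout_reason_host_memory and
      test_timeout_reason_pool_memory.
    - Passed exhaustive tests.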
    
    Change-Id: I2ee2c942a73fcd69358851fc2fdc0fc4fe531c73
    Reviewed-on: http://gerrit.cloudera.org:8080/23542
    Reviewed-by: Abhishek Rawat <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-controller.cc         | 20 ++++++++++++++++++++
 be/src/scheduling/admissiond-env.cc               | 10 ++++++++++
 be/src/scheduling/admissiond-env.h                |  6 ++++++
 tests/custom_cluster/test_admission_controller.py | 23 +++++++++++++++++++++--
 4 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index b31d83a5c..db6720de4 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -26,6 +26,7 @@
 #include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "scheduling/admissiond-env.h"
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/executor-group.h"
 #include "scheduling/schedule-state.h"
@@ -36,6 +37,7 @@
 #include "util/bit-util.h"
 #include "util/collection-metrics.h"
 #include "util/debug-util.h"
+#include "util/memory-metrics.h"
 #include "util/metrics.h"
 #include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
@@ -266,6 +268,9 @@ const string REASON_NO_EXECUTOR_GROUPS =
     "Waiting for executors to start. Only DDL queries and queries scheduled 
only on the "
     "coordinator (either NUM_NODES set to 1 or when small query optimization 
is "
     "triggered) can currently run.";
+const string REASON_EXCEED_MEMORY_LIMIT =
+    "Admission rejected due to memory pressure in admissiond. Current usage: 
$0 bytes, "
+    "limit: $1 bytes";
 
 // The name of the root pool.
 const string ROOT_POOL = "root";
@@ -1646,6 +1651,21 @@ Status AdmissionController::SubmitForAdmission(const 
AdmissionRequest& request,
       return Status::Expected(rejected_msg);
     }
 
+    int64_t bytes_inuse = TcmallocMetric::BYTES_IN_USE->GetValue();
+    if (!is_trivial && AdmissiondEnv::GetInstance() != nullptr
+        && AdmissiondEnv::GetInstance()->admission_service_mem_limit() > 0
+        && bytes_inuse > 
AdmissiondEnv::GetInstance()->admission_service_mem_limit()) {
+      queue_node->not_admitted_reason = Substitute(REASON_EXCEED_MEMORY_LIMIT,
+          bytes_inuse, 
AdmissiondEnv::GetInstance()->admission_service_mem_limit());
+      request.summary_profile->AddInfoString(
+          PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
+      stats->metrics()->total_rejected->Increment(1);
+      const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
+          queue_node->pool_name, queue_node->not_admitted_reason);
+      VLOG_QUERY << "query_id=" << PrintId(request.query_id) << " " << 
rejected_msg.msg();
+      return Status::Expected(rejected_msg);
+    }
+
     string user;
     RETURN_IF_ERROR(GetEffectiveShortUser(
         queue_node->admission_request.request.query_ctx.session, &user));
diff --git a/be/src/scheduling/admissiond-env.cc 
b/be/src/scheduling/admissiond-env.cc
index 8d188ea36..bb7ff8669 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -30,6 +30,7 @@
 #include "util/mem-info.h"
 #include "util/memory-metrics.h"
 #include "util/metrics.h"
+#include "util/pretty-printer.h"
 #include "util/uid-util.h"
 
 #include "common/names.h"
@@ -45,6 +46,10 @@ DEFINE_validator(
                  << "' must be greater than 0 and less than or equal to 1000.";
       return false;
     });
+DEFINE_bool(enable_admission_service_mem_safeguard, true,
+    "When true, enables a hard memory limit safeguard for the admission 
service. "
+    "This rejects new queries if the in-use process memory from tcmalloc 
exceeds "
+    "admission_service_mem_limit to prevent OOM.");
 
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
@@ -107,6 +112,11 @@ Status AdmissiondEnv::Init() {
       new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, 
"Process"));
   mem_tracker_->RegisterMetrics(
       DaemonEnv::GetInstance()->metrics(), "mem-tracker.process");
+  if (FLAGS_enable_admission_service_mem_safeguard) {
+    admission_mem_limit_ = bytes_limit;
+    LOG(INFO) << "Set admission service memory limit to "
+              << PrettyPrinter::Print(admission_mem_limit_, TUnit::BYTES);
+  }
 
   http_handler_->RegisterHandlers(DaemonEnv::GetInstance()->webserver());
   if (DaemonEnv::GetInstance()->metrics_webserver() != nullptr) {
diff --git a/be/src/scheduling/admissiond-env.h 
b/be/src/scheduling/admissiond-env.h
index 6f2cd6dc2..fd6e48c77 100644
--- a/be/src/scheduling/admissiond-env.h
+++ b/be/src/scheduling/admissiond-env.h
@@ -61,6 +61,7 @@ class AdmissiondEnv {
   RpcMgr* rpc_mgr() { return rpc_mgr_.get(); }
   Scheduler* scheduler() { return scheduler_.get(); }
   StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
+  int64_t admission_service_mem_limit() { return admission_mem_limit_; }
 
  private:
   static AdmissiondEnv* admissiond_env_;
@@ -80,6 +81,11 @@ class AdmissiondEnv {
   std::unique_ptr<StatestoreSubscriber> statestore_subscriber_;
 
   MetricGroup* rpc_metrics_ = nullptr;
+
+  /// Memory limit for the admission service. If admission_mem_limit_ is set 
to a value
+  /// over 0, new admission requests are rejected when the tcmalloc in-use 
bytes are over
+  /// this limit.
+  int64_t admission_mem_limit_ = 0;
 };
 
 } // namespace impala
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index bd7c1c44b..f4254a95b 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1100,7 +1100,8 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
         pool_max_mem=10 * 1024 * 1024, proc_mem_limit=2 * 1024 * 1024,
-        queue_wait_timeout_ms=1000),
+        queue_wait_timeout_ms=1000)
+      + " --enable_admission_service_mem_safeguard=false",
       statestored_args=_STATESTORED_ARGS)
   def test_timeout_reason_host_memory(self):
     self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
@@ -1134,7 +1135,8 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
         pool_max_mem=2 * 1024 * 1024, proc_mem_limit=20 * 1024 * 1024,
-        queue_wait_timeout_ms=1000),
+        queue_wait_timeout_ms=1000)
+      + " --enable_admission_service_mem_safeguard=false",
       statestored_args=_STATESTORED_ARGS)
   def test_timeout_reason_pool_memory(self):
     self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
@@ -2297,6 +2299,23 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     client1.close()
     client2.close()
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--vmodule admission-controller=3 --mem_limit=10MB ")
+  def test_admission_service_low_mem_limit(self):
+    EXPECTED_REASON = "Admission rejected due to memory pressure"
+    # Test whether it will fail for a normal query.
+    failed_query_handle = self.client.execute_async(
+            "select * from functional_parquet.alltypes limit 100")
+    self.client.wait_for_impala_state(failed_query_handle, ERROR, 20)
+    profile = self.client.get_runtime_profile(failed_query_handle)
+    assert EXPECTED_REASON in profile, \
+      "Expected reason '{0}' not found in profile: 
{1}".format(EXPECTED_REASON, profile)
+    self.client.close_query(failed_query_handle)
+    # Test it should pass all the trivial queries.
+    self._test_trivial_queries_suc()
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   def test_retained_removed_coords_size(self):

Reply via email to