This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d56eea72 IMPALA-12057: Track removed coordinators to reject queued 
queries early
8d56eea72 is described below

commit 8d56eea72518aa11a36aa086dc8961bc8cdbd1fd
Author: Yida Wu <[email protected]>
AuthorDate: Mon Jun 9 09:35:22 2025 -0700

    IMPALA-12057: Track removed coordinators to reject queued queries early
    
    Queries in global admission control can remain queued for a long time
    if they are assigned to a coordinator that has already left the
    cluster. Admissiond can't distinguish between a coordinator that
    hasn't yet been propagated via the statestore and one that has
    already been removed, resulting in unnecessary waiting until timeout.
    This timeout is determined by either FLAGS_queue_wait_timeout_ms or
    the queue_timeout_ms in the pool config. By default,
    FLAGS_queue_wait_timeout_ms is 1 minute, but in production it's
    normally configured to 10 to 15 minutes.
    
    This change tracks recently removed coordinators and rejects such
    queued queries immediately using REASON_COORDINATOR_REMOVED.
    To ensure the removed coordinator list remains simple and bounded,
    it avoids duplicate entries and enforces FIFO eviction at
    the minimum of MAX_REMOVED_COORD_SIZE (1000) and
    FLAGS_cluster_membership_retained_removed_coords.
    
    It's possible that a coordinator marked as removed comes back
    with the same backend id. In that case, admissiond will see it in
    current_backends and won't need to check the removed list. Even
    if a coordinator briefly flaps and a request is rejected, it's not
    critical; the coordinator can retry. So to keep the design simple
    and safe, we keep the removed coord entry as-is.
    
    Added a parameter is_admissiond to the ClusterMembershipMgr
    constructor to indicate whether it is running within the admissiond.
    
    Tests:
    Passed exhaustive tests.
    Added unit tests to verify the eviction logic and the duplicate
    case.
    Added regression test test_coord_not_registered_in_ac.
    
    Change-Id: I1e0f270299f8c20975d7895c17f4e2791c3360e0
    Reviewed-on: http://gerrit.cloudera.org:8080/23094
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-controller.cc         |  33 +++--
 be/src/scheduling/admissiond-env.cc               |  15 ++-
 be/src/scheduling/cluster-membership-mgr-test.cc  | 145 +++++++++++++++++++++-
 be/src/scheduling/cluster-membership-mgr.cc       |  44 +++++--
 be/src/scheduling/cluster-membership-mgr.h        |  17 ++-
 tests/custom_cluster/test_admission_controller.py |  77 ++++++++++++
 6 files changed, 307 insertions(+), 24 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index 8c8f90bf4..38623df60 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -260,6 +260,8 @@ const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
 const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0";
 const string REASON_COORDINATOR_NOT_FOUND =
     "Coordinator not registered with the statestore.";
+const string REASON_COORDINATOR_REMOVED_PREFIX = "The coordinator no longer 
exists: ";
+const string REASON_COORDINATOR_REMOVED = REASON_COORDINATOR_REMOVED_PREFIX + 
"$0";
 const string REASON_NO_EXECUTOR_GROUPS =
     "Waiting for executors to start. Only DDL queries and queries scheduled 
only on the "
     "coordinator (either NUM_NODES set to 1 or when small query optimization 
is "
@@ -1836,7 +1838,8 @@ void AdmissionController::ReleaseQuery(const UniqueIdPB& 
query_id,
       }
       if (to_release.size() > 0) {
         LOG(INFO) << "ReleaseQuery for " << query_id << " called with "
-                  << to_release.size() << "unreleased backends. Releasing 
automatically.";
+                  << to_release.size()
+                  << " unreleased backends. Releasing automatically.";
         ReleaseQueryBackendsLocked(query_id, coord_id, to_release);
       }
     }
@@ -2238,14 +2241,21 @@ Status AdmissionController::ComputeGroupScheduleStates(
   output_schedules->clear();
 
   // Queries may arrive before we've gotten a statestore update containing the 
descriptor
-  // for their coordinator, in which case we queue the query until it arrives. 
It's also
-  // possible (though very unlikely) that the coordinator was removed from the 
cluster
-  // membership after submitting this query for admission. Currently, in this 
case the
-  // query will remain queued until it times out, but we can consider 
detecting failed
-  // coordinators and cleaning up their queued queries.
+  // for their coordinator, in which case we queue the query until it arrives. 
Currently,
+  // in this case the query will remain queued until it times out.
+  // There's also a case where the coordinator was removed from the cluster 
after the
+  // query was submitted, in this case, we mark it as COORDINATOR_REMOVED to 
avoid
+  // indefinite queuing and improve cleanup.
   auto it = 
membership_snapshot->current_backends.find(PrintId(request.coord_id));
   if (it == membership_snapshot->current_backends.end()) {
-    queue_node->not_admitted_reason = REASON_COORDINATOR_NOT_FOUND;
+    auto rm_it =
+        
membership_snapshot->removed_coordinators_map.find(PrintId(request.coord_id));
+    if (rm_it == membership_snapshot->removed_coordinators_map.end()) {
+      queue_node->not_admitted_reason = REASON_COORDINATOR_NOT_FOUND;
+    } else {
+      queue_node->not_admitted_reason =
+          Substitute(REASON_COORDINATOR_REMOVED, rm_it->second);
+    }
     LOG(WARNING) << queue_node->not_admitted_reason;
     return Status::OK();
   }
@@ -2326,6 +2336,10 @@ static inline string 
PrintScheduleStateMemInfo(ScheduleState* state) {
       MemLimitSourcePB_Name(state->coord_backend_mem_to_admit_source()));
 }
 
+static inline bool IsReasonCoordinatorRemoved(const string& reason) {
+  return reason.rfind(REASON_COORDINATOR_REMOVED_PREFIX, 0) == 0;
+}
+
 bool AdmissionController::FindGroupToAdmitOrReject(
     ClusterMembershipMgr::SnapshotPtr& membership_snapshot,
     const TPoolConfig& pool_config, const TPoolConfig& root_cfg, bool 
admit_from_queue,
@@ -2349,6 +2363,7 @@ bool AdmissionController::FindGroupToAdmitOrReject(
   }
   if (queue_node->group_states.empty()) {
     DCHECK(!queue_node->not_admitted_reason.empty());
+    if (IsReasonCoordinatorRemoved(queue_node->not_admitted_reason)) return 
false;
     return true;
   }
 
@@ -2575,11 +2590,14 @@ void AdmissionController::TryDequeue() {
       --max_to_dequeue;
       VLOG(3) << "Dequeueing from stats for pool " << pool_name;
       stats->Dequeue(false);
+      const UniqueIdPB& query_id = queue_node->admission_request.query_id;
       if (is_rejected) {
         AdmissionOutcome outcome =
             queue_node->admit_outcome->Set(AdmissionOutcome::REJECTED);
         if (outcome == AdmissionOutcome::REJECTED) {
           stats->metrics()->total_rejected->Increment(1);
+          VLOG_QUERY << "Rejected, query id=" << PrintId(query_id)
+                     << " reason: " << queue_node->not_admitted_reason;
           return; // next query
         } else {
           DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
@@ -2588,7 +2606,6 @@ void AdmissionController::TryDequeue() {
       }
       DCHECK(is_cancelled || queue_node->admitted_schedule != nullptr);
 
-      const UniqueIdPB& query_id = queue_node->admission_request.query_id;
       if (!is_cancelled) {
         VLOG_QUERY << "Admitting from queue: query=" << PrintId(query_id);
         AdmissionOutcome outcome =
diff --git a/be/src/scheduling/admissiond-env.cc 
b/be/src/scheduling/admissiond-env.cc
index 1f6d13b86..8d188ea36 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -26,6 +26,7 @@
 #include "scheduling/scheduler.h"
 #include "service/impala-http-handler.h"
 #include "util/default-path-handlers.h"
+#include "util/gflag-validator-util.h"
 #include "util/mem-info.h"
 #include "util/memory-metrics.h"
 #include "util/metrics.h"
@@ -35,6 +36,15 @@
 
 DEFINE_int32(
     admission_service_port, 29500, "The port where the admission control 
service runs");
+DEFINE_int32(cluster_membership_retained_removed_coords, 1000,
+    "Max number of removed coordinators to track. Oldest entry is evicted when 
full.");
+DEFINE_validator(
+    cluster_membership_retained_removed_coords, [](const char* name, const int 
val) {
+      if (val > 0 && val <= 1000) return true;
+      LOG(ERROR) << "Flag '" << name
+                 << "' must be greater than 0 and less than or equal to 1000.";
+      return false;
+    });
 
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
@@ -74,8 +84,9 @@ AdmissiondEnv::AdmissiondEnv()
       TStatestoreSubscriberType::ADMISSIOND));
 
   scheduler_.reset(new Scheduler(metrics, request_pool_service()));
-  cluster_membership_mgr_.reset(new ClusterMembershipMgr(
-      PrintId(DaemonEnv::GetInstance()->backend_id()), subscriber(), metrics));
+  cluster_membership_mgr_.reset(
+      new ClusterMembershipMgr(PrintId(DaemonEnv::GetInstance()->backend_id()),
+          subscriber(), metrics, true /* is_admissiond */));
   admission_controller_.reset(new AdmissionController(cluster_membership_mgr(),
       subscriber(), request_pool_service(), metrics, scheduler(), 
pool_mem_trackers(),
       admission_service_addr));
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc 
b/be/src/scheduling/cluster-membership-mgr-test.cc
index f67d508d3..5ba4d5f92 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -40,6 +40,7 @@ DECLARE_int32(statestore_max_missed_heartbeats);
 DECLARE_int32(statestore_heartbeat_frequency_ms);
 DECLARE_int32(num_expected_executors);
 DECLARE_string(expected_executor_group_sets);
+DECLARE_int32(cluster_membership_retained_removed_coords);
 
 namespace impala {
 
@@ -169,6 +170,16 @@ class ClusterMembershipMgrTest : public testing::Test {
     }
   }
 
+  /// Sends a single topic item in a delta to the specific backend.
+  void SendDeltaTo(Backend* be, const TTopicItem& item) {
+    StatestoreSubscriber::TopicDeltaMap topic_delta_map;
+    TTopicDelta& delta = topic_delta_map[Statestore::IMPALA_MEMBERSHIP_TOPIC];
+    delta.is_delta = true;
+    delta.topic_entries.push_back(item);
+    vector<TTopicDelta> unused;
+    be->cmm->UpdateMembership(topic_delta_map, &unused);
+  }
+
   /// Creates a new backend and adds it to the list of offline backends. If 
idx is
   /// omitted, the current number of backends will be used as the new index.
   Backend* CreateBackend(int idx = -1) {
@@ -184,11 +195,11 @@ class ClusterMembershipMgrTest : public testing::Test {
   /// Creates a new ClusterMembershipMgr for a backend and moves the backend 
from
   /// 'offline_' to 'starting_'. Callers must handle invalidated iterators 
after calling
   /// this method.
-  void CreateCMM(Backend* be) {
+  void CreateCMM(Backend* be, bool is_admissiond = false) {
     ASSERT_TRUE(IsInVector(be, offline_));
     be->metric_group = make_unique<MetricGroup>("test");
     be->cmm = make_unique<ClusterMembershipMgr>(
-        PrintId(be->backend_id), nullptr, be->metric_group.get());
+        PrintId(be->backend_id), nullptr, be->metric_group.get(), 
is_admissiond);
     RemoveFromVector(be, &offline_);
     starting_.push_back(be);
   }
@@ -261,6 +272,102 @@ class ClusterMembershipMgrTest : public testing::Test {
     if (is_quiescing) RemoveFromVector(be, &quiescing_);
   }
 
+  /// Helper function for removed coordinator tests.
+  /// If same_backend_id is true, each loop uses the same backend_id.
+  void RemovedCoordinatorTestHelper(bool same_backend_id) {
+    Backend* base = CreateBackend(9999);
+    CreateCMM(base, true /* is_admissiond */);
+    Poll(base);
+
+    int be_port = 10000;
+    Backend* b = CreateBackend(be_port);
+    BackendDescriptorPB be = *b->desc;
+    be.set_is_coordinator(true);
+    string static_be_id;
+    be.SerializeToString(&static_be_id);
+    string be_str = static_be_id;
+
+    for (int i = 0; i < 10; ++i) {
+      if (!same_backend_id && i > 0) {
+        b = CreateBackend(be_port + i);
+        be = *b->desc;
+        be.set_is_coordinator(true);
+        be.SerializeToString(&be_str);
+      }
+
+      // Add the coordinator backend
+      TTopicItem add_item;
+      add_item.key = PrintId(be.backend_id());
+      add_item.value = be_str;
+      add_item.deleted = false;
+      SendDeltaTo(base, add_item);
+
+      // Now delete it
+      TTopicItem delete_item;
+      delete_item.key = PrintId(be.backend_id());
+      delete_item.deleted = true;
+      SendDeltaTo(base, delete_item);
+    }
+
+    auto snap = base->cmm->GetSnapshot();
+    EXPECT_EQ(0, snap->current_backends.size());
+
+    if (same_backend_id) {
+      EXPECT_EQ(1, snap->removed_coordinators_map.size());
+      EXPECT_EQ(1, snap->removed_coordinators_order.size());
+    } else {
+      EXPECT_EQ(10, snap->removed_coordinators_map.size());
+      EXPECT_EQ(10, snap->removed_coordinators_order.size());
+    }
+  }
+
+  /// Helper function for removed coordinator eviction tests.
+  void TestRemovedCoordinatorListEviction(int expected_max_size) {
+    Backend* base = CreateBackend(9999);
+    CreateCMM(base, true /* is_admissiond */);
+    Poll(base);
+
+    const int num_backends = expected_max_size + 1;
+    string first_id, middle_id, last_id;
+
+    for (int i = 0; i < num_backends; ++i) {
+      Backend* b = CreateBackend(10000 + i);
+      BackendDescriptorPB be = *b->desc;
+      be.set_is_coordinator(true);
+      std::string be_str;
+      be.SerializeToString(&be_str);
+
+      // Add backend
+      TTopicItem add_item;
+      add_item.key = PrintId(be.backend_id());
+      add_item.value = be_str;
+      add_item.deleted = false;
+      SendDeltaTo(base, add_item);
+
+      if (i == 0)
+        first_id = add_item.key;
+      else if (i == expected_max_size / 2)
+        middle_id = add_item.key;
+      else if (i == num_backends - 1)
+        last_id = add_item.key;
+
+      // Then delete it
+      TTopicItem delete_item;
+      delete_item.key = add_item.key;
+      delete_item.deleted = true;
+      SendDeltaTo(base, delete_item);
+    }
+
+    auto snap = base->cmm->GetSnapshot();
+    EXPECT_EQ(expected_max_size, snap->removed_coordinators_map.size());
+    EXPECT_EQ(expected_max_size, snap->removed_coordinators_order.size());
+
+    EXPECT_EQ(0, snap->removed_coordinators_map.count(first_id))
+        << "Oldest coordinator should be evicted when list exceeds " << 
expected_max_size;
+    EXPECT_EQ(1, snap->removed_coordinators_map.count(middle_id));
+    EXPECT_EQ(1, snap->removed_coordinators_map.count(last_id));
+  }
+
   mt19937 rng_;
 
   int RandomInt(int max) {
@@ -303,6 +410,29 @@ void _assertCoords(
   }
 }
 
+/// This test verifies that repeatedly adding and removing the same coordinator
+/// backend does not result in duplicate entries in the removed coordinators 
map.
+TEST_F(ClusterMembershipMgrTest, RemovedCoordinatorListRepeat) {
+  RemovedCoordinatorTestHelper(/*same_backend_id=*/true);
+}
+
+/// This test verifies that adding and removing different coordinator backends
+/// results in multiple unique entries in the removed coordinators map.
+TEST_F(ClusterMembershipMgrTest, RemovedCoordinatorListDifferentIds) {
+  RemovedCoordinatorTestHelper(/*same_backend_id=*/false);
+}
+
+/// These tests verify that the removed coordinator map, when configured with 
varying
+/// maximum sizes, correctly evicts the oldest entry following a FIFO policy.
+TEST_F(ClusterMembershipMgrTest, RemovedCoordEvictionWithDefault) {
+  TestRemovedCoordinatorListEviction(/*expected_max_size=*/1000);
+}
+
+TEST_F(ClusterMembershipMgrTest, RemovedCoordEvictionWithLimit100) {
+  FLAGS_cluster_membership_retained_removed_coords = 100;
+  TestRemovedCoordinatorListEviction(/*expected_max_size=*/100);
+}
+
 /// This test takes two instances of the ClusterMembershipMgr through a common 
lifecycle.
 /// It also serves as an example for how to craft statestore messages and pass 
them to
 /// UpdateMembership().
@@ -312,8 +442,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
 
   MetricGroup tmp_metrics1("test-metrics1");
   MetricGroup tmp_metrics2("test-metrics2");
-  ClusterMembershipMgr cmm1(b1->address().hostname(), nullptr, &tmp_metrics1);
-  ClusterMembershipMgr cmm2(b2->address().hostname(), nullptr, &tmp_metrics2);
+  ClusterMembershipMgr cmm1(b1->address().hostname(), nullptr, &tmp_metrics1, 
true);
+  ClusterMembershipMgr cmm2(b2->address().hostname(), nullptr, &tmp_metrics2, 
true);
 
   const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, 
TTopicDelta()}};
@@ -390,10 +520,15 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   ss_topic_delta->topic_entries[0].deleted = true;
   cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
   ASSERT_EQ(0, returned_topic_deltas.size());
-  ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
+  auto snap = cmm2.GetSnapshot();
+  string b1_id = PrintId(b1->backend_id());
+  ASSERT_EQ(1, snap->current_backends.size());
   ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
   _assertCoords(cmm1, {"host_1", "host_2"});
   _assertCoords(cmm2, {"host_2"});
+  ASSERT_EQ(1, snap->removed_coordinators_map.size());
+  ASSERT_TRUE(snap->removed_coordinators_map.count(b1_id));
+  ASSERT_EQ(b1_id, snap->removed_coordinators_order.front());
 }
 
 TEST_F(ClusterMembershipMgrTest, IsBlacklisted) {
diff --git a/be/src/scheduling/cluster-membership-mgr.cc 
b/be/src/scheduling/cluster-membership-mgr.cc
index 4815ec08e..85e2a5a34 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -32,6 +32,7 @@
 DECLARE_int32(num_expected_executors);
 DECLARE_string(expected_executor_group_sets);
 DECLARE_string(cluster_membership_topic_id);
+DECLARE_int32(cluster_membership_retained_removed_coords);
 
 namespace {
 using namespace impala;
@@ -88,12 +89,13 @@ static const string TOTAL_BACKENDS_KEY_FORMAT(
 const string ClusterMembershipMgr::EMPTY_GROUP_NAME(
     "empty group (using coordinator only)");
 
-ClusterMembershipMgr::ClusterMembershipMgr(
-    string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* 
metrics)
+ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
+    StatestoreSubscriber* subscriber, MetricGroup* metrics, bool is_admissiond)
   : empty_exec_group_(EMPTY_GROUP_NAME),
     current_membership_(std::make_shared<const Snapshot>()),
     statestore_subscriber_(subscriber),
-    local_backend_id_(move(local_backend_id)) {
+    local_backend_id_(move(local_backend_id)),
+    is_admissiond_(is_admissiond) {
   if (FLAGS_cluster_membership_topic_id.empty()) {
     membership_topic_name_ = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   } else {
@@ -109,6 +111,8 @@ ClusterMembershipMgr::ClusterMembershipMgr(
   RegisterUpdateCallbackFn([this](const ClusterMembershipMgr::SnapshotPtr& 
snapshot) {
     this->UpdateMetrics(snapshot);
   });
+  LOG(INFO) << "Using cluster membership removed coords size "
+            << FLAGS_cluster_membership_retained_removed_coords;
 }
 
 void ClusterMembershipMgr::InitMetrics(MetricGroup* metrics) {
@@ -190,15 +194,39 @@ vector<TNetworkAddress> 
ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses(
   return coordinators;
 }
 
-static inline void _removeCoordIfExists(
+static inline void _markCoordinatorAsRemoved(
     const std::shared_ptr<ClusterMembershipMgr::Snapshot>& state,
-    const BackendDescriptorPB& be) {
+    const BackendDescriptorPB& be, const BackendDescriptorPB& desc) {
+  string backend_id_str = PrintId(be.backend_id());
+  if (state->removed_coordinators_map.find(backend_id_str)
+      != state->removed_coordinators_map.end()) {
+    return;
+  }
+  state->removed_coordinators_map[backend_id_str] =
+      NetworkAddressPBToString(desc.address());
+  state->removed_coordinators_order.push_back(backend_id_str);
+  DCHECK(FLAGS_cluster_membership_retained_removed_coords > 0);
+  if (state->removed_coordinators_map.size()
+      > FLAGS_cluster_membership_retained_removed_coords) {
+    const string& oldest = state->removed_coordinators_order.front();
+    state->removed_coordinators_map.erase(oldest);
+    state->removed_coordinators_order.pop_front();
+  }
+}
 
+static inline void _removeCoordIfExists(
+    const std::shared_ptr<ClusterMembershipMgr::Snapshot>& state,
+    const BackendDescriptorPB& be, bool is_admissiond) {
   // The BackendDescriptorPB may be incomplete. Use the backend id to retrieve 
the actual
   // backend descriptor so the backend can be removed.
   const BackendDescriptorPB* actual_be =
       state->all_coordinators.LookUpBackendDesc(be.backend_id());
   if (actual_be != nullptr) {
+    if (is_admissiond) {
+      // If global admission control is enabled, we also need to track 
information
+      // about the removed coordinator to prevent potential issues.
+      _markCoordinatorAsRemoved(state, be, *actual_be);
+    }
     state->all_coordinators.RemoveExecutor(*actual_be);
   }
 }
@@ -306,7 +334,7 @@ void ClusterMembershipMgr::UpdateMembership(
 
         // If a coordinator is not shutdown gracefully, then it will be 
deleted here.
         if (be_desc.is_coordinator()) {
-          _removeCoordIfExists(new_state, be_desc);
+          _removeCoordIfExists(new_state, be_desc, is_admissiond_);
         }
 
         // Note: be_desc is a reference to item.key, thus this erase must come 
at the end
@@ -381,7 +409,7 @@ void ClusterMembershipMgr::UpdateMembership(
         }
 
         if (existing.is_coordinator()) {
-          _removeCoordIfExists(new_state, be_desc);
+          _removeCoordIfExists(new_state, be_desc, is_admissiond_);
         }
       }
       existing = be_desc;
@@ -445,7 +473,7 @@ void ClusterMembershipMgr::UpdateMembership(
 
     // Add ourself to the list of all coordinators.
     if (is_active_coordinator(*local_be_desc.get())) {
-      _removeCoordIfExists(new_state, *local_be_desc);
+      _removeCoordIfExists(new_state, *local_be_desc, is_admissiond_);
       new_state->all_coordinators.AddExecutor(*local_be_desc);
     }
 
diff --git a/be/src/scheduling/cluster-membership-mgr.h 
b/be/src/scheduling/cluster-membership-mgr.h
index 848067b47..595379efc 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -75,6 +75,9 @@ class ClusterMembershipMgr {
   /// Maps backend IDs to backend descriptors.
   typedef std::unordered_map<std::string, BackendDescriptorPB> BackendIdMap;
 
+  /// Maps backend IDs to backend addresses.
+  typedef std::unordered_map<std::string, std::string> BackendIdAddrMap;
+
   /// Maps executor group names to executor groups.
   typedef std::unordered_map<std::string, ExecutorGroup> ExecutorGroups;
 
@@ -113,6 +116,15 @@ class ClusterMembershipMgr {
     // Executor group of all non-quiescing coordinators in the cluster. Set 
during the
     // SetState() function.
     ExecutorGroup all_coordinators;
+    // Map from unique backend ID to backend address for coordinators that were
+    // removed from the cluster membership. This helps identify recently 
removed
+    // coordinators and is used to reject queries destined for those 
coordinators to
+    // prevent hanging or routing issues.
+    BackendIdAddrMap removed_coordinators_map;
+    // Tracks the insertion order of entries in 'removed_coordinators_map' to 
implement
+    // FIFO eviction. When the number of removed coordinators exceeds the 
maximum allowed
+    // size (MAX_REMOVED_COORD_SIZE), the oldest entry (front of the list) is 
evicted.
+    std::list<std::string> removed_coordinators_order;
   };
 
   /// An immutable shared membership snapshot.
@@ -130,7 +142,7 @@ class ClusterMembershipMgr {
   typedef std::function<void(const SnapshotPtr&)> UpdateCallbackFn;
 
   ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* 
subscriber,
-      MetricGroup* metrics);
+      MetricGroup* metrics, bool is_admissiond = false);
 
   /// Initializes instances of this class. This only sets up the statestore 
subscription.
   /// Callbacks to the local ImpalaServer and Frontend must be registered in 
separate
@@ -280,6 +292,9 @@ class ClusterMembershipMgr {
   /// protected by a lock - only used in the statestore thread.
   std::string local_backend_id_;
 
+  /// If true, the cluster membership manager is in an admissiond process.
+  bool is_admissiond_;
+
   /// Callbacks that provide external dependencies.
   BackendDescriptorPtrFn local_be_desc_fn_;
   std::vector<UpdateCallbackFn> update_callback_fns_;
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 5b9850732..50fa518ce 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -24,6 +24,7 @@ import logging
 import os
 import re
 import signal
+import subprocess
 import sys
 import threading
 from time import sleep, time
@@ -2174,6 +2175,82 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     self.client.close_query(handle1)
     self.client.wait_for_impala_state(handle2, RUNNING, timeout_s)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--vmodule admission-controller=3 
--default_pool_max_requests=1 ",
+      disable_log_buffering=True)
+  def test_coord_not_registered_in_ac(self):
+    """Regression test for IMPALA-12057. Verifies that no excessive logs are
+    generated when a query is queued in the  admission controller and the 
coordinator
+    hosting the admitted query goes down. Prior to IMPALA-12057, such a 
scenario could
+    cause excessive logging during dequeue attempts. After the fix, such 
logging should
+    no longer occur and the queued query should be rejected."""
+    # Query designed to run for a few minutes.
+    query = "select count(*) from functional.alltypes where int_col = 
sleep(10000)"
+    timeout_s = 10
+    keys = [
+      "admission-controller.total-admitted.default-pool",
+      "admission-controller.total-queued.default-pool",
+      "admission-controller.total-dequeued.default-pool",
+      "admission-controller.total-rejected.default-pool",
+    ]
+
+    def get_ac_metrics(service, keys, default=0):
+      return service.get_metric_values(keys, [default] * len(keys))
+    for i in range(1, 4):
+      handle1 = self.client.execute_async(query)
+      # Make sure the first query has been admitted.
+      self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
+
+      # Run another query. This query should be queued because only 1 query is 
allowed in
+      # the default pool.
+      handle2 = self.client.execute_async(query)
+      self._wait_for_change_to_profile(handle2, "Admission result: Queued")
+      # Kill the first coordinator.
+      all_coords = self.cluster.get_all_coordinators()
+      all_coords[0].kill()
+      # Wait briefly to allow the potential excessive logging to occur.
+      sleep(3)
+      self.assert_log_contains(self.get_ac_log_name(), 'INFO',
+          "Coordinator not registered with the statestore", expected_count=0)
+      # Verify the metrics.
+      cur_admission_metrics = get_ac_metrics(self.cluster.admissiond.service, 
keys)
+      assert cur_admission_metrics == [i, i, i, i]
+      all_coords[0].start()
+    self.assert_log_contains_multiline(self.get_ac_log_name(), 'INFO',
+        "The coordinator no longer exists")
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  def test_retained_removed_coords_size(self):
+    # Use a flag value below the hard cap (1000). Expect the value to be 
accepted.
+    self._start_impala_cluster([
+      '--impalad_args=--vmodule admission-controller=3',
+      '--impalad_args=--cluster_membership_retained_removed_coords=10',
+      'disable_log_buffering=True'])
+    self.assert_log_contains(self.get_ac_log_name(), 'INFO',
+      "Using cluster membership removed coords size 10", expected_count=1)
+
+    # Use invalid values. Expect the cluster to fail to start.
+    try:
+      self._start_impala_cluster([
+        '--impalad_args=--vmodule admission-controller=3',
+        '--impalad_args=--cluster_membership_retained_removed_coords=10000',
+        'disable_log_buffering=True'])
+      self.fail("Expected CalledProcessError was not raised.")
+    except subprocess.CalledProcessError as e:
+      assert "cluster_membership_retained_removed_coords" in str(e)
+
+    try:
+      self._start_impala_cluster([
+        '--impalad_args=--vmodule admission-controller=3',
+        '--impalad_args=--cluster_membership_retained_removed_coords=0',
+        'disable_log_buffering=True'])
+      self.fail("Expected CalledProcessError was not raised.")
+    except subprocess.CalledProcessError as e:
+      assert "cluster_membership_retained_removed_coords" in str(e)
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(

Reply via email to