This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 8d56eea72 IMPALA-12057: Track removed coordinators to reject queued
queries early
8d56eea72 is described below
commit 8d56eea72518aa11a36aa086dc8961bc8cdbd1fd
Author: Yida Wu <[email protected]>
AuthorDate: Mon Jun 9 09:35:22 2025 -0700
IMPALA-12057: Track removed coordinators to reject queued queries early
Queries in global admission control can remain queued for a long time
if they are assigned to a coordinator that has already left the
cluster. Admissiond can't distinguish between a coordinator that
hasn’t yet been propagated via the statestore and one that has
already been removed, resulting in unnecessary waiting until timeout.
This timeout is determined by either FLAGS_queue_wait_timeout_ms or
the queue_timeout_ms in the pool config. By default,
FLAGS_queue_wait_timeout_ms is 1 minute, but in production it's
normally configured to 10 to 15 minutes.
This change tracks recently removed coordinators and rejects such
queued queries immediately using REASON_COORDINATOR_REMOVED.
To ensure the removed coordinator list remains simple and bounded,
it avoids duplicate entries and enforces FIFO eviction once the list
size exceeds the minimum of MAX_REMOVED_COORD_SIZE (1000) and
FLAGS_cluster_membership_retained_removed_coords.
It's possible that a coordinator marked as removed comes back
with the same backend id. In that case, admissiond will see it in
current_backends and won't need to check the removed list. Even
if a coordinator briefly flaps and a request is rejected, it's not
critical; the coordinator can retry. So to keep the design simple
and safe, we keep the removed coordinator entry as-is.
Added a parameter is_admissiond to the ClusterMembershipMgr
constructor to indicate whether it is running within the admissiond.
Tests:
Passed exhaustive tests.
Added unit tests to verify the eviction logic and the duplicate
case.
Added regression test test_coord_not_registered_in_ac.
Change-Id: I1e0f270299f8c20975d7895c17f4e2791c3360e0
Reviewed-on: http://gerrit.cloudera.org:8080/23094
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/scheduling/admission-controller.cc | 33 +++--
be/src/scheduling/admissiond-env.cc | 15 ++-
be/src/scheduling/cluster-membership-mgr-test.cc | 145 +++++++++++++++++++++-
be/src/scheduling/cluster-membership-mgr.cc | 44 +++++--
be/src/scheduling/cluster-membership-mgr.h | 17 ++-
tests/custom_cluster/test_admission_controller.py | 77 ++++++++++++
6 files changed, 307 insertions(+), 24 deletions(-)
diff --git a/be/src/scheduling/admission-controller.cc
b/be/src/scheduling/admission-controller.cc
index 8c8f90bf4..38623df60 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -260,6 +260,8 @@ const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0";
const string REASON_COORDINATOR_NOT_FOUND =
"Coordinator not registered with the statestore.";
+const string REASON_COORDINATOR_REMOVED_PREFIX = "The coordinator no longer
exists: ";
+const string REASON_COORDINATOR_REMOVED = REASON_COORDINATOR_REMOVED_PREFIX +
"$0";
const string REASON_NO_EXECUTOR_GROUPS =
"Waiting for executors to start. Only DDL queries and queries scheduled
only on the "
"coordinator (either NUM_NODES set to 1 or when small query optimization
is "
@@ -1836,7 +1838,8 @@ void AdmissionController::ReleaseQuery(const UniqueIdPB&
query_id,
}
if (to_release.size() > 0) {
LOG(INFO) << "ReleaseQuery for " << query_id << " called with "
- << to_release.size() << "unreleased backends. Releasing
automatically.";
+ << to_release.size()
+ << " unreleased backends. Releasing automatically.";
ReleaseQueryBackendsLocked(query_id, coord_id, to_release);
}
}
@@ -2238,14 +2241,21 @@ Status AdmissionController::ComputeGroupScheduleStates(
output_schedules->clear();
// Queries may arrive before we've gotten a statestore update containing the
descriptor
- // for their coordinator, in which case we queue the query until it arrives.
It's also
- // possible (though very unlikely) that the coordinator was removed from the
cluster
- // membership after submitting this query for admission. Currently, in this
case the
- // query will remain queued until it times out, but we can consider
detecting failed
- // coordinators and cleaning up their queued queries.
+ // for their coordinator, in which case we queue the query until it arrives.
Currently,
+ // in this case the query will remain queued until it times out.
+ // There's also a case where the coordinator was removed from the cluster
after the
+ // query was submitted, in this case, we mark it as COORDINATOR_REMOVED to
avoid
+ // indefinite queuing and improve cleanup.
auto it =
membership_snapshot->current_backends.find(PrintId(request.coord_id));
if (it == membership_snapshot->current_backends.end()) {
- queue_node->not_admitted_reason = REASON_COORDINATOR_NOT_FOUND;
+ auto rm_it =
+
membership_snapshot->removed_coordinators_map.find(PrintId(request.coord_id));
+ if (rm_it == membership_snapshot->removed_coordinators_map.end()) {
+ queue_node->not_admitted_reason = REASON_COORDINATOR_NOT_FOUND;
+ } else {
+ queue_node->not_admitted_reason =
+ Substitute(REASON_COORDINATOR_REMOVED, rm_it->second);
+ }
LOG(WARNING) << queue_node->not_admitted_reason;
return Status::OK();
}
@@ -2326,6 +2336,10 @@ static inline string
PrintScheduleStateMemInfo(ScheduleState* state) {
MemLimitSourcePB_Name(state->coord_backend_mem_to_admit_source()));
}
+static inline bool IsReasonCoordinatorRemoved(const string& reason) {
+ return reason.rfind(REASON_COORDINATOR_REMOVED_PREFIX, 0) == 0;
+}
+
bool AdmissionController::FindGroupToAdmitOrReject(
ClusterMembershipMgr::SnapshotPtr& membership_snapshot,
const TPoolConfig& pool_config, const TPoolConfig& root_cfg, bool
admit_from_queue,
@@ -2349,6 +2363,7 @@ bool AdmissionController::FindGroupToAdmitOrReject(
}
if (queue_node->group_states.empty()) {
DCHECK(!queue_node->not_admitted_reason.empty());
+ if (IsReasonCoordinatorRemoved(queue_node->not_admitted_reason)) return
false;
return true;
}
@@ -2575,11 +2590,14 @@ void AdmissionController::TryDequeue() {
--max_to_dequeue;
VLOG(3) << "Dequeueing from stats for pool " << pool_name;
stats->Dequeue(false);
+ const UniqueIdPB& query_id = queue_node->admission_request.query_id;
if (is_rejected) {
AdmissionOutcome outcome =
queue_node->admit_outcome->Set(AdmissionOutcome::REJECTED);
if (outcome == AdmissionOutcome::REJECTED) {
stats->metrics()->total_rejected->Increment(1);
+ VLOG_QUERY << "Rejected, query id=" << PrintId(query_id)
+ << " reason: " << queue_node->not_admitted_reason;
return; // next query
} else {
DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
@@ -2588,7 +2606,6 @@ void AdmissionController::TryDequeue() {
}
DCHECK(is_cancelled || queue_node->admitted_schedule != nullptr);
- const UniqueIdPB& query_id = queue_node->admission_request.query_id;
if (!is_cancelled) {
VLOG_QUERY << "Admitting from queue: query=" << PrintId(query_id);
AdmissionOutcome outcome =
diff --git a/be/src/scheduling/admissiond-env.cc
b/be/src/scheduling/admissiond-env.cc
index 1f6d13b86..8d188ea36 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -26,6 +26,7 @@
#include "scheduling/scheduler.h"
#include "service/impala-http-handler.h"
#include "util/default-path-handlers.h"
+#include "util/gflag-validator-util.h"
#include "util/mem-info.h"
#include "util/memory-metrics.h"
#include "util/metrics.h"
@@ -35,6 +36,15 @@
DEFINE_int32(
admission_service_port, 29500, "The port where the admission control
service runs");
+DEFINE_int32(cluster_membership_retained_removed_coords, 1000,
+ "Max number of removed coordinators to track. Oldest entry is evicted when
full.");
+DEFINE_validator(
+ cluster_membership_retained_removed_coords, [](const char* name, const int
val) {
+ if (val > 0 && val <= 1000) return true;
+ LOG(ERROR) << "Flag '" << name
+ << "' must be greater than 0 and less than or equal to 1000.";
+ return false;
+ });
DECLARE_string(state_store_host);
DECLARE_int32(state_store_port);
@@ -74,8 +84,9 @@ AdmissiondEnv::AdmissiondEnv()
TStatestoreSubscriberType::ADMISSIOND));
scheduler_.reset(new Scheduler(metrics, request_pool_service()));
- cluster_membership_mgr_.reset(new ClusterMembershipMgr(
- PrintId(DaemonEnv::GetInstance()->backend_id()), subscriber(), metrics));
+ cluster_membership_mgr_.reset(
+ new ClusterMembershipMgr(PrintId(DaemonEnv::GetInstance()->backend_id()),
+ subscriber(), metrics, true /* is_admissiond */));
admission_controller_.reset(new AdmissionController(cluster_membership_mgr(),
subscriber(), request_pool_service(), metrics, scheduler(),
pool_mem_trackers(),
admission_service_addr));
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc
b/be/src/scheduling/cluster-membership-mgr-test.cc
index f67d508d3..5ba4d5f92 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -40,6 +40,7 @@ DECLARE_int32(statestore_max_missed_heartbeats);
DECLARE_int32(statestore_heartbeat_frequency_ms);
DECLARE_int32(num_expected_executors);
DECLARE_string(expected_executor_group_sets);
+DECLARE_int32(cluster_membership_retained_removed_coords);
namespace impala {
@@ -169,6 +170,16 @@ class ClusterMembershipMgrTest : public testing::Test {
}
}
+ /// Sends a single topic item in a delta to the specific backend.
+ void SendDeltaTo(Backend* be, const TTopicItem& item) {
+ StatestoreSubscriber::TopicDeltaMap topic_delta_map;
+ TTopicDelta& delta = topic_delta_map[Statestore::IMPALA_MEMBERSHIP_TOPIC];
+ delta.is_delta = true;
+ delta.topic_entries.push_back(item);
+ vector<TTopicDelta> unused;
+ be->cmm->UpdateMembership(topic_delta_map, &unused);
+ }
+
/// Creates a new backend and adds it to the list of offline backends. If
idx is
/// omitted, the current number of backends will be used as the new index.
Backend* CreateBackend(int idx = -1) {
@@ -184,11 +195,11 @@ class ClusterMembershipMgrTest : public testing::Test {
/// Creates a new ClusterMembershipMgr for a backend and moves the backend
from
/// 'offline_' to 'starting_'. Callers must handle invalidated iterators
after calling
/// this method.
- void CreateCMM(Backend* be) {
+ void CreateCMM(Backend* be, bool is_admissiond = false) {
ASSERT_TRUE(IsInVector(be, offline_));
be->metric_group = make_unique<MetricGroup>("test");
be->cmm = make_unique<ClusterMembershipMgr>(
- PrintId(be->backend_id), nullptr, be->metric_group.get());
+ PrintId(be->backend_id), nullptr, be->metric_group.get(),
is_admissiond);
RemoveFromVector(be, &offline_);
starting_.push_back(be);
}
@@ -261,6 +272,102 @@ class ClusterMembershipMgrTest : public testing::Test {
if (is_quiescing) RemoveFromVector(be, &quiescing_);
}
+ /// Helper function for removed coordinator tests.
+ /// If same_backend_id is true, each loop uses the same backend_id.
+ void RemovedCoordinatorTestHelper(bool same_backend_id) {
+ Backend* base = CreateBackend(9999);
+ CreateCMM(base, true /* is_admissiond */);
+ Poll(base);
+
+ int be_port = 10000;
+ Backend* b = CreateBackend(be_port);
+ BackendDescriptorPB be = *b->desc;
+ be.set_is_coordinator(true);
+ string static_be_id;
+ be.SerializeToString(&static_be_id);
+ string be_str = static_be_id;
+
+ for (int i = 0; i < 10; ++i) {
+ if (!same_backend_id && i > 0) {
+ b = CreateBackend(be_port + i);
+ be = *b->desc;
+ be.set_is_coordinator(true);
+ be.SerializeToString(&be_str);
+ }
+
+ // Add the coordinator backend
+ TTopicItem add_item;
+ add_item.key = PrintId(be.backend_id());
+ add_item.value = be_str;
+ add_item.deleted = false;
+ SendDeltaTo(base, add_item);
+
+ // Now delete it
+ TTopicItem delete_item;
+ delete_item.key = PrintId(be.backend_id());
+ delete_item.deleted = true;
+ SendDeltaTo(base, delete_item);
+ }
+
+ auto snap = base->cmm->GetSnapshot();
+ EXPECT_EQ(0, snap->current_backends.size());
+
+ if (same_backend_id) {
+ EXPECT_EQ(1, snap->removed_coordinators_map.size());
+ EXPECT_EQ(1, snap->removed_coordinators_order.size());
+ } else {
+ EXPECT_EQ(10, snap->removed_coordinators_map.size());
+ EXPECT_EQ(10, snap->removed_coordinators_order.size());
+ }
+ }
+
+ /// Helper function for removed coordinator eviction tests.
+ void TestRemovedCoordinatorListEviction(int expected_max_size) {
+ Backend* base = CreateBackend(9999);
+ CreateCMM(base, true /* is_admissiond */);
+ Poll(base);
+
+ const int num_backends = expected_max_size + 1;
+ string first_id, middle_id, last_id;
+
+ for (int i = 0; i < num_backends; ++i) {
+ Backend* b = CreateBackend(10000 + i);
+ BackendDescriptorPB be = *b->desc;
+ be.set_is_coordinator(true);
+ std::string be_str;
+ be.SerializeToString(&be_str);
+
+ // Add backend
+ TTopicItem add_item;
+ add_item.key = PrintId(be.backend_id());
+ add_item.value = be_str;
+ add_item.deleted = false;
+ SendDeltaTo(base, add_item);
+
+ if (i == 0)
+ first_id = add_item.key;
+ else if (i == expected_max_size / 2)
+ middle_id = add_item.key;
+ else if (i == num_backends - 1)
+ last_id = add_item.key;
+
+ // Then delete it
+ TTopicItem delete_item;
+ delete_item.key = add_item.key;
+ delete_item.deleted = true;
+ SendDeltaTo(base, delete_item);
+ }
+
+ auto snap = base->cmm->GetSnapshot();
+ EXPECT_EQ(expected_max_size, snap->removed_coordinators_map.size());
+ EXPECT_EQ(expected_max_size, snap->removed_coordinators_order.size());
+
+ EXPECT_EQ(0, snap->removed_coordinators_map.count(first_id))
+ << "Oldest coordinator should be evicted when list exceeds " <<
expected_max_size;
+ EXPECT_EQ(1, snap->removed_coordinators_map.count(middle_id));
+ EXPECT_EQ(1, snap->removed_coordinators_map.count(last_id));
+ }
+
mt19937 rng_;
int RandomInt(int max) {
@@ -303,6 +410,29 @@ void _assertCoords(
}
}
+/// This test verifies that repeatedly adding and removing the same coordinator
+/// backend does not result in duplicate entries in the removed coordinators
map.
+TEST_F(ClusterMembershipMgrTest, RemovedCoordinatorListRepeat) {
+ RemovedCoordinatorTestHelper(/*same_backend_id=*/true);
+}
+
+/// This test verifies that adding and removing different coordinator backends
+/// results in multiple unique entries in the removed coordinators map.
+TEST_F(ClusterMembershipMgrTest, RemovedCoordinatorListDifferentIds) {
+ RemovedCoordinatorTestHelper(/*same_backend_id=*/false);
+}
+
+/// These tests verify that the removed coordinator map, when configured with
varying
+/// maximum sizes, correctly evicts the oldest entry following a FIFO policy.
+TEST_F(ClusterMembershipMgrTest, RemovedCoordEvictionWithDefault) {
+ TestRemovedCoordinatorListEviction(/*expected_max_size=*/1000);
+}
+
+TEST_F(ClusterMembershipMgrTest, RemovedCoordEvictionWithLimit100) {
+ FLAGS_cluster_membership_retained_removed_coords = 100;
+ TestRemovedCoordinatorListEviction(/*expected_max_size=*/100);
+}
+
/// This test takes two instances of the ClusterMembershipMgr through a common
lifecycle.
/// It also serves as an example for how to craft statestore messages and pass
them to
/// UpdateMembership().
@@ -312,8 +442,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
MetricGroup tmp_metrics1("test-metrics1");
MetricGroup tmp_metrics2("test-metrics2");
- ClusterMembershipMgr cmm1(b1->address().hostname(), nullptr, &tmp_metrics1);
- ClusterMembershipMgr cmm2(b2->address().hostname(), nullptr, &tmp_metrics2);
+ ClusterMembershipMgr cmm1(b1->address().hostname(), nullptr, &tmp_metrics1,
true);
+ ClusterMembershipMgr cmm2(b2->address().hostname(), nullptr, &tmp_metrics2,
true);
const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id,
TTopicDelta()}};
@@ -390,10 +520,15 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
ss_topic_delta->topic_entries[0].deleted = true;
cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
ASSERT_EQ(0, returned_topic_deltas.size());
- ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
+ auto snap = cmm2.GetSnapshot();
+ string b1_id = PrintId(b1->backend_id());
+ ASSERT_EQ(1, snap->current_backends.size());
ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
_assertCoords(cmm1, {"host_1", "host_2"});
_assertCoords(cmm2, {"host_2"});
+ ASSERT_EQ(1, snap->removed_coordinators_map.size());
+ ASSERT_TRUE(snap->removed_coordinators_map.count(b1_id));
+ ASSERT_EQ(b1_id, snap->removed_coordinators_order.front());
}
TEST_F(ClusterMembershipMgrTest, IsBlacklisted) {
diff --git a/be/src/scheduling/cluster-membership-mgr.cc
b/be/src/scheduling/cluster-membership-mgr.cc
index 4815ec08e..85e2a5a34 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -32,6 +32,7 @@
DECLARE_int32(num_expected_executors);
DECLARE_string(expected_executor_group_sets);
DECLARE_string(cluster_membership_topic_id);
+DECLARE_int32(cluster_membership_retained_removed_coords);
namespace {
using namespace impala;
@@ -88,12 +89,13 @@ static const string TOTAL_BACKENDS_KEY_FORMAT(
const string ClusterMembershipMgr::EMPTY_GROUP_NAME(
"empty group (using coordinator only)");
-ClusterMembershipMgr::ClusterMembershipMgr(
- string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup*
metrics)
+ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
+ StatestoreSubscriber* subscriber, MetricGroup* metrics, bool is_admissiond)
: empty_exec_group_(EMPTY_GROUP_NAME),
current_membership_(std::make_shared<const Snapshot>()),
statestore_subscriber_(subscriber),
- local_backend_id_(move(local_backend_id)) {
+ local_backend_id_(move(local_backend_id)),
+ is_admissiond_(is_admissiond) {
if (FLAGS_cluster_membership_topic_id.empty()) {
membership_topic_name_ = Statestore::IMPALA_MEMBERSHIP_TOPIC;
} else {
@@ -109,6 +111,8 @@ ClusterMembershipMgr::ClusterMembershipMgr(
RegisterUpdateCallbackFn([this](const ClusterMembershipMgr::SnapshotPtr&
snapshot) {
this->UpdateMetrics(snapshot);
});
+ LOG(INFO) << "Using cluster membership removed coords size "
+ << FLAGS_cluster_membership_retained_removed_coords;
}
void ClusterMembershipMgr::InitMetrics(MetricGroup* metrics) {
@@ -190,15 +194,39 @@ vector<TNetworkAddress>
ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses(
return coordinators;
}
-static inline void _removeCoordIfExists(
+static inline void _markCoordinatorAsRemoved(
const std::shared_ptr<ClusterMembershipMgr::Snapshot>& state,
- const BackendDescriptorPB& be) {
+ const BackendDescriptorPB& be, const BackendDescriptorPB& desc) {
+ string backend_id_str = PrintId(be.backend_id());
+ if (state->removed_coordinators_map.find(backend_id_str)
+ != state->removed_coordinators_map.end()) {
+ return;
+ }
+ state->removed_coordinators_map[backend_id_str] =
+ NetworkAddressPBToString(desc.address());
+ state->removed_coordinators_order.push_back(backend_id_str);
+ DCHECK(FLAGS_cluster_membership_retained_removed_coords > 0);
+ if (state->removed_coordinators_map.size()
+ > FLAGS_cluster_membership_retained_removed_coords) {
+ const string& oldest = state->removed_coordinators_order.front();
+ state->removed_coordinators_map.erase(oldest);
+ state->removed_coordinators_order.pop_front();
+ }
+}
+static inline void _removeCoordIfExists(
+ const std::shared_ptr<ClusterMembershipMgr::Snapshot>& state,
+ const BackendDescriptorPB& be, bool is_admissiond) {
// The BackendDescriptorPB may be incomplete. Use the backend id to retrieve
the actual
// backend descriptor so the backend can be removed.
const BackendDescriptorPB* actual_be =
state->all_coordinators.LookUpBackendDesc(be.backend_id());
if (actual_be != nullptr) {
+ if (is_admissiond) {
+ // If global admission control is enabled, we also need to track
information
+ // about the removed coordinator to prevent potential issues.
+ _markCoordinatorAsRemoved(state, be, *actual_be);
+ }
state->all_coordinators.RemoveExecutor(*actual_be);
}
}
@@ -306,7 +334,7 @@ void ClusterMembershipMgr::UpdateMembership(
// If a coordinator is not shutdown gracefully, then it will be
deleted here.
if (be_desc.is_coordinator()) {
- _removeCoordIfExists(new_state, be_desc);
+ _removeCoordIfExists(new_state, be_desc, is_admissiond_);
}
// Note: be_desc is a reference to item.key, thus this erase must come
at the end
@@ -381,7 +409,7 @@ void ClusterMembershipMgr::UpdateMembership(
}
if (existing.is_coordinator()) {
- _removeCoordIfExists(new_state, be_desc);
+ _removeCoordIfExists(new_state, be_desc, is_admissiond_);
}
}
existing = be_desc;
@@ -445,7 +473,7 @@ void ClusterMembershipMgr::UpdateMembership(
// Add ourself to the list of all coordinators.
if (is_active_coordinator(*local_be_desc.get())) {
- _removeCoordIfExists(new_state, *local_be_desc);
+ _removeCoordIfExists(new_state, *local_be_desc, is_admissiond_);
new_state->all_coordinators.AddExecutor(*local_be_desc);
}
diff --git a/be/src/scheduling/cluster-membership-mgr.h
b/be/src/scheduling/cluster-membership-mgr.h
index 848067b47..595379efc 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -75,6 +75,9 @@ class ClusterMembershipMgr {
/// Maps backend IDs to backend descriptors.
typedef std::unordered_map<std::string, BackendDescriptorPB> BackendIdMap;
+ /// Maps backend IDs to backend addresses.
+ typedef std::unordered_map<std::string, std::string> BackendIdAddrMap;
+
/// Maps executor group names to executor groups.
typedef std::unordered_map<std::string, ExecutorGroup> ExecutorGroups;
@@ -113,6 +116,15 @@ class ClusterMembershipMgr {
// Executor group of all non-quiescing coordinators in the cluster. Set
during the
// SetState() function.
ExecutorGroup all_coordinators;
+ // Map from unique backend ID to backend address for coordinators that were
+ // removed from the cluster membership. This helps identify recently
removed
+ // coordinators and is used to reject queries destined for those
coordinators to
+ // prevent hanging or routing issues.
+ BackendIdAddrMap removed_coordinators_map;
+ // Tracks the insertion order of entries in 'removed_coordinators_map' to
implement
+ // FIFO eviction. When the number of removed coordinators exceeds the
maximum allowed
+ // size (MAX_REMOVED_COORD_SIZE), the oldest entry (front of the list) is
evicted.
+ std::list<std::string> removed_coordinators_order;
};
/// An immutable shared membership snapshot.
@@ -130,7 +142,7 @@ class ClusterMembershipMgr {
typedef std::function<void(const SnapshotPtr&)> UpdateCallbackFn;
ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber*
subscriber,
- MetricGroup* metrics);
+ MetricGroup* metrics, bool is_admissiond = false);
/// Initializes instances of this class. This only sets up the statestore
subscription.
/// Callbacks to the local ImpalaServer and Frontend must be registered in
separate
@@ -280,6 +292,9 @@ class ClusterMembershipMgr {
/// protected by a lock - only used in the statestore thread.
std::string local_backend_id_;
+ /// If true, the cluster membership manager is in an admissiond process.
+ bool is_admissiond_;
+
/// Callbacks that provide external dependencies.
BackendDescriptorPtrFn local_be_desc_fn_;
std::vector<UpdateCallbackFn> update_callback_fns_;
diff --git a/tests/custom_cluster/test_admission_controller.py
b/tests/custom_cluster/test_admission_controller.py
index 5b9850732..50fa518ce 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -24,6 +24,7 @@ import logging
import os
import re
import signal
+import subprocess
import sys
import threading
from time import sleep, time
@@ -2174,6 +2175,82 @@ class
TestAdmissionControllerWithACService(TestAdmissionController):
self.client.close_query(handle1)
self.client.wait_for_impala_state(handle2, RUNNING, timeout_s)
+ @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--vmodule admission-controller=3
--default_pool_max_requests=1 ",
+ disable_log_buffering=True)
+ def test_coord_not_registered_in_ac(self):
+ """Regression test for IMPALA-12057. Verifies that no excessive logs are
+ generated when a query is queued in the admission controller and the
coordinator
+ hosting the admitted query goes down. Prior to IMPALA-12057, such a
scenario could
+ cause excessive logging during dequeue attempts. After the fix, such
logging should
+ no longer occur and the queued query should be rejected."""
+ # Query designed to run for a few minutes.
+ query = "select count(*) from functional.alltypes where int_col =
sleep(10000)"
+ timeout_s = 10
+ keys = [
+ "admission-controller.total-admitted.default-pool",
+ "admission-controller.total-queued.default-pool",
+ "admission-controller.total-dequeued.default-pool",
+ "admission-controller.total-rejected.default-pool",
+ ]
+
+ def get_ac_metrics(service, keys, default=0):
+ return service.get_metric_values(keys, [default] * len(keys))
+ for i in range(1, 4):
+ handle1 = self.client.execute_async(query)
+ # Make sure the first query has been admitted.
+ self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
+
+ # Run another query. This query should be queued because only 1 query is
allowed in
+ # the default pool.
+ handle2 = self.client.execute_async(query)
+ self._wait_for_change_to_profile(handle2, "Admission result: Queued")
+ # Kill the first coordinator.
+ all_coords = self.cluster.get_all_coordinators()
+ all_coords[0].kill()
+ # Wait briefly to allow the potential excessive logging to occur.
+ sleep(3)
+ self.assert_log_contains(self.get_ac_log_name(), 'INFO',
+ "Coordinator not registered with the statestore", expected_count=0)
+ # Verify the metrics.
+ cur_admission_metrics = get_ac_metrics(self.cluster.admissiond.service,
keys)
+ assert cur_admission_metrics == [i, i, i, i]
+ all_coords[0].start()
+ self.assert_log_contains_multiline(self.get_ac_log_name(), 'INFO',
+ "The coordinator no longer exists")
+
+ @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+ @pytest.mark.execute_serially
+ def test_retained_removed_coords_size(self):
+ # Use a flag value below the hard cap (1000). Expect the value to be
accepted.
+ self._start_impala_cluster([
+ '--impalad_args=--vmodule admission-controller=3',
+ '--impalad_args=--cluster_membership_retained_removed_coords=10',
+ 'disable_log_buffering=True'])
+ self.assert_log_contains(self.get_ac_log_name(), 'INFO',
+ "Using cluster membership removed coords size 10", expected_count=1)
+
+ # Use invalid values. Expect the cluster to fail to start.
+ try:
+ self._start_impala_cluster([
+ '--impalad_args=--vmodule admission-controller=3',
+ '--impalad_args=--cluster_membership_retained_removed_coords=10000',
+ 'disable_log_buffering=True'])
+ self.fail("Expected CalledProcessError was not raised.")
+ except subprocess.CalledProcessError as e:
+ assert "cluster_membership_retained_removed_coords" in str(e)
+
+ try:
+ self._start_impala_cluster([
+ '--impalad_args=--vmodule admission-controller=3',
+ '--impalad_args=--cluster_membership_retained_removed_coords=0',
+ 'disable_log_buffering=True'])
+ self.fail("Expected CalledProcessError was not raised.")
+ except subprocess.CalledProcessError as e:
+ assert "cluster_membership_retained_removed_coords" in str(e)
+
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(