This is an automated email from the ASF dual-hosted git repository.
asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 653e5388d IMPALA-13408: use a specific flag for the topic prefix
cluster identifier.
653e5388d is described below
commit 653e5388dd34252c3c6357172a4b81f030b4651f
Author: Andrew Sherman <[email protected]>
AuthorDate: Mon Sep 30 13:53:12 2024 -0700
IMPALA-13408: use a specific flag for the topic prefix cluster identifier.
The cluster_id flag was introduced in IMPALA-12426 to identify Impala
clusters in systems where a single query_log table could be shared. In
IMPALA-13208 the cluster_id flag was reused as a prefix to topic names
for backend membership, to allow sub-clusters of backends within a
Statestore service.
There have been some problems with the interaction of these two usages.
An important difference is that the query_log cluster_id must be set
only on coordinators, whereas the topic prefix cluster_id must be set
simultaneously on coordinators, executors, and admission daemons
(if present). If a system is started with cluster_id set only on
coordinators, then split-brain problems arise in which coordinators and
executors are tracked in different topics. In addition, the query_log
cluster_id is more likely to be user-settable, as it is used for data in
query_log that will be read by humans, who may want to write queries
selecting data from their ‘production’ or ‘dev’ clusters.
Avoid these problems by using a separate flag, ‘cluster_membership_topic_id’,
for the topic prefix cluster identifier.
Change-Id: Icd3f7e1c73c00a7aaeee79ecb461209e3939c422
Reviewed-on: http://gerrit.cloudera.org:8080/21867
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/exec-env.cc | 6 +++---
be/src/scheduling/admission-controller.cc | 12 +++++++++---
be/src/scheduling/admissiond-env.cc | 6 +++---
be/src/scheduling/cluster-membership-mgr.cc | 7 ++++---
tests/custom_cluster/test_shared_catalogd.py | 4 ++--
5 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 50abe359e..a5316acec 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -149,7 +149,7 @@ DECLARE_string(state_store_host);
DECLARE_int32(state_store_port);
DECLARE_string(state_store_2_host);
DECLARE_int32(state_store_2_port);
-DECLARE_string(cluster_id);
+DECLARE_string(cluster_membership_topic_id);
DECLARE_string(debug_actions);
DECLARE_string(ssl_client_ca_certificate);
@@ -286,8 +286,8 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int
webserver_port,
}
// Set StatestoreSubscriber::subscriber_id as cluster_id + hostname +
krpc_port.
string subscriber_id = Substitute("impalad@$0:$1", FLAGS_hostname,
FLAGS_krpc_port);
- if (!FLAGS_cluster_id.empty()) {
- subscriber_id = FLAGS_cluster_id + '-' + subscriber_id;
+ if (!FLAGS_cluster_membership_topic_id.empty()) {
+ subscriber_id = FLAGS_cluster_membership_topic_id + '-' + subscriber_id;
}
statestore_subscriber_.reset(new StatestoreSubscriber(subscriber_id,
subscriber_address,
statestore_address, statestore2_address, metrics_.get(),
subscriber_type));
diff --git a/be/src/scheduling/admission-controller.cc
b/be/src/scheduling/admission-controller.cc
index 55c02749b..bb603383c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -78,7 +78,13 @@ DEFINE_bool(clamp_query_mem_limit_backend_mem_limit, true,
"Caps query memory li
DECLARE_bool(is_coordinator);
DECLARE_bool(is_executor);
-DECLARE_string(cluster_id);
+
+// Note that cluster_membership_topic_id is different from cluster_id in that
it must be
+// set on both Coordinators and Executors (as well as AdmissionD if present).
+DEFINE_string(cluster_membership_topic_id, "",
+ "Specifies an identifier string that will be used as a prefix to
Statestore topic "
+ "ids used for grouping Impala backends and Admission Daemons into
sub-groups within "
+ "an Impala cluster defined by the Statestore service.");
namespace impala {
@@ -664,11 +670,11 @@
AdmissionController::AdmissionController(ClusterMembershipMgr* cluster_membershi
});
total_dequeue_failed_coordinator_limited_ =
metrics_group_->AddCounter(TOTAL_DEQUEUE_FAILED_COORDINATOR_LIMITED, 0);
- if (FLAGS_cluster_id.empty()) {
+ if (FLAGS_cluster_membership_topic_id.empty()) {
request_queue_topic_name_ = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
} else {
request_queue_topic_name_ =
- FLAGS_cluster_id + '-' + Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
+ FLAGS_cluster_membership_topic_id + '-' +
Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
}
}
diff --git a/be/src/scheduling/admissiond-env.cc
b/be/src/scheduling/admissiond-env.cc
index 7260c60a8..12f840721 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -42,7 +42,7 @@ DECLARE_string(state_store_2_host);
DECLARE_int32(state_store_2_port);
DECLARE_int32(state_store_subscriber_port);
DECLARE_string(hostname);
-DECLARE_string(cluster_id);
+DECLARE_string(cluster_membership_topic_id);
namespace impala {
@@ -65,8 +65,8 @@ AdmissiondEnv::AdmissiondEnv()
MakeNetworkAddress(FLAGS_state_store_2_host, FLAGS_state_store_2_port);
string subscriber_id = Substitute("admissiond@$0",
TNetworkAddressToString(admission_service_addr));
- if (!FLAGS_cluster_id.empty()) {
- subscriber_id = FLAGS_cluster_id + '-' + subscriber_id;
+ if (!FLAGS_cluster_membership_topic_id.empty()) {
+ subscriber_id = FLAGS_cluster_membership_topic_id + '-' + subscriber_id;
}
statestore_subscriber_.reset(new StatestoreSubscriber(subscriber_id,
subscriber_address,
statestore_address, statestore2_address, metrics,
diff --git a/be/src/scheduling/cluster-membership-mgr.cc
b/be/src/scheduling/cluster-membership-mgr.cc
index fd724fbc1..e08204986 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -31,7 +31,7 @@
DECLARE_int32(num_expected_executors);
DECLARE_string(expected_executor_group_sets);
-DECLARE_string(cluster_id);
+DECLARE_string(cluster_membership_topic_id);
namespace {
using namespace impala;
@@ -91,10 +91,11 @@ ClusterMembershipMgr::ClusterMembershipMgr(
current_membership_(std::make_shared<const Snapshot>()),
statestore_subscriber_(subscriber),
local_backend_id_(move(local_backend_id)) {
- if (FLAGS_cluster_id.empty()) {
+ if (FLAGS_cluster_membership_topic_id.empty()) {
membership_topic_name_ = Statestore::IMPALA_MEMBERSHIP_TOPIC;
} else {
- membership_topic_name_ = FLAGS_cluster_id + '-' +
Statestore::IMPALA_MEMBERSHIP_TOPIC;
+ membership_topic_name_ =
+ FLAGS_cluster_membership_topic_id + '-' +
Statestore::IMPALA_MEMBERSHIP_TOPIC;
}
Status status = PopulateExpectedExecGroupSets(expected_exec_group_sets_);
if(!status.ok()) {
diff --git a/tests/custom_cluster/test_shared_catalogd.py
b/tests/custom_cluster/test_shared_catalogd.py
index 9ac629b55..cdbb14719 100644
--- a/tests/custom_cluster/test_shared_catalogd.py
+++ b/tests/custom_cluster/test_shared_catalogd.py
@@ -30,11 +30,11 @@ class TestSharedCatalogd(CustomClusterTestSuite):
super(TestSharedCatalogd, self).setup_method(method)
self.coordinator = self.cluster.impalads[0]
- @CustomClusterTestSuite.with_args(impalad_args="-cluster_id=cluster1")
+
@CustomClusterTestSuite.with_args(impalad_args="-cluster_membership_topic_id=cluster1")
def test_disjiont_clusters(self, unique_database):
"""Tests that two Impala clusters can share catalogd and statestore."""
# Start a new cluster of 3 impalads using a new cluster id.
- self._start_impala_cluster(["--impalad_args=-cluster_id=cluster2"],
+
self._start_impala_cluster(["--impalad_args=-cluster_membership_topic_id=cluster2"],
cluster_size=3,
num_coordinators=3,
add_impalads=True,