This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 653e5388d IMPALA-13408: use a specific flag for the topic prefix 
cluster identifier.
653e5388d is described below

commit 653e5388dd34252c3c6357172a4b81f030b4651f
Author: Andrew Sherman <[email protected]>
AuthorDate: Mon Sep 30 13:53:12 2024 -0700

    IMPALA-13408: use a specific flag for the topic prefix cluster identifier.
    
    The cluster_id flag was introduced in IMPALA-12426 to identify Impala
    clusters in systems where a single query_log table could be shared. In
    IMPALA-13208 the cluster_id flag was reused as a prefix to topic names
    for backend membership, to allow sub-clusters of backends within a
    Statestore service.
    
    There have been some problems with the interaction of these two usages.
    An important difference is that the query_log cluster_id must be set
    only on coordinators, whereas the topic prefix cluster_id must be set
    simultaneously on coordinators, executors, and admission daemons
    (if present). If a system is started with cluster_id set only on
    coordinators then there are split-brain problems where coordinators and
    executors are tracked in different topics. In addition, the query_log
    cluster_id is more likely to be user-settable as it is used for data in
    query_log which will be read by humans, who may want to write queries
    selecting data from their ‘production’ or ‘dev’ clusters.
    
    Avoid these problems by introducing a separate flag,
    ‘cluster_membership_topic_id’, for the topic prefix cluster identifier.
    
    Change-Id: Icd3f7e1c73c00a7aaeee79ecb461209e3939c422
    Reviewed-on: http://gerrit.cloudera.org:8080/21867
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/exec-env.cc                   |  6 +++---
 be/src/scheduling/admission-controller.cc    | 12 +++++++++---
 be/src/scheduling/admissiond-env.cc          |  6 +++---
 be/src/scheduling/cluster-membership-mgr.cc  |  7 ++++---
 tests/custom_cluster/test_shared_catalogd.py |  4 ++--
 5 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 50abe359e..a5316acec 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -149,7 +149,7 @@ DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
 DECLARE_int32(state_store_2_port);
-DECLARE_string(cluster_id);
+DECLARE_string(cluster_membership_topic_id);
 
 DECLARE_string(debug_actions);
 DECLARE_string(ssl_client_ca_certificate);
@@ -286,8 +286,8 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int 
webserver_port,
   }
   // Set StatestoreSubscriber::subscriber_id as cluster_id + hostname + 
krpc_port.
   string subscriber_id = Substitute("impalad@$0:$1", FLAGS_hostname, 
FLAGS_krpc_port);
-  if (!FLAGS_cluster_id.empty()) {
-    subscriber_id = FLAGS_cluster_id + '-' + subscriber_id;
+  if (!FLAGS_cluster_membership_topic_id.empty()) {
+    subscriber_id = FLAGS_cluster_membership_topic_id + '-' + subscriber_id;
   }
   statestore_subscriber_.reset(new StatestoreSubscriber(subscriber_id, 
subscriber_address,
       statestore_address, statestore2_address, metrics_.get(), 
subscriber_type));
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index 55c02749b..bb603383c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -78,7 +78,13 @@ DEFINE_bool(clamp_query_mem_limit_backend_mem_limit, true, 
"Caps query memory li
 
 DECLARE_bool(is_coordinator);
 DECLARE_bool(is_executor);
-DECLARE_string(cluster_id);
+
+// Note that cluster_membership_topic_id is different from cluster_id in that 
it must be
+// set on both Coordinators and Executors (as well as AdmissionD if present).
+DEFINE_string(cluster_membership_topic_id, "",
+    "Specifies an identifier string that will be used as a prefix to 
Statestore topic "
+    "ids used for grouping Impala backends and Admission Daemons into 
sub-groups within "
+    "an Impala cluster defined by the Statestore service.");
 
 namespace impala {
 
@@ -664,11 +670,11 @@ 
AdmissionController::AdmissionController(ClusterMembershipMgr* cluster_membershi
       });
   total_dequeue_failed_coordinator_limited_ =
       metrics_group_->AddCounter(TOTAL_DEQUEUE_FAILED_COORDINATOR_LIMITED, 0);
-  if (FLAGS_cluster_id.empty()) {
+  if (FLAGS_cluster_membership_topic_id.empty()) {
     request_queue_topic_name_ = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
   } else {
     request_queue_topic_name_ =
-        FLAGS_cluster_id + '-' + Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
+        FLAGS_cluster_membership_topic_id + '-' + 
Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
   }
 }
 
diff --git a/be/src/scheduling/admissiond-env.cc 
b/be/src/scheduling/admissiond-env.cc
index 7260c60a8..12f840721 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -42,7 +42,7 @@ DECLARE_string(state_store_2_host);
 DECLARE_int32(state_store_2_port);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_string(hostname);
-DECLARE_string(cluster_id);
+DECLARE_string(cluster_membership_topic_id);
 
 namespace impala {
 
@@ -65,8 +65,8 @@ AdmissiondEnv::AdmissiondEnv()
       MakeNetworkAddress(FLAGS_state_store_2_host, FLAGS_state_store_2_port);
   string subscriber_id = Substitute("admissiond@$0",
       TNetworkAddressToString(admission_service_addr));
-  if (!FLAGS_cluster_id.empty()) {
-    subscriber_id = FLAGS_cluster_id + '-' + subscriber_id;
+  if (!FLAGS_cluster_membership_topic_id.empty()) {
+    subscriber_id = FLAGS_cluster_membership_topic_id + '-' + subscriber_id;
   }
   statestore_subscriber_.reset(new StatestoreSubscriber(subscriber_id, 
subscriber_address,
       statestore_address, statestore2_address, metrics,
diff --git a/be/src/scheduling/cluster-membership-mgr.cc 
b/be/src/scheduling/cluster-membership-mgr.cc
index fd724fbc1..e08204986 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -31,7 +31,7 @@
 
 DECLARE_int32(num_expected_executors);
 DECLARE_string(expected_executor_group_sets);
-DECLARE_string(cluster_id);
+DECLARE_string(cluster_membership_topic_id);
 
 namespace {
 using namespace impala;
@@ -91,10 +91,11 @@ ClusterMembershipMgr::ClusterMembershipMgr(
     current_membership_(std::make_shared<const Snapshot>()),
     statestore_subscriber_(subscriber),
     local_backend_id_(move(local_backend_id)) {
-  if (FLAGS_cluster_id.empty()) {
+  if (FLAGS_cluster_membership_topic_id.empty()) {
     membership_topic_name_ = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   } else {
-    membership_topic_name_ = FLAGS_cluster_id + '-' + 
Statestore::IMPALA_MEMBERSHIP_TOPIC;
+    membership_topic_name_ =
+        FLAGS_cluster_membership_topic_id + '-' + 
Statestore::IMPALA_MEMBERSHIP_TOPIC;
   }
   Status status = PopulateExpectedExecGroupSets(expected_exec_group_sets_);
   if(!status.ok()) {
diff --git a/tests/custom_cluster/test_shared_catalogd.py 
b/tests/custom_cluster/test_shared_catalogd.py
index 9ac629b55..cdbb14719 100644
--- a/tests/custom_cluster/test_shared_catalogd.py
+++ b/tests/custom_cluster/test_shared_catalogd.py
@@ -30,11 +30,11 @@ class TestSharedCatalogd(CustomClusterTestSuite):
     super(TestSharedCatalogd, self).setup_method(method)
     self.coordinator = self.cluster.impalads[0]
 
-  @CustomClusterTestSuite.with_args(impalad_args="-cluster_id=cluster1")
+  
@CustomClusterTestSuite.with_args(impalad_args="-cluster_membership_topic_id=cluster1")
   def test_disjiont_clusters(self, unique_database):
     """Tests that two Impala clusters can share catalogd and statestore."""
     # Start a new cluster of 3 impalads using a new cluster id.
-    self._start_impala_cluster(["--impalad_args=-cluster_id=cluster2"],
+    
self._start_impala_cluster(["--impalad_args=-cluster_membership_topic_id=cluster2"],
                                cluster_size=3,
                                num_coordinators=3,
                                add_impalads=True,

Reply via email to