This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new fcee022e6 IMPALA-13208: Add cluster id to the membership and 
request-queue topic names
fcee022e6 is described below

commit fcee022e6033afe8c8c072fef1274640336b8770
Author: stiga-huang <[email protected]>
AuthorDate: Sun Jul 7 16:06:25 2024 +0800

    IMPALA-13208: Add cluster id to the membership and request-queue topic names
    
    To share catalogd and statestore across Impala clusters, this adds the
    cluster id to the membership and request-queue topic names. So impalads
    are only visible to each other inside the same cluster, i.e. using the
    same cluster id. Note that impalads still subscribe to the same
    catalog-update topic so they can share the same catalog service.
    If cluster id is empty, use the original topic names.
    
    This also adds the non-empty cluster id as the prefix of the statestore
    subscriber id for impalad and admissiond.
    
    Tests:
     - Add custom cluster test
     - Ran exhaustive tests
    
    Change-Id: I2ff41539f568ef03c0ee2284762b4116b313d90f
    Reviewed-on: http://gerrit.cloudera.org:8080/21573
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/exec-env.cc                   | 10 ++--
 be/src/scheduling/admission-controller.cc    | 13 ++++--
 be/src/scheduling/admission-controller.h     |  2 +
 be/src/scheduling/admissiond-env.cc          | 11 +++--
 be/src/scheduling/cluster-membership-mgr.cc  | 12 +++--
 be/src/scheduling/cluster-membership-mgr.h   |  2 +
 be/src/statestore/statestore.cc              |  5 +-
 bin/start-impala-cluster.py                  | 24 +++++++---
 tests/common/custom_cluster_test_suite.py    |  4 ++
 tests/common/impala_test_suite.py            | 11 ++++-
 tests/custom_cluster/test_query_live.py      |  3 +-
 tests/custom_cluster/test_shared_catalogd.py | 68 ++++++++++++++++++++++++++++
 12 files changed, 144 insertions(+), 21 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 61dff495c..50abe359e 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -149,6 +149,7 @@ DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
 DECLARE_int32(state_store_2_port);
+DECLARE_string(cluster_id);
 
 DECLARE_string(debug_actions);
 DECLARE_string(ssl_client_ca_certificate);
@@ -283,9 +284,12 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int 
webserver_port,
   } else if (FLAGS_is_executor) {
     subscriber_type = TStatestoreSubscriberType::EXECUTOR;
   }
-  // Set StatestoreSubscriber::subscriber_id as hostname + krpc_port.
-  statestore_subscriber_.reset(new StatestoreSubscriber(
-      Substitute("impalad@$0:$1", FLAGS_hostname, FLAGS_krpc_port), 
subscriber_address,
+  // Set StatestoreSubscriber::subscriber_id as cluster_id + hostname + 
krpc_port.
+  string subscriber_id = Substitute("impalad@$0:$1", FLAGS_hostname, 
FLAGS_krpc_port);
+  if (!FLAGS_cluster_id.empty()) {
+    subscriber_id = FLAGS_cluster_id + '-' + subscriber_id;
+  }
+  statestore_subscriber_.reset(new StatestoreSubscriber(subscriber_id, 
subscriber_address,
       statestore_address, statestore2_address, metrics_.get(), 
subscriber_type));
 
   if (FLAGS_is_coordinator) {
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index ef6449a70..55c02749b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -78,6 +78,7 @@ DEFINE_bool(clamp_query_mem_limit_backend_mem_limit, true, 
"Caps query memory li
 
 DECLARE_bool(is_coordinator);
 DECLARE_bool(is_executor);
+DECLARE_string(cluster_id);
 
 namespace impala {
 
@@ -663,6 +664,12 @@ 
AdmissionController::AdmissionController(ClusterMembershipMgr* cluster_membershi
       });
   total_dequeue_failed_coordinator_limited_ =
       metrics_group_->AddCounter(TOTAL_DEQUEUE_FAILED_COORDINATOR_LIMITED, 0);
+  if (FLAGS_cluster_id.empty()) {
+    request_queue_topic_name_ = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
+  } else {
+    request_queue_topic_name_ =
+        FLAGS_cluster_id + '-' + Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
+  }
 }
 
 AdmissionController::~AdmissionController() {
@@ -694,7 +701,7 @@ Status AdmissionController::Init() {
   // This can effectively reduce the network load of the statestore.
   string filter_prefix =
       FLAGS_is_executor && !FLAGS_is_coordinator ? TOPIC_KEY_POOL_PREFIX : "";
-  Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC,
+  Status status = subscriber_->AddTopic(request_queue_topic_name_,
       /* is_transient=*/true, /* populate_min_subscriber_topic_version=*/false,
       filter_prefix, cb);
   if (!status.ok()) {
@@ -1700,7 +1707,7 @@ void AdmissionController::UpdatePoolStats(
     AddPoolAndPerHostStatsUpdates(subscriber_topic_updates);
 
     StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-        incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
+        incoming_topic_deltas.find(request_queue_topic_name_);
     if (topic != incoming_topic_deltas.end()) {
       const TTopicDelta& delta = topic->second;
       // Delta and non-delta updates are handled the same way, except for a 
full update
@@ -2095,7 +2102,7 @@ void AdmissionController::AddPoolAndPerHostStatsUpdates(
   }
   topic_updates->push_back(TTopicDelta());
   TTopicDelta& topic_delta = topic_updates->back();
-  topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
+  topic_delta.topic_name = request_queue_topic_name_;
   for (const string& pool_name: pools_for_updates_) {
     DCHECK(pool_stats_.find(pool_name) != pool_stats_.end());
     PoolStats* stats = GetPoolStats(pool_name);
diff --git a/be/src/scheduling/admission-controller.h 
b/be/src/scheduling/admission-controller.h
index 9c4db809d..296559c9c 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -912,6 +912,8 @@ class AdmissionController {
   typedef boost::unordered_map<UniqueIdPB, int> NumReleasedBackends;
   NumReleasedBackends num_released_backends_;
 
+  std::string request_queue_topic_name_;
+
   /// Resolves the resource pool name in 'query_ctx.request_pool' and stores 
the resulting
   /// name in 'pool_name' and the resulting config in 'pool_config'.
   Status ResolvePoolAndGetConfig(const TQueryCtx& query_ctx, std::string* 
pool_name,
diff --git a/be/src/scheduling/admissiond-env.cc 
b/be/src/scheduling/admissiond-env.cc
index 8efa5c9d6..7260c60a8 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -42,6 +42,7 @@ DECLARE_string(state_store_2_host);
 DECLARE_int32(state_store_2_port);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_string(hostname);
+DECLARE_string(cluster_id);
 
 namespace impala {
 
@@ -62,9 +63,13 @@ AdmissiondEnv::AdmissiondEnv()
       MakeNetworkAddress(FLAGS_state_store_host, FLAGS_state_store_port);
   TNetworkAddress statestore2_address =
       MakeNetworkAddress(FLAGS_state_store_2_host, FLAGS_state_store_2_port);
-  statestore_subscriber_.reset(new StatestoreSubscriber(
-      Substitute("admissiond@$0", 
TNetworkAddressToString(admission_service_addr)),
-      subscriber_address, statestore_address, statestore2_address, metrics,
+  string subscriber_id = Substitute("admissiond@$0",
+      TNetworkAddressToString(admission_service_addr));
+  if (!FLAGS_cluster_id.empty()) {
+    subscriber_id = FLAGS_cluster_id + '-' + subscriber_id;
+  }
+  statestore_subscriber_.reset(new StatestoreSubscriber(subscriber_id, 
subscriber_address,
+      statestore_address, statestore2_address, metrics,
       TStatestoreSubscriberType::ADMISSIOND));
 
   scheduler_.reset(new Scheduler(metrics, request_pool_service()));
diff --git a/be/src/scheduling/cluster-membership-mgr.cc 
b/be/src/scheduling/cluster-membership-mgr.cc
index 89024ef97..fd724fbc1 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -31,6 +31,7 @@
 
 DECLARE_int32(num_expected_executors);
 DECLARE_string(expected_executor_group_sets);
+DECLARE_string(cluster_id);
 
 namespace {
 using namespace impala;
@@ -90,6 +91,11 @@ ClusterMembershipMgr::ClusterMembershipMgr(
     current_membership_(std::make_shared<const Snapshot>()),
     statestore_subscriber_(subscriber),
     local_backend_id_(move(local_backend_id)) {
+  if (FLAGS_cluster_id.empty()) {
+    membership_topic_name_ = Statestore::IMPALA_MEMBERSHIP_TOPIC;
+  } else {
+    membership_topic_name_ = FLAGS_cluster_id + '-' + 
Statestore::IMPALA_MEMBERSHIP_TOPIC;
+  }
   Status status = PopulateExpectedExecGroupSets(expected_exec_group_sets_);
   if(!status.ok()) {
     LOG(FATAL) << "Error populating expected executor group sets: " << status;
@@ -132,7 +138,7 @@ Status ClusterMembershipMgr::Init() {
   StatestoreSubscriber::UpdateCallback cb =
       bind<void>(mem_fn(&ClusterMembershipMgr::UpdateMembership), this, _1, 
_2);
   Status status = statestore_subscriber_->AddTopic(
-      Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
+      membership_topic_name_, /* is_transient=*/ true,
       /* populate_min_subscriber_topic_version=*/ false,
       /* filter_prefix= */"", cb);
   if (!status.ok()) {
@@ -196,7 +202,7 @@ void ClusterMembershipMgr::UpdateMembership(
 
   // First look to see if the topic we're interested in has an update.
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
+      incoming_topic_deltas.find(membership_topic_name_);
 
   // Ignore spurious messages.
   if (topic == incoming_topic_deltas.end()) return;
@@ -526,7 +532,7 @@ void ClusterMembershipMgr::AddLocalBackendToStatestore(
 
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
-  update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
+  update.topic_name = membership_topic_name_;
   update.topic_entries.emplace_back(TTopicItem());
   // Setting this flag allows us to pass the resulting topic update to other
   // ClusterMembershipMgr instances in tests unmodified.
diff --git a/be/src/scheduling/cluster-membership-mgr.h 
b/be/src/scheduling/cluster-membership-mgr.h
index bfbb7e453..8963933b3 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -286,6 +286,8 @@ class ClusterMembershipMgr {
   /// 'current_membership_lock_'.
   mutable std::mutex callback_fn_lock_;
 
+  std::string membership_topic_name_;
+
   friend class impala::test::SchedulerWrapper;
   friend class 
ClusterMembershipMgrUnitTest_TestPopulateExpectedExecGroupSets_Test;
 };
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index c150bb1aa..ce78f9119 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -21,6 +21,7 @@
 #include <tuple>
 #include <utility>
 
+#include <boost/algorithm/string/predicate.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/lexical_cast.hpp>
 #include <thrift/Thrift.h>
@@ -48,6 +49,7 @@
 
 #include "common/names.h"
 
+using boost::algorithm::ends_with;
 using boost::posix_time::seconds;
 using boost::shared_lock;
 using boost::shared_mutex;
@@ -1230,7 +1232,8 @@ Statestore::TopicEntry::Version 
Statestore::GetMinSubscriberTopicVersion(
 }
 
 bool Statestore::IsPrioritizedTopic(const string& topic) {
-  return topic == IMPALA_MEMBERSHIP_TOPIC || topic == 
IMPALA_REQUEST_QUEUE_TOPIC;
+  return ends_with(topic, IMPALA_MEMBERSHIP_TOPIC)
+      || ends_with(topic, IMPALA_REQUEST_QUEUE_TOPIC);
 }
 
 const char* Statestore::GetUpdateKindName(UpdateKind kind) {
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 9d2ad99af..5c23a7252 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -86,8 +86,11 @@ parser.add_option("--force_kill", dest="force_kill", 
action="store_true", defaul
                   help="Force kill impalad and statestore processes.")
 parser.add_option("-a", "--add_executors", dest="add_executors",
                   action="store_true", default=False,
-                  help="Start additional impalad processes. The executor group 
name must "
-                  "be specified using --impalad_args")
+                  help="Start additional executors. The executor group name 
must be"
+                  "specified using --impalad_args")
+parser.add_option("--add_impalads", dest="add_impalads",
+                  action="store_true", default=False,
+                  help="Start additional impalad processes.")
 parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
                   action="store_true", default=False,
                   help="Restarts only the impalad processes")
@@ -534,7 +537,7 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, 
use_exclusive_coordi
           timeout=DISCONNECTED_SESSION_TIMEOUT,
           args=args)
 
-    if i >= num_coordinators:
+    if i - start_idx >= num_coordinators:
       args = "-is_coordinator=false {args}".format(args=args)
     elif use_exclusive_coordinators:
       # Coordinator instance that doesn't execute non-coordinator fragments
@@ -1090,7 +1093,7 @@ if __name__ == "__main__":
     cluster_ops.kill_all_catalogds(force=options.force_kill)
   elif options.restart_statestored_only:
     cluster_ops.kill_all_statestoreds(force=options.force_kill)
-  elif options.add_executors:
+  elif options.add_executors or options.add_impalads:
     pass
   else:
     cluster_ops.kill_all_daemons(force=options.force_kill)
@@ -1122,6 +1125,11 @@ if __name__ == "__main__":
       cluster_ops.start_impalads(options.cluster_size, num_coordinators,
                                  use_exclusive_coordinators, 
existing_cluster_size)
       expected_cluster_size += existing_cluster_size
+    elif options.add_impalads:
+      cluster_ops.start_impalads(options.cluster_size, 
options.num_coordinators,
+                                 options.use_exclusive_coordinators,
+                                 existing_cluster_size)
+      expected_cluster_size += existing_cluster_size
     else:
       cluster_ops.start_statestore()
       cluster_ops.start_catalogd()
@@ -1138,8 +1146,12 @@ if __name__ == "__main__":
       for delay in options.catalog_init_delays.split(","):
         if int(delay.strip()) != 0: expected_catalog_delays += 1
     # Check for the cluster to be ready.
-    impala_cluster.wait_until_ready(expected_cluster_size,
-        expected_cluster_size - expected_catalog_delays)
+    expected_num_ready_impalads = expected_cluster_size - 
expected_catalog_delays
+    if options.add_impalads:
+      # TODO: This is a hack to make the waiting logic work. We'd better add a 
dedicated
+      # option for adding a new cluster using the existing catalogd and 
statestore.
+      expected_num_ready_impalads = options.cluster_size
+    impala_cluster.wait_until_ready(expected_cluster_size, 
expected_num_ready_impalads)
   except Exception as e:
     LOG.exception("Error starting cluster")
     sys.exit(1)
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index 914d56603..50005a417 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -309,6 +309,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
                             num_coordinators=NUM_COORDINATORS,
                             use_exclusive_coordinators=False,
                             add_executors=False,
+                            add_impalads=False,
                             log_level=1,
                             expected_num_impalads=DEFAULT_CLUSTER_SIZE,
                             expected_subscribers=0,
@@ -338,6 +339,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if add_executors:
       cmd.append("--add_executors")
 
+    if add_impalads:
+      cmd.append("--add_impalads")
+
     if pytest.config.option.use_local_catalog:
       cmd.append("--impalad_args=--use_local_catalog=1")
       cmd.append("--catalogd_args=--catalog_topic_mode=minimal")
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index e5122676a..901a7ed03 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -316,7 +316,16 @@ class ImpalaTestSuite(BaseTestSuite):
 
   @classmethod
   def create_client_for_nth_impalad(cls, nth=0, protocol='beeswax'):
-    host_port = cls.__get_cluster_host_ports(protocol)[nth]
+    host_ports = cls.__get_cluster_host_ports(protocol)
+    if nth < len(IMPALAD_HOST_PORT_LIST):
+      host_port = host_ports[nth]
+    else:
+      # IMPALAD_HOST_PORT_LIST just has 3 items. When we start more impalads, 
calculate
+      # the ports based on the first item.
+      host_port = host_ports[0]
+      host, port = host_port.split(':')
+      port = str(int(port) + nth)
+      host_port = host + ':' + port
     return ImpalaTestSuite.create_impala_client(host_port, protocol=protocol)
 
   @classmethod
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index 69f0f514f..8820f4270 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -252,7 +252,8 @@ class TestQueryLive(CustomClusterTestSuite):
   def test_executor_groups(self):
     """Asserts scans are performed only on coordinators with multiple executor 
groups."""
     # Add a (non-dedicated) coordinator and executor in a different executor 
group.
-    
self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra'],
+    
self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra',
+                                        
'--impalad_args=--cluster_id=test_query_live'],
                                cluster_size=1,
                                add_executors=True,
                                expected_num_impalads=4)
diff --git a/tests/custom_cluster/test_shared_catalogd.py 
b/tests/custom_cluster/test_shared_catalogd.py
new file mode 100644
index 000000000..9ac629b55
--- /dev/null
+++ b/tests/custom_cluster/test_shared_catalogd.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+import re
+
+
+class TestSharedCatalogd(CustomClusterTestSuite):
+  """Test sharing catalogd across Impala clusters"""
+
+  def setup_method(self, method):
+    super(TestSharedCatalogd, self).setup_method(method)
+    self.coordinator = self.cluster.impalads[0]
+
+  @CustomClusterTestSuite.with_args(impalad_args="-cluster_id=cluster1")
+  def test_disjiont_clusters(self, unique_database):
+    """Tests that two Impala clusters can share catalogd and statestore."""
+    # Start a new cluster of 3 impalads using a new cluster id.
+    self._start_impala_cluster(["--impalad_args=-cluster_id=cluster2"],
+                               cluster_size=3,
+                               num_coordinators=3,
+                               add_impalads=True,
+                               expected_num_impalads=3,
+                               expected_subscribers=7)
+    cluster2 = self.cluster.impalads[3:]
+    assert len(cluster2) == 3
+    cluster2_client = self.create_client_for_nth_impalad(3)
+    result = cluster2_client.execute("select count(*) from 
functional.alltypes",
+                                     fetch_profile_after_close=True)
+    # Verify the query runs in the new cluster
+    match = re.search(r'Per Host Number of Fragment Instances: (.*)\n',
+                      result.runtime_profile)
+    assert match is not None
+    # Examples:
+    # nodes = ['quanlong-OptiPlex-BJ:27005(1)', 
'quanlong-OptiPlex-BJ:27004(1)',
+    #          'quanlong-OptiPlex-BJ:27003(2)']
+    # port_fis = ['27005(1)', '27004(1)', '27003(2)']
+    nodes = match.group(1).split()
+    assert len(nodes) == 3
+    port_fis = [n.split(':')[1] for n in nodes]
+    assert "27003(2)" in port_fis
+    assert "27004(1)" in port_fis
+    assert "27005(1)" in port_fis
+
+    # Create a table in cluster1 with sync_ddl=true. The table should be 
visible
+    # in cluster2 immediately.
+    self.execute_query("create table {}.tbl(i int)".format(unique_database),
+                       query_options={"sync_ddl": True})
+    result = cluster2_client.execute("describe {}.tbl".format(unique_database))
+    assert result.data == ['i\tint\t']

Reply via email to