This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 819db8fa4 IMPALA-12155: Support High Availability for CatalogD
819db8fa4 is described below

commit 819db8fa4667e06d1a56fe08baddfbc26983d389
Author: wzhou-code <[email protected]>
AuthorDate: Wed Jun 7 23:13:24 2023 -0700

    IMPALA-12155: Support High Availability for CatalogD
    
    To support catalog HA, we allow two catalogd instances in an Active-
    Passive HA pair to be added to an Impala cluster.
    We add preemptive behavior for catalogd. When enabled, the preemptive
    behavior allows the catalogd with the higher priority to become active
    while the paired catalogd becomes standby. The active catalogd acts as
    the source of metadata and provides catalog service for the Impala
    cluster.
    
    To enable catalog HA for a cluster, both catalogds in the HA pair and
    the statestore must be started with the flag "enable_catalogd_ha".
    
    Each catalogd in an Active-Passive HA pair can be assigned an instance
    priority value to indicate a preference for which catalogd should assume
    the active role. The registration ID, which is assigned by the
    statestore, can be used as the instance priority value; a lower
    numerical registration ID corresponds to a higher priority. The catalogd
    with the higher priority is designated as active and the other catalogd
    is designated as standby. Only the active catalogd propagates the
    IMPALA_CATALOG_TOPIC to the cluster. This guarantees that there is only
    one writer for the IMPALA_CATALOG_TOPIC in an Impala cluster.
    
    The statestore, which is the registration center of an Impala cluster,
    assigns roles to the catalogds in the HA pair after both catalogds
    register with the statestore. When the statestore detects that the
    active catalogd is not healthy, it fails over catalog service to the
    standby catalogd. When failover occurs, the statestore sends
    notifications with the address of the active catalogd to all
    coordinators and catalogds in the cluster. The events are logged in the
    statestore and catalogd logs. When the catalogd with the higher priority
    recovers from a failure, the statestore does not restore it as active,
    to avoid flip-flopping between the two catalogds.
    
    To make a specific catalogd in the HA pair the active instance, start
    that catalogd with the flag "force_catalogd_active" so that it is
    assigned the active role when it registers with the statestore. This
    allows an administrator to manually perform catalog service failover.
    
    Added option "--enable_catalogd_ha" to bin/start-impala-cluster.py.
    If the option is specified when running the script, the script creates
    an Impala cluster with two catalogd instances in an HA pair.
    
    Testing:
     - Passed the core tests.
     - Added unit tests for automatic and manual failover.
    
    Change-Id: I68ce7e57014e2a01133aede7853a212d90688ddd
    Reviewed-on: http://gerrit.cloudera.org:8080/19914
    Reviewed-by: Xiang Yang <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Tamas Mate <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  69 ++++-
 be/src/catalog/catalog-server.h                    |  57 ++++-
 be/src/catalog/catalogd-main.cc                    |  15 +-
 be/src/common/global-flags.cc                      |  21 ++
 be/src/runtime/exec-env.cc                         |  20 +-
 be/src/runtime/exec-env.h                          |   3 +
 be/src/statestore/CMakeLists.txt                   |   2 +
 be/src/statestore/statestore-catalogd-mgr.cc       | 228 +++++++++++++++++
 be/src/statestore/statestore-catalogd-mgr.h        | 125 +++++++++
 be/src/statestore/statestore-subscriber-catalog.cc |  38 +++
 be/src/statestore/statestore-subscriber-catalog.h  |   8 +-
 be/src/statestore/statestore-subscriber.cc         |  34 +--
 be/src/statestore/statestore-subscriber.h          |  20 +-
 be/src/statestore/statestore.cc                    | 101 ++++----
 be/src/statestore/statestore.h                     |  62 +----
 be/src/util/impalad-metrics.cc                     |   5 +
 be/src/util/impalad-metrics.h                      |   4 +
 bin/start-impala-cluster.py                        | 125 +++++++--
 common/thrift/StatestoreService.thrift             |   6 +
 common/thrift/metrics.json                         |  50 ++++
 tests/common/custom_cluster_test_suite.py          |   6 +-
 tests/common/impala_cluster.py                     |  46 ++--
 tests/common/impala_service.py                     |   3 +
 tests/custom_cluster/test_catalogd_ha.py           | 280 +++++++++++++++++++++
 24 files changed, 1145 insertions(+), 183 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 2da354573..3e639737e 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -137,6 +137,8 @@ DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
 DECLARE_string(hostname);
 DECLARE_bool(compact_catalog_topic);
+DECLARE_bool(enable_catalogd_ha);
+DECLARE_bool(force_catalogd_active);
 
 #ifndef NDEBUG
 DECLARE_int32(stress_catalog_startup_delay_ms);
@@ -148,9 +150,11 @@ string CatalogServer::IMPALA_CATALOG_TOPIC = 
"catalog-update";
 
 const string CATALOG_SERVER_TOPIC_PROCESSING_TIMES =
     "catalog-server.topic-processing-time-s";
-
 const string CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN =
     "catalog.partial-fetch-rpc.queue-len";
+const string CATALOG_HA_ACTIVE_STATUS = "catalog-server.ha-active-status";
+const string CATALOG_HA_NUM_ACTIVE_STATUS_CHANGE =
+    "catalog-server.ha-number-active-status-change";
 
 const string CATALOG_WEB_PAGE = "/catalog";
 const string CATALOG_TEMPLATE = "catalog.tmpl";
@@ -340,6 +344,11 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
       CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
   partial_fetch_rpc_queue_len_metric_ =
       metrics->AddGauge(CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN, 0);
+  ha_active_status_metric_ =
+      metrics->AddProperty(CATALOG_HA_ACTIVE_STATUS, 
!FLAGS_enable_catalogd_ha);
+  num_ha_active_status_change_metric_ =
+      metrics->AddCounter(CATALOG_HA_NUM_ACTIVE_STATUS_CHANGE, 0);
+  is_active_.Store(FLAGS_enable_catalogd_ha ? 0 : 1);
 }
 
 Status CatalogServer::Start() {
@@ -382,8 +391,22 @@ Status CatalogServer::Start() {
     status.AddDetail("CatalogService failed to start");
     return status;
   }
-
-  RETURN_IF_ERROR(statestore_subscriber_->Start());
+  // Add callback to handle notification of updating catalogd from Statestore.
+  StatestoreSubscriber::UpdateCatalogdCallback update_catalogd_cb =
+      bind<void>(mem_fn(&CatalogServer::UpdateRegisteredCatalogd), this, _1);
+  statestore_subscriber_->AddUpdateCatalogdTopic(update_catalogd_cb);
+
+  bool has_active_catalogd = false;
+  TCatalogRegistration active_catalogd_registration;
+  RETURN_IF_ERROR(
+      statestore_subscriber_->Start(&has_active_catalogd, 
&active_catalogd_registration));
+  if (FLAGS_enable_catalogd_ha && has_active_catalogd) {
+    UpdateRegisteredCatalogd(active_catalogd_registration);
+    if (FLAGS_force_catalogd_active && is_active_.Load() != 1) {
+      LOG(ERROR) << "Could not start CatalogD as active instance";
+      return Status("Could not start CatalogD as active instance");
+    }
+  }
 
   // Notify the thread to start for the first time.
   {
@@ -422,6 +445,9 @@ void CatalogServer::RegisterWebpages(Webserver* webserver, 
bool metrics_only) {
 void CatalogServer::UpdateCatalogTopicCallback(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
+  // Don't update catalog if this instance is not active.
+  if (is_active_.Load() == 0) return;
+
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
       incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
   if (topic == incoming_topic_deltas.end()) return;
@@ -468,6 +494,43 @@ void CatalogServer::UpdateCatalogTopicCallback(
   catalog_update_cv_.NotifyOne();
 }
 
+void CatalogServer::UpdateRegisteredCatalogd(
+    const TCatalogRegistration& catalogd_registration) {
+  if (catalogd_registration.address.hostname.empty()
+      || catalogd_registration.address.port == 0) {
+    return;
+  }
+  LOG(INFO) << "Get notification of active catalogd: "
+            << TNetworkAddressToString(catalogd_registration.address);
+  bool is_matching = (catalogd_registration.address.hostname == FLAGS_hostname
+      && catalogd_registration.address.port == FLAGS_catalog_service_port);
+  bool is_changed;
+  if (is_matching) {
+    is_changed = is_active_.CompareAndSwap(0, 1);
+  } else {
+    is_changed = is_active_.CompareAndSwap(1, 0);
+  }
+  if (is_changed) {
+    {
+      unique_lock<mutex> unique_lock(catalog_lock_);
+      bool is_active = (is_active_.Load() != 0);
+      if (is_active) {
+        // Reset last_sent_catalog_version_ when the catalogd become active. 
This will
+        // lead to non-delta catalog update for next IMPALA_CATALOG_TOPIC 
which also
+        // instruct the statestore to clear all entries for the catalog update 
topic.
+        last_sent_catalog_version_ = 0;
+        // Signal the catalog update gathering thread to start.
+        topic_updates_ready_ = false;
+        catalog_update_cv_.NotifyOne();
+      }
+      ha_active_status_metric_->SetValue(is_active);
+    }
+    num_ha_active_status_change_metric_->Increment(1);
+    LOG(INFO) << "The role of catalogd instance is changed to "
+              << (is_matching ? "active" : "standby");
+  }
+}
+
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
   while (true) {
     unique_lock<mutex> unique_lock(catalog_lock_);
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 9abe9f8d6..ce1e7378f 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -24,6 +24,7 @@
 #include <boost/unordered_set.hpp>
 
 #include "catalog/catalog.h"
+#include "common/atomic.h"
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
@@ -54,11 +55,45 @@ class Catalog;
 /// cache over JNI to get the current state of the catalog. Any updates are 
broadcast to
 /// the rest of the cluster using the Statestore over the IMPALA_CATALOG_TOPIC.
 /// The CatalogServer must be the only writer to the IMPALA_CATALOG_TOPIC, 
meaning there
-/// cannot be multiple CatalogServers running at the same time, as the 
correctness of delta
-/// updates relies upon this assumption.
-/// TODO: In the future the CatalogServer could go into a "standby" mode if it 
detects
-/// updates from another writer on the topic. This is a bit tricky because it 
requires
-/// some basic form of leader election.
+/// cannot be multiple CatalogServers running at the same time, as the 
correctness of
+/// delta updates relies upon this assumption.
+///
+/// Catalog HA:
+/// To support catalog HA, we add the preemptive behavior for catalogd. When 
enabled,
+/// the preemptive behavior allows the catalogd with the higher priority to 
become active
+/// and the paired catalogd becomes standby. The active catalogd acts as the 
source of
+/// metadata and provides catalog service for the Impala cluster.
+/// By default, preemption is disabled on the catalogd which is running as 
single catalog
+/// instance in an Impala cluster. For deployment with catalog HA, the 
preemption must
+/// be enabled with starting flag "enable_catalogd_ha" on both the catalogd in 
the HA pair
+/// and statestore.
+/// The catalogd in an Active-Passive HA pair can be assigned an instance 
priority value
+/// to indicate a preference for which catalogd should assume the active role. 
The
+/// registration ID which is assigned by statestore can be used as instance 
priority
+/// value. The lower numerical value in registration ID corresponds to a 
higher priority.
+/// The catalogd with the higher priority is designated as active, the other 
catalogd is
+/// designated as standby. Only the active catalogd propagates the 
IMPALA_CATALOG_TOPIC
+/// to the cluster. This guarantees only one writer for the 
IMPALA_CATALOG_TOPIC in a
+/// Impala cluster.
+/// Statestore only send the IMPALA_CATALOG_TOPIC messages to active catalogd. 
Also
+/// catalogds are registered as writer for IMPALA_CATALOG_TOPIC so the standby 
catalogd
+/// does not receive catalog updates from statestore. When standby catalogd 
becomes
+/// active, its "last_sent_catalog_version_" member variable is reset. This 
will lead to
+/// non-delta catalog update for next IMPALA_CATALOG_TOPIC which also 
instructs the
+/// statestore to clear all entries for the catalog update topic, so that 
statestore keeps
+/// in-sync with new active catalogd for the catalog update topic.
+/// statestore which is the registration center of an Impala cluster assigns 
the roles
+/// for the catalogd in the HA pair after both catalogd register to 
statestore. When
+/// statestore detects the active catalogd is not healthy, it fails over 
catalog service
+/// to standby catalogd. When failover occurs, statestore sends notifications 
with the
+/// address of active catalogd to all coordinators and catalogd in the 
cluster. The event
+/// is logged in the statestore and catalogd logs. When the catalogd with the 
higher
+/// priority recovers from a failure, statestore does not resume it as active 
to avoid
+/// flip-flop between the two catalogd.
+/// To make a specific catalogd in the HA pair as active instance, the 
catalogd must be
+/// started with starting flag "force_catalogd_active" so that the catalogd 
will be
+/// assigned with active role when it registers to statestore. This allows 
administrator
+/// to manually perform catalog service failover.
 class CatalogServer {
  public:
   static std::string IMPALA_CATALOG_TOPIC;
@@ -99,6 +134,9 @@ class CatalogServer {
   /// Indicates whether the catalog service is ready.
   std::atomic_bool service_started_{false};
 
+  /// Set to 1 if this catalog instance is active.
+  AtomicInt32 is_active_{0};
+
   /// Thrift API implementation which proxies requests onto this 
CatalogService.
   std::shared_ptr<CatalogServiceIf> thrift_iface_;
   ThriftSerializer thrift_serializer_;
@@ -112,6 +150,12 @@ class CatalogServer {
   /// Tracks the partial fetch RPC call queue length on the Catalog server.
   IntGauge* partial_fetch_rpc_queue_len_metric_;
 
+  /// Metric that tracks if this catalogd is active when catalogd HA is 
enabled.
+  BooleanProperty* ha_active_status_metric_;
+
+  /// Metric to count the number of active status changes.
+  IntCounter* num_ha_active_status_change_metric_;
+
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
@@ -162,6 +206,9 @@ class CatalogServer {
       const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
 
+  /// Callback function for receiving notification of updating catalogd.
+  void UpdateRegisteredCatalogd(const TCatalogRegistration& 
catalogd_registration);
+
   /// Executed by the catalog_update_gathering_thread_. Calls into JniCatalog
   /// to get the latest set of catalog objects that exist, along with some 
metadata on
   /// each object. The results are stored in the shared catalog_objects_ data 
structure.
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index 026eb29f4..a668480ee 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -17,6 +17,7 @@
 //
 // This file contains the main() function for the catalog daemon process,
 
+#include <gflags/gflags.h>
 #include <jni.h>
 
 #include "catalog/catalog-server.h"
@@ -50,8 +51,18 @@ using namespace impala;
 using namespace apache::thrift;
 
 int CatalogdMain(int argc, char** argv) {
-  FLAGS_webserver_port = 25020;
-  FLAGS_state_store_subscriber_port = 23020;
+  // Set webserver_port as 25020 and state_store_subscriber_port as 23020 for 
catalogd
+  // if and only if these two ports had not been set explicitly in command 
line.
+  // An Impala cluster could be launched with more than one catalogd instances 
when
+  // CatalogD HA is enabled. These two ports should be set explicitly in 
command line
+  // with different values for each catalogd instance when launching a 
mini-cluster.
+  if (google::GetCommandLineFlagInfoOrDie("webserver_port").is_default) {
+    FLAGS_webserver_port = 25020;
+  }
+  if 
(google::GetCommandLineFlagInfoOrDie("state_store_subscriber_port").is_default) 
{
+    FLAGS_state_store_subscriber_port = 23020;
+  }
+
   InitCommonRuntime(argc, argv, true);
   InitFeSupport();
 
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 822617e4c..0b1515f4f 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -384,6 +384,27 @@ DEFINE_bool(tolerate_statestore_startup_delay, false, "If 
set to true, the subsc
     "startup. But instead it enters into Recovery mode, it will loop, sleep 
and retry "
     "till it successfully registers with the statestore.");
 
+// Starting flags for CatalogD High Availability
+DEFINE_bool(enable_catalogd_ha, false, "Set to true to enable CatalogD HA");
+DEFINE_bool(force_catalogd_active, false, "Set to true to force this catalogd 
instance "
+    "to take active role. It's used to perform manual fail over for catalog 
service.");
+// Use subscriber-id which is built with network address as priority value of 
catalogd
+// instance when designating active catalogd. The lower subscriber_id (i.e. 
lower network
+// address) corresponds to a higher priority.
+// This is mainly used in unit-test for predictable results.
+DEFINE_bool(use_subscriber_id_as_catalogd_priority, false, "Subscriber-id is 
used as "
+    "priority value of catalogd instance if this is set as true. Otherwise, "
+    "registration_id which is generated as random number will be used as 
priority value "
+    "of catalogd instance.");
+// Waiting period in ms for HA preemption. It should be set with proper value 
based on the
+// time to take for bringing a catalogd instance in-line in the deployment 
environment.
+DEFINE_int64(catalogd_ha_preemption_wait_period_ms, 10000, "(Advanced) The 
time after "
+    "which statestore designates the first registered catalogd as active if 
statestore "
+    "does not receive registration request from the second catalogd.");
+DEFINE_int64(active_catalogd_designation_monitoring_frequency_ms, 100, 
"(Advanced) "
+    "Frequency (in ms) with which the statestore monitors if active catalogd 
is "
+    "designated.");
+
 // TGeospatialLibrary's values are mapped here as constants
 static const string geo_lib_none = "NONE";
 static const string geo_lib_hive_esri = "HIVE_ESRI";
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 79be4b1f2..28597f1d5 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -514,17 +514,17 @@ Status ExecEnv::StartStatestoreSubscriberService() {
 
   // Must happen after all topic registrations / callbacks are done
   if (statestore_subscriber_.get() != nullptr) {
-    bool has_registered_catalogd = false;
-    TCatalogRegistration catalogd_registration;
-    Status status =
-        statestore_subscriber_->Start(&has_registered_catalogd, 
&catalogd_registration);
+    bool has_active_catalogd = false;
+    TCatalogRegistration active_catalogd_registration;
+    Status status = statestore_subscriber_->Start(
+        &has_active_catalogd, &active_catalogd_registration);
     if (!status.ok()) {
       status.AddDetail("Statestore subscriber did not start up.");
       return status;
     }
     if (statestore_subscriber_->IsRegistered()) 
SetStatestoreRegistrationCompleted();
-    if (has_registered_catalogd && FLAGS_is_coordinator) {
-      UpdateCatalogd(catalogd_registration);
+    if (has_active_catalogd && FLAGS_is_coordinator) {
+      UpdateCatalogd(active_catalogd_registration);
     }
   }
 
@@ -693,6 +693,12 @@ void ExecEnv::UpdateCatalogd(const TCatalogRegistration& 
catalogd_registration)
   }
   std::lock_guard<std::mutex> l(catalogd_address_lock_);
   DCHECK(catalogd_address_.get() != nullptr);
+  if (!is_catalogd_address_metric_set_) {
+    // At least set the metric once.
+    is_catalogd_address_metric_set_ = true;
+    ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS->SetValue(
+        TNetworkAddressToString(catalogd_registration.address));
+  }
   bool is_matching = (catalogd_registration.address.port == 
catalogd_address_->port
       && catalogd_registration.address.hostname == 
catalogd_address_->hostname);
   if (!is_matching) {
@@ -701,6 +707,8 @@ void ExecEnv::UpdateCatalogd(const TCatalogRegistration& 
catalogd_registration)
               << " to " << 
TNetworkAddressToString(catalogd_registration.address);
     catalogd_address_ =
         std::make_shared<const TNetworkAddress>(catalogd_registration.address);
+    ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS->SetValue(
+        TNetworkAddressToString(catalogd_registration.address));
   }
 }
 
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 521b81615..26579f72b 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -321,6 +321,9 @@ class ExecEnv {
   /// Current address of Catalog service
   std::shared_ptr<const TNetworkAddress> catalogd_address_;
 
+  /// Flag that indicate if the metric for catalogd address has been set.
+  bool is_catalogd_address_metric_set_ = false;
+
   /// Protects catalogd_address_.
   mutable std::mutex catalogd_address_lock_;
 
diff --git a/be/src/statestore/CMakeLists.txt b/be/src/statestore/CMakeLists.txt
index 65c26dc5c..028b81d07 100644
--- a/be/src/statestore/CMakeLists.txt
+++ b/be/src/statestore/CMakeLists.txt
@@ -29,6 +29,8 @@ add_library(Statestore
   failure-detector.cc
   ${STATESTORE_SERVICE_PROTO_SRCS}
   statestore.cc
+  statestore-catalogd-mgr.cc
+  statestore-subscriber-catalog.cc
   statestore-subscriber.cc
   statestored-main.cc
 )
diff --git a/be/src/statestore/statestore-catalogd-mgr.cc 
b/be/src/statestore/statestore-catalogd-mgr.cc
new file mode 100644
index 000000000..8af3b5244
--- /dev/null
+++ b/be/src/statestore/statestore-catalogd-mgr.cc
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "statestore/statestore-catalogd-mgr.h"
+
+#include "gen-cpp/Types_types.h"
+#include "util/container-util.h"
+#include "util/time.h"
+
+using namespace impala;
+
+DECLARE_bool(use_subscriber_id_as_catalogd_priority);
+DECLARE_int64(catalogd_ha_preemption_wait_period_ms);
+
+#define COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(NAME1, NAME2)     \
+  do {                                                                     \
+    NAME1##_catalogd_subscriber_id_ = NAME2##_catalogd_subscriber_id_;     \
+    NAME1##_catalogd_registration_id_ = NAME2##_catalogd_registration_id_; \
+    NAME1##_catalogd_registration_ = NAME2##_catalogd_registration_;       \
+  } while (false)
+
+#define COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(NAME) \
+  do {                                                        \
+    NAME##_catalogd_subscriber_id_ = subscriber_id;           \
+    NAME##_catalogd_registration_id_ = registration_id;       \
+    NAME##_catalogd_registration_ = catalogd_registration;    \
+  } while (false)
+
+#define RESET_CATALOGD_REGISTRATION_MEMBER_VARIABLES(NAME)    \
+  do {                                                        \
+    NAME##_catalogd_subscriber_id_ = "";                      \
+    NAME##_catalogd_registration_id_ = TUniqueId();           \
+    NAME##_catalogd_registration_ = TCatalogRegistration();   \
+  } while (false)
+
+bool StatestoreCatalogdMgr::RegisterCatalogd(bool is_reregistering,
+    const SubscriberId& subscriber_id,
+    const RegistrationId& registration_id,
+    const TCatalogRegistration& catalogd_registration) {
+  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
+  if (!enable_catalogd_ha_) {
+    // CatalogD HA is not enabled.
+    num_registered_catalogd_++;
+    DCHECK(num_registered_catalogd_ < 2);
+    is_active_catalogd_assigned_ = true;
+    COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
+    return true;
+  }
+
+  if (is_reregistering) {
+    if (num_registered_catalogd_ == 2) {
+      DCHECK(is_active_catalogd_assigned_);
+      if (subscriber_id == standby_catalogd_subscriber_id_
+          && catalogd_registration.force_catalogd_active) {
+        // Re-register standby catalogd as active one.
+        COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(standby, active);
+        COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
+        LOG(INFO) << active_catalogd_subscriber_id_
+                  << " is re-registered with FLAGS_force_catalogd_active.";
+        return true;
+      }
+    } else {
+      DCHECK(num_registered_catalogd_ == 1 && first_catalogd_register_time_ != 
0);
+      if (!is_active_catalogd_assigned_
+          && (MonotonicMillis() - first_catalogd_register_time_
+              >= FLAGS_catalogd_ha_preemption_wait_period_ms)) {
+        is_active_catalogd_assigned_ = true;
+        COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
+        LOG(INFO) << active_catalogd_subscriber_id_
+                  << " is re-registered after HA preemption waiting period and 
"
+                  << "is assigned as active catalogd.";
+        return true;
+      }
+    }
+    // There is no role change during re-registration.
+    VLOG(3) << subscriber_id << " is re-registered, but there is no role 
change.";
+    return false;
+  }
+
+  if (num_registered_catalogd_ == 0) {
+    DCHECK(!is_active_catalogd_assigned_);
+    // First catalogd is registered.
+    num_registered_catalogd_++;
+    COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(first);
+    bool is_waiting_period_expired = false;
+    if (first_catalogd_register_time_ == 0) {
+      first_catalogd_register_time_ = MonotonicMillis();
+    } else if (MonotonicMillis() - first_catalogd_register_time_ >=
+        FLAGS_catalogd_ha_preemption_wait_period_ms) {
+      is_waiting_period_expired = true;
+    }
+    if (catalogd_registration.force_catalogd_active || 
is_waiting_period_expired) {
+      // Don't need to wait second catalogd if force_catalogd_active is true 
or the
+      // waiting period is expired.
+      is_active_catalogd_assigned_ = true;
+      COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
+      LOG(INFO) << active_catalogd_subscriber_id_ << " is assigned as active 
catalogd.";
+      return true;
+    }
+    // Wait second catalogd to be registered.
+    VLOG(3) << "Wait second catalogd to be registered during HA preemption 
waiting "
+            << "period.";
+  } else {
+    num_registered_catalogd_++;
+    DCHECK(num_registered_catalogd_ == 2);
+    if (catalogd_registration.force_catalogd_active) {
+      // Force to set the current one as active catalogd
+      if (is_active_catalogd_assigned_) {
+        COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(standby, active);
+      } else {
+        COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(standby, first);
+      }
+      is_active_catalogd_assigned_ = true;
+      COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
+      LOG(INFO) << active_catalogd_subscriber_id_
+                << " is registered with FLAGS_force_catalogd_active and is 
assigned as "
+                << "active catalogd.";
+      return true;
+    } else if (is_active_catalogd_assigned_) {
+      // Existing one is already assigned as active catalogd.
+      COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(standby);
+      VLOG(3) << "There is another catalogd already assigned as active 
catalogd.";
+    } else {
+      // Compare priority and assign the catalogd with high priority as active 
catalogd.
+      is_active_catalogd_assigned_ = true;
+      bool first_has_high_priority = 
FLAGS_use_subscriber_id_as_catalogd_priority
+          ? first_catalogd_subscriber_id_ < subscriber_id
+          : first_catalogd_registration_id_ < registration_id;
+      if (first_has_high_priority) {
+        COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(active, first);
+        COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(standby);
+      } else {
+        COPY_CATALOGD_REGISTRATION_FROM_LOCAL_VARIABLES(active);
+        COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(standby, first);
+      }
+      LOG(INFO) << active_catalogd_subscriber_id_
+                << " has higher priority and is assigned as active catalogd.";
+      return true;
+    }
+  }
+  return false;
+}
+
+bool StatestoreCatalogdMgr::CheckActiveCatalog() {
+  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
+  if (is_active_catalogd_assigned_) {
+    return true;
+  } else if (num_registered_catalogd_ == 0
+      || first_catalogd_register_time_ == 0
+      || (MonotonicMillis() - first_catalogd_register_time_ <
+          FLAGS_catalogd_ha_preemption_wait_period_ms)) {
+    return false;
+  }
+  // Assign the first registered catalogd as active one.
+  DCHECK(num_registered_catalogd_ == 1);
+  is_active_catalogd_assigned_ = true;
+  COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(active, first);
+  LOG(INFO) << active_catalogd_subscriber_id_
+            << " is assigned as active catalogd after preemption waiting 
period.";
+  return true;
+}
+
+bool StatestoreCatalogdMgr::UnregisterCatalogd(
+    const SubscriberId& unregistered_subscriber_id) {
+  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
+  num_registered_catalogd_--;
+  if (unregistered_subscriber_id == active_catalogd_subscriber_id_) {
+    // Unregister active catalogd.
+    DCHECK(is_active_catalogd_assigned_);
+    if (num_registered_catalogd_ > 0) {
+      // Fail over to standby catalogd
+      COPY_CATALOGD_REGISTRATION_FROM_MEMBER_VARIABLES(active, standby);
+      RESET_CATALOGD_REGISTRATION_MEMBER_VARIABLES(standby);
+      LOG(INFO) << "Fail over active catalogd to " << 
active_catalogd_subscriber_id_;
+      return true;
+    } else {
+      is_active_catalogd_assigned_ = false;
+      // Don't need to wait second one to be registered.
+      first_catalogd_register_time_ = MonotonicMillis() -
+          FLAGS_catalogd_ha_preemption_wait_period_ms -1;
+    }
+  } else if (num_registered_catalogd_ > 0) {
+    // Unregister standby catalogd.
+    DCHECK(unregistered_subscriber_id == standby_catalogd_subscriber_id_);
+    RESET_CATALOGD_REGISTRATION_MEMBER_VARIABLES(standby);
+    VLOG(3) << "Unregister standby catalogd " << unregistered_subscriber_id;
+  } else {
+    // Active catalogd has not been designated.
+    DCHECK(!is_active_catalogd_assigned_);
+  }
+  return false;
+}
+
+const TCatalogRegistration& 
StatestoreCatalogdMgr::GetActiveCatalogRegistration(
+    bool* has_active_catalogd) {
+  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
+  *has_active_catalogd = is_active_catalogd_assigned_;
+  return active_catalogd_registration_;
+}
+
+const SubscriberId& StatestoreCatalogdMgr::GetActiveCatalogdSubscriberId() {
+  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
+  return active_catalogd_subscriber_id_;
+}
+
+bool StatestoreCatalogdMgr::IsActiveCatalogd(const SubscriberId& 
subscriber_id) {
+  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
+  return active_catalogd_subscriber_id_ == subscriber_id;
+}
+
+int64 StatestoreCatalogdMgr::GetSendingSequence() {
+  std::lock_guard<std::mutex> l(catalog_mgr_lock_);
+  return ++sending_sequence_;
+}
diff --git a/be/src/statestore/statestore-catalogd-mgr.h 
b/be/src/statestore/statestore-catalogd-mgr.h
new file mode 100644
index 000000000..00c4ee5d6
--- /dev/null
+++ b/be/src/statestore/statestore-catalogd-mgr.h
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+#include <string>
+
+#include "common/status.h"
+#include "gen-cpp/StatestoreService_types.h"
+
+namespace impala {
+
+class Status;
+
+/// Unique ID handed out to a subscriber each time it registers with the statestore
+/// (derived from a randomly generated UUID).
+using RegistrationId = TUniqueId;
+
+/// Uniquely identifies a single subscriber. Chosen by the subscriber itself and
+/// supplied at registration time.
+using SubscriberId = std::string;
+
+/// StatestoreCatalogdMgr:
+/// Tracks a variety of bookkeeping information for the catalog daemon on the
+/// statestore. It manages the designation of the active catalogd when CatalogD High
+/// Availability is enabled.
+class StatestoreCatalogdMgr {
+ public:
+  /// Declared 'explicit' so that a bool is never implicitly converted into a manager.
+  explicit StatestoreCatalogdMgr(bool enable_catalogd_ha)
+    : enable_catalogd_ha_(enable_catalogd_ha),
+      is_active_catalogd_assigned_(false),
+      num_registered_catalogd_(0),
+      first_catalogd_register_time_(0),
+      sending_sequence_(0) {}
+
+  /// Register one catalogd.
+  /// Return true if a new active catalogd is designated during this registration.
+  /// Note that the active role could be designated to the peer catalogd.
+  bool RegisterCatalogd(bool is_reregistering, const SubscriberId& subscriber_id,
+      const RegistrationId& registration_id,
+      const TCatalogRegistration& catalogd_registration);
+
+  /// Return true if the active catalogd is already designated. Otherwise check whether
+  /// the preemption waiting period has expired. If it has, assign the active role to
+  /// the first registered catalogd and return true.
+  bool CheckActiveCatalog();
+
+  /// Unregister one catalogd. If it's active and there is a standby catalogd, fail over
+  /// the catalog service to the standby catalogd.
+  /// Return true if failover of catalog service has happened in this call.
+  bool UnregisterCatalogd(const SubscriberId& unregistered_subscriber_id);
+
+  /// Return the protocol version of catalog service and address of active catalogd.
+  /// Set *has_active_catalogd to false if the active one is not designated yet.
+  /// NOTE(review): the returned reference aliases a member guarded by
+  /// catalog_mgr_lock_, which is released before this returns, so reading through it
+  /// is not synchronized with concurrent updates. Consider returning by value.
+  const TCatalogRegistration& GetActiveCatalogRegistration(bool* has_active_catalogd);
+
+  /// Return the subscriber-id of active catalogd.
+  /// This function should be called after the active catalogd is designated.
+  /// NOTE(review): returns a reference to a lock-guarded member after the lock is
+  /// released; consider returning by value.
+  const SubscriberId& GetActiveCatalogdSubscriberId();
+
+  /// Check if the subscriber with given subscriber_id is the active catalogd.
+  bool IsActiveCatalogd(const SubscriberId& subscriber_id);
+
+  /// Return the mutex that protects this object's state. The statestore locks it while
+  /// waiting on its catalogd-update condition variable.
+  std::mutex* GetLock() { return &catalog_mgr_lock_; }
+
+  /// Increment and return the sending sequence number.
+  int64 GetSendingSequence();
+
+ private:
+  /// Protects all member variables.
+  std::mutex catalog_mgr_lock_;
+
+  /// Set to true if CatalogD HA is enabled.
+  bool enable_catalogd_ha_;
+
+  /// Indicates if the active catalogd has been assigned.
+  bool is_active_catalogd_assigned_;
+
+  /// Number of registered catalogds.
+  int num_registered_catalogd_;
+
+  /// The registration time of the first catalogd (milliseconds; presumably on the
+  /// MonotonicMillis() clock, which is what the failover path compares against).
+  int64_t first_catalogd_register_time_;
+  /// subscriber_id of the first registered catalogd.
+  SubscriberId first_catalogd_subscriber_id_;
+  /// RegistrationId of the first registered catalogd.
+  RegistrationId first_catalogd_registration_id_;
+  /// Additional registration info of the first registered catalogd.
+  TCatalogRegistration first_catalogd_registration_;
+
+  /// subscriber_id of the active catalogd.
+  SubscriberId active_catalogd_subscriber_id_;
+  /// RegistrationId of the active catalogd.
+  RegistrationId active_catalogd_registration_id_;
+  /// Additional registration info of the active catalogd.
+  TCatalogRegistration active_catalogd_registration_;
+
+  /// The following three variables are only valid if num_registered_catalogd_ == 2.
+  /// subscriber_id of the standby catalogd.
+  SubscriberId standby_catalogd_subscriber_id_;
+  /// RegistrationId of the standby catalogd.
+  RegistrationId standby_catalogd_registration_id_;
+  /// Additional registration info of the standby catalogd.
+  TCatalogRegistration standby_catalogd_registration_;
+
+  /// Monotonically increasing sending sequence number.
+  int64 sending_sequence_;
+};
+
+} // namespace impala
diff --git a/be/src/statestore/statestore-subscriber-catalog.cc 
b/be/src/statestore/statestore-subscriber-catalog.cc
new file mode 100644
index 000000000..3f90bce20
--- /dev/null
+++ b/be/src/statestore/statestore-subscriber-catalog.cc
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "statestore/statestore-subscriber-catalog.h"
+
+using namespace impala;
+
+DECLARE_bool(enable_catalogd_ha);
+DECLARE_bool(force_catalogd_active);
+
+StatestoreSubscriberCatalog::StatestoreSubscriberCatalog(
+    const std::string& subscriber_id,
+    const TNetworkAddress& heartbeat_address,
+    const TNetworkAddress& statestore_address,
+    MetricGroup* metrics,
+    CatalogServiceVersion::type catalog_protocol_version,
+    const TNetworkAddress& catalogd_address)
+  : StatestoreSubscriber(subscriber_id, heartbeat_address, statestore_address, metrics,
+        TStatestoreSubscriberType::CATALOGD) {
+  // Fill in the catalog-specific part of the registration request: the catalog service
+  // protocol and address, plus this daemon's HA flags. The statestore rejects a
+  // catalogd whose enable_catalogd_ha value disagrees with its own flag.
+  auto& registration = catalogd_registration_;
+  registration.__set_protocol(catalog_protocol_version);
+  registration.__set_address(catalogd_address);
+  registration.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
+  registration.__set_force_catalogd_active(FLAGS_force_catalogd_active);
+}
diff --git a/be/src/statestore/statestore-subscriber-catalog.h 
b/be/src/statestore/statestore-subscriber-catalog.h
index 759db297d..f6657f7f7 100644
--- a/be/src/statestore/statestore-subscriber-catalog.h
+++ b/be/src/statestore/statestore-subscriber-catalog.h
@@ -23,7 +23,6 @@ namespace impala {
 
 /// Statestore subscriber for Catalog service.
 /// Catalog-specific parameters for statestore registration.
-/// We will add more parameters when supporting CatalogD HA.
 class StatestoreSubscriberCatalog : public StatestoreSubscriber {
  public:
   /// Only constructor.
@@ -38,12 +37,7 @@ class StatestoreSubscriberCatalog : public 
StatestoreSubscriber {
       const TNetworkAddress& statestore_address,
       MetricGroup* metrics,
       CatalogServiceVersion::type catalog_protocol_version,
-      const TNetworkAddress& catalogd_address)
-    : StatestoreSubscriber(subscriber_id, heartbeat_address, 
statestore_address, metrics,
-        TStatestoreSubscriberType::CATALOGD) {
-    catalogd_registration_.__set_protocol(catalog_protocol_version);
-    catalogd_registration_.__set_address(catalogd_address);
-  }
+      const TNetworkAddress& catalogd_address);
 
   virtual ~StatestoreSubscriberCatalog() {}
 
diff --git a/be/src/statestore/statestore-subscriber.cc 
b/be/src/statestore/statestore-subscriber.cc
index 048a62a47..909914e6a 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -210,8 +210,8 @@ bool StatestoreSubscriber::IsRegistered() const {
   return statestore_->IsRegistered();
 }
 
-Status StatestoreSubscriber::Start(bool* has_registered_catalogd,
-    TCatalogRegistration* registered_catalogd_registration) {
+Status StatestoreSubscriber::Start(bool* has_active_catalogd,
+    TCatalogRegistration* active_catalogd_registration) {
   // Backend must be started before registration
   std::shared_ptr<TProcessor> processor(
       new StatestoreSubscriberProcessor(thrift_iface_));
@@ -240,7 +240,7 @@ Status StatestoreSubscriber::Start(bool* 
has_registered_catalogd,
   // Specify the port which the heartbeat server is listening on.
   heartbeat_address_.port = heartbeat_server_->port();
 
-  return statestore_->Start(has_registered_catalogd, 
registered_catalogd_registration);
+  return statestore_->Start(has_active_catalogd, active_catalogd_registration);
 }
 
 /// Set Register Request
@@ -357,8 +357,8 @@ void 
StatestoreSubscriber::StatestoreStub::AddCompleteRegistrationTopic(
   complete_registration_callbacks_.push_back(callback);
 }
 
-Status StatestoreSubscriber::StatestoreStub::Register(bool* 
has_registered_catalogd,
-    TCatalogRegistration* registered_catalogd_registration) {
+Status StatestoreSubscriber::StatestoreStub::Register(bool* 
has_active_catalogd,
+    TCatalogRegistration* active_catalogd_registration) {
   // Check protocol version of the statestore first.
   TGetProtocolVersionRequest get_protocol_request;
   TGetProtocolVersionResponse get_protocol_response;
@@ -451,11 +451,11 @@ Status 
StatestoreSubscriber::StatestoreStub::Register(bool* has_registered_catal
       VLOG(1) << "No statestore ID received from statestore";
     }
     if (status.ok() && response.__isset.catalogd_registration) {
-      VLOG(1) << "Registered catalogd address: "
+      VLOG(1) << "Active catalogd address: "
               << 
TNetworkAddressToString(response.catalogd_registration.address);
-      if (has_registered_catalogd != nullptr) *has_registered_catalogd = true;
-      if (registered_catalogd_registration != nullptr) {
-        *registered_catalogd_registration = response.catalogd_registration;
+      if (has_active_catalogd != nullptr) *has_active_catalogd = true;
+      if (active_catalogd_registration != nullptr) {
+        *active_catalogd_registration = response.catalogd_registration;
       }
     }
   }
@@ -463,8 +463,8 @@ Status StatestoreSubscriber::StatestoreStub::Register(bool* 
has_registered_catal
   return status;
 }
 
-Status StatestoreSubscriber::StatestoreStub::Start(bool* 
has_registered_catalogd,
-    TCatalogRegistration* registered_catalogd_registration) {
+Status StatestoreSubscriber::StatestoreStub::Start(bool* has_active_catalogd,
+    TCatalogRegistration* active_catalogd_registration) {
   Status status;
   {
     // Take the lock to ensure that, if a topic-update is received during 
registration
@@ -475,7 +475,7 @@ Status StatestoreSubscriber::StatestoreStub::Start(bool* 
has_registered_catalogd
     // Inject failure before registering to statestore.
     status = DebugAction(FLAGS_debug_actions, 
"REGISTER_STATESTORE_ON_STARTUP");
     if (status.ok()) {
-      status = Register(has_registered_catalogd, 
registered_catalogd_registration);
+      status = Register(has_active_catalogd, active_catalogd_registration);
     }
     if (status.ok()) {
       is_registered_ = true;
@@ -528,14 +528,14 @@ void 
StatestoreSubscriber::StatestoreStub::RecoveryModeChecker() {
       LOG(INFO) << subscriber_->subscriber_id_
                 << ": Connection with statestore lost, entering recovery mode";
       uint32_t attempt_count = 1;
-      bool has_registered_catalogd = false;
-      TCatalogRegistration registered_catalogd_registration;
+      bool has_active_catalogd = false;
+      TCatalogRegistration active_catalogd_registration;
       while (true) {
         LOG(INFO) << "Trying to re-register with statestore, attempt: "
                   << attempt_count++;
         re_registr_attempt_metric_->Increment(1);
         Status status =
-            Register(&has_registered_catalogd, 
&registered_catalogd_registration);
+            Register(&has_active_catalogd, &active_catalogd_registration);
         if (status.ok()) {
           if (!is_registered_) {
             is_registered_ = true;
@@ -550,9 +550,9 @@ void 
StatestoreSubscriber::StatestoreStub::RecoveryModeChecker() {
           failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
           LOG(INFO) << "Reconnected to statestore. Exiting recovery mode";
 
-          if (has_registered_catalogd) {
+          if (has_active_catalogd) {
             for (const UpdateCatalogdCallback& callback : 
update_catalogd_callbacks_) {
-              callback(registered_catalogd_registration);
+              callback(active_catalogd_registration);
             }
           }
           // Break out of enclosing while (true) to top of outer-scope loop.
diff --git a/be/src/statestore/statestore-subscriber.h 
b/be/src/statestore/statestore-subscriber.h
index 6e65359ea..45dd9c31b 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -129,14 +129,14 @@ class StatestoreSubscriber {
   /// Registers this subscriber with the statestore, and starts the
   /// heartbeat service, as well as a thread to check for failure and
   /// initiate recovery mode.
-  /// has_registered_catalogd - return true if the catalogd is successfully 
registered
-  ///                           to statestore.
-  /// registered_catalogd_registration - return the address and protocol of the
-  ///                                    registered catalogd.
+  /// has_active_catalogd - return true if the active catalogd is designated by
+  ///                       statestore.
+  /// active_catalogd_registration - return the address and protocol of the
+  ///                                active catalogd.
   ///
   /// Returns OK unless some error occurred, like a failure to connect.
-  virtual Status Start(bool* has_registered_catalogd = nullptr,
-      TCatalogRegistration* registered_catalogd_registration = nullptr);
+  virtual Status Start(bool* has_active_catalogd = nullptr,
+      TCatalogRegistration* active_catalogd_registration = nullptr);
 
   /// Set Register Request
   virtual Status SetRegisterRequest(TRegisterSubscriberRequest* request);
@@ -241,8 +241,8 @@ class StatestoreSubscriber {
         const TNetworkAddress& statestore_address, MetricGroup* metrics);
 
     /// Returns OK unless some error occurred, like a failure to connect.
-    Status Start(bool* has_registered_catalogd,
-        TCatalogRegistration* registered_catalogd_registration);
+    Status Start(bool* has_active_catalogd,
+        TCatalogRegistration* active_catalogd_registration);
 
     /// Adds a topic to the set of topics that updates will be received
     /// for. When a topic update is received, the supplied UpdateCallback
@@ -308,8 +308,8 @@ class StatestoreSubscriber {
     /// Creates a client of the remote statestore and sends a list of
     /// topics to register for. Returns OK unless there is some problem
     /// connecting, or the statestore reports an error.
-    Status Register(bool* has_registered_catalogd,
-        TCatalogRegistration* registered_catalogd_registration);
+    Status Register(bool* has_active_catalogd,
+        TCatalogRegistration* active_catalogd_registration);
 
     /// Returns OK if registration_id == registration_id_, or if 
registration_id_ is not
     /// yet set, an error otherwise. Used to confirm that RPCs from the 
statestore are
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 840c2d630..d47b33694 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -112,6 +112,8 @@ 
DEFINE_int32(statestore_update_catalogd_tcp_timeout_seconds, 3, "(Advanced) The
     "protects against badly hung machines that are not able to respond to the "
     "UpdateCatalogd RPC in short order");
 
+DECLARE_bool(enable_catalogd_ha);
+DECLARE_int64(active_catalogd_designation_monitoring_frequency_ms);
 DECLARE_string(debug_actions);
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
@@ -134,6 +136,9 @@ const string STATESTORE_PRIORITY_UPDATE_DURATION =
     "statestore.priority-topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 const string STATESTORE_UPDATE_CATALOGD_NUM = "statestore.num-update-catalogd";
+const string STATESTORE_CLEAR_TOPIC_ENTRIES_NUM =
+    "statestore.num-clear-topic-entries-requests";
+const string STATESTORE_ACTIVE_CATALOGD_ADDRESS = 
"statestore.active-catalogd-address";
 
 // Initial version for each Topic registered by a Subscriber. Generally, the 
Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this 
initial
@@ -213,18 +218,18 @@ class StatestoreThriftIf : public StatestoreServiceIf {
     }
 
     RegistrationId registration_id;
-    bool has_registered_catalogd;
-    TCatalogRegistration registered_catalogd_registration;
+    bool has_active_catalogd;
+    TCatalogRegistration active_catalogd_registration;
     Status status = statestore_->RegisterSubscriber(params.subscriber_id,
         params.subscriber_location, params.topic_registrations, 
subscriber_type,
         subscribe_catalogd_change, catalogd_registration, &registration_id,
-        &has_registered_catalogd, &registered_catalogd_registration);
+        &has_active_catalogd, &active_catalogd_registration);
     status.ToThrift(&response.status);
     response.__set_registration_id(registration_id);
     response.__set_statestore_id(statestore_->GetStateStoreId());
     response.__set_protocol_version(statestore_->GetProtocolVersion());
-    if (has_registered_catalogd) {
-      response.__set_catalogd_registration(registered_catalogd_registration);
+    if (has_active_catalogd) {
+      response.__set_catalogd_registration(active_catalogd_registration);
     }
   }
 
@@ -496,6 +501,7 @@ void 
Statestore::Subscriber::RefreshLastHeartbeatTimestamp() {
 
 Statestore::Statestore(MetricGroup* metrics)
   : protocol_version_(StatestoreServiceVersion::V2),
+    catalog_manager_(FLAGS_enable_catalogd_ha),
     subscriber_topic_update_threadpool_("statestore-update",
         "subscriber-update-worker",
         FLAGS_statestore_num_update_threads,
@@ -547,6 +553,10 @@ Statestore::Statestore(MetricGroup* metrics)
   heartbeat_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, 
STATESTORE_HEARTBEAT_DURATION);
   update_catalogd_metric_ = 
metrics->AddCounter(STATESTORE_UPDATE_CATALOGD_NUM, 0);
+  clear_topic_entries_metric_ =
+      metrics->AddCounter(STATESTORE_CLEAR_TOPIC_ENTRIES_NUM, 0);
+  active_catalogd_address_metric_ = metrics->AddProperty<string>(
+      STATESTORE_ACTIVE_CATALOGD_ADDRESS, "");
 
   update_state_client_cache_->InitMetrics(metrics, "subscriber-update-state");
   heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
@@ -715,11 +725,14 @@ Status Statestore::RegisterSubscriber(const SubscriberId& 
subscriber_id,
     bool subscribe_catalogd_change,
     const TCatalogRegistration& catalogd_registration,
     RegistrationId* registration_id,
-    bool* has_registered_catalogd,
-    TCatalogRegistration* registered_catalogd_registration) {
+    bool* has_active_catalogd,
+    TCatalogRegistration* active_catalogd_registration) {
   bool is_catalogd = subscriber_type == TStatestoreSubscriberType::CATALOGD;
   if (subscriber_id.empty()) {
     return Status("Subscriber ID cannot be empty string");
+  } else if (is_catalogd
+      && FLAGS_enable_catalogd_ha != catalogd_registration.enable_catalogd_ha) 
{
+    return Status("CalaogD HA enabling flag from catalogd does not match.");
   }
 
   // Create any new topics first, so that when the subscriber is first sent a 
topic update
@@ -743,6 +756,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& 
subscriber_id,
     }
   }
   LOG(INFO) << "Registering: " << subscriber_id;
+  bool is_reregistering = false;
   {
     lock_guard<mutex> l(subscribers_lock_);
     UUIDToTUniqueId(subscriber_uuid_generator_(), registration_id);
@@ -758,14 +772,15 @@ Status Statestore::RegisterSubscriber(const SubscriberId& 
subscriber_id,
                   << TNetworkAddressToString(subscriber->network_address())
                   << " , new address: " << TNetworkAddressToString(location);
       }
+      is_reregistering = true;
     }
 
-    if (is_catalogd) {
-      if (catalog_manager_.RegisterCatalogd(subscriber_id, *registration_id,
-              catalogd_registration)) {
-        LOG(INFO) << "CatalogD: " << subscriber_id << " is registered.";
-        update_catalod_cv_.NotifyAll();
-      }
+    if (is_catalogd
+        && catalog_manager_.RegisterCatalogd(is_reregistering, subscriber_id,
+            *registration_id, catalogd_registration)) {
+      LOG(INFO) << "Active catalogd role is designated to "
+                << catalog_manager_.GetActiveCatalogdSubscriberId();
+      update_catalod_cv_.NotifyAll();
     }
 
     shared_ptr<Subscriber> current_registration(new Subscriber(
@@ -781,36 +796,13 @@ Status Statestore::RegisterSubscriber(const SubscriberId& 
subscriber_id,
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, 
&subscriber_priority_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
-    *registered_catalogd_registration =
-        catalog_manager_.GetCatalogRegistration(has_registered_catalogd);
+    *active_catalogd_registration =
+        catalog_manager_.GetActiveCatalogRegistration(has_active_catalogd);
   }
 
   return Status::OK();
 }
 
-bool Statestore::CatalogManager::RegisterCatalogd(const SubscriberId& 
subscriber_id,
-    const RegistrationId& registration_id,
-    const TCatalogRegistration& catalogd_registration) {
-  lock_guard<mutex> l(catalog_mgr_lock_);
-  is_registered_ = true;
-  catalogd_subscriber_id_ = subscriber_id;
-  catalogd_registration_id_ = registration_id;
-  catalogd_registration_ = catalogd_registration;
-  return true;
-}
-
-const TCatalogRegistration& Statestore::CatalogManager::GetCatalogRegistration(
-    bool* is_registered) {
-  lock_guard<mutex> l(catalog_mgr_lock_);
-  *is_registered = is_registered_;
-  return catalogd_registration_;
-}
-
-int64 Statestore::CatalogManager::GetSendingSequence() {
-  lock_guard<mutex> l(catalog_mgr_lock_);
-  return ++sending_sequence_;
-}
-
 bool Statestore::FindSubscriber(const SubscriberId& subscriber_id,
     const RegistrationId& registration_id, shared_ptr<Subscriber>* subscriber) 
{
   DCHECK(subscriber != nullptr);
@@ -824,6 +816,13 @@ bool Statestore::FindSubscriber(const SubscriberId& 
subscriber_id,
 
 Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind 
update_kind,
     bool* update_skipped) {
+  // Don't send topic update to inactive catalogd.
+  if (FLAGS_enable_catalogd_ha && subscriber->IsCatalogd()
+      && !catalog_manager_.IsActiveCatalogd(subscriber->id())) {
+    VLOG(3) << "Skip sending topic update to inactive catalogd";
+    return Status::OK();
+  }
+
   // Time any successful RPCs (i.e. those for which UpdateState() completed, 
even though
   // it may have returned an error.)
   MonotonicStopWatch sw;
@@ -909,6 +908,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, 
UpdateKind update_kin
         DCHECK(!update.__isset.from_version);
         LOG(INFO) << "Received request for clearing the entries of topic: "
                   << update.topic_name << " from: " << subscriber->id();
+        clear_topic_entries_metric_->Increment(1);
         topic.ClearAllEntries();
       }
 
@@ -1152,6 +1152,11 @@ void Statestore::DoSubscriberUpdate(UpdateKind 
update_kind, int thread_id,
                   << "or re-registered (last known registration ID: "
                   << PrintId(update.registration_id) << ")";
         UnregisterSubscriber(subscriber.get());
+        if (subscriber->IsCatalogd()) {
+          if (catalog_manager_.UnregisterCatalogd(subscriber->id())) {
+            update_catalod_cv_.NotifyAll();
+          }
+        }
       } else {
         LOG(INFO) << "Failure was already detected for subscriber '" << 
subscriber->id()
                   << "'. Won't send another " << update_kind_str;
@@ -1201,6 +1206,15 @@ void Statestore::DoSubscriberUpdate(UpdateKind 
update_kind, int thread_id,
 }
 
 [[noreturn]] void Statestore::MonitorUpdateCatalogd() {
+  // Check if the first registered one should be designated with active role.
+  int64_t timeout_us =
+      FLAGS_active_catalogd_designation_monitoring_frequency_ms * 
MICROS_PER_MILLI;
+  while (!catalog_manager_.CheckActiveCatalog()) {
+    unique_lock<mutex> l(*catalog_manager_.GetLock());
+    update_catalod_cv_.WaitFor(l, timeout_us);
+  }
+  SendUpdateCatalogdNotification();
+
   // Wait for notification. If catalogd is registered, send notification to all
   // coordinators
   while (1) {
@@ -1213,11 +1227,14 @@ void Statestore::DoSubscriberUpdate(UpdateKind 
update_kind, int thread_id,
 }
 
 void Statestore::SendUpdateCatalogdNotification() {
-  bool has_registered_catalogd;
+  bool has_active_catalogd;
   TCatalogRegistration catalogd_registration =
-      catalog_manager_.GetCatalogRegistration(&has_registered_catalogd);
-  DCHECK(has_registered_catalogd);
-  if (!has_registered_catalogd) return;
+      catalog_manager_.GetActiveCatalogRegistration(&has_active_catalogd);
+  DCHECK(has_active_catalogd);
+  if (!has_active_catalogd) return;
+
+  active_catalogd_address_metric_->SetValue(
+      TNetworkAddressToString(catalogd_registration.address));
 
   vector<std::shared_ptr<Subscriber>> receivers;
   {
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 2a21fa6a6..5578817e8 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -38,6 +38,7 @@
 #include "rpc/thrift-client.h"
 #include "runtime/client-cache.h"
 #include "statestore/failure-detector.h"
+#include "statestore/statestore-catalogd-mgr.h"
 #include "statestore/statestore-subscriber-client-wrapper.h"
 #include "util/aligned-new.h"
 #include "util/condition-variable.h"
@@ -52,7 +53,6 @@ namespace impala {
 class Status;
 
 typedef ClientCache<StatestoreSubscriberClientWrapper> 
StatestoreSubscriberClientCache;
-typedef TUniqueId RegistrationId;
 
 std::string SubscriberTypeToString(TStatestoreSubscriberType::type t);
 
@@ -128,10 +128,6 @@ std::string 
SubscriberTypeToString(TStatestoreSubscriberType::type t);
 /// 4. Topic::lock_ (terminal)
 class Statestore : public CacheLineAligned {
  public:
-  /// A SubscriberId uniquely identifies a single subscriber, and is
-  /// provided by the subscriber at registration time.
-  typedef std::string SubscriberId;
-
   /// A TopicId uniquely identifies a single topic
   typedef std::string TopicId;
 
@@ -168,8 +164,8 @@ class Statestore : public CacheLineAligned {
       bool subscribe_catalogd_change,
       const TCatalogRegistration& catalogd_registration,
       RegistrationId* registration_id,
-      bool* has_registered_catalogd,
-      TCatalogRegistration* registered_catalogd_registration);
+      bool* has_active_catalogd,
+      TCatalogRegistration* active_catalogd_registration);
 
   /// Registers webpages for the input webserver. If metrics_only is set then 
only
   /// '/healthz' page is registered.
@@ -529,48 +525,6 @@ class Statestore : public CacheLineAligned {
     bool unregistered_ = false;
   };
 
-  /// CatalogManager:
-  ///   Tracks variety of bookkeeping information for Catalog daemon.
-  class CatalogManager {
-   public:
-    CatalogManager()
-      : is_registered_(false),
-        sending_sequence_(0) {}
-
-    /// Register one catalogd.
-    bool RegisterCatalogd(const SubscriberId& subscriber_id,
-        const RegistrationId& registration_id,
-        const TCatalogRegistration& catalogd_registration);
-
-    /// Return the protocol version of catalog service and address of catalogd.
-    const TCatalogRegistration& GetCatalogRegistration(bool* is_registered);
-
-    /// Return the mutex lock.
-    std::mutex* GetLock() { return &catalog_mgr_lock_; }
-
-    /// Get sending sequence number.
-    int64 GetSendingSequence();
-
-   private:
-    /// Protect all member variable.
-    std::mutex catalog_mgr_lock_;
-
-    /// Indicate if the catalogd has been registered.
-    bool is_registered_;
-
-    /// subscriber_id of the registered catalogd.
-    SubscriberId catalogd_subscriber_id_;
-
-    /// RegistrationId of the registered catalogd.
-    RegistrationId catalogd_registration_id_;
-
-    /// Additional registration info for catalog daemon.
-    TCatalogRegistration catalogd_registration_;
-
-    /// Sending sequence number.
-    int64 sending_sequence_;
-  };
-
   /// Unique identifier for this statestore instance.
   TUniqueId statestore_id_;
 
@@ -593,7 +547,7 @@ class Statestore : public CacheLineAligned {
   SubscriberMap subscribers_;
 
   /// CatalogD Manager
-  CatalogManager catalog_manager_;
+  StatestoreCatalogdMgr catalog_manager_;
 
   /// Condition variable for sending the notifications of updating catalogd.
   ConditionVariable update_catalod_cv_;
@@ -721,6 +675,14 @@ class Statestore : public CacheLineAligned {
   /// Metric to count the total number of UpdateCatalogd RPCs sent by 
statestore.
   IntCounter* update_catalogd_metric_;
 
+  /// Metric to count the total number of requests for clearing topic entries 
from
+  /// catalogd. Catalogd indicates to clear topic entries when it is restarted 
or its
+  /// role has been changed from standby to active.
+  IntCounter* clear_topic_entries_metric_;
+
+  /// Metric that tracks the address of active catalogd for catalogd HA
+  StringProperty* active_catalogd_address_metric_;
+
   /// Utility method to add an update to the given thread pool, and to fail if 
the thread
   /// pool is already at capacity. Assumes that subscribers_lock_ is held by 
the caller.
   Status OfferUpdate(const ScheduledSubscriberUpdate& update,
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index b5be78330..7f735b33d 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -114,6 +114,8 @@ const char* 
ImpaladMetricKeys::CATALOG_OBJECT_VERSION_LOWER_BOUND =
     "catalog.catalog-object-version-lower-bound";
 const char* ImpaladMetricKeys::CATALOG_TOPIC_VERSION = "catalog.curr-topic";
 const char* ImpaladMetricKeys::CATALOG_SERVICE_ID = "catalog.curr-serviceid";
+const char* ImpaladMetricKeys::ACTIVE_CATALOGD_ADDRESS =
+    "catalog.active-catalogd-address";
 const char* ImpaladMetricKeys::CATALOG_READY = "catalog.ready";
 const char* ImpaladMetricKeys::CATALOG_CACHE_AVG_LOAD_TIME =
     "catalog.cache.average-load-time";
@@ -241,6 +243,7 @@ BooleanProperty* ImpaladMetrics::CATALOG_READY = nullptr;
 BooleanProperty* ImpaladMetrics::IMPALA_SERVER_READY = nullptr;
 StringProperty* ImpaladMetrics::IMPALA_SERVER_VERSION = nullptr;
 StringProperty* ImpaladMetrics::CATALOG_SERVICE_ID = nullptr;
+StringProperty* ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS = nullptr;
 
 // Histograms
 HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = nullptr;
@@ -263,6 +266,8 @@ void ImpaladMetrics::InitCatalogMetrics(MetricGroup* m) {
       catalog_metrics->AddGauge(ImpaladMetricKeys::CATALOG_TOPIC_VERSION, 0);
   CATALOG_SERVICE_ID =
       
catalog_metrics->AddProperty<string>(ImpaladMetricKeys::CATALOG_SERVICE_ID, "");
+  ACTIVE_CATALOGD_ADDRESS = catalog_metrics->AddProperty<string>(
+      ImpaladMetricKeys::ACTIVE_CATALOGD_ADDRESS, "");
   CATALOG_READY =
       catalog_metrics->AddProperty<bool>(ImpaladMetricKeys::CATALOG_READY, 
false);
   // CatalogdMetaProvider cache metrics. Valid only when --use_local_catalog 
is set.
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 3c1f0e17f..35e53c82a 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -165,6 +165,9 @@ class ImpaladMetricKeys {
   /// ServiceID of Catalog with impalad.
   static const char* CATALOG_SERVICE_ID;
 
+  /// Address of active catalogd.
+  static const char* ACTIVE_CATALOGD_ADDRESS;
+
   /// Number of tables in the catalog
   static const char* CATALOG_NUM_TABLES;
 
@@ -339,6 +342,7 @@ class ImpaladMetrics {
   static BooleanProperty* IMPALA_SERVER_READY;
   static StringProperty* IMPALA_SERVER_VERSION;
   static StringProperty* CATALOG_SERVICE_ID;
+  static StringProperty* ACTIVE_CATALOGD_ADDRESS;
   // Histograms
   static HistogramMetric* QUERY_DURATIONS;
   static HistogramMetric* DDL_DURATIONS;
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 88b886ab7..932310cbe 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -42,6 +42,7 @@ from tests.common.impala_cluster import (ImpalaCluster, 
DEFAULT_BEESWAX_PORT,
     DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT,
     DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT,
     DEFAULT_ADMISSIOND_WEBSERVER_PORT, DEFAULT_CATALOGD_JVM_DEBUG_PORT,
+    DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT,
     DEFAULT_EXTERNAL_FE_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
     find_user_processes, run_daemon)
 
@@ -151,6 +152,10 @@ parser.add_option("--geospatial_library", 
dest="geospatial_library",
                   action="store", default="HIVE_ESRI",
                   help="Sets which implementation of geospatial libraries 
should be "
                   "initialized")
+parser.add_option("--enable_catalogd_ha", dest="enable_catalogd_ha",
+                  action="store_true", default=False,
+                  help="If true, enables CatalogD HA - the cluster will be 
launched "
+                  "with two catalogd instances as Active-Passive HA pair.")
 
 # For testing: list of comma-separated delays, in milliseconds, that delay 
impalad catalog
 # replica initialization. The ith delay is applied to the ith impalad.
@@ -285,6 +290,32 @@ def impalad_service_name(i):
     return "impalad_node{node_num}".format(node_num=i)
 
 
+def choose_catalogd_ports(instance_num):
+  """Returns a map from catalogd argument name to the port number for catalogd
+  instance 'instance_num'. Every port is its default value offset by 'instance_num'."""
+  default_ports = {
+      'catalog_service_port': DEFAULT_CATALOG_SERVICE_PORT,
+      'state_store_subscriber_port': DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT,
+      'webserver_port': DEFAULT_CATALOGD_WEBSERVER_PORT,
+  }
+  return {name: port + instance_num for name, port in default_ports.items()}
+
+
+def build_catalogd_port_args(instance_num):
+  """Returns the port flags for catalogd instance 'instance_num' as one
+  space-separated string of command line arguments."""
+  port_flag_template = (
+      "-catalog_service_port={catalog_service_port} "
+      "-state_store_subscriber_port={state_store_subscriber_port} "
+      "-webserver_port={webserver_port}")
+  ports = choose_catalogd_ports(instance_num)
+  return port_flag_template.format(**ports)
+
+
+def catalogd_service_name(i):
+  """Returns the name used for the ith catalog daemon in the cluster. The first
+  catalogd is always named "catalogd", so that it logs to catalogd.INFO."""
+  return "catalogd" if i == 0 else "catalogd_node{node_num}".format(node_num=i)
+
+
 def combine_arg_list_opts(opt_args):
   """Helper for processing arguments like impalad_args. The input is a list of 
strings,
   each of which is the string passed into one instance of the argument, e.g. 
for
@@ -295,18 +326,32 @@ def combine_arg_list_opts(opt_args):
   return list(itertools.chain(*[shlex.split(arg) for arg in opt_args]))
 
 
-def build_statestored_arg_list():
+def build_statestored_arg_list(enable_catalogd_ha=False):
   """Build a list of command line arguments to pass to the statestored."""
-  return (build_logging_args("statestored") + 
build_kerberos_args("statestored") +
-      combine_arg_list_opts(options.state_store_args))
+  args = (build_logging_args("statestored") + build_kerberos_args("statestored")
+      + combine_arg_list_opts(options.state_store_args))
+  # statestored must also be started with this flag when catalogd HA is enabled, since
+  # it is the one assigning the active/standby roles to the catalogd pair.
+  if enable_catalogd_ha:
+    args.append("-enable_catalogd_ha=true")
+  return args
 
 
-def build_catalogd_arg_list():
-  """Build a list of command line arguments to pass to the catalogd."""
-  return (build_logging_args("catalogd") +
-      ["-kudu_master_hosts", options.kudu_master_hosts] +
-      build_kerberos_args("catalogd") +
-      combine_arg_list_opts(options.catalogd_args))
+def build_catalogd_arg_list(num_catalogd, enable_catalogd_ha, remap_ports):
+  """Build the command line arguments for 'num_catalogd' catalogd instances.
+  Returns a list holding one argument list per instance; the ith entry is for the ith
+  catalogd (named by catalogd_service_name(i)). If 'remap_ports' is true, each
+  instance gets its own ports from choose_catalogd_ports(i). If 'enable_catalogd_ha'
+  is true, every instance is started with -enable_catalogd_ha=true."""
+  catalogd_arg_list = []
+  for i in range(num_catalogd):
+    args = (build_logging_args(catalogd_service_name(i))
+        + ["-kudu_master_hosts", options.kudu_master_hosts]
+        + build_kerberos_args("catalogd")
+        + combine_arg_list_opts(options.catalogd_args))
+    if remap_ports:
+      args.extend(shlex.split(build_catalogd_port_args(i)))
+    if enable_catalogd_ha:
+      args.append("-enable_catalogd_ha=true")
+    catalogd_arg_list.append(args)
+  return catalogd_arg_list
 
 
 def build_admissiond_arg_list():
@@ -503,7 +548,7 @@ class MiniClusterOperations(object):
   def kill_all_impalads(self, force=False):
     kill_matching_processes(["impalad"], force=force)
 
-  def kill_catalogd(self, force=False):
+  def kill_all_catalogds(self, force=False):
+    """Kill every process matching "catalogd", i.e. both instances of an HA pair."""
     kill_matching_processes(["catalogd"], force=force)
 
   def kill_statestored(self, force=False):
@@ -516,20 +561,31 @@ class MiniClusterOperations(object):
     LOG.info("Starting State Store logging to 
{log_dir}/statestored.INFO".format(
         log_dir=options.log_dir))
     output_file = os.path.join(options.log_dir, "statestore-out.log")
-    run_daemon_with_options("statestored", build_statestored_arg_list(), 
output_file)
+    run_daemon_with_options("statestored",
+        build_statestored_arg_list(options.enable_catalogd_ha), output_file)
     if not check_process_exists("statestored", 10):
       raise RuntimeError("Unable to start statestored. Check log or file 
permissions"
                          " for more details.")
 
   def start_catalogd(self):
-    LOG.info("Starting Catalog Service logging to 
{log_dir}/catalogd.INFO".format(
-        log_dir=options.log_dir))
-    output_file = os.path.join(options.log_dir, "catalogd-out.log")
-    run_daemon_with_options("catalogd", build_catalogd_arg_list(), output_file,
-        jvm_debug_port=DEFAULT_CATALOGD_JVM_DEBUG_PORT)
-    if not check_process_exists("catalogd", 10):
-      raise RuntimeError("Unable to start catalogd. Check log or file 
permissions"
-                         " for more details.")
+    # Start two catalogd instances as an Active-Passive HA pair when catalogd HA is
+    # enabled, otherwise a single catalogd.
+    num_catalogd = 2 if options.enable_catalogd_ha else 1
+    catalogd_arg_lists = build_catalogd_arg_list(
+        num_catalogd, options.enable_catalogd_ha, remap_ports=True)
+    for i, catalogd_args in enumerate(catalogd_arg_lists):
+      service_name = catalogd_service_name(i)
+      LOG.info(
+          "Starting Catalog Service logging to {log_dir}/{service_name}.INFO".format(
+              log_dir=options.log_dir, service_name=service_name))
+      output_file = os.path.join(
+          options.log_dir, "{service_name}-out.log".format(service_name=service_name))
+      # Each instance gets its own JVM debug port so both can run on the same host.
+      run_daemon_with_options("catalogd", catalogd_args, output_file,
+          jvm_debug_port=DEFAULT_CATALOGD_JVM_DEBUG_PORT + i)
+      # NOTE(review): check_process_exists() appears to check by process name only, so
+      # for the second instance it presumably passes once the first one is running.
+      if not check_process_exists("catalogd", 10):
+        raise RuntimeError("Unable to start catalogd. Check log or file permissions"
+                           " for more details.")
 
   def start_admissiond(self):
     LOG.info("Starting Admission Control Service logging to 
{log_dir}/admissiond.INFO"
@@ -593,7 +649,7 @@ class DockerMiniClusterOperations(object):
 
   def kill_all_daemons(self, force=False):
     self.kill_statestored(force=force)
-    self.kill_catalogd(force=force)
+    self.kill_all_catalogds(force=force)
     self.kill_admissiond(force=force)
     self.kill_all_impalads(force=force)
 
@@ -607,8 +663,15 @@ class DockerMiniClusterOperations(object):
         LOG.info("Stopping container {0}".format(container_name))
         check_call(["docker", "stop", container_name])
 
-  def kill_catalogd(self, force=False):
-    self.__stop_container__("catalogd")
+  def kill_all_catalogds(self, force=False):
+    """Stop every running container on the network whose name starts with the catalogd
+    container name prefix, so that no catalogd container is left over from previous
+    clusters. 'force' is accepted for interface compatibility and is not used."""
+    container_name_prefix = self.__gen_container_name__("catalogd")
+    for info in self.__get_network_info__()["Containers"].values():
+      container_name = info["Name"]
+      if container_name.startswith(container_name_prefix):
+        LOG.info("Stopping container {0}".format(container_name))
+        check_call(["docker", "stop", container_name])
 
   def kill_statestored(self, force=False):
     self.__stop_container__("statestored")
@@ -617,12 +680,22 @@ class DockerMiniClusterOperations(object):
     self.__stop_container__("admissiond")
 
   def start_statestore(self):
+    """Start the statestored container, enabling catalogd HA if requested."""
-    self.__run_container__("statestored", build_statestored_arg_list(),
+    self.__run_container__("statestored",
+        build_statestored_arg_list(options.enable_catalogd_ha),
         {DEFAULT_STATESTORED_WEBSERVER_PORT: 
DEFAULT_STATESTORED_WEBSERVER_PORT})
 
   def start_catalogd(self):
-    self.__run_container__("catalogd", build_catalogd_arg_list(),
-          {DEFAULT_CATALOGD_WEBSERVER_PORT: DEFAULT_CATALOGD_WEBSERVER_PORT})
+    # Start two catalogd containers as an Active-Passive HA pair when catalogd HA is
+    # enabled, otherwise a single one. Ports are not remapped inside the containers;
+    # instead port_map maps each default port to a per-instance port (presumably on
+    # the host side).
+    num_catalogd = 2 if options.enable_catalogd_ha else 1
+    catalogd_arg_lists = build_catalogd_arg_list(
+        num_catalogd, options.enable_catalogd_ha, remap_ports=False)
+    for i, catalogd_args in enumerate(catalogd_arg_lists):
+      chosen_ports = choose_catalogd_ports(i)
+      port_map = {DEFAULT_CATALOG_SERVICE_PORT: chosen_ports['catalog_service_port'],
+                  DEFAULT_CATALOGD_WEBSERVER_PORT: chosen_ports['webserver_port']}
+      self.__run_container__("catalogd", catalogd_args, port_map, i)
 
   def start_admissiond(self):
     self.__run_container__("admissiond", build_admissiond_arg_list(),
@@ -803,7 +876,7 @@ if __name__ == "__main__":
   if options.restart_impalad_only:
     cluster_ops.kill_all_impalads(force=options.force_kill)
   elif options.restart_catalogd_only:
-    cluster_ops.kill_catalogd(force=options.force_kill)
+    cluster_ops.kill_all_catalogds(force=options.force_kill)
   elif options.restart_statestored_only:
     cluster_ops.kill_statestored(force=options.force_kill)
   elif options.add_executors:
diff --git a/common/thrift/StatestoreService.thrift 
b/common/thrift/StatestoreService.thrift
index 62e9f5b04..8c2294082 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -201,6 +201,12 @@ struct TCatalogRegistration {
 
   // Address of catalogd.
   2: required Types.TNetworkAddress address;
+
+  // True if CatalogD HA is enabled.
+  3: optional bool enable_catalogd_ha;
+
+  // True if the catalogd instance is started as active instance.
+  4: optional bool force_catalogd_active;
 }
 
 struct TRegisterSubscriberRequest {
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 7e650343c..7c144b9a6 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -399,6 +399,16 @@
     "kind": "PROPERTY",
     "key": "catalog.ready"
   },
+  {
+    "description": "The address of the Active Catalog Server daemon.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Address of Active Catalog daemon",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "catalog.active-catalogd-address"
+  },
   {
     "description": "The number of clients currently in use by the Catalog 
Server client cache.",
     "contexts": [
@@ -1081,6 +1091,26 @@
     "kind": "GAUGE",
     "key": "impala.thrift-server.CatalogService.timedout-cnxn-requests"
   },
+  {
+    "description": "Current active status of this Catalog Server daemon.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "The active status of Catalog Server daemon",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "catalog-server.ha-active-status"
+  },
+  {
+    "description": "The number of active status changes made to this Catalog 
Server Daemon.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Number of active status changes",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "catalog-server.ha-number-active-status-change"
+  },
   {
     "description": "The number of active connections to this StateStore's 
service.",
     "contexts": [
@@ -1850,6 +1880,26 @@
     "kind": "COUNTER",
     "key": "statestore.num-update-catalogd"
   },
+  {
+    "description": "The number of requests for clearing topic entries from 
catalogd.",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "Number of requests for clearing topic entries",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "statestore.num-clear-topic-entries-requests"
+  },
+  {
+    "description": "The address of the Active Catalog Server daemon.",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "Address of Active Catalog daemon",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "statestore.active-catalogd-address"
+  },
   {
     "description": "The number of registered Statestore subscribers.",
     "contexts": [
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index e56f59124..17be12ad3 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -336,11 +336,15 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       raise Exception("statestored was not found")
 
     # The number of statestore subscribers is
-    # cluster_size (# of impalad) + 1 (for catalogd).
+    #     cluster_size (# of impalad) + 1 (for catalogd)
+    #     + 1 (for admissiond if enable_admission_service is set in the 
options)
+    #     + 1 (for catalogd if enable_catalogd_ha is set in the options).
     if expected_subscribers == 0:
       expected_subscribers = expected_num_impalads + 1
       if "--enable_admission_service" in options:
         expected_subscribers += 1
+      if "--enable_catalogd_ha" in options:
+        expected_subscribers += 1
 
     statestored.service.wait_for_live_subscribers(expected_subscribers,
                                                   
timeout=statestored_timeout_s)
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 38849f8bc..6989123f6 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -54,6 +54,7 @@ DEFAULT_HS2_HTTP_PORT = 28000
 DEFAULT_KRPC_PORT = 27000
 DEFAULT_CATALOG_SERVICE_PORT = 26000
 DEFAULT_STATE_STORE_SUBSCRIBER_PORT = 23000
+DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT = 23020
 DEFAULT_IMPALAD_WEBSERVER_PORT = 25000
 DEFAULT_STATESTORED_WEBSERVER_PORT = 25010
 DEFAULT_CATALOGD_WEBSERVER_PORT = 25020
@@ -92,17 +93,17 @@ class ImpalaCluster(object):
     Helpful to confirm that processes have been killed.
     """
     if self.docker_network is None:
-      self.__impalads, self.__statestoreds, self.__catalogd, self.__admissiond 
=\
+      self.__impalads, self.__statestoreds, self.__catalogds, 
self.__admissiond =\
           self.__build_impala_process_lists()
     else:
-      self.__impalads, self.__statestoreds, self.__catalogd, self.__admissiond 
=\
+      self.__impalads, self.__statestoreds, self.__catalogds, 
self.__admissiond =\
           self.__find_docker_containers()
     admissiond_str = ""
     if self.use_admission_service:
       admissiond_str = "/%d admissiond" % (1 if self.__admissiond else 0)
 
     LOG.debug("Found %d impalad/%d statestored/%d catalogd%s process(es)" %
-        (len(self.__impalads), len(self.__statestoreds), 1 if self.__catalogd 
else 0,
+        (len(self.__impalads), len(self.__statestoreds), len(self.__catalogds),
          admissiond_str))
 
   @property
@@ -123,8 +124,15 @@ class ImpalaCluster(object):
 
   @property
   def catalogd(self):
-    """Returns the catalogd process, or None if no catalogd process was 
found"""
-    return self.__catalogd
+    """Returns the first catalogd process (in the order of catalogds()), or None if
+    no catalogd process was found."""
+    return self.__catalogds[0] if self.__catalogds else None
+
+  def catalogds(self):
+    """Returns a list of the known catalogd processes, sorted by catalog service
+    port."""
+    return self.__catalogds
+
+  def get_first_catalogd(self):
+    """Returns the first catalogd process. Raises IndexError if there is none."""
+    # 'catalogds' is a plain method, not a property, so index the backing list
+    # directly instead of subscripting the bound method.
+    return self.__catalogds[0]
 
   @property
   def admissiond(self):
@@ -231,7 +239,7 @@ class ImpalaCluster(object):
     """
     impalads = list()
     statestored = list()
-    catalogd = None
+    catalogds = list()
     admissiond = None
     daemons = ['impalad', 'catalogd', 'statestored', 'admissiond']
     for binary, process in find_user_processes(daemons):
@@ -253,12 +261,13 @@ class ImpalaCluster(object):
       elif binary == 'statestored':
         statestored.append(StateStoreProcess(cmdline))
       elif binary == 'catalogd':
-        catalogd = CatalogdProcess(cmdline)
+        catalogds.append(CatalogdProcess(cmdline))
       elif binary == 'admissiond':
         admissiond = AdmissiondProcess(cmdline)
 
     self.__sort_impalads(impalads)
-    return impalads, statestored, catalogd, admissiond
+    self.__sort_catalogds(catalogds)
+    return impalads, statestored, catalogds, admissiond
 
   def __find_docker_containers(self):
     """
@@ -266,7 +275,7 @@ class ImpalaCluster(object):
     """
     impalads = []
     statestoreds = []
-    catalogd = None
+    catalogds = []
     admissiond = None
     output = check_output(["docker", "network", "inspect", 
self.docker_network],
                           universal_newlines=True)
@@ -292,15 +301,15 @@ class ImpalaCluster(object):
         statestoreds.append(StateStoreProcess(args, container_id=container_id,
                                               port_map=port_map))
       elif executable == 'catalogd':
-        assert catalogd is None
-        catalogd = CatalogdProcess(args, container_id=container_id,
-                                   port_map=port_map)
+        catalogds.append(CatalogdProcess(args, container_id=container_id,
+                                         port_map=port_map))
       elif executable == 'admissiond':
         assert admissiond is None
         admissiond = AdmissiondProcess(args, container_id=container_id,
                                        port_map=port_map)
     self.__sort_impalads(impalads)
-    return impalads, statestoreds, catalogd, admissiond
+    self.__sort_catalogds(catalogds)
+    return impalads, statestoreds, catalogds, admissiond
 
   def __sort_impalads(self, impalads):
     """Does an in-place sort of a list of ImpaladProcess objects into a 
canonical order.
@@ -309,6 +318,13 @@ class ImpalaCluster(object):
     the containerised cluster."""
     impalads.sort(key=lambda i: i.service.hs2_port)
 
+  def __sort_catalogds(self, catalogds):
+    """Sorts the given list of CatalogdProcess objects in place by catalog service
+    port, giving a canonical order in which get_first_catalogd() returns the first
+    one. The catalog service port is used because it is exposed and mapped to a host
+    port for the containerised cluster."""
+    catalogds.sort(key=lambda catalogd: catalogd.service.service_port)
+
 
 # Represents a process running on a machine and common actions that can be 
performed
 # on a process such as restarting or killing. The process may be the main 
process in
@@ -573,9 +589,11 @@ class CatalogdProcess(BaseImpalaProcess):
   def __get_port(self):
     return int(self._get_port('catalog_service_port', 
DEFAULT_CATALOG_SERVICE_PORT))
 
-  def start(self, wait_until_ready=True):
+  def start(self, wait_until_ready=True, additional_args=None):
     """Starts catalogd and waits until the service is ready to accept 
connections."""
     restart_args = self.cmd[1:]
+    if additional_args:
+      restart_args = restart_args + [additional_args]
     LOG.info("Starting Catalogd process: {0}".format(restart_args))
     run_daemon("catalogd", restart_args)
     if wait_until_ready:
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 0dee27429..a412133c7 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -505,6 +505,9 @@ class CatalogdService(BaseImpalaService):
       sleep(interval)
     assert False, 'Catalog version not ready in expected time.'
 
+  def get_catalog_service_port(self):
+    """Returns the catalog service port of this catalogd (its service_port)."""
+    return self.service_port
+
 
 class AdmissiondService(BaseImpalaService):
   def __init__(self, hostname, webserver_interface, webserver_port,
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
new file mode 100644
index 000000000..2669c1133
--- /dev/null
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+import logging
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import build_flavor_timeout
+from time import sleep
+
+LOG = logging.getLogger('catalogd_ha_test')
+DEFAULT_STATESTORE_SERVICE_PORT = 24000
+DEFAULT_CATALOG_SERVICE_PORT = 26000
+
+
+class TestCatalogdHA(CustomClusterTestSuite):
+  """A simple wrapper class to launch a cluster with catalogd HA enabled.
+  The cluster will be launched with two catalogd instances as Active-Passive 
HA pair.
+  statestored and catalogds are started with starting flag 
FLAGS_enable_catalogd_ha
+  as true. """
+
+  def get_workload(self):
+    return 'functional-query'
+
+  def __get_port_from_address(self, address):
+    """Returns the port of a "host:port" address string as an int."""
+    # Split on the last ':' only, so a host part containing ':' does not break parsing.
+    _, port = address.rsplit(":", 1)
+    return int(port)
+
+  def __verify_statestore_active_catalogd_port(self, catalogd_service):
+    """Verify that the port of the active catalogd reported by statestore matches the
+    catalog service port of the given catalogd service."""
+    statestore_service = self.cluster.statestored.service
+    active_catalogd_address = \
+        statestore_service.get_metric_value("statestore.active-catalogd-address")
+    assert (self.__get_port_from_address(active_catalogd_address)
+        == catalogd_service.get_catalog_service_port())
+
+  def __verify_impalad_active_catalogd_port(self, impalad_index, catalogd_service):
+    """Verify that the port of the active catalogd reported by impalad 'impalad_index'
+    matches the catalog service port of the given catalogd service."""
+    impalad_service = self.cluster.impalads[impalad_index].service
+    active_catalogd_address = \
+        impalad_service.get_metric_value("catalog.active-catalogd-address")
+    assert (self.__get_port_from_address(active_catalogd_address)
+        == catalogd_service.get_catalog_service_port())
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_ha_with_two_catalogd(self):
+    """The test case for cluster started with catalogd HA enabled."""
+    # Two catalogd instances are expected: the first is active, the second standby.
+    catalogds = self.cluster.catalogds()
+    assert len(catalogds) == 2
+    active_service = catalogds[0].service
+    standby_service = catalogds[1].service
+    assert active_service.get_metric_value("catalog-server.ha-active-status")
+    assert not standby_service.get_metric_value("catalog-server.ha-active-status")
+
+    # Statestore and every impalad must report the active catalogd's service port.
+    self.__verify_statestore_active_catalogd_port(active_service)
+    for impalad_index in range(3):
+      self.__verify_impalad_active_catalogd_port(impalad_index, active_service)
+    # A simple query must run successfully.
+    self.execute_query_expect_success(
+        self.client, "select count(*) from functional.alltypes")
+
+    # A restarted coordinator must get the active catalogd address from statestore.
+    self.cluster.impalads[0].restart()
+    self.cluster.impalads[0].service.wait_for_metric_value('impala-server.ready',
+        expected_value=1, timeout=30)
+    self.__verify_impalad_active_catalogd_port(0, active_service)
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--enable_catalogd_ha=true "
+                     "--use_subscriber_id_as_catalogd_priority=true "
+                     "--catalogd_ha_preemption_wait_period_ms=200",
+    catalogd_args="--enable_catalogd_ha=true")
+  def test_catalogd_ha_with_one_catalogd(self):
+    """Test a cluster with only one catalogd when catalogd HA is enabled."""
+    # The only catalogd instance is expected to be active.
+    catalogds = self.cluster.catalogds()
+    assert len(catalogds) == 1
+    active_service = catalogds[0].service
+    assert active_service.get_metric_value("catalog-server.ha-active-status")
+
+    # Statestore and every impalad must report the active catalogd's service port.
+    self.__verify_statestore_active_catalogd_port(active_service)
+    for impalad_index in range(3):
+      self.__verify_impalad_active_catalogd_port(impalad_index, active_service)
+    # A simple query must run successfully.
+    self.execute_query_expect_success(
+        self.client, "select count(*) from functional.alltypes")
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_auto_failover(self):
+    """Stop the active catalogd and verify the standby catalogd becomes active.
+    Restart the original active catalogd and verify that statestore does not give it
+    back the active role."""
+    # Two catalogd instances are expected: the first is active, the second standby.
+    catalogds = self.cluster.catalogds()
+    assert len(catalogds) == 2
+    first_service = catalogds[0].service
+    second_service = catalogds[1].service
+    assert first_service.get_metric_value("catalog-server.ha-active-status")
+    assert not second_service.get_metric_value("catalog-server.ha-active-status")
+
+    statestore_service = self.cluster.statestored.service
+    clear_topic_metric = "statestore.num-clear-topic-entries-requests"
+    start_count_clear_topic_entries = statestore_service.get_metric_value(
+        clear_topic_metric)
+
+    # Kill the active catalogd, then wait long enough for statestore to detect the
+    # failure and assign the active role to the standby catalogd.
+    catalogds[0].kill()
+    second_service.wait_for_metric_value(
+        "catalog-server.ha-active-status", expected_value=True, timeout=30)
+    assert second_service.get_metric_value(
+        "catalog-server.ha-number-active-status-change") > 0
+    assert second_service.get_metric_value("catalog-server.ha-active-status")
+
+    # Statestore and every impalad must now report the new active catalogd's port.
+    self.__verify_statestore_active_catalogd_port(second_service)
+    for impalad_index in range(3):
+      self.__verify_impalad_active_catalogd_port(impalad_index, second_service)
+    # A simple query must still run successfully.
+    self.execute_query_expect_success(
+        self.client, "select count(*) from functional.alltypes")
+
+    end_count_clear_topic_entries = statestore_service.get_metric_value(
+        clear_topic_metric)
+    assert end_count_clear_topic_entries > start_count_clear_topic_entries
+
+    # Restart the original active catalogd. Statestore must not make it active again,
+    # to avoid flip-flopping between the two instances.
+    catalogds[0].start(wait_until_ready=True)
+    sleep(1)
+    first_service = catalogds[0].service
+    assert not first_service.get_metric_value("catalog-server.ha-active-status")
+    assert second_service.get_metric_value("catalog-server.ha-active-status")
+
+    # Statestore and every impalad must still report the second catalogd as active.
+    self.__verify_statestore_active_catalogd_port(second_service)
+    for impalad_index in range(3):
+      self.__verify_impalad_active_catalogd_port(impalad_index, second_service)
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true "
+                     "--statestore_heartbeat_frequency_ms=1000",
+    start_args="--enable_catalogd_ha")
+  def test_catalogd_manual_failover(self):
+    """Stop the active catalogd and verify that the standby catalogd becomes active.
+    Then restart the original active catalogd with force_catalogd_active set to true
+    and verify that the statestore resumes it as active.
+    """
+    # Verify two catalogd instances are created with one as active.
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    
assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
+    assert(not 
catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+
+    statestore_service = self.cluster.statestored.service
+    # Snapshot the statestore's clear-topic-entries request counter. The assertions
+    # below expect each change of the active catalogd to increase it.
+    start_count_clear_topic_entries = statestore_service.get_metric_value(
+        "statestore.num-clear-topic-entries-requests")
+
+    # Kill the active catalogd.
+    catalogds[0].kill()
+
+    # Wait long enough for the statestore to detect the failure of the active
+    # catalogd and assign the active role to the standby catalogd.
+    catalogd_service_2.wait_for_metric_value(
+        "catalog-server.ha-active-status", expected_value=True, timeout=30)
+    assert(catalogd_service_2.get_metric_value(
+        "catalog-server.ha-number-active-status-change") > 0)
+    
assert(catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+
+    # Verify that the active catalogd port known to the statestore and to each
+    # impalad matches the catalog service port of the current active catalogd.
+    self.__verify_statestore_active_catalogd_port(catalogd_service_2)
+    self.__verify_impalad_active_catalogd_port(0, catalogd_service_2)
+    self.__verify_impalad_active_catalogd_port(1, catalogd_service_2)
+    self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
+
+    # Verify that a simple query runs successfully.
+    self.execute_query_expect_success(
+        self.client, "select count(*) from functional.alltypes")
+
+    end_count_clear_topic_entries = statestore_service.get_metric_value(
+        "statestore.num-clear-topic-entries-requests")
+    assert end_count_clear_topic_entries > start_count_clear_topic_entries
+    # Re-baseline the counter so the next role change is checked independently.
+    start_count_clear_topic_entries = end_count_clear_topic_entries
+
+    # Restart the original active catalogd with force_catalogd_active set to true.
+    # Verify that the statestore resumes it as active.
+    catalogds[0].start(wait_until_ready=True,
+                       additional_args="--force_catalogd_active=true")
+    # Re-read the service handle from the restarted catalogd process.
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_1.wait_for_metric_value(
+        "catalog-server.ha-active-status", expected_value=True, timeout=15)
+    
assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
+    # Give the former standby time to observe its demotion before checking it.
+    # NOTE(review): a fixed sleep may be flaky on slow builds; polling with
+    # wait_for_metric_value(..., expected_value=False) may be more robust -- confirm.
+    sleep_time_s = build_flavor_timeout(2, slow_build_timeout=5)
+    sleep(sleep_time_s)
+    assert(not 
catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+
+    # Verify that the active catalogd port known to the statestore and to each
+    # impalad matches the catalog service port of the current active catalogd.
+    self.__verify_statestore_active_catalogd_port(catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
+
+    end_count_clear_topic_entries = statestore_service.get_metric_value(
+        "statestore.num-clear-topic-entries-requests")
+    assert end_count_clear_topic_entries > start_count_clear_topic_entries
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    start_args="--enable_catalogd_ha")
+  def test_restart_statestore(self):
+    """Restart the statestore after the cluster has been created with catalogd HA
+    enabled, and verify that the catalogd active/standby roles are unchanged."""
+    # Verify two catalogd instances are created with one as active.
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    
assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
+    assert(not 
catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+
+    # Verify that the active catalogd port known to the statestore and to each
+    # impalad matches the catalog service port of the current active catalogd.
+    self.__verify_statestore_active_catalogd_port(catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
+
+    # Restart the statestore. Verify that one catalogd is assigned as active and
+    # the other as standby.
+    self.cluster.statestored.restart()
+    wait_time_s = build_flavor_timeout(90, slow_build_timeout=180)
+    # Wait for the subscribers to re-register. NOTE(review): 5 presumably means
+    # 3 impalads + 2 catalogds -- confirm against the default cluster size.
+    
self.cluster.statestored.service.wait_for_metric_value('statestore.live-backends',
+        expected_value=5, timeout=wait_time_s)
+    # Give the catalogds time to receive their role assignment after re-registering.
+    sleep_time_s = build_flavor_timeout(2, slow_build_timeout=5)
+    sleep(sleep_time_s)
+    
assert(catalogd_service_1.get_metric_value("catalog-server.ha-active-status"))
+    assert(not 
catalogd_service_2.get_metric_value("catalog-server.ha-active-status"))
+
+    # Verify that the active catalogd port known to the statestore and to each
+    # impalad matches the catalog service port of the current active catalogd.
+    self.__verify_statestore_active_catalogd_port(catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(0, catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(1, catalogd_service_1)
+    self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
+    # Verify that a simple query runs successfully.
+    self.execute_query_expect_success(
+        self.client, "select count(*) from functional.alltypes")

Reply via email to