This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 41c145f5a IMPALA-13536: Fix Workload Management Init with Catalog HA
41c145f5a is described below
commit 41c145f5add442dbd089bc77641c1e117486bc08
Author: jasonmfehr <[email protected]>
AuthorDate: Mon Nov 25 12:55:51 2024 -0800
IMPALA-13536: Fix Workload Management Init with Catalog HA
When running an Impala cluster with catalogd HA enabled, the standby
catalogd would go into a loop waiting for the first catalog update to
arrive, repeatedly logging the same error and never joining the server
thread defined in catalogd-main.cc.
Before this patch, when the standby daemon became active, the first
catalog update was finally received, and the workload management
initialization process ran a second time in the newly active daemon
because this daemon saw that it was active.
This patch modifies the catalogd workload management initialization
code so it waits until the active catalogd has been determined. At
that point, the standby daemon skips workload management
initialization while the active daemon runs it after it receives the
first catalog update.
Testing was accomplished by modifying the workload management
initialization custom cluster tests to assert that the init process
is not re-run when a catalogd switches from standby to active and
also to remove the assumption that the first catalogd was active. The
test_catalog_ha test was deleted since all its assertions are handled
by the setup_method of the new TestWorkloadManagementCatalogHA class.
Ozone tests with and without erasure coding were also run and passed.
Change-Id: Id3797a0a9cf0b8ae844d9b7d46b607d93824f69a
Reviewed-on: http://gerrit.cloudera.org:8080/22118
Reviewed-by: Riza Suminto <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/catalog/catalog-server.cc | 4 +-
be/src/catalog/catalog-server.h | 24 +++--
be/src/catalog/catalogd-main.cc | 9 +-
be/src/catalog/workload-management-init.cc | 14 ++-
tests/common/impala_test_suite.py | 21 ++++
tests/custom_cluster/test_workload_mgmt_init.py | 133 +++++++++++++-----------
6 files changed, 132 insertions(+), 73 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index c06575894..4f80dee31 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -477,7 +477,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
: protocol_version_(CatalogServiceVersion::V2),
thrift_iface_(new CatalogServiceThriftIf(this)),
thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
- is_active_(!FLAGS_enable_catalogd_ha),
+ is_active_(!FLAGS_enable_catalogd_ha),
is_ha_determined_(!FLAGS_enable_catalogd_ha),
topic_updates_ready_(false), last_sent_catalog_version_(0L),
catalog_objects_max_version_(0L) {
topic_processing_time_metric_ =
StatsMetric<double>::CreateAndRegister(metrics,
@@ -717,6 +717,8 @@ void CatalogServer::UpdateActiveCatalogd(bool
is_registration_reply,
catalog_->RegenerateServiceId();
}
}
+
+ is_ha_determined_ = true;
}
bool CatalogServer::IsActive() {
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index ed6ec17ce..2323f4907 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -129,11 +129,19 @@ class CatalogServer {
return protocol_version_;
}
- /// Blocks until the first catalog update happens (indicated by the variable
- /// last_sent_catalog_version_ having a value greater than 0). Does not time
out.
+ /// Blocks until this catalog is ready to process requests and does not time
out.
+ ///
+ /// If catalog HA is not enabled, the catalog being ready is indicated by
the variable
+ /// last_sent_catalog_version_ having a value greater than 0.
+ ///
+ /// If catalog HA is enabled, waits until the active and standby catalogd
daemons have
+ /// been determined. In the active daemon, this function will not return
until the
+ /// last_sent_catalog_version_ variable is greater than 0. In the standby
daemon, this
+ /// function returns as soon as the daemon has been determined to be the
standby.
+ ///
/// Returns `true` or `false` indicating if this catalogd is the active
catalogd.
- /// If catalog ha is not enabled, returns `true`.
- bool WaitForFirstCatalogUpdate();
+ /// If catalog HA is not enabled, returns `true`.
+ bool WaitForCatalogReady();
// Initializes workload management by creating or upgrading the necessary
database and
// tables. Does not check if the current catalogd is active or if workload
management is
@@ -209,13 +217,17 @@ class CatalogServer {
std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
/// Protects is_active_, active_catalogd_version_checker_,
- /// catalog_update_cv_, pending_topic_updates_,
catalog_objects_to/from_version_, and
- /// last_sent_catalog_version.
+ /// catalog_update_cv_, pending_topic_updates_,
catalog_objects_to/from_version_,
+ /// last_sent_catalog_version, and is_ha_determined_.
std::mutex catalog_lock_;
/// Set to true if this catalog instance is active.
bool is_active_;
+ /// Set to true after active catalog has been determined. Will be true if
catalog ha
+ /// is not enabled.
+ bool is_ha_determined_;
+
/// Object to track the version of received active catalogd.
boost::scoped_ptr<ActiveCatalogdVersionChecker>
active_catalogd_version_checker_;
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index a989bd485..260fddcbd 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -103,8 +103,13 @@ int CatalogdMain(int argc, char** argv) {
catalog_server.MarkServiceAsStarted();
LOG(INFO) << "CatalogService started on port: " <<
FLAGS_catalog_service_port;
- if (FLAGS_enable_workload_mgmt &&
catalog_server.WaitForFirstCatalogUpdate()) {
- ABORT_IF_ERROR(catalog_server.InitWorkloadManagement());
+ if (FLAGS_enable_workload_mgmt) {
+ if (catalog_server.WaitForCatalogReady()) {
+ ABORT_IF_ERROR(catalog_server.InitWorkloadManagement());
+ } else {
+ LOG(INFO) << "Skipping workload management initialization since catalogd
HA is "
+ << "enabled and this catalogd is not active";
+ }
}
server->Join();
diff --git a/be/src/catalog/workload-management-init.cc
b/be/src/catalog/workload-management-init.cc
index bd1dd36cb..a7eae391b 100644
--- a/be/src/catalog/workload-management-init.cc
+++ b/be/src/catalog/workload-management-init.cc
@@ -497,18 +497,24 @@ static Status _tableSchemaManagement(CatalogServiceIf*
svc, const string& ip_add
} // function _logTableSchemaManagement
inline bool CatalogServer::IsCatalogInitialized() {
- unique_lock<mutex> l(catalog_lock_);
- return last_sent_catalog_version_ > 0;
+ lock_guard<mutex> l(catalog_lock_);
+
+ // The first expression evaluates to true when the first catalog update is
sent. If
+ // catalog HA is enabled, the last_sent_catalog_version_ variable will only
be
+ // incremented on the active catalogd.
+ // The second expression evaluates to true when the standby catalogd
determines that
+ // it is the standby.
+ return last_sent_catalog_version_ > 0 || (is_ha_determined_ && !is_active_);
} // CatalogServer::IsCatalogInitialized
-bool CatalogServer::WaitForFirstCatalogUpdate() {
+bool CatalogServer::WaitForCatalogReady() {
while (!IsCatalogInitialized()) {
LOG(INFO) << "Waiting for first catalog update";
SleepForMs(WM_INIT_CHECK_SLEEP_MS);
}
return IsActive();
-} // function CatalogServer::WaitForFirstCatalogUpdate
+} // function CatalogServer::WaitForCatalogReady
Status CatalogServer::InitWorkloadManagement() {
DCHECK_NE(nullptr, thrift_iface_.get());
diff --git a/tests/common/impala_test_suite.py
b/tests/common/impala_test_suite.py
index bab057481..d6ce6e09c 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1393,6 +1393,27 @@ class ImpalaTestSuite(BaseTestSuite):
return self.assert_log_contains(
"impalad", level, line_regex, expected_count, timeout_s, dry_run)
+ def assert_catalogd_ha_contains(self, level, line_regex, timeout_s=6):
+ """
+ When running catalogd in ha mode, asserts that the specified line_regex is
found at
+ least once across all instances of catalogd.
+ Returns a list of the results of calling assert_catalogd_log_contains for
each
+ catalogd daemon. If no matches were found for a daemon, the list index for
that daemon
+ will be None.
+ """
+
+ matches = []
+ for node_idx in range(len(self.cluster.catalogds())):
+ try:
+ matches.append(self.assert_catalogd_log_contains(level, line_regex,
-1, timeout_s,
+ node_index=node_idx))
+ except AssertionError:
+ matches.append(None)
+
+ assert not all(elem is None for elem in matches), "No log lines found in
any " \
+ "catalogd instances for regex: {}".format(line_regex)
+ return matches
+
def assert_catalogd_log_contains(self, level, line_regex, expected_count=1,
timeout_s=6, dry_run=False, node_index=0):
"""
diff --git a/tests/custom_cluster/test_workload_mgmt_init.py
b/tests/custom_cluster/test_workload_mgmt_init.py
index 6640d1dd0..7963a0661 100644
--- a/tests/custom_cluster/test_workload_mgmt_init.py
+++ b/tests/custom_cluster/test_workload_mgmt_init.py
@@ -21,11 +21,14 @@ import os
import re
from subprocess import CalledProcessError
+from logging import getLogger
from SystemTables.ttypes import TQueryTableColumn
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.workload_management import assert_query
+LOG = getLogger(__name__)
+
class TestWorkloadManagementInitBase(CustomClusterTestSuite):
@@ -463,49 +466,6 @@ class
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
data = log_results.data[0].split("\t")
assert len(data) == len(log_results.column_labels)
- @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
- catalogd_args="--enable_workload_mgmt",
start_args="--enable_catalogd_ha",
- statestored_args="--use_subscriber_id_as_catalogd_priority=true",
- disable_log_buffering=True)
- def test_catalog_ha(self):
- """Asserts workload management initialization is only done on the active
catalogd."""
-
- # Assert the active catalog ran workload management initialization.
- self.assert_catalogd_log_contains("INFO",
- r"Completed workload management initialization")
-
- # Assert the standby catalog skipped workload management initialization.
- self.assert_catalogd_log_contains("INFO", r"workload management",
expected_count=0,
- node_index=1)
-
- @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
- catalogd_args="--enable_workload_mgmt",
start_args="--enable_catalogd_ha",
- statestored_args="--use_subscriber_id_as_catalogd_priority=true",
- disable_log_buffering=True)
- def test_catalog_ha_failover(self):
- """Asserts workload management initialization is not run a second time
when catalogd
- failover happens."""
-
- # Assert the active catalog ran workload management initialization.
- self.assert_catalogd_log_contains("INFO",
- r"Completed workload management initialization")
-
- # Assert the standby catalog skipped workload management initialization.
- self.assert_catalogd_log_contains("INFO", r"workload management
initialization",
- expected_count=0, node_index=1)
-
- # Kill active catalogd
- catalogds = self.cluster.catalogds()
- catalogds[0].kill()
-
- # Wait for failover.
- catalogds[1].service.wait_for_metric_value("catalog-server.active-status",
- expected_value=True, timeout=30)
-
- # Assert workload management initialization did not run a second time.
- self.assert_catalogd_log_contains("INFO", r"workload management
initialization",
- expected_count=0, node_index=1)
-
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
catalogd_args="--enable_workload_mgmt",
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
@@ -519,23 +479,6 @@ class
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
self.assert_catalogd_log_contains("INFO",
r"Completed workload management initialization")
- @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
- catalogd_args="--enable_workload_mgmt",
- statestored_args="--use_subscriber_id_as_catalogd_priority=true",
- start_args="--enable_catalogd_ha --enable_statestored_ha",
- disable_log_buffering=True)
- def test_catalog_statestore_ha(self):
- """Asserts workload management initialization is only done on the active
catalogd
- when both catalog and statestore ha is enabled."""
-
- # Assert the active catalog ran workload management initialization.
- self.assert_catalogd_log_contains("INFO",
- r"Completed workload management initialization")
-
- # Assert the standby catalog skipped workload management initialization.
- self.assert_catalogd_log_contains("INFO", r"workload management",
expected_count=0,
- node_index=1)
-
class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
@@ -624,3 +567,73 @@ class
TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
# Assert the standby catalog skipped workload management initialization.
self.assert_catalogd_log_contains("INFO", r"workload management
initialization",
expected_count=0, node_index=1)
+
+
+class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase):
+
+ """Tests for the workload management initialization process. The setup
method of this
+ class ensures only 1 catalogd ran the workload management initialization
process."""
+
+ def setup_method(self, method):
+ super(TestWorkloadManagementCatalogHA, self).setup_method(method)
+
+ # Find all catalog instances that have initialized workload management.
+ init_logs = self.assert_catalogd_ha_contains("INFO",
+ r"Completed workload management initialization", timeout_s=30)
+ assert len(init_logs) == 2, "Expected length of catalogd matches to be '2'
but " \
+ "was '{}'".format(len(init_logs))
+
+ # Assert only 1 catalog ran workload management initialization.
+ assert init_logs[0] is None or init_logs[1] is None, "Both catalogds ran
workload " \
+ "management initialization"
+
+ # Assert the standby catalog skipped workload management initialization.
+ self.standby_catalog = 1
+ self.active_catalog = 0
+ if init_logs[0] is None:
+ # Catalogd 1 is the active catalog
+ self.standby_catalog = 0
+ self.active_catalog = 1
+
+ LOG.info("Found active catalogd is daemon '{}' and standby catalogd is
daemon '{}'"
+ .format(self.active_catalog, self.standby_catalog))
+
+ self.assert_catalogd_log_contains("INFO",
+ r"Skipping workload management initialization since catalogd HA is
enabled and "
+ r"this catalogd is not active", node_index=self.standby_catalog)
+
+ @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
+ catalogd_args="--enable_workload_mgmt",
start_args="--enable_catalogd_ha",
+ statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+ disable_log_buffering=True)
+ def test_catalog_ha_failover(self):
+ """Asserts workload management initialization is not run a second time
when catalogd
+ failover happens."""
+
+ # Kill active catalogd
+ catalogds = self.cluster.catalogds()
+ catalogds[0].kill()
+
+ # Wait for failover.
+ catalogds[1].service.wait_for_metric_value("catalog-server.active-status",
+ expected_value=True, timeout=30)
+
+ # Wait for standby catalog to complete its initialization as the active
catalogd.
+ self.assert_catalogd_log_contains("INFO", r'catalog update with \d+
entries is '
+ r'assembled', expected_count=-1, node_index=self.standby_catalog)
+
+ # Assert workload management initialization did not run a second time.
+ self.assert_catalogd_log_contains("INFO", r"Starting workload management "
+ r"initialization", expected_count=0, node_index=self.standby_catalog)
+
+ @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
+ catalogd_args="--enable_workload_mgmt",
+ statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+ start_args="--enable_catalogd_ha --enable_statestored_ha",
+ disable_log_buffering=True)
+ def test_catalog_statestore_ha(self):
+ """Asserts workload management initialization is only done on the active
catalogd
+ when both catalog and statestore ha is enabled."""
+
+ self.assert_log_contains("statestored", "INFO", r"Registering: catalog",
2, 30)
+ self.assert_log_contains("statestored_node1", "INFO", r"Registering:
catalog", 2, 30)