This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-3.4.2 in repository https://gitbox.apache.org/repos/asf/impala.git
commit ee9c20f936761af1319acfd46c55ed2019383358 Author: liuyao <[email protected]> AuthorDate: Sat Jul 3 19:04:58 2021 +0800 IMPALA-5476: Fix catalogd restart brings stale metadata ImpaladCatalog#updateCatalog() doesn't trigger a full topic update request when it detects a catalogServiceId change. It just updates the local catalogServiceId and throws an exception to abort applying the DDL/DML results. This causes a problem when catalogd is restarted and the DDL/DML is executed on the restarted instance. In this case, only the local catalogServiceId is updated to the latest value; the local catalog itself remains stale. When the following updates from the statestore are processed, the catalogServiceId always matches, so the updates are applied without exceptions. However, the catalog objects usually won't be updated, since they have higher versions (from the old catalogd instance) than those in the update. This leaves the local catalog out of sync until the catalog version of the new catalogd grows large enough. Note that when processing catalog updates from the statestore, impalad requests a full topic update if the catalogServiceId does not match; see ImpalaServer::CatalogUpdateCallback(). This patch fixes the issue by checking the catalogServiceId before invoking UpdateCatalogCache() of the FE. If the catalogServiceId doesn't match the one in the DDL/DML result, the DDL/DML thread waits until it changes. The next update from the statestore will change it and unblock the DDL/DML thread. 
Testing: add several tests in tests/custom_cluster/test_restart_services.py Change-Id: I9fe25f5a2a42fb432e306ef08ae35750c8f3c50c Reviewed-on: http://gerrit.cloudera.org:8080/17645 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/impala-server.cc | 59 +++++++++++---- tests/custom_cluster/test_restart_services.py | 104 ++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 13 deletions(-) diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 8bf57d23d..771808a89 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1687,19 +1687,52 @@ Status ImpalaServer::ProcessCatalogUpdateResult( WaitForCatalogUpdateTopicPropagation(catalog_service_id); } } else { - CatalogUpdateResultIterator callback_ctx(catalog_update_result); - TUpdateCatalogCacheRequest update_req; - update_req.__set_is_delta(true); - update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx)); - // The catalog version is updated in WaitForCatalogUpdate below. So we need a - // standalone field in the request to update the service ID without touching the - // catalog version. - update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id); - // Apply the changes to the local catalog cache. - TUpdateCatalogCacheResponse resp; - Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp); - if (!status.ok()) LOG(ERROR) << status.GetDetail(); - RETURN_IF_ERROR(status); + TUniqueId cur_service_id; + { + unique_lock<mutex> ver_lock(catalog_version_lock_); + cur_service_id = catalog_update_info_.catalog_service_id; + if (catalog_update_info_.catalog_service_id != catalog_service_id) { + LOG(INFO) << "Catalog service ID mismatch. Current ID: " + << PrintId(cur_service_id) << ". ID in response: " + << PrintId(catalog_service_id) << ". Catalogd may be restarted. 
Waiting for" " new catalog update from statestore."; + // Catalog service ID has changed, and impalad requests a full topic update. + // Once impalad has applied the full topic update, this thread exits the loop. + while (cur_service_id == catalog_update_info_.catalog_service_id) { + catalog_version_update_cv_.Wait(ver_lock); + } + cur_service_id = catalog_update_info_.catalog_service_id; + } + } + + if (cur_service_id == catalog_service_id) { + CatalogUpdateResultIterator callback_ctx(catalog_update_result); + TUpdateCatalogCacheRequest update_req; + update_req.__set_is_delta(true); + update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx)); + // The catalog version is updated in WaitForCatalogUpdate below. So we need a + // standalone field in the request to update the service ID without touching the + // catalog version. + update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id); + // Apply the changes to the local catalog cache. + TUpdateCatalogCacheResponse resp; + Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp); + if (!status.ok()) LOG(ERROR) << status.GetDetail(); + RETURN_IF_ERROR(status); + } else { + // We can't apply updates from another service ID, because the local catalog is still + // inconsistent with the catalogd that executed the DDL. Catalogd may be restarted + // more than once inside a statestore update cycle. 'cur_service_id' could belong + // to 1) a stale update from a previously restarted catalogd, or 2) a newer update + // from the next restarted catalogd. It is safe to ignore the DDL result in the second + // case. However, in the first case clients may see a stale catalog until the + // expected catalog topic update arrives. + // TODO: handle the first case in IMPALA-10875. + LOG(WARNING) << "Ignoring catalog update result of catalog service ID: " + << PrintId(catalog_service_id) << " because it does not match the current " + << "catalog service ID: " << PrintId(cur_service_id) + 
<< ". Catalogd may be restarted more than once."; + } if (!wait_for_all_subscribers) return Status::OK(); // Wait until we receive and process the catalog update that covers the effects // (catalog objects) of this operation. diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py index 90191eda8..d436d6579 100644 --- a/tests/custom_cluster/test_restart_services.py +++ b/tests/custom_cluster/test_restart_services.py @@ -163,6 +163,110 @@ class TestRestart(CustomClusterTestSuite): thread.join() self.wait_for_state(query_handle[0], QueryState.FINISHED, 30000) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + statestored_args="--statestore_update_frequency_ms=5000") + def test_restart_catalogd(self): + self.execute_query_expect_success(self.client, "drop table if exists join_aa") + self.execute_query_expect_success(self.client, "create table join_aa(id int)") + # Make the catalog object version grow large enough + self.execute_query_expect_success(self.client, "invalidate metadata") + + # It doesn't matter whether the DDL succeeds; it is only used to make + # the local catalog cache of impalad out of sync + for i in range(0, 10): + try: + query = "alter table join_aa add columns (age" + str(i) + " int)" + self.execute_query_async(query) + except Exception, e: + LOG.info(str(e)) + if i == 5: + self.cluster.catalogd.restart() + + self.execute_query_expect_success(self.client, + "alter table join_aa add columns (name string)") + self.execute_query_expect_success(self.client, "select name from join_aa") + self.execute_query_expect_success(self.client, "drop table join_aa") + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + statestored_args="--statestore_update_frequency_ms=5000") + def test_restart_catalogd_sync_ddl(self): + self.execute_query_expect_success(self.client, "drop table if exists join_aa") + 
self.execute_query_expect_success(self.client, "create table join_aa(id int)") + # Make the catalog object version grow large enough + self.execute_query_expect_success(self.client, "invalidate metadata") + query_options = {"sync_ddl": "true"} + + # It doesn't matter whether the DDL succeeds; it is only used to make + # the local catalog cache of impalad out of sync + for i in range(0, 10): + try: + query = "alter table join_aa add columns (age" + str(i) + " int)" + self.execute_query_async(query, query_options) + except Exception, e: + LOG.info(str(e)) + if i == 5: + self.cluster.catalogd.restart() + + self.execute_query_expect_success(self.client, + "alter table join_aa add columns (name string)", query_options) + self.execute_query_expect_success(self.client, "select name from join_aa") + self.execute_query_expect_success(self.client, "drop table join_aa") + + UPDATE_FREQUENCY_S = 10 + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + statestored_args="--statestore_update_frequency_ms={frequency_ms}" + .format(frequency_ms=(UPDATE_FREQUENCY_S * 1000))) + def test_restart_catalogd_twice(self): + self.execute_query_expect_success(self.client, "drop table if exists join_aa") + self.cluster.catalogd.restart() + query = "create table join_aa(id int)" + query_handle = [] + + def execute_query_async(): + query_handle.append(self.execute_query(query)) + + thread = threading.Thread(target=execute_query_async) + thread.start() + sleep(self.UPDATE_FREQUENCY_S - 5) + self.cluster.catalogd.restart() + thread.join() + self.execute_query_expect_success(self.client, + "alter table join_aa add columns (name string)") + self.execute_query_expect_success(self.client, "select name from join_aa") + self.execute_query_expect_success(self.client, "drop table join_aa") + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal", + 
statestored_args="--statestore_update_frequency_ms=5000") + def test_restart_catalogd_with_local_catalog(self): + self.execute_query_expect_success(self.client, "drop table if exists join_aa") + self.execute_query_expect_success(self.client, "create table join_aa(id int)") + # Make the catalog object version grow large enough + self.execute_query_expect_success(self.client, "invalidate metadata") + + # It doesn't matter whether the DDL succeeds; it is only used to make + # the local catalog cache of impalad out of sync + for i in range(0, 10): + try: + query = "alter table join_aa add columns (age" + str(i) + " int)" + self.execute_query_async(query) + except Exception, e: + LOG.info(str(e)) + if i == 5: + self.cluster.catalogd.restart() + + self.execute_query_expect_success(self.client, + "alter table join_aa add columns (name string)") + self.execute_query_expect_success(self.client, "select name from join_aa") + self.execute_query_expect_success(self.client, "select age0 from join_aa") + self.execute_query_expect_success(self.client, "drop table join_aa") + SUBSCRIBER_TIMEOUT_S = 2 CANCELLATION_GRACE_PERIOD_S = 5
