This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-3.4.2
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ee9c20f936761af1319acfd46c55ed2019383358
Author: liuyao <[email protected]>
AuthorDate: Sat Jul 3 19:04:58 2021 +0800

    IMPALA-5476: Fix catalogd restart brings stale metadata
    
    ImpaladCatalog#updateCatalog() doesn't trigger a full topic update
    request when detecting catalogServiceId changes. It just updates the
    local catalogServiceId and throws an exception to abort applying the
    DDL/DML results. This causes a problem when catalogd is restarted and
    the DDL/DML is executed on the restarted instance. In this case, only
    the local catalogServiceId is updated to the latest. The local catalog
    remains stale. Then when dealing with the following updates from
    statestore, the catalogServiceId always matches, so updates will be
    applied without exceptions. However, the catalog objects usually won't
    be updated since they have higher versions (from the old catalogd
    instance) than those in the update. This brings the local catalog out
    of sync until the catalog version of the new catalogd grows large
    enough.
    
    Note that in dealing with the catalog updates from statestore, if the
    catalogServiceId doesn't match, impalad will request a full topic update.
    See more in ImpalaServer::CatalogUpdateCallback().
    
    This patch fixes this issue by checking the catalogServiceId before
    invoking UpdateCatalogCache() of FE. If catalogServiceId doesn't match
    the one in the DDL/DML result, wait until it changes. The following
    update from statestore will change it and unblock the DDL/DML thread.
    
    Testing
    
    add several tests in
    tests/custom_cluster/test_restart_services.py
    
    Change-Id: I9fe25f5a2a42fb432e306ef08ae35750c8f3c50c
    Reviewed-on: http://gerrit.cloudera.org:8080/17645
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/impala-server.cc               |  59 +++++++++++----
 tests/custom_cluster/test_restart_services.py | 104 ++++++++++++++++++++++++++
 2 files changed, 150 insertions(+), 13 deletions(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8bf57d23d..771808a89 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1687,19 +1687,52 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       WaitForCatalogUpdateTopicPropagation(catalog_service_id);
     }
   } else {
-    CatalogUpdateResultIterator callback_ctx(catalog_update_result);
-    TUpdateCatalogCacheRequest update_req;
-    update_req.__set_is_delta(true);
-    
update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
-    // The catalog version is updated in WaitForCatalogUpdate below. So we 
need a
-    // standalone field in the request to update the service ID without 
touching the
-    // catalog version.
-    
update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
-    // Apply the changes to the local catalog cache.
-    TUpdateCatalogCacheResponse resp;
-    Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, 
&resp);
-    if (!status.ok()) LOG(ERROR) << status.GetDetail();
-    RETURN_IF_ERROR(status);
+    TUniqueId cur_service_id;
+    {
+      unique_lock<mutex> ver_lock(catalog_version_lock_);
+      cur_service_id = catalog_update_info_.catalog_service_id;
+      if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+        LOG(INFO) << "Catalog service ID mismatch. Current ID: "
+            << PrintId(cur_service_id) << ". ID in response: "
+            << PrintId(catalog_service_id) << ". Catalogd may be restarted. 
Waiting for"
+            " new catalog update from statestore.";
+        // Catalog service ID has been changed, and impalad requests a full 
topic update.
+        // When impalad completes the full topic update, it will exit this 
loop.
+        while (cur_service_id == catalog_update_info_.catalog_service_id) {
+          catalog_version_update_cv_.Wait(ver_lock);
+        }
+        cur_service_id = catalog_update_info_.catalog_service_id;
+      }
+    }
+
+    if (cur_service_id == catalog_service_id) {
+      CatalogUpdateResultIterator callback_ctx(catalog_update_result);
+      TUpdateCatalogCacheRequest update_req;
+      update_req.__set_is_delta(true);
+      
update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
+      // The catalog version is updated in WaitForCatalogUpdate below. So we 
need a
+      // standalone field in the request to update the service ID without 
touching the
+      // catalog version.
+      
update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
+      // Apply the changes to the local catalog cache.
+      TUpdateCatalogCacheResponse resp;
+      Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, 
&resp);
+      if (!status.ok()) LOG(ERROR) << status.GetDetail();
+      RETURN_IF_ERROR(status);
+    } else {
+      // We can't apply updates on another service id, because the local 
catalog is still
+      // inconsistent with the catalogd that executes the DDL. Catalogd may be 
restarted
+      // more than once inside a statestore update cycle. 'cur_service_id' 
could belong
+      // to 1) a stale update from the previous restarted catalogd, or 2) a 
newer update
+      // from next restarted catalogd. We are good to ignore the DDL result at 
the second
+      // case. However, in the first case clients may see stale catalog until 
the
+      // expected catalog topic update comes.
+      // TODO: handle the first case in IMPALA-10875.
+      LOG(WARNING) << "Ignoring catalog update result of catalog service ID: "
+          << PrintId(catalog_service_id) << ". The expected catalog service 
ID: "
+          << PrintId(catalog_service_id) << ". Current catalog service ID: "
+          << PrintId(cur_service_id) <<". Catalogd may be restarted more than 
once.";
+    }
     if (!wait_for_all_subscribers) return Status::OK();
     // Wait until we receive and process the catalog update that covers the 
effects
     // (catalog objects) of this operation.
diff --git a/tests/custom_cluster/test_restart_services.py 
b/tests/custom_cluster/test_restart_services.py
index 90191eda8..d436d6579 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -163,6 +163,110 @@ class TestRestart(CustomClusterTestSuite):
     thread.join()
     self.wait_for_state(query_handle[0], QueryState.FINISHED, 30000)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=5000")
+  def test_restart_catalogd(self):
+    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    # No need to care whether the ddl is executed successfully, it is just to 
make
+    # the local catalog cache of impalad out of sync
+    for i in range(0, 10):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query)
+      except Exception, e:
+        LOG.info(str(e))
+      if i == 5:
+        self.cluster.catalogd.restart()
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=5000")
+  def test_restart_catalogd_sync_ddl(self):
+    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+    query_options = {"sync_ddl": "true"}
+
+    # No need to care whether the ddl is executed successfully, it is just to 
make
+    # the local catalog cache of impalad out of sync
+    for i in range(0, 10):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query, query_options)
+      except Exception, e:
+        LOG.info(str(e))
+      if i == 5:
+        self.cluster.catalogd.restart()
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)", query_options)
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  UPDATE_FREQUENCY_S = 10
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms={frequency_ms}"
+    .format(frequency_ms=(UPDATE_FREQUENCY_S * 1000)))
+  def test_restart_catalogd_twice(self):
+    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
+    self.cluster.catalogd.restart()
+    query = "create table join_aa(id int)"
+    query_handle = []
+
+    def execute_query_async():
+      query_handle.append(self.execute_query(query))
+
+    thread = threading.Thread(target=execute_query_async)
+    thread.start()
+    sleep(self.UPDATE_FREQUENCY_S - 5)
+    self.cluster.catalogd.restart()
+    thread.join()
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal",
+      statestored_args="--statestore_update_frequency_ms=5000")
+  def test_restart_catalogd_with_local_catalog(self):
+    self.execute_query_expect_success(self.client, "drop table if exists 
join_aa")
+    self.execute_query_expect_success(self.client, "create table join_aa(id 
int)")
+    # Make the catalog object version grow large enough
+    self.execute_query_expect_success(self.client, "invalidate metadata")
+
+    # No need to care whether the ddl is executed successfully, it is just to 
make
+    # the local catalog cache of impalad out of sync
+    for i in range(0, 10):
+      try:
+        query = "alter table join_aa add columns (age" + str(i) + " int)"
+        self.execute_query_async(query)
+      except Exception, e:
+        LOG.info(str(e))
+      if i == 5:
+        self.cluster.catalogd.restart()
+
+    self.execute_query_expect_success(self.client,
+        "alter table join_aa add columns (name string)")
+    self.execute_query_expect_success(self.client, "select name from join_aa")
+    self.execute_query_expect_success(self.client, "select age0 from join_aa")
+    self.execute_query_expect_success(self.client, "drop table join_aa")
+
   SUBSCRIBER_TIMEOUT_S = 2
   CANCELLATION_GRACE_PERIOD_S = 5
 

Reply via email to