This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ab4e62d3c IMPALA-13120: Load failed table without need for manual
invalidate
ab4e62d3c is described below
commit ab4e62d3c3dd623a8b5ad896641db07782cbb939
Author: Venu Reddy <[email protected]>
AuthorDate: Tue Jun 4 16:02:02 2024 +0530
IMPALA-13120: Load failed table without need for manual invalidate
If the metastore is down when a table load is triggered, catalogd
updates the table to a new version of an incomplete table whose cause
is a TableLoadingException. On the coordinator/impalad,
StmtMetadataLoader loadTables, which has been waiting for the table
load to complete, considers the table as loaded. Then, during the
analyzer’s table resolve step, a TableLoadingException (i.e., Could
not connect to meta store, failed to load metadata for table and
running invalidate metadata for table may resolve this problem) is
thrown for the incomplete table.
From then on, queries on the table do not trigger a load, since
the table is incomplete with a TableLoadingException cause. Even after
the metastore comes back up, queries continue to throw the same
exception. Requiring a manual invalidate of every such table is both
misleading and unnecessary.
Note: An incomplete table with a cause is considered as loaded.
This patch retries loading a table whose previous load failed due to
a metastore connection error (i.e., a recoverable error) when a query
involving the table is fired. The idea is to keep track of the table
object present in the db that requires a load. On a successful or
failed load, the table object in the db is updated. Therefore, the
tracked table object reference can be compared to the table object in
the db to detect the completion of the load.
Testing:
- Added end-to-end tests
Change-Id: Ia882fdd865ef716351be7f1eaf203a9fb04c1c15
Reviewed-on: http://gerrit.cloudera.org:8080/21478
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../java/org/apache/impala/analysis/Analyzer.java | 2 +-
.../apache/impala/analysis/StmtMetadataLoader.java | 28 +++++++++++++++++----
.../impala/catalog/CatalogServiceCatalog.java | 13 ++++++++--
.../apache/impala/catalog/FeIncompleteTable.java | 7 ++++++
.../org/apache/impala/catalog/IncompleteTable.java | 19 ++++++++++++++
tests/custom_cluster/test_catalog_hms_failures.py | 29 ++++++++++++++++++++++
6 files changed, 90 insertions(+), 8 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 926b0263d..4a51477fc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -3700,7 +3700,7 @@ public class Analyzer {
throw new AnalysisException(TBL_DOES_NOT_EXIST_ERROR_MSG +
tblName.toString());
}
}
- Preconditions.checkState(table.isLoaded());
+ Preconditions.checkState(table.isLoaded(), "table: %s should be loaded",
tblName);
if (table instanceof FeIncompleteTable) {
// If there were problems loading this table's metadata, throw an
exception
// when it is accessed.
diff --git
a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index e4dfa7752..ef0099ddd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -166,7 +166,14 @@ public class StmtMetadataLoader {
Preconditions.checkState(numLoadRequestsSent_ == 0);
Preconditions.checkState(numCatalogUpdatesReceived_ == 0);
FeCatalog catalog = fe_.getCatalog();
- Set<TableName> missingTbls = getMissingTables(catalog, tbls);
+ // missingTblsSnapshot builds tableName to table in the db mapping for
tables that
+ // are either not loaded or have failed to load due to recoverable error
in the
+ // previous queries. It is used to detect change of table in db since the
time it is
+ // added to missingTblsSnapshot when the table is loaded.
+ Map<TableName, FeTable> missingTblsSnapshot = new HashMap<>();
+ // TableName is added to missingTbls set as long as table is not loaded or
the table
+ // in db has not changed(i.e., table in db is same as table in
missingTblsSnapshot).
+ Set<TableName> missingTbls = getMissingTables(catalog, tbls,
missingTblsSnapshot);
// There are no missing tables. Return to avoid making an RPC to the
CatalogServer
// and adding events to the timeline.
if (missingTbls.isEmpty()) {
@@ -231,7 +238,8 @@ public class StmtMetadataLoader {
// Wait for the next catalog update and then revise the loaded/missing
tables.
catalog.waitForCatalogUpdate(Frontend.MAX_CATALOG_UPDATE_WAIT_TIME_MS);
- Set<TableName> newMissingTbls = getMissingTables(catalog, missingTbls);
+ Set<TableName> newMissingTbls =
+ getMissingTables(catalog, missingTbls, missingTblsSnapshot);
// Issue a load request for the new missing tables in these cases:
// 1) Catalog has restarted so all in-flight loads have been lost
// 2) There are new missing tables due to view expansion
@@ -303,7 +311,8 @@ public class StmtMetadataLoader {
* Path.getCandidateTables(). Non-existent tables are ignored and not
returned or
* added to 'loadedOrFailedTbls_'.
*/
- private Set<TableName> getMissingTables(FeCatalog catalog, Set<TableName>
tbls) {
+ private Set<TableName> getMissingTables(FeCatalog catalog, Set<TableName>
tbls,
+ Map<TableName, FeTable> missingTblsSnapshot) {
Set<TableName> missingTbls = new HashSet<>();
Set<TableName> viewTbls = new HashSet<>();
for (TableName tblName: tbls) {
@@ -313,7 +322,14 @@ public class StmtMetadataLoader {
dbs_.add(tblName.getDb());
FeTable tbl = db.getTable(tblName.getTbl());
if (tbl == null) continue;
- if (!tbl.isLoaded()) {
+ if (!tbl.isLoaded()
+ || (tbl instanceof FeIncompleteTable
+ && ((FeIncompleteTable)
tbl).isLoadFailedByRecoverableError())) {
+ // Add table to missingTblsSnapshot only for the first
time(putIfAbsent) if the
+ // table is not loaded or the previous load has failed due to
recoverable error.
+ missingTblsSnapshot.putIfAbsent(tblName, tbl);
+ }
+ if (!tbl.isLoaded() || missingTblsSnapshot.get(tblName) == tbl) {
missingTbls.add(tblName);
continue;
}
@@ -338,7 +354,9 @@ public class StmtMetadataLoader {
}
}
// Recursively collect loaded/missing tables from loaded views.
- if (!viewTbls.isEmpty()) missingTbls.addAll(getMissingTables(catalog,
viewTbls));
+ if (!viewTbls.isEmpty()) {
+ missingTbls.addAll(getMissingTables(catalog, viewTbls,
missingTblsSnapshot));
+ }
return missingTbls;
}
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 65a0c564b..55cc783ea 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2467,11 +2467,20 @@ public class CatalogServiceCatalog extends Catalog {
if (tbl == null) return null;
LOG.trace("table {} exits in cache, last synced id {}",
tbl.getFullName(),
tbl.getLastSyncedEventId());
+ boolean isLoaded = tbl.isLoaded();
+ if (isLoaded && tbl instanceof IncompleteTable
+ && ((IncompleteTable) tbl).isLoadFailedByRecoverableError()) {
+ // If the previous load of incomplete table had failed due to
recoverable errors,
+ // try loading again instead of returning the existing table
+ isLoaded = false;
+ }
// if no validWriteIdList is provided, we return the tbl if its loaded
// In the external front end use case it is possible that an external
table might
// have validWriteIdList, so we can simply ignore this value if table is
external
- if (tbl.isLoaded() && (validWriteIdList == null ||
-
(!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())))) {
+ if (isLoaded
+ && (validWriteIdList == null
+ || (!AcidUtils.isTransactionalTable(
+ tbl.getMetaStoreTable().getParameters())))) {
incrementCatalogDCacheHitMetric(reason);
LOG.trace("returning already loaded table {}", tbl.getFullName());
return tbl;
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIncompleteTable.java
b/fe/src/main/java/org/apache/impala/catalog/FeIncompleteTable.java
index b688a10d6..297389c60 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIncompleteTable.java
@@ -24,4 +24,11 @@ import org.apache.impala.common.ImpalaException;
public interface FeIncompleteTable extends FeTable {
/** Return the cause of the failure to load */
public ImpalaException getCause();
+
+ /**
+ * Whether load has failed due to recoverable errors. Default is false.
+ */
+ default boolean isLoadFailedByRecoverableError() {
+ return false;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index 338cc7e67..d0ee06a06 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Set;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.impala.common.ImpalaException;
@@ -83,6 +84,24 @@ public class IncompleteTable extends Table implements
FeIncompleteTable {
@Override
public ImpalaException getCause() { return cause_; }
+ /**
+ * Returns true if the load has failed due to recoverable errors such as
+ * metastore connection error
+ * @return
+ */
+ @Override
+ public boolean isLoadFailedByRecoverableError() {
+ if (cause_ instanceof TableLoadingException) {
+ String metastoreConnectionError = "Could not connect to meta store";
+ if (cause_.getMessage().contains(metastoreConnectionError)
+ || (cause_.getCause() instanceof MetaException
+ &&
cause_.getCause().getMessage().contains(metastoreConnectionError))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* See comment on cause_.
*/
diff --git a/tests/custom_cluster/test_catalog_hms_failures.py
b/tests/custom_cluster/test_catalog_hms_failures.py
index 35350da2c..32de2a4f7 100644
--- a/tests/custom_cluster/test_catalog_hms_failures.py
+++ b/tests/custom_cluster/test_catalog_hms_failures.py
@@ -85,6 +85,35 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
self.client.execute("invalidate metadata %s" % tbl_name)
self.client.execute("describe %s" % tbl_name)
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args='--use_local_catalog',
+ catalogd_args='--catalog_topic_mode=minimal')
+ def test_local_catalog_load_with_hms_state_change(self, unique_database):
+ self.run_test_load_with_hms_down_and_up(unique_database,
+
"local_catalog_load_with_hms_state_change")
+
+ @pytest.mark.execute_serially
+ def test_load_with_hms_state_change(self, unique_database):
+ self.run_test_load_with_hms_down_and_up(unique_database,
+ "load_with_hms_state_change")
+
+ def run_test_load_with_hms_down_and_up(self, unique_database, table_name):
+ table = unique_database + "." + table_name
+ self.client.execute("create table {0} (i int)".format(table))
+ kill_cmd = os.path.join(os.environ['IMPALA_HOME'],
'testdata/bin/kill-hive-server.sh')
+ check_call([kill_cmd, '-only_metastore'], close_fds=True)
+ for _ in range(2):
+ try:
+ self.client.execute("describe {0}".format(table))
+ except ImpalaBeeswaxException as e:
+ assert "Failed to load metadata for table: %s. "\
+ "Running 'invalidate metadata %s' may resolve this problem." \
+ % (table, table) in str(e)
+ self.run_hive_metastore()
+ res = self.client.execute("describe {0}".format(table))
+ assert res.data == ["i\tint\t"]
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--use_local_catalog --catalog_topic_mode=minimal',