This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 59bb9259ef44d7c9d3f3eb1355d500d3ba75f513
Author: ttttttz <[email protected]>
AuthorDate: Sun Jun 25 13:42:32 2023 +0800

    IMPALA-12131: Fix empty partition map in non-partitioned table when file metadata loading fails
    
    When inserting non-partitioned tables, the catalog update
    request could fail due to file not found exceptions. At that
    point we have reset(cleared) the partition map so it becomes
    empty after the failure, which is an illegal state and will
    cause failures in later operations. Currently, users have to
    manually invalidate the metadata of the table to recover. We
    can improve this by making all the updates happen after all
    the external loadings succeed. So any failures in loading the
    file metadata won't leave the table metadata in a partially
    updated state.
    
    Testing:
    1. Added a test which simulates a failure in a catalog update
    request by throwing an exception through the debug action and
    confirms that subsequent catalog update requests are not
    affected by the failure.
    
    Change-Id: I28e76a73b7905c24eb93b935124d20ea7abe8513
    Reviewed-on: http://gerrit.cloudera.org:8080/19878
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  | 12 +++++-
 .../apache/impala/service/CatalogOpExecutor.java   | 10 ++---
 .../java/org/apache/impala/util/DebugUtils.java    |  4 ++
 tests/query_test/test_insert.py                    | 44 ++++++++++++++++++++++
 4 files changed, 63 insertions(+), 7 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index da83a47ac..7037df6de 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -774,6 +774,12 @@ public class HdfsTable extends Table implements FeFsTable {
     final Clock clock = Clock.defaultClock();
     long startTime = clock.getTick();
 
+    if (DebugUtils.hasDebugAction(debugActions,
+        DebugUtils.LOAD_FILE_METADATA_THROW_EXCEPTION)) {
+      throw new CatalogException("Threw a catalog exception due to the debug action " +
+          "during loading file metadata.");
+    }
+
     //TODO: maybe it'd be better to load the valid txn list in the context of a
     // transaction to have consistent valid write ids and valid transaction ids.
     // Currently tables are loaded when they are first referenced and stay in catalog
@@ -1372,11 +1378,10 @@ public class HdfsTable extends Table implements FeFsTable {
     if (LOG.isTraceEnabled()) {
       LOG.trace("update unpartitioned table: " + getFullName());
     }
+    // Step 1: fetch external metadata
     HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values());
-    resetPartitions();
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
-    setPrototypePartition(msTbl.getSd());
     HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(),
         /*msPartition=*/null, new FsPermissionCache());
     // Copy over the FDs from the old partition to the new one, so that
@@ -1389,6 +1394,9 @@ public class HdfsTable extends Table implements FeFsTable {
     partBuilder.setPrevId(oldPartition.getId());
     long fileMdLoadTime = loadFileMetadataForPartitions(client,
         ImmutableList.of(partBuilder), /*isRefresh=*/true, debugAction);
+    // Step 2: update internal fields
+    resetPartitions();
+    setPrototypePartition(msTbl.getSd());
     setUnpartitionedTableStats(partBuilder);
     addPartition(partBuilder.build());
     return fileMdLoadTime;
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1d7ac7bbf..35057b9f9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1500,7 +1500,7 @@ public class CatalogOpExecutor {
       boolean reloadFileMetadata, boolean reloadTableSchema,
       Set<String> partitionsToUpdate, String reason) throws CatalogException {
     loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata, reloadTableSchema,
-        partitionsToUpdate, null, reason);
+        partitionsToUpdate, null, reason, null);
   }
 
   /**
@@ -1511,7 +1511,7 @@ public class CatalogOpExecutor {
   private void loadTableMetadata(Table tbl, long newCatalogVersion,
       boolean reloadFileMetadata, boolean reloadTableSchema,
       Set<String> partitionsToUpdate, @Nullable Map<String, Long> partitionToEventId,
-      String reason)
+      String reason, @Nullable String debugAction)
       throws CatalogException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -1519,8 +1519,8 @@ public class CatalogOpExecutor {
           getMetaStoreTable(msClient, tbl);
       if (tbl instanceof HdfsTable) {
         ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
-            reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, null,
-            partitionToEventId, reason);
+            reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate,
+            debugAction, partitionToEventId, reason);
       } else {
         tbl.load(true, msClient.getHiveClient(), msTbl, reason);
       }
@@ -6856,7 +6856,7 @@ public class CatalogOpExecutor {
       }
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
-          partitionToEventId, "INSERT");
+          partitionToEventId, "INSERT", update.getDebug_action());
       addTableToCatalogUpdate(table, update.header.want_minimal_response,
           response.result);
     } finally {
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 97cd2b522..45c5eede7 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -54,6 +54,10 @@ public class DebugUtils {
   // CatalogOpExecutor#updateCatalog() finishes.
   public static final String INSERT_FINISH_DELAY = "catalogd_insert_finish_delay";
 
+  // debug action label for throwing an exception during loadFileMetadataForPartitions.
+  public static final String LOAD_FILE_METADATA_THROW_EXCEPTION =
+      "catalogd_load_file_metadata_throw_exception";
+
   // debug action label to abort the transaction in updateCatalog.
   public static final String UPDATE_CATALOG_ABORT_INSERT_TXN =
       "catalogd_update_catalog_abort_txn";
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index b48e6ed09..d70af89da 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -491,3 +491,47 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite):
     assert num_instances_per_host == expected_num_instances_per_host, \
       result.runtime_profile
     self.client.clear_configuration()
+
+
+class TestInsertNonPartitionedTable(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestInsertNonPartitionedTable, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+          v.get_value('table_format').file_format == 'text'
+          and v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+
+  @classmethod
+  def setup_class(cls):
+    super(TestInsertNonPartitionedTable, cls).setup_class()
+
+  def test_insert_load_file_fail(self, vector, unique_database):
+    """Tests metadata won't be corrupted after file metadata loading fails
+    in non-partitioned tables."""
+    table_name = '{0}.{1}'.format(unique_database, 'test_unpartition_tbl')
+    self.client.execute('create table {0}(f0 int)'
+        .format(table_name))
+    self.client.execute('insert overwrite table {0} select 0'
+        .format(table_name))
+    result = self.client.execute("select f0 from {0}".format(table_name))
+    assert result.data == ["0"]
+
+    exec_options = vector.get_value('exec_option')
+    exec_options['debug_action'] = 'catalogd_load_file_metadata_throw_exception'
+    try:
+      self.execute_query("insert overwrite table {0} select 1"
+          .format(table_name), exec_options)
+      assert False, "Expected query to fail."
+    except Exception as e:
+      assert "Failed to load metadata for table:" in str(e)
+
+    exec_options['debug_action'] = ''
+    self.execute_query("insert overwrite table {0} select 2"
+        .format(table_name), exec_options)
+    result = self.client.execute("select f0 from {0}".format(table_name))
+    assert result.data == ["2"]

Reply via email to