This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 58590376ed3a1b2cac2bbc3b6a59d5cd34c53672
Author: stiga-huang <[email protected]>
AuthorDate: Wed Jun 7 13:09:48 2023 +0800

    IMPALA-12189: updateCatalog should handle failures in createTblTransaction
    
    updateCatalog() invokes createTblTransaction() for transactional tables.
    The call is made after the table lock has been acquired, while the
    current thread also holds the write lock of the catalog's versionLock.
    Whenever we hit an exception, we should release those locks. This patch
    moves the code calling createTblTransaction() into the exception
    handling scope.
    
    Tests:
     - Add a debug action that aborts the transaction in updateCatalog() so
       that createTblTransaction() fails.
     - Add an e2e test for the error handling.
    
    Change-Id: I3a64764d0568fc1e6c6f4c52f9e220df3130bd84
    Reviewed-on: http://gerrit.cloudera.org:8080/20020
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 33 +++++++++++++---------
 .../java/org/apache/impala/util/DebugUtils.java    | 20 +++++++++++++
 tests/query_test/test_acid.py                      | 14 +++++++++
 3 files changed, 53 insertions(+), 14 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b7726c5e4..811db2cfc 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -6624,24 +6624,29 @@ public class CatalogOpExecutor {
     final Timer.Context context
         = 
table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
 
-    long transactionId = -1;
-    TblTransaction tblTxn = null;
-    if (update.isSetTransaction_id()) {
-      transactionId = update.getTransaction_id();
-      Preconditions.checkState(transactionId > 0);
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-         // Setup transactional parameters needed to do alter table/partitions 
later.
-         // TODO: Could be optimized to possibly save some RPCs, as these 
parameters are
-         //       not always needed + the writeId of the INSERT could be 
probably reused.
-         tblTxn = MetastoreShim.createTblTransaction(
-             msClient.getHiveClient(), table.getMetaStoreTable(), 
transactionId);
-      }
-    }
-
     try {
       // Get new catalog version for table in insert.
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
+
+      TblTransaction tblTxn = null;
+      if (update.isSetTransaction_id()) {
+        long transactionId = update.getTransaction_id();
+        Preconditions.checkState(transactionId > 0);
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          if (DebugUtils.hasDebugAction(update.getDebug_action(),
+              DebugUtils.UPDATE_CATALOG_ABORT_INSERT_TXN)) {
+            MetastoreShim.abortTransaction(msClient.getHiveClient(), 
transactionId);
+            LOG.info("Aborted txn due to the debug action.");
+          }
+          // Setup transactional parameters needed to do alter 
table/partitions later.
+          // TODO: Could be optimized to possibly save some RPCs, as these 
parameters are
+          //       not always needed + the writeId of the INSERT could be 
probably reused.
+          tblTxn = MetastoreShim.createTblTransaction(
+              msClient.getHiveClient(), table.getMetaStoreTable(), 
transactionId);
+        }
+      }
+
       // Collects the cache directive IDs of any cached table/partitions that 
were
       // targeted. A watch on these cache directives is submitted to the
       // TableLoadingMgr and the table will be refreshed asynchronously after 
all
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 5d2b353da..97cd2b522 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -54,6 +54,26 @@ public class DebugUtils {
   // CatalogOpExecutor#updateCatalog() finishes.
   public static final String INSERT_FINISH_DELAY = 
"catalogd_insert_finish_delay";
 
+  // debug action label to abort the transaction in updateCatalog.
+  public static final String UPDATE_CATALOG_ABORT_INSERT_TXN =
+      "catalogd_update_catalog_abort_txn";
+
+  /**
+   * Returns true if the label of action is set in the debugActions
+   */
+  public static boolean hasDebugAction(String debugActions, String label) {
+    if (Strings.isNullOrEmpty(debugActions)) {
+      return false;
+    }
+    List<String> actions = Splitter.on('|').splitToList(debugActions);
+    for (String action : actions) {
+      List<String> components = Splitter.on(':').splitToList(action);
+      if (components.isEmpty()) continue;
+      if (components.get(0).equalsIgnoreCase(label)) return true;
+    }
+    return false;
+  }
+
   /**
    * Given list of debug actions, execute the debug action pertaining to the 
given label.
    * The debugActions string is of the format specified for the 
query_option/configuration
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index 53d52270a..98931119f 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -346,3 +346,17 @@ class TestAcid(ImpalaTestSuite):
     valid_write_ids = acid_util.get_valid_write_ids(db_name, tbl_name)
     new_write_id = valid_write_ids.tblValidWriteIds[0].writeIdHighWaterMark
     assert new_write_id > orig_write_id
+
+  @SkipIfHive2.acid
+  def test_alloc_write_id_error_handing(self, unique_database):
+    tbl_name = "insert_only_table"
+    self.client.execute("""CREATE TABLE {0}.{1} (i int)
+        TBLPROPERTIES('transactional'='true', 
'transactional_properties'='insert_only')
+        """.format(unique_database, tbl_name))
+    self.execute_query_expect_failure(
+        self.client,
+        "INSERT INTO {0}.{1} VALUES (0), (1), (2)".format(unique_database, 
tbl_name),
+        {"debug_action": "catalogd_update_catalog_abort_txn"})
+    # Create a new table and load it in catalogd. Catalogd should not hang.
+    self.client.execute("CREATE TABLE {0}.tbl (i int)".format(unique_database))
+    self.client.execute("DESCRIBE {0}.tbl".format(unique_database))

Reply via email to