This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5250cc14b63114fa3976784d6bac39d743e4011a
Author: stiga-huang <[email protected]>
AuthorDate: Mon Feb 26 09:50:13 2024 +0800

    IMPALA-12827: Fix failures in processing AbortTxnEvent due to aborted write 
id being cleaned up
    
    HdfsTable tracks the ValidWriteIdList from HMS. When the table is
    reloaded, the ValidWriteIdList is updated to the latest state. An
    ABORT_TXN event that is lagging behind could match to aborted write ids
    that have already been cleaned up by the HMS housekeeping thread. Such
    write ids can't be found in the cached ValidWriteIdList as opened or
    aborted write ids. This hits a Precondition check and fails the event
    processing.
    
    This patch fixes the check to allow this case. Also adds more logs for
    dealing with write ids.
    
    Tests
     - Add custom-cluster test to start Hive with the housekeeping thread
       turned on and verify that such an ABORT_TXN event is processed
       correctly.
    
    Change-Id: I93b6f684d6e4b94961d804a0c022029249873681
    Reviewed-on: http://gerrit.cloudera.org:8080/21071
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 bin/create-test-configuration.sh                   |  7 ++++
 .../org/apache/impala/compat/MetastoreShim.java    |  2 +
 .../java/org/apache/impala/catalog/HdfsTable.java  |  9 ++--
 .../impala/catalog/events/MetastoreEvents.java     |  9 +++-
 .../hive/common/MutableValidReaderWriteIdList.java | 14 +++++--
 fe/src/test/resources/hive-site.xml.py             |  5 +++
 tests/common/impala_test_suite.py                  |  1 +
 tests/custom_cluster/test_events_custom_configs.py | 48 +++++++++++++++++++++-
 8 files changed, 84 insertions(+), 11 deletions(-)

diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index 90861eeb1..e8fbde807 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -151,6 +151,13 @@ mkdir -p hive-site-events-cleanup
 rm -f hive-site-events-cleanup/hive-site.xml
 ln -s "${CONFIG_DIR}/hive-site_events_cleanup.xml" 
hive-site-events-cleanup/hive-site.xml
 
+export HIVE_VARIANT=housekeeping_on
+$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py 
hive-site_housekeeping_on.xml
+mkdir -p hive-site-housekeeping-on
+rm -f hive-site-housekeeping-on/hive-site.xml
+ln -s "${CONFIG_DIR}/hive-site_housekeeping_on.xml" \
+    hive-site-housekeeping-on/hive-site.xml
+
 export HIVE_VARIANT=ranger_auth
 HIVE_RANGER_CONF_DIR=hive-site-ranger-auth
 $IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py 
hive-site_ranger_auth.xml
diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 516fe6b7a..2f11da0c3 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -839,6 +839,8 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       commitTxnMessage_ = MetastoreEventsProcessor.getMessageDeserializer()
           .getCommitTxnMessage(event.getMessage());
       txnId_ = commitTxnMessage_.getTxnId();
+      LOG.info("EventId: {} EventType: COMMIT_TXN transaction id: {}", 
getEventId(),
+          txnId_);
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 05f62814b..f3c3e3132 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -3033,16 +3033,15 @@ public class HdfsTable extends Table implements 
FeFsTable {
       throws TableLoadingException {
     String tblFullName = getFullName();
     if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + 
tblFullName);
-    ValidWriteIdList validWriteIds = null;
+    ValidWriteIdList validWriteIds;
     try {
       validWriteIds = MetastoreShim.fetchValidWriteIds(client, tblFullName);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Valid writeIds: " + validWriteIds.writeToString());
-      }
+      LOG.info("Valid writeIds of table {}: {}", tblFullName,
+          validWriteIds.writeToString());
       return validWriteIds;
     } catch (Exception e) {
       throw new TableLoadingException(String.format("Error loading 
ValidWriteIds for " +
-          "table '%s'", getName()), e);
+          "table '%s'", tblFullName), e);
     }
   }
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 0f073ffce..cf5650e85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -2802,7 +2802,7 @@ public class MetastoreEvents {
     @Override
     protected void processTableEvent() throws MetastoreNotificationException {
       if (msTbl_ == null) {
-        debugLog("Ignoring the event since table {} is not found",
+        debugLog("Ignoring the event since table {} does not exist or is 
unloaded",
             getFullyQualifiedTblName());
         return;
       }
@@ -2823,6 +2823,8 @@ public class MetastoreEvents {
           TableWriteId tableWriteId = new TableWriteId(dbName_, tblName_,
               tbl_.getCreateEventId(), txnToWriteId.getWriteId());
           catalog_.addWriteId(txnToWriteId.getTxnId(), tableWriteId);
+          infoLog("Added write id {} on table {}.{} for txn {}",
+              txnToWriteId.getWriteId(), dbName_, tblName_, 
txnToWriteId.getTxnId());
         }
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException("Failed to 
mark open "
@@ -3007,12 +3009,15 @@ public class MetastoreEvents {
           MetastoreEventsProcessor.getMessageDeserializer().getAbortTxnMessage(
               event.getMessage());
       txnId_ = abortTxnMessage.getTxnId();
+      infoLog("Received AbortTxnEvent for transaction " + txnId_);
     }
 
     @Override
     protected void process() throws MetastoreNotificationException {
       try {
-        addAbortedWriteIdsToTables(catalog_.getWriteIds(txnId_));
+        Set<TableWriteId> tableWriteIds = catalog_.getWriteIds(txnId_);
+        infoLog("Adding {} aborted write ids", tableWriteIds.size());
+        addAbortedWriteIdsToTables(tableWriteIds);
       } catch (CatalogException e) {
         throw new 
MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
             + "mark aborted write ids to table for txn {}. Event processing 
cannot "
diff --git 
a/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java
 
b/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java
index b3a82a1f9..571c63afc 100644
--- 
a/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java
+++ 
b/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java
@@ -251,6 +251,8 @@ public class MutableValidReaderWriteIdList implements 
MutableValidWriteIdList {
     for (long currentId = highWatermark + 1; currentId <= writeId; 
currentId++) {
       exceptions.add(currentId);
     }
+    LOG.debug("Added OPEN write id: {}. Old high water mark: {}.",
+        writeId, highWatermark);
     highWatermark = writeId;
     return true;
   }
@@ -263,19 +265,25 @@ public class MutableValidReaderWriteIdList implements 
MutableValidWriteIdList {
     boolean added = false;
     long maxWriteId = Collections.max(writeIds);
     if (maxWriteId > highWatermark) {
-      LOG.trace("Current high water mark: {} and max aborted write id: {}, so 
mark them "
+      LOG.info("Current high water mark: {} and max aborted write id: {}, so 
mark them "
           + "as open first", highWatermark, maxWriteId);
       addOpenWriteId(maxWriteId);
       added = true;
     }
     for (long writeId : writeIds) {
       int index = Collections.binarySearch(exceptions, writeId);
-      // make sure the write id is not committed
-      Preconditions.checkState(index >= 0);
+      if (index < 0) {
+        LOG.info("Not added ABORTED write id {} since it's not opened and 
might " +
+            "already be cleaned up. minOpenWriteId: {}.", writeId, 
minOpenWriteId);
+        continue;
+      }
       added = added || !abortedBits.get(index);
       abortedBits.set(index);
     }
     updateMinOpenWriteId();
+    if (!added) {
+      LOG.info("Not added any ABORTED write ids of the given {}", 
writeIds.size());
+    }
     return added;
   }
 
diff --git a/fe/src/test/resources/hive-site.xml.py 
b/fe/src/test/resources/hive-site.xml.py
index c5f377925..9b9c2dbfd 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -88,6 +88,11 @@ elif variant == 'events_cleanup':
     'hive.metastore.event.db.listener.timetolive': '60s',
     'hive.metastore.event.db.listener.clean.interval': '10s'
   })
+elif variant == 'housekeeping_on':
+  # HMS configs needed for regression test for IMPALA-12827
+  CONFIG.update({
+    'hive.metastore.housekeeping.threads.on': 'true',
+  })
 
 # HBase-related configs.
 # Impala processes need to connect to zookeeper on INTERNAL_LISTEN_HOST for 
HBase.
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 89e0cc4e7..dd0b95175 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1035,6 +1035,7 @@ class ImpalaTestSuite(BaseTestSuite):
   # out to beeline for better performance
   def run_stmt_in_hive(self, stmt, username=None):
     """Run a statement in Hive by Beeline."""
+    LOG.info("-- executing in HiveServer2\n\n" + stmt + "\n")
     url = 'jdbc:hive2://' + pytest.config.option.hive_server2
     return self.run_stmt_in_beeline(url, username, stmt)
 
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 09ab84b58..eafdcda1f 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -17,6 +17,7 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 import logging
+from os import getenv
 import pytest
 
 
@@ -25,12 +26,16 @@ from hive_metastore.ttypes import FireEventRequestData
 from hive_metastore.ttypes import InsertEventRequestData
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfFS
+from tests.common.skip import SkipIf, SkipIfFS
+from tests.util.acid_txn import AcidTxn
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
 from tests.util.filesystem_utils import WAREHOUSE
 from tests.util.iceberg_util import IcebergCatalogs
 
+HIVE_SITE_HOUSEKEEPING_ON =\
+    getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on'
+
 
 @SkipIfFS.hive
 class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
@@ -1144,3 +1149,44 @@ class 
TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     data = int(self.execute_scalar("select count(*) from {0}.{1}".format(
       unique_database, hive_tbl)))
     assert data == 0
+
+  @SkipIf.is_test_jdk
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--hms_event_polling_interval_s=100",
+      hive_conf_dir=HIVE_SITE_HOUSEKEEPING_ON)
+  def test_commit_compaction_with_abort_txn(self, unique_database):
+    """Use a long enough polling interval to allow Hive statements to finish 
before
+       the ABORT_TXN event is processed. In local tests, the Hive statements 
usually
+       finish in 60s.
+       TODO: improve this by adding commands to pause and resume the 
event-processor."""
+    tbl = "part_table"
+    fq_tbl = unique_database + '.' + tbl
+    acid = AcidTxn(self.hive_client)
+    self.run_stmt_in_hive(
+        "create transactional table {} (i int) partitioned by (p 
int)".format(fq_tbl))
+
+    # Allocate a write id on this table and abort the txn
+    txn_id = acid.open_txns()
+    acid.allocate_table_write_ids(txn_id, unique_database, tbl)
+    acid.abort_txn(txn_id)
+
+    # Insert some rows and trigger compaction
+    for i in range(2):
+      self.run_stmt_in_hive(
+          "insert into {} partition(p=0) values (1),(2),(3)".format(fq_tbl))
+    self.run_stmt_in_hive(
+        "alter table {} partition(p=0) compact 'major' and 
wait".format(fq_tbl))
+
+    # The CREATE_TABLE event hasn't been processed yet so we have to 
explicitly invalidate
+    # the table first.
+    self.client.execute("invalidate metadata " + fq_tbl)
+    # Reload the table so the latest valid writeIdList is loaded
+    self.client.execute("refresh " + fq_tbl)
+    # Process the ABORT_TXN event
+    EventProcessorUtils.wait_for_event_processing(self, timeout=100)
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+    # Uncomment this once we can stop and resume the event-processor using 
commands.
+    # Currently the test is flaky with it since the Hive statements could take 
longer to
+    # finish than 100s (e.g. I saw a run of 5mins).
+    # self.assert_catalogd_log_contains("INFO", "Not added ABORTED write id 1 
since it's "
+    #    + "not opened and might already be cleaned up")

Reply via email to