This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dc46aa48d9449729f312aaaa3312e0d8df150063
Author: stiga-huang <[email protected]>
AuthorDate: Thu Jul 3 19:47:41 2025 +0800

    IMPALA-13285: Ignore COMMIT_TXN events on Apache Hive 3
    
    In Apache Hive 3, HMS doesn't provide the API to retrive WriteEvents
    info of a given transaction. COMMIT_TXN event just contains a
    transaction id so Impala can't process it.
    
    This patch ignores COMMIT_TXN events when building on Apache Hive 3.
    Some tests in MetastoreEventsProcessorTest and EventExecutorServiceTest
    are skipped due to this.
    
    Tests:
     - Manually tested on Apache Hive 3. Verified that EventProcessor still
       works after receiving COMMIT_TXN events.
     - Passed some tests in MetastoreEventsProcessorTest and
       EventExecutorServiceTest that previously failed due to EventProcessor
       going into ERROR state.
    
    Change-Id: I863e39b3702028a14e83fed1fe912b441f2c28db
    Reviewed-on: http://gerrit.cloudera.org:8080/23117
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java    | 32 ++++------------------
 .../impala/catalog/events/MetastoreEvents.java     |  5 +---
 .../catalog/events/EventExecutorServiceTest.java   |  8 ++++++
 .../events/MetastoreEventsProcessorTest.java       | 23 ++++++++++++++++
 .../java/org/apache/impala/testutil/TestUtils.java |  7 +++++
 5 files changed, 45 insertions(+), 30 deletions(-)

diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 24fe4933b..8ed415899 100644
--- 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -74,7 +74,7 @@ import org.apache.impala.catalog.Hive3MetastoreShimBase;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.MetastoreEvents.DerivedMetastoreEvent;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.IgnoredEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreTableEvent;
 import 
org.apache.impala.catalog.events.MetastoreEventsProcessor.MetaDataFilter;
 import org.apache.impala.catalog.events.MetastoreNotificationException;
@@ -631,38 +631,17 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
   /**
    * CDP Hive-3 only function.
    */
-  public static class CommitTxnEvent extends MetastoreEvent {
+  public static class CommitTxnEvent extends IgnoredEvent {
     public static final String EVENT_TYPE = "COMMIT_TXN";
 
     public CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) {
       super(catalogOpExecutor, metrics, event);
-      throw new UnsupportedOperationException("CommitTxnEvent is not 
supported.");
     }
 
     @Override
-    protected void process() throws MetastoreNotificationException {
-
-    }
-
-    @Override
-    protected boolean onFailure(Exception e) {
-      return false;
-    }
-
-    @Override
-    protected boolean isEventProcessingDisabled() {
-      return false;
-    }
-
-    @Override
-    protected SelfEventContext getSelfEventContext() {
-      return null;
-    }
-
-    @Override
-    protected boolean shouldSkipWhenSyncingToLatestEventId() {
-      return false;
+    public void process() {
+      LOG.info("Ignoring COMMIT_TXN event {}", getEventId());
     }
   }
 
@@ -1045,6 +1024,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
    */
   public static List<PseudoCommitTxnEvent> getPseudoCommitTxnEvents(
       CommitTxnEvent event) {
-    throw new UnsupportedOperationException("CommitTxnEvent is not 
supported.");
+    LOG.info("Ignoring COMMIT_TXN event {}", event.getEventId());
+    return Collections.emptyList();
   }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 6bd4aeebc..e87985d54 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -3606,10 +3606,7 @@ public class MetastoreEvents {
    */
   public static class IgnoredEvent extends MetastoreEvent {
 
-    /**
-     * Prevent instantiation from outside should use MetastoreEventFactory 
instead
-     */
-    private IgnoredEvent(
+    public IgnoredEvent(
         CatalogOpExecutor catalogOpExecutor, Metrics metrics, 
NotificationEvent event) {
       super(catalogOpExecutor, metrics, event);
     }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
index 8c746ceac..793eb841c 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
@@ -53,9 +53,11 @@ import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.TestUtils;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -643,6 +645,9 @@ public class EventExecutorServiceTest {
    */
   @Test
   public void testCommitTxn() throws Exception {
+    Assume.assumeFalse("Skipping this since COMMIT_TXN events are ignored on 
Apache " +
+            "Hive 2/3. So the validWriteIds list is not updated correctly.",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     EventExecutorService eventExecutorService = createEventExecutorService(2, 
2);
     transactionTest(false);
     shutDownEventExecutorService(eventExecutorService);
@@ -654,6 +659,9 @@ public class EventExecutorServiceTest {
    */
   @Test
   public void testAbortTxn() throws Exception {
+    Assume.assumeFalse("Skipping this since COMMIT_TXN events are ignored on 
Apache " +
+            "Hive 2/3. So the validWriteIds list is not updated correctly.",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     EventExecutorService eventExecutorService = createEventExecutorService(2, 
2);
     transactionTest(true);
     shutDownEventExecutorService(eventExecutorService);
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index a409a6630..18c9f9d78 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -2859,6 +2859,8 @@ public class MetastoreEventsProcessorTest {
 
   @Test
   public void testCommitEvent() throws TException, ImpalaException, 
IOException {
+    Assume.assumeFalse("Skipping this since it depends on the behavior of CDP 
Hive 3",
+        TestUtils.isApacheHiveVersion());
     // Turn on incremental refresh for transactional table
     final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
     try {
@@ -2879,6 +2881,8 @@ public class MetastoreEventsProcessorTest {
 
   @Test
   public void testAbortEvent() throws TException, ImpalaException, IOException 
{
+    Assume.assumeFalse("COMMIT_TXN events are not processed on Apache Hive 
2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     // Turn on incremental refresh for transactional table
     final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
     try {
@@ -3632,6 +3636,8 @@ public class MetastoreEventsProcessorTest {
    */
   @Test
   public void testSkipFetchOpenTransactionEvent() throws Exception {
+    Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     long currentEventId = eventsProcessor_.getCurrentEventId();
     try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
       // 1. Fetch notification events after open and commit transaction
@@ -3678,6 +3684,8 @@ public class MetastoreEventsProcessorTest {
    */
   @Test
   public void testFetchEventsInBatchWithOpenTxnAsLastEvent() throws Exception {
+    Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     long currentEventId = eventsProcessor_.getCurrentEventId();
     try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
       long txnId = MetastoreShim.openTransaction(client.getHiveClient());
@@ -3694,6 +3702,8 @@ public class MetastoreEventsProcessorTest {
 
   @Test
   public void testNotFetchingUnwantedEvents() throws Exception {
+    Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     String tblName = "test_event_skip_list";
     createDatabase(TEST_DB_NAME, null);
     Map<String, String> params = new HashMap<>();
@@ -3840,6 +3850,8 @@ public class MetastoreEventsProcessorTest {
 
   @Test
   public void testReloadEventOnLoadedTable() throws Exception {
+    Assume.assumeFalse("RELOAD event is not generated on Apache Hive 2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     String tblName = "test_reload";
     createDatabase(TEST_DB_NAME, null);
     eventsProcessor_.processEvents();
@@ -3885,6 +3897,9 @@ public class MetastoreEventsProcessorTest {
 
   @Test
   public void testCommitCompactionEventOnLoadedTable() throws Exception {
+    Assume.assumeFalse("Skipping this since COMMIT_TXN event is not supported 
on " +
+            "Apache Hive 2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     String tblName = "test_commit_compaction";
     createDatabase(TEST_DB_NAME, null);
     eventsProcessor_.processEvents();
@@ -3984,6 +3999,8 @@ public class MetastoreEventsProcessorTest {
    */
   @Test
   public void testEmptyPartitionValues() throws Exception {
+    Assume.assumeFalse("RELOAD event is not generated on Apache Hive 2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     String prevFlag = BackendConfig.INSTANCE.debugActions();
     try {
       String tblName = "test_empty";
@@ -4080,6 +4097,8 @@ public class MetastoreEventsProcessorTest {
 
   public void testAllocWriteIdEvent(String tblName, boolean isPartitioned,
       boolean isLoadTable) throws TException, TransactionException, 
CatalogException {
+    Assume.assumeFalse("COMMIT_TXN events are not processed on Apache Hive 
2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     createDatabase(TEST_DB_NAME, null);
     eventsProcessor_.processEvents();
     createTransactionalTable(TEST_DB_NAME, tblName, isPartitioned);
@@ -4122,6 +4141,8 @@ public class MetastoreEventsProcessorTest {
 
   @Test
   public void testNotificationEventRequest() throws Exception {
+    Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
+        TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 
3);
     long currentEventId = eventsProcessor_.getCurrentEventId();
     // Generate some DB only related events
     createDatabaseFromImpala(TEST_DB_NAME, null);
@@ -4239,6 +4260,8 @@ public class MetastoreEventsProcessorTest {
 
   @Test
   public void testCommitTxnEventTargetName() throws Exception {
+    Assume.assumeFalse("Skipping this since it depends on the behavior of CDP 
Hive 3",
+        TestUtils.isApacheHiveVersion());
     String tblName = "test_commit_txn";
     String partTblName = "test_commit_txn_part";
     String insertNonPartTbl =
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java 
b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index 665a1b97d..b5036b37a 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -447,6 +447,13 @@ public class TestUtils {
     return Integer.parseInt(hiveMajorVersion);
   }
 
+  /**
+   * Returns whether we are using Apache Hive versions.
+   */
+  public static boolean isApacheHiveVersion() {
+    return Boolean.parseBoolean(System.getenv("USE_APACHE_HIVE"));
+  }
+
   /**
    * Gets checks if the catalog server running on the given host and port has
    * catalog-v2 enabled

Reply via email to