This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit dc46aa48d9449729f312aaaa3312e0d8df150063 Author: stiga-huang <[email protected]> AuthorDate: Thu Jul 3 19:47:41 2025 +0800 IMPALA-13285: Ignore COMMIT_TXN events on Apache Hive 3 In Apache Hive 3, HMS doesn't provide the API to retrieve WriteEvents info of a given transaction. COMMIT_TXN event just contains a transaction id so Impala can't process it. This patch ignores COMMIT_TXN events when building on Apache Hive 3. Some tests in MetastoreEventsProcessorTest and EventExecutorServiceTest are skipped due to this. Tests: - Manually tested on Apache Hive 3. Verified that EventProcessor still works after receiving COMMIT_TXN events. - Passed some tests in MetastoreEventsProcessorTest and EventExecutorServiceTest that previously failed due to EventProcessor going into ERROR state. Change-Id: I863e39b3702028a14e83fed1fe912b441f2c28db Reviewed-on: http://gerrit.cloudera.org:8080/23117 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/compat/MetastoreShim.java | 32 ++++------------------ .../impala/catalog/events/MetastoreEvents.java | 5 +--- .../catalog/events/EventExecutorServiceTest.java | 8 ++++++ .../events/MetastoreEventsProcessorTest.java | 23 ++++++++++++++++ .../java/org/apache/impala/testutil/TestUtils.java | 7 +++++ 5 files changed, 45 insertions(+), 30 deletions(-) diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 24fe4933b..8ed415899 100644 --- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -74,7 +74,7 @@ import org.apache.impala.catalog.Hive3MetastoreShimBase; import org.apache.impala.catalog.MetaStoreClientPool; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import 
org.apache.impala.catalog.events.MetastoreEvents.DerivedMetastoreEvent; -import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; +import org.apache.impala.catalog.events.MetastoreEvents.IgnoredEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreTableEvent; import org.apache.impala.catalog.events.MetastoreEventsProcessor.MetaDataFilter; import org.apache.impala.catalog.events.MetastoreNotificationException; @@ -631,38 +631,17 @@ public class MetastoreShim extends Hive3MetastoreShimBase { /** * CDP Hive-3 only function. */ - public static class CommitTxnEvent extends MetastoreEvent { + public static class CommitTxnEvent extends IgnoredEvent { public static final String EVENT_TYPE = "COMMIT_TXN"; public CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics, NotificationEvent event) { super(catalogOpExecutor, metrics, event); - throw new UnsupportedOperationException("CommitTxnEvent is not supported."); } @Override - protected void process() throws MetastoreNotificationException { - - } - - @Override - protected boolean onFailure(Exception e) { - return false; - } - - @Override - protected boolean isEventProcessingDisabled() { - return false; - } - - @Override - protected SelfEventContext getSelfEventContext() { - return null; - } - - @Override - protected boolean shouldSkipWhenSyncingToLatestEventId() { - return false; + public void process() { + LOG.info("Ignoring COMMIT_TXN event {}", getEventId()); } } @@ -1045,6 +1024,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase { */ public static List<PseudoCommitTxnEvent> getPseudoCommitTxnEvents( CommitTxnEvent event) { - throw new UnsupportedOperationException("CommitTxnEvent is not supported."); + LOG.info("Ignoring COMMIT_TXN event {}", event.getEventId()); + return Collections.emptyList(); } } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 
6bd4aeebc..e87985d54 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -3606,10 +3606,7 @@ public class MetastoreEvents { */ public static class IgnoredEvent extends MetastoreEvent { - /** - * Prevent instantiation from outside should use MetastoreEventFactory instead - */ - private IgnoredEvent( + public IgnoredEvent( CatalogOpExecutor catalogOpExecutor, Metrics metrics, NotificationEvent event) { super(catalogOpExecutor, metrics, event); } diff --git a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java index 8c746ceac..793eb841c 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java @@ -53,9 +53,11 @@ import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.BackendConfig; import org.apache.impala.service.CatalogOpExecutor; import org.apache.impala.testutil.CatalogServiceTestCatalog; +import org.apache.impala.testutil.TestUtils; import org.apache.thrift.TException; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -643,6 +645,9 @@ public class EventExecutorServiceTest { */ @Test public void testCommitTxn() throws Exception { + Assume.assumeFalse("Skipping this since COMMIT_TXN events are ignored on Apache " + + "Hive 2/3. 
So the validWriteIds list is not updated correctly.", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); EventExecutorService eventExecutorService = createEventExecutorService(2, 2); transactionTest(false); shutDownEventExecutorService(eventExecutorService); @@ -654,6 +659,9 @@ public class EventExecutorServiceTest { */ @Test public void testAbortTxn() throws Exception { + Assume.assumeFalse("Skipping this since COMMIT_TXN events are ignored on Apache " + + "Hive 2/3. So the validWriteIds list is not updated correctly.", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); EventExecutorService eventExecutorService = createEventExecutorService(2, 2); transactionTest(true); shutDownEventExecutorService(eventExecutorService); diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index a409a6630..18c9f9d78 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -2859,6 +2859,8 @@ public class MetastoreEventsProcessorTest { @Test public void testCommitEvent() throws TException, ImpalaException, IOException { + Assume.assumeFalse("Skipping this since it depends on the behavior of CDP Hive 3", + TestUtils.isApacheHiveVersion()); // Turn on incremental refresh for transactional table final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg(); try { @@ -2879,6 +2881,8 @@ public class MetastoreEventsProcessorTest { @Test public void testAbortEvent() throws TException, ImpalaException, IOException { + Assume.assumeFalse("COMMIT_TXN events are not processed on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); // Turn on incremental refresh for transactional table final TBackendGflags origCfg = 
BackendConfig.INSTANCE.getBackendCfg(); try { @@ -3632,6 +3636,8 @@ public class MetastoreEventsProcessorTest { */ @Test public void testSkipFetchOpenTransactionEvent() throws Exception { + Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); long currentEventId = eventsProcessor_.getCurrentEventId(); try (MetaStoreClient client = catalog_.getMetaStoreClient()) { // 1. Fetch notification events after open and commit transaction @@ -3678,6 +3684,8 @@ public class MetastoreEventsProcessorTest { */ @Test public void testFetchEventsInBatchWithOpenTxnAsLastEvent() throws Exception { + Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); long currentEventId = eventsProcessor_.getCurrentEventId(); try (MetaStoreClient client = catalog_.getMetaStoreClient()) { long txnId = MetastoreShim.openTransaction(client.getHiveClient()); @@ -3694,6 +3702,8 @@ public class MetastoreEventsProcessorTest { @Test public void testNotFetchingUnwantedEvents() throws Exception { + Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); String tblName = "test_event_skip_list"; createDatabase(TEST_DB_NAME, null); Map<String, String> params = new HashMap<>(); @@ -3840,6 +3850,8 @@ public class MetastoreEventsProcessorTest { @Test public void testReloadEventOnLoadedTable() throws Exception { + Assume.assumeFalse("RELOAD event is not generated on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); String tblName = "test_reload"; createDatabase(TEST_DB_NAME, null); eventsProcessor_.processEvents(); @@ -3885,6 +3897,9 @@ public class MetastoreEventsProcessorTest { @Test public void testCommitCompactionEventOnLoadedTable() throws Exception { + 
Assume.assumeFalse("Skipping this since COMMIT_TXN event is not supported on " + + "Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); String tblName = "test_commit_compaction"; createDatabase(TEST_DB_NAME, null); eventsProcessor_.processEvents(); @@ -3984,6 +3999,8 @@ public class MetastoreEventsProcessorTest { */ @Test public void testEmptyPartitionValues() throws Exception { + Assume.assumeFalse("RELOAD event is not generated on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); String prevFlag = BackendConfig.INSTANCE.debugActions(); try { String tblName = "test_empty"; @@ -4080,6 +4097,8 @@ public class MetastoreEventsProcessorTest { public void testAllocWriteIdEvent(String tblName, boolean isPartitioned, boolean isLoadTable) throws TException, TransactionException, CatalogException { + Assume.assumeFalse("COMMIT_TXN events are not processed on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); createDatabase(TEST_DB_NAME, null); eventsProcessor_.processEvents(); createTransactionalTable(TEST_DB_NAME, tblName, isPartitioned); @@ -4122,6 +4141,8 @@ public class MetastoreEventsProcessorTest { @Test public void testNotificationEventRequest() throws Exception { + Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3", + TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3); long currentEventId = eventsProcessor_.getCurrentEventId(); // Generate some DB only related events createDatabaseFromImpala(TEST_DB_NAME, null); @@ -4239,6 +4260,8 @@ public class MetastoreEventsProcessorTest { @Test public void testCommitTxnEventTargetName() throws Exception { + Assume.assumeFalse("Skipping this since it depends on the behavior of CDP Hive 3", + TestUtils.isApacheHiveVersion()); String tblName = "test_commit_txn"; String partTblName = "test_commit_txn_part"; String insertNonPartTbl = diff --git 
a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java index 665a1b97d..b5036b37a 100644 --- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java +++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java @@ -447,6 +447,13 @@ public class TestUtils { return Integer.parseInt(hiveMajorVersion); } + /** + * Returns whether we are using Apache Hive versions. + */ + public static boolean isApacheHiveVersion() { + return Boolean.parseBoolean(System.getenv("USE_APACHE_HIVE")); + } + /** * Gets checks if the catalog server running on the given host and port has * catalog-v2 enabled
