This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eb649628a198704db05db637d145af0ce2bb6587
Author: stiga-huang <[email protected]>
AuthorDate: Fri Apr 18 15:18:47 2025 +0800

    IMPALA-13974: Don't check catName if fetching COMMIT_COMPACTION_EVENT or ALLOC_WRITE_ID_EVENT
    
    COMMIT_COMPACTION_EVENT is one kind of HMS notification events that are
    generated when a transactional table finishes a compaction.
    ALLOC_WRITE_ID_EVENT is another kind of transactional table events
    generated when a data modification, e.g. INSERT, starts.
    
    Due to the bug HIVE-28912, the catName of these events is always NULL.
    Note that catName is an optional field in all HMS notification events:
    struct NotificationEvent {
        1: required i64 eventId,
        2: required i32 eventTime,
        3: required string eventType,
        4: optional string dbName,
        5: optional string tableName,
        6: required string message,
        7: optional string messageFormat,
        8: optional string catName
    }
    It's OK for catName to be NULL for global events such as OPEN_TXN,
    COMMIT_TXN and ABORT_TXN. But for table-level events, catName should be
    the catalog name of the table, which is usually the default catalog, 'hive'.
    
    When fetching COMMIT_COMPACTION_EVENT and ALLOC_WRITE_ID_EVENT events of
    a table, we shouldn't filter on catName. Otherwise, all of them will be
    skipped due to this bug.
    
    We use two filters when fetching HMS events. One is the server-side
    filter set in NotificationEventRequest. The other is the client-side
    filter, NotificationFilter, which filters the events fetched from HMS.
    
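    This patch removes the catName check from both filters to avoid
    missing COMMIT_COMPACTION_EVENT and ALLOC_WRITE_ID_EVENT. On the server
    side, catName is unset only when at least one of these two event types
    is requested, i.e. not in the event type skip list.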
    
    MetastoreEventsProcessorTest.testNotificationEventRequest() asserted some
    wrong expected event counts because of this bug. This patch corrects them
    and refactors the test to be more readable.
    
    Tests:
     - Ran test_hms_event_sync_with_commit_compaction 20 times. Without the
       fix, it fails in 2-3 of the 20 runs.
     - Ran MetastoreEventsProcessorTest.testNotificationEventRequest().
    
    Change-Id: I2e182d32ee2bb8a69c4f71c05eb9e87a5a115f24
    Reviewed-on: http://gerrit.cloudera.org:8080/22794
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java    | 10 ++++
 .../catalog/events/MetastoreEventsProcessor.java   |  4 +-
 .../events/MetastoreEventsProcessorTest.java       | 55 +++++++++++++---------
 3 files changed, 45 insertions(+), 24 deletions(-)

diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index ad6eab6e5..f259a573c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -609,6 +609,16 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     if (eventTypeSkipList != null) {
       eventRequest.setEventTypeSkipList(eventTypeSkipList);
     }
+    // Due to the bug of HIVE-28912, fetching COMMIT_COMPACTION_EVENT or
+    // ALLOC_WRITE_ID_EVENT shouldn't specify the catName.
+    // 'eventTypeSkipList' is null or doesn't contain a type means we want 
this type.
+    if (eventRequest.isSetCatName() && (eventTypeSkipList == null
+        || !eventTypeSkipList.contains(
+            MetastoreEventType.COMMIT_COMPACTION_EVENT.toString())
+        || !eventTypeSkipList.contains(
+            MetastoreEventType.ALLOC_WRITE_ID_EVENT.toString()))) {
+      eventRequest.unsetCatName();
+    }
     return msClient.getThriftClient().get_next_notification(eventRequest);
   }
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 313e91405..b24b741e7 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -1920,9 +1920,11 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
         MetaDataFilter.TABLE_EVENT_TYPES : 
MetaDataFilter.TABLE_EXIST_EVENT_TYPES;
     for (String dbName : db2Tables.keySet()) {
       Set<String> tblNames = new HashSet<>(db2Tables.get(dbName));
+      // Due to HIVE-28912 we don't check catName in the filter
       NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName())
           && tblNames.contains(e.getTableName().toLowerCase())
-          && MetastoreShim.isDefaultCatalog(e.getCatName())
+          // Due to HIVE-28912 we don't check catName in the filter
+          // && MetastoreShim.isDefaultCatalog(e.getCatName())
           && eventTypes.contains(e.getEventType());
       MetaDataFilter metaDataFilter = new MetaDataFilter(filter,
           MetastoreShim.getDefaultCatalogName(), dbName, 
db2Tables.get(dbName));
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 47a200cc1..302f79a7f 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -4167,16 +4167,16 @@ public class MetastoreEventsProcessorTest {
   @Test
   public void testNotificationEventRequest() throws Exception {
     long currentEventId = eventsProcessor_.getCurrentEventId();
-    int EVENTS_BATCH_SIZE_PER_RPC = 1000;
     // Generate some DB only related events
     createDatabaseFromImpala(TEST_DB_NAME, null);
     String testDbParamKey = "testKey", testDbParamVal = "testVal";
     String testTable1 = "testNotifyTable1", testTable2 = "testNotifyTable2";
     addDatabaseParameters(testDbParamKey, testDbParamVal);
     eventsProcessor_.processEvents();
-    // Verify DB related events only.
 
     // Generate some table events for managed table
+    // Insert into a transactional table and abort the txn. This generates 
OPEN_TXN,
+    // ALLOC_WRITE_ID_EVENT and ABORT_TXN events.
     testInsertIntoTransactionalTable(testTable1, true, false);
     alterTableAddColsFromImpala(
         TEST_DB_NAME, testTable1, "newCol", TPrimitiveType.STRING);
@@ -4197,45 +4197,54 @@ public class MetastoreEventsProcessorTest {
         .equals(event.getEventType());
     MetaDataFilter metaDataFilter = new MetaDataFilter(filter,
         MetastoreShim.getDefaultCatalogName(), TEST_DB_NAME);
-    assertEquals(MetastoreEventsProcessor
-        .getNextMetastoreEventsWithFilterInBatches(catalog_, currentEventId,
-            metaDataFilter, EVENTS_BATCH_SIZE_PER_RPC).size(), 1);
+    // Get one CREATE_DATABASE event
+    assertEventCount(currentEventId, metaDataFilter, 1);
     Db db = catalog_.getDb(TEST_DB_NAME);
     metaDataFilter.setNotificationFilter(
         MetastoreEventsProcessor.getDbNotificationEventFilter(db));
-    
assertEquals(MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
-        catalog_, currentEventId, metaDataFilter, 
EVENTS_BATCH_SIZE_PER_RPC).size(), 2);
+    // Get all db events: CREATE_DATABASE, ALTER_DATABASE
+    assertEventCount(currentEventId, metaDataFilter, 2);
 
     // verify events for table 1
-    filter = event -> "ALTER_TABLE".equals(event.getEventType());
+    filter = event -> AlterTableEvent.EVENT_TYPE.equals(event.getEventType());
     metaDataFilter = new MetaDataFilter(filter, 
MetastoreShim.getDefaultCatalogName(),
         TEST_DB_NAME, testTable1);
-    
assertEquals(MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
-        catalog_, currentEventId, metaDataFilter, 
EVENTS_BATCH_SIZE_PER_RPC).size(), 2);
+    // Get 2 ALTER_TABLE events of adding and removing columns respectively.
+    assertEventCount(currentEventId, metaDataFilter, 2);
+    // Without filter, there are 4 events: CREATE_TABLE, ALLOC_WRITE_ID_EVENT,
+    // ALTER_TABLE, ALTER_TABLE.
     metaDataFilter.setNotificationFilter(null);
-    
assertEquals(MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
-        catalog_, currentEventId, metaDataFilter, 
EVENTS_BATCH_SIZE_PER_RPC).size(), 3);
+    assertEventCount(currentEventId, metaDataFilter, 4);
 
     // verify events for table 2
-    filter = event -> "ALTER_TABLE".equals(event.getEventType());
+    filter = event -> AlterTableEvent.EVENT_TYPE.equals(event.getEventType());
     metaDataFilter = new MetaDataFilter(filter, 
MetastoreShim.getDefaultCatalogName(),
         TEST_DB_NAME, testTable2);
-    
assertEquals(MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
-        catalog_, currentEventId, metaDataFilter, 
EVENTS_BATCH_SIZE_PER_RPC).size(), 2);
+    // Get 2 ALTER_TABLE events of setting owner and file format respectively.
+    assertEventCount(currentEventId, metaDataFilter, 2);
+    // Without the filter, get one more CREATE_TABLE event so it's 3 in total.
     metaDataFilter.setNotificationFilter(null);
-    
assertEquals(MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
-        catalog_, currentEventId, metaDataFilter, 
EVENTS_BATCH_SIZE_PER_RPC).size(), 3);
+    assertEventCount(currentEventId, metaDataFilter, 3);
 
     // verify all events
-    filter = event -> "ALTER_TABLE".equals(event.getEventType());
+    filter = event -> AlterTableEvent.EVENT_TYPE.equals(event.getEventType());
     metaDataFilter = new MetaDataFilter(filter, 
MetastoreShim.getDefaultCatalogName(),
         TEST_DB_NAME);
-    
assertEquals(MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
-        catalog_, currentEventId, metaDataFilter, 
EVENTS_BATCH_SIZE_PER_RPC).size(), 4);
+    // 4 ALTER_TABLE events in this db
+    assertEventCount(currentEventId, metaDataFilter, 4);
     metaDataFilter.setNotificationFilter(null);
-    // 2 DB + 6 table events
-    
assertEquals(MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
-        catalog_, currentEventId, metaDataFilter, 
EVENTS_BATCH_SIZE_PER_RPC).size(), 8);
+    // 2 DB + 7 table events
+    // CREATE_DATABASE, ALTER_DATABASE, CREATE_TABLE, ALLOC_WRITE_ID_EVENT, 
ALTER_TABLE,
+    // ALTER_TABLE, CREATE_TABLE, ALTER_TABLE, ALTER_TABLE.
+    assertEventCount(currentEventId, metaDataFilter, 9);
+  }
+
+  private void assertEventCount(long currentEventId, MetaDataFilter 
metaDataFilter,
+      int expected) throws MetastoreNotificationFetchException {
+    List<NotificationEvent> events = // events matching metaDataFilter
+        MetastoreEventsProcessor.getNextMetastoreEventsWithFilterInBatches(
+            catalog_, currentEventId, metaDataFilter, 1000);  // batch size per RPC
+    assertEquals("Actual events: " + events, expected, events.size());
   }
 
   @Test

Reply via email to