This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 784971c018c2dc44c53d7c0f366ad49cd8681ac6
Author: Venu Reddy <[email protected]>
AuthorDate: Thu Feb 29 03:53:14 2024 +0530

    IMPALA-12851: Fix AllocWriteIdEvent process issue to add 
txnId-tableWriteIds mapping
    
    During AllocWriteIdEvent process, txnId to tableWriteIds mapping is
    not added to catalog in the following cases:
    1. When a CREATE_TABLE event is followed by an ALLOC_WRITE_ID_EVENT for
    the table in the same batch of MetastoreEventsProcessor.processEvents(),
    AllocWriteIdEvent processing cannot find the catalog table, since
    CREATE_TABLE has not yet been processed at the time the
    AllocWriteIdEvent object is constructed.
    2. When the catalog table is present but not loaded.
    
    This patch fixes:
    1. Removes catalog table lookups from all the event constructors.
    Currently, AllocWriteIdEvent, ReloadEvent, and CommitCompactionEvent
    get the catalog table in their constructors.
    2. Adds the txnId to tableWriteIds mapping in the catalog even when
    the table is not loaded, and ensures the write ids are not added to
    the table if it is non-partitioned.
    3. Also fixes a bug in TableWriteId's hashCode() implementation that
    breaks the hashCode contract: two distinct TableWriteId instances that
    are equal produce different hash codes.
    4. Fixes CatalogHmsSyncToLatestEventIdTest.cleanUp(), where
    flagInvalidateCache and flagSyncToLatestEventId were swapped when
    restoring the backend flags.
    
    Testing:
    - Added tests in MetastoreEventsProcessorTest
    
    Change-Id: I8b1a918befd4ee694880fd4e3cc04cb55b64955f
    Reviewed-on: http://gerrit.cloudera.org:8080/21087
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/CatalogServiceCatalog.java      |   8 +
 .../org/apache/impala/catalog/TableWriteId.java    |   2 +-
 .../impala/catalog/events/MetastoreEvents.java     |  57 +++----
 .../events/MetastoreEventsProcessorTest.java       | 168 +++++++++++++++++++++
 .../CatalogHmsSyncToLatestEventIdTest.java         |   5 +-
 5 files changed, 198 insertions(+), 42 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 88753aa14..163f84169 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3934,6 +3934,14 @@ public class CatalogServiceCatalog extends Catalog {
           dbName, tblName, eventId);
       return;
     }
+    if (tbl.getNumClusteringCols() == 0) {
+      // For non-partitioned tables, we just reload the whole table without 
keeping
+      // track of write ids.
+      LOG.debug("Not adding write ids to table {}.{} for event {} since it is "
+              + "a non-partitioned table",
+          dbName, tblName, eventId);
+      return;
+    }
     if (!tryWriteLock(tbl)) {
       throw new CatalogException(String.format(
           "Error locking table %s for event %d", tbl.getFullName(), eventId));
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java 
b/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java
index ce269e444..10b20d0cc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java
@@ -66,6 +66,6 @@ public class TableWriteId {
   }
 
   public int hashCode() {
-    return java.util.Objects.hash(super.hashCode(), createEventId_, writeId_);
+    return java.util.Objects.hash(dbName_, tblName_, createEventId_, writeId_);
   }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index cf5650e85..96bed7e1c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -2770,7 +2770,6 @@ public class MetastoreEvents {
    */
   public static class AllocWriteIdEvent extends MetastoreTableEvent {
     private final List<TxnToWriteId> txnToWriteIdList_;
-    private org.apache.impala.catalog.Table tbl_;
 
     private AllocWriteIdEvent(CatalogOpExecutor catalogOpExecutor,
         Metrics metrics, NotificationEvent event) throws 
MetastoreNotificationException {
@@ -2782,34 +2781,13 @@ public class MetastoreEvents {
           
MetastoreEventsProcessor.getMessageDeserializer().getAllocWriteIdMessage(
               event.getMessage());
       txnToWriteIdList_ = allocWriteIdMessage.getTxnToWriteIdList();
-      try {
-        // We need to retrieve msTbl_ from catalog because the 
AllocWriteIdEvent
-        // doesn't bring the table object. However, we need msTbl_ for
-        // MetastoreTableEvent.isEventProcessingDisabled() to determine if 
event
-        // processing is disabled for the table.
-        tbl_ = catalog_.getTable(dbName_, tblName_);
-        if (tbl_ != null && tbl_.getCreateEventId() < getEventId()) {
-          msTbl_ = tbl_.getMetaStoreTable();
-        }
-      } catch (DatabaseNotFoundException e) {
-        // do nothing
-      } catch (Exception e) {
-        throw new MetastoreNotificationException(debugString("Unable to 
retrieve table "
-            + "object for AllocWriteIdEvent: {}", getEventId()), e);
-      }
     }
 
     @Override
     protected void processTableEvent() throws MetastoreNotificationException {
-      if (msTbl_ == null) {
-        debugLog("Ignoring the event since table {} does not exist or is 
unloaded",
-            getFullyQualifiedTblName());
-        return;
-      }
-      // For non-partitioned tables, we can just reload the whole table without
-      // keeping track of write ids.
-      if (msTbl_.getPartitionKeysSize() == 0) {
-        debugLog("Ignoring the event since table {} is non-partitioned",
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, 
tblName_);
+      if (tbl == null) {
+        debugLog("Ignoring the event since table {} does not exist",
             getFullyQualifiedTblName());
         return;
       }
@@ -2820,8 +2798,8 @@ public class MetastoreEvents {
         catalog_.addWriteIdsToTable(dbName_, tblName_, getEventId(), writeIds,
             MutableValidWriteIdList.WriteIdStatus.OPEN);
         for (TxnToWriteId txnToWriteId : txnToWriteIdList_) {
-          TableWriteId tableWriteId = new TableWriteId(dbName_, tblName_,
-              tbl_.getCreateEventId(), txnToWriteId.getWriteId());
+          TableWriteId tableWriteId = new TableWriteId(
+              dbName_, tblName_, tbl.getCreateEventId(), 
txnToWriteId.getWriteId());
           catalog_.addWriteId(txnToWriteId.getTxnId(), tableWriteId);
           infoLog("Added write id {} on table {}.{} for txn {}",
               txnToWriteId.getWriteId(), dbName_, tblName_, 
txnToWriteId.getTxnId());
@@ -2841,6 +2819,13 @@ public class MetastoreEvents {
 
     @Override
     protected boolean isEventProcessingDisabled() {
+      // TODO:  Have an init method to set fields that cannot be initialized 
in the
+      // event constructors and invoke it as a first step before processing 
event. It
+      // can be useful for other such events.
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, 
tblName_);
+      if (tbl != null && tbl.getCreateEventId() < getEventId()) {
+        msTbl_ = tbl.getMetaStoreTable();
+      }
       if (msTbl_ == null) {
         return false;
       }
@@ -2861,8 +2846,6 @@ public class MetastoreEvents {
     // if isRefresh_ is set to true then it is refresh query, else it is 
invalidate query
     private boolean isRefresh_;
 
-    private org.apache.impala.catalog.Table tbl_;
-
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory 
instead
      */
@@ -2878,7 +2861,6 @@ public class MetastoreEvents {
             updatedFields.get("table"));
         reloadPartition_ = (Partition)updatedFields.get("partition");
         isRefresh_ = (boolean)updatedFields.get("isRefresh");
-        tbl_ = catalog_.getTable(dbName_, tblName_);
       } catch (Exception e) {
         throw new MetastoreNotificationException(debugString("Unable to "
                 + "parse reload message"), e);
@@ -2913,11 +2895,10 @@ public class MetastoreEvents {
     }
 
     private boolean isOlderEvent() {
-      if (tbl_ == null || tbl_ instanceof IncompleteTable) {
-        return false;
-      }
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, 
tblName_);
+      if (tbl == null || tbl instanceof IncompleteTable) { return false; }
       // Always check the lastRefreshEventId on the table first for table 
level refresh
-      if (tbl_.getLastRefreshEventId() >= getEventId()
+      if (tbl.getLastRefreshEventId() >= getEventId()
           || (reloadPartition_ != null
                  && catalog_.isPartitionLoadedAfterEvent(
                         dbName_, tblName_, reloadPartition_, getEventId()))) {
@@ -3074,10 +3055,6 @@ public class MetastoreEvents {
       try {
         partitionName_ =
             MetastoreShim.getPartitionNameFromCommitCompactionEvent(event);
-        org.apache.impala.catalog.Table tbl = catalog_.getTable(dbName_, 
tblName_);
-        if (tbl != null && tbl.getCreateEventId() < getEventId()) {
-          msTbl_ = tbl.getMetaStoreTable();
-        }
       } catch (Exception ex) {
         warnLog("Unable to parse commit compaction message: {}", 
ex.getMessage());
       }
@@ -3108,6 +3085,10 @@ public class MetastoreEvents {
 
     @Override
     protected boolean isEventProcessingDisabled() {
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, 
tblName_);
+      if (tbl != null && tbl.getCreateEventId() < getEventId()) {
+        msTbl_ = tbl.getMetaStoreTable();
+      }
       if (msTbl_ == null) {
         return false;
       }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index ddae5a95e..386d9f9f2 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -86,6 +86,7 @@ import 
org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.MetastoreApiTestUtils;
 import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.TableWriteId;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterDatabaseEvent;
@@ -112,6 +113,7 @@ import 
org.apache.impala.hive.executor.TestHiveJavaFunctionFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.HiveJdbcClientPool;
 import org.apache.impala.testutil.IncompetentMetastoreClientPool;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TAlterDbParams;
@@ -3605,6 +3607,172 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  @Test
+  public void testAllocWriteIdEventForPartTableWithoutLoad() throws Exception {
+    String tblName = "test_alloc_writeid_part_table";
+    testAllocWriteIdEvent(tblName, true, false);
+  }
+
+  @Test
+  public void testAllocWriteIdEventForNonPartTableWithoutLoad() throws 
Exception {
+    String tblName = "test_alloc_writeid_table";
+    testAllocWriteIdEvent(tblName, false, false);
+  }
+
+  @Test
+  public void testAllocWriteIdEventForPartTable() throws Exception {
+    String tblName = "test_alloc_writeid_part_table_load";
+    testAllocWriteIdEvent(tblName, true, true);
+  }
+
+  @Test
+  public void testAllocWriteIdEventForNonPartTable() throws Exception {
+    String tblName = "test_alloc_writeid_table_load";
+    testAllocWriteIdEvent(tblName, false, true);
+  }
+
+  @Test
+  public void testReloadEventOnLoadedTable() throws Exception {
+    String tblName = "test_reload";
+    createDatabase(TEST_DB_NAME, null);
+    eventsProcessor_.processEvents();
+    createTable(tblName, false);
+    // Fire a reload event
+    MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), true, 
null,
+        TEST_DB_NAME, tblName, Collections.emptyMap());
+    // Fetch all the events
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    List<MetastoreEvent> filteredEvents =
+        eventsProcessor_.getEventsFactory().getFilteredEvents(
+            events, eventsProcessor_.getMetrics());
+    assertTrue(filteredEvents.size() == 2);
+    // Process create table event and load table
+    MetastoreEvent event = filteredEvents.get(0);
+    assertEquals(MetastoreEventType.CREATE_TABLE, event.getEventType());
+    event.processIfEnabled();
+    loadTable(tblName);
+    // Process reload event twice and check if table is refreshed on first 
reload event
+    // and skipped for second event.
+    event = filteredEvents.get(1);
+    assertEquals(MetastoreEventType.RELOAD, event.getEventType());
+    // First reload should refresh table
+    long refreshCount =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES)
+            .getCount();
+    event.processIfEnabled();
+    assertEquals(refreshCount + 1,
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES)
+            .getCount());
+    // Second reload should be skipped
+    long skipCount = eventsProcessor_.getMetrics()
+                         
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                         .getCount();
+    event.processIfEnabled();
+    assertEquals(skipCount + 1,
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .getCount());
+  }
+
+  @Test
+  public void testCommitCompactionEventOnLoadedTable() throws Exception {
+    String tblName = "test_commit_compaction";
+    createDatabase(TEST_DB_NAME, null);
+    eventsProcessor_.processEvents();
+    createTransactionalTable(TEST_DB_NAME, tblName, false);
+    insertIntoTable(TEST_DB_NAME, tblName);
+    insertIntoTable(TEST_DB_NAME, tblName);
+    alterTableAddParameter(
+        tblName, MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(), 
"true");
+
+    // Run hive query to trigger compaction
+    try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1);
+         HiveJdbcClientPool.HiveJdbcClient hiveClient = 
jdbcClientPool.getClient()) {
+      hiveClient.executeSql(
+          "alter table " + TEST_DB_NAME + '.' + tblName + " compact 'minor' 
and wait");
+    }
+    // Fetch all the events
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    List<MetastoreEvent> filteredEvents =
+        eventsProcessor_.getEventsFactory().getFilteredEvents(
+            events, eventsProcessor_.getMetrics());
+    assertTrue(filteredEvents.size() > 1);
+    // Process create table event and load table
+    MetastoreEvent event = filteredEvents.get(0);
+    assertEquals(MetastoreEventType.CREATE_TABLE, event.getEventType());
+    event.processIfEnabled();
+    loadTable(tblName);
+    // Process commit compaction event should skip the event because
+    // DISABLE_EVENT_HMS_SYNC is set to true
+    event = filteredEvents.get(filteredEvents.size() - 1);
+    assertEquals(MetastoreEventType.COMMIT_COMPACTION, event.getEventType());
+    long skipCount = eventsProcessor_.getMetrics()
+                         
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                         .getCount();
+    event.processIfEnabled();
+    assertEquals(skipCount + 1,
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .getCount());
+  }
+
+  private void insertIntoTable(String dbName, String tableName) throws 
Exception {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable =
+          client.getHiveClient().getTable(dbName, tableName);
+      long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      long writeId = MetastoreShim.allocateTableWriteId(
+          client.getHiveClient(), txnId, dbName, tableName);
+      simulateInsertIntoTransactionalTableFromFS(msTable, null, 1, txnId, 
writeId);
+      MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+    }
+  }
+
+  public void testAllocWriteIdEvent(String tblName, boolean isPartitioned,
+      boolean isLoadTable) throws TException, TransactionException, 
CatalogException {
+    createDatabase(TEST_DB_NAME, null);
+    eventsProcessor_.processEvents();
+    createTransactionalTable(TEST_DB_NAME, tblName, isPartitioned);
+    if (isLoadTable) {
+      eventsProcessor_.processEvents();
+      // Load table
+      loadTable(tblName);
+    }
+    long txnId;
+    long writeId;
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      writeId = MetastoreShim.allocateTableWriteId(
+          client.getHiveClient(), txnId, TEST_DB_NAME, tblName);
+      eventsProcessor_.processEvents();
+      // Abort transaction and do not process event
+      MetastoreShim.abortTransaction(client.getHiveClient(), txnId);
+    }
+    Table table = catalog_.getTableNoThrow(TEST_DB_NAME, tblName);
+    assertNotNull("Table is not present in catalog", table);
+    // Check whether txnId to write id mapping is present
+    Set<TableWriteId> writeIds = catalog_.getWriteIds(txnId);
+    assertEquals(1, writeIds.size());
+    assertTrue(writeIds.contains(
+        new TableWriteId(TEST_DB_NAME, tblName, table.getCreateEventId(), 
writeId)));
+    if (isLoadTable) {
+      assertTrue(table instanceof HdfsTable);
+      // For loaded partitioned table, write id is added on alloc write id 
event process
+      // Whereas, for non-partitioned table, write id is not added on alloc 
write id
+      // event process
+      if (isPartitioned) {
+        assertEquals(writeId, table.getValidWriteIds().getHighWatermark());
+      } else {
+        assertNotEquals(writeId, table.getValidWriteIds().getHighWatermark());
+      }
+    } else {
+      assertTrue(table instanceof IncompleteTable);
+      assertNull(table.getValidWriteIds());
+    }
+  }
+
   private void createDatabase(String catName, String dbName,
       Map<String, String> params) throws TException {
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
index 761ba560d..762c6fe69 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
@@ -129,9 +129,8 @@ public class CatalogHmsSyncToLatestEventIdTest extends 
AbstractCatalogMetastoreT
     public static void cleanUp() throws Exception {
         // in cleanup, set flag's values to previous value
         
BackendConfig.INSTANCE.setEnableCatalogdHMSCache(flagEnableCatalogCache);
-        
BackendConfig.INSTANCE.setEnableSyncToLatestEventOnDdls(flagInvalidateCache);
-        BackendConfig.INSTANCE.setInvalidateCatalogdHMSCacheOnDDLs(
-            flagSyncToLatestEventId);
+        
BackendConfig.INSTANCE.setEnableSyncToLatestEventOnDdls(flagSyncToLatestEventId);
+        
BackendConfig.INSTANCE.setInvalidateCatalogdHMSCacheOnDDLs(flagInvalidateCache);
         if (eventsProcessor_ != null) {
             eventsProcessor_.shutdown();
         }

Reply via email to