This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 784971c018c2dc44c53d7c0f366ad49cd8681ac6 Author: Venu Reddy <[email protected]> AuthorDate: Thu Feb 29 03:53:14 2024 +0530 IMPALA-12851: Fix AllocWriteIdEvent process issue to add txnId-tableWriteIds mapping During AllocWriteIdEvent processing, the txnId to tableWriteIds mapping is not added to the catalog in the following cases: 1. When a CREATE_TABLE event is followed by an ALLOC_WRITE_ID_EVENT for the table in the same batch of MetastoreEventsProcessor.processEvents(), processing the AllocWriteIdEvent cannot find the catalog table since CREATE_TABLE has not been processed by the time of AllocWriteIdEvent object construction. 2. When the catalog table is present but it is not loaded. This patch fixes: 1. Removes the usage of get table from catalog in all the event constructors. Currently, AllocWriteIdEvent, ReloadEvent, CommitCompactionEvent get the catalog table in their constructors. 2. Adds the txnId to tableWriteIds mapping in catalog even when the table is not loaded. And ensures the write ids are not added to the table if it is a non-partitioned table. 3. Also fixed a bug in TableWriteId's hashCode() implementation that was breaking the hashCode contract: two equal TableWriteId instances produced different hashcodes. 4. Fixed CatalogHmsSyncToLatestEventIdTest.cleanUp() issue. flagInvalidateCache and flagSyncToLatestEventId are incorrectly set in cleanUp. 
Testing: - Added tests in MetastoreEventsProcessorTest Change-Id: I8b1a918befd4ee694880fd4e3cc04cb55b64955f Reviewed-on: http://gerrit.cloudera.org:8080/21087 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/CatalogServiceCatalog.java | 8 + .../org/apache/impala/catalog/TableWriteId.java | 2 +- .../impala/catalog/events/MetastoreEvents.java | 57 +++---- .../events/MetastoreEventsProcessorTest.java | 168 +++++++++++++++++++++ .../CatalogHmsSyncToLatestEventIdTest.java | 5 +- 5 files changed, 198 insertions(+), 42 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 88753aa14..163f84169 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -3934,6 +3934,14 @@ public class CatalogServiceCatalog extends Catalog { dbName, tblName, eventId); return; } + if (tbl.getNumClusteringCols() == 0) { + // For non-partitioned tables, we just reload the whole table without keeping + // track of write ids. 
+ LOG.debug("Not adding write ids to table {}.{} for event {} since it is " + + "a non-partitioned table", + dbName, tblName, eventId); + return; + } if (!tryWriteLock(tbl)) { throw new CatalogException(String.format( "Error locking table %s for event %d", tbl.getFullName(), eventId)); diff --git a/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java b/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java index ce269e444..10b20d0cc 100644 --- a/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java +++ b/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java @@ -66,6 +66,6 @@ public class TableWriteId { } public int hashCode() { - return java.util.Objects.hash(super.hashCode(), createEventId_, writeId_); + return java.util.Objects.hash(dbName_, tblName_, createEventId_, writeId_); } } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index cf5650e85..96bed7e1c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -2770,7 +2770,6 @@ public class MetastoreEvents { */ public static class AllocWriteIdEvent extends MetastoreTableEvent { private final List<TxnToWriteId> txnToWriteIdList_; - private org.apache.impala.catalog.Table tbl_; private AllocWriteIdEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics, NotificationEvent event) throws MetastoreNotificationException { @@ -2782,34 +2781,13 @@ public class MetastoreEvents { MetastoreEventsProcessor.getMessageDeserializer().getAllocWriteIdMessage( event.getMessage()); txnToWriteIdList_ = allocWriteIdMessage.getTxnToWriteIdList(); - try { - // We need to retrieve msTbl_ from catalog because the AllocWriteIdEvent - // doesn't bring the table object. 
However, we need msTbl_ for - // MetastoreTableEvent.isEventProcessingDisabled() to determine if event - // processing is disabled for the table. - tbl_ = catalog_.getTable(dbName_, tblName_); - if (tbl_ != null && tbl_.getCreateEventId() < getEventId()) { - msTbl_ = tbl_.getMetaStoreTable(); - } - } catch (DatabaseNotFoundException e) { - // do nothing - } catch (Exception e) { - throw new MetastoreNotificationException(debugString("Unable to retrieve table " - + "object for AllocWriteIdEvent: {}", getEventId()), e); - } } @Override protected void processTableEvent() throws MetastoreNotificationException { - if (msTbl_ == null) { - debugLog("Ignoring the event since table {} does not exist or is unloaded", - getFullyQualifiedTblName()); - return; - } - // For non-partitioned tables, we can just reload the whole table without - // keeping track of write ids. - if (msTbl_.getPartitionKeysSize() == 0) { - debugLog("Ignoring the event since table {} is non-partitioned", + org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_); + if (tbl == null) { + debugLog("Ignoring the event since table {} does not exist", getFullyQualifiedTblName()); return; } @@ -2820,8 +2798,8 @@ public class MetastoreEvents { catalog_.addWriteIdsToTable(dbName_, tblName_, getEventId(), writeIds, MutableValidWriteIdList.WriteIdStatus.OPEN); for (TxnToWriteId txnToWriteId : txnToWriteIdList_) { - TableWriteId tableWriteId = new TableWriteId(dbName_, tblName_, - tbl_.getCreateEventId(), txnToWriteId.getWriteId()); + TableWriteId tableWriteId = new TableWriteId( + dbName_, tblName_, tbl.getCreateEventId(), txnToWriteId.getWriteId()); catalog_.addWriteId(txnToWriteId.getTxnId(), tableWriteId); infoLog("Added write id {} on table {}.{} for txn {}", txnToWriteId.getWriteId(), dbName_, tblName_, txnToWriteId.getTxnId()); @@ -2841,6 +2819,13 @@ public class MetastoreEvents { @Override protected boolean isEventProcessingDisabled() { + // TODO: Have an init method to set fields 
that cannot be initialized in the + // event constructors and invoke it as a first step before processing event. It + // can be useful for other such events. + org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_); + if (tbl != null && tbl.getCreateEventId() < getEventId()) { + msTbl_ = tbl.getMetaStoreTable(); + } if (msTbl_ == null) { return false; } @@ -2861,8 +2846,6 @@ public class MetastoreEvents { // if isRefresh_ is set to true then it is refresh query, else it is invalidate query private boolean isRefresh_; - private org.apache.impala.catalog.Table tbl_; - /** * Prevent instantiation from outside should use MetastoreEventFactory instead */ @@ -2878,7 +2861,6 @@ public class MetastoreEvents { updatedFields.get("table")); reloadPartition_ = (Partition)updatedFields.get("partition"); isRefresh_ = (boolean)updatedFields.get("isRefresh"); - tbl_ = catalog_.getTable(dbName_, tblName_); } catch (Exception e) { throw new MetastoreNotificationException(debugString("Unable to " + "parse reload message"), e); @@ -2913,11 +2895,10 @@ public class MetastoreEvents { } private boolean isOlderEvent() { - if (tbl_ == null || tbl_ instanceof IncompleteTable) { - return false; - } + org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_); + if (tbl == null || tbl instanceof IncompleteTable) { return false; } // Always check the lastRefreshEventId on the table first for table level refresh - if (tbl_.getLastRefreshEventId() >= getEventId() + if (tbl.getLastRefreshEventId() >= getEventId() || (reloadPartition_ != null && catalog_.isPartitionLoadedAfterEvent( dbName_, tblName_, reloadPartition_, getEventId()))) { @@ -3074,10 +3055,6 @@ public class MetastoreEvents { try { partitionName_ = MetastoreShim.getPartitionNameFromCommitCompactionEvent(event); - org.apache.impala.catalog.Table tbl = catalog_.getTable(dbName_, tblName_); - if (tbl != null && tbl.getCreateEventId() < getEventId()) { - msTbl_ = tbl.getMetaStoreTable(); 
- } } catch (Exception ex) { warnLog("Unable to parse commit compaction message: {}", ex.getMessage()); } @@ -3108,6 +3085,10 @@ public class MetastoreEvents { @Override protected boolean isEventProcessingDisabled() { + org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_); + if (tbl != null && tbl.getCreateEventId() < getEventId()) { + msTbl_ = tbl.getMetaStoreTable(); + } if (msTbl_ == null) { return false; } diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index ddae5a95e..386d9f9f2 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -86,6 +86,7 @@ import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.MetastoreApiTestUtils; import org.apache.impala.catalog.ScalarFunction; import org.apache.impala.catalog.Table; +import org.apache.impala.catalog.TableWriteId; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.events.ConfigValidator.ValidationResult; import org.apache.impala.catalog.events.MetastoreEvents.AlterDatabaseEvent; @@ -112,6 +113,7 @@ import org.apache.impala.hive.executor.TestHiveJavaFunctionFactory; import org.apache.impala.service.CatalogOpExecutor; import org.apache.impala.service.FeSupport; import org.apache.impala.testutil.CatalogServiceTestCatalog; +import org.apache.impala.testutil.HiveJdbcClientPool; import org.apache.impala.testutil.IncompetentMetastoreClientPool; import org.apache.impala.testutil.TestUtils; import org.apache.impala.thrift.TAlterDbParams; @@ -3605,6 +3607,172 @@ public class MetastoreEventsProcessorTest { } } + @Test + public void testAllocWriteIdEventForPartTableWithoutLoad() throws Exception { + String tblName = "test_alloc_writeid_part_table"; + 
testAllocWriteIdEvent(tblName, true, false); + } + + @Test + public void testAllocWriteIdEventForNonPartTableWithoutLoad() throws Exception { + String tblName = "test_alloc_writeid_table"; + testAllocWriteIdEvent(tblName, false, false); + } + + @Test + public void testAllocWriteIdEventForPartTable() throws Exception { + String tblName = "test_alloc_writeid_part_table_load"; + testAllocWriteIdEvent(tblName, true, true); + } + + @Test + public void testAllocWriteIdEventForNonPartTable() throws Exception { + String tblName = "test_alloc_writeid_table_load"; + testAllocWriteIdEvent(tblName, false, true); + } + + @Test + public void testReloadEventOnLoadedTable() throws Exception { + String tblName = "test_reload"; + createDatabase(TEST_DB_NAME, null); + eventsProcessor_.processEvents(); + createTable(tblName, false); + // Fire a reload event + MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), true, null, + TEST_DB_NAME, tblName, Collections.emptyMap()); + // Fetch all the events + List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); + List<MetastoreEvent> filteredEvents = + eventsProcessor_.getEventsFactory().getFilteredEvents( + events, eventsProcessor_.getMetrics()); + assertTrue(filteredEvents.size() == 2); + // Process create table event and load table + MetastoreEvent event = filteredEvents.get(0); + assertEquals(MetastoreEventType.CREATE_TABLE, event.getEventType()); + event.processIfEnabled(); + loadTable(tblName); + // Process reload event twice and check if table is refreshed on first reload event + // and skipped for second event. 
+ event = filteredEvents.get(1); + assertEquals(MetastoreEventType.RELOAD, event.getEventType()); + // First reload should refresh table + long refreshCount = + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES) + .getCount(); + event.processIfEnabled(); + assertEquals(refreshCount + 1, + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES) + .getCount()); + // Second reload should be skipped + long skipCount = eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .getCount(); + event.processIfEnabled(); + assertEquals(skipCount + 1, + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .getCount()); + } + + @Test + public void testCommitCompactionEventOnLoadedTable() throws Exception { + String tblName = "test_commit_compaction"; + createDatabase(TEST_DB_NAME, null); + eventsProcessor_.processEvents(); + createTransactionalTable(TEST_DB_NAME, tblName, false); + insertIntoTable(TEST_DB_NAME, tblName); + insertIntoTable(TEST_DB_NAME, tblName); + alterTableAddParameter( + tblName, MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(), "true"); + + // Run hive query to trigger compaction + try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1); + HiveJdbcClientPool.HiveJdbcClient hiveClient = jdbcClientPool.getClient()) { + hiveClient.executeSql( + "alter table " + TEST_DB_NAME + '.' 
+ tblName + " compact 'minor' and wait"); + } + // Fetch all the events + List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); + List<MetastoreEvent> filteredEvents = + eventsProcessor_.getEventsFactory().getFilteredEvents( + events, eventsProcessor_.getMetrics()); + assertTrue(filteredEvents.size() > 1); + // Process create table event and load table + MetastoreEvent event = filteredEvents.get(0); + assertEquals(MetastoreEventType.CREATE_TABLE, event.getEventType()); + event.processIfEnabled(); + loadTable(tblName); + // Process commit compaction event should skip the event because + // DISABLE_EVENT_HMS_SYNC is set to true + event = filteredEvents.get(filteredEvents.size() - 1); + assertEquals(MetastoreEventType.COMMIT_COMPACTION, event.getEventType()); + long skipCount = eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .getCount(); + event.processIfEnabled(); + assertEquals(skipCount + 1, + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .getCount()); + } + + private void insertIntoTable(String dbName, String tableName) throws Exception { + try (MetaStoreClient client = catalog_.getMetaStoreClient()) { + org.apache.hadoop.hive.metastore.api.Table msTable = + client.getHiveClient().getTable(dbName, tableName); + long txnId = MetastoreShim.openTransaction(client.getHiveClient()); + long writeId = MetastoreShim.allocateTableWriteId( + client.getHiveClient(), txnId, dbName, tableName); + simulateInsertIntoTransactionalTableFromFS(msTable, null, 1, txnId, writeId); + MetastoreShim.commitTransaction(client.getHiveClient(), txnId); + } + } + + public void testAllocWriteIdEvent(String tblName, boolean isPartitioned, + boolean isLoadTable) throws TException, TransactionException, CatalogException { + createDatabase(TEST_DB_NAME, null); + eventsProcessor_.processEvents(); + createTransactionalTable(TEST_DB_NAME, tblName, isPartitioned); + if (isLoadTable) { + 
eventsProcessor_.processEvents(); + // Load table + loadTable(tblName); + } + long txnId; + long writeId; + try (MetaStoreClient client = catalog_.getMetaStoreClient()) { + txnId = MetastoreShim.openTransaction(client.getHiveClient()); + writeId = MetastoreShim.allocateTableWriteId( + client.getHiveClient(), txnId, TEST_DB_NAME, tblName); + eventsProcessor_.processEvents(); + // Abort transaction and do not process event + MetastoreShim.abortTransaction(client.getHiveClient(), txnId); + } + Table table = catalog_.getTableNoThrow(TEST_DB_NAME, tblName); + assertNotNull("Table is not present in catalog", table); + // Check whether txnId to write id mapping is present + Set<TableWriteId> writeIds = catalog_.getWriteIds(txnId); + assertEquals(1, writeIds.size()); + assertTrue(writeIds.contains( + new TableWriteId(TEST_DB_NAME, tblName, table.getCreateEventId(), writeId))); + if (isLoadTable) { + assertTrue(table instanceof HdfsTable); + // For loaded partitioned table, write id is added on alloc write id event process + // Whereas, for non-partitioned table, write id is not added on alloc write id + // event process + if (isPartitioned) { + assertEquals(writeId, table.getValidWriteIds().getHighWatermark()); + } else { + assertNotEquals(writeId, table.getValidWriteIds().getHighWatermark()); + } + } else { + assertTrue(table instanceof IncompleteTable); + assertNull(table.getValidWriteIds()); + } + } + private void createDatabase(String catName, String dbName, Map<String, String> params) throws TException { try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) { diff --git a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java index 761ba560d..762c6fe69 100644 --- a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java +++ 
b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java @@ -129,9 +129,8 @@ public class CatalogHmsSyncToLatestEventIdTest extends AbstractCatalogMetastoreT public static void cleanUp() throws Exception { // in cleanup, set flag's values to previous value BackendConfig.INSTANCE.setEnableCatalogdHMSCache(flagEnableCatalogCache); - BackendConfig.INSTANCE.setEnableSyncToLatestEventOnDdls(flagInvalidateCache); - BackendConfig.INSTANCE.setInvalidateCatalogdHMSCacheOnDDLs( - flagSyncToLatestEventId); + BackendConfig.INSTANCE.setEnableSyncToLatestEventOnDdls(flagSyncToLatestEventId); + BackendConfig.INSTANCE.setInvalidateCatalogdHMSCacheOnDDLs(flagInvalidateCache); if (eventsProcessor_ != null) { eventsProcessor_.shutdown(); }
