This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 62b6081852b4851437f554aa68aa8321099568b0 Author: Venu Reddy <[email protected]> AuthorDate: Fri Sep 15 20:25:46 2023 +0530 IMPALA-12399: (addendum) Fixed possible deadloop and also do not set the event skip list in notification request to update latestEventId_ Following points are fixed: 1. Possible deadloop in MetastoreEventsProcessor#getNextMetastoreEventsInBatches() when OPEN_TXN is the last event on HMS at the moment the method is invoked. Need to return when events received in response is empty. 2. latestEventId_ reflects the latest event in HMS. Should not set the event skip list in notification request to update latestEventId_. 3. Update lastSyncedEventId_ when event list returned is empty due to event skip list in notification event. Testing: - Added tests and executed all existing tests. Change-Id: Idb4b8c3db23d39226f10b33cca4e6a1ab271b925 Reviewed-on: http://gerrit.cloudera.org:8080/20487 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/compat/MetastoreShim.java | 8 +- .../org/apache/impala/compat/MetastoreShim.java | 13 +++- .../catalog/events/MetastoreEventsProcessor.java | 87 ++++++++++++++------- .../events/MetastoreEventsProcessorTest.java | 88 +++++++++++++++++----- 4 files changed, 150 insertions(+), 46 deletions(-) diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 9fab5e384..cd6a97a81 100644 --- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -457,9 +457,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase { * eventIds returned. * * @see org.apache.hadoop.hive.metastore.HiveMetaStoreClient#getNextNotification + * @param msClient Metastore client + * @param eventRequest Notification event request + * @param isSkipUnwantedEventTypes Whether to set skip event types in request + * @return NotificationEventResponse + * @throws TException */ public static NotificationEventResponse getNextNotification(IMetaStoreClient msClient, - NotificationEventRequest eventRequest) throws TException { + NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes) + throws TException { return getThriftClient(msClient).get_next_notification(eventRequest); } diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index d36b46444..504e13a9c 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -583,10 +583,19 @@ public class MetastoreShim extends Hive3MetastoreShimBase { /** * Wrapper around IMetaStoreClient.getThriftClient().get_next_notification() to deal * with added arguments. + * + * @param msClient Metastore client + * @param eventRequest Notification event request + * @param isSkipUnwantedEventTypes Whether to set skip event types in request + * @return NotificationEventResponse + * @throws TException */ public static NotificationEventResponse getNextNotification(IMetaStoreClient msClient, - NotificationEventRequest eventRequest) throws TException { - eventRequest.setEventTypeSkipList(MetastoreEventsProcessor.getEventSkipList()); + NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes) + throws TException { + if (isSkipUnwantedEventTypes) { + eventRequest.setEventTypeSkipList(MetastoreEventsProcessor.getEventSkipList()); + } return msClient.getThriftClient().get_next_notification(eventRequest); } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index 0e0e54c4f..87650d702 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -318,8 +318,12 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { NotificationEventRequest eventRequest = new NotificationEventRequest(); eventRequest.setMaxEvents(batchSize); eventRequest.setLastEvent(currentEventId); - NotificationEventResponse notificationEventResponse = MetastoreShim - .getNextNotification(msc.getHiveClient(), eventRequest); + NotificationEventResponse notificationEventResponse = + MetastoreShim.getNextNotification(msc.getHiveClient(), eventRequest, true); + if (notificationEventResponse.getEvents().isEmpty()) { + // Possible to receive empty list due to event skip list in request + return result; + } for (NotificationEvent event : notificationEventResponse.getEvents()) { // if no filter is provided we add all the events if (filter == null || filter.accept(event)) result.add(event); @@ -689,6 +693,15 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { return lastSyncedEventId_.get(); } + /** + * Returns the current value of latestEventId_. This method is not thread-safe and + * only to be used for testing purposes + */ + @VisibleForTesting + public long getLatestEventId() { + return latestEventId_.get(); + } + @VisibleForTesting void startScheduler() { Preconditions.checkState(pollingFrequencyInSec_ > 0); @@ -813,6 +826,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { * NotificationEvents are filtered using the NotificationFilter provided if it is not * null. * @param eventId The returned events are all after this given event id. + * @param currentEventId Current event id on metastore * @param getAllEvents If this is true all the events since eventId are returned. * Note that Hive MetaStore can limit the response to a specific * maximum number of limit based on the value of configuration @@ -826,22 +840,15 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { * @throws MetastoreNotificationFetchException In case of exceptions from HMS. */ public List<NotificationEvent> getNextMetastoreEvents(final long eventId, - final boolean getAllEvents, @Nullable final NotificationFilter filter) + final long currentEventId, final boolean getAllEvents, + @Nullable final NotificationFilter filter) throws MetastoreNotificationFetchException { + // no new events since we last polled + if (currentEventId <= eventId) { + return Collections.emptyList(); + } final Timer.Context context = metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time(); try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { - // fetch the current notification event id. We assume that the polling interval - // is small enough that most of these polling operations result in zero new - // events. In such a case, fetching current notification event id is much faster - // (and cheaper on HMS side) instead of polling for events directly - CurrentNotificationEventId currentNotificationEventId = - msClient.getHiveClient().getCurrentNotificationEventId(); - long currentEventId = currentNotificationEventId.getEventId(); - - // no new events since we last polled - if (currentEventId <= eventId) { - return Collections.emptyList(); - } int batchSize = getAllEvents ? -1 : EVENTS_BATCH_SIZE_PER_RPC; // we use the thrift API directly instead of // HiveMetastoreClient#getNextNotification because the HMS client can throw an @@ -849,8 +856,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { NotificationEventRequest eventRequest = new NotificationEventRequest(); eventRequest.setLastEvent(eventId); eventRequest.setMaxEvents(batchSize); - NotificationEventResponse response = MetastoreShim - .getNextNotification(msClient.getHiveClient(), eventRequest); + NotificationEventResponse response = + MetastoreShim.getNextNotification(msClient.getHiveClient(), eventRequest, true); LOG.info(String.format("Received %d events. Start event id : %d", response.getEvents().size(), eventId)); if (filter == null) return response.getEvents(); @@ -874,8 +881,21 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { */ @VisibleForTesting protected List<NotificationEvent> getNextMetastoreEvents() + throws MetastoreNotificationFetchException, CatalogException { + return getNextMetastoreEvents(getCurrentEventId()); + } + + /** + * Fetch the next batch of NotificationEvents from metastore. The default batch size is + * <code>EVENTS_BATCH_SIZE_PER_RPC</code> + * @param currentEventId Current event id on metastore + * @return List of NotificationEvents from metastore since lastSyncedEventId + * @throws MetastoreNotificationFetchException + */ + @VisibleForTesting + public List<NotificationEvent> getNextMetastoreEvents(long currentEventId) throws MetastoreNotificationFetchException { - return getNextMetastoreEvents(lastSyncedEventId_.get(), false, null); + return getNextMetastoreEvents(lastSyncedEventId_.get(), currentEventId, false, null); } /** @@ -895,9 +915,13 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { currentStatus, lastSyncedEventId_.get())); return; } - - List<NotificationEvent> events = getNextMetastoreEvents(); - processEvents(events); + // fetch the current notification event id. We assume that the polling interval + // is small enough that most of these polling operations result in zero new + // events. In such a case, fetching current notification event id is much faster + // (and cheaper on HMS side) instead of polling for events directly + long currentEventId = getCurrentEventId(); + List<NotificationEvent> events = getNextMetastoreEvents(currentEventId); + processEvents(currentEventId, events); } catch (MetastoreNotificationFetchException ex) { // No need to change the EventProcessor state to error since we want the // EventProcessor to continue getting new events after HMS is back up. @@ -925,7 +949,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { /** * Update the latest event id regularly so we know how far we are lagging behind. */ - private void updateLatestEventId() { + @VisibleForTesting + public void updateLatestEventId() { EventProcessorStatus currentStatus = eventProcessorStatus_; if (currentStatus != EventProcessorStatus.ACTIVE) { return; @@ -942,8 +967,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { NotificationEventRequest eventRequest = new NotificationEventRequest(); eventRequest.setLastEvent(currentEventId - 1); eventRequest.setMaxEvents(1); - NotificationEventResponse response = MetastoreShim - .getNextNotification(msClient.getHiveClient(), eventRequest); + NotificationEventResponse response = MetastoreShim.getNextNotification( + msClient.getHiveClient(), eventRequest, false); Iterator<NotificationEvent> eventIter = response.getEventsIterator(); // Events could be empty if they are just cleaned up. if (!eventIter.hasNext()) return; @@ -1050,14 +1075,24 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { /** * Process the given list of notification events. Useful for tests which provide a list * of events + * @param currentEventId Current event id on metastore + * @param events List of NotificationEvents + * @throws MetastoreNotificationException */ @VisibleForTesting - protected void processEvents(List<NotificationEvent> events) + protected void processEvents(long currentEventId, List<NotificationEvent> events) throws MetastoreNotificationException { currentEvent_ = null; // update the events received metric before returning metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size()); - if (events.isEmpty()) return; + if (events.isEmpty()) { + if (lastSyncedEventId_.get() < currentEventId) { + // Possible to receive empty list due to event skip list in notification event + // request. Update the last synced event id with current event id on metastore + lastSyncedEventId_.set(currentEventId); + } + return; + } final Timer.Context context = metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time(); Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>(); diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index b1dd8b097..1ee5da6c5 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -1431,7 +1431,7 @@ public class MetastoreEventsProcessorTest { @Override public List<NotificationEvent> getNextMetastoreEvents() - throws MetastoreNotificationFetchException { + throws MetastoreNotificationFetchException, CatalogException { // Throw exception roughly half of the time Random rand = new Random(); if (rand.nextInt(10) % 2 == 0){ @@ -1513,14 +1513,16 @@ public class MetastoreEventsProcessorTest { assertNotNull("Table should have been found after create table statement", catalog_.getTable(TEST_DB_NAME, testTblName)); loadTable(testTblName); - List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); + long currentEventId = eventsProcessor_.getCurrentEventId(); + List<NotificationEvent> events = + eventsProcessor_.getNextMetastoreEvents(currentEventId); // the first create table event should not change anything to the catalogd's // created table assertEquals(3, events.size()); Table existingTable = catalog_.getTable(TEST_DB_NAME, testTblName); long id = MetastoreShim.getTableId(existingTable.getMetaStoreTable()); assertEquals("CREATE_TABLE", events.get(0).getEventType()); - eventsProcessor_.processEvents(Lists.newArrayList(events.get(0))); + eventsProcessor_.processEvents(currentEventId, Lists.newArrayList(events.get(0))); // after processing the create_table the original table should still remain the same long testId = MetastoreShim.getTableId(catalog_.getTable(TEST_DB_NAME, testTblName).getMetaStoreTable()); @@ -1531,7 +1533,7 @@ public class MetastoreEventsProcessorTest { long numFilteredEvents = eventsProcessor_.getMetrics() .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount(); - eventsProcessor_.processEvents(Lists.newArrayList(events.get(1))); + eventsProcessor_.processEvents(currentEventId, Lists.newArrayList(events.get(1))); // Verify that the drop_table event is skipped and the metric is incremented. assertEquals(numFilteredEvents + 1, eventsProcessor_.getMetrics() .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount()); @@ -1540,7 +1542,7 @@ public class MetastoreEventsProcessorTest { + "is stale", catalog_.getTable(TEST_DB_NAME, testTblName)); // the final create table event should also be ignored since its a self-event assertEquals("CREATE_TABLE", events.get(2).getEventType()); - eventsProcessor_.processEvents(Lists.newArrayList(events.get(2))); + eventsProcessor_.processEvents(currentEventId, Lists.newArrayList(events.get(2))); assertFalse( "Table should have been loaded since the create_table should be " + "ignored", catalog_.getTable(TEST_DB_NAME, @@ -1566,7 +1568,7 @@ public class MetastoreEventsProcessorTest { List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); assertEquals(2, events.size()); - eventsProcessor_.processEvents(events); + eventsProcessor_.processEvents(); assertNotNull(catalog_.getDb(TEST_DB_NAME)); assertNotNull(catalog_.getTable(TEST_DB_NAME, testTblName)); assertFalse("Table should have been loaded since it was already latest", catalog_ @@ -1577,7 +1579,7 @@ public class MetastoreEventsProcessorTest { events = eventsProcessor_.getNextMetastoreEvents(); // should have 1 drop_table event assertEquals(1, events.size()); - eventsProcessor_.processEvents(events); + eventsProcessor_.processEvents(); // dropping a non-existant table should cause event processor to go into error state assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus()); assertNull(catalog_.getTable(TEST_DB_NAME, testTblName)); @@ -1606,12 +1608,14 @@ public class MetastoreEventsProcessorTest { assertNull(catalog_.getDb(TEST_DB_NAME)); createDatabaseFromImpala(TEST_DB_NAME, "second"); assertNotNull(catalog_.getDb(TEST_DB_NAME)); - List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); + long currentEventId = eventsProcessor_.getCurrentEventId(); + List<NotificationEvent> events = + eventsProcessor_.getNextMetastoreEvents(currentEventId); // should have 3 events for create,drop and create database assertEquals(3, events.size()); assertEquals("CREATE_DATABASE", events.get(0).getEventType()); - eventsProcessor_.processEvents(Lists.newArrayList(events.get(0))); + eventsProcessor_.processEvents(currentEventId, Lists.newArrayList(events.get(0))); // create_database event should have no effect since catalogD has already a later // version of database with the same name. assertNotNull(catalog_.getDb(TEST_DB_NAME)); @@ -1620,7 +1624,7 @@ public class MetastoreEventsProcessorTest { // now process drop_database event assertEquals("DROP_DATABASE", events.get(1).getEventType()); - eventsProcessor_.processEvents(Lists.newArrayList(events.get(1))); + eventsProcessor_.processEvents(currentEventId, Lists.newArrayList(events.get(1))); // database should not be dropped since catalogD is at the latest state assertNotNull(catalog_.getDb(TEST_DB_NAME)); assertEquals("second", @@ -1628,7 +1632,7 @@ public class MetastoreEventsProcessorTest { // the third create_database event should have no effect too assertEquals("CREATE_DATABASE", events.get(2).getEventType()); - eventsProcessor_.processEvents(Lists.newArrayList(events.get(2))); + eventsProcessor_.processEvents(currentEventId, Lists.newArrayList(events.get(2))); assertNotNull(catalog_.getDb(TEST_DB_NAME)); assertEquals("second", catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription()); @@ -2074,7 +2078,7 @@ public class MetastoreEventsProcessorTest { } private void cleanUpTblsForFlagTests(String dbName) - throws TException, MetastoreNotificationFetchException { + throws TException, MetastoreNotificationFetchException, CatalogException { if (catalog_.getDb(dbName) == null) return; dropDatabaseCascade(dbName); @@ -3265,15 +3269,65 @@ public class MetastoreEventsProcessorTest { */ @Test public void testSkipFetchOpenTransactionEvent() throws Exception { + long currentEventId = eventsProcessor_.getCurrentEventId(); try (MetaStoreClient client = catalog_.getMetaStoreClient()) { - // Make an empty transaction + // 1. Fetch notification events after open and commit transaction long txnId = MetastoreShim.openTransaction(client.getHiveClient()); + assertEquals(currentEventId + 1, eventsProcessor_.getCurrentEventId()); + // Verify latest event id + eventsProcessor_.updateLatestEventId(); + assertEquals(currentEventId + 1, eventsProcessor_.getLatestEventId()); + + MetastoreShim.commitTransaction(client.getHiveClient(), txnId); + assertEquals(currentEventId + 2, eventsProcessor_.getCurrentEventId()); + // Verify the latest event id again + eventsProcessor_.updateLatestEventId(); + assertEquals(currentEventId + 2, eventsProcessor_.getLatestEventId()); + // Commit transaction event alone is returned from metastore + List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); + assertEquals(1, events.size()); + assertEquals(MetastoreEventType.COMMIT_TXN, + MetastoreEventType.from(events.get(0).getEventType())); + // Verify last synced event id before and after processEvents + assertEquals(currentEventId, eventsProcessor_.getLastSyncedEventId()); + eventsProcessor_.processEvents(); + assertEquals(currentEventId + 2, eventsProcessor_.getLastSyncedEventId()); + + // 2. Fetch notification events right after open transaction + currentEventId = eventsProcessor_.getCurrentEventId(); + txnId = MetastoreShim.openTransaction(client.getHiveClient()); + assertEquals(currentEventId + 1, eventsProcessor_.getCurrentEventId()); + // Open transaction event is not returned from metastore + events = eventsProcessor_.getNextMetastoreEvents(); + assertEquals(0, events.size()); + // Verify last synced event id before and after processEvents + assertEquals(currentEventId, eventsProcessor_.getLastSyncedEventId()); + eventsProcessor_.processEvents(); + assertEquals(currentEventId + 1, eventsProcessor_.getLastSyncedEventId()); + MetastoreShim.commitTransaction(client.getHiveClient(), txnId); + assertEquals(currentEventId + 2, eventsProcessor_.getCurrentEventId()); + } + } + + /** + * Test fetching events in batch when last occurred event is open transaction + * @throws Exception + */ + @Test + public void testFetchEventsInBatchWithOpenTxnAsLastEvent() throws Exception { + long currentEventId = eventsProcessor_.getCurrentEventId(); + try (MetaStoreClient client = catalog_.getMetaStoreClient()) { + long txnId = MetastoreShim.openTransaction(client.getHiveClient()); + assertEquals(currentEventId + 1, eventsProcessor_.getCurrentEventId()); + List<NotificationEvent> events = + MetastoreEventsProcessor.getNextMetastoreEventsInBatches( + eventsProcessor_.catalog_, currentEventId, null); + // Open transaction event is not returned from metastore + assertEquals(0, events.size()); + MetastoreShim.commitTransaction(client.getHiveClient(), txnId); + assertEquals(currentEventId + 2, eventsProcessor_.getCurrentEventId()); } - List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents(); - assertEquals(1, events.size()); - assertEquals(MetastoreEventType.COMMIT_TXN, - MetastoreEventType.from(events.get(0).getEventType())); } private void createDatabase(String catName, String dbName,
