This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5add447a9c97c32cd0938c76813f1780c3308ef0 Author: stiga-huang <[email protected]> AuthorDate: Mon Nov 27 20:37:03 2023 +0800 IMPALA-12577: Correctly update last-synced-event-time There are two metrics tracking the progress of HMS event processing: last-synced-event-id and last-synced-event-time. We have several places that only update last-synced-event-id without updating last-synced-event-time, which is used to calculate the lag of event processing. Users might set up alerts on the lag, so we should keep it updated. Tests - Add code to verify the metrics in MetastoreEventsProcessorTest - Fix a potential NullPointerException in HiveJdbcClientPool.java - Ran CORE test Change-Id: I033ace83221bc5e0c84c19aac78a41c1eeda31f2 Reviewed-on: http://gerrit.cloudera.org:8080/20732 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../catalog/events/MetastoreEventsProcessor.java | 53 ++++++++++++++++++---- .../SynchronousHMSEventProcessorForTests.java | 27 +++++++++-- .../apache/impala/testutil/HiveJdbcClientPool.java | 6 ++- 3 files changed, 71 insertions(+), 15 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index 87650d702..f066c7078 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -569,6 +569,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { this.catalog_ = Preconditions.checkNotNull(catalogOpExecutor.getCatalog()); validateConfigs(); lastSyncedEventId_.set(startSyncFromId); + lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(startSyncFromId)); initMetrics(); metastoreEventFactory_ = new MetastoreEventFactory(catalogOpExecutor); pollingFrequencyInSec_ = pollingFrequencyInSec; @@ -757,6 +758,41 @@ public class 
MetastoreEventsProcessor implements ExternalEventsProcessor { return latestEventId; } + /** + * Fetch the event from HMS specified by 'eventId' + * @return null if the event has been cleaned up or any error occurs. + */ + private NotificationEvent getEventFromHMS(MetaStoreClient msClient, long eventId) { + NotificationEventRequest eventRequest = new NotificationEventRequest(); + eventRequest.setLastEvent(eventId - 1); + eventRequest.setMaxEvents(1); + try { + NotificationEventResponse response = MetastoreShim.getNextNotification( + msClient.getHiveClient(), eventRequest, false); + Iterator<NotificationEvent> eventIter = response.getEventsIterator(); + if (!eventIter.hasNext()) { + LOG.warn("Unable to fetch event {}. It has been cleaned up", eventId); + return null; + } + return eventIter.next(); + } catch (TException e) { + LOG.warn("Unable to fetch event {}", eventId, e); + } + return null; + } + + /** + * Get the event time by fetching the specified event from HMS. + * @return 0 if the event has been cleaned up or any error occurs. + */ + private int getEventTimeFromHMS(long eventId) { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { + NotificationEvent event = getEventFromHMS(msClient, eventId); + if (event != null) return event.getEventTime(); + } + return 0; + } + /** * Starts the event processor from a given event id */ @@ -775,6 +811,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { } } lastSyncedEventId_.set(fromEventId); + lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId)); updateStatus(EventProcessorStatus.ACTIVE); LOG.info(String.format( "Metastore event processing restarted. Last synced event id was updated " @@ -964,16 +1001,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { return; } // Fetch the last event to get its eventTime. 
- NotificationEventRequest eventRequest = new NotificationEventRequest(); - eventRequest.setLastEvent(currentEventId - 1); - eventRequest.setMaxEvents(1); - NotificationEventResponse response = MetastoreShim.getNextNotification( - msClient.getHiveClient(), eventRequest, false); - Iterator<NotificationEvent> eventIter = response.getEventsIterator(); + NotificationEvent event = getEventFromHMS(msClient, currentEventId); // Events could be empty if they are just cleaned up. - if (!eventIter.hasNext()) return; - NotificationEvent event = eventIter.next(); - Preconditions.checkState(event.getEventId() == currentEventId); + if (event == null) return; long lastSyncedEventId = lastSyncedEventId_.get(); long lastSyncedEventTime = lastSyncedEventTimeSecs_.get(); long currentEventTime = event.getEventTime(); @@ -1090,6 +1120,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { // Possible to receive empty list due to event skip list in notification event // request. Update the last synced event id with current event id on metastore lastSyncedEventId_.set(currentEventId); + lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(currentEventId)); } return; } @@ -1100,7 +1131,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { List<MetastoreEvent> filteredEvents = metastoreEventFactory_.getFilteredEvents(events, metrics_); if (filteredEvents.isEmpty()) { - lastSyncedEventId_.set(events.get(events.size() - 1).getEventId()); + NotificationEvent e = events.get(events.size() - 1); + lastSyncedEventId_.set(e.getEventId()); + lastSyncedEventTimeSecs_.set(e.getEventTime()); return; } for (MetastoreEvent event : filteredEvents) { diff --git a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java index a82398105..d121be01d 100644 --- 
a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java @@ -17,12 +17,10 @@ package org.apache.impala.catalog.events; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.impala.catalog.CatalogException; import org.apache.impala.common.Metrics; import org.apache.impala.service.CatalogOpExecutor; +import org.junit.Assert; /** * A test MetastoreEventProcessor which executes in the same thread. Useful for testing @@ -45,4 +43,27 @@ public class SynchronousHMSEventProcessorForTests extends MetastoreEventsProcess public void startScheduler() { // nothing to do here; there is no background thread for this processor } + + @Override + public void processEvents() { + super.processEvents(); + super.updateLatestEventId(); + verifyEventSyncedMetrics(); + } + + private void verifyEventSyncedMetrics() { + Metrics metrics = getMetrics(); + long lastSyncedEventId = (Long) metrics.getGauge( + MetastoreEventsProcessor.LAST_SYNCED_ID_METRIC).getValue(); + long latestEventId = (Long) metrics.getGauge( + MetastoreEventsProcessor.LATEST_EVENT_ID).getValue(); + long lastSyncedEventTime = (Long) metrics.getGauge( + MetastoreEventsProcessor.LAST_SYNCED_EVENT_TIME).getValue(); + long latestEventTime = (Long) metrics.getGauge( + MetastoreEventsProcessor.LATEST_EVENT_TIME).getValue(); + if (lastSyncedEventId == latestEventId) { + Assert.assertEquals("Incorrect last synced event time for event " + latestEventId, + latestEventTime, lastSyncedEventTime); + } + } } diff --git a/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java b/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java index ff22a8887..da6386d58 100644 --- 
a/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java +++ b/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java @@ -152,8 +152,10 @@ public class HiveJdbcClientPool implements Closeable { while (closedCount > 0) { try { HiveJdbcClient client = freeClients_.poll(5 * 60, TimeUnit.SECONDS); - if (client.stmt_ != null) { client.stmt_.close(); } - if (client.conn_ != null) { client.conn_.close(); } + if (client != null) { + if (client.stmt_ != null) { client.stmt_.close(); } + if (client.conn_ != null) { client.conn_.close(); } + } closedCount--; } catch (Exception e) { throw new RuntimeException(e);
