This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 68c831176 IMPALA-12460: Add lag and histogram of event processing in
the log
68c831176 is described below
commit 68c831176b82612f0146887e571554f20ce29c1e
Author: stiga-huang <[email protected]>
AuthorDate: Mon Sep 25 19:44:41 2023 +0800
IMPALA-12460: Add lag and histogram of event processing in the log
This patch logs the lag of event processing, i.e.
(latestEventTime - lastSyncedEventTime), at the end of processing an
event batch. If the batch is slow to process, we also log the top-10
most expensive events and the top-10 targets that contribute to the
slowness, so admins can decide whether to disable event processing on
some tables.
For table and partition level events, the target name is the table name.
For db level events, the target name is the db name. For events that
don't have db/table names, e.g. COMMIT_TXN, the target name is a
constant string - "CLUSTER_WIDE".
Log parsing example for expensive events:
-- In shell:
$ grep 'expensive events' catalogd.INFO > expensive-events.log
-- In Python:
import re
EVENT_PATTERN = re.compile(r"\(type=(\w+), id=(\d+), target=([\w\.]+), duration_ms=(\d+)\)")
re.findall(EVENT_PATTERN, log_line)
Log parsing example for top targets in event processing:
-- In shell:
$ grep 'targets in event processing' catalogd.INFO > slow-targets.log
-- In Python:
import re
re.findall(r"\(target=([\w\.]+), duration_ms=(\d+)\)", log_line)
This patch also adds a thread annotation on the event-processor thread
so that jstack output shows which event is currently being processed.
The unit of the event time is seconds. This patch renames the variables
and metrics whose names incorrectly suggested milliseconds ("ms").
Tests:
- Manually added some delay in event processing and verified the logs
and jstack output.
Change-Id: Ib9421b5e26bfa2324217ec9695fbd81636727d22
Reviewed-on: http://gerrit.cloudera.org:8080/20507
Reviewed-by: Daniel Becker <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../impala/catalog/events/MetastoreEvents.java | 6 ++
.../catalog/events/MetastoreEventsProcessor.java | 102 ++++++++++++++++-----
2 files changed, 87 insertions(+), 21 deletions(-)
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 593a8de79..391325bc9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -498,6 +498,12 @@ public class MetastoreEvents {
public String getTableName() { return tblName_; }
+ public String getTargetName() {
+ if (dbName_ == null && tblName_ == null) return "CLUSTER_WIDE";
+ if (tblName_ == null) return dbName_;
+ return dbName_ + "." + tblName_;
+ }
+
/**
* Process this event if it is enabled based on the flags on this object
*
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index d3debde25..866c264e6 100644
---
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -27,8 +27,10 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -45,6 +47,7 @@ import
org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.IncompleteTable;
@@ -235,13 +238,13 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
// last synced event id
public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
// last synced event time
- public static final String LAST_SYNCED_EVENT_TIME =
"last-synced-event-time-ms";
+ public static final String LAST_SYNCED_EVENT_TIME = "last-synced-event-time";
// latest event id in Hive metastore
public static final String LATEST_EVENT_ID = "latest-event-id";
// event time of the latest event in Hive metastore
- public static final String LATEST_EVENT_TIME = "latest-event-time-ms";
- // delay(ms) in events processing
- public static final String EVENT_PROCESSING_DELAY =
"event-processing-delay-ms";
+ public static final String LATEST_EVENT_TIME = "latest-event-time";
+ // delay(secs) in events processing
+ public static final String EVENT_PROCESSING_DELAY = "event-processing-delay";
// metric name for number of tables which are refreshed by event processor
so far
public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
// number of times events processor refreshed a partition
@@ -513,12 +516,12 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
// keeps track of the last event id which we have synced to
private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
- private final AtomicLong lastSyncedEventTimeMs_ = new AtomicLong(0);
+ private final AtomicLong lastSyncedEventTimeSecs_ = new AtomicLong(0);
// The event id and eventTime of the latest event in HMS. Only used in
metrics to show
// how far we are lagging behind.
private final AtomicLong latestEventId_ = new AtomicLong(0);
- private final AtomicLong latestEventTimeMs_ = new AtomicLong(0);
+ private final AtomicLong latestEventTimeSecs_ = new AtomicLong(0);
// The duration in nanoseconds of the processing of the last event batch.
private final AtomicLong lastEventProcessDurationNs_ = new AtomicLong(0);
@@ -609,11 +612,12 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
metrics_.addCounter(EVENTS_SKIPPED_METRIC);
metrics_.addGauge(STATUS_METRIC, (Gauge<String>) () ->
getStatus().toString());
metrics_.addGauge(LAST_SYNCED_ID_METRIC, (Gauge<Long>)
lastSyncedEventId_::get);
- metrics_.addGauge(LAST_SYNCED_EVENT_TIME, (Gauge<Long>)
lastSyncedEventTimeMs_::get);
+ metrics_.addGauge(LAST_SYNCED_EVENT_TIME,
+ (Gauge<Long>) lastSyncedEventTimeSecs_::get);
metrics_.addGauge(LATEST_EVENT_ID, (Gauge<Long>) latestEventId_::get);
- metrics_.addGauge(LATEST_EVENT_TIME, (Gauge<Long>)
latestEventTimeMs_::get);
+ metrics_.addGauge(LATEST_EVENT_TIME, (Gauge<Long>)
latestEventTimeSecs_::get);
metrics_.addGauge(EVENT_PROCESSING_DELAY,
- (Gauge<Long>) () -> latestEventTimeMs_.get() -
lastSyncedEventTimeMs_.get());
+ (Gauge<Long>) () -> latestEventTimeSecs_.get() -
lastSyncedEventTimeSecs_.get());
metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES);
metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES);
metrics_.addCounter(NUMBER_OF_TABLES_ADDED);
@@ -887,7 +891,7 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
// EventProcessor to continue getting new events after HMS is back up.
LOG.error("Unable to fetch the next batch of metastore events. Hive
Metastore " +
"may be unavailable. Will retry.", ex);
- } catch(MetastoreNotificationNeedsInvalidateException ex) {
+ } catch (MetastoreNotificationNeedsInvalidateException ex) {
updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
String msg = "Event processing needs a invalidate command to resolve the
state";
LOG.error(msg, ex);
@@ -933,10 +937,18 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
if (!eventIter.hasNext()) return;
NotificationEvent event = eventIter.next();
Preconditions.checkState(event.getEventId() == currentEventId);
- LOG.info("Latest event in HMS: id={}, time={}", currentEventId,
- event.getEventTime());
+ long lastSyncedEventId = lastSyncedEventId_.get();
+ long lastSyncedEventTime = lastSyncedEventTimeSecs_.get();
+ long currentEventTime = event.getEventTime();
latestEventId_.set(currentEventId);
- latestEventTimeMs_.set(event.getEventTime());
+ latestEventTimeSecs_.set(currentEventTime);
+ LOG.info("Latest event in HMS: id={}, time={}. Last synced event: id={},
time={}.",
+ currentEventId, currentEventTime, lastSyncedEventId,
lastSyncedEventTime);
+ if (currentEventTime > lastSyncedEventTime) {
+ LOG.warn("Lag: {}. {} events pending to be processed.",
+ PrintUtils.printTimeMs((currentEventTime - lastSyncedEventTime) *
1000),
+ currentEventId - lastSyncedEventId);
+ }
} catch (Exception e) {
LOG.error("Unable to update current notification event id. Last value:
{}",
latestEventId_, e);
@@ -955,9 +967,9 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
eventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId());
if (currentStatus != EventProcessorStatus.ACTIVE) return
eventProcessorMetrics;
// The following counters are only updated when event-processor is active.
-
eventProcessorMetrics.setLast_synced_event_time(lastSyncedEventTimeMs_.get());
+
eventProcessorMetrics.setLast_synced_event_time(lastSyncedEventTimeSecs_.get());
eventProcessorMetrics.setLatest_event_id(latestEventId_.get());
- eventProcessorMetrics.setLatest_event_time(latestEventTimeMs_.get());
+ eventProcessorMetrics.setLatest_event_time(latestEventTimeSecs_.get());
long eventsReceived = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
long eventsSkipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
@@ -1036,6 +1048,7 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
if (events.isEmpty()) return;
final Timer.Context context =
metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
+ Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>();
try {
List<MetastoreEvent> filteredEvents =
metastoreEventFactory_.getFilteredEvents(events, metrics_);
@@ -1052,10 +1065,18 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
break;
}
currentEvent_ = event.metastoreNotificationEvent_;
- event.processIfEnabled();
+ String targetName = event.getTargetName();
+ String desc = String.format("Processing %s on %s, eventId=%d",
+ event.getEventType(), targetName, event.getEventId());
+ try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc)) {
+ long startMs = System.currentTimeMillis();
+ event.processIfEnabled();
+ long elapsedTimeMs = System.currentTimeMillis() - startMs;
+ eventProcessingTime.put(event, elapsedTimeMs);
+ }
deleteEventLog_.garbageCollect(event.getEventId());
lastSyncedEventId_.set(event.getEventId());
- lastSyncedEventTimeMs_.set(event.getEventTime());
+ lastSyncedEventTimeSecs_.set(event.getEventTime());
}
}
} catch (CatalogException e) {
@@ -1063,11 +1084,50 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
"Unable to process event %d of type %s. Event processing will be
stopped.",
currentEvent_.getEventId(), currentEvent_.getEventType()), e);
} finally {
- long elapsed_ns = context.stop();
- lastEventProcessDurationNs_.set(elapsed_ns);
- LOG.info("Time elapsed in processing event batch: {}",
- PrintUtils.printTimeNs(elapsed_ns));
+ long elapsedNs = context.stop();
+ lastEventProcessDurationNs_.set(elapsedNs);
+ logEventMetrics(eventProcessingTime, elapsedNs);
+ }
+ }
+
+ private void logEventMetrics(Map<MetastoreEvent, Long> eventProcessingTime,
+ long elapsedNs) {
+ LOG.info("Time elapsed in processing event batch: {}",
+ PrintUtils.printTimeNs(elapsedNs));
+ // Only log the metrics when the processing on this batch is slow.
+ if (elapsedNs < HdfsTable.LOADING_WARNING_TIME_NS) return;
+ // Get the top-10 expensive events
+ List<Map.Entry<MetastoreEvent, Long>> eventList =
+ new ArrayList<>(eventProcessingTime.entrySet());
+ eventList.sort(Map.Entry.<MetastoreEvent,
Long>comparingByValue().reversed());
+ int num = Math.min(10, eventList.size());
+ StringBuilder report = new StringBuilder("Top " + num + " expensive
events: ");
+ for (Map.Entry<MetastoreEvent, Long> entry : eventList.subList(0, num)) {
+ MetastoreEvent event = entry.getKey();
+ long durationMs = entry.getValue();
+ report.append(String.format("(type=%s, id=%s, target=%s, duration_ms=%d)
",
+ event.getEventType(), event.getEventId(), event.getTargetName(),
durationMs));
+ }
+ // Get the top-10 expensive targets
+ Map<String, Long> durationPerTable = new HashMap<>();
+ for (MetastoreEvent event : eventProcessingTime.keySet()) {
+ String targetName = event.getTargetName();
+ long durationMs = durationPerTable.getOrDefault(targetName, 0L) +
+ eventProcessingTime.get(event);
+ durationPerTable.put(targetName, durationMs);
+ }
+ List<Map.Entry<String, Long>> targetList =
+ new ArrayList<>(durationPerTable.entrySet());
+ targetList.sort(Map.Entry.<String, Long>comparingByValue().reversed());
+ num = Math.min(10, targetList.size());
+ report.append("\nTop ").append(num).append(" targets in event processing:
");
+ for (Map.Entry<String, Long> entry : targetList.subList(0, num)) {
+ String targetName = entry.getKey();
+ long durationMs = entry.getValue();
+ report.append(String.format("(target=%s, duration_ms=%d) ",
+ targetName, durationMs));
}
+ LOG.warn(report.toString());
}
/**