This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1cf8f5065acb79e4377492c6e67c312249e42a78
Author: stiga-huang <[email protected]>
AuthorDate: Tue May 23 16:50:58 2023 +0800

    IMPALA-12053: Expose event-processor error message in WebUI
    
    When the event-processor goes into the ERROR/NEEDS_INVALIDATE state, the
    detailed information is only available in the logs. This is
    inconvenient when triaging failures. This patch exposes the error message
    in the /events WebUI. The message includes a timestamp string and the
    stacktrace of the exception.
    
    This patch makes the /events page visible. It also modifies the test code
    of EventProcessorUtils.wait_for_synced_event_id() to print the error
    message if the event processor is down.
    
    A trivial bug where lastProcessedEvent is not updated (IMPALA-11588) is
    also fixed in this patch. The variable is refactored into a member of the
    class so that internal methods can update it before processing each event.
    
    Some metrics are not shown on the /events page, e.g.
    latest-event-id, latest-event-time-ms and last-synced-event-time-ms. This
    patch adds them and also adds an event-processing-delay-ms metric,
    computed as latest-event-time-ms minus last-synced-event-time-ms.
    
    Tests:
     - Manually injected code to fail the event processor and verified the
       WebUI.
     - Ran metadata/test_event_processing.py while the event processor was in
       ERROR state. Verified that the error message is shown in the test output.
    
    Change-Id: I077375422bc3d24eed57c95c6b05ac408228f083
    Reviewed-on: http://gerrit.cloudera.org:8080/19916
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  8 ++-
 common/thrift/JniCatalog.thrift                    |  4 +-
 .../impala/catalog/events/MetastoreEvents.java     |  2 +-
 .../catalog/events/MetastoreEventsProcessor.java   | 58 +++++++++++++++++-----
 tests/util/event_processor_utils.py                | 15 +++++-
 www/events.tmpl                                    |  5 ++
 6 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index f2a5ffc30..cb31bc0d2 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -366,7 +366,7 @@ void CatalogServer::RegisterWebpages(Webserver* webserver, 
bool metrics_only) {
       false);
   webserver->RegisterUrlCallback(EVENT_WEB_PAGE, EVENT_METRICS_TEMPLATE,
       [this](const auto& args, auto* doc) { 
this->EventMetricsUrlCallback(args, doc); },
-      false);
+      true);
   webserver->RegisterUrlCallback(CATALOG_OPERATIONS_WEB_PAGE, 
CATALOG_OPERATIONS_TEMPLATE,
       [this](const auto& args, auto* doc) { 
this->OperationUsageUrlCallback(args, doc); },
       true);
@@ -677,6 +677,12 @@ void CatalogServer::EventMetricsUrlCallback(
       event_processor_summary_response.summary.c_str(), 
document->GetAllocator());
   document->AddMember(
       "event_processor_metrics", event_processor_summary, 
document->GetAllocator());
+  if (event_processor_summary_response.__isset.error_msg) {
+    Value error_msg(
+        event_processor_summary_response.error_msg.c_str(), 
document->GetAllocator());
+    document->AddMember(
+        "event_processor_error_msg", error_msg, document->GetAllocator());
+  }
 }
 
 void CatalogServer::CatalogObjectsUrlCallback(const Webserver::WebRequest& req,
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 56398eaf0..d8699ce0a 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -1025,4 +1025,6 @@ struct TEventProcessorMetricsSummaryResponse {
   // summary view of the events processor which can include status,
   // metrics and other details
   1: required string summary
-}
\ No newline at end of file
+  // Error messages if the events processor goes into ERROR/NEEDS_INVALIDATE 
states
+  2: optional string error_msg
+}
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index fabdfc37d..aae2122c4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1156,7 +1156,7 @@ public class MetastoreEvents {
         // determined that the event needs to be processed instead of skipped, 
or we
         // somehow missed the previous create database event.
         throw new MetastoreNotificationException(
-            debugString("Unable to process event", e));
+            debugString("Unable to process event"), e);
       }
     }
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 7c2d6d368..19b7ddc36 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
@@ -232,6 +234,14 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   public static final String STATUS_METRIC = "status";
   // last synced event id
   public static final String LAST_SYNCED_ID_METRIC = "last-synced-event-id";
+  // last synced event time
+  public static final String LAST_SYNCED_EVENT_TIME = 
"last-synced-event-time-ms";
+  // latest event id in Hive metastore
+  public static final String LATEST_EVENT_ID = "latest-event-id";
+  // event time of the latest event in Hive metastore
+  public static final String LATEST_EVENT_TIME = "latest-event-time-ms";
+  // delay(ms) in events processing
+  public static final String EVENT_PROCESSING_DELAY = 
"event-processing-delay-ms";
   // metric name for number of tables which are refreshed by event processor 
so far
   public static final String NUMBER_OF_TABLE_REFRESHES = "tables-refreshed";
   // number of times events processor refreshed a partition
@@ -489,9 +499,15 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   // current status of this event processor
   private EventProcessorStatus eventProcessorStatus_ = 
EventProcessorStatus.STOPPED;
 
+  // error message when event processor comes into ERROR/NEEDS_INVALIDATE 
states
+  private String eventProcessorErrorMsg_ = null;
+
   // event factory which is used to get or create MetastoreEvents
   private final MetastoreEventFactory metastoreEventFactory_;
 
+  // keeps track of the current event that we are processing
+  private NotificationEvent currentEvent_;
+
   // keeps track of the last event id which we have synced to
   private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
   private final AtomicLong lastSyncedEventTimeMs_ = new AtomicLong(0);
@@ -588,10 +604,13 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     metrics_.addTimer(EVENTS_PROCESS_DURATION_METRIC);
     metrics_.addMeter(EVENTS_RECEIVED_METRIC);
     metrics_.addCounter(EVENTS_SKIPPED_METRIC);
-    metrics_.addGauge(STATUS_METRIC,
-        (Gauge<String>) () -> getStatus().toString());
-    metrics_.addGauge(LAST_SYNCED_ID_METRIC,
-        (Gauge<Long>) () -> lastSyncedEventId_.get());
+    metrics_.addGauge(STATUS_METRIC, (Gauge<String>) () -> 
getStatus().toString());
+    metrics_.addGauge(LAST_SYNCED_ID_METRIC, (Gauge<Long>) 
lastSyncedEventId_::get);
+    metrics_.addGauge(LAST_SYNCED_EVENT_TIME, (Gauge<Long>) 
lastSyncedEventTimeMs_::get);
+    metrics_.addGauge(LATEST_EVENT_ID, (Gauge<Long>) latestEventId_::get);
+    metrics_.addGauge(LATEST_EVENT_TIME, (Gauge<Long>) 
latestEventTimeMs_::get);
+    metrics_.addGauge(EVENT_PROCESSING_DELAY,
+        (Gauge<Long>) () -> latestEventTimeMs_.get() - 
lastSyncedEventTimeMs_.get());
     metrics_.addCounter(NUMBER_OF_TABLE_REFRESHES);
     metrics_.addCounter(NUMBER_OF_PARTITION_REFRESHES);
     metrics_.addCounter(NUMBER_OF_TABLES_ADDED);
@@ -837,7 +856,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
    */
   @Override
   public void processEvents() {
-    NotificationEvent lastProcessedEvent = null;
+    currentEvent_ = null;
     try {
       EventProcessorStatus currentStatus = eventProcessorStatus_;
       if (currentStatus != EventProcessorStatus.ACTIVE) {
@@ -856,14 +875,20 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
         "may be unavailable. Will retry.", ex);
     } catch(MetastoreNotificationNeedsInvalidateException ex) {
       updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
-      LOG.error("Event processing needs a invalidate command to resolve the 
state", ex);
+      String msg = "Event processing needs a invalidate command to resolve the 
state";
+      LOG.error(msg, ex);
+      eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + 
'\n' +
+          ExceptionUtils.getFullStackTrace(ex);
     } catch (Exception ex) {
       // There are lot of Preconditions which can throw RuntimeExceptions when 
we
       // process events this catch all exception block is needed so that the 
scheduler
       // thread does not die silently
       updateStatus(EventProcessorStatus.ERROR);
-      LOG.error("Unexpected exception received while processing event", ex);
-      dumpEventInfoToLog(lastProcessedEvent);
+      String msg = "Unexpected exception received while processing event";
+      LOG.error(msg, ex);
+      eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + 
'\n' +
+          ExceptionUtils.getFullStackTrace(ex);
+      dumpEventInfoToLog(currentEvent_);
     }
   }
 
@@ -973,6 +998,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     TEventProcessorMetricsSummaryResponse summaryResponse =
         new TEventProcessorMetricsSummaryResponse();
     summaryResponse.setSummary(metrics_.toString());
+    if (eventProcessorErrorMsg_ != null) {
+      summaryResponse.setError_msg(eventProcessorErrorMsg_);
+    }
     return summaryResponse;
   }
 
@@ -988,7 +1016,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   @VisibleForTesting
   protected void processEvents(List<NotificationEvent> events)
       throws MetastoreNotificationException {
-    NotificationEvent lastProcessedEvent = null;
+    currentEvent_ = null;
     // update the events received metric before returning
     metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
     if (events.isEmpty()) return;
@@ -1009,7 +1037,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
           if (eventProcessorStatus_ != EventProcessorStatus.ACTIVE) {
             break;
           }
-          lastProcessedEvent = event.metastoreNotificationEvent_;
+          currentEvent_ = event.metastoreNotificationEvent_;
           event.processIfEnabled();
           deleteEventLog_.garbageCollect(event.getEventId());
           lastSyncedEventId_.set(event.getEventId());
@@ -1019,7 +1047,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     } catch (CatalogException e) {
       throw new MetastoreNotificationException(String.format(
           "Unable to process event %d of type %s. Event processing will be 
stopped.",
-          lastProcessedEvent.getEventId(), lastProcessedEvent.getEventType()), 
e);
+          currentEvent_.getEventId(), currentEvent_.getEventType()), e);
     } finally {
       long elapsed_ns = context.stop();
       lastEventProcessDurationNs_.set(elapsed_ns);
@@ -1037,7 +1065,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
 
   private void dumpEventInfoToLog(NotificationEvent event) {
     if (event == null) {
-      LOG.error("Notification event is null");
+      String error = "Notification event is null";
+      LOG.error(error);
+      eventProcessorErrorMsg_ += '\n' + error;
       return;
     }
     StringBuilder msg =
@@ -1049,7 +1079,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       msg.append("Table name: ").append(event.getTableName()).append("\n");
     }
     msg.append("Event message: ").append(event.getMessage()).append("\n");
-    LOG.error(msg.toString());
+    String msgStr = msg.toString();
+    LOG.error(msgStr);
+    eventProcessorErrorMsg_ += '\n' + msgStr;
   }
 
   /**
diff --git a/tests/util/event_processor_utils.py 
b/tests/util/event_processor_utils.py
index 08223f9e8..d485a0432 100644
--- a/tests/util/event_processor_utils.py
+++ b/tests/util/event_processor_utils.py
@@ -60,7 +60,9 @@ class EventProcessorUtils(object):
         break
       status = EventProcessorUtils.get_event_processor_status()
       if status not in ["ACTIVE", "PAUSED"]:
-        raise Exception("Event processor is not working. Status: 
{0}".format(status))
+        error_msg = EventProcessorUtils.get_event_processor_error_msg()
+        raise Exception("Event processor is not working. Status: {0}. Error 
msg: {1}"
+                        .format(status, error_msg))
       made_progress = current_synced_id > last_synced_id
       if t >= end_time:
         raise Exception(
@@ -115,6 +117,17 @@ class EventProcessorUtils(object):
     pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
     return dict(pairs)
 
+  @staticmethod
+  def get_event_processor_error_msg():
+    """Scrapes the catalog's /events webpage and return the error message (if 
exists) of
+    the event processor"""
+    response = requests.get("%s/events?json" % 
EventProcessorUtils.DEFAULT_CATALOG_URL)
+    assert response.status_code == requests.codes.ok
+    res_json = json.loads(response.text)
+    if "event_processor_error_msg" in res_json:
+      return res_json["event_processor_error_msg"].strip()
+    return None
+
   @staticmethod
   def get_int_metric(metric_key, default_val=None):
     """Returns the int value of event processor metric from the /events 
catalogd debug
diff --git a/www/events.tmpl b/www/events.tmpl
index 484cfeae2..cdc16cae3 100644
--- a/www/events.tmpl
+++ b/www/events.tmpl
@@ -22,4 +22,9 @@ under the License.
 <h3>Event Processor Summary</h3>
 <pre>{{event_processor_metrics}}</pre>
 
+{{?event_processor_error_msg}}
+<h3>Error Message</h3>
+<pre>{{event_processor_error_msg}}</pre>
+{{/event_processor_error_msg}}
+
 {{> www/common-footer.tmpl }}

Reply via email to