This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b6bdf4c52 IMPALA-9118: Show catalog operation details in catalogd webUI
b6bdf4c52 is described below

commit b6bdf4c525acfe8b35928d43155cf3bea9be26cb
Author: stiga-huang <[email protected]>
AuthorDate: Sun Aug 27 16:40:46 2023 +0800

    IMPALA-9118: Show catalog operation details in catalogd webUI
    
    This patch extends the /operations page in catalogd WebUI to show the
    in-flight and finished catalog operations. The following fields are
    shown for each operation:
     - Thread ID
     - Query ID
     - Client IP
     - Coordinator
     - Type
     - Target
     - User
     - Start Time
     - End Time (only shown for finished operations)
     - Duration
     - Status
     - Details
    
    Finished operation records are currently kept in memory and limited by
    the catalog_operation_log_size flag (defaults to 100).
    
    To collect the above fields, this patch extends
    TCatalogServiceRequestHeader to contain the coordinator hostname. Also
    fix some catalog RPCs that didn't fill these fields.
    
    Tests:
     - Add e2e test in custom_cluster/test_web_pages.py
     - Manually verify the web pages when running a GVO job
    
    Change-Id: I3cf3f0da2be2be79e546762a8083d4de338ff6aa
    Reviewed-on: http://gerrit.cloudera.org:8080/20428
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  86 ++++++++-
 be/src/catalog/catalog-server.h                    |   4 +
 be/src/exec/catalog-op-executor.cc                 |   5 +-
 be/src/exec/catalog-op-executor.h                  |   3 +-
 be/src/service/client-request-state.cc             |   2 +
 be/src/util/backend-gflag-util.cc                  |   4 +
 common/thrift/BackendGflags.thrift                 |   4 +
 common/thrift/CatalogService.thrift                |   3 +
 common/thrift/JniCatalog.thrift                    |  23 ++-
 .../apache/impala/analysis/ResetMetadataStmt.java  |   7 +
 .../impala/catalog/CatalogServiceCatalog.java      |   5 +-
 .../catalog/monitor/CatalogFinalizeDmlCounter.java |   2 +-
 .../impala/catalog/monitor/CatalogMonitor.java     |   8 +-
 .../catalog/monitor/CatalogOperationCounter.java   |   1 +
 .../catalog/monitor/CatalogOperationMetrics.java   |  89 ---------
 .../catalog/monitor/CatalogOperationTracker.java   | 213 +++++++++++++++++++++
 .../monitor/CatalogResetMetadataCounter.java       |   2 +-
 .../org/apache/impala/service/BackendConfig.java   |   9 +
 .../apache/impala/service/CatalogOpExecutor.java   | 175 +++++++++++------
 .../java/org/apache/impala/service/Frontend.java   |   1 +
 .../java/org/apache/impala/service/JniCatalog.java |  16 +-
 tests/custom_cluster/test_web_pages.py             |  59 ++++++
 www/catalog_operations.tmpl                        | 113 +++++++++--
 23 files changed, 635 insertions(+), 199 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index d1f17ce3d..9b907b69f 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -32,6 +32,7 @@
 #include "util/event-metrics.h"
 #include "util/logging-support.h"
 #include "util/metrics.h"
+#include "util/pretty-printer.h"
 #include "util/thrift-debug-util.h"
 #include "util/webserver.h"
 
@@ -148,6 +149,9 @@ DEFINE_bool(enable_skipping_older_events, false, "This 
configuration is used to
     "disable the optimisation. Set this true to enable skipping the older 
events and"
     "quickly catch with the events of HMS");
 
+DEFINE_int32(catalog_operation_log_size, 100, "Number of catalog operation log 
records "
+    "to retain in catalogd. If -1, the operation log has unbounded size.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
@@ -874,17 +878,22 @@ void CatalogServer::CatalogObjectsUrlCallback(const 
Webserver::WebRequest& req,
 
 void CatalogServer::OperationUsageUrlCallback(
     const Webserver::WebRequest& req, Document* document) {
-  TGetOperationUsageResponse opeartion_usage;
-  Status status = catalog_->GetOperationUsage(&opeartion_usage);
+  TGetOperationUsageResponse operation_usage;
+  Status status = catalog_->GetOperationUsage(&operation_usage);
   if (!status.ok()) {
     Value error(status.GetDetail().c_str(), document->GetAllocator());
     document->AddMember("error", error, document->GetAllocator());
     return;
   }
+  GetCatalogOpSummary(operation_usage, document);
+  GetCatalogOpRecords(operation_usage, document);
+}
 
+void CatalogServer::GetCatalogOpSummary(const TGetOperationUsageResponse& 
operation_usage,
+    Document* document) {
   // Add the catalog operation counters to the document
   Value catalog_op_list(kArrayType);
-  for (const auto& catalog_op : opeartion_usage.catalog_op_counters) {
+  for (const auto& catalog_op : operation_usage.catalog_op_counters) {
     Value catalog_op_obj(kObjectType);
     Value op_name(catalog_op.catalog_op_name.c_str(), 
document->GetAllocator());
     catalog_op_obj.AddMember("catalog_op_name", op_name, 
document->GetAllocator());
@@ -899,7 +908,7 @@ void CatalogServer::OperationUsageUrlCallback(
 
   // Create a summary and add it to the document
   map<string, int> aggregated_operations;
-  for (const auto& catalog_op : opeartion_usage.catalog_op_counters) {
+  for (const auto& catalog_op : operation_usage.catalog_op_counters) {
     aggregated_operations[catalog_op.catalog_op_name] += catalog_op.op_counter;
   }
   Value catalog_op_summary(kArrayType);
@@ -915,6 +924,75 @@ void CatalogServer::OperationUsageUrlCallback(
   document->AddMember("catalog_op_summary", catalog_op_summary, 
document->GetAllocator());
 }
 
+static void CatalogOpListToJson(const vector<TCatalogOpRecord>& catalog_ops,
+    Value* catalog_op_list, Document* document) {
+  for (const auto& catalog_op : catalog_ops) {
+    Value obj(kObjectType);
+    Value op_name(catalog_op.catalog_op_name.c_str(), 
document->GetAllocator());
+    obj.AddMember("catalog_op_name", op_name, document->GetAllocator());
+
+    Value thread_id;
+    thread_id.SetInt64(catalog_op.thread_id);
+    obj.AddMember("thread_id", thread_id, document->GetAllocator());
+
+    Value query_id(PrintId(catalog_op.query_id).c_str(), 
document->GetAllocator());
+    obj.AddMember("query_id", query_id, document->GetAllocator());
+
+    Value client_ip(catalog_op.client_ip.c_str(), document->GetAllocator());
+    obj.AddMember("client_ip", client_ip, document->GetAllocator());
+
+    Value coordinator(catalog_op.coordinator_hostname.c_str(), 
document->GetAllocator());
+    obj.AddMember("coordinator", coordinator, document->GetAllocator());
+
+    Value user(catalog_op.user.c_str(), document->GetAllocator());
+    obj.AddMember("user", user, document->GetAllocator());
+
+    Value target_name(catalog_op.target_name.c_str(), 
document->GetAllocator());
+    obj.AddMember("target_name", target_name, document->GetAllocator());
+
+    Value start_time(ToUtcStringFromUnixMillis(catalog_op.start_time_ms,
+        TimePrecision::Millisecond).c_str(), document->GetAllocator());
+    obj.AddMember("start_time", start_time, document->GetAllocator());
+
+    int64_t end_time_ms;
+    if (catalog_op.finish_time_ms > 0) {
+      end_time_ms = catalog_op.finish_time_ms;
+      Value finish_time(ToUtcStringFromUnixMillis(catalog_op.finish_time_ms,
+          TimePrecision::Millisecond).c_str(), document->GetAllocator());
+      obj.AddMember("finish_time", finish_time, document->GetAllocator());
+    } else {
+      end_time_ms = UnixMillis();
+    }
+
+    int64_t duration_ms = end_time_ms - catalog_op.start_time_ms;
+    const string& printed_duration = PrettyPrinter::Print(duration_ms, 
TUnit::TIME_MS);
+    Value duration(printed_duration.c_str(), document->GetAllocator());
+    obj.AddMember("duration", duration, document->GetAllocator());
+
+    Value status(catalog_op.status.c_str(), document->GetAllocator());
+    obj.AddMember("status", status, document->GetAllocator());
+
+    Value details(catalog_op.details.c_str(), document->GetAllocator());
+    obj.AddMember("details", details, document->GetAllocator());
+
+    catalog_op_list->PushBack(obj, document->GetAllocator());
+  }
+}
+
+void CatalogServer::GetCatalogOpRecords(const TGetOperationUsageResponse& 
response,
+    Document* document) {
+  Value inflight_catalog_ops(kArrayType);
+  CatalogOpListToJson(response.in_flight_catalog_operations, 
&inflight_catalog_ops,
+      document);
+  document->AddMember("inflight_catalog_operations", inflight_catalog_ops,
+      document->GetAllocator());
+  Value finished_catalog_ops(kArrayType);
+  CatalogOpListToJson(response.finished_catalog_operations, 
&finished_catalog_ops,
+      document);
+  document->AddMember("finished_catalog_operations", finished_catalog_ops,
+      document->GetAllocator());
+}
+
 void CatalogServer::TableMetricsUrlCallback(const Webserver::WebRequest& req,
     Document* document) {
   const auto& args = req.parsed_args;
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index b490894ce..6d1dde18a 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -304,6 +304,10 @@ class CatalogServer {
   /// Retrieves the catalog operation metrics from FE.
   void OperationUsageUrlCallback(
       const Webserver::WebRequest& req, rapidjson::Document* document);
+  void GetCatalogOpSummary(
+      const TGetOperationUsageResponse& response, rapidjson::Document* 
document);
+  void GetCatalogOpRecords(
+      const TGetOperationUsageResponse& response, rapidjson::Document* 
document);
 
   /// Debug webpage handler that is used to dump all the registered metrics of 
a
   /// table. The caller specifies the "name" parameter which is the fully
diff --git a/be/src/exec/catalog-op-executor.cc 
b/be/src/exec/catalog-op-executor.cc
index 2078716d0..a7fb2381b 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -145,7 +145,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& 
request) {
   }
 }
 
-Status CatalogOpExecutor::ExecComputeStats(
+Status CatalogOpExecutor::ExecComputeStats(const TCatalogServiceRequestHeader& 
header,
     const TCatalogOpRequest& compute_stats_request,
     const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
     const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
@@ -159,8 +159,7 @@ Status CatalogOpExecutor::ExecComputeStats(
   
update_stats_req.query_options.__set_sync_ddl(compute_stats_request.sync_ddl);
   update_stats_req.query_options.__set_debug_action(
       compute_stats_request.ddl_params.query_options.debug_action);
-  update_stats_req.__set_header(TCatalogServiceRequestHeader());
-  update_stats_req.header.__set_want_minimal_response(FLAGS_use_local_catalog);
+  update_stats_req.__set_header(header);
 
   const TComputeStatsParams& compute_stats_params =
       compute_stats_request.ddl_params.compute_stats_params;
diff --git a/be/src/exec/catalog-op-executor.h 
b/be/src/exec/catalog-op-executor.h
index 8a0bcdeda..cab823bc6 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -57,7 +57,8 @@ class CatalogOpExecutor {
   /// Translates the given compute stats request and its child-query results 
into
   /// a new table alteration request for updating the stats metadata, and 
executes
   /// the alteration via Exec();
-  Status ExecComputeStats(const TCatalogOpRequest& compute_stats_request,
+  Status ExecComputeStats(const TCatalogServiceRequestHeader& header,
+      const TCatalogOpRequest& compute_stats_request,
       const apache::hive::service::cli::thrift::TTableSchema& tbl_stats_schema,
       const apache::hive::service::cli::thrift::TRowSet& tbl_stats_data,
       const apache::hive::service::cli::thrift::TTableSchema& col_stats_schema,
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index d20cbdee9..4e5e01e19 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1755,6 +1755,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
 
   Status status = catalog_op_executor_->ExecComputeStats(
+      GetCatalogServiceRequestHeader(),
       exec_request_->catalog_op_request,
       child_queries[0]->result_schema(),
       child_queries[0]->result_data(),
@@ -2177,6 +2178,7 @@ TCatalogServiceRequestHeader 
ClientRequestState::GetCatalogServiceRequestHeader(
       query_ctx_.client_request.__isset.redacted_stmt ?
           query_ctx_.client_request.redacted_stmt : 
query_ctx_.client_request.stmt);
   header.__set_query_id(query_ctx_.query_id);
+  header.__set_coordinator_hostname(FLAGS_hostname);
   return header;
 }
 
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index aef4304de..6ddbff9a8 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -111,6 +111,8 @@ DECLARE_string(java_weigher);
 DECLARE_int32(iceberg_reload_new_files_threshold);
 DECLARE_bool(enable_skipping_older_events);
 DECLARE_bool(enable_json_scanner);
+DECLARE_int32(catalog_operation_log_size);
+DECLARE_string(hostname);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -440,6 +442,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner);
   cfg.__set_max_filter_error_rate_from_full_scan(
       FLAGS_max_filter_error_rate_from_full_scan);
+  cfg.__set_catalog_operation_log_size(FLAGS_catalog_operation_log_size);
+  cfg.__set_hostname(FLAGS_hostname);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 919d34255..4b0ea6ce5 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -268,4 +268,8 @@ struct TBackendGflags {
   118: required double max_filter_error_rate_from_full_scan
 
   119: required i32 local_catalog_cache_concurrency_level
+
+  120: required i32 catalog_operation_log_size
+
+  121: required string hostname
 }
diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index f309591f2..f984bbdb5 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -68,6 +68,9 @@ struct TCatalogServiceRequestHeader {
 
   // The query id if this request comes from a query
   5: optional Types.TUniqueId query_id
+
+  // Hostname of the coordinator
+  6: optional string coordinator_hostname
 }
 
 // Returns details on the result of an operation that updates the catalog. 
Information
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index d8699ce0a..d4b235770 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -863,10 +863,31 @@ struct TOperationUsageCounter {
   3: optional i64 op_counter
 }
 
+struct TCatalogOpRecord {
+  1: required i64 thread_id
+  2: required Types.TUniqueId query_id
+  3: required string client_ip
+  4: required string coordinator_hostname
+  5: required string catalog_op_name
+  // Name of the target depends on the operation types, e.g. table name for 
CreateTable,
+  // db name for CreateDb, function name for CreateFunction, etc.
+  6: required string target_name
+  7: required string user
+  8: required i64 start_time_ms
+  9: required i64 finish_time_ms
+  10: required string status
+  11: required string details
+}
+
 // Response to getOperationUsage request.
 struct TGetOperationUsageResponse {
-  // List of the the number of running catalog operations
+  // List of the number of running catalog operations
   1: required list<TOperationUsageCounter> catalog_op_counters
+  // List of the in-flight catalog operations. Sorted by start time.
+  2: optional list<TCatalogOpRecord> in_flight_catalog_operations;
+  // List of the finished catalog operations. Sorted by finish time in
+  // descending order, i.e. the most recently finished operation is shown first.
+  3: optional list<TCatalogOpRecord> finished_catalog_operations;
 }
 
 struct TColumnName {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index b50c93a55..171312ccc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -27,6 +27,7 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTableName;
@@ -82,6 +83,9 @@ public class ResetMetadataStmt extends StatementBase {
   // Set during analysis.
   private TUniqueId queryId_;
 
+  // Set during analysis.
+  private String clientIp_;
+
   private ResetMetadataStmt(Action action, String db, TableName tableName,
       PartitionSpec partitionSpec) {
     Preconditions.checkNotNull(action);
@@ -146,6 +150,7 @@ public class ResetMetadataStmt extends StatementBase {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     requestingUser_ = analyzer.getUser();
     queryId_ = analyzer.getQueryCtx().getQuery_id();
+    clientIp_ = 
analyzer.getQueryCtx().getSession().getNetwork_address().getHostname();
     switch (action_) {
       case INVALIDATE_METADATA_TABLE:
       case REFRESH_TABLE:
@@ -257,6 +262,8 @@ public class ResetMetadataStmt extends StatementBase {
     params.setHeader(new TCatalogServiceRequestHeader());
     params.header.setRequesting_user(requestingUser_.getShortName());
     params.header.setQuery_id(queryId_);
+    
params.header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname());
+    params.header.setClient_ip(clientIp_);
     params.setIs_refresh(action_.isRefresh());
     if (tableName_ != null) {
       params.setTable_name(new TTableName(tableName_.getDb(), 
tableName_.getTbl()));
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 27d7a64e4..7f7d74157 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -34,7 +34,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -123,7 +122,6 @@ import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.thrift.TTableUsage;
 import org.apache.impala.thrift.TTableUsageMetrics;
-import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.CatalogBlacklistUtils;
@@ -3427,8 +3425,7 @@ public class CatalogServiceCatalog extends Catalog {
    * Retrieves information about the current catalog on-going operations.
    */
   public TGetOperationUsageResponse getOperationUsage() {
-    return new TGetOperationUsageResponse(
-        
CatalogMonitor.INSTANCE.getCatalogOperationMetrics().getOperationMetrics());
+    return 
CatalogMonitor.INSTANCE.getCatalogOperationTracker().getOperationMetrics();
   }
 
   /**
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
index eaa39c949..32ccd4371 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
@@ -68,7 +68,7 @@ public class CatalogFinalizeDmlCounter extends 
CatalogOperationCounter {
    * after analysis. Therefore, we need to guess it if the SQL statement is
    * available.
    */
-  private FinalizeDmlType getDmlType(String sql_stmt) {
+  public static FinalizeDmlType getDmlType(String sql_stmt) {
     sql_stmt = sql_stmt.toUpperCase();
     if (sql_stmt.contains("INSERT INTO")) {
       return FinalizeDmlType.FINALIZE_INSERT_INTO;
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
index 488f66960..203d718c4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
@@ -28,19 +28,19 @@ public final class CatalogMonitor {
 
   private final CatalogTableMetrics catalogTableMetrics_;
 
-  private final CatalogOperationMetrics catalogOperationUsage_;
+  private final CatalogOperationTracker catalogOperationTracker_;
 
   private final Metrics catalogdHmsCacheMetrics_ = new Metrics();
 
   private CatalogMonitor() {
     catalogTableMetrics_ = CatalogTableMetrics.INSTANCE;
-    catalogOperationUsage_ = CatalogOperationMetrics.INSTANCE;
+    catalogOperationTracker_ = CatalogOperationTracker.INSTANCE;
   }
 
   public CatalogTableMetrics getCatalogTableMetrics() { return 
catalogTableMetrics_; }
 
-  public CatalogOperationMetrics getCatalogOperationMetrics() {
-    return catalogOperationUsage_;
+  public CatalogOperationTracker getCatalogOperationTracker() {
+    return catalogOperationTracker_;
   }
 
   public Metrics getCatalogdHmsCacheMetrics() {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
index 35ec27561..6d2ddf176 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
@@ -95,6 +95,7 @@ public abstract class CatalogOperationCounter {
    */
   protected String getTableName(Optional<TTableName> tTableName) {
     if (tTableName.isPresent()) {
+      if (tTableName.get().table_name.isEmpty()) return 
tTableName.get().db_name;
       return tTableName.get().db_name + "." + tTableName.get().table_name;
     } else {
       return "Not available";
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationMetrics.java
 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationMetrics.java
deleted file mode 100644
index 02fc23d22..000000000
--- 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationMetrics.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.monitor;
-
-import org.apache.impala.thrift.TDdlType;
-import org.apache.impala.thrift.TOperationUsageCounter;
-import org.apache.impala.thrift.TResetMetadataRequest;
-import org.apache.impala.thrift.TTableName;
-import org.apache.impala.thrift.TUpdateCatalogRequest;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Adapter class for the catalog operation counters. Currently, it tracks,
- *  - the number of DDL operations in progress
- *  - the number of reset metadata operations in progress
- *  - the number of DML operations in progress
- */
-public final class CatalogOperationMetrics {
-  public final static CatalogOperationMetrics INSTANCE = new 
CatalogOperationMetrics();
-
-  // Keeps track of the on-going DDL operations
-  CatalogDdlCounter catalogDdlCounter;
-
-  // Keeps track of the on-going reset metadata requests (refresh/invalidate)
-  CatalogResetMetadataCounter catalogResetMetadataCounter;
-
-  // Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
-  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
-
-  private CatalogOperationMetrics() {
-    catalogDdlCounter = new CatalogDdlCounter();
-    catalogResetMetadataCounter = new CatalogResetMetadataCounter();
-    catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
-  }
-
-  public void increment(TDdlType tDdlType, Optional<TTableName> tTableName) {
-    catalogDdlCounter.incrementOperation(tDdlType, tTableName);
-  }
-
-  public void decrement(TDdlType tDdlType, Optional<TTableName> tTableName) {
-    catalogDdlCounter.decrementOperation(tDdlType, tTableName);
-  }
-
-  public void increment(TResetMetadataRequest req) {
-    catalogResetMetadataCounter.incrementOperation(req);
-  }
-
-  public void decrement(TResetMetadataRequest req) {
-    catalogResetMetadataCounter.decrementOperation(req);
-  }
-
-  public void increment(TUpdateCatalogRequest request) {
-    catalogFinalizeDmlCounter.incrementOperation(request);
-  }
-
-  public void decrement(TUpdateCatalogRequest request) {
-    catalogFinalizeDmlCounter.decrementOperation(request);
-  }
-
-  /**
-   * Merges the CatalogOpMetricCounter operation summary metrics into a single
-   * list that can be passed to the backend webserver.
-   */
-  public List<TOperationUsageCounter> getOperationMetrics() {
-    List<TOperationUsageCounter> merged = new ArrayList<>();
-    merged.addAll(catalogDdlCounter.getOperationUsage());
-    merged.addAll(catalogResetMetadataCounter.getOperationUsage());
-    merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
-    return merged;
-  }
-}
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
new file mode 100644
index 000000000..47dc8d18f
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.monitor;
+
+import com.google.common.base.Preconditions;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TCatalogServiceRequestHeader;
+import org.apache.impala.thrift.TDdlExecRequest;
+import org.apache.impala.thrift.TDdlType;
+import org.apache.impala.thrift.TCatalogOpRecord;
+import org.apache.impala.thrift.TGetOperationUsageResponse;
+import org.apache.impala.thrift.TOperationUsageCounter;
+import org.apache.impala.thrift.TResetMetadataRequest;
+import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.thrift.TUpdateCatalogRequest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Adapter class for tracking the catalog operations. Currently, it tracks,
+ *  - the number of DDL operations in progress
+ *  - the number of reset metadata operations in progress
+ *  - the number of DML operations in progress
+ *  Each operation has a corresponding record tracked in memory. Historical 
operations
+ *  are also kept in memory and the size is controlled by 
'catalog_operation_log_size'.
+ */
+public final class CatalogOperationTracker {
+  public final static CatalogOperationTracker INSTANCE = new 
CatalogOperationTracker();
+
+  // Keeps track of the on-going DDL operations
+  CatalogDdlCounter catalogDdlCounter;
+
+  // Keeps track of the on-going reset metadata requests (refresh/invalidate)
+  CatalogResetMetadataCounter catalogResetMetadataCounter;
+
+  // Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
+  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
+
+  private final Map<TUniqueId, TCatalogOpRecord> inFlightOperations =
+      new ConcurrentHashMap<>();
+  private final Queue<TCatalogOpRecord> finishedOperations =
+      new ConcurrentLinkedQueue<>();
+  private final int catalogOperationLogSize;
+
+  private CatalogOperationTracker() {
+    catalogDdlCounter = new CatalogDdlCounter();
+    catalogResetMetadataCounter = new CatalogResetMetadataCounter();
+    catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
+    catalogOperationLogSize = BackendConfig.INSTANCE.catalogOperationLogSize();
+    Preconditions.checkState(catalogOperationLogSize >= 0);
+  }
+
+  private void addRecord(TCatalogServiceRequestHeader header,
+      String catalogOpName, Optional<TTableName> tTableName, String details) {
+    String user = "unknown";
+    String clientIp = "unknown";
+    String coordinator = "unknown";
+    TUniqueId queryId = header.getQuery_id();
+    if (header.isSetRequesting_user()) {
+      user = header.getRequesting_user();
+    }
+    if (header.isSetClient_ip()) {
+      clientIp = header.getClient_ip();
+    }
+    if (header.isSetCoordinator_hostname()) {
+      coordinator = header.getCoordinator_hostname();
+    }
+    if (queryId != null) {
+      TCatalogOpRecord record = new 
TCatalogOpRecord(Thread.currentThread().getId(),
+          queryId, clientIp, coordinator, catalogOpName,
+          catalogDdlCounter.getTableName(tTableName), user,
+          System.currentTimeMillis(), -1, "STARTED", details);
+      inFlightOperations.put(queryId, record);
+    }
+  }
+
+  private void archiveRecord(TUniqueId queryId, String errorMsg) {
+    if (queryId != null && inFlightOperations.containsKey(queryId)) {
+      TCatalogOpRecord record = inFlightOperations.remove(queryId);
+      if (catalogOperationLogSize == 0) return;
+      record.setFinish_time_ms(System.currentTimeMillis());
+      if (errorMsg != null) {
+        record.setStatus("FAILED");
+        record.setDetails(record.getDetails() + ", error=" + errorMsg);
+      } else {
+        record.setStatus("FINISHED");
+      }
+      synchronized (finishedOperations) {
+        if (finishedOperations.size() >= catalogOperationLogSize) {
+          finishedOperations.poll();
+        }
+        finishedOperations.add(record);
+      }
+    }
+  }
+
+  private String getDdlType(TDdlExecRequest ddlRequest) {
+    if (ddlRequest.ddl_type == TDdlType.ALTER_TABLE) {
+      return "ALTER_TABLE_" + 
ddlRequest.getAlter_table_params().getAlter_type();
+    }
+    return ddlRequest.ddl_type.name();
+  }
+
+  public void increment(TDdlExecRequest ddlRequest, Optional<TTableName> 
tTableName) {
+    if (ddlRequest.isSetHeader()) {
+      String details = "query_options=" + ddlRequest.query_options.toString();
+      addRecord(ddlRequest.getHeader(), getDdlType(ddlRequest), tTableName, 
details);
+    }
+    catalogDdlCounter.incrementOperation(ddlRequest.ddl_type, tTableName);
+  }
+
+  public void decrement(TDdlType tDdlType, TUniqueId queryId,
+      Optional<TTableName> tTableName, String errorMsg) {
+    archiveRecord(queryId, errorMsg);
+    catalogDdlCounter.decrementOperation(tDdlType, tTableName);
+  }
+
+  public void increment(TResetMetadataRequest req) {
+    Optional<TTableName> tTableName =
+        req.table_name != null ? Optional.of(req.table_name) : 
Optional.empty();
+    if (req.isSetHeader()) {
+      String details = "sync_ddl=" + req.sync_ddl +
+          ", want_minimal_response=" + req.getHeader().want_minimal_response +
+          ", refresh_updated_hms_partitions=" + 
req.refresh_updated_hms_partitions;
+      if (req.isSetDebug_action() && !req.debug_action.isEmpty()) {
+        details += ", debug_action=" + req.debug_action;
+      }
+      addRecord(req.getHeader(),
+          CatalogResetMetadataCounter.getResetMetadataType(req, 
tTableName).name(),
+          tTableName, details);
+    }
+    catalogResetMetadataCounter.incrementOperation(req);
+  }
+
+  public void decrement(TResetMetadataRequest req, String errorMsg) {
+    if (req.isSetHeader()) {
+      archiveRecord(req.getHeader().getQuery_id(), errorMsg);
+    }
+    catalogResetMetadataCounter.decrementOperation(req);
+  }
+
+  public void increment(TUpdateCatalogRequest req) {
+    Optional<TTableName> tTableName =
+        Optional.of(new TTableName(req.db_name, req.target_table));
+    if (req.isSetHeader()) {
+      String details = "sync_ddl=" + req.sync_ddl +
+          ", is_overwrite=" + req.is_overwrite +
+          ", transaction_id=" + req.transaction_id +
+          ", write_id=" + req.write_id +
+          ", num_of_updated_partitions=" + req.getUpdated_partitionsSize();
+      if (req.isSetIceberg_operation()) {
+        details += ", iceberg_operation=" + req.iceberg_operation.operation;
+      }
+      if (req.isSetDebug_action() && !req.debug_action.isEmpty()) {
+        details += ", debug_action=" + req.debug_action;
+      }
+      addRecord(req.getHeader(),
+          
CatalogFinalizeDmlCounter.getDmlType(req.getHeader().redacted_sql_stmt).name(),
+          tTableName, details);
+    }
+    catalogFinalizeDmlCounter.incrementOperation(req);
+  }
+
+  public void decrement(TUpdateCatalogRequest req, String errorMsg) {
+    if (req.isSetHeader()) {
+      archiveRecord(req.getHeader().getQuery_id(), errorMsg);
+    }
+    catalogFinalizeDmlCounter.decrementOperation(req);
+  }
+
+  /**
+   * Merges the CatalogOpMetricCounter operation summary metrics into a single
+   * list that can be passed to the backend webserver.
+   */
+  public TGetOperationUsageResponse getOperationMetrics() {
+    List<TOperationUsageCounter> merged = new ArrayList<>();
+    merged.addAll(catalogDdlCounter.getOperationUsage());
+    merged.addAll(catalogResetMetadataCounter.getOperationUsage());
+    merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
+    TGetOperationUsageResponse res = new TGetOperationUsageResponse(merged);
+    for (TCatalogOpRecord record : inFlightOperations.values()) {
+      res.addToIn_flight_catalog_operations(record);
+    }
+    List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations);
+    // Reverse the list to show recent operations first.
+    Collections.reverse(records);
+    res.setFinished_catalog_operations(records);
+    return res;
+  }
+}
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
index 36af2c303..9600cee4a 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
@@ -61,7 +61,7 @@ public class CatalogResetMetadataCounter extends 
CatalogOperationCounter {
         getResetMetadataType(req, tTableName).toString(), 
getTableName(tTableName));
   }
 
-  private ResetMetadataType getResetMetadataType(
+  public static ResetMetadataType getResetMetadataType(
       TResetMetadataRequest req, Optional<TTableName> tTableName) {
     if (req.is_refresh) {
       return ResetMetadataType.REFRESH;
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 980e7b016..8456bfd3d 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -436,4 +436,13 @@ public class BackendConfig {
   public double getMaxFilterErrorRateFromFullScan() {
     return backendCfg_.max_filter_error_rate_from_full_scan;
   }
+
+  public int catalogOperationLogSize() {
+    return backendCfg_.catalog_operation_log_size >= 0 ?
+        backendCfg_.catalog_operation_log_size : Integer.MAX_VALUE;
+  }
+
+  public String getHostname() {
+    return backendCfg_.hostname;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index d263222db..2ef60d740 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -144,7 +144,7 @@ import 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKe
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreNotificationException;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
-import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
+import org.apache.impala.catalog.monitor.CatalogOperationTracker;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -210,6 +210,7 @@ import org.apache.impala.thrift.TDropStatsParams;
 import org.apache.impala.thrift.TDropTableOrViewParams;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TFunctionBinaryType;
+import org.apache.impala.thrift.TFunctionName;
 import org.apache.impala.thrift.TGrantRevokePrivParams;
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
 import org.apache.impala.thrift.THdfsCachingOp;
@@ -238,6 +239,7 @@ import org.apache.impala.thrift.TTableRowFormat;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTestCaseData;
 import org.apache.impala.thrift.TTruncateParams;
+import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateCatalogResponse;
 import org.apache.impala.thrift.TUpdatedPartition;
@@ -394,9 +396,9 @@ public class CatalogOpExecutor {
   private final AuthorizationManager authzManager_;
   private final HiveJavaFunctionFactory hiveJavaFuncFactory_;
 
-  // A singleton monitoring class that keeps track of the catalog usage 
metrics.
-  private final CatalogOperationMetrics catalogOpMetric_ =
-      CatalogMonitor.INSTANCE.getCatalogOperationMetrics();
+  // A singleton monitoring class that keeps track of the catalog operations.
+  private final CatalogOperationTracker catalogOpTracker_ =
+      CatalogMonitor.INSTANCE.getCatalogOperationTracker();
 
   // Lock used to ensure that CREATE[DROP] TABLE[DATABASE] operations 
performed in
   // catalog_ and the corresponding RPC to apply the change in HMS are atomic.
@@ -423,6 +425,7 @@ public class CatalogOpExecutor {
     response.setResult(new TCatalogUpdateResult());
     response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
     User requestingUser = null;
+    TUniqueId queryId = null;
     boolean wantMinimalResult = false;
     if (ddlRequest.isSetHeader()) {
       TCatalogServiceRequestHeader header = ddlRequest.getHeader();
@@ -430,49 +433,50 @@ public class CatalogOpExecutor {
         requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
       }
       wantMinimalResult = ddlRequest.getHeader().isWant_minimal_response();
+      queryId = header.getQuery_id();
     }
     Optional<TTableName> tTableName = Optional.empty();
-    TDdlType ddl_type = ddlRequest.ddl_type;
+    TDdlType ddlType = ddlRequest.ddl_type;
     try {
       boolean syncDdl = ddlRequest.getQuery_options().isSync_ddl();
-      switch (ddl_type) {
+      switch (ddlType) {
         case ALTER_DATABASE:
           TAlterDbParams alter_db_params = ddlRequest.getAlter_db_params();
           tTableName = Optional.of(new TTableName(alter_db_params.db, ""));
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           alterDatabase(alter_db_params, wantMinimalResult, response);
           break;
         case ALTER_TABLE:
           TAlterTableParams alter_table_params = 
ddlRequest.getAlter_table_params();
           tTableName = Optional.of(alter_table_params.getTable_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           alterTable(alter_table_params, 
ddlRequest.getQuery_options().getDebug_action(),
               wantMinimalResult, response);
           break;
         case ALTER_VIEW:
           TCreateOrAlterViewParams alter_view_params = 
ddlRequest.getAlter_view_params();
           tTableName = Optional.of(alter_view_params.getView_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           alterView(alter_view_params, wantMinimalResult, response);
           break;
         case CREATE_DATABASE:
           TCreateDbParams create_db_params = ddlRequest.getCreate_db_params();
           tTableName = Optional.of(new TTableName(create_db_params.db, ""));
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           createDatabase(create_db_params, response, syncDdl, 
wantMinimalResult);
           break;
         case CREATE_TABLE_AS_SELECT:
           TCreateTableParams create_table_as_select_params =
               ddlRequest.getCreate_table_params();
           tTableName = 
Optional.of(create_table_as_select_params.getTable_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           
response.setNew_table_created(createTable(create_table_as_select_params,
               response, catalogTimeline, syncDdl, wantMinimalResult));
           break;
         case CREATE_TABLE:
           TCreateTableParams create_table_params = 
ddlRequest.getCreate_table_params();
           tTableName = Optional.of((create_table_params.getTable_name()));
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           createTable(ddlRequest.getCreate_table_params(), response, 
catalogTimeline,
               syncDdl, wantMinimalResult);
           break;
@@ -480,7 +484,7 @@ public class CatalogOpExecutor {
           TCreateTableLikeParams create_table_like_params =
               ddlRequest.getCreate_table_like_params();
           tTableName = Optional.of(create_table_like_params.getTable_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           createTableLike(create_table_like_params, response, catalogTimeline, 
syncDdl,
               wantMinimalResult);
           break;
@@ -488,31 +492,37 @@ public class CatalogOpExecutor {
           TCreateOrAlterViewParams create_view_params =
               ddlRequest.getCreate_view_params();
           tTableName = Optional.of(create_view_params.getView_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           createView(create_view_params, wantMinimalResult, response, 
catalogTimeline);
           break;
         case CREATE_FUNCTION:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          createFunction(ddlRequest.getCreate_fn_params(), response);
+          TCreateFunctionParams create_func_params = 
ddlRequest.getCreate_fn_params();
+          TFunctionName fnName = create_func_params.getFn().getName();
+          tTableName = Optional.of(new TTableName(fnName.db_name, 
fnName.function_name));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          createFunction(create_func_params, response);
           break;
         case CREATE_DATA_SOURCE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          createDataSource(ddlRequest.getCreate_data_source_params(), 
response);
+          TCreateDataSourceParams create_ds_params =
+              ddlRequest.getCreate_data_source_params();
+          tTableName = Optional.of(
+              new TTableName(create_ds_params.getData_source().name, ""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          createDataSource(create_ds_params, response);
           break;
         case COMPUTE_STATS:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
           Preconditions.checkState(false, "Compute stats should trigger an 
ALTER TABLE.");
           break;
         case DROP_STATS:
           TDropStatsParams drop_stats_params = 
ddlRequest.getDrop_stats_params();
           tTableName = Optional.of(drop_stats_params.getTable_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           dropStats(drop_stats_params, wantMinimalResult, response);
           break;
         case DROP_DATABASE:
           TDropDbParams drop_db_params = ddlRequest.getDrop_db_params();
           tTableName = Optional.of(new TTableName(drop_db_params.getDb(), ""));
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           dropDatabase(drop_db_params, response);
           break;
         case DROP_TABLE:
@@ -520,7 +530,7 @@ public class CatalogOpExecutor {
           TDropTableOrViewParams drop_table_or_view_params =
               ddlRequest.getDrop_table_or_view_params();
           tTableName = Optional.of(drop_table_or_view_params.getTable_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           // Dropped tables and views are already returned as minimal results, 
so don't
           // need to pass down wantMinimalResult here.
           dropTableOrView(drop_table_or_view_params, response,
@@ -529,60 +539,82 @@ public class CatalogOpExecutor {
         case TRUNCATE_TABLE:
           TTruncateParams truncate_params = ddlRequest.getTruncate_params();
           tTableName = Optional.of(truncate_params.getTable_name());
-          catalogOpMetric_.increment(ddl_type, tTableName);
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           truncateTable(truncate_params, wantMinimalResult, response,
               ddlRequest.getQuery_options().getLock_max_wait_time_s());
           break;
         case DROP_FUNCTION:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          dropFunction(ddlRequest.getDrop_fn_params(), response);
+          TDropFunctionParams drop_func_params = 
ddlRequest.getDrop_fn_params();
+          TFunctionName dropFnName = drop_func_params.getFn_name();
+          tTableName = Optional.of(
+              new TTableName(dropFnName.db_name, dropFnName.function_name));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          dropFunction(drop_func_params, response);
           break;
         case DROP_DATA_SOURCE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          dropDataSource(ddlRequest.getDrop_data_source_params(), response);
+          TDropDataSourceParams drop_ds_params = 
ddlRequest.getDrop_data_source_params();
+          tTableName = Optional.of(new TTableName(drop_ds_params.data_source, 
""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          dropDataSource(drop_ds_params, response);
           break;
         case CREATE_ROLE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          createRole(requestingUser, ddlRequest.getCreate_drop_role_params(), 
response);
+          TCreateDropRoleParams create_role_params =
+              ddlRequest.getCreate_drop_role_params();
+          tTableName = Optional.of(new 
TTableName(create_role_params.role_name, ""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          createRole(requestingUser, create_role_params, response);
           break;
         case DROP_ROLE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          dropRole(requestingUser, ddlRequest.getCreate_drop_role_params(), 
response);
+          TCreateDropRoleParams drop_role_params =
+            ddlRequest.getCreate_drop_role_params();
+          tTableName = Optional.of(new TTableName(drop_role_params.role_name, 
""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          dropRole(requestingUser, drop_role_params, response);
           break;
         case GRANT_ROLE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          grantRoleToGroup(
-              requestingUser, ddlRequest.getGrant_revoke_role_params(), 
response);
+          TGrantRevokeRoleParams grant_role_params =
+              ddlRequest.getGrant_revoke_role_params();
+          tTableName = Optional.of(new TTableName(
+              StringUtils.join(",", grant_role_params.group_names), ""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          grantRoleToGroup(requestingUser, grant_role_params, response);
           break;
         case REVOKE_ROLE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
+          TGrantRevokeRoleParams revoke_role_params =
+              ddlRequest.getGrant_revoke_role_params();
+          tTableName = Optional.of(new TTableName(
+              StringUtils.join(",", revoke_role_params.group_names), ""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
           revokeRoleFromGroup(
-              requestingUser, ddlRequest.getGrant_revoke_role_params(), 
response);
+              requestingUser, revoke_role_params, response);
           break;
         case GRANT_PRIVILEGE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          grantPrivilege(
-              ddlRequest.getHeader(), 
ddlRequest.getGrant_revoke_priv_params(), response);
+          TGrantRevokePrivParams grant_priv_params =
+              ddlRequest.getGrant_revoke_priv_params();
+          tTableName = Optional.of(new 
TTableName(grant_priv_params.principal_name, ""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          grantPrivilege(ddlRequest.getHeader(), grant_priv_params, response);
           break;
         case REVOKE_PRIVILEGE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
-          revokePrivilege(
-              ddlRequest.getHeader(), 
ddlRequest.getGrant_revoke_priv_params(), response);
+          TGrantRevokePrivParams revoke_priv_params =
+              ddlRequest.getGrant_revoke_priv_params();
+          tTableName = Optional.of(new 
TTableName(revoke_priv_params.principal_name, ""));
+          catalogOpTracker_.increment(ddlRequest, tTableName);
+          revokePrivilege(ddlRequest.getHeader(), revoke_priv_params, 
response);
           break;
         case COMMENT_ON:
-          TCommentOnParams comment_on_params = 
ddlRequest.getComment_on_params();
           tTableName = Optional.of(new TTableName("", ""));
-          alterCommentOn(comment_on_params, response, tTableName, 
wantMinimalResult);
+          alterCommentOn(ddlRequest, response, tTableName, wantMinimalResult);
           break;
         case COPY_TESTCASE:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
+          catalogOpTracker_.increment(ddlRequest, Optional.empty());
           copyTestCaseData(ddlRequest.getCopy_test_case_params(), response,
               wantMinimalResult);
           break;
         default:
-          catalogOpMetric_.increment(ddl_type, Optional.empty());
+          catalogOpTracker_.increment(ddlRequest, Optional.empty());
           throw new IllegalStateException(
-              "Unexpected DDL exec request type: " + ddl_type);
+              "Unexpected DDL exec request type: " + ddlType);
       }
       catalogTimeline.markEvent(DDL_FINISHED);
 
@@ -602,9 +634,12 @@ public class CatalogOpExecutor {
       // At this point, the operation is considered successful. If any errors 
occurred
       // during execution, this function will throw an exception and the 
CatalogServer
       // will handle setting a bad status code.
-      response.getResult().setStatus(new TStatus(TErrorCode.OK, new 
ArrayList<String>()));
-    } finally {
-      catalogOpMetric_.decrement(ddl_type, tTableName);
+      response.getResult().setStatus(new TStatus(TErrorCode.OK, new 
ArrayList<>()));
+      catalogOpTracker_.decrement(ddlType, queryId, tTableName, 
/*exception*/null);
+    } catch (Exception e) {
+      catalogOpTracker_.decrement(ddlType, queryId, tTableName,
+          JniUtil.throwableToString(e));
+      throw e;
     }
     return response;
   }
@@ -6511,6 +6546,19 @@ public class CatalogOpExecutor {
     return fsList;
   }
 
+  public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
+      throws CatalogException {
+    catalogOpTracker_.increment(req);
+    try {
+      TResetMetadataResponse response = execResetMetadataImpl(req);
+      catalogOpTracker_.decrement(req, /*errorMsg*/null);
+      return response;
+    } catch (Exception e) {
+      catalogOpTracker_.decrement(req, JniUtil.throwableToString(e));
+      throw e;
+    }
+  }
+
   /**
    * Executes a TResetMetadataRequest and returns the result as a
    * TResetMetadataResponse. Based on the request parameters, this operation
@@ -6525,7 +6573,7 @@ public class CatalogOpExecutor {
    * For details on the specific commands see comments on their respective
    * methods in CatalogServiceCatalog.java.
    */
-  public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
+  public TResetMetadataResponse execResetMetadataImpl(TResetMetadataRequest 
req)
       throws CatalogException {
     String cmdString = CatalogOpUtil.getShortDescForReset(req);
     TResetMetadataResponse resp = new TResetMetadataResponse();
@@ -6733,6 +6781,19 @@ public class CatalogOpExecutor {
     }
   }
 
+  public TUpdateCatalogResponse updateCatalog(TUpdateCatalogRequest update)
+      throws ImpalaException {
+    catalogOpTracker_.increment(update);
+    try {
+      TUpdateCatalogResponse response = updateCatalogImpl(update);
+      catalogOpTracker_.decrement(update, /*errorMsg*/null);
+      return response;
+    } catch (Exception e) {
+      catalogOpTracker_.decrement(update, JniUtil.throwableToString(e));
+      throw e;
+    }
+  }
+
   /**
    * Create any new partitions required as a result of an INSERT statement and 
refreshes
    * the table metadata after every INSERT statement. Any new partitions will 
inherit
@@ -6743,7 +6804,7 @@ public class CatalogOpExecutor {
    * watch the associated cache directives will be submitted. This will result 
in an
    * async table refresh once the cache request completes.
    */
-  public TUpdateCatalogResponse updateCatalog(TUpdateCatalogRequest update)
+  public TUpdateCatalogResponse updateCatalogImpl(TUpdateCatalogRequest update)
       throws ImpalaException {
     TUpdateCatalogResponse response = new TUpdateCatalogResponse();
     // Only update metastore for Hdfs tables.
@@ -7262,20 +7323,22 @@ public class CatalogOpExecutor {
     return tbl;
   }
 
-  private void alterCommentOn(TCommentOnParams params, TDdlExecResponse 
response,
+  private void alterCommentOn(TDdlExecRequest ddlRequest, TDdlExecResponse 
response,
       Optional<TTableName> tTableName, boolean wantMinimalResult)
       throws ImpalaRuntimeException, CatalogException, InternalException {
+    Preconditions.checkState(tTableName.isPresent());
+    TCommentOnParams params = ddlRequest.getComment_on_params();
     if (params.getDb() != null) {
       Preconditions.checkArgument(!params.isSetTable_name() &&
           !params.isSetColumn_name());
       tTableName.get().setDb_name(params.db);
-      catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
+      catalogOpTracker_.increment(ddlRequest, tTableName);
       alterCommentOnDb(params.getDb(), params.getComment(), wantMinimalResult, 
response);
     } else if (params.getTable_name() != null) {
       Preconditions.checkArgument(!params.isSetDb() && 
!params.isSetColumn_name());
       tTableName.get().setDb_name(params.table_name.db_name);
       tTableName.get().setTable_name(params.table_name.table_name);
-      catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
+      catalogOpTracker_.increment(ddlRequest, tTableName);
       alterCommentOnTableOrView(TableName.fromThrift(params.getTable_name()),
           params.getComment(), wantMinimalResult, response);
     } else if (params.getColumn_name() != null) {
@@ -7283,7 +7346,7 @@ public class CatalogOpExecutor {
       TColumnName columnName = params.getColumn_name();
       tTableName.get().setDb_name(columnName.table_name.table_name);
       tTableName.get().setTable_name(columnName.table_name.table_name);
-      catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
+      catalogOpTracker_.increment(ddlRequest, tTableName);
       alterCommentOnColumn(TableName.fromThrift(columnName.getTable_name()),
           columnName.getColumn_name(), params.getComment(), wantMinimalResult, 
response);
     } else {
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 0d2153656..da34d41ce 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -868,6 +868,7 @@ public class Frontend {
           clientRequest.getRedacted_stmt() : clientRequest.getStmt());
       header.setWant_minimal_response(
           BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
+      header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname());
       ddl.getDdl_params().setHeader(header);
       // Forward relevant query options to the catalogd.
       TDdlQueryOptions ddlQueryOpts = new TDdlQueryOptions();
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java 
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index d4947ce64..afe40bfd1 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -25,8 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.impala.analysis.TableName;
@@ -48,8 +46,6 @@ import 
org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
-import org.apache.impala.catalog.monitor.CatalogMonitor;
-import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.JniUtil;
@@ -118,10 +114,6 @@ public class JniCatalog {
   private final static ReentrantReadWriteLock catalogServiceIdLock_ =
       new ReentrantReadWriteLock(true /*fair ordering*/);
 
-  // A singleton monitoring class that keeps track of the catalog usage 
metrics.
-  private final CatalogOperationMetrics catalogOperationUsage_ =
-      CatalogMonitor.INSTANCE.getCatalogOperationMetrics();
-
   private static TUniqueId generateId() {
     UUID uuid = UUID.randomUUID();
     return new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
@@ -316,12 +308,10 @@ public class JniCatalog {
       throws ImpalaException, TException {
     TResetMetadataRequest req = new TResetMetadataRequest();
     JniUtil.deserializeThrift(protocolFactory_, req, thriftResetMetadataReq);
-    catalogOperationUsage_.increment(req);
     String shortDesc = CatalogOpUtil.getShortDescForReset(req);
 
     return execAndSerialize("resetMetadata", shortDesc,
-        () -> catalogOpExecutor_.execResetMetadata(req),
-        () -> catalogOperationUsage_.decrement(req));
+        () -> catalogOpExecutor_.execResetMetadata(req));
   }
 
   /**
@@ -490,13 +480,11 @@ public class JniCatalog {
       throws ImpalaException, TException {
     TUpdateCatalogRequest request = new TUpdateCatalogRequest();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftUpdateCatalog);
-    catalogOperationUsage_.increment(request);
     String shortDesc = "Update catalog for "
         + fullyQualifiedTableName(request.getDb_name(), 
request.getTarget_table());
 
     return execAndSerialize("updateCatalog", shortDesc,
-        () -> catalogOpExecutor_.updateCatalog(request),
-        () -> catalogOperationUsage_.decrement(request));
+        () -> catalogOpExecutor_.updateCatalog(request));
   }
 
   /**
diff --git a/tests/custom_cluster/test_web_pages.py 
b/tests/custom_cluster/test_web_pages.py
index 069dc844e..495aa70a9 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -257,3 +257,62 @@ class TestWebPage(CustomClusterTestSuite):
       response = requests.get("http://localhost:%s"; % port)
       assert 'Content-Security-Policy' not in response.headers, \
         "CSP header present despite being disabled (port %s)" % port
+
+  
@CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=2")
+  def test_catalog_operations_limit(self, unique_database):
+    tbl = unique_database + ".tbl"
+    self.execute_query("create table {0}_1 (id int)".format(tbl))
+    self.execute_query("create table {0}_2 (id int)".format(tbl))
+    self.execute_query("create table {0}_3 (id int)".format(tbl))
+    self.execute_query("drop table {0}_1".format(tbl))
+    response = requests.get("http://localhost:25020/operations?json";)
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "finished_catalog_operations" in operations
+    finished_operations = operations["finished_catalog_operations"]
+    # Verify only 2 operations are shown
+    assert len(finished_operations) == 2
+    op = finished_operations[0]
+    assert op["status"] == "FINISHED"
+    assert op["catalog_op_name"] == "DROP_TABLE"
+    assert op["target_name"] == tbl + "_1"
+    op = finished_operations[1]
+    assert op["status"] == "FINISHED"
+    assert op["catalog_op_name"] == "CREATE_TABLE"
+    assert op["target_name"] == tbl + "_3"
+
+  @CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=-1")
+  def test_catalog_operations_negative_limit(self, unique_database):
+    # Test negative limit on catalog_operation_log_size. The limit is converted to be
+    # Integer.MAX_VALUE. Run hundreds of commands and see whether they are all in the
+    # operation log.
+    tbl = unique_database + ".tbl"
+    self.execute_query("create table {0} (id int)".format(tbl))
+    num = 500
+    for i in range(num):
+      self.execute_query("invalidate metadata " + tbl)
+    response = requests.get("http://localhost:25020/operations?json")
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "finished_catalog_operations" in operations
+    finished_operations = operations["finished_catalog_operations"]
+    # Verify all operations are in the history. There are one DROP_DATABASE, one
+    # CREATE_DATABASE, one CREATE_TABLE and 'num' INVALIDATEs in the list.
+    assert len(finished_operations) == 3 + num
+    for i in range(num):
+      op = finished_operations[i]
+      assert op["status"] == "FINISHED"
+      assert op["catalog_op_name"] == "INVALIDATE_METADATA"
+      assert op["target_name"] == tbl
+    op = finished_operations[-3]
+    assert op["status"] == "FINISHED"
+    assert op["catalog_op_name"] == "CREATE_TABLE"
+    assert op["target_name"] == tbl
+    op = finished_operations[-2]
+    assert op["status"] == "FINISHED"
+    assert op["catalog_op_name"] == "CREATE_DATABASE"
+    assert op["target_name"] == unique_database
+    op = finished_operations[-1]
+    assert op["status"] == "FINISHED"
+    assert op["catalog_op_name"] == "DROP_DATABASE"
+    assert op["target_name"] == unique_database
diff --git a/www/catalog_operations.tmpl b/www/catalog_operations.tmpl
index 56ba8c9bf..42d8c148b 100644
--- a/www/catalog_operations.tmpl
+++ b/www/catalog_operations.tmpl
@@ -45,26 +45,6 @@ under the License.
         {{/catalog_op_summary}}
       </tbody>
     </table>
-  </div>
-</div>
-
-<script>
-    $(document).ready(function() {
-        $('#summary').DataTable({
-            "order": [[ 1, "desc" ]],
-            "pageLength": 10
-        });
-    });
-</script>
-
-<div class="panel panel-info">
-<div class="card">
-  <div class="card-header">
-      <h5 class="card-title">
-      In-progress Catalog Operations
-      </h5>
-  </div>
-  <div class="card-body">
     <table id=operations class='table table-hover table-bordered'>
       <thead>
         <tr>
@@ -88,6 +68,10 @@ under the License.
 
 <script>
     $(document).ready(function() {
+        $('#summary').DataTable({
+            "order": [[ 1, "desc" ]],
+            "pageLength": 10
+        });
         $('#operations').DataTable({
             "order": [[ 1, "desc" ]],
             "pageLength": 10
@@ -95,4 +79,91 @@ under the License.
     });
 </script>
 
-{{> www/common-footer.tmpl }}
\ No newline at end of file
+<div class="panel panel-info">
+<div class="card">
+  <div class="card-header">
+      <h5 class="card-title">
+      In-progress Catalog Operations
+      </h5>
+  </div>
+  <div class="card-body">
+    <table id="inflight_operations" class='table table-hover table-border'>
+      <tr>
+        <th>Thread ID</th>
+        <th>Query ID</th>
+        <th>Client IP</th>
+        <th>Coordinator</th>
+        <th>Type</th>
+        <th>Target</th>
+        <th>User</th>
+        <th>Start Time</th>
+        <th>Duration</th>
+        <th>Status</th>
+        <th>Details</th>
+      </tr>
+      {{#inflight_catalog_operations}}
+      <tr>
+        <td>{{thread_id}}</td>
+        <td>{{query_id}}</td>
+        <td>{{client_ip}}</td>
+        <td>{{coordinator}}</td>
+        <td>{{catalog_op_name}}</td>
+        <td>{{target_name}}</td>
+        <td>{{user}}</td>
+        <td>{{start_time}}</td>
+        <td>{{duration}}</td>
+        <td>{{status}}</td>
+        <td>{{details}}</td>
+      </tr>
+      {{/inflight_catalog_operations}}
+    </table>
+  </div>
+</div>
+
+<div class="panel panel-info">
+<div class="card">
+  <div class="card-header">
+      <h5 class="card-title">
+      Finished Catalog Operations
+      </h5>
+  </div>
+  <div class="card-body">
+    <p>This table lists all completed catalog operations that are archived in memory.
+       The size of that archive is controlled with the
+       <samp>--catalog_operation_log_size</samp> command line parameter.</p>
+    <table id="finished_operations" class='table table-hover table-border'>
+      <tr>
+        <th>Thread ID</th>
+        <th>Query ID</th>
+        <th>Client IP</th>
+        <th>Coordinator</th>
+        <th>Type</th>
+        <th>Target</th>
+        <th>User</th>
+        <th>Start Time</th>
+        <th>End Time</th>
+        <th>Duration</th>
+        <th>Status</th>
+        <th>Details</th>
+      </tr>
+      {{#finished_catalog_operations}}
+      <tr>
+        <td>{{thread_id}}</td>
+        <td>{{query_id}}</td>
+        <td>{{client_ip}}</td>
+        <td>{{coordinator}}</td>
+        <td>{{catalog_op_name}}</td>
+        <td>{{target_name}}</td>
+        <td>{{user}}</td>
+        <td>{{start_time}}</td>
+        <td>{{finish_time}}</td>
+        <td>{{duration}}</td>
+        <td>{{status}}</td>
+        <td>{{details}}</td>
+      </tr>
+      {{/finished_catalog_operations}}
+    </table>
+  </div>
+</div>
+
+{{> www/common-footer.tmpl }}

Reply via email to