This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new b6bdf4c52 IMPALA-9118: Show catalog operation details in catalogd webUI
b6bdf4c52 is described below
commit b6bdf4c525acfe8b35928d43155cf3bea9be26cb
Author: stiga-huang <[email protected]>
AuthorDate: Sun Aug 27 16:40:46 2023 +0800
IMPALA-9118: Show catalog operation details in catalogd webUI
This patch extends the /operations page in catalogd WebUI to show the
in-flight and finished catalog operations. The following fields are
shown for each operation:
- Thread ID
- Query ID
- Client IP
- Coordinator
- Type
- Target
- User
- Start Time
- End Time (only shown for finished operations)
- Duration
- Status
- Details
Finished operation records are currently kept in memory and limited by
the catalog_operation_log_size flag (defaults to 100).
To collect the above fields, this patch extends
TCatalogServiceRequestHeader to contain the coordinator hostname. It also
fixes some catalog RPCs that didn't fill in these fields.
Tests:
- Add e2e test in custom_cluster/test_web_pages.py
- Manually verify the web pages when running a GVO job
Change-Id: I3cf3f0da2be2be79e546762a8083d4de338ff6aa
Reviewed-on: http://gerrit.cloudera.org:8080/20428
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/catalog/catalog-server.cc | 86 ++++++++-
be/src/catalog/catalog-server.h | 4 +
be/src/exec/catalog-op-executor.cc | 5 +-
be/src/exec/catalog-op-executor.h | 3 +-
be/src/service/client-request-state.cc | 2 +
be/src/util/backend-gflag-util.cc | 4 +
common/thrift/BackendGflags.thrift | 4 +
common/thrift/CatalogService.thrift | 3 +
common/thrift/JniCatalog.thrift | 23 ++-
.../apache/impala/analysis/ResetMetadataStmt.java | 7 +
.../impala/catalog/CatalogServiceCatalog.java | 5 +-
.../catalog/monitor/CatalogFinalizeDmlCounter.java | 2 +-
.../impala/catalog/monitor/CatalogMonitor.java | 8 +-
.../catalog/monitor/CatalogOperationCounter.java | 1 +
.../catalog/monitor/CatalogOperationMetrics.java | 89 ---------
.../catalog/monitor/CatalogOperationTracker.java | 213 +++++++++++++++++++++
.../monitor/CatalogResetMetadataCounter.java | 2 +-
.../org/apache/impala/service/BackendConfig.java | 9 +
.../apache/impala/service/CatalogOpExecutor.java | 175 +++++++++++------
.../java/org/apache/impala/service/Frontend.java | 1 +
.../java/org/apache/impala/service/JniCatalog.java | 16 +-
tests/custom_cluster/test_web_pages.py | 59 ++++++
www/catalog_operations.tmpl | 113 +++++++++--
23 files changed, 635 insertions(+), 199 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index d1f17ce3d..9b907b69f 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -32,6 +32,7 @@
#include "util/event-metrics.h"
#include "util/logging-support.h"
#include "util/metrics.h"
+#include "util/pretty-printer.h"
#include "util/thrift-debug-util.h"
#include "util/webserver.h"
@@ -148,6 +149,9 @@ DEFINE_bool(enable_skipping_older_events, false, "This
configuration is used to
"disable the optimisation. Set this true to enable skipping the older
events and"
"quickly catch with the events of HMS");
+DEFINE_int32(catalog_operation_log_size, 100, "Number of catalog operation log
records "
+ "to retain in catalogd. If -1, the operation log has unbounded size.");
+
DECLARE_string(state_store_host);
DECLARE_int32(state_store_port);
DECLARE_string(state_store_2_host);
@@ -874,17 +878,22 @@ void CatalogServer::CatalogObjectsUrlCallback(const
Webserver::WebRequest& req,
void CatalogServer::OperationUsageUrlCallback(
const Webserver::WebRequest& req, Document* document) {
- TGetOperationUsageResponse opeartion_usage;
- Status status = catalog_->GetOperationUsage(&opeartion_usage);
+ TGetOperationUsageResponse operation_usage;
+ Status status = catalog_->GetOperationUsage(&operation_usage);
if (!status.ok()) {
Value error(status.GetDetail().c_str(), document->GetAllocator());
document->AddMember("error", error, document->GetAllocator());
return;
}
+ // On success, add the counter summary followed by the per-operation records.
+ GetCatalogOpSummary(operation_usage, document);
+ GetCatalogOpRecords(operation_usage, document);
+}
+void CatalogServer::GetCatalogOpSummary(const TGetOperationUsageResponse&
operation_usage,
+ Document* document) {
// Add the catalog operation counters to the document
Value catalog_op_list(kArrayType);
- for (const auto& catalog_op : opeartion_usage.catalog_op_counters) {
+ for (const auto& catalog_op : operation_usage.catalog_op_counters) {
Value catalog_op_obj(kObjectType);
Value op_name(catalog_op.catalog_op_name.c_str(),
document->GetAllocator());
catalog_op_obj.AddMember("catalog_op_name", op_name,
document->GetAllocator());
@@ -899,7 +908,7 @@ void CatalogServer::OperationUsageUrlCallback(
// Create a summary and add it to the document
map<string, int> aggregated_operations;
- for (const auto& catalog_op : opeartion_usage.catalog_op_counters) {
+ for (const auto& catalog_op : operation_usage.catalog_op_counters) {
aggregated_operations[catalog_op.catalog_op_name] += catalog_op.op_counter;
}
Value catalog_op_summary(kArrayType);
@@ -915,6 +924,75 @@ void CatalogServer::OperationUsageUrlCallback(
document->AddMember("catalog_op_summary", catalog_op_summary,
document->GetAllocator());
}
+/// Appends one JSON object per entry of 'catalog_ops' to the JSON array
+/// 'catalog_op_list', allocating all values from 'document'. An operation whose
+/// finish_time_ms is not positive (the FE uses -1 while it is in flight) gets no
+/// "finish_time" member, and its duration is measured up to the current time.
+static void CatalogOpListToJson(const vector<TCatalogOpRecord>& catalog_ops,
+    Value* catalog_op_list, Document* document) {
+  auto& allocator = document->GetAllocator();
+  for (const auto& catalog_op : catalog_ops) {
+    Value obj(kObjectType);
+    Value op_name(catalog_op.catalog_op_name.c_str(), allocator);
+    obj.AddMember("catalog_op_name", op_name, allocator);
+
+    Value thread_id;
+    thread_id.SetInt64(catalog_op.thread_id);
+    obj.AddMember("thread_id", thread_id, allocator);
+
+    Value query_id(PrintId(catalog_op.query_id).c_str(), allocator);
+    obj.AddMember("query_id", query_id, allocator);
+
+    Value client_ip(catalog_op.client_ip.c_str(), allocator);
+    obj.AddMember("client_ip", client_ip, allocator);
+
+    Value coordinator(catalog_op.coordinator_hostname.c_str(), allocator);
+    obj.AddMember("coordinator", coordinator, allocator);
+
+    Value user(catalog_op.user.c_str(), allocator);
+    obj.AddMember("user", user, allocator);
+
+    Value target_name(catalog_op.target_name.c_str(), allocator);
+    obj.AddMember("target_name", target_name, allocator);
+
+    Value start_time(ToUtcStringFromUnixMillis(catalog_op.start_time_ms,
+        TimePrecision::Millisecond).c_str(), allocator);
+    obj.AddMember("start_time", start_time, allocator);
+
+    const bool is_finished = catalog_op.finish_time_ms > 0;
+    if (is_finished) {
+      Value finish_time(ToUtcStringFromUnixMillis(catalog_op.finish_time_ms,
+          TimePrecision::Millisecond).c_str(), allocator);
+      obj.AddMember("finish_time", finish_time, allocator);
+    }
+
+    // In-flight operations are measured up to now. The start/finish times come from
+    // System.currentTimeMillis() in the FE, i.e. wall-clock readings, so a clock
+    // adjustment can make the difference negative; clamp it at zero rather than
+    // rendering a negative duration.
+    const int64_t end_time_ms = is_finished ? catalog_op.finish_time_ms : UnixMillis();
+    int64_t duration_ms = end_time_ms - catalog_op.start_time_ms;
+    if (duration_ms < 0) duration_ms = 0;
+    const string printed_duration = PrettyPrinter::Print(duration_ms, TUnit::TIME_MS);
+    Value duration(printed_duration.c_str(), allocator);
+    obj.AddMember("duration", duration, allocator);
+
+    Value status(catalog_op.status.c_str(), allocator);
+    obj.AddMember("status", status, allocator);
+
+    Value details(catalog_op.details.c_str(), allocator);
+    obj.AddMember("details", details, allocator);
+
+    catalog_op_list->PushBack(obj, allocator);
+  }
+}
+
+/// Publishes both operation lists from 'response' as JSON arrays on 'document':
+/// "inflight_catalog_operations" first, then "finished_catalog_operations".
+void CatalogServer::GetCatalogOpRecords(const TGetOperationUsageResponse& response,
+    Document* document) {
+  auto& allocator = document->GetAllocator();
+
+  // Operations that are still running.
+  Value in_flight_json(kArrayType);
+  CatalogOpListToJson(
+      response.in_flight_catalog_operations, &in_flight_json, document);
+  document->AddMember("inflight_catalog_operations", in_flight_json, allocator);
+
+  // Operations that have completed, whether they succeeded or failed.
+  Value finished_json(kArrayType);
+  CatalogOpListToJson(
+      response.finished_catalog_operations, &finished_json, document);
+  document->AddMember("finished_catalog_operations", finished_json, allocator);
+}
+
void CatalogServer::TableMetricsUrlCallback(const Webserver::WebRequest& req,
Document* document) {
const auto& args = req.parsed_args;
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index b490894ce..6d1dde18a 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -304,6 +304,10 @@ class CatalogServer {
/// Retrieves the catalog operation metrics from FE.
void OperationUsageUrlCallback(
const Webserver::WebRequest& req, rapidjson::Document* document);
+ /// Adds the per-operation counters from 'response' to 'document', plus a
+ /// "catalog_op_summary" array that aggregates the counters by operation name.
+ void GetCatalogOpSummary(
+ const TGetOperationUsageResponse& response, rapidjson::Document*
document);
+ /// Adds the in-flight and finished catalog operation records from 'response' to
+ /// 'document' as the "inflight_catalog_operations" and
+ /// "finished_catalog_operations" arrays.
+ void GetCatalogOpRecords(
+ const TGetOperationUsageResponse& response, rapidjson::Document*
document);
/// Debug webpage handler that is used to dump all the registered metrics of
a
/// table. The caller specifies the "name" parameter which is the fully
diff --git a/be/src/exec/catalog-op-executor.cc
b/be/src/exec/catalog-op-executor.cc
index 2078716d0..a7fb2381b 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -145,7 +145,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest&
request) {
}
}
-Status CatalogOpExecutor::ExecComputeStats(
+Status CatalogOpExecutor::ExecComputeStats(const TCatalogServiceRequestHeader&
header,
const TCatalogOpRequest& compute_stats_request,
const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
@@ -159,8 +159,7 @@ Status CatalogOpExecutor::ExecComputeStats(
update_stats_req.query_options.__set_sync_ddl(compute_stats_request.sync_ddl);
update_stats_req.query_options.__set_debug_action(
compute_stats_request.ddl_params.query_options.debug_action);
- update_stats_req.__set_header(TCatalogServiceRequestHeader());
- update_stats_req.header.__set_want_minimal_response(FLAGS_use_local_catalog);
+ update_stats_req.__set_header(header);
const TComputeStatsParams& compute_stats_params =
compute_stats_request.ddl_params.compute_stats_params;
diff --git a/be/src/exec/catalog-op-executor.h
b/be/src/exec/catalog-op-executor.h
index 8a0bcdeda..cab823bc6 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -57,7 +57,8 @@ class CatalogOpExecutor {
/// Translates the given compute stats request and its child-query results
into
/// a new table alteration request for updating the stats metadata, and
executes
/// the alteration via Exec();
- Status ExecComputeStats(const TCatalogOpRequest& compute_stats_request,
+ Status ExecComputeStats(const TCatalogServiceRequestHeader& header,
+ const TCatalogOpRequest& compute_stats_request,
const apache::hive::service::cli::thrift::TTableSchema& tbl_stats_schema,
const apache::hive::service::cli::thrift::TRowSet& tbl_stats_data,
const apache::hive::service::cli::thrift::TTableSchema& col_stats_schema,
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index d20cbdee9..4e5e01e19 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1755,6 +1755,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
}
Status status = catalog_op_executor_->ExecComputeStats(
+ GetCatalogServiceRequestHeader(),
exec_request_->catalog_op_request,
child_queries[0]->result_schema(),
child_queries[0]->result_data(),
@@ -2177,6 +2178,7 @@ TCatalogServiceRequestHeader
ClientRequestState::GetCatalogServiceRequestHeader(
query_ctx_.client_request.__isset.redacted_stmt ?
query_ctx_.client_request.redacted_stmt :
query_ctx_.client_request.stmt);
header.__set_query_id(query_ctx_.query_id);
+ header.__set_coordinator_hostname(FLAGS_hostname);
return header;
}
diff --git a/be/src/util/backend-gflag-util.cc
b/be/src/util/backend-gflag-util.cc
index aef4304de..6ddbff9a8 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -111,6 +111,8 @@ DECLARE_string(java_weigher);
DECLARE_int32(iceberg_reload_new_files_threshold);
DECLARE_bool(enable_skipping_older_events);
DECLARE_bool(enable_json_scanner);
+DECLARE_int32(catalog_operation_log_size);
+DECLARE_string(hostname);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -440,6 +442,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner);
cfg.__set_max_filter_error_rate_from_full_scan(
FLAGS_max_filter_error_rate_from_full_scan);
+ cfg.__set_catalog_operation_log_size(FLAGS_catalog_operation_log_size);
+ cfg.__set_hostname(FLAGS_hostname);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift
b/common/thrift/BackendGflags.thrift
index 919d34255..4b0ea6ce5 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -268,4 +268,8 @@ struct TBackendGflags {
118: required double max_filter_error_rate_from_full_scan
119: required i32 local_catalog_cache_concurrency_level
+
+ 120: required i32 catalog_operation_log_size
+
+ 121: required string hostname
}
diff --git a/common/thrift/CatalogService.thrift
b/common/thrift/CatalogService.thrift
index f309591f2..f984bbdb5 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -68,6 +68,9 @@ struct TCatalogServiceRequestHeader {
// The query id if this request comes from a query
5: optional Types.TUniqueId query_id
+
+ // Hostname of the coordinator
+ 6: optional string coordinator_hostname
}
// Returns details on the result of an operation that updates the catalog.
Information
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index d8699ce0a..d4b235770 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -863,10 +863,31 @@ struct TOperationUsageCounter {
3: optional i64 op_counter
}
+// A single catalog operation as shown on the catalogd /operations web page.
+struct TCatalogOpRecord {
+ // Id of the catalogd Java thread that started the operation.
+ 1: required i64 thread_id
+ 2: required Types.TUniqueId query_id
+ // Client address from the request header, or "unknown" if it was not set.
+ 3: required string client_ip
+ // Coordinator hostname from the request header, or "unknown" if it was not set.
+ 4: required string coordinator_hostname
+ 5: required string catalog_op_name
+ // The target name depends on the operation type, e.g. table name for CreateTable,
+ // db name for CreateDb, function name for CreateFunction, etc.
+ 6: required string target_name
+ // Requesting user from the request header, or "unknown" if it was not set.
+ 7: required string user
+ // Start/finish times in milliseconds since the Unix epoch. finish_time_ms is -1
+ // while the operation is still in flight.
+ 8: required i64 start_time_ms
+ 9: required i64 finish_time_ms
+ // "STARTED" while in flight, then "FINISHED" or "FAILED".
+ 10: required string status
+ 11: required string details
+}
+
// Response to getOperationUsage request.
struct TGetOperationUsageResponse {
- // List of the the number of running catalog operations
+ // List of the number of running catalog operations
1: required list<TOperationUsageCounter> catalog_op_counters
+ // List of the in-flight catalog operations. Sorted by start time.
+ 2: optional list<TCatalogOpRecord> in_flight_catalog_operations;
+ // List of the finished catalog operations, sorted by finish time in descending
+ // order, i.e. the most recently finished operations are shown first.
+ 3: optional list<TCatalogOpRecord> finished_catalog_operations;
}
struct TColumnName {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index b50c93a55..171312ccc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -27,6 +27,7 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TResetMetadataRequest;
import org.apache.impala.thrift.TTableName;
@@ -82,6 +83,9 @@ public class ResetMetadataStmt extends StatementBase {
// Set during analysis.
private TUniqueId queryId_;
+ // Set during analysis.
+ private String clientIp_;
+
private ResetMetadataStmt(Action action, String db, TableName tableName,
PartitionSpec partitionSpec) {
Preconditions.checkNotNull(action);
@@ -146,6 +150,7 @@ public class ResetMetadataStmt extends StatementBase {
public void analyze(Analyzer analyzer) throws AnalysisException {
requestingUser_ = analyzer.getUser();
queryId_ = analyzer.getQueryCtx().getQuery_id();
+ clientIp_ =
analyzer.getQueryCtx().getSession().getNetwork_address().getHostname();
switch (action_) {
case INVALIDATE_METADATA_TABLE:
case REFRESH_TABLE:
@@ -257,6 +262,8 @@ public class ResetMetadataStmt extends StatementBase {
params.setHeader(new TCatalogServiceRequestHeader());
params.header.setRequesting_user(requestingUser_.getShortName());
params.header.setQuery_id(queryId_);
+
params.header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname());
+ params.header.setClient_ip(clientIp_);
params.setIs_refresh(action_.isRefresh());
if (tableName_ != null) {
params.setTable_name(new TTableName(tableName_.getDb(),
tableName_.getTbl()));
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 27d7a64e4..7f7d74157 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -34,7 +34,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -123,7 +122,6 @@ import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.thrift.TTableUsage;
import org.apache.impala.thrift.TTableUsageMetrics;
-import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.CatalogBlacklistUtils;
@@ -3427,8 +3425,7 @@ public class CatalogServiceCatalog extends Catalog {
* Retrieves information about the current catalog on-going operations.
*/
public TGetOperationUsageResponse getOperationUsage() {
- return new TGetOperationUsageResponse(
-
CatalogMonitor.INSTANCE.getCatalogOperationMetrics().getOperationMetrics());
+ return
CatalogMonitor.INSTANCE.getCatalogOperationTracker().getOperationMetrics();
}
/**
diff --git
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
index eaa39c949..32ccd4371 100644
---
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
+++
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogFinalizeDmlCounter.java
@@ -68,7 +68,7 @@ public class CatalogFinalizeDmlCounter extends
CatalogOperationCounter {
* after analysis. Therefore, we need to guess it if the SQL statement is
* available.
*/
- private FinalizeDmlType getDmlType(String sql_stmt) {
+ public static FinalizeDmlType getDmlType(String sql_stmt) {
sql_stmt = sql_stmt.toUpperCase();
if (sql_stmt.contains("INSERT INTO")) {
return FinalizeDmlType.FINALIZE_INSERT_INTO;
diff --git
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
index 488f66960..203d718c4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
@@ -28,19 +28,19 @@ public final class CatalogMonitor {
private final CatalogTableMetrics catalogTableMetrics_;
- private final CatalogOperationMetrics catalogOperationUsage_;
+ private final CatalogOperationTracker catalogOperationTracker_;
private final Metrics catalogdHmsCacheMetrics_ = new Metrics();
private CatalogMonitor() {
catalogTableMetrics_ = CatalogTableMetrics.INSTANCE;
- catalogOperationUsage_ = CatalogOperationMetrics.INSTANCE;
+ catalogOperationTracker_ = CatalogOperationTracker.INSTANCE;
}
public CatalogTableMetrics getCatalogTableMetrics() { return
catalogTableMetrics_; }
- public CatalogOperationMetrics getCatalogOperationMetrics() {
- return catalogOperationUsage_;
+ public CatalogOperationTracker getCatalogOperationTracker() {
+ return catalogOperationTracker_;
}
public Metrics getCatalogdHmsCacheMetrics() {
diff --git
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
index 35ec27561..6d2ddf176 100644
---
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
+++
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationCounter.java
@@ -95,6 +95,7 @@ public abstract class CatalogOperationCounter {
*/
protected String getTableName(Optional<TTableName> tTableName) {
if (tTableName.isPresent()) {
+ if (tTableName.get().table_name.isEmpty()) return
tTableName.get().db_name;
return tTableName.get().db_name + "." + tTableName.get().table_name;
} else {
return "Not available";
diff --git
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationMetrics.java
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationMetrics.java
deleted file mode 100644
index 02fc23d22..000000000
---
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationMetrics.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.monitor;
-
-import org.apache.impala.thrift.TDdlType;
-import org.apache.impala.thrift.TOperationUsageCounter;
-import org.apache.impala.thrift.TResetMetadataRequest;
-import org.apache.impala.thrift.TTableName;
-import org.apache.impala.thrift.TUpdateCatalogRequest;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Adapter class for the catalog operation counters. Currently, it tracks,
- * - the number of DDL operations in progress
- * - the number of reset metadata operations in progress
- * - the number of DML operations in progress
- */
-public final class CatalogOperationMetrics {
- public final static CatalogOperationMetrics INSTANCE = new
CatalogOperationMetrics();
-
- // Keeps track of the on-going DDL operations
- CatalogDdlCounter catalogDdlCounter;
-
- // Keeps track of the on-going reset metadata requests (refresh/invalidate)
- CatalogResetMetadataCounter catalogResetMetadataCounter;
-
- // Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
- CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
-
- private CatalogOperationMetrics() {
- catalogDdlCounter = new CatalogDdlCounter();
- catalogResetMetadataCounter = new CatalogResetMetadataCounter();
- catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
- }
-
- public void increment(TDdlType tDdlType, Optional<TTableName> tTableName) {
- catalogDdlCounter.incrementOperation(tDdlType, tTableName);
- }
-
- public void decrement(TDdlType tDdlType, Optional<TTableName> tTableName) {
- catalogDdlCounter.decrementOperation(tDdlType, tTableName);
- }
-
- public void increment(TResetMetadataRequest req) {
- catalogResetMetadataCounter.incrementOperation(req);
- }
-
- public void decrement(TResetMetadataRequest req) {
- catalogResetMetadataCounter.decrementOperation(req);
- }
-
- public void increment(TUpdateCatalogRequest request) {
- catalogFinalizeDmlCounter.incrementOperation(request);
- }
-
- public void decrement(TUpdateCatalogRequest request) {
- catalogFinalizeDmlCounter.decrementOperation(request);
- }
-
- /**
- * Merges the CatalogOpMetricCounter operation summary metrics into a single
- * list that can be passed to the backend webserver.
- */
- public List<TOperationUsageCounter> getOperationMetrics() {
- List<TOperationUsageCounter> merged = new ArrayList<>();
- merged.addAll(catalogDdlCounter.getOperationUsage());
- merged.addAll(catalogResetMetadataCounter.getOperationUsage());
- merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
- return merged;
- }
-}
\ No newline at end of file
diff --git
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
new file mode 100644
index 000000000..47dc8d18f
--- /dev/null
+++
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.monitor;
+
+import com.google.common.base.Preconditions;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TCatalogServiceRequestHeader;
+import org.apache.impala.thrift.TDdlExecRequest;
+import org.apache.impala.thrift.TDdlType;
+import org.apache.impala.thrift.TCatalogOpRecord;
+import org.apache.impala.thrift.TGetOperationUsageResponse;
+import org.apache.impala.thrift.TOperationUsageCounter;
+import org.apache.impala.thrift.TResetMetadataRequest;
+import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.thrift.TUpdateCatalogRequest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Adapter class for tracking the catalog operations. Currently, it tracks,
+ * - the number of DDL operations in progress
+ * - the number of reset metadata operations in progress
+ * - the number of DML operations in progress
+ * In addition, every operation whose request header carries a query id gets a
+ * TCatalogOpRecord that is kept in memory while it is in flight. Once it finishes,
+ * the record moves to a bounded history whose size is controlled by
+ * 'catalog_operation_log_size'.
+ */
+public final class CatalogOperationTracker {
+  public final static CatalogOperationTracker INSTANCE =
+      new CatalogOperationTracker();
+
+  // Keeps track of the on-going DDL operations
+  CatalogDdlCounter catalogDdlCounter;
+
+  // Keeps track of the on-going reset metadata requests (refresh/invalidate)
+  CatalogResetMetadataCounter catalogResetMetadataCounter;
+
+  // Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
+  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
+
+  // Records of the in-flight operations, keyed by query id.
+  private final Map<TUniqueId, TCatalogOpRecord> inFlightOperations =
+      new ConcurrentHashMap<>();
+  // Records of the finished operations, oldest first. Writers synchronize on this
+  // queue so that the size check and the eviction in archiveRecord() are atomic.
+  private final Queue<TCatalogOpRecord> finishedOperations =
+      new ConcurrentLinkedQueue<>();
+  // Max number of finished records to keep (Integer.MAX_VALUE when unbounded).
+  private final int catalogOperationLogSize;
+
+  private CatalogOperationTracker() {
+    catalogDdlCounter = new CatalogDdlCounter();
+    catalogResetMetadataCounter = new CatalogResetMetadataCounter();
+    catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
+    catalogOperationLogSize = BackendConfig.INSTANCE.catalogOperationLogSize();
+    Preconditions.checkState(catalogOperationLogSize >= 0);
+  }
+
+  /**
+   * Registers an in-flight record for the operation described by 'header'. No record
+   * is created if the header has no query id. Unset user, client IP and coordinator
+   * fields are recorded as "unknown".
+   */
+  private void addRecord(TCatalogServiceRequestHeader header,
+      String catalogOpName, Optional<TTableName> tTableName, String details) {
+    TUniqueId queryId = header.getQuery_id();
+    if (queryId == null) return;
+    String user = "unknown";
+    String clientIp = "unknown";
+    String coordinator = "unknown";
+    if (header.isSetRequesting_user()) {
+      user = header.getRequesting_user();
+    }
+    if (header.isSetClient_ip()) {
+      clientIp = header.getClient_ip();
+    }
+    if (header.isSetCoordinator_hostname()) {
+      coordinator = header.getCoordinator_hostname();
+    }
+    TCatalogOpRecord record = new TCatalogOpRecord(Thread.currentThread().getId(),
+        queryId, clientIp, coordinator, catalogOpName,
+        catalogDdlCounter.getTableName(tTableName), user,
+        System.currentTimeMillis(), -1, "STARTED", details);
+    inFlightOperations.put(queryId, record);
+  }
+
+  /**
+   * Moves the in-flight record of 'queryId' (if any) to the finished history. The
+   * record is marked FAILED with 'errorMsg' appended to its details when 'errorMsg'
+   * is non-null, and FINISHED otherwise. The oldest finished record is evicted when
+   * the history is full.
+   */
+  private void archiveRecord(TUniqueId queryId, String errorMsg) {
+    // ConcurrentHashMap does not accept null keys.
+    if (queryId == null) return;
+    // Rely on the result of remove() rather than a preceding containsKey() check:
+    // another thread may remove the same key in between, yielding a null record.
+    TCatalogOpRecord inFlightRecord = inFlightOperations.remove(queryId);
+    if (inFlightRecord == null || catalogOperationLogSize == 0) return;
+    // Update a copy: a concurrent getOperationMetrics() call may already hold a
+    // reference to the in-flight record and must not observe it half-updated.
+    TCatalogOpRecord record = inFlightRecord.deepCopy();
+    record.setFinish_time_ms(System.currentTimeMillis());
+    if (errorMsg != null) {
+      record.setStatus("FAILED");
+      record.setDetails(record.getDetails() + ", error=" + errorMsg);
+    } else {
+      record.setStatus("FINISHED");
+    }
+    synchronized (finishedOperations) {
+      if (finishedOperations.size() >= catalogOperationLogSize) {
+        finishedOperations.poll();
+      }
+      finishedOperations.add(record);
+    }
+  }
+
+  private String getDdlType(TDdlExecRequest ddlRequest) {
+    if (ddlRequest.ddl_type == TDdlType.ALTER_TABLE) {
+      return "ALTER_TABLE_" + ddlRequest.getAlter_table_params().getAlter_type();
+    }
+    return ddlRequest.ddl_type.name();
+  }
+
+  public void increment(TDdlExecRequest ddlRequest, Optional<TTableName> tTableName) {
+    if (ddlRequest.isSetHeader()) {
+      // String concatenation (rather than toString()) tolerates unset query options.
+      String details = "query_options=" + ddlRequest.query_options;
+      addRecord(ddlRequest.getHeader(), getDdlType(ddlRequest), tTableName,
+          details);
+    }
+    catalogDdlCounter.incrementOperation(ddlRequest.ddl_type, tTableName);
+  }
+
+  public void decrement(TDdlType tDdlType, TUniqueId queryId,
+      Optional<TTableName> tTableName, String errorMsg) {
+    archiveRecord(queryId, errorMsg);
+    catalogDdlCounter.decrementOperation(tDdlType, tTableName);
+  }
+
+  public void increment(TResetMetadataRequest req) {
+    Optional<TTableName> tTableName =
+        req.table_name != null ? Optional.of(req.table_name) : Optional.empty();
+    if (req.isSetHeader()) {
+      String details = "sync_ddl=" + req.sync_ddl +
+          ", want_minimal_response=" + req.getHeader().want_minimal_response +
+          ", refresh_updated_hms_partitions=" + req.refresh_updated_hms_partitions;
+      if (req.isSetDebug_action() && !req.debug_action.isEmpty()) {
+        details += ", debug_action=" + req.debug_action;
+      }
+      addRecord(req.getHeader(),
+          CatalogResetMetadataCounter.getResetMetadataType(req, tTableName).name(),
+          tTableName, details);
+    }
+    catalogResetMetadataCounter.incrementOperation(req);
+  }
+
+  public void decrement(TResetMetadataRequest req, String errorMsg) {
+    if (req.isSetHeader()) {
+      archiveRecord(req.getHeader().getQuery_id(), errorMsg);
+    }
+    catalogResetMetadataCounter.decrementOperation(req);
+  }
+
+  public void increment(TUpdateCatalogRequest req) {
+    Optional<TTableName> tTableName =
+        Optional.of(new TTableName(req.db_name, req.target_table));
+    if (req.isSetHeader()) {
+      String details = "sync_ddl=" + req.sync_ddl +
+          ", is_overwrite=" + req.is_overwrite +
+          ", transaction_id=" + req.transaction_id +
+          ", write_id=" + req.write_id +
+          ", num_of_updated_partitions=" + req.getUpdated_partitionsSize();
+      if (req.isSetIceberg_operation()) {
+        details += ", iceberg_operation=" + req.iceberg_operation.operation;
+      }
+      if (req.isSetDebug_action() && !req.debug_action.isEmpty()) {
+        details += ", debug_action=" + req.debug_action;
+      }
+      TCatalogServiceRequestHeader header = req.getHeader();
+      String dmlType =
+          CatalogFinalizeDmlCounter.getDmlType(header.redacted_sql_stmt).name();
+      addRecord(header, dmlType, tTableName, details);
+    }
+    catalogFinalizeDmlCounter.incrementOperation(req);
+  }
+
+  public void decrement(TUpdateCatalogRequest req, String errorMsg) {
+    if (req.isSetHeader()) {
+      archiveRecord(req.getHeader().getQuery_id(), errorMsg);
+    }
+    catalogFinalizeDmlCounter.decrementOperation(req);
+  }
+
+  /**
+   * Returns the operation counters of all CatalogOperationCounters merged into a
+   * single list. The response also carries the in-flight operation records, sorted
+   * by start time as TGetOperationUsageResponse documents. It also carries the
+   * finished operation records, most recently finished first.
+   */
+  public TGetOperationUsageResponse getOperationMetrics() {
+    List<TOperationUsageCounter> merged = new ArrayList<>();
+    merged.addAll(catalogDdlCounter.getOperationUsage());
+    merged.addAll(catalogResetMetadataCounter.getOperationUsage());
+    merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
+    TGetOperationUsageResponse res = new TGetOperationUsageResponse(merged);
+    // ConcurrentHashMap iteration order is arbitrary, so sort explicitly.
+    List<TCatalogOpRecord> inFlight = new ArrayList<>(inFlightOperations.values());
+    inFlight.sort(
+        (a, b) -> Long.compare(a.getStart_time_ms(), b.getStart_time_ms()));
+    res.setIn_flight_catalog_operations(inFlight);
+    List<TCatalogOpRecord> finished = new ArrayList<>(finishedOperations);
+    // The queue holds the oldest record first; reverse to show recent ones first.
+    Collections.reverse(finished);
+    res.setFinished_catalog_operations(finished);
+    return res;
+  }
+}
\ No newline at end of file
diff --git
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
index 36af2c303..9600cee4a 100644
---
a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
+++
b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogResetMetadataCounter.java
@@ -61,7 +61,7 @@ public class CatalogResetMetadataCounter extends
CatalogOperationCounter {
getResetMetadataType(req, tTableName).toString(),
getTableName(tTableName));
}
- private ResetMetadataType getResetMetadataType(
+ public static ResetMetadataType getResetMetadataType(
TResetMetadataRequest req, Optional<TTableName> tTableName) {
if (req.is_refresh) {
return ResetMetadataType.REFRESH;
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 980e7b016..8456bfd3d 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -436,4 +436,13 @@ public class BackendConfig {
public double getMaxFilterErrorRateFromFullScan() {
return backendCfg_.max_filter_error_rate_from_full_scan;
}
+
+ public int catalogOperationLogSize() {
+ return backendCfg_.catalog_operation_log_size >= 0 ?
+ backendCfg_.catalog_operation_log_size : Integer.MAX_VALUE;
+ }
+
+ /**
+ * Returns the 'hostname' field of the backend config. The frontend puts this value
+ * into catalog request headers as the coordinator hostname.
+ */
+ public String getHostname() {
+ return backendCfg_.hostname;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index d263222db..2ef60d740 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -144,7 +144,7 @@ import
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKe
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.MetastoreNotificationException;
import org.apache.impala.catalog.monitor.CatalogMonitor;
-import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
+import org.apache.impala.catalog.monitor.CatalogOperationTracker;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
@@ -210,6 +210,7 @@ import org.apache.impala.thrift.TDropStatsParams;
import org.apache.impala.thrift.TDropTableOrViewParams;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TFunctionBinaryType;
+import org.apache.impala.thrift.TFunctionName;
import org.apache.impala.thrift.TGrantRevokePrivParams;
import org.apache.impala.thrift.TGrantRevokeRoleParams;
import org.apache.impala.thrift.THdfsCachingOp;
@@ -238,6 +239,7 @@ import org.apache.impala.thrift.TTableRowFormat;
import org.apache.impala.thrift.TTableStats;
import org.apache.impala.thrift.TTestCaseData;
import org.apache.impala.thrift.TTruncateParams;
+import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.thrift.TUpdateCatalogResponse;
import org.apache.impala.thrift.TUpdatedPartition;
@@ -394,9 +396,9 @@ public class CatalogOpExecutor {
private final AuthorizationManager authzManager_;
private final HiveJavaFunctionFactory hiveJavaFuncFactory_;
- // A singleton monitoring class that keeps track of the catalog usage
metrics.
- private final CatalogOperationMetrics catalogOpMetric_ =
- CatalogMonitor.INSTANCE.getCatalogOperationMetrics();
+ // A singleton monitoring class that keeps track of the catalog operations.
+ private final CatalogOperationTracker catalogOpTracker_ =
+ CatalogMonitor.INSTANCE.getCatalogOperationTracker();
// Lock used to ensure that CREATE[DROP] TABLE[DATABASE] operations
performed in
// catalog_ and the corresponding RPC to apply the change in HMS are atomic.
@@ -423,6 +425,7 @@ public class CatalogOpExecutor {
response.setResult(new TCatalogUpdateResult());
response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
User requestingUser = null;
+ TUniqueId queryId = null;
boolean wantMinimalResult = false;
if (ddlRequest.isSetHeader()) {
TCatalogServiceRequestHeader header = ddlRequest.getHeader();
@@ -430,49 +433,50 @@ public class CatalogOpExecutor {
requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
}
wantMinimalResult = ddlRequest.getHeader().isWant_minimal_response();
+ queryId = header.getQuery_id();
}
Optional<TTableName> tTableName = Optional.empty();
- TDdlType ddl_type = ddlRequest.ddl_type;
+ TDdlType ddlType = ddlRequest.ddl_type;
try {
boolean syncDdl = ddlRequest.getQuery_options().isSync_ddl();
- switch (ddl_type) {
+ switch (ddlType) {
case ALTER_DATABASE:
TAlterDbParams alter_db_params = ddlRequest.getAlter_db_params();
tTableName = Optional.of(new TTableName(alter_db_params.db, ""));
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
alterDatabase(alter_db_params, wantMinimalResult, response);
break;
case ALTER_TABLE:
TAlterTableParams alter_table_params =
ddlRequest.getAlter_table_params();
tTableName = Optional.of(alter_table_params.getTable_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
alterTable(alter_table_params,
ddlRequest.getQuery_options().getDebug_action(),
wantMinimalResult, response);
break;
case ALTER_VIEW:
TCreateOrAlterViewParams alter_view_params =
ddlRequest.getAlter_view_params();
tTableName = Optional.of(alter_view_params.getView_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
alterView(alter_view_params, wantMinimalResult, response);
break;
case CREATE_DATABASE:
TCreateDbParams create_db_params = ddlRequest.getCreate_db_params();
tTableName = Optional.of(new TTableName(create_db_params.db, ""));
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
createDatabase(create_db_params, response, syncDdl,
wantMinimalResult);
break;
case CREATE_TABLE_AS_SELECT:
TCreateTableParams create_table_as_select_params =
ddlRequest.getCreate_table_params();
tTableName =
Optional.of(create_table_as_select_params.getTable_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
response.setNew_table_created(createTable(create_table_as_select_params,
response, catalogTimeline, syncDdl, wantMinimalResult));
break;
case CREATE_TABLE:
TCreateTableParams create_table_params =
ddlRequest.getCreate_table_params();
tTableName = Optional.of((create_table_params.getTable_name()));
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
createTable(ddlRequest.getCreate_table_params(), response,
catalogTimeline,
syncDdl, wantMinimalResult);
break;
@@ -480,7 +484,7 @@ public class CatalogOpExecutor {
TCreateTableLikeParams create_table_like_params =
ddlRequest.getCreate_table_like_params();
tTableName = Optional.of(create_table_like_params.getTable_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
createTableLike(create_table_like_params, response, catalogTimeline,
syncDdl,
wantMinimalResult);
break;
@@ -488,31 +492,37 @@ public class CatalogOpExecutor {
TCreateOrAlterViewParams create_view_params =
ddlRequest.getCreate_view_params();
tTableName = Optional.of(create_view_params.getView_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
createView(create_view_params, wantMinimalResult, response,
catalogTimeline);
break;
case CREATE_FUNCTION:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- createFunction(ddlRequest.getCreate_fn_params(), response);
+ TCreateFunctionParams create_func_params =
ddlRequest.getCreate_fn_params();
+ TFunctionName fnName = create_func_params.getFn().getName();
+ tTableName = Optional.of(new TTableName(fnName.db_name,
fnName.function_name));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ createFunction(create_func_params, response);
break;
case CREATE_DATA_SOURCE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- createDataSource(ddlRequest.getCreate_data_source_params(),
response);
+ TCreateDataSourceParams create_ds_params =
+ ddlRequest.getCreate_data_source_params();
+ tTableName = Optional.of(
+ new TTableName(create_ds_params.getData_source().name, ""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ createDataSource(create_ds_params, response);
break;
case COMPUTE_STATS:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
Preconditions.checkState(false, "Compute stats should trigger an
ALTER TABLE.");
break;
case DROP_STATS:
TDropStatsParams drop_stats_params =
ddlRequest.getDrop_stats_params();
tTableName = Optional.of(drop_stats_params.getTable_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
dropStats(drop_stats_params, wantMinimalResult, response);
break;
case DROP_DATABASE:
TDropDbParams drop_db_params = ddlRequest.getDrop_db_params();
tTableName = Optional.of(new TTableName(drop_db_params.getDb(), ""));
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
dropDatabase(drop_db_params, response);
break;
case DROP_TABLE:
@@ -520,7 +530,7 @@ public class CatalogOpExecutor {
TDropTableOrViewParams drop_table_or_view_params =
ddlRequest.getDrop_table_or_view_params();
tTableName = Optional.of(drop_table_or_view_params.getTable_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
// Dropped tables and views are already returned as minimal results,
so don't
// need to pass down wantMinimalResult here.
dropTableOrView(drop_table_or_view_params, response,
@@ -529,60 +539,82 @@ public class CatalogOpExecutor {
case TRUNCATE_TABLE:
TTruncateParams truncate_params = ddlRequest.getTruncate_params();
tTableName = Optional.of(truncate_params.getTable_name());
- catalogOpMetric_.increment(ddl_type, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
truncateTable(truncate_params, wantMinimalResult, response,
ddlRequest.getQuery_options().getLock_max_wait_time_s());
break;
case DROP_FUNCTION:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- dropFunction(ddlRequest.getDrop_fn_params(), response);
+ TDropFunctionParams drop_func_params =
ddlRequest.getDrop_fn_params();
+ TFunctionName dropFnName = drop_func_params.getFn_name();
+ tTableName = Optional.of(
+ new TTableName(dropFnName.db_name, dropFnName.function_name));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ dropFunction(drop_func_params, response);
break;
case DROP_DATA_SOURCE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- dropDataSource(ddlRequest.getDrop_data_source_params(), response);
+ TDropDataSourceParams drop_ds_params =
ddlRequest.getDrop_data_source_params();
+ tTableName = Optional.of(new TTableName(drop_ds_params.data_source,
""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ dropDataSource(drop_ds_params, response);
break;
case CREATE_ROLE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- createRole(requestingUser, ddlRequest.getCreate_drop_role_params(),
response);
+ TCreateDropRoleParams create_role_params =
+ ddlRequest.getCreate_drop_role_params();
+ tTableName = Optional.of(new
TTableName(create_role_params.role_name, ""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ createRole(requestingUser, create_role_params, response);
break;
case DROP_ROLE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- dropRole(requestingUser, ddlRequest.getCreate_drop_role_params(),
response);
+ TCreateDropRoleParams drop_role_params =
+ ddlRequest.getCreate_drop_role_params();
+ tTableName = Optional.of(new TTableName(drop_role_params.role_name,
""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ dropRole(requestingUser, drop_role_params, response);
break;
case GRANT_ROLE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- grantRoleToGroup(
- requestingUser, ddlRequest.getGrant_revoke_role_params(),
response);
+ TGrantRevokeRoleParams grant_role_params =
+ ddlRequest.getGrant_revoke_role_params();
+ tTableName = Optional.of(new TTableName(
+ StringUtils.join(",", grant_role_params.group_names), ""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ grantRoleToGroup(requestingUser, grant_role_params, response);
break;
case REVOKE_ROLE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
+ TGrantRevokeRoleParams revoke_role_params =
+ ddlRequest.getGrant_revoke_role_params();
+ tTableName = Optional.of(new TTableName(
+ StringUtils.join(",", revoke_role_params.group_names), ""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
revokeRoleFromGroup(
- requestingUser, ddlRequest.getGrant_revoke_role_params(),
response);
+ requestingUser, revoke_role_params, response);
break;
case GRANT_PRIVILEGE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- grantPrivilege(
- ddlRequest.getHeader(),
ddlRequest.getGrant_revoke_priv_params(), response);
+ TGrantRevokePrivParams grant_priv_params =
+ ddlRequest.getGrant_revoke_priv_params();
+ tTableName = Optional.of(new
TTableName(grant_priv_params.principal_name, ""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ grantPrivilege(ddlRequest.getHeader(), grant_priv_params, response);
break;
case REVOKE_PRIVILEGE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
- revokePrivilege(
- ddlRequest.getHeader(),
ddlRequest.getGrant_revoke_priv_params(), response);
+ TGrantRevokePrivParams revoke_priv_params =
+ ddlRequest.getGrant_revoke_priv_params();
+ tTableName = Optional.of(new
TTableName(revoke_priv_params.principal_name, ""));
+ catalogOpTracker_.increment(ddlRequest, tTableName);
+ revokePrivilege(ddlRequest.getHeader(), revoke_priv_params,
response);
break;
case COMMENT_ON:
- TCommentOnParams comment_on_params =
ddlRequest.getComment_on_params();
tTableName = Optional.of(new TTableName("", ""));
- alterCommentOn(comment_on_params, response, tTableName,
wantMinimalResult);
+ alterCommentOn(ddlRequest, response, tTableName, wantMinimalResult);
break;
case COPY_TESTCASE:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
+ catalogOpTracker_.increment(ddlRequest, Optional.empty());
copyTestCaseData(ddlRequest.getCopy_test_case_params(), response,
wantMinimalResult);
break;
default:
- catalogOpMetric_.increment(ddl_type, Optional.empty());
+ catalogOpTracker_.increment(ddlRequest, Optional.empty());
throw new IllegalStateException(
- "Unexpected DDL exec request type: " + ddl_type);
+ "Unexpected DDL exec request type: " + ddlType);
}
catalogTimeline.markEvent(DDL_FINISHED);
@@ -602,9 +634,12 @@ public class CatalogOpExecutor {
// At this point, the operation is considered successful. If any errors
occurred
// during execution, this function will throw an exception and the
CatalogServer
// will handle setting a bad status code.
- response.getResult().setStatus(new TStatus(TErrorCode.OK, new
ArrayList<String>()));
- } finally {
- catalogOpMetric_.decrement(ddl_type, tTableName);
+ response.getResult().setStatus(new TStatus(TErrorCode.OK, new
ArrayList<>()));
+ catalogOpTracker_.decrement(ddlType, queryId, tTableName,
/*exception*/null);
+ } catch (Exception e) {
+ catalogOpTracker_.decrement(ddlType, queryId, tTableName,
+ JniUtil.throwableToString(e));
+ throw e;
}
return response;
}
@@ -6511,6 +6546,19 @@ public class CatalogOpExecutor {
return fsList;
}
+  /**
+   * Executes a TResetMetadataRequest through execResetMetadataImpl() while recording
+   * it in the catalog operation tracker. The request is registered as in-flight before
+   * execution and archived afterwards, with the failure message when it failed.
+   */
+  public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
+      throws CatalogException {
+    catalogOpTracker_.increment(req);
+    TResetMetadataResponse response;
+    try {
+      response = execResetMetadataImpl(req);
+    } catch (Throwable t) {
+      // Catch Throwable, not just Exception, so that Errors (e.g. OutOfMemoryError)
+      // also archive the record instead of leaving it in-flight forever. Precise
+      // rethrow keeps the declared 'throws' clause unchanged.
+      catalogOpTracker_.decrement(req, JniUtil.throwableToString(t));
+      throw t;
+    }
+    // Archive outside the try block so that a failure while archiving a successful
+    // operation cannot trigger a second decrement from the catch block.
+    catalogOpTracker_.decrement(req, /*errorMsg*/null);
+    return response;
+  }
+
/**
* Executes a TResetMetadataRequest and returns the result as a
* TResetMetadataResponse. Based on the request parameters, this operation
@@ -6525,7 +6573,7 @@ public class CatalogOpExecutor {
* For details on the specific commands see comments on their respective
* methods in CatalogServiceCatalog.java.
*/
- public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
+ public TResetMetadataResponse execResetMetadataImpl(TResetMetadataRequest
req)
throws CatalogException {
String cmdString = CatalogOpUtil.getShortDescForReset(req);
TResetMetadataResponse resp = new TResetMetadataResponse();
@@ -6733,6 +6781,19 @@ public class CatalogOpExecutor {
}
}
+  /**
+   * Executes a TUpdateCatalogRequest (DML finalization) through updateCatalogImpl()
+   * while recording it in the catalog operation tracker. The request is registered as
+   * in-flight before execution and archived afterwards, with the failure message when
+   * it failed.
+   */
+  public TUpdateCatalogResponse updateCatalog(TUpdateCatalogRequest update)
+      throws ImpalaException {
+    catalogOpTracker_.increment(update);
+    TUpdateCatalogResponse response;
+    try {
+      response = updateCatalogImpl(update);
+    } catch (Throwable t) {
+      // Catch Throwable, not just Exception, so that Errors also archive the record
+      // instead of leaving it in-flight forever. Precise rethrow keeps the declared
+      // 'throws' clause unchanged.
+      catalogOpTracker_.decrement(update, JniUtil.throwableToString(t));
+      throw t;
+    }
+    // Archive outside the try block so that a failure while archiving a successful
+    // operation cannot trigger a second decrement from the catch block.
+    catalogOpTracker_.decrement(update, /*errorMsg*/null);
+    return response;
+  }
+
/**
* Create any new partitions required as a result of an INSERT statement and
refreshes
* the table metadata after every INSERT statement. Any new partitions will
inherit
@@ -6743,7 +6804,7 @@ public class CatalogOpExecutor {
* watch the associated cache directives will be submitted. This will result
in an
* async table refresh once the cache request completes.
*/
- public TUpdateCatalogResponse updateCatalog(TUpdateCatalogRequest update)
+ public TUpdateCatalogResponse updateCatalogImpl(TUpdateCatalogRequest update)
throws ImpalaException {
TUpdateCatalogResponse response = new TUpdateCatalogResponse();
// Only update metastore for Hdfs tables.
@@ -7262,20 +7323,22 @@ public class CatalogOpExecutor {
return tbl;
}
- private void alterCommentOn(TCommentOnParams params, TDdlExecResponse
response,
+ private void alterCommentOn(TDdlExecRequest ddlRequest, TDdlExecResponse
response,
Optional<TTableName> tTableName, boolean wantMinimalResult)
throws ImpalaRuntimeException, CatalogException, InternalException {
+ Preconditions.checkState(tTableName.isPresent());
+ TCommentOnParams params = ddlRequest.getComment_on_params();
if (params.getDb() != null) {
Preconditions.checkArgument(!params.isSetTable_name() &&
!params.isSetColumn_name());
tTableName.get().setDb_name(params.db);
- catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
alterCommentOnDb(params.getDb(), params.getComment(), wantMinimalResult,
response);
} else if (params.getTable_name() != null) {
Preconditions.checkArgument(!params.isSetDb() &&
!params.isSetColumn_name());
tTableName.get().setDb_name(params.table_name.db_name);
tTableName.get().setTable_name(params.table_name.table_name);
- catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
alterCommentOnTableOrView(TableName.fromThrift(params.getTable_name()),
params.getComment(), wantMinimalResult, response);
} else if (params.getColumn_name() != null) {
@@ -7283,7 +7346,7 @@ public class CatalogOpExecutor {
TColumnName columnName = params.getColumn_name();
tTableName.get().setDb_name(columnName.table_name.table_name);
tTableName.get().setTable_name(columnName.table_name.table_name);
- catalogOpMetric_.increment(TDdlType.COMMENT_ON, tTableName);
+ catalogOpTracker_.increment(ddlRequest, tTableName);
alterCommentOnColumn(TableName.fromThrift(columnName.getTable_name()),
columnName.getColumn_name(), params.getComment(), wantMinimalResult,
response);
} else {
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 0d2153656..da34d41ce 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -868,6 +868,7 @@ public class Frontend {
clientRequest.getRedacted_stmt() : clientRequest.getStmt());
header.setWant_minimal_response(
BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
+ header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname());
ddl.getDdl_params().setHeader(header);
// Forward relevant query options to the catalogd.
TDdlQueryOptions ddlQueryOpts = new TDdlQueryOptions();
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index d4947ce64..afe40bfd1 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -25,8 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.impala.analysis.TableName;
@@ -48,8 +46,6 @@ import
org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.NoOpEventProcessor;
import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
-import org.apache.impala.catalog.monitor.CatalogMonitor;
-import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.JniUtil;
@@ -118,10 +114,6 @@ public class JniCatalog {
private final static ReentrantReadWriteLock catalogServiceIdLock_ =
new ReentrantReadWriteLock(true /*fair ordering*/);
- // A singleton monitoring class that keeps track of the catalog usage
metrics.
- private final CatalogOperationMetrics catalogOperationUsage_ =
- CatalogMonitor.INSTANCE.getCatalogOperationMetrics();
-
private static TUniqueId generateId() {
UUID uuid = UUID.randomUUID();
return new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
@@ -316,12 +308,10 @@ public class JniCatalog {
throws ImpalaException, TException {
TResetMetadataRequest req = new TResetMetadataRequest();
JniUtil.deserializeThrift(protocolFactory_, req, thriftResetMetadataReq);
- catalogOperationUsage_.increment(req);
String shortDesc = CatalogOpUtil.getShortDescForReset(req);
return execAndSerialize("resetMetadata", shortDesc,
- () -> catalogOpExecutor_.execResetMetadata(req),
- () -> catalogOperationUsage_.decrement(req));
+ () -> catalogOpExecutor_.execResetMetadata(req));
}
/**
@@ -490,13 +480,11 @@ public class JniCatalog {
throws ImpalaException, TException {
TUpdateCatalogRequest request = new TUpdateCatalogRequest();
JniUtil.deserializeThrift(protocolFactory_, request, thriftUpdateCatalog);
- catalogOperationUsage_.increment(request);
String shortDesc = "Update catalog for "
+ fullyQualifiedTableName(request.getDb_name(),
request.getTarget_table());
return execAndSerialize("updateCatalog", shortDesc,
- () -> catalogOpExecutor_.updateCatalog(request),
- () -> catalogOperationUsage_.decrement(request));
+ () -> catalogOpExecutor_.updateCatalog(request));
}
/**
diff --git a/tests/custom_cluster/test_web_pages.py
b/tests/custom_cluster/test_web_pages.py
index 069dc844e..495aa70a9 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -257,3 +257,62 @@ class TestWebPage(CustomClusterTestSuite):
response = requests.get("http://localhost:%s" % port)
assert 'Content-Security-Policy' not in response.headers, \
"CSP header present despite being disabled (port %s)" % port
+
+
@CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=2")
+ def test_catalog_operations_limit(self, unique_database):
+ tbl = unique_database + ".tbl"
+ self.execute_query("create table {0}_1 (id int)".format(tbl))
+ self.execute_query("create table {0}_2 (id int)".format(tbl))
+ self.execute_query("create table {0}_3 (id int)".format(tbl))
+ self.execute_query("drop table {0}_1".format(tbl))
+ response = requests.get("http://localhost:25020/operations?json")
+ assert response.status_code == requests.codes.ok
+ operations = json.loads(response.text)
+ assert "finished_catalog_operations" in operations
+ finished_operations = operations["finished_catalog_operations"]
+ # Verify only 2 operations are shown
+ assert len(finished_operations) == 2
+ op = finished_operations[0]
+ assert op["status"] == "FINISHED"
+ assert op["catalog_op_name"] == "DROP_TABLE"
+ assert op["target_name"] == tbl + "_1"
+ op = finished_operations[1]
+ assert op["status"] == "FINISHED"
+ assert op["catalog_op_name"] == "CREATE_TABLE"
+ assert op["target_name"] == tbl + "_3"
+
+
@CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=-1")
+ def test_catalog_operations_negative_limit(self, unique_database):
+ # Test negative limit on catalog_operation_log_size. The limit is
converted to be
+ # Integer.MAX_VALUE. Run hundreds of commands and see whether they are all
in the
+ # operation log.
+ tbl = unique_database + ".tbl"
+ self.execute_query("create table {0} (id int)".format(tbl))
+ num = 500
+ for i in range(num):
+ self.execute_query("invalidate metadata " + tbl)
+ response = requests.get("http://localhost:25020/operations?json")
+ assert response.status_code == requests.codes.ok
+ operations = json.loads(response.text)
+ assert "finished_catalog_operations" in operations
+ finished_operations = operations["finished_catalog_operations"]
+ # Verify all operations are in the history. There are one DROP_DATABASE,
one
+ # CREATE_DATABASE, one CREATE_TABLE and 'num' INVALIDATEs in the list.
+ assert len(finished_operations) == 3 + num
+ for i in range(num):
+ op = finished_operations[i]
+ assert op["status"] == "FINISHED"
+ assert op["catalog_op_name"] == "INVALIDATE_METADATA"
+ assert op["target_name"] == tbl
+ op = finished_operations[-3]
+ assert op["status"] == "FINISHED"
+ assert op["catalog_op_name"] == "CREATE_TABLE"
+ assert op["target_name"] == tbl
+ op = finished_operations[-2]
+ assert op["status"] == "FINISHED"
+ assert op["catalog_op_name"] == "CREATE_DATABASE"
+ assert op["target_name"] == unique_database
+ op = finished_operations[-1]
+ assert op["status"] == "FINISHED"
+ assert op["catalog_op_name"] == "DROP_DATABASE"
+ assert op["target_name"] == unique_database
diff --git a/www/catalog_operations.tmpl b/www/catalog_operations.tmpl
index 56ba8c9bf..42d8c148b 100644
--- a/www/catalog_operations.tmpl
+++ b/www/catalog_operations.tmpl
@@ -45,26 +45,6 @@ under the License.
{{/catalog_op_summary}}
</tbody>
</table>
- </div>
-</div>
-
-<script>
- $(document).ready(function() {
- $('#summary').DataTable({
- "order": [[ 1, "desc" ]],
- "pageLength": 10
- });
- });
-</script>
-
-<div class="panel panel-info">
-<div class="card">
- <div class="card-header">
- <h5 class="card-title">
- In-progress Catalog Operations
- </h5>
- </div>
- <div class="card-body">
<table id=operations class='table table-hover table-bordered'>
<thead>
<tr>
@@ -88,6 +68,10 @@ under the License.
<script>
$(document).ready(function() {
+ $('#summary').DataTable({
+ "order": [[ 1, "desc" ]],
+ "pageLength": 10
+ });
$('#operations').DataTable({
"order": [[ 1, "desc" ]],
"pageLength": 10
@@ -95,4 +79,91 @@ under the License.
});
</script>
-{{> www/common-footer.tmpl }}
\ No newline at end of file
+<div class="panel panel-info">
+<div class="card">
+ <div class="card-header">
+ <h5 class="card-title">
+ In-progress Catalog Operations
+ </h5>
+ </div>
+ <div class="card-body">
+ <table id="inflight_operations" class='table table-hover table-bordered'>
+ <tr>
+ <th>Thread ID</th>
+ <th>Query ID</th>
+ <th>Client IP</th>
+ <th>Coordinator</th>
+ <th>Type</th>
+ <th>Target</th>
+ <th>User</th>
+ <th>Start Time</th>
+ <th>Duration</th>
+ <th>Status</th>
+ <th>Details</th>
+ </tr>
+ {{#inflight_catalog_operations}}
+ <tr>
+ <td>{{thread_id}}</td>
+ <td>{{query_id}}</td>
+ <td>{{client_ip}}</td>
+ <td>{{coordinator}}</td>
+ <td>{{catalog_op_name}}</td>
+ <td>{{target_name}}</td>
+ <td>{{user}}</td>
+ <td>{{start_time}}</td>
+ <td>{{duration}}</td>
+ <td>{{status}}</td>
+ <td>{{details}}</td>
+ </tr>
+ {{/inflight_catalog_operations}}
+ </table>
+ </div>
+</div>
+
+<div class="panel panel-info">
+<div class="card">
+ <div class="card-header">
+ <h5 class="card-title">
+ Finished Catalog Operations
+ </h5>
+ </div>
+ <div class="card-body">
+ <p>This table lists all completed catalog operations that are archived in memory.
+ The size of that archive is controlled with the
+ <samp>--catalog_operation_log_size</samp> command line parameter.</p>
+ <table id="finished_operations" class='table table-hover table-bordered'>
+ <tr>
+ <th>Thread ID</th>
+ <th>Query ID</th>
+ <th>Client IP</th>
+ <th>Coordinator</th>
+ <th>Type</th>
+ <th>Target</th>
+ <th>User</th>
+ <th>Start Time</th>
+ <th>End Time</th>
+ <th>Duration</th>
+ <th>Status</th>
+ <th>Details</th>
+ </tr>
+ {{#finished_catalog_operations}}
+ <tr>
+ <td>{{thread_id}}</td>
+ <td>{{query_id}}</td>
+ <td>{{client_ip}}</td>
+ <td>{{coordinator}}</td>
+ <td>{{catalog_op_name}}</td>
+ <td>{{target_name}}</td>
+ <td>{{user}}</td>
+ <td>{{start_time}}</td>
+ <td>{{finish_time}}</td>
+ <td>{{duration}}</td>
+ <td>{{status}}</td>
+ <td>{{details}}</td>
+ </tr>
+ {{/finished_catalog_operations}}
+ </table>
+ </div>
+</div>
+
+{{> www/common-footer.tmpl }}