This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch branch-4.4.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 91a9dd81ccd38e2e4b975e4ddf49f97c3e01b051 Author: Michael Smith <[email protected]> AuthorDate: Mon Apr 15 15:10:43 2024 -0700 IMPALA-13005: Create Query Live table in HMS Creates the 'sys.impala_query_live' table in HMS using a similar 'CREATE TABLE' command to 'sys.impala_query_log'. Updates frontend to identify a System Table based on the '__IMPALA_SYSTEM_TABLE' property. Tables improperly marked with '__IMPALA_SYSTEM_TABLE' will error when attempting to scan them because no relevant scanner will be available. Creating the table in HMS simplifies supporting 'SHOW CREATE TABLE' and 'DESCRIBE EXTENDED', so allows them for parity with Query Log. Explicitly disables 'COMPUTE STATS' on system tables as it doesn't work correctly. Makes System Tables work with local catalog mode, fixing LocalCatalogException: Unknown table type for table sys.impala_query_live Updates workload management implementation to rely more on SystemTables.thrift definition, and adds DCHECKs to verify completeness and ordering. Testing: - adds additional test cases for changes to introspection commands - passes existing test_query_live and test_query_log suites Change-Id: Idf302ee54a819fdee2db0ae582a5eeddffe4a5b4 Reviewed-on: http://gerrit.cloudera.org:8080/21302 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> (cherry picked from commit 73a9ef9c4c00419b5082f355d96961c5971634d6) --- be/generated-sources/gen-cpp/CMakeLists.txt | 1 + be/src/exec/system-table-scanner.cc | 4 +- be/src/service/workload-management-fields.cc | 120 +++++++++--------- be/src/service/workload-management.cc | 94 +++++++++----- be/src/service/workload-management.h | 20 +-- common/thrift/CatalogObjects.thrift | 9 +- common/thrift/SystemTables.thrift | 1 + .../java/org/apache/impala/analysis/Analyzer.java | 8 +- .../apache/impala/analysis/ComputeStatsStmt.java | 4 + .../apache/impala/analysis/DescribeTableStmt.java | 9 -- .../impala/analysis/ShowCreateTableStmt.java | 3 - .../apache/impala/analysis/SystemTableRef.java} | 27 ++-- .../impala/catalog/CatalogServiceCatalog.java | 4 +- fe/src/main/java/org/apache/impala/catalog/Db.java | 7 -- .../org/apache/impala/catalog/FeSystemTable.java} | 24 ++-- .../org/apache/impala/catalog/SystemTable.java | 137 ++++----------------- .../main/java/org/apache/impala/catalog/Table.java | 2 + .../impala/catalog/local/LocalSystemTable.java | 105 ++++++++++++++++ .../apache/impala/catalog/local/LocalTable.java | 3 + .../apache/impala/planner/SingleNodePlanner.java | 4 +- .../apache/impala/planner/SystemTableScanNode.java | 6 +- .../java/org/apache/impala/service/Frontend.java | 6 +- .../org/apache/impala/catalog/SystemTableTest.java | 7 +- .../org/apache/impala/planner/PlannerTest.java | 12 -- .../queries/PlannerTest/impala-query-live.test | 34 ----- tests/custom_cluster/test_query_live.py | 56 +++++++++ 26 files changed, 386 insertions(+), 321 deletions(-) diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt b/be/generated-sources/gen-cpp/CMakeLists.txt index 2f1a23940..ca220b8d9 100644 --- a/be/generated-sources/gen-cpp/CMakeLists.txt +++ b/be/generated-sources/gen-cpp/CMakeLists.txt @@ -75,6 +75,7 @@ set(SRC_FILES StatestoreService_types.cpp StatestoreSubscriber.cpp Status_types.cpp + SystemTables_types.cpp Types_types.cpp Zip_types.cpp ) diff --git a/be/src/exec/system-table-scanner.cc b/be/src/exec/system-table-scanner.cc index 87030ebdc..417a839d7 100644 --- a/be/src/exec/system-table-scanner.cc +++ b/be/src/exec/system-table-scanner.cc @@ -54,7 +54,7 @@ static constexpr double NANOS_TO_MILLIS = 1000000; Status SystemTableScanner::CreateScanner(RuntimeState* state, RuntimeProfile* profile, TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>* scanner) { switch (table_name) { - case TSystemTableName::QUERY_LIVE: + case TSystemTableName::IMPALA_QUERY_LIVE: *scanner = make_unique<QueryScanner>(state, profile); break; default: @@ -174,6 +174,8 @@ Status QueryScanner::MaterializeNextTuple( const QueryStateExpanded& query = *query_records_.front(); const QueryStateRecord& record = *query.base_state; ExecEnv* exec_env = ExecEnv::GetInstance(); + // Verify there are no clustering columns (partitions) to offset col_pos. + DCHECK_EQ(0, tuple_desc->table_desc()->num_clustering_cols()); for (const SlotDescriptor* slot_desc : tuple_desc->slots()) { void* slot = tuple->GetSlot(slot_desc->tuple_offset()); diff --git a/be/src/service/workload-management-fields.cc b/be/src/service/workload-management-fields.cc index 072242b7c..4cff8d2c6 100644 --- a/be/src/service/workload-management-fields.cc +++ b/be/src/service/workload-management-fields.cc @@ -92,35 +92,35 @@ static void _write_event(FieldParserContext& ctx, QueryEvent target_event) { } /// List of query table columns. Must be kept in-sync with SystemTables.thrift -const list<FieldDefinition> FIELD_DEFINITIONS = { +const array<FieldDefinition, NumQueryTableColumns> FIELD_DEFINITIONS{{ // Cluster Id // Required - FieldDefinition("cluster_id", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::CLUSTER_ID, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.cluster_id << "'"; }), // Query Id - FieldDefinition("query_id", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::QUERY_ID, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << PrintId(ctx.record->base_state->id) << "'"; }), // Session Id - FieldDefinition("session_id", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::SESSION_ID, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << PrintId(ctx.record->session_id) << "'"; }), // Session Type - FieldDefinition("session_type", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::SESSION_TYPE, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->session_type << "'"; }), // Hiveserver2 Protocol Version - FieldDefinition("hiveserver2_protocol_version", TPrimitiveType::STRING, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION, + TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'"; if (ctx.record->session_type == TSessionType::HIVESERVER2) { ctx.sql << ctx.record->hiveserver2_protocol_version; @@ -129,32 +129,32 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // Effective User - FieldDefinition("db_user", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::DB_USER, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->base_state->effective_user << "'"; }), // DB User - FieldDefinition("db_user_connection", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::DB_USER_CONNECTION, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->db_user_connection << "'"; }), // Default DB - FieldDefinition("db_name", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::DB_NAME, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->base_state->default_db << "'"; }), // Impala Coordinator - FieldDefinition("impala_coordinator", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::IMPALA_COORDINATOR, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" <<TNetworkAddressToString( ExecEnv::GetInstance()->configured_backend_address()) << "'"; }), // Query Status - FieldDefinition("query_status", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::QUERY_STATUS, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'"; if (ctx.record->base_state->query_status.ok()) { @@ -166,72 +166,72 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // Query State - FieldDefinition("query_state", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::QUERY_STATE, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->base_state->query_state << "'"; }), // Impala Query End State - FieldDefinition("impala_query_end_state", - TPrimitiveType::STRING, [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::IMPALA_QUERY_END_STATE, TPrimitiveType::STRING, + [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->impala_query_end_state << "'"; }), // Query Type - FieldDefinition("query_type", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::QUERY_TYPE, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->base_state->stmt_type << "'"; }), // Client Network Address - FieldDefinition("network_address", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::NETWORK_ADDRESS, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << TNetworkAddressToString(ctx.record->client_address) << "'"; }), // Query Start Time in UTC // Required - FieldDefinition("start_time_utc", TPrimitiveType::TIMESTAMP, + FieldDefinition(TQueryTableColumn::START_TIME_UTC, TPrimitiveType::TIMESTAMP, [](FieldParserContext& ctx){ ctx.sql << "UNIX_MICROS_TO_UTC_TIMESTAMP(" << ctx.record->base_state->start_time_us << ")"; }), // Query Duration - FieldDefinition("total_time_ms", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::TOTAL_TIME_MS, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_decimal(ctx, (ctx.record->base_state->end_time_us - ctx.record->base_state->start_time_us), MICROS_TO_MILLIS); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Query Options set by Configuration - FieldDefinition("query_opts_config", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::QUERY_OPTS_CONFIG, TPrimitiveType::STRING, [](FieldParserContext& ctx){ const string opts_str = DebugQueryOptions(ctx.record->query_options); ctx.sql << "'" << EscapeSql(opts_str) << "'"; }), // Resource Pool - FieldDefinition("resource_pool", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::RESOURCE_POOL, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << EscapeSql(ctx.record->base_state->resource_pool) << "'"; }), // Per-host Memory Estimate - FieldDefinition("per_host_mem_estimate", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::PER_HOST_MEM_ESTIMATE, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->per_host_mem_estimate; }), // Dedicated Coordinator Memory Estimate - FieldDefinition("dedicated_coord_mem_estimate", TPrimitiveType::BIGINT, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE, + TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->dedicated_coord_mem_estimate; }), // Per-Host Fragment Instances - FieldDefinition("per_host_fragment_instances", TPrimitiveType::STRING, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES, + TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'"; if (!ctx.record->per_host_state.empty()) { @@ -246,7 +246,7 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // Backends Count - FieldDefinition("backends_count", TPrimitiveType::INT, + FieldDefinition(TQueryTableColumn::BACKENDS_COUNT, TPrimitiveType::INT, [](FieldParserContext& ctx){ if (ctx.record->per_host_state.empty()) { ctx.sql << 0; @@ -256,133 +256,133 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // Admission Result - FieldDefinition("admission_result", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::ADMISSION_RESULT, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->admission_result << "'"; }), // Cluster Memory Admitted - FieldDefinition("cluster_memory_admitted", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::CLUSTER_MEMORY_ADMITTED, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->base_state->cluster_mem_est; }), // Executor Group - FieldDefinition("executor_group", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::EXECUTOR_GROUP, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << ctx.record->executor_group << "'"; }), // Executor Groups - FieldDefinition("executor_groups", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::EXECUTOR_GROUPS, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << EscapeSql(ctx.record->executor_groups) << "'"; }), // Exec Summary (also known as the operator summary) - FieldDefinition("exec_summary", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::EXEC_SUMMARY, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << EscapeSql(ctx.record->exec_summary) << "'"; }), // Number of rows fetched - FieldDefinition("num_rows_fetched", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::NUM_ROWS_FETCHED, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->base_state->num_rows_fetched; }), // Row Materialization Rate - FieldDefinition("row_materialization_rows_per_sec", TPrimitiveType::BIGINT, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC, + TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->row_materialization_rate; }), // Row Materialization Time - FieldDefinition("row_materialization_time_ms", TPrimitiveType::DECIMAL, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS, + TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_decimal(ctx, ctx.record->row_materialization_time, NANOS_TO_MILLIS); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Compressed Bytes Spilled to Disk - FieldDefinition("compressed_bytes_spilled", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::COMPRESSED_BYTES_SPILLED, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->compressed_bytes_spilled; }), // Events Timeline Planning Finished - FieldDefinition("event_planning_finished", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::EVENT_PLANNING_FINISHED, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, PLANNING_FINISHED); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Events Timeline Submit for Admission - FieldDefinition("event_submit_for_admission", TPrimitiveType::DECIMAL, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION, + TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, SUBMIT_FOR_ADMISSION); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Events Timeline Completed Admission - FieldDefinition("event_completed_admission", TPrimitiveType::DECIMAL, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::EVENT_COMPLETED_ADMISSION, + TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, COMPLETED_ADMISSION); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Events Timeline All Execution Backends Started - FieldDefinition("event_all_backends_started", TPrimitiveType::DECIMAL, - [](FieldParserContext& ctx){ + FieldDefinition(TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED, + TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, ALL_BACKENDS_STARTED); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Events Timeline Rows Available - FieldDefinition("event_rows_available", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::EVENT_ROWS_AVAILABLE, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, ROWS_AVAILABLE); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Events Timeline First Row Fetched - FieldDefinition("event_first_row_fetched", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::EVENT_FIRST_ROW_FETCHED, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, FIRST_ROW_FETCHED); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Events Timeline Last Row Fetched - FieldDefinition("event_last_row_fetched", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::EVENT_LAST_ROW_FETCHED, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, LAST_ROW_FETCHED); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Events Timeline Unregister Query - FieldDefinition("event_unregister_query", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::EVENT_UNREGISTER_QUERY, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_event(ctx, UNREGISTER_QUERY); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Read IO Wait Time Total - FieldDefinition("read_io_wait_total_ms", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::READ_IO_WAIT_TOTAL_MS, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_decimal(ctx, ctx.record->read_io_wait_time_total, NANOS_TO_MILLIS); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Read IO Wait Time Mean - FieldDefinition("read_io_wait_mean_ms", TPrimitiveType::DECIMAL, + FieldDefinition(TQueryTableColumn::READ_IO_WAIT_MEAN_MS, TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){ _write_decimal(ctx, ctx.record->read_io_wait_time_mean, NANOS_TO_MILLIS); }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE), // Bytes Read from the Data Cache Total - FieldDefinition("bytes_read_cache_total", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::BYTES_READ_CACHE_TOTAL, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->bytes_read_cache_total; }), // Bytes Read Total - FieldDefinition("bytes_read_total", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::BYTES_READ_TOTAL, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ ctx.sql << ctx.record->bytes_read_total; }), // Per-Node Peak Memory Usage Min - FieldDefinition("pernode_peak_mem_min", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MIN, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ auto min_elem = min_element(ctx.record->per_host_state.cbegin(), ctx.record->per_host_state.cend(), PerHostPeakMemoryComparator); @@ -395,7 +395,7 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // Per-Node Peak Memory Usage Max - FieldDefinition("pernode_peak_mem_max", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MAX, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ auto max_elem = max_element(ctx.record->per_host_state.cbegin(), ctx.record->per_host_state.cend(), PerHostPeakMemoryComparator); @@ -408,7 +408,7 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // Per-Node Peak Memory Usage Mean - FieldDefinition("pernode_peak_mem_mean", TPrimitiveType::BIGINT, + FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MEAN, TPrimitiveType::BIGINT, [](FieldParserContext& ctx){ int64_t calc_mean = 0; @@ -424,14 +424,14 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // SQL Statement - FieldDefinition("sql", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::SQL, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << EscapeSql(ctx.record->redacted_sql, FLAGS_query_log_max_sql_length) << "'"; }), // Query Plan - FieldDefinition("plan", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::PLAN, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << EscapeSql(ctx.record->base_state->plan, FLAGS_query_log_max_plan_length) @@ -439,12 +439,12 @@ const list<FieldDefinition> FIELD_DEFINITIONS = { }), // Tables Queried - FieldDefinition("tables_queried", TPrimitiveType::STRING, + FieldDefinition(TQueryTableColumn::TABLES_QUERIED, TPrimitiveType::STRING, [](FieldParserContext& ctx){ ctx.sql << "'" << PrintTableList(ctx.record->tables) << "'"; }), - }; // FIELDS_PARSERS constant list + }}; // FIELDS_PARSERS const array } //namespace workload_management diff --git a/be/src/service/workload-management.cc b/be/src/service/workload-management.cc index a8cbfcd5d..ca570e787 100644 --- a/be/src/service/workload-management.cc +++ b/be/src/service/workload-management.cc @@ -26,6 +26,7 @@ #include <thread> #include <utility> +#include <boost/algorithm/string/case_conv.hpp> #include <gflags/gflags_declare.h> #include <glog/logging.h> #include <gutil/strings/strcat.h> @@ -33,8 +34,10 @@ #include "common/compiler-util.h" #include "common/logging.h" #include "common/status.h" +#include "gen-cpp/CatalogObjects_constants.h" #include "gen-cpp/Frontend_types.h" #include "gen-cpp/Query_types.h" +#include "gen-cpp/SystemTables_types.h" #include "gen-cpp/Types_types.h" #include "runtime/query-driver.h" #include "service/client-request-state.h" @@ -93,20 +96,33 @@ static inline bool MaxRecordsExceeded(size_t record_count) noexcept { return FLAGS_query_log_max_queued > 0 && record_count > FLAGS_query_log_max_queued; } // function MaxRecordsExceeded -/// Sets up the completed queries database and table by generating and executing the -/// necessary DML statements. -static const Status SetupDbTable(InternalServer* server, const string& table_name) { +/// Sets up the sys database generating and executing the necessary DML statements. +static const Status SetupDb(InternalServer* server) { insert_query_opts.__set_sync_ddl(true); - RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user, - StrCat("create database if not exists ", DB, " comment " + StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT " "'System database for Impala introspection'"), insert_query_opts, false)); + insert_query_opts.__set_sync_ddl(false); + return Status::OK(); +} // function SetupDb + +/// Returns column name as lower-case to match common SQL style. +static string GetColumnName(const FieldDefinition& field) { + std::string column_name = to_string(field.db_column); + boost::algorithm::to_lower(column_name); + return column_name; +} + +/// Sets up the query table by generating and executing the necessary DML statements. +static const Status SetupTable(InternalServer* server, const string& table_name, + bool is_system_table = false) { + insert_query_opts.__set_sync_ddl(true); StringStreamPop create_table_sql; create_table_sql << "CREATE TABLE IF NOT EXISTS " << table_name << "("; for (const auto& field : FIELD_DEFINITIONS) { - create_table_sql << field.db_column_name << " " << field.db_column_type; + create_table_sql << GetColumnName(field) << " " << field.db_column_type; if (field.db_column_type == TPrimitiveType::DECIMAL) { create_table_sql << "(" << field.precision << "," << field.scale << ")"; @@ -116,16 +132,23 @@ static const Status SetupDbTable(InternalServer* server, const string& table_nam } create_table_sql.move_back(); - create_table_sql << ") PARTITIONED BY SPEC(identity(cluster_id), HOUR(start_time_utc)) " - << "STORED AS iceberg "; + create_table_sql << ") "; - if (!FLAGS_query_log_table_location.empty()) { - create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "' "; + if (!is_system_table) { + create_table_sql << "PARTITIONED BY SPEC(identity(cluster_id), HOUR(start_time_utc)) " + << "STORED AS iceberg "; + + if (!FLAGS_query_log_table_location.empty()) { + create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "' "; + } } create_table_sql << "TBLPROPERTIES ('schema_version'='1.0.0','format-version'='2'"; - if (!FLAGS_query_log_table_props.empty()) { + if (is_system_table) { + create_table_sql << ",'" + << g_CatalogObjects_constants.TBL_PROP_SYSTEM_TABLE <<"'='true'"; + } else if (!FLAGS_query_log_table_props.empty()) { create_table_sql << "," << FLAGS_query_log_table_props; } @@ -136,12 +159,11 @@ static const Status SetupDbTable(InternalServer* server, const string& table_nam insert_query_opts.__set_sync_ddl(false); - LOG(INFO) << "Completed query log initialization. storage_type=\"" - << FLAGS_enable_workload_mgmt << "\" write_interval=\"" << + LOG(INFO) << "Completed " << table_name << " initialization. write_interval=\"" << FLAGS_query_log_write_interval_s << "s\""; return Status::OK(); -} // function SetupDbTable +} // function SetupTable /// Iterates through the list of field in `FIELDS_PARSERS` executing each parser for the /// given `QueryStateExpanded` object. This function builds the `FieldParserContext` @@ -179,6 +201,13 @@ size_t ImpalaServer::NumLiveQueries() { } Status ImpalaServer::InitWorkloadManagement() { + // Verify FIELD_DEFINITIONS includes all QueryTableColumns. + DCHECK_EQ(_TQueryTableColumn_VALUES_TO_NAMES.size(), FIELD_DEFINITIONS.size()); + for (const auto& field : FIELD_DEFINITIONS) { + // Verify all fields match their column position. + DCHECK_EQ(FIELD_DEFINITIONS[field.db_column].db_column, field.db_column); + } + if (FLAGS_enable_workload_mgmt) { return Thread::Create("impala-server", "completed-queries", bind<void>(&ImpalaServer::CompletedQueriesThread, this), @@ -273,6 +302,17 @@ void ImpalaServer::EnqueueCompletedQuery(const QueryHandle& query_handle, PrintId(query_handle->query_id()) << "'"; } // ImpalaServer::EnqueueCompletedQuery +static string get_insert_prefix(const string& table_name) { + StringStreamPop fields; + fields << "INSERT INTO " << table_name << "("; + for (const auto& field : FIELD_DEFINITIONS) { + fields << GetColumnName(field) << ","; + } + fields.move_back(); + fields << ") VALUES "; + return fields.str(); +} + void ImpalaServer::CompletedQueriesThread() { { lock_guard<mutex> l(completed_queries_threadstate_mu_); @@ -288,21 +328,19 @@ void ImpalaServer::CompletedQueriesThread() { } // Fully qualified table name based on startup flags. - const string table_name = StrCat(DB, ".", FLAGS_query_log_table_name); + const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name); // Non-values portion of the completed queries insert dml. Does not change across // queries. - StringStreamPop fields; - fields << "INSERT INTO " << table_name << "("; - for (const auto& field : FIELD_DEFINITIONS) { - fields << field.db_column_name << ","; - } - fields.move_back(); - fields << ") VALUES "; - _insert_dml = fields.str(); + _insert_dml = get_insert_prefix(log_table_name); // The initialization code only works when run in a separate thread for reasons unknown. - ABORT_IF_ERROR(SetupDbTable(internal_server_.get(), table_name)); + ABORT_IF_ERROR(SetupDb(internal_server_.get())); + ABORT_IF_ERROR(SetupTable(internal_server_.get(), log_table_name)); + std::string live_table_name = to_string(TSystemTableName::IMPALA_QUERY_LIVE); + boost::algorithm::to_lower(live_table_name); + ABORT_IF_ERROR(SetupTable(internal_server_.get(), + StrCat(DB, ".", live_table_name), true)); { lock_guard<mutex> l(completed_queries_threadstate_mu_); @@ -373,7 +411,7 @@ void ImpalaServer::CompletedQueriesThread() { for (auto iter = queries_to_insert.begin(); iter != queries_to_insert.end(); iter++) { if (iter->insert_attempts_count >= FLAGS_query_log_max_insert_attempts) { - LOG(ERROR) << "could not write completed query table=\"" << table_name << + LOG(ERROR) << "could not write completed query table=\"" << log_table_name << "\" query_id=\"" << PrintId(iter->query->base_state->id) << "\""; iter = queries_to_insert.erase(iter); ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(-1); @@ -424,7 +462,7 @@ void ImpalaServer::CompletedQueriesThread() { &tmp_query_id); if (ret_status.ok()) { - LOG(INFO) << "wrote completed queries table=\"" << table_name << "\" " + LOG(INFO) << "wrote completed queries table=\"" << log_table_name << "\" " "record_count=\"" << queries_to_insert.size() << "\""; ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment( queries_to_insert.size() * -1); @@ -432,8 +470,8 @@ void ImpalaServer::CompletedQueriesThread() { ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment( queries_to_insert.size()); } else { - LOG(WARNING) << "failed to write completed queries table=\"" << table_name << - "\" record_count=\"" << queries_to_insert.size() << "\""; + LOG(WARNING) << "failed to write completed queries table=\"" << + log_table_name << "\" record_count=\"" << queries_to_insert.size() << "\""; LOG(WARNING) << ret_status.GetDetail(); ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size()); completed_queries_lock_.lock(); diff --git a/be/src/service/workload-management.h b/be/src/service/workload-management.h index a5c865f12..8e196249d 100644 --- a/be/src/service/workload-management.h +++ b/be/src/service/workload-management.h @@ -17,13 +17,14 @@ #pragma once -#include <list> +#include <array> #include <memory> #include <string> #include <utility> #include <gflags/gflags.h> +#include "gen-cpp/SystemTables_types.h" #include "gen-cpp/Types_types.h" #include "service/query-state-record.h" #include "util/string-util.h" @@ -50,24 +51,27 @@ using FieldParser = void (*)(FieldParserContext&); /// Contains all necessary information for the definition and parsing of a single field /// in workload management. struct FieldDefinition { - const std::string db_column_name; + const TQueryTableColumn::type db_column; const TPrimitiveType::type db_column_type; const FieldParser parser; const int16_t precision; const int16_t scale; - FieldDefinition(const std::string db_col_name, const TPrimitiveType::type db_col_type, - const FieldParser fp, const int16_t precision = 0, - const int16_t scale = 0) : db_column_name(std::move(db_col_name)), - db_column_type(std::move(db_col_type)), parser(std::move(fp)), - precision(precision), scale(scale) {} + FieldDefinition(const TQueryTableColumn::type db_col, + const TPrimitiveType::type db_col_type, const FieldParser fp, + const int16_t precision = 0, const int16_t scale = 0) : + db_column(std::move(db_col)), db_column_type(std::move(db_col_type)), + parser(std::move(fp)), precision(precision), scale(scale) {} }; // struct FieldDefinition +/// Number of query table columns +constexpr size_t NumQueryTableColumns = TQueryTableColumn::TABLES_QUERIED + 1; + /// This list is the main data structure for workload management. Each list entry /// contains the name of a column in the completed queries table, the type of that column, /// and an implementation of a FieldParser for generating the value of that column from a /// `QueryStateExpanded` object. -extern const std::list<FieldDefinition> FIELD_DEFINITIONS; +extern const std::array<FieldDefinition, NumQueryTableColumns> FIELD_DEFINITIONS; /// Track the state of the thread that processes the completed queries queue. Access to /// the ThreadState variable must only happen after taking a lock on the associated mutex. diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 37297ae3b..a0e28a92a 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -146,6 +146,9 @@ enum TTablePropertyType { SERDE_PROPERTY = 1 } +// Table properties used by Impala +const string TBL_PROP_SYSTEM_TABLE = "__IMPALA_SYSTEM_TABLE" + // The access level that is available to Impala on the Catalog object. enum TAccessLevel { NONE = 0 @@ -669,10 +672,10 @@ struct TIcebergTable { 10: optional map<string, TIcebergPartitionStats> partition_stats; } -// Describes the purpose of a particular system table. -// Table names can be found in SystemTable.java +// System Table identifiers. +// These are used as the table name, so should not be changed. enum TSystemTableName { - QUERY_LIVE = 0 + IMPALA_QUERY_LIVE = 0 } // Represents a System Table diff --git a/common/thrift/SystemTables.thrift b/common/thrift/SystemTables.thrift index 27563d7bb..a67282609 100644 --- a/common/thrift/SystemTables.thrift +++ b/common/thrift/SystemTables.thrift @@ -19,6 +19,7 @@ namespace cpp impala namespace java org.apache.impala.thrift # Must be kept in-sync with workload-management-fields.cc +# Used as column names, so do not change existing enums. enum TQueryTableColumn { CLUSTER_ID QUERY_ID diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 08689d2c7..b825f88d8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -59,6 +59,7 @@ import org.apache.impala.catalog.FeHBaseTable; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeIncompleteTable; import org.apache.impala.catalog.FeKuduTable; +import org.apache.impala.catalog.FeSystemTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; import org.apache.impala.catalog.IcebergTimeTravelTable; @@ -68,7 +69,6 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; -import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.TypeCompatibility; @@ -982,12 +982,14 @@ public class Analyzer { if (table instanceof IcebergMetadataTable) { return new IcebergMetadataTableRef(tableRef, resolvedPath); } + if (table instanceof FeSystemTable) { + return new SystemTableRef(tableRef, resolvedPath); + } // The table must be a base table. Preconditions.checkState(table instanceof FeFsTable || table instanceof FeKuduTable || table instanceof FeHBaseTable || - table instanceof FeDataSourceTable || - table instanceof SystemTable); + table instanceof FeDataSourceTable); return new BaseTableRef(tableRef, resolvedPath); } else { return new CollectionTableRef(tableRef, resolvedPath, false); diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java index 721b5ec9d..99bb77c85 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java @@ -403,6 +403,10 @@ public class ComputeStatsStmt extends StatementBase { throw new AnalysisException(String.format( "COMPUTE STATS not supported for nested collection: %s", tableName_)); } + if (tableRef instanceof SystemTableRef) { + throw new AnalysisException(String.format( + "COMPUTE STATS not supported for system table: %s", tableName_)); + } table_ = analyzer.getTable(tableName_, Privilege.ALTER, Privilege.SELECT); if (!(table_ instanceof FeFsTable)) { diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java index 8e6442f39..37ac060bf 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java @@ -24,7 +24,6 @@ import org.apache.impala.analysis.Path.PathType; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.StructType; -import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; @@ -139,7 +138,6 @@ public class DescribeTableStmt extends StatementBase { analyzer.getTable(table_.getTableName(), /* add column-level privilege */ true, Privilege.ANY); checkMinimalForIcebergMetadataTable(); - checkMinimalForSystemTable(); if (!targetsTable()) analyzeComplexType(analyzer); } @@ -176,13 +174,6 @@ public class DescribeTableStmt extends StatementBase { } } - private void checkMinimalForSystemTable() throws AnalysisException { - if (table_ instanceof SystemTable && outputStyle_ != TDescribeOutputStyle.MINIMAL) { - throw new AnalysisException( - "DESCRIBE FORMATTED|EXTENDED cannot refer to a system table."); - } - } - public TDescribeTableParams toThrift() { TDescribeTableParams params = new TDescribeTableParams(); params.setOutput_style(outputStyle_); diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java index 87e4c95e3..32ebceffa 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; -import org.apache.impala.catalog.SystemTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TTableName; @@ -77,8 +76,6 @@ public class ShowCreateTableStmt extends StatementBase { // statement references a column by its implicitly defined column names. viewAnalyzer.setUseHiveColLabels(true); viewQuery.analyze(viewAnalyzer); - } else if (table instanceof SystemTable) { - throw new AnalysisException("Not supported on system tables."); } } diff --git a/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java b/fe/src/main/java/org/apache/impala/analysis/SystemTableRef.java similarity index 55% copy from fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java copy to fe/src/main/java/org/apache/impala/analysis/SystemTableRef.java index 966ce5975..b9454e945 100644 --- a/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java +++ b/fe/src/main/java/org/apache/impala/analysis/SystemTableRef.java @@ -15,24 +15,23 @@ // specific language governing permissions and limitations // under the License. -package org.apache.impala.catalog; +package org.apache.impala.analysis; -import org.apache.impala.catalog.Db; -import org.apache.impala.catalog.SystemTable; -import org.apache.impala.common.FrontendTestBase; -import org.apache.impala.thrift.TSystemTableName; -import org.junit.Test; +import org.apache.impala.catalog.FeSystemTable; -import static org.junit.Assert.assertEquals; +import com.google.common.base.Preconditions; /** - * Tests for the SystemTable class + * TableRef class for system tables. + * + * Represents a table that is registered as a normal table in HMS, but content is + * constructed in-memory. Currently COMPUTE STATS does not work on these tables, and + * write operations are not allowed. */ -public class SystemTableTest extends FrontendTestBase { - @Test - public void testSystemTableNames() { - Db sysDb = feFixture_.addTestDb(Db.SYS, "system db"); - SystemTable queryLiveTable = SystemTable.CreateQueryLiveTable(sysDb, "impala"); - assertEquals(TSystemTableName.QUERY_LIVE, queryLiveTable.getSystemTableName()); +public class SystemTableRef extends BaseTableRef { + + public SystemTableRef(TableRef tableRef, Path resolvedPath) { + super(tableRef, resolvedPath); + Preconditions.checkState(resolvedPath.getRootTable() instanceof FeSystemTable); } } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 813ce2bf2..fc8fa029e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -126,6 +126,7 @@ import org.apache.impala.thrift.TPartitionStats; import org.apache.impala.thrift.TPrincipalType; import org.apache.impala.thrift.TPrivilege; import org.apache.impala.thrift.TResetMetadataRequest; +import org.apache.impala.thrift.TSystemTableName; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableName; import org.apache.impala.thrift.TTableType; @@ -392,7 +393,8 @@ public class CatalogServiceCatalog extends Catalog { Preconditions.checkState(topicUpdateTblLockMaxWaitTimeMs_ >= 0, "topic_update_tbl_max_wait_time_ms must be positive"); impalaSysTables = Arrays.asList( - BackendConfig.INSTANCE.queryLogTableName(), SystemTable.QUERY_LIVE); + BackendConfig.INSTANCE.queryLogTableName(), + TSystemTableName.IMPALA_QUERY_LIVE.toString().toLowerCase()); tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads); loadInBackground_ = loadInBackground; try { diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java index 6d527d465..bfaa37895 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Db.java +++ b/fe/src/main/java/org/apache/impala/catalog/Db.java @@ -133,13 +133,6 @@ public class Db extends CatalogObjectImpl implements FeDb { setMetastoreDb(name, msDb); tableCache_ = new CatalogObjectCache<>(); functions_ = new HashMap<>(); - - // This constructor is called from a static initializer in tests. - if (BackendConfig.INSTANCE != null && BackendConfig.INSTANCE.enableWorkloadMgmt() && - name.equalsIgnoreCase(SYS)) { - // Add built-in tables. - addTable(SystemTable.CreateQueryLiveTable(this, getOwnerUser())); - } } public long getCreateEventId() { return createEventId_; } diff --git a/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java b/fe/src/main/java/org/apache/impala/catalog/FeSystemTable.java similarity index 61% copy from fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java copy to fe/src/main/java/org/apache/impala/catalog/FeSystemTable.java index 966ce5975..851a87e1d 100644 --- a/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeSystemTable.java @@ -17,22 +17,18 @@ package org.apache.impala.catalog; -import org.apache.impala.catalog.Db; -import org.apache.impala.catalog.SystemTable; -import org.apache.impala.common.FrontendTestBase; +import org.apache.impala.thrift.TResultSet; import org.apache.impala.thrift.TSystemTableName; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; /** - * Tests for the SystemTable class + * Represents a system table backed by internal memory. */ -public class SystemTableTest extends FrontendTestBase { - @Test - public void testSystemTableNames() { - Db sysDb = feFixture_.addTestDb(Db.SYS, "system db"); - SystemTable queryLiveTable = SystemTable.CreateQueryLiveTable(sysDb, "impala"); - assertEquals(TSystemTableName.QUERY_LIVE, queryLiveTable.getSystemTableName()); - } +public interface FeSystemTable extends FeTable { + // Gets the system table identifier. + TSystemTableName getSystemTableName(); + + // TODO(todd): it seems like all FeTables implement this, perhaps + // this should just be a method on FeTable and simplify the code + // in Frontend.getTableStats? + TResultSet getTableStats(); } diff --git a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java index 62b8d16ac..b8622e688 100644 --- a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java @@ -17,24 +17,17 @@ package org.apache.impala.catalog; -import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READ; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.Set; -import org.apache.hadoop.hive.metastore.TableType; +import org.apache.commons.lang3.BooleanUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.impala.common.InternalException; -import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.CatalogObjectsConstants; +import org.apache.impala.thrift.TAccessLevel; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TColumn; -import org.apache.impala.thrift.TQueryTableColumn; import org.apache.impala.thrift.TResultSet; import org.apache.impala.thrift.TResultSetMetadata; import org.apache.impala.thrift.TSystemTable; @@ -44,95 +37,37 @@ import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTableType; import org.apache.impala.util.EventSequence; import org.apache.impala.util.TResultRowBuilder; -import com.google.common.collect.ImmutableMap; import com.google.common.base.Preconditions; /** * Represents a system table reflecting backend internal state. */ -public final class SystemTable extends Table { - public static final String QUERY_LIVE = "impala_query_live"; - private static final Map<String, TSystemTableName> SYSTEM_TABLE_NAME_MAP = - ImmutableMap.of(QUERY_LIVE, TSystemTableName.QUERY_LIVE); - - // Constants declaring how durations measured in milliseconds will be stored in the db. - // Must match constants with the same name declared in workload-management-fields.cc. - private static final int DURATION_DECIMAL_PRECISION = 18; - private static final int DURATION_DECIMAL_SCALE = 3; - - private SystemTable(org.apache.hadoop.hive.metastore.api.Table msTable, Db db, +public final class SystemTable extends Table implements FeSystemTable { + protected SystemTable(org.apache.hadoop.hive.metastore.api.Table msTable, Db db, String name, String owner) { super(msTable, db, name, owner); + // System Tables are read-only. + accessLevel_ = TAccessLevel.READ_ONLY; } - // Get Type for a TQueryTableColumn - private static Type getColumnType(TQueryTableColumn column) { - switch (column) { - case START_TIME_UTC: - return Type.TIMESTAMP; - case PER_HOST_MEM_ESTIMATE: - case DEDICATED_COORD_MEM_ESTIMATE: - case CLUSTER_MEMORY_ADMITTED: - case NUM_ROWS_FETCHED: - case ROW_MATERIALIZATION_ROWS_PER_SEC: - case COMPRESSED_BYTES_SPILLED: - case BYTES_READ_CACHE_TOTAL: - case BYTES_READ_TOTAL: - case PERNODE_PEAK_MEM_MIN: - case PERNODE_PEAK_MEM_MAX: - case PERNODE_PEAK_MEM_MEAN: - return Type.BIGINT; - case BACKENDS_COUNT: - return Type.INT; - case TOTAL_TIME_MS: - case ROW_MATERIALIZATION_TIME_MS: - case EVENT_PLANNING_FINISHED: - case EVENT_SUBMIT_FOR_ADMISSION: - case EVENT_COMPLETED_ADMISSION: - case EVENT_ALL_BACKENDS_STARTED: - case EVENT_ROWS_AVAILABLE: - case EVENT_FIRST_ROW_FETCHED: - case EVENT_LAST_ROW_FETCHED: - case EVENT_UNREGISTER_QUERY: - case READ_IO_WAIT_TOTAL_MS: - case READ_IO_WAIT_MEAN_MS: - return ScalarType.createDecimalType( - DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE); - default: - return Type.STRING; - } - } - - public static SystemTable CreateQueryLiveTable(Db db, String owner) { - List<FieldSchema> fsList = new ArrayList<FieldSchema>(); - for (TQueryTableColumn column : TQueryTableColumn.values()) { - // The type string must be lowercase for Hive to read the column metadata properly. - String typeSql = getColumnType(column).toSql().toLowerCase(); - FieldSchema fs = new FieldSchema(column.name().toLowerCase(), typeSql, ""); - fsList.add(fs); - } - org.apache.hadoop.hive.metastore.api.Table msTable = - createMetastoreTable(db.getName(), QUERY_LIVE, owner, fsList); - - SystemTable table = new SystemTable(msTable, db, QUERY_LIVE, owner); - for (TQueryTableColumn column : TQueryTableColumn.values()) { - table.addColumn(new Column( - column.name().toLowerCase(), getColumnType(column), column.ordinal())); - } - return table; + @Override // FeSystemTable + public TSystemTableName getSystemTableName() { + return TSystemTableName.valueOf(getName().toUpperCase()); } - public TSystemTableName getSystemTableName() { - return SYSTEM_TABLE_NAME_MAP.get(getName()); + public static boolean isSystemTable(org.apache.hadoop.hive.metastore.api.Table msTbl) { + String value = msTbl.getParameters().get( + CatalogObjectsConstants.TBL_PROP_SYSTEM_TABLE); + return value != null && BooleanUtils.toBoolean(value); } @Override public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) { // Create thrift descriptors to send to the BE. - TTableDescriptor tableDescriptor = - new TTableDescriptor(tableId, TTableType.SYSTEM_TABLE, getTColumnDescriptors(), - numClusteringCols_, name_, db_.getName()); + TTableDescriptor tableDescriptor = new TTableDescriptor(tableId, + TTableType.SYSTEM_TABLE, getTColumnDescriptors(), + getNumClusteringCols(), getName(), getDb().getName()); tableDescriptor.setSystemTable(getTSystemTable()); return tableDescriptor; } @@ -164,8 +99,13 @@ public final class SystemTable extends Table { public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl, String reason, EventSequence catalogTimeline) throws TableLoadingException { - // Table is always loaded. - Preconditions.checkState(false); + int pos = colsByPos_.size(); + // Should be no partition columns. + Preconditions.checkState(pos == 0); + for (FieldSchema s: msTbl.getSd().getCols()) { + Type type = FeCatalogUtils.parseColumnType(s, getName()); + addColumn(new Column(s.getName(), type, s.getComment(), pos++)); + } } /** @@ -184,6 +124,7 @@ public final class SystemTable extends Table { * TABLE STATS statement. The schema of the returned TResultSet is set inside * this method. */ + @Override // FeSystemTable public TResultSet getTableStats() { TResultSet result = new TResultSet(); TResultSetMetadata resultSchema = new TResultSetMetadata(); @@ -194,32 +135,4 @@ public final class SystemTable extends Table { result.addToRows(rowBuilder.get()); return result; } - - private static org.apache.hadoop.hive.metastore.api.Table - createMetastoreTable(String dbName, String tableName, String owner, - List<FieldSchema> columns) { - // Based on CatalogOpExecutor#createMetaStoreTable - org.apache.hadoop.hive.metastore.api.Table tbl = - new org.apache.hadoop.hive.metastore.api.Table(); - tbl.setDbName(dbName); - tbl.setTableName(tableName); - tbl.setOwner(owner); - tbl.setParameters(new HashMap<String, String>()); - tbl.setTableType(TableType.MANAGED_TABLE.toString()); - tbl.setPartitionKeys(new ArrayList<FieldSchema>()); - if (MetastoreShim.getMajorVersion() > 2) { - MetastoreShim.setTableAccessType(tbl, ACCESSTYPE_READ); - } - - StorageDescriptor sd = new StorageDescriptor(); - sd.setSerdeInfo(new org.apache.hadoop.hive.metastore.api.SerDeInfo()); - sd.getSerdeInfo().setParameters(new HashMap<>()); - sd.setCompressed(false); - sd.setBucketCols(new ArrayList<>(0)); - sd.setSortCols(new ArrayList<>(0)); - sd.setCols(columns); - tbl.setSd(sd); - - return tbl; - } } diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 9df477ec2..ac7befc0a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -549,6 +549,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { // have a special table property to indicate that Impala should use an external // data source. table = new DataSourceTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); + } else if (SystemTable.isSystemTable(msTbl)) { + table = new SystemTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) { table = new HdfsTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalSystemTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalSystemTable.java new file mode 100644 index 000000000..2a590afb5 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalSystemTable.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog.local; + +import java.util.Set; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.impala.catalog.FeCatalogUtils; +import org.apache.impala.catalog.FeSystemTable; +import org.apache.impala.catalog.local.MetaProvider.TableMetaRef; +import org.apache.impala.catalog.Type; +import org.apache.impala.common.InternalException; +import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TResultSet; +import org.apache.impala.thrift.TResultSetMetadata; +import org.apache.impala.thrift.TSystemTable; +import org.apache.impala.thrift.TSystemTableName; +import org.apache.impala.thrift.TTableDescriptor; +import org.apache.impala.thrift.TTableType; +import org.apache.impala.util.TResultRowBuilder; + +import com.google.common.base.Preconditions; + +/** + * System table instance loaded from {@link LocalCatalog}. + * + * System tables are identified by the TBL_PROP_SYSTEM_TABLE table parameter. + */ +public class LocalSystemTable extends LocalTable implements FeSystemTable { + public static LocalSystemTable load(LocalDb db, Table msTbl, TableMetaRef ref) { + Preconditions.checkNotNull(db); + Preconditions.checkNotNull(msTbl); + return new LocalSystemTable(db, msTbl, ref); + } + + private LocalSystemTable(LocalDb db, Table msTbl, TableMetaRef ref) { + super(db, msTbl, ref); + } + + @Override // FeSystemTable + public TSystemTableName getSystemTableName() { + return TSystemTableName.valueOf(getName().toUpperCase()); + } + + @Override + public long getNumRows() { + try { + // Return an estimate of the number of live queries assuming balanced load across + // coordinators. + return FeSupport.NumLiveQueries() * FeSupport.GetCoordinators().getAddressesSize(); + } catch (InternalException e) { + return super.getNumRows(); + } + } + + /** + * Returns statistics on this table as a tabular result set. Used for the + * SHOW TABLE STATS statement. The schema of the returned TResultSet is set + * inside this method. + */ + @Override // FeSystemTable + public TResultSet getTableStats() { + TResultSet result = new TResultSet(); + TResultSetMetadata resultSchema = new TResultSetMetadata(); + resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift())); + result.setSchema(resultSchema); + TResultRowBuilder rowBuilder = new TResultRowBuilder(); + rowBuilder.add(getNumRows()); + result.addToRows(rowBuilder.get()); + return result; + } + + @Override + public TTableDescriptor toThriftDescriptor( + int tableId, Set<Long> referencedPartitions) { + TTableDescriptor tableDescriptor = new TTableDescriptor(tableId, + TTableType.SYSTEM_TABLE, FeCatalogUtils.getTColumnDescriptors(this), + getNumClusteringCols(), getName(), getDb().getName()); + tableDescriptor.setSystemTable(getTSystemTable()); + return tableDescriptor; + } + + /** + * Returns a thrift structure for the system table. + */ + private TSystemTable getTSystemTable() { + return new TSystemTable(getSystemTableName()); + } +} diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java index 15e1bda6a..a469a4af5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java @@ -46,6 +46,7 @@ import org.apache.impala.catalog.SideloadTableStats; import org.apache.impala.catalog.SqlConstraints; import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; +import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.VirtualColumn; import org.apache.impala.catalog.local.MetaProvider.TableMetaRef; @@ -131,6 +132,8 @@ abstract class LocalTable implements FeTable { t = LocalIcebergTable.loadIcebergTableViaMetaProvider(db, msTbl, ref); } else if (DataSourceTable.isDataSourceTable(msTbl)) { t = LocalDataSourceTable.load(db, msTbl, ref); + } else if (SystemTable.isSystemTable(msTbl)) { + t = LocalSystemTable.load(db, msTbl, ref); } else if (HdfsFileFormat.isHdfsInputFormatClass( msTbl.getSd().getInputFormat())) { t = LocalFsTable.load(db, msTbl, ref); diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 566b10272..a379e9cf4 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -67,10 +67,10 @@ import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeHBaseTable; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeKuduTable; +import org.apache.impala.catalog.FeSystemTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.ScalarType; -import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; @@ -1902,7 +1902,7 @@ public class SingleNodePlanner { return scanNode; } else if (table instanceof IcebergMetadataTable) { return createIcebergMetadataScanNode(tblRef, conjuncts, analyzer); - } else if (table instanceof SystemTable) { + } else if (table instanceof FeSystemTable) { scanNode = new SystemTableScanNode(ctx_.getNextNodeId(), tblRef.getDesc()); scanNode.addConjuncts(conjuncts); scanNode.init(analyzer); diff --git a/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java b/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java index ebba72849..269d0754b 100644 --- a/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.TupleDescriptor; -import org.apache.impala.catalog.SystemTable; +import org.apache.impala.catalog.FeSystemTable; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.service.FeSupport; @@ -43,10 +43,10 @@ import com.google.common.collect.Lists; public class SystemTableScanNode extends ScanNode { public SystemTableScanNode(PlanNodeId id, TupleDescriptor desc) { super(id, desc, "SCAN SYSTEM_TABLE"); - table_ = (SystemTable) desc_.getTable(); + table_ = (FeSystemTable) desc_.getTable(); } - private final SystemTable table_; + private final FeSystemTable table_; @Override public void init(Analyzer analyzer) throws ImpalaException { diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index cd7363ebb..4da39d05a 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -120,6 +120,7 @@ import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeHBaseTable; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeKuduTable; +import org.apache.impala.catalog.FeSystemTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.IcebergPositionDeleteTable; @@ -129,7 +130,6 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.MetaStoreClientPool; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.StructType; -import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.local.InconsistentMetadataFetchException; @@ -1659,8 +1659,8 @@ public class Frontend { } } else if (table instanceof MaterializedViewHdfsTable) { return ((MaterializedViewHdfsTable) table).getTableStats(); - } else if (table instanceof SystemTable) { - return ((SystemTable) table).getTableStats(); + } else if (table instanceof FeSystemTable) { + return ((FeSystemTable) table).getTableStats(); } else { throw new InternalException("Invalid table class: " + table.getClass()); } diff --git a/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java b/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java index 966ce5975..4aa05ae99 100644 --- a/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java @@ -17,8 +17,6 @@ package org.apache.impala.catalog; -import org.apache.impala.catalog.Db; -import org.apache.impala.catalog.SystemTable; import org.apache.impala.common.FrontendTestBase; import org.apache.impala.thrift.TSystemTableName; import org.junit.Test; @@ -32,7 +30,8 @@ public class SystemTableTest extends FrontendTestBase { @Test public void testSystemTableNames() { Db sysDb = feFixture_.addTestDb(Db.SYS, "system db"); - SystemTable queryLiveTable = SystemTable.CreateQueryLiveTable(sysDb, "impala"); - assertEquals(TSystemTableName.QUERY_LIVE, queryLiveTable.getSystemTableName()); + SystemTable queryLiveTable = new SystemTable( + null, sysDb, "impala_query_live", "impala"); + assertEquals(TSystemTableName.IMPALA_QUERY_LIVE, queryLiveTable.getSystemTableName()); } } diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index e3a09c236..47b543293 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1552,16 +1552,4 @@ public class PlannerTest extends PlannerTestBase { Lists.newArrayList(2, 3)), IcebergScanPlanner.getOrderedEqualityFieldIds(inp)); } - - /** - * Test queries against sys.impala_query_live. - */ - @Test - public void testQueryLive() { - boolean savedEnableWorkloadMgmt = BackendConfig.INSTANCE.enableWorkloadMgmt(); - BackendConfig.INSTANCE.setEnableWorkloadMgmt(true); - addTestDb(Db.SYS, "ensure system db"); - runPlannerTestFile("impala-query-live"); - BackendConfig.INSTANCE.setEnableWorkloadMgmt(savedEnableWorkloadMgmt); - } } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/impala-query-live.test b/testdata/workloads/functional-planner/queries/PlannerTest/impala-query-live.test deleted file mode 100644 index 343ee806d..000000000 --- a/testdata/workloads/functional-planner/queries/PlannerTest/impala-query-live.test +++ /dev/null @@ -1,34 +0,0 @@ -# Can query impala_query_live. Skips DISTRIBUTEDPLAN because that requires -# coordinators, which are not configured in frontend tests. -select count(*) from sys.impala_query_live ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: count(*) -| row-size=8B cardinality=1 -| -00:SCAN SYSTEM_TABLE [sys.impala_query_live] - row-size=0B cardinality=1 -==== -# Error trying to create new sys.impala_query_live -create table sys.impala_query_live (i int) ----- PLAN -AnalysisException: Table already exists: sys.impala_query_live -==== -drop table sys.impala_query_live ----- PLAN -AnalysisException: Write not supported. Table sys.impala_query_live access type is: READONLY -==== -insert into sys.impala_query_live values (1) ----- PLAN -AnalysisException: Write not supported. Table sys.impala_query_live access type is: READONLY -==== -update sys.impala_query_live set query_id = "nonsense" ----- PLAN -AnalysisException: Impala only supports modifying Kudu and Iceberg tables, but the following table is neither: sys.impala_query_live -==== -delete sys.impala_query_live where query_id = "nonsense" ----- PLAN -AnalysisException: Impala only supports modifying Kudu and Iceberg tables, but the following table is neither: sys.impala_query_live -==== diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py index 93bdfaadb..9629afb5d 100644 --- a/tests/custom_cluster/test_query_live.py +++ b/tests/custom_cluster/test_query_live.py @@ -110,6 +110,62 @@ class TestQueryLive(CustomClusterTestSuite): assert result5.data[0] == \ "functional.alltypes,functional.alltypestiny,functional.alltypessmall" + # describe query + describe_result = self.execute_query('describe sys.impala_query_live') + assert len(describe_result.data) == 49 + + describe_ext_result = self.execute_query('describe extended sys.impala_query_live') + assert len(describe_ext_result.data) == 82 + + # show create table + show_create_tbl = self.execute_query('show create table sys.impala_query_live') + assert len(show_create_tbl.data) == 1 + assert 'CREATE EXTERNAL TABLE sys.impala_query_live' in show_create_tbl.data[0] + assert "'__IMPALA_SYSTEM_TABLE'='true'" in show_create_tbl.data[0] + + # cannot compute stats or perform write operations + compute_stats_result = self.execute_query_expect_failure(self.client, + 'compute stats sys.impala_query_live') + assert 'AnalysisException: COMPUTE STATS not supported for system table: '\ + 'sys.impala_query_live' in str(compute_stats_result) + + create_result = self.execute_query_expect_failure(self.client, + 'create table sys.impala_query_live (i int)') + assert 'AnalysisException: Table already exists: sys.impala_query_live'\ + in str(create_result) + + insert_result = self.execute_query_expect_failure(self.client, + 'insert into sys.impala_query_live select * from sys.impala_query_live limit 1') + assert 'UnsupportedOperationException: Cannot create data sink into table of type: '\ + 'org.apache.impala.catalog.SystemTable' in str(insert_result) + + update_result = self.execute_query_expect_failure(self.client, + 'update sys.impala_query_live set query_id = ""') + assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\ + 'but the following table is neither: sys.impala_query_live'\ + in str(update_result) + + delete_result = self.execute_query_expect_failure(self.client, + 'delete from sys.impala_query_live') + assert 'AnalysisException: Impala only supports modifying Kudu and Iceberg tables, '\ + 'but the following table is neither: sys.impala_query_live'\ + in str(delete_result) + + # Drop table at the end, it's only recreated on impalad startup. + self.execute_query_expect_success(self.client, 'drop table sys.impala_query_live') + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live " + "--use_local_catalog=true", + catalogd_args="--enable_workload_mgmt " + "--catalog_topic_mode=minimal") + def test_local_catalog(self): + """Asserts the query live table works with local catalog mode.""" + result = self.client.execute("select * from functional.alltypes", + fetch_profile_after_close=True) + assert_query('sys.impala_query_live', self.client, 'test_query_live', + result.runtime_profile) + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " "--cluster_id=test_query_live", catalogd_args="--enable_workload_mgmt",
