This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit c529b855e95e5b4fe4fed504afa4e34b3e2481dc Author: Michael Smith <[email protected]> AuthorDate: Wed Jan 10 16:57:17 2024 -0800 IMPALA-12626: Add Tables Queried to profile/history Adds "Tables Queried" to the query profile, enumerating a comma-separated list of tables accessed during a query: Tables Queried: tpch.customer,tpch.lineitem Also adds "tables_queried" to impala_query_log and impala_query_live with the same content. Requires 'drop table sys.impala_query_log' to recreate it with the new column. Change-Id: I9c9c80b2adf7f3e44225a191fe8eb9df3c4bc5aa Reviewed-on: http://gerrit.cloudera.org:8080/20886 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/system-table-scanner.cc | 5 +++ be/src/service/client-request-state.cc | 36 +++++++--------------- be/src/service/client-request-state.h | 1 + be/src/service/query-state-record.cc | 1 + be/src/service/query-state-record.h | 3 ++ be/src/service/workload-management-fields.cc | 6 ++++ be/src/util/debug-util.cc | 9 ++++++ be/src/util/debug-util.h | 3 ++ be/src/util/error-util.cc | 8 ++--- common/thrift/Frontend.thrift | 2 ++ common/thrift/SystemTables.thrift | 1 + .../java/org/apache/impala/service/Frontend.java | 3 ++ tests/custom_cluster/test_query_live.py | 12 ++++++++ tests/query_test/test_observability.py | 14 +++++++++ tests/util/workload_management.py | 15 ++++++++- 15 files changed, 87 insertions(+), 32 deletions(-) diff --git a/be/src/exec/system-table-scanner.cc b/be/src/exec/system-table-scanner.cc index 3d0dbad6b..fa91fb165 100644 --- a/be/src/exec/system-table-scanner.cc +++ b/be/src/exec/system-table-scanner.cc @@ -335,6 +335,11 @@ Status QueryScanner::MaterializeNextTuple( RETURN_IF_ERROR(WriteStringSlot( trim_left_copy_if(record.plan, is_any_of("\n")), pool, slot)); break; + case TQueryTableColumn::TABLES_QUERIED: + if (!query.tables.empty()) { + RETURN_IF_ERROR(WriteStringSlot(PrintTableList(query.tables), pool, slot)); + } 
+ break; default: DCHECK(false) << "Unknown column position " << slot_desc->col_pos(); } diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 538fde8c1..3704716a3 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -229,12 +229,16 @@ void ClientRequestState::SetBlacklistedExecutorAddresses( Status ClientRequestState::Exec() { MarkActive(); + const TExecRequest& exec_req = exec_request(); profile_->AddChild(server_profile_); summary_profile_->AddInfoString("Query Type", PrintValue(stmt_type())); summary_profile_->AddInfoString("Query Options (set by configuration)", DebugQueryOptions(query_ctx_.client_request.query_options)); summary_profile_->AddInfoString("Query Options (set by configuration and planner)", - DebugQueryOptions(exec_request().query_options)); + DebugQueryOptions(exec_req.query_options)); + if (!exec_req.tables.empty()) { + summary_profile_->AddInfoString("Tables Queried", PrintTableList(exec_req.tables)); + } if (query_ctx_.__isset.overridden_mt_dop_value) { DCHECK(query_ctx_.client_request.query_options.__isset.mt_dop); summary_profile_->AddInfoString("MT_DOP limited by admission control", @@ -243,7 +247,6 @@ Status ClientRequestState::Exec() { query_ctx_.client_request.query_options.mt_dop)); } - const TExecRequest& exec_req = exec_request(); switch (exec_req.stmt_type) { case TStmtType::QUERY: case TStmtType::DML: @@ -533,38 +536,21 @@ Status ClientRequestState::ExecQueryOrDmlRequest( if (!query_exec_request.query_ctx.__isset.parent_query_id && query_exec_request.query_ctx.__isset.tables_missing_stats && !query_exec_request.query_ctx.tables_missing_stats.empty()) { - stringstream ss; - const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats; - for (int i = 0; i < tbls.size(); ++i) { - if (i != 0) ss << ","; - ss << tbls[i].db_name << "." 
<< tbls[i].table_name; - } - summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, ss.str()); + summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, + PrintTableList(query_exec_request.query_ctx.tables_missing_stats)); } if (!query_exec_request.query_ctx.__isset.parent_query_id && query_exec_request.query_ctx.__isset.tables_with_corrupt_stats && !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) { - stringstream ss; - const vector<TTableName>& tbls = - query_exec_request.query_ctx.tables_with_corrupt_stats; - for (int i = 0; i < tbls.size(); ++i) { - if (i != 0) ss << ","; - ss << tbls[i].db_name << "." << tbls[i].table_name; - } - summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); + summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, + PrintTableList(query_exec_request.query_ctx.tables_with_corrupt_stats)); } if (query_exec_request.query_ctx.__isset.tables_missing_diskids && !query_exec_request.query_ctx.tables_missing_diskids.empty()) { - stringstream ss; - const vector<TTableName>& tbls = - query_exec_request.query_ctx.tables_missing_diskids; - for (int i = 0; i < tbls.size(); ++i) { - if (i != 0) ss << ","; - ss << tbls[i].db_name << "." 
<< tbls[i].table_name; - } - summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str()); + summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, + PrintTableList(query_exec_request.query_ctx.tables_missing_diskids)); } { diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index d0d1b3d78..c96fcc73e 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -334,6 +334,7 @@ class ClientRequestState { } /// Returns 0:0 if this is a root query TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; } + const vector<TTableName>& tables() const { return exec_request().tables; } const std::vector<std::string>& GetAnalysisWarnings() const { return exec_request().analysis_warnings; diff --git a/be/src/service/query-state-record.cc b/be/src/service/query-state-record.cc index 0b594df83..8c7458a03 100644 --- a/be/src/service/query-state-record.cc +++ b/be/src/service/query-state-record.cc @@ -203,6 +203,7 @@ QueryStateExpanded::QueryStateExpanded(const ClientRequestState& exec_state, .query_exec_request.dedicated_coord_mem_estimate; row_materialization_rate = exec_state.row_materialization_rate(); row_materialization_time = exec_state.row_materialization_timer(); + tables = exec_state.tables(); // Update name_rows_fetched with the final count after query close. base_state->num_rows_fetched = exec_state.num_rows_fetched_counter(); diff --git a/be/src/service/query-state-record.h b/be/src/service/query-state-record.h index c3fec5518..6ada7a0e9 100644 --- a/be/src/service/query-state-record.h +++ b/be/src/service/query-state-record.h @@ -340,6 +340,9 @@ struct QueryStateExpanded { /// Events Timeline Iterator EventsTimelineIterator EventsTimeline() const; + // Source tables accessed by this query. + std::vector<TTableName> tables; + /// Required data will be copied from the provided ClientRequestState into members of /// the struct. 
QueryStateExpanded(const ClientRequestState& exec_state, diff --git a/be/src/service/workload-management-fields.cc b/be/src/service/workload-management-fields.cc index 19ef98fb7..a95bb671c 100644 --- a/be/src/service/workload-management-fields.cc +++ b/be/src/service/workload-management-fields.cc @@ -411,6 +411,12 @@ const std::list<FieldDefinition> FIELD_DEFINITIONS = { << "'"; }), + // Tables Queried + FieldDefinition("tables_queried", TPrimitiveType::STRING, + [](FieldParserContext& ctx){ + ctx.sql << "'" << PrintTableList(ctx.record->tables) << "'"; + }), + }; // FIELDS_PARSERS constant list } //namespace workload_management diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc index 927319de8..005bf1c7e 100644 --- a/be/src/util/debug-util.cc +++ b/be/src/util/debug-util.cc @@ -277,6 +277,15 @@ string PrintNumericPath(const SchemaPath& path) { return ss.str(); } +string PrintTableList(const vector<TTableName>& tbls) { + stringstream ss; + for (int i = 0; i < tbls.size(); ++i) { + if (i != 0) ss << ","; + ss << tbls[i].db_name << "." << tbls[i].table_name; + } + return ss.str(); +} + string GetBuildVersion(bool compact) { stringstream ss; ss << GetDaemonBuildVersion() diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h index 4179eefcd..4d7255e45 100644 --- a/be/src/util/debug-util.h +++ b/be/src/util/debug-util.h @@ -91,6 +91,9 @@ template<typename ThriftStruct> std::string PrintThrift(const ThriftStruct& t) { return apache::thrift::ThriftDebugString(t); } +/// Return a list of TTableName as a comma-separated string. +std::string PrintTableList(const std::vector<TTableName>& tbls); + /// Parse 's' into a TUniqueId object. The format of s needs to be the output format /// from PrintId. (<hi_part>:<low_part>) /// Returns true if parse succeeded. 
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc index 430f2c833..0d9443b20 100644 --- a/be/src/util/error-util.cc +++ b/be/src/util/error-util.cc @@ -16,6 +16,7 @@ // under the License. #include "util/error-util-internal.h" +#include "util/debug-util.h" #include "util/string-util.h" #include <errno.h> @@ -47,12 +48,7 @@ string GetTablesMissingStatsWarning(const vector<TTableName>& tables_missing_sta stringstream ss; if (tables_missing_stats.empty()) return string(""); ss << "WARNING: The following tables are missing relevant table and/or column " - << "statistics.\n"; - for (int i = 0; i < tables_missing_stats.size(); ++i) { - const TTableName& table_name = tables_missing_stats[i]; - if (i != 0) ss << ","; - ss << table_name.db_name << "." << table_name.table_name; - } + << "statistics.\n" << PrintTableList(tables_missing_stats); return ss.str(); } diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift index d564fd631..93a0d92bf 100644 --- a/common/thrift/Frontend.thrift +++ b/common/thrift/Frontend.thrift @@ -678,6 +678,8 @@ struct TExecRequest { // Request for "ALTER TABLE ... CONVERT TO" statements. 19: optional TConvertTableRequest convert_table_request + + 20: optional list<CatalogObjects.TTableName> tables } // Parameters to FeSupport.cacheJar(). 
diff --git a/common/thrift/SystemTables.thrift b/common/thrift/SystemTables.thrift index 224fcca41..3ca81db47 100644 --- a/common/thrift/SystemTables.thrift +++ b/common/thrift/SystemTables.thrift @@ -68,4 +68,5 @@ enum TQueryTableColumn { PERNODE_PEAK_MEM_MEAN SQL PLAN + TABLES_QUERIED } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index f68ecb2eb..2c7649430 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2412,6 +2412,9 @@ public class Frontend { } Preconditions.checkNotNull(analysisResult.getStmt()); TExecRequest result = createBaseExecRequest(queryCtx, analysisResult); + for (TableName table : stmtTableCache.tables.keySet()) { + result.addToTables(table.toThrift()); + } // Transfer the expected number of executors in executor group set to // analyzer's global state. The info is needed to compute the number of nodes to be diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py index ee8c17e0c..ca44122e0 100644 --- a/tests/custom_cluster/test_query_live.py +++ b/tests/custom_cluster/test_query_live.py @@ -98,6 +98,18 @@ class TestQueryLive(CustomClusterTestSuite): 'select * from sys.impala_query_live where cluster_id = "test_query_live_0"') assert len(result5.data) == 0 + result = self.execute_query(""" + select count(*) from functional.alltypestiny a + inner join functional.alltypes b on a.id = b.id + inner join functional.alltypessmall c on b.id = c.id + """) + result5 = self.execute_query( + 'select tables_queried from sys.impala_query_live where query_id = "' + + result.query_id + '"') + assert len(result5.data) == 1 + assert result5.data[0] == \ + "functional.alltypes,functional.alltypestiny,functional.alltypessmall" + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " "--cluster_id=test_query_live", 
catalogd_args="--enable_workload_mgmt", diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index b11453585..60761e57c 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -188,6 +188,20 @@ class TestObservability(ImpalaTestSuite): expected_str = expected_str.format(timezone=server_timezone) assert expected_str in profile, profile + def test_profile(self): + """Test that expected fields are populated in the profile.""" + query = """select count(distinct a.int_col) from functional.alltypes a + inner join functional.alltypessmall b on (a.id = b.id + cast(sleep(15) as INT))""" + result = self.execute_query(query) + + assert "Query Type: QUERY" in result.runtime_profile + assert "Query State: " in result.runtime_profile + assert "Default Db: default" in result.runtime_profile + tables = re.search(r'\n\s+Tables Queried:\s+(.*?)\n', result.runtime_profile) + assert tables is not None + assert sorted(tables.group(1).split(",")) \ + == ["functional.alltypes", "functional.alltypessmall"] + def test_exec_summary(self): """Test that the exec summary is populated correctly in every query state""" query = "select count(*) from functional.alltypes" diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 900e523b3..2ee3eba3f 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -25,7 +25,7 @@ from tests.util.assert_time import assert_time_str, convert_to_nanos from tests.util.memory import assert_byte_str, convert_to_bytes DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600 -EXPECTED_QUERY_COLS = 48 +EXPECTED_QUERY_COLS = 49 CLUSTER_ID = "CLUSTER_ID" @@ -76,6 +76,7 @@ PERNODE_PEAK_MEM_MAX = "PERNODE_PEAK_MEM_MAX" PERNODE_PEAK_MEM_MEAN = "PERNODE_PEAK_MEM_MEAN" SQL = "SQL" PLAN = "PLAN" +TABLES_QUERIED = "TABLES_QUERIED" def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impalad=None, @@ -699,6 +700,18 @@ 
def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impal assert plan is not None assert data[index] == plan.group(1) + # Tables Queried + index += 1 + assert sql_results.column_labels[index] == TABLES_QUERIED + ret_data[TABLES_QUERIED] = data[index] + tables = re.search(r'\n\s+Tables Queried:\s+(.*?)\n', profile_text) + if query_state_value == "EXCEPTION" or query_type == "DDL": + assert tables is None + assert data[index] == "" + else: + assert tables is not None + assert data[index] == tables.group(1) + return ret_data # function assert_query
