This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 9b8e43e9a1ae871e676cded8e9eddc90aa96f314 Author: jasonmfehr <[email protected]> AuthorDate: Tue Feb 27 14:35:16 2024 -0800 IMPALA-12426: QueryStateRecord Refactor The QueryStateRecord struct is used to store important information about a completed query for the Impala web UI page of recently completed queries. Since significant portions of this struct have data that is also needed in workload management, it has been refactored. The QueryStateRecord struct was a private child struct under the ImpalaServer class. It has now been moved to a top-level struct within the impala namespace. A new struct named QueryStateExpanded has also been created. This struct contains a shared pointer to a QueryStateRecord so the same QueryStateRecord instance can be used by both the web UI and workload management. The QueryStateExpanded struct also contains additional data that is used exclusively by workload management. New ctests have been added to exercise the added comparators. Change-Id: I57d470db6fea71ec12e43f86e3fd62ed6259c83a Reviewed-on: http://gerrit.cloudera.org:8080/21059 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/coordinator.cc | 16 + be/src/runtime/coordinator.h | 6 + be/src/service/CMakeLists.txt | 3 + be/src/service/client-request-state.cc | 24 ++ be/src/service/client-request-state.h | 14 + be/src/service/impala-http-handler.cc | 15 +- be/src/service/impala-http-handler.h | 2 +- be/src/service/impala-server.cc | 138 +------ be/src/service/impala-server.h | 143 +------ be/src/service/query-state-record-test.cc | 158 ++++++++ be/src/service/query-state-record.cc | 410 +++++++++++++++++++++ be/src/service/query-state-record.h | 321 ++++++++++++++++ be/src/util/runtime-profile-test.cc | 5 +- be/src/util/runtime-profile.cc | 8 +- be/src/util/runtime-profile.h | 4 +- common/thrift/Query.thrift | 4 + .../java/org/apache/impala/planner/Planner.java | 2 + 17 files changed, 979 
insertions(+), 294 deletions(-) diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index f8fae8582..73a1a4e4f 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -19,8 +19,10 @@ #include <cerrno> #include <iomanip> +#include <list> #include <sstream> #include <unordered_set> +#include <utility> #include <thrift/protocol/TDebugProtocol.h> #include <boost/algorithm/string/join.hpp> @@ -1505,6 +1507,20 @@ vector<NetworkAddressPB> Coordinator::GetActiveBackends( return result; } +list<pair<NetworkAddressPB, Coordinator::ResourceUtilization>> + Coordinator::BackendResourceUtilization() { + DCHECK(exec_rpcs_complete_.Load()) << "Exec() must be called first."; + list<pair<NetworkAddressPB, Coordinator::ResourceUtilization>> result; + + lock_guard<SpinLock> l(backend_states_init_lock_); + for (BackendState* backend_state : backend_states_) { + result.push_back(make_pair(backend_state->impalad_address(), + backend_state->GetResourceUtilization())); + } + + return result; +} + void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* context) { VLOG(2) << "Coordinator::UpdateFilter(filter_id=" << params.filter_id() << ")"; shared_lock<shared_mutex> lock(filter_routing_table_->lock); diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 411113dd8..92fea991f 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -17,9 +17,11 @@ #pragma once +#include <list> #include <memory> #include <string> #include <vector> +#include <utility> #include <boost/unordered_map.hpp> #include <rapidjson/document.h> @@ -264,6 +266,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// latest status reports received from those backends). ResourceUtilization ComputeQueryResourceUtilization(); + /// Return the backends and their associated resource utilization. 
+ std::list<std::pair<NetworkAddressPB, ResourceUtilization>> + BackendResourceUtilization(); + /// Return the backends in 'candidates' that still have at least one fragment instance /// executing on them. The returned backends may not be in the same order as the input. std::vector<NetworkAddressPB> GetActiveBackends( diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index 56cfaa384..c5fc31226 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -43,6 +43,7 @@ add_library(Service internal-server.cc query-options.cc query-result-set.cc + query-state-record.cc ) add_dependencies(Service gen-deps) @@ -132,6 +133,7 @@ add_library(ServiceTests STATIC hs2-util-test.cc impala-server-test.cc query-options-test.cc + query-state-record-test.cc ) add_dependencies(ServiceTests gen-deps) @@ -140,4 +142,5 @@ ADD_BE_TEST(session-expiry-test session-expiry-test.cc) # TODO: this leaks thrif ADD_UNIFIED_BE_LSAN_TEST(hs2-util-test "StitchNullsTest.*:PrintTColumnValueTest.*") ADD_UNIFIED_BE_LSAN_TEST(query-options-test QueryOptions.*) ADD_UNIFIED_BE_LSAN_TEST(impala-server-test ImpalaServerTest.*) +ADD_UNIFIED_BE_LSAN_TEST(query-state-record-test QueryStateRecordTest.*:PerHostStateTest.*) ADD_BE_LSAN_TEST(internal-server-test) diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 761c12e7e..3deb8ef0c 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -2367,4 +2367,28 @@ void ClientRequestState::AddTableResetHints(const TConvertTableRequest& params, status->MergeStatus(Status(table_reset_hint)); } +int64_t ClientRequestState::num_rows_fetched_counter() const { + if (LIKELY(num_rows_fetched_counter_ != nullptr)) { + return num_rows_fetched_counter_->value(); + } + + return 0; +} + +int64_t ClientRequestState::row_materialization_rate() const { + if (LIKELY(row_materialization_rate_ != nullptr)) { + return row_materialization_rate_->value(); 
+ } + + return 0; +} + +int64_t ClientRequestState::row_materialization_timer() const { + if (LIKELY(row_materialization_timer_ != nullptr)) { + return row_materialization_timer_->value(); + } + + return 0; +} + } diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index a9f8c1912..e92a23b71 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -244,6 +244,11 @@ class ClientRequestState { TSessionType::type session_type() const { return query_ctx_.session.session_type; } const TUniqueId& session_id() const { return query_ctx_.session.session_id; } const std::string& default_db() const { return query_ctx_.session.database; } + const std::string& redacted_sql() const { + return + query_ctx_.client_request.__isset.redacted_stmt ? + query_ctx_.client_request.redacted_stmt : query_ctx_.client_request.stmt; + } bool eos() const { return eos_.Load(); } const QuerySchedulePB* schedule() const { return schedule_.get(); } @@ -275,6 +280,14 @@ class ClientRequestState { bool returns_result_set() { return !result_metadata_.columns.empty(); } const TResultSetMetadata* result_metadata() const { return &result_metadata_; } const TUniqueId& query_id() const { return query_ctx_.query_id; } + + /// Return values from counters. See explanation on declaration of + /// num_rows_fetched_counter_, row_materialization_rate_, and row_materialization_timer_ + /// for details. + int64_t num_rows_fetched_counter() const; + int64_t row_materialization_rate() const; + int64_t row_materialization_timer() const; + /// Returns the TExecRequest for the query associated with this ClientRequestState. /// Contents are a place-holder until GetExecRequest(TQueryCtx) initializes the /// TExecRequest. 
@@ -312,6 +325,7 @@ class ClientRequestState { } const RuntimeProfile* profile() const { return profile_; } const RuntimeProfile* summary_profile() const { return summary_profile_; } + const RuntimeProfile* frontend_profile() const { return frontend_profile_; } int64_t start_time_us() const { return start_time_us_; } int64_t end_time_us() const { return end_time_us_.Load(); } const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; } diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 04579866d..390a218bb 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -43,6 +43,7 @@ #include "service/client-request-state.h" #include "service/frontend.h" #include "service/impala-server.h" +#include "service/query-state-record.h" #include "thrift/protocol/TDebugProtocol.h" #include "util/coding-util.h" #include "util/debug-util.h" @@ -510,7 +511,7 @@ std::string ImpalaHttpHandler::ProgressToString(int64_t num_completed, int64_t t return ss.str(); } -void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& record, +void ImpalaHttpHandler::QueryStateToJson(const QueryStateRecord& record, Value* value, Document* document, bool inflight) { Value user(record.effective_user.c_str(), document->GetAllocator()); value->AddMember("effective_user", user, document->GetAllocator()); @@ -623,19 +624,19 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::WebRequest& req, Document* document) { AddQueryRecordTips(document); - set<ImpalaServer::QueryStateRecord, ImpalaServer::QueryStateRecordLessThan> + set<QueryStateRecord, QueryStateRecord::StartTimeComparator> sorted_query_records; server_->query_driver_map_.DoFuncForAllEntries( [&](const std::shared_ptr<QueryDriver>& query_driver) { sorted_query_records.insert( - ImpalaServer::QueryStateRecord(*query_driver->GetActiveClientRequestState())); + 
QueryStateRecord(*query_driver->GetActiveClientRequestState())); }); unordered_set<TUniqueId> in_flight_query_ids; Value in_flight_queries(kArrayType); int64_t num_waiting_queries = 0; - for (const ImpalaServer::QueryStateRecord& record: sorted_query_records) { + for (const QueryStateRecord& record: sorted_query_records) { Value record_json(kObjectType); QueryStateToJson(record, &record_json, document, true); @@ -662,7 +663,7 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::WebRequest& req, Value completed_queries(kArrayType); { lock_guard<mutex> l(server_->query_log_lock_); - for (const shared_ptr<ImpalaServer::QueryStateRecord>& log_entry : + for (const shared_ptr<QueryStateRecord>& log_entry : server_->query_log_) { // Don't show duplicated entries between in-flight and completed queries. if (in_flight_query_ids.find(log_entry->id) != in_flight_query_ids.end()) continue; @@ -1185,7 +1186,7 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include return; } - shared_ptr<ImpalaServer::QueryStateRecord> query_record = nullptr; + shared_ptr<QueryStateRecord> query_record = nullptr; TExecSummary summary; string stmt; string plan; @@ -1199,7 +1200,7 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include status = server_->GetQueryHandle(query_id, &query_handle); if (status.ok()) { inflight = true; - query_record = make_shared<ImpalaServer::QueryStateRecord>(*query_handle); + query_record = make_shared<QueryStateRecord>(*query_handle); // If the query plan isn't generated, avoid waiting for the request // state lock to be acquired, since it could potentially be an expensive // call, if the table Catalog metadata loading is in progress. 
Instead diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h index ebbdd9570..968495ed0 100644 --- a/be/src/service/impala-http-handler.h +++ b/be/src/service/impala-http-handler.h @@ -207,7 +207,7 @@ class ImpalaHttpHandler { /// Helper method to render a single QueryStateRecord as a Json object Used by /// QueryStateHandler(). - void QueryStateToJson(const ImpalaServer::QueryStateRecord& record, + void QueryStateToJson(const QueryStateRecord& record, rapidjson::Value* value, rapidjson::Document* document, bool inflight); /// Json callback for /backends, which prints a table of known backends. diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index f88a32e3c..57880d2d6 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -85,6 +85,7 @@ #include "service/client-request-state.h" #include "service/frontend.h" #include "service/impala-http-handler.h" +#include "service/query-state-record.h" #include "util/auth-util.h" #include "util/bit-util.h" #include "util/coding-util.h" @@ -1182,54 +1183,6 @@ void ImpalaServer::ArchiveQuery(const QueryHandle& query_handle) { } } -int64_t ImpalaServer::EstimateSize(const QueryStateRecord* record) { - int64_t size = sizeof(QueryStateRecord); // 800 - size += sizeof(uint8_t) * record->compressed_profile.capacity(); - size += record->effective_user.capacity(); - size += record->default_db.capacity(); - size += record->stmt.capacity(); - size += record->plan.capacity(); - size += record->query_state.capacity(); - size += record->timeline.capacity(); - size += record->resource_pool.capacity(); - - // The following dynamic memory of field members are estimated rather than - // exactly sized. Some of thrift members might be nested, but the estimation - // does not traverse deeper than the first level. 
- - // TExecSummary exec_summary - if (record->exec_summary.__isset.nodes) { - size += sizeof(TPlanNodeExecSummary) * record->exec_summary.nodes.capacity(); - } - if (record->exec_summary.__isset.exch_to_sender_map) { - size += sizeof(int32_t) * 2 * record->exec_summary.exch_to_sender_map.size(); - } - if (record->exec_summary.__isset.error_logs) { - for (const auto& log : record->exec_summary.error_logs) size += log.capacity(); - } - if (record->exec_summary.__isset.queued_reason) { - size += record->exec_summary.queued_reason.capacity(); - } - - // Status query_status - if (!record->query_status.ok()) { - size += record->query_status.msg().msg().capacity(); - for (const auto& detail : record->query_status.msg().details()) { - size += detail.capacity(); - } - } - - // TEventSequence event_sequence - size += record->event_sequence.name.capacity(); - size += sizeof(int64_t) * record->event_sequence.timestamps.capacity(); - for (const auto& label : record->event_sequence.labels) size += label.capacity(); - - // vector<TPlanFragment> fragments - size += sizeof(TPlanFragment) * record->fragments.capacity(); - - return size; -} - ImpalaServer::~ImpalaServer() {} void ImpalaServer::AddPoolConfiguration(TQueryCtx* ctx, @@ -2541,95 +2494,6 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(BackendDescriptorPB* be_d be_desc->set_version(GetBuildVersion(/* compact */ true)); } -ImpalaServer::QueryStateRecord::QueryStateRecord( - const ClientRequestState& query_handle, vector<uint8_t>&& compressed_profile) - : compressed_profile(compressed_profile) { - Init(query_handle); -} - -ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& query_handle) - : compressed_profile() { - Init(query_handle); -} - -void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& query_handle) { - id = query_handle.query_id(); - - const string* plan_str = query_handle.summary_profile()->GetInfoString("Plan"); - if (plan_str != nullptr) plan = *plan_str; - stmt = 
query_handle.sql_stmt(); - effective_user = query_handle.effective_user(); - default_db = query_handle.default_db(); - start_time_us = query_handle.start_time_us(); - end_time_us = query_handle.end_time_us(); - wait_time_ms = query_handle.wait_time_ms(); - query_handle.summary_profile()->GetTimeline(&timeline); - - Coordinator* coord = query_handle.GetCoordinator(); - if (coord != nullptr) { - num_completed_scan_ranges = coord->scan_progress().num_complete(); - total_scan_ranges = coord->scan_progress().total(); - num_completed_fragment_instances = coord->query_progress().num_complete(); - total_fragment_instances = coord->query_progress().total(); - auto utilization = coord->ComputeQueryResourceUtilization(); - total_peak_mem_usage = utilization.total_peak_mem_usage; - cluster_mem_est = query_handle.schedule()->cluster_mem_est(); - bytes_read = utilization.bytes_read; - bytes_sent = utilization.exchange_bytes_sent + utilization.scan_bytes_sent; - has_coord = true; - } else { - num_completed_scan_ranges = 0; - total_scan_ranges = 0; - num_completed_fragment_instances = 0; - total_fragment_instances = 0; - total_peak_mem_usage = 0; - cluster_mem_est = 0; - bytes_read = 0; - bytes_sent = 0; - has_coord = false; - } - beeswax_query_state = query_handle.BeeswaxQueryState(); - ClientRequestState::RetryState retry_state = query_handle.retry_state(); - if (retry_state == ClientRequestState::RetryState::NOT_RETRIED) { - query_state = _QueryState_VALUES_TO_NAMES.find(beeswax_query_state)->second; - } else { - query_state = query_handle.RetryStateToString(retry_state); - } - num_rows_fetched = query_handle.num_rows_fetched(); - query_status = query_handle.query_status(); - - query_handle.query_events()->ToThrift(&event_sequence); - - const TExecRequest& request = query_handle.exec_request(); - stmt_type = request.stmt_type; - // Save the query fragments so that the plan can be visualised. 
- for (const TPlanExecInfo& plan_exec_info: request.query_exec_request.plan_exec_info) { - fragments.insert(fragments.end(), - plan_exec_info.fragments.begin(), plan_exec_info.fragments.end()); - } - all_rows_returned = query_handle.eos(); - last_active_time_ms = query_handle.last_active_ms(); - // For statement types other than QUERY/DML, show an empty string for resource pool - // to indicate that they are not subjected to admission control. - if (stmt_type == TStmtType::QUERY || stmt_type == TStmtType::DML) { - resource_pool = query_handle.request_pool(); - } - user_has_profile_access = query_handle.user_has_profile_access(); - - // In some cases like canceling and closing the original query or closing the session - // we may not create the new query, we also check whether the retrided query id is set. - was_retried = query_handle.WasRetried() && query_handle.IsSetRetriedId(); - if (was_retried) { - retried_query_id = make_unique<TUniqueId>(query_handle.retried_id()); - } -} - -bool ImpalaServer::QueryStateRecordLessThan::operator() ( - const QueryStateRecord& lhs, const QueryStateRecord& rhs) const { - if (lhs.start_time_us == rhs.start_time_us) return lhs.id < rhs.id; - return lhs.start_time_us < rhs.start_time_us; -} - void ImpalaServer::ConnectionStart( const ThriftServer::ConnectionContext& connection_context) { if (connection_context.server_name == BEESWAX_SERVER_NAME || diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index a4fa0c35a..4cd5dd5d5 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -39,6 +39,7 @@ #include "runtime/types.h" #include "service/internal-server.h" #include "service/query-options.h" +#include "service/query-state-record.h" #include "statestore/statestore-subscriber.h" #include "util/condition-variable.h" #include "util/container-util.h" @@ -941,148 +942,6 @@ class ImpalaServer : public ImpalaServiceIf, /// which are returned as strings (see IMPALA-11041). 
std::string ColumnTypeToBeeswaxTypeString(const TColumnType& type); - /// Snapshot of a query's state, archived in the query log. Not mutated after - /// construction. Please update EstimateSize() if field member changed. - struct QueryStateRecord { - /// Compressed representation of profile returned by RuntimeProfile::Compress(). - /// Must be initialised to a valid value if this is a completed query. - /// Empty if this was initialised from a running query. - const std::vector<uint8_t> compressed_profile; - - /// Query id - TUniqueId id; - - /// Queries are run and authorized on behalf of the effective_user. - /// If there is no delegated user, this will be the connected user. Otherwise, it - /// will be set to the delegated user. - std::string effective_user; - - /// If true, effective_user has access to the runtime profile and execution - /// summary. - bool user_has_profile_access; - - /// default db for this query - std::string default_db; - - /// SQL statement text - std::string stmt; - - /// Text representation of plan - std::string plan; - - /// DDL, DML etc. - TStmtType::type stmt_type; - - /// True if the query required a coordinator fragment - bool has_coord; - - /// The number of scan ranges that have completed. - int64_t num_completed_scan_ranges; - - /// The total number of scan ranges. - int64_t total_scan_ranges; - - /// The number of fragment instances that have completed. - int64_t num_completed_fragment_instances; - - /// The total number of fragment instances. - int64_t total_fragment_instances; - - /// The number of rows fetched by the client - int64_t num_rows_fetched; - - /// The state of the query as of this snapshot. The possible values for the - /// query_state = union(beeswax::QueryState, ClientRequestState::RetryState). This is - /// necessary so that the query_state can accurately reflect if a query has been - /// retried or not. 
This string is not displayed in the runtime profiles, it is only - /// displayed on the /queries endpoint of the Web UI when listing out the state of - /// each query. This is necessary so that users can clearly see if a query has been - /// retried or not. - std::string query_state; - - /// The beeswax::QueryState of the query as of this snapshot. - beeswax::QueryState::type beeswax_query_state; - - /// Start and end time of the query, in Unix microseconds. - /// A query whose end_time_us is 0 indicates that it is an in-flight query. - /// These two variables are initialized with the corresponding values from - /// ClientRequestState. - int64_t start_time_us, end_time_us; - - /// The request waited time in ms for queued. - int64_t wait_time_ms; - - /// Total peak memory usage by this query at all backends. - int64_t total_peak_mem_usage; - - /// The cluster wide estimated memory usage of this query. - int64_t cluster_mem_est; - - /// Total bytes read by this query at all backends. - int64_t bytes_read; - - /// The total number of bytes sent (across the network) by this query in exchange - /// nodes. Does not include remote reads, data written to disk, or data sent to the - /// client. - int64_t bytes_sent; - - // Query timeline from summary profile. - std::string timeline; - - /// Summary of execution for this query. - TExecSummary exec_summary; - - Status query_status; - - /// Timeline of important query events - TEventSequence event_sequence; - - /// Save the query plan fragments so that the plan tree can be rendered on the debug - /// webpages. - vector<TPlanFragment> fragments; - - // If true, this query has no more rows to return - bool all_rows_returned; - - // The most recent time this query was actively being processed, in Unix milliseconds. - int64_t last_active_time_ms; - - /// Resource pool to which the request was submitted for admission, or an empty - /// string if this request doesn't go through admission control. 
- std::string resource_pool; - - /// True if this query was retried, false otherwise. - bool was_retried = false; - - /// If this query was retried, the query id of the retried query. - std::unique_ptr<const TUniqueId> retried_query_id; - - /// Initialise from 'exec_state' of a completed query. 'compressed_profile' must be - /// a runtime profile decompressed with RuntimeProfile::Compress(). - QueryStateRecord( - const ClientRequestState& exec_state, std::vector<uint8_t>&& compressed_profile); - - /// Initialize from 'exec_state' of a running query - QueryStateRecord(const ClientRequestState& exec_state); - - /// Default constructor used only when participating in collections - QueryStateRecord() { } - - private: - // Common initialization for constructors. - void Init(const ClientRequestState& exec_state); - }; - - struct QueryStateRecordLessThan { - /// Comparator that sorts by start time. - bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) const; - }; - - /// Return the estimated size of given record in bytes. - /// It does not meant to return exact byte size of given QueryStateRecord in memory, - /// but should account for compressed_profile vector of record. - static int64_t EstimateSize(const QueryStateRecord* record); - /// Returns the active QueryHandle for this query id. The QueryHandle contains the /// active ClientRequestState. Returns an error Status if the query id cannot be found. /// If caller is an RPC thread, RPC context will be registered for time tracking diff --git a/be/src/service/query-state-record-test.cc b/be/src/service/query-state-record-test.cc new file mode 100644 index 000000000..089462d9b --- /dev/null +++ b/be/src/service/query-state-record-test.cc @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <limits> +#include <string> +#include <utility> +#include <vector> + +#include "testutil/gtest-util.h" + +#include "gen-cpp/Types_types.h" +#include "service/query-state-record.h" + +namespace impala { + +TEST(QueryStateRecordTest, StartTimeComparatorNotEqual) { + QueryStateRecord::StartTimeComparator fixture; + + QueryStateRecord a; + QueryStateRecord b; + + a.start_time_us = std::numeric_limits<int64_t>::min(); + b.start_time_us = std::numeric_limits<int64_t>::max(); + + EXPECT_TRUE(fixture(a, b)); + EXPECT_FALSE(fixture(b, a)); +} + +TEST(QueryStateRecordTest, StartTimeComparatorEqualIdSame) { + QueryStateRecord::StartTimeComparator fixture; + + QueryStateRecord a; + QueryStateRecord b; + + a.start_time_us = 1; + b.start_time_us = 1; + + EXPECT_FALSE(fixture(a, b)); + EXPECT_FALSE(fixture(b, a)); +} + +TEST(QueryStateRecordTest, StartTimeComparatorEqualIdDifferent) { + QueryStateRecord::StartTimeComparator fixture; + + QueryStateRecord a; + QueryStateRecord b; + + a.start_time_us = 1; + a.id.lo = 1; + a.id.hi = 2; + b.start_time_us = 1; + + EXPECT_FALSE(fixture(a, b)); + EXPECT_TRUE(fixture(b, a)); +} + +TEST(QueryStateRecordTest, EventsTimelineIterator) { + std::vector<std::string> labels; + std::vector<std::int64_t> timestamps; + int cntr = 0; + + labels.push_back("three"); + timestamps.push_back(3); + + labels.push_back("four"); + timestamps.push_back(4); + + 
labels.push_back("zero"); + timestamps.push_back(0); + + labels.push_back("two"); + timestamps.push_back(2); + + labels.push_back("one"); + timestamps.push_back(1); + + labels.push_back("one"); + timestamps.push_back(1); + + for (const auto& actual : EventsTimelineIterator(&labels, &timestamps)) { + switch (cntr) { + case 0: + EXPECT_EQ("three", actual.first); + EXPECT_EQ(3, actual.second); + break; + case 1: + EXPECT_EQ("four", actual.first); + EXPECT_EQ(4, actual.second); + break; + case 2: + EXPECT_EQ("zero", actual.first); + EXPECT_EQ(0, actual.second); + break; + case 3: + EXPECT_EQ("two", actual.first); + EXPECT_EQ(2, actual.second); + break; + case 4: + EXPECT_EQ("one", actual.first); + EXPECT_EQ(1, actual.second); + break; + case 5: + EXPECT_EQ("one", actual.first); + EXPECT_EQ(1, actual.second); + break; + default: + FAIL(); + } + + cntr++; + } +} + +TEST(PerHostStateTest, PeakMemoryComparatorLessThan) { + TNetworkAddress addr_a; + PerHostState a; + a.peak_memory_usage = std::numeric_limits<int64_t>::min(); + std::pair<TNetworkAddress, PerHostState> pair_a = std::make_pair(addr_a, a); + + TNetworkAddress addr_b; + PerHostState b; + b.peak_memory_usage = std::numeric_limits<int64_t>::max(); + std::pair<TNetworkAddress, PerHostState> pair_b = std::make_pair(addr_b, b); + + EXPECT_TRUE(PerHostPeakMemoryComparator(pair_a, pair_b)); + EXPECT_FALSE(PerHostPeakMemoryComparator(pair_b, pair_a)); +} + +TEST(PerHostStateTest, PeakMemoryComparatorEqual) { + TNetworkAddress addr_a; + PerHostState a; + a.peak_memory_usage = 0; + std::pair<TNetworkAddress, PerHostState> pair_a = std::make_pair(addr_a, a); + + TNetworkAddress addr_b; + PerHostState b; + b.peak_memory_usage = 0; + std::pair<TNetworkAddress, PerHostState> pair_b = std::make_pair(addr_b, b); + + EXPECT_FALSE(PerHostPeakMemoryComparator(pair_a, pair_b)); + EXPECT_FALSE(PerHostPeakMemoryComparator(pair_b, pair_a)); +} + +} //namespace impala diff --git a/be/src/service/query-state-record.cc 
b/be/src/service/query-state-record.cc new file mode 100644 index 000000000..7bea6ac3b --- /dev/null +++ b/be/src/service/query-state-record.cc @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Contains implementations of the QueryStateRecord and QueryStateExpanded struct +// functions. These structs represent the state of a query for capturing the query in the +// query log and completed queries table. 
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/algorithm/string.hpp>
+
+#include <gutil/strings/numbers.h>
+#include <gutil/strings/strcat.h>
+#include "runtime/coordinator.h"
+#include "scheduling/admission-controller.h"
+#include "scheduling/scheduler.h"
+#include "service/client-request-state.h"
+#include "service/query-state-record.h"
+#include "util/debug-util.h"
+#include "util/network-util.h"
+#include "util/string-util.h"
+
+using namespace std;
+
+namespace impala {
+
+/// Builds a record from 'query_handle' and takes ownership of 'compressed_profile'.
+QueryStateRecord::QueryStateRecord(
+    const ClientRequestState& query_handle, vector<uint8_t>&& compressed_profile)
+  // A named rvalue reference is an lvalue: without std::move the (potentially large)
+  // profile buffer would be copied instead of moved.
+  : compressed_profile(std::move(compressed_profile)) {
+  Init(query_handle);
+}
+
+/// Builds a record from 'query_handle' with an empty compressed profile.
+QueryStateRecord::QueryStateRecord(const ClientRequestState& query_handle)
+  : compressed_profile() {
+  Init(query_handle);
+}
+
+/// Copies the fields shared by both constructors out of 'query_handle'.
+void QueryStateRecord::Init(const ClientRequestState& query_handle) {
+  id = query_handle.query_id();
+
+  const string* plan_str = query_handle.summary_profile()->GetInfoString("Plan");
+  if (plan_str != nullptr) {
+    plan = *plan_str;
+    // Strip newlines from both ends of the plan text.
+    boost::algorithm::trim_if(plan, boost::algorithm::is_any_of("\n"));
+  }
+
+  stmt = query_handle.sql_stmt();
+  effective_user = query_handle.effective_user();
+  default_db = query_handle.default_db();
+  start_time_us = query_handle.start_time_us();
+  end_time_us = query_handle.end_time_us();
+  wait_time_ms = query_handle.wait_time_ms();
+  query_handle.summary_profile()->GetTimeline(&timeline);
+
+  // Progress and resource counters are only available when a coordinator exists;
+  // otherwise they are all reported as zero.
+  Coordinator* coord = query_handle.GetCoordinator();
+  if (coord != nullptr) {
+    num_completed_scan_ranges = coord->scan_progress().num_complete();
+    total_scan_ranges = coord->scan_progress().total();
+    num_completed_fragment_instances = coord->query_progress().num_complete();
+    total_fragment_instances = coord->query_progress().total();
+    const auto& utilization = coord->ComputeQueryResourceUtilization();
+    total_peak_mem_usage = utilization.total_peak_mem_usage;
+    cluster_mem_est = query_handle.schedule()->cluster_mem_est();
+    bytes_read = utilization.bytes_read;
+    bytes_sent = utilization.exchange_bytes_sent + utilization.scan_bytes_sent;
+    has_coord = true;
+  } else {
+    num_completed_scan_ranges = 0;
+    total_scan_ranges = 0;
+    num_completed_fragment_instances = 0;
+    total_fragment_instances = 0;
+    total_peak_mem_usage = 0;
+    cluster_mem_est = 0;
+    bytes_read = 0;
+    bytes_sent = 0;
+    has_coord = false;
+  }
+  beeswax_query_state = query_handle.BeeswaxQueryState();
+  // A retried query reports its retry state; any other query reports the name of its
+  // beeswax state.
+  ClientRequestState::RetryState retry_state = query_handle.retry_state();
+  if (retry_state == ClientRequestState::RetryState::NOT_RETRIED) {
+    query_state = beeswax::_QueryState_VALUES_TO_NAMES.find(beeswax_query_state)->second;
+  } else {
+    query_state = query_handle.RetryStateToString(retry_state);
+  }
+  num_rows_fetched = query_handle.num_rows_fetched();
+  query_status = query_handle.query_status();
+
+  query_handle.query_events()->ToThrift(&event_sequence);
+
+  const TExecRequest& request = query_handle.exec_request();
+  stmt_type = request.stmt_type;
+  // Save the query fragments so that the plan can be visualised.
+  for (const TPlanExecInfo& plan_exec_info: request.query_exec_request.plan_exec_info) {
+    fragments.insert(fragments.end(),
+        plan_exec_info.fragments.begin(), plan_exec_info.fragments.end());
+  }
+  all_rows_returned = query_handle.eos();
+  last_active_time_ms = query_handle.last_active_ms();
+  // For statement types other than QUERY/DML, show an empty string for resource pool
+  // to indicate that they are not subjected to admission control.
+  if (stmt_type == TStmtType::QUERY || stmt_type == TStmtType::DML) {
+    resource_pool = query_handle.request_pool();
+  }
+  user_has_profile_access = query_handle.user_has_profile_access();
+
+  // In some cases, like canceling and closing the original query or closing the
+  // session, the new query may never be created, so also check that the retried query
+  // id is set.
+  was_retried = query_handle.WasRetried() && query_handle.IsSetRetriedId();
+  if (was_retried) {
+    retried_query_id = make_unique<TUniqueId>(query_handle.retried_id());
+  }
+}
+
+/// Orders records by start time, using the query id to break ties.
+bool QueryStateRecord::StartTimeComparator::operator() (
+    const QueryStateRecord& lhs, const QueryStateRecord& rhs) const {
+  if (lhs.start_time_us == rhs.start_time_us) return lhs.id < rhs.id;
+  return lhs.start_time_us < rhs.start_time_us;
+}
+
+/// Returns an estimate, in bytes, of the memory held by 'record': the size of the
+/// struct itself plus the capacity of its string and vector members and a first-level
+/// estimate for its Thrift members.
+int64_t EstimateSize(const QueryStateRecord* record) {
+  int64_t size = sizeof(QueryStateRecord); // 800
+  size += sizeof(uint8_t) * record->compressed_profile.capacity();
+  size += record->effective_user.capacity();
+  size += record->default_db.capacity();
+  size += record->stmt.capacity();
+  size += record->plan.capacity();
+  size += record->query_state.capacity();
+  size += record->timeline.capacity();
+  size += record->resource_pool.capacity();
+
+  // The following dynamic memory of field members are estimated rather than
+  // exactly sized. Some of thrift members might be nested, but the estimation
+  // does not traverse deeper than the first level.
+
+  // TExecSummary exec_summary
+  if (record->exec_summary.__isset.nodes) {
+    size += sizeof(TPlanNodeExecSummary) * record->exec_summary.nodes.capacity();
+  }
+  if (record->exec_summary.__isset.exch_to_sender_map) {
+    size += sizeof(int32_t) * 2 * record->exec_summary.exch_to_sender_map.size();
+  }
+  if (record->exec_summary.__isset.error_logs) {
+    for (const auto& log : record->exec_summary.error_logs) size += log.capacity();
+  }
+  if (record->exec_summary.__isset.queued_reason) {
+    size += record->exec_summary.queued_reason.capacity();
+  }
+
+  // Status query_status
+  if (!record->query_status.ok()) {
+    size += record->query_status.msg().msg().capacity();
+    for (const auto& detail : record->query_status.msg().details()) {
+      size += detail.capacity();
+    }
+  }
+
+  // TEventSequence event_sequence
+  size += record->event_sequence.name.capacity();
+  size += sizeof(int64_t) * record->event_sequence.timestamps.capacity();
+  for (const auto& label : record->event_sequence.labels) size += label.capacity();
+
+  // vector<TPlanFragment> fragments
+  size += sizeof(TPlanFragment) * record->fragments.capacity();
+
+  return size;
+}
+
+/// Captures the workload-management view of 'exec_state'. If 'base' is non-null it is
+/// shared as the base record, otherwise a new QueryStateRecord is created from
+/// 'exec_state'. In both cases the base record's num_rows_fetched is overwritten with
+/// exec_state.num_rows_fetched_counter().
+QueryStateExpanded::QueryStateExpanded(const ClientRequestState& exec_state,
+    const std::shared_ptr<QueryStateRecord> base) :
+    base_state(base ? base : make_shared<QueryStateRecord>(exec_state)) {
+  if (exec_state.session()->session_type == TSessionType::HIVESERVER2){
+    hiveserver2_protocol_version = exec_state.session()->hs2_version;
+  }
+  query_options = exec_state.query_options();
+  session_id = exec_state.session_id();
+  session_type = exec_state.session()->session_type;
+  redacted_sql = exec_state.redacted_sql();
+  db_user_connection = exec_state.connected_user();
+  client_address = exec_state.session()->network_address;
+  impala_query_end_state = exec_state.ExecStateToString(exec_state.exec_state());
+  per_host_mem_estimate = exec_state.exec_request()
+      .query_exec_request.planner_per_host_mem_estimate;
+  dedicated_coord_mem_estimate = exec_state.exec_request()
+      .query_exec_request.dedicated_coord_mem_estimate;
+  // Write through 'base_state', which is never null. 'base' is null whenever the
+  // record was created by the initializer above, so dereferencing it here would crash.
+  base_state->num_rows_fetched = exec_state.num_rows_fetched_counter();
+  row_materialization_rate = exec_state.row_materialization_rate();
+  row_materialization_time = exec_state.row_materialization_timer();
+
+  // Fields from the schedule.
+  if (exec_state.schedule() != nullptr) {
+    // Per-Host Metrics
+    for (int i =0; i < exec_state.schedule()->backend_exec_params_size(); i++) {
+      const BackendExecParamsPB& b = exec_state.schedule()->backend_exec_params(i);
+      TNetworkAddress host;
+      host.hostname = b.address().hostname();
+      host.uds_address = b.address().uds_address();
+      host.port = b.address().port();
+
+      PerHostState state;
+      state.fragment_instance_count = b.instance_params_size();
+      per_host_state.emplace(move(host), move(state));
+    }
+  }
+
+  // Fields from the summary profile.
+  if (exec_state.summary_profile() != nullptr) {
+    const string* result_ptr = exec_state.summary_profile()
+        ->GetInfoString(AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
+    admission_result = result_ptr == nullptr ? "" : *result_ptr;
+
+    const string* exec_group_ptr = exec_state.summary_profile()
+        ->GetInfoString(AdmissionController::PROFILE_INFO_KEY_EXECUTOR_GROUP);
+    executor_group = exec_group_ptr == nullptr ? "" : *exec_group_ptr;
+
+    const string* exec_summary_ptr = exec_state.summary_profile()->
+        GetInfoString("ExecSummary");
+    if (exec_summary_ptr != nullptr && !exec_summary_ptr->empty()) {
+      exec_summary = *exec_summary_ptr;
+      boost::algorithm::trim_if(exec_summary, boost::algorithm::is_any_of("\n"));
+    }
+  }
+
+  // Fields from the coordinator
+  Coordinator* coord = exec_state.GetCoordinator();
+  if (coord != nullptr) {
+    // Query Profile is initialized when query execution starts. Thus, it will always be
+    // non-null at this point since this code runs after the query completes.
+    DCHECK(coord->query_profile() != nullptr);
+    map<string, int64_t> host_scratch_bytes;
+    map<string, int64_t> scanner_io_wait;
+    map<string, int64_t> bytes_read_cache;
+    // Chain of profiles from the root down to the profile currently being visited.
+    vector<RuntimeProfileBase*> prof_stack;
+
+    using boost::algorithm::iequals;
+
+    // Lambda function to recursively walk through a profile.
+    std::function<void(RuntimeProfileBase*)> process_exec_profile = [
+        &process_exec_profile, &host_scratch_bytes, &scanner_io_wait, &prof_stack,
+        &bytes_read_cache, this](RuntimeProfileBase* profile) {
+      prof_stack.push_back(profile);
+      if (const auto& cntr = profile->GetCounter("ScratchBytesWritten");
+          cntr != nullptr) {
+        host_scratch_bytes.emplace(profile->name(), cntr->value());
+      }
+
+      // Metrics from HDFS_SCAN_NODE entries: the second profile in the chain must be
+      // an "instance..." profile and the third an "hdfs_scan_node..." profile
+      // (case-insensitive). Values are keyed by "<instance>::<scan node>"; emplace()
+      // keeps the first value recorded for a key.
+      if (prof_stack.size() >= 3) {
+        if (iequals(prof_stack[1]->name().substr(0, 8), "instance") &&
+            iequals(prof_stack[2]->name().substr(0, 14), "hdfs_scan_node")) {
+          if (const auto& cntr = profile->GetCounter("ScannerIoWaitTime");
+              cntr != nullptr) {
+            scanner_io_wait.emplace(StrCat(
+                prof_stack[1]->name(), "::", prof_stack[2]->name()), cntr->value());
+          }
+
+          if (const auto& cntr = profile->GetCounter("DataCacheHitBytes");
+              cntr != nullptr) {
+            bytes_read_cache.emplace(StrCat(
+                prof_stack[1]->name(), "::", prof_stack[2]->name()), cntr->value());
+          }
+        }
+      }
+
+      // Total Bytes Read
+      // NOTE(review): every profile exposing this counter overwrites the value, so the
+      // last one visited wins - confirm that is the intended source.
+      if (const auto& cntr = profile->GetCounter("TotalBytesRead"); cntr != nullptr) {
+        bytes_read_total = cntr->value();
+      }
+
+      // Recursively walk down through all child nodes.
+      vector<RuntimeProfileBase*> children;
+      profile->GetAllChildren(&children);
+      for (const auto& child : children) {
+        process_exec_profile(child);
+      }
+      prof_stack.pop_back();
+    };
+
+    process_exec_profile(coord->query_profile());
+
+    // Compressed Bytes Spilled
+    for (const auto& hsb : host_scratch_bytes) {
+      compressed_bytes_spilled += hsb.second;
+    }
+
+    // Read IO Wait Time Total and Average
+    if (scanner_io_wait.size() > 0) {
+      for (const auto& item : scanner_io_wait) {
+        read_io_wait_time_total += item.second;
+      }
+
+      // Divide in signed arithmetic; mixing int64_t with size_t would silently
+      // convert the total to unsigned.
+      read_io_wait_time_mean =
+          read_io_wait_time_total / static_cast<int64_t>(scanner_io_wait.size());
+    }
+
+    // Bytes Read from Data Cache
+    for (const auto& b : bytes_read_cache) {
+      bytes_read_cache_total += b.second;
+    }
+
+    // Per-Node Peak Memory Usage
+    for (const auto& be : coord->BackendResourceUtilization()) {
+      TNetworkAddress addr = FromNetworkAddressPB(be.first);
+      if(const auto& host = per_host_state.find(addr);
+          LIKELY(host != per_host_state.end())) {
+        host->second.peak_memory_usage = be.second.peak_per_host_mem_consumption;
+      } else{
+        PerHostState state;
+        state.peak_memory_usage = be.second.peak_per_host_mem_consumption;
+        per_host_state.emplace(addr, state);
+      }
+    }
+  } // Fields from the coordinator
+
+  // Executor Group
+  const RuntimeProfile* fe_profile = exec_state.frontend_profile();
+  DCHECK(fe_profile != nullptr); // Frontend profile is initialized in the constructor.
+  std::vector<RuntimeProfileBase*> children;
+  stringstream exec_group_str;
+
+  fe_profile->GetChildren(&children);
+  for (const auto& child : children) {
+    if (boost::algorithm::istarts_with(child->name(), "executor group ")) {
+      child->PrettyPrint(&exec_group_str);
+    }
+  }
+  executor_groups = exec_group_str.str();
+  boost::algorithm::trim_if(executor_groups, boost::algorithm::is_any_of("\n"));
+} // QueryStateExpanded constructor
+
+/// Returns true if the base record holds no event labels or no event timestamps.
+bool QueryStateExpanded::events_timeline_empty() const {
+  DCHECK(base_state->event_sequence.labels.size() ==
+      base_state->event_sequence.timestamps.size());
+  return base_state->event_sequence.labels.empty() ||
+      base_state->event_sequence.timestamps.empty();
+}
+
+/// Returns true if the peak memory usage of 'a' is less than that of 'b'.
+bool PerHostPeakMemoryComparator(const pair<TNetworkAddress, PerHostState>& a,
+    const pair<TNetworkAddress, PerHostState>& b) {
+  return a.second.peak_memory_usage < b.second.peak_memory_usage;
+}
+
+/// Events Timeline Iterator
+EventsTimelineIterator::EventsTimelineIterator(const std::vector<std::string>* labels,
+    const std::vector<std::int64_t>* timestamps) :
+    EventsTimelineIterator(labels, timestamps, 0) {}
+
+/// 'labels' and 'timestamps' must be non-null and of equal length.
+EventsTimelineIterator::EventsTimelineIterator(const std::vector<std::string>* labels,
+    const std::vector<std::int64_t>* timestamps, size_t cur) : labels_(labels),
+    timestamps_(timestamps), cur_(cur) {
+  DCHECK(labels != nullptr);
+  DCHECK(timestamps != nullptr);
+  DCHECK(labels->size() == timestamps->size());
+}
+
+/// Returns an iterator over the base record's event labels and timestamps.
+EventsTimelineIterator QueryStateExpanded::EventsTimeline() const {
+  return EventsTimelineIterator(&base_state->event_sequence.labels,
+      &base_state->event_sequence.timestamps);
+}
+
+/// Returns the (label, timestamp) pair at the current position. Uses at(), so
+/// dereferencing the end position throws std::out_of_range.
+EventsTimelineIterator::iter_t EventsTimelineIterator::operator*() const {
+  return make_pair(labels_->at(cur_), timestamps_->at(cur_));
+}
+
+/// Pre-increment: advances to the next position and returns this iterator.
+EventsTimelineIterator& EventsTimelineIterator::operator++() {
+  ++cur_;
+  return *this;
+}
+
+/// Post-increment: advances to the next position and returns a copy of the iterator
+/// as it was before advancing.
+EventsTimelineIterator EventsTimelineIterator::operator++(int) {
+  EventsTimelineIterator previous = *this;
+  ++cur_;
+  return previous;
+}
+
+/// Two iterators are equal when they are at the same position. Both are expected to
+/// refer to the same label and timestamp vectors.
+bool EventsTimelineIterator::operator==(const EventsTimelineIterator& other) const {
+  DCHECK(labels_ == other.labels_);
+  DCHECK(timestamps_ == other.timestamps_);
+  return cur_ == other.cur_;
+}
+
+bool EventsTimelineIterator::operator!=(const EventsTimelineIterator& other) const {
+  return !(*this == other);
+}
+
+/// Returns an iterator at the first event, regardless of this iterator's position.
+EventsTimelineIterator EventsTimelineIterator::begin() {
+  return EventsTimelineIterator(labels_, timestamps_);
+}
+
+/// Returns an iterator one past the last event.
+EventsTimelineIterator EventsTimelineIterator::end() {
+  return EventsTimelineIterator(labels_, timestamps_, labels_->size());
+}
+
+} // namespace impala
\ No newline at end of file
diff --git a/be/src/service/query-state-record.h b/be/src/service/query-state-record.h
new file mode 100644
index 000000000..957c7dff5
--- /dev/null
+++ b/be/src/service/query-state-record.h
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gen-cpp/ExecStats_types.h"
+#include "gen-cpp/Types_types.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+class ClientRequestState;
+
+/// Snapshot of a query's state, archived in the query log. Not mutated after
+/// construction. Please update EstimateSize() if the field members change.
+struct QueryStateRecord {
+  /// Compressed representation of profile returned by RuntimeProfile::Compress().
+  /// Must be initialised to a valid value if this is a completed query.
+  /// Empty if this was initialised from a running query.
+  const std::vector<uint8_t> compressed_profile;
+
+  /// Query id
+  TUniqueId id;
+
+  /// Queries are run and authorized on behalf of the effective_user.
+  /// If there is no delegated user, this will be the connected user. Otherwise, it
+  /// will be set to the delegated user.
+  std::string effective_user;
+
+  /// If true, effective_user has access to the runtime profile and execution
+  /// summary.
+  bool user_has_profile_access;
+
+  /// default db for this query
+  std::string default_db;
+
+  /// SQL statement text
+  std::string stmt;
+
+  /// Text representation of plan
+  std::string plan;
+
+  /// DDL, DML etc.
+  TStmtType::type stmt_type;
+
+  /// True if the query required a coordinator fragment
+  bool has_coord;
+
+  /// The number of scan ranges that have completed.
+  int64_t num_completed_scan_ranges;
+
+  /// The total number of scan ranges.
+  int64_t total_scan_ranges;
+
+  /// The number of fragment instances that have completed.
+  int64_t num_completed_fragment_instances;
+
+  /// The total number of fragment instances.
+  int64_t total_fragment_instances;
+
+  /// The number of rows fetched by the client
+  int64_t num_rows_fetched;
+
+  /// The state of the query as of this snapshot. The possible values for the
+  /// query_state = union(beeswax::QueryState, ClientRequestState::RetryState). This is
+  /// necessary so that the query_state can accurately reflect if a query has been
+  /// retried or not. This string is not displayed in the runtime profiles, it is only
+  /// displayed on the /queries endpoint of the Web UI when listing out the state of
+  /// each query. This is necessary so that users can clearly see if a query has been
+  /// retried or not.
+  std::string query_state;
+
+  /// The beeswax::QueryState of the query as of this snapshot.
+  beeswax::QueryState::type beeswax_query_state;
+
+  /// Start and end time of the query, in Unix microseconds.
+  /// A query whose end_time_us is 0 indicates that it is an in-flight query.
+  /// These two variables are initialized with the corresponding values from
+  /// ClientRequestState.
+  int64_t start_time_us, end_time_us;
+
+  /// Time, in milliseconds, that the request waited while queued.
+  int64_t wait_time_ms;
+
+  /// Total peak memory usage by this query at all backends.
+  int64_t total_peak_mem_usage;
+
+  /// The cluster wide estimated memory usage of this query.
+  int64_t cluster_mem_est;
+
+  /// Total bytes read by this query at all backends.
+  int64_t bytes_read;
+
+  /// The total number of bytes sent (across the network) by this query in exchange
+  /// nodes. Does not include remote reads, data written to disk, or data sent to the
+  /// client.
+  int64_t bytes_sent;
+
+  /// Query timeline from summary profile.
+  std::string timeline;
+
+  /// Summary of execution for this query.
+  TExecSummary exec_summary;
+
+  /// Status of the query as of this snapshot.
+  Status query_status;
+
+  /// Timeline of important query events
+  TEventSequence event_sequence;
+
+  /// Save the query plan fragments so that the plan tree can be rendered on the debug
+  /// webpages. Qualified with std:: so this header does not depend on a
+  /// using-declaration made by whoever includes it.
+  std::vector<TPlanFragment> fragments;
+
+  /// If true, this query has no more rows to return
+  bool all_rows_returned;
+
+  /// The most recent time this query was actively being processed, in Unix
+  /// milliseconds.
+  int64_t last_active_time_ms;
+
+  /// Resource pool to which the request was submitted for admission, or an empty
+  /// string if this request doesn't go through admission control.
+  std::string resource_pool;
+
+  /// True if this query was retried, false otherwise.
+  bool was_retried = false;
+
+  /// If this query was retried, the query id of the retried query.
+  std::unique_ptr<const TUniqueId> retried_query_id;
+
+  /// Initialise from 'exec_state' of a completed query. 'compressed_profile' must be
+  /// a runtime profile compressed with RuntimeProfile::Compress().
+  QueryStateRecord(
+      const ClientRequestState& exec_state, std::vector<uint8_t>&& compressed_profile);
+
+  /// Initialize from 'exec_state' of a running query
+  QueryStateRecord(const ClientRequestState& exec_state);
+
+  /// Default constructor used only when participating in collections
+  QueryStateRecord() { }
+
+  struct StartTimeComparator {
+    /// Comparator that sorts by start time.
+    bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) const;
+  };
+
+ private:
+  // Common initialization for constructors.
+  void Init(const ClientRequestState& exec_state);
+}; // struct QueryStateRecord
+
+/// Return the estimated size of given record in bytes.
+/// It is not meant to return the exact byte size of the given QueryStateRecord in
+/// memory, but should account for the compressed_profile vector of the record.
+int64_t EstimateSize(const QueryStateRecord* record);
+
+/// Stores relevant information about each backend executor. Used by the
+/// QueryStateExpanded struct.
+struct PerHostState {
+  /// Fragment Instances Count
+  int32_t fragment_instance_count = 0;
+
+  /// Peak Memory Usage. Zero-initialized so that an entry for which only the fragment
+  /// instance count was set never exposes an indeterminate value.
+  int64_t peak_memory_usage = 0;
+}; // struct PerHostState
+
+/// Comparator function that compares two PerHostState structs based on the
+/// peak_memory_usage member of the struct.
+bool PerHostPeakMemoryComparator(const std::pair<TNetworkAddress, PerHostState>& a,
+    const std::pair<TNetworkAddress, PerHostState>& b);
+
+/// The query events are stored in two separate vectors, one for the labels and the other
+/// for the values. This iterator unifies the two into a single iterator that yields a
+/// std::pair<string, int64_t> with the first being the event name and the second being
+/// the event timestamp. This iterator supports one-time forward pass and range based
+/// for loops.
+class EventsTimelineIterator {
+public:
+  using iter_t = const std::pair<const std::string, const std::int64_t>;
+  using iterator_category = std::input_iterator_tag;
+  using value_type = iter_t;
+  /// Iterator traits require a signed integer type here.
+  using difference_type = std::ptrdiff_t;
+  using pointer = iter_t*;
+  using reference = iter_t&;
+
+  /// Constructor that starts the iterator at index 0 of the vectors.
+  EventsTimelineIterator(const std::vector<std::string>* labels,
+      const std::vector<std::int64_t>* timestamps);
+
+  /// Constructor that starts the iterator at the specified index of the vectors.
+  EventsTimelineIterator(const std::vector<std::string>* labels,
+      const std::vector<std::int64_t>* timestamps, size_t cur);
+
+  /// Yields the (label, timestamp) pair at the current position of the iterator.
+  iter_t operator*() const;
+
+  /// Pre-increment: moves the iterator to the next position.
+  EventsTimelineIterator& operator++();
+
+  /// Post-increment: moves the iterator to the next position.
+  EventsTimelineIterator operator++(int);
+
+  /// Compare two iterators.
+  bool operator==(const EventsTimelineIterator& other) const;
+  bool operator!=(const EventsTimelineIterator& other) const;
+
+  /// Functions to support range based for loops.
+  EventsTimelineIterator begin();
+  EventsTimelineIterator end();
+
+private:
+  const std::vector<std::string>* labels_;
+  const std::vector<int64_t>* timestamps_;
+  size_t cur_;
+}; // class EventsTimelineIterator
+
+/// Expanded snapshot of the query including its state along with other fields relevant
+/// to workload management. Not mutated after construction.
+struct QueryStateExpanded {
+
+  /// Base Query State. Either the record handed to the constructor or, when that was
+  /// null, a record the constructor created from the ClientRequestState.
+  const std::shared_ptr<QueryStateRecord> base_state;
+
+  /// User set query options.
+  TQueryOptions query_options;
+
+  /// Impala assigned session id for the client session.
+  TUniqueId session_id;
+
+  /// Type of the session the client opened.
+  TSessionType::type session_type;
+
+  /// Version of the Hiveserver2 protocol used by the client (if connected using HS2).
+  /// The value of this field is undefined unless session_type is
+  /// TSessionType::HIVESERVER2. It is the responsibility of the consumer to first verify
+  /// the session_type before referencing this value stored in this struct member.
+  apache::hive::service::cli::thrift::TProtocolVersion::type hiveserver2_protocol_version;
+
+  /// Name of the user that connected to Impala.
+  std::string db_user_connection;
+
+  /// Impala Query End State
+  std::string impala_query_end_state;
+
+  /// Address of the client that ran this query.
+  TNetworkAddress client_address;
+
+  /// Per-Host Memory Estimate in Bytes
+  /// Calculated before considering the MAX_MEM_ESTIMATE_FOR_ADMISSION query option.
+  int64_t per_host_mem_estimate = 0;
+
+  /// Dedicated Coordinator Memory Estimate in Bytes
+  int64_t dedicated_coord_mem_estimate = 0;
+
+  /// Per-Host State, keyed by backend address.
+  std::map<TNetworkAddress, PerHostState, TNetworkAddressComparator> per_host_state;
+
+  /// Admission Result
+  std::string admission_result;
+
+  /// Executor Group that Executed the Query
+  std::string executor_group;
+
+  /// Redacted SQL
+  std::string redacted_sql;
+
+  /// Exec Summary Pretty Printed
+  std::string exec_summary;
+
+  /// Row Materialization Rate (bytes per second)
+  int64_t row_materialization_rate = 0;
+
+  /// Row Materialization Time (microseconds)
+  int64_t row_materialization_time = 0;
+
+  /// Compressed Bytes Spilled
+  int64_t compressed_bytes_spilled = 0;
+
+  /// Read IO Wait Time Total (microseconds)
+  int64_t read_io_wait_time_total = 0;
+
+  /// Read IO Wait Time Mean (microseconds)
+  int64_t read_io_wait_time_mean = 0;
+
+  /// Total Bytes Read from the Data Cache
+  int64_t bytes_read_cache_total = 0;
+
+  /// Total Bytes Read
+  int64_t bytes_read_total = 0;
+
+  /// Executor Groups
+  std::string executor_groups;
+
+  /// Events Timeline Empty. Returns true when the base state's event sequence has no
+  /// labels or no timestamps.
+  bool events_timeline_empty() const;
+
+  /// Events Timeline Iterator over the base state's event labels and timestamps.
+  EventsTimelineIterator EventsTimeline() const;
+
+  /// Required data will be copied from the provided ClientRequestState into members of
+  /// the struct. If 'base_state_src' is non-null it is shared as the base state,
+  /// otherwise a new QueryStateRecord is created from 'exec_state'.
+  QueryStateExpanded(const ClientRequestState& exec_state,
+      const std::shared_ptr<QueryStateRecord> base_state_src);
+}; // struct QueryStateExpanded
+
+} // namespace impala
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index b2ee1beb2..718286b03 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -92,7 +92,8 @@ TEST(CountersTest, Basic) {
   RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
   counter_merged = from_thrift->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
-  EXPECT_TRUE(from_thrift->GetCounter("Not there") == NULL);
+  EXPECT_TRUE(from_thrift->GetCounter("Not there") == NULL);
+  EXPECT_TRUE(from_thrift->GetCounter("Not there") == nullptr);
   TExecSummary exec_summary_result;
   from_thrift->GetExecSummary(&exec_summary_result);
   EXPECT_EQ(exec_summary_result.status, status);
@@ -107,6 +108,7 @@ TEST(CountersTest, Basic) {
       RuntimeProfile::CreateFromThrift(&pool, deserialized_thrift_profile);
   counter_merged = deserialized_profile->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
+  EXPECT_TRUE(deserialized_profile->GetCounter("Not there") == NULL);
   EXPECT_TRUE(deserialized_profile->GetCounter("Not there") == nullptr);
   deserialized_profile->GetExecSummary(&exec_summary_result);
   EXPECT_EQ(exec_summary_result.status, status);
@@ -119,6 +121,7 @@ TEST(CountersTest, Basic) {
       RuntimeProfile::DecompressToProfile(compressed, &pool, &deserialized_profile2));
   counter_merged = deserialized_profile2->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
+  EXPECT_TRUE(deserialized_profile2->GetCounter("Not there") == NULL);
   EXPECT_TRUE(deserialized_profile2->GetCounter("Not there") == nullptr);
   deserialized_profile2->GetExecSummary(&exec_summary_result);
   EXPECT_EQ(exec_summary_result.status, status);
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 96f57fd65..9a531f01e 100644
--- 
a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -992,7 +992,7 @@ RuntimeProfile* RuntimeProfile::CreateChild( return child; } -void RuntimeProfileBase::GetChildren(vector<RuntimeProfileBase*>* children) { +void RuntimeProfileBase::GetChildren(vector<RuntimeProfileBase*>* children) const { children->clear(); lock_guard<SpinLock> l(children_lock_); for (const auto& entry : children_) children->push_back(entry.first); @@ -1216,10 +1216,10 @@ void RuntimeProfile::AddLocalTimeCounter(const SampleFunction& counter_fn) { counter_map_[LOCAL_TIME_COUNTER_NAME] = local_time_counter; } -RuntimeProfileBase::Counter* RuntimeProfileBase::GetCounter(const string& name) { +RuntimeProfileBase::Counter* RuntimeProfileBase::GetCounter(const string& name) const { lock_guard<SpinLock> l(counter_map_lock_); - if (counter_map_.find(name) != counter_map_.end()) { - return counter_map_[name]; + if (auto iter = counter_map_.find(name); iter != counter_map_.cend()) { + return iter->second; } return NULL; } diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index 419b8d916..8e2c461b0 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -165,7 +165,7 @@ class RuntimeProfileBase { /// Gets the counter object with 'name'. Returns NULL if there is no counter with /// that name. - Counter* GetCounter(const std::string& name); + Counter* GetCounter(const std::string& name) const; /// Adds all counters with 'name' that are registered either in this or /// in any of the child profiles to 'counters'. @@ -183,7 +183,7 @@ class RuntimeProfileBase { void PrettyPrint( Verbosity verbosity, std::ostream* s, const std::string& prefix = "") const; - void GetChildren(std::vector<RuntimeProfileBase*>* children); + void GetChildren(std::vector<RuntimeProfileBase*>* children) const; /// Gets all profiles in tree, including this one. 
void GetAllChildren(std::vector<RuntimeProfileBase*>* children); diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 16d348e6e..ae1787fa4 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -989,5 +989,9 @@ struct TQueryExecRequest { // executor group to run the query. Non-positive value means no specific CPU core count // is required. 14: optional i32 cores_required; + + // Estimated per-host memory. The planner generates this value which may or may not be + // overridden to come up with a final per-host memory estimate. + 15: optional i64 planner_per_host_mem_estimate; } diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index a43389f88..209790868 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -605,6 +605,8 @@ public class Planner { maxPerHostPeakResources = MIN_PER_HOST_RESOURCES.max(maxPerHostPeakResources); request.setPer_host_mem_estimate(maxPerHostPeakResources.getMemEstimateBytes()); + request.setPlanner_per_host_mem_estimate( + maxPerHostPeakResources.getMemEstimateBytes()); request.setIs_trivial_query(trivial); request.setMax_per_host_min_mem_reservation( maxPerHostPeakResources.getMinMemReservationBytes());
