This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 63f7c0949bcf791569adcb44265a64eb520b17a1 Author: zhangyifan27 <[email protected]> AuthorDate: Thu May 25 14:52:18 2023 +0800 IMPALA-12048: Add query progress to queries webpage This patch adds the query progress of queries to Impala /queries webpage. The query progress shows the completion progress of a query's fragment instances. It helps users track the completion of computation-intensive queries. Testing: - Added test cases to test_observability.py - Added a new functional test to test_web_pages.py Change-Id: Ic0e8695a8a8395c1364b4b249f83c4345d2cc53e Reviewed-on: http://gerrit.cloudera.org:8080/19706 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Quanlong Huang <[email protected]> --- be/src/runtime/coordinator-backend-state.cc | 5 +++-- be/src/runtime/coordinator-backend-state.h | 9 ++++---- be/src/runtime/coordinator.cc | 13 ++++++++++-- be/src/runtime/coordinator.h | 10 +++++++-- be/src/service/impala-hs2-server.cc | 2 +- be/src/service/impala-http-handler.cc | 33 ++++++++++++++++++++--------- be/src/service/impala-http-handler.h | 3 +++ be/src/service/impala-server.cc | 20 +++++++++++------ be/src/service/impala-server.h | 14 ++++++++---- common/thrift/ExecStats.thrift | 2 ++ tests/query_test/test_observability.py | 5 +++++ tests/webserver/test_web_pages.py | 9 ++++++++ www/queries.tmpl | 24 +++++++++++++++------ www/query_detail_tabs.tmpl | 20 ++++++++++------- 14 files changed, 124 insertions(+), 45 deletions(-) diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 9ee2d6e24..ece2899e9 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -407,8 +407,8 @@ inline bool Coordinator::BackendState::IsDoneLocked( bool Coordinator::BackendState::ApplyExecStatusReport( const ReportExecStatusRequestPB& backend_exec_status, const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary, - ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state, - vector<AuxErrorInfoPB>* aux_error_info, + ProgressUpdater* scan_range_progress, ProgressUpdater* query_progress, + DmlExecState* dml_exec_state, vector<AuxErrorInfoPB>* aux_error_info, const vector<FragmentStats*>& fragment_stats) { DCHECK(!IsEmptyBackend()); CHECK(FLAGS_gen_experimental_profile || @@ -501,6 +501,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport( DCHECK(!instance_stats->done_); instance_stats->done_ = true; --num_remaining_instances_; + query_progress->Update(1); } if (!FLAGS_gen_experimental_profile) ++profile_iter; } diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h index 2d4053c87..80ef0e571 100644 --- a/be/src/runtime/coordinator-backend-state.h +++ b/be/src/runtime/coordinator-backend-state.h @@ -135,8 +135,9 @@ class Coordinator::BackendState { /// Update overall execution status, including the instances' exec status/profiles /// and the error log, if this backend is not already done. Updates the fragment - /// instances' TExecStats in exec_summary (exec_summary->nodes.exec_stats) and updates - /// scan_range_progress with any newly-completed scan ranges. + /// instances' TExecStats in exec_summary (exec_summary->nodes.exec_stats), updates + /// scan_range_progress with any newly-completed scan ranges and updates query_progress + /// with any newly-completed fragment instances. /// /// If any instance reports an error, the overall execution status becomes the first /// reported error status. Returns true iff this update changed IsDone() from false @@ -145,8 +146,8 @@ class Coordinator::BackendState { /// FragmentInstanceExecStatusPB in backend_exec_status to the vector aux_error_info. bool ApplyExecStatusReport(const ReportExecStatusRequestPB& backend_exec_status, const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary, - ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state, - std::vector<AuxErrorInfoPB>* aux_error_info, + ProgressUpdater* scan_range_progress, ProgressUpdater* query_progress, + DmlExecState* dml_exec_state, std::vector<AuxErrorInfoPB>* aux_error_info, const std::vector<FragmentStats*>& fragment_stats); /// Merges the incoming 'thrift_profile' into this backend state's host profile. diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 8bfdf0c3a..e5f82edfe 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -164,7 +164,7 @@ Status Coordinator::Exec() { // initialize progress updater const string& str = Substitute("Query $0", PrintId(query_id())); - progress_.Init(str, exec_params_.query_schedule().num_scan_ranges()); + scan_progress_.Init(str, exec_params_.query_schedule().num_scan_ranges()); query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState( query_ctx(), exec_params_.query_schedule().coord_backend_mem_limit()); @@ -177,6 +177,14 @@ Status Coordinator::Exec() { InitBackendStates(); exec_summary_.Init(exec_params_); + int64_t total_finstances = 0; + for (BackendState* backend_state : backend_states_) { + total_finstances += backend_state->exec_params().instance_params().size(); + } + const string& query_progress_str = + Substitute("Query $0 progress", PrintId(query_id())); + query_progress_.Init(query_progress_str, total_finstances); + if (filter_mode_ != TRuntimeFilterMode::OFF) { // Populate the runtime filter routing table. This should happen before starting the // fragment instances. This code anticipates the indices of the instance states @@ -1050,7 +1058,8 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req vector<AuxErrorInfoPB> aux_error_info; if (backend_state->ApplyExecStatusReport(request, thrift_profiles, &exec_summary_, - &progress_, &dml_exec_state_, &aux_error_info, fragment_stats_)) { + &scan_progress_, &query_progress_, &dml_exec_state_, &aux_error_info, + fragment_stats_)) { // This backend execution has completed. if (VLOG_QUERY_IS_ON) { // Don't log backend completion if the query has already been cancelled. diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index aaeaa89fb..411113dd8 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -184,7 +184,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// individual fragment instances are merged into a single output to retain readability. std::string GetErrorLog(); - const ProgressUpdater& progress() const { return progress_; } + const ProgressUpdater& scan_progress() const { return scan_progress_; } + + const ProgressUpdater& query_progress() const { return query_progress_; } /// Get a copy of the current exec summary. Thread-safe. void GetTExecSummary(TExecSummary* exec_summary); @@ -335,7 +337,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Keeps track of number of completed ranges and total scan ranges. Initialized by /// Exec(). - ProgressUpdater progress_; + ProgressUpdater scan_progress_; + + /// Keeps track of number of completed fragment instances and total fragment instances. + /// Initialized by Exec(). + ProgressUpdater query_progress_; /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec(). RuntimeProfile* query_profile_ = nullptr; diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 2eea07e1d..3a3a36805 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -1004,7 +1004,7 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) { Coordinator* coord = query_handle->GetCoordinator(); if (coord != nullptr) { // Report progress - ss << coord->progress().ToString() << "\n"; + ss << coord->scan_progress().ToString() << "\n"; } // Report the query status, if the query failed or has been retried. if (query_handle->IsRetriedQuery()) { diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index d18087728..cda45b343 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -466,6 +466,10 @@ void ImpalaHttpHandler::AddQueryRecordTips(Document* document) { "completed number of scan ranges / the total number of scan ranges.", document->GetAllocator()); + document->AddMember("tips_query_progress", "The progress of the query, i.e. the number " + "of completed fragment instances / the total number of fragment instances.", + document->GetAllocator()); + document->AddMember("tips_bytes_read", "The total number of bytes read from the data " "source during the query execution.", document->GetAllocator()); @@ -489,6 +493,17 @@ void ImpalaHttpHandler::AddQueryRecordTips(Document* document) { document->GetAllocator()); } +std::string ImpalaHttpHandler::ProgressToString(int64_t num_completed, int64_t total) { + stringstream ss; + ss << num_completed << " / " << total << " (" << setw(4); + if (num_completed == 0 || total == 0) { + ss << "0%)"; + } else { + ss << (100.0 * num_completed / (1.f * total)) << "%)"; + } + return ss.str(); +} + void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& record, Value* value, Document* document, bool inflight) { Value user(record.effective_user.c_str(), document->GetAllocator()); @@ -543,21 +558,19 @@ void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& r value->AddMember("mem_est", mem_est, document->GetAllocator()); string progress = "N/A"; + string query_progress = "N/A"; if (record.has_coord) { - stringstream ss; - ss << record.num_complete_fragments << " / " << record.total_fragments - << " (" << setw(4); - if (record.total_fragments == 0) { - ss << "0%)"; - } else { - ss << (100.0 * record.num_complete_fragments / (1.f * record.total_fragments)) - << "%)"; - } - progress = ss.str(); + progress = + ProgressToString(record.num_completed_scan_ranges, record.total_scan_ranges); + query_progress = ProgressToString(record.num_completed_fragment_instances, + record.total_fragment_instances); } Value progress_json(progress.c_str(), document->GetAllocator()); value->AddMember("progress", progress_json, document->GetAllocator()); + Value query_progress_json(query_progress.c_str(), document->GetAllocator()); + value->AddMember("query_progress", query_progress_json, document->GetAllocator()); + const string& printed_bytes_read = PrettyPrinter::Print(record.bytes_read, TUnit::BYTES); Value bytes_read(printed_bytes_read.c_str(), document->GetAllocator()); diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h index 118102fa7..3e4a3f03b 100644 --- a/be/src/service/impala-http-handler.h +++ b/be/src/service/impala-http-handler.h @@ -202,6 +202,9 @@ class ImpalaHttpHandler { void QueryMemoryHandler(const Webserver::WebRequest& req, rapidjson::Document* output); + /// Utility method to print progress something as n/m(xx%). + std::string ProgressToString(int64_t num_completed, int64_t total); + /// Helper method to render a single QueryStateRecord as a Json object Used by /// QueryStateHandler(). void QueryStateToJson(const ImpalaServer::QueryStateRecord& record, diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index e6105a80c..c1ab28531 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -917,9 +917,13 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use query_handle->GetCoordinator()->GetTExecSummary(result); TExecProgress progress; progress.__set_num_completed_scan_ranges( - query_handle->GetCoordinator()->progress().num_complete()); + query_handle->GetCoordinator()->scan_progress().num_complete()); progress.__set_total_scan_ranges( - query_handle->GetCoordinator()->progress().total()); + query_handle->GetCoordinator()->scan_progress().total()); + progress.__set_num_completed_fragment_instances( + query_handle->GetCoordinator()->query_progress().num_complete()); + progress.__set_total_fragment_instances( + query_handle->GetCoordinator()->query_progress().total()); // TODO: does this not need to be synchronized? result->__set_progress(progress); } else { @@ -2397,8 +2401,10 @@ void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& query_handle Coordinator* coord = query_handle.GetCoordinator(); if (coord != nullptr) { - num_complete_fragments = coord->progress().num_complete(); - total_fragments = coord->progress().total(); + num_completed_scan_ranges = coord->scan_progress().num_complete(); + total_scan_ranges = coord->scan_progress().total(); + num_completed_fragment_instances = coord->query_progress().num_complete(); + total_fragment_instances = coord->query_progress().total(); auto utilization = coord->ComputeQueryResourceUtilization(); total_peak_mem_usage = utilization.total_peak_mem_usage; cluster_mem_est = query_handle.schedule()->cluster_mem_est(); @@ -2406,8 +2412,10 @@ void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& query_handle bytes_sent = utilization.exchange_bytes_sent + utilization.scan_bytes_sent; has_coord = true; } else { - num_complete_fragments = 0; - total_fragments = 0; + num_completed_scan_ranges = 0; + total_scan_ranges = 0; + num_completed_fragment_instances = 0; + total_fragment_instances = 0; total_peak_mem_usage = 0; cluster_mem_est = 0; bytes_read = 0; diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 7afbc430f..6b40630d4 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -929,11 +929,17 @@ class ImpalaServer : public ImpalaServiceIf, /// True if the query required a coordinator fragment bool has_coord; - /// The number of fragments that have completed - int64_t num_complete_fragments; + /// The number of scan ranges that have completed. + int64_t num_completed_scan_ranges; - /// The total number of fragments - int64_t total_fragments; + /// The total number of scan ranges. + int64_t total_scan_ranges; + + /// The number of fragment instances that have completed. + int64_t num_completed_fragment_instances; + + /// The total number of fragment instances. + int64_t total_fragment_instances; /// The number of rows fetched by the client int64_t num_rows_fetched; diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift index 303d28cfe..f629eaf0a 100644 --- a/common/thrift/ExecStats.thrift +++ b/common/thrift/ExecStats.thrift @@ -84,6 +84,8 @@ struct TPlanNodeExecSummary { struct TExecProgress { 1: optional i64 total_scan_ranges 2: optional i64 num_completed_scan_ranges + 3: optional i64 total_fragment_instances; + 4: optional i64 num_completed_fragment_instances; } // Execution summary of an entire query. diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index 7d29cd428..175b56568 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -208,6 +208,11 @@ class TestObservability(ImpalaTestSuite): # After fetching the results and reaching finished state, we should still be able to # fetch an exec_summary. assert exec_summary is not None and exec_summary.nodes is not None + # Verify the query is complete. + assert exec_summary.progress.num_completed_scan_ranges == \ + exec_summary.progress.total_scan_ranges + assert exec_summary.progress.num_completed_fragment_instances == \ + exec_summary.progress.total_fragment_instances def test_exec_summary_in_runtime_profile(self): """Test that the exec summary is populated in runtime profile correctly in every diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index 22d3e2ba3..7cfeeab98 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -915,3 +915,12 @@ class TestWebPage(ImpalaTestSuite): assert page.status_code == requests.codes.ok page = requests.head("http://localhost:25020/operations") assert page.status_code == requests.codes.ok + + def test_query_progress(self): + """Tests that /queries page shows query progress.""" + query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)" + response_json = self.__run_query_and_get_debug_page( + query, self.QUERIES_URL, expected_state=self.client.QUERY_STATES["RUNNING"]) + for json_part in response_json['in_flight_queries']: + if query in json_part['stmt']: + assert json_part["query_progress"] == "0 / 4 ( 0%)" diff --git a/www/queries.tmpl b/www/queries.tmpl index 59513c4c2..0eba10ebb 100644 --- a/www/queries.tmpl +++ b/www/queries.tmpl @@ -44,7 +44,11 @@ command line parameter.</p> <hr style="margin-top:0px;margin-bottom:0px;"/> <span title="{{tips_mem_estimate}}">Mem Estimate</span> </th> - <th title="{{tips_scan_progress}}">Scan Progress</th> + <th> + <span title="{{tips_scan_progress}}">Scan Progress</span> + <hr style="margin-top:0px;margin-bottom:0px;"/> + <span title="{{tips_query_progress}}">Query Progress</span> + </th> <th> <span title="{{tips_bytes_read}}">Bytes Read</span> <hr style="margin-top:0px;margin-bottom:0px;"/> @@ -68,7 +72,7 @@ command line parameter.</p> <td>{{start_time}}</td> <td>{{duration}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{queued_duration}}</td> <td>{{mem_usage}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{mem_est}}</td> - <td>{{progress}}</td> + <td>{{progress}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{query_progress}}</td> <td>{{bytes_read}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{bytes_sent}}</td> <td><samp>{{state}}</samp></td> <td><samp>{{last_event}}</samp></td> @@ -104,7 +108,11 @@ command line parameter.</p> <hr style="margin-top:0px;margin-bottom:0px;"/> <span title="{{tips_mem_estimate}}">Mem Estimate</span> </th> - <th title="{{tips_scan_progress}}">Scan Progress</th> + <th> + <span title="{{tips_scan_progress}}">Scan Progress</span> + <hr style="margin-top:0px;margin-bottom:0px;"/> + <span title="{{tips_query_progress}}">Query Progress</span> + </th> <th> <span title="{{tips_bytes_read}}">Bytes Read</span> <hr style="margin-top:0px;margin-bottom:0px;"/> @@ -129,7 +137,7 @@ command line parameter.</p> <td>{{waiting_time}}</td> <td>{{duration}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{queued_duration}}</td> <td>{{mem_usage}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{mem_est}}</td> - <td>{{progress}}</td> + <td>{{progress}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{query_progress}}</td> <td>{{bytes_read}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{bytes_sent}}</td> <td><samp>{{state}}</samp></td> <td><samp>{{last_event}}</samp></td> @@ -161,7 +169,11 @@ command line parameter.</p> <hr style="margin-top:0px;margin-bottom:0px;"/> <span title="{{tips_mem_estimate}}">Mem Estimate</span> </th> - <th title="{{tips_scan_progress}}">Scan Progress</th> + <th> + <span title="{{tips_scan_progress}}">Scan Progress</span> + <hr style="margin-top:0px;margin-bottom:0px;"/> + <span title="{{tips_query_progress}}">Query Progress</span> + </th> <th> <span title="{{tips_bytes_read}}">Bytes Read</span> <hr style="margin-top:0px;margin-bottom:0px;"/> @@ -182,7 +194,7 @@ command line parameter.</p> <td>{{end_time}}</td> <td>{{duration}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{queued_duration}}</td> <td>{{mem_usage}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{mem_est}}</td> - <td>{{progress}}</td> + <td>{{progress}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{query_progress}}</td> <td>{{bytes_read}}<hr style="margin-top:0px;margin-bottom:0px;"/>{{bytes_sent}}</td> <td><samp>{{state}}</samp></td> <td>{{rows_fetched}}</td> diff --git a/www/query_detail_tabs.tmpl b/www/query_detail_tabs.tmpl index 61d9926d9..70e8d1cbe 100644 --- a/www/query_detail_tabs.tmpl +++ b/www/query_detail_tabs.tmpl @@ -47,6 +47,7 @@ under the License. <th title="{{tips_mem_usage}}">Mem Usage</th> <th title="{{tips_mem_estimate}}">Mem Estimate</th> <th title="{{tips_scan_progress}}">Scan Progress</th> + <th title="{{tips_query_progress}}">Query Progress</th> <th title="{{tips_bytes_read}}">Bytes Read</th> <th title="{{tips_bytes_sent}}">Bytes Sent</th> {{#inflight}} @@ -77,6 +78,7 @@ under the License. <td>{{mem_usage}}</td> <td>{{mem_est}}</td> <td>{{progress}}</td> + <td>{{query_progress}}</td> <td>{{bytes_read}}</td> <td>{{bytes_sent}}</td> {{#inflight}} @@ -120,10 +122,11 @@ if (typeof index == "undefined") { mem_usage : 6, mem_est : 7, progress : 8, - bytes_read : 9, - bytes_sent : 10, - last_event : 11, - rows_fetched : 12 + query_progress : 9, + bytes_read : 10, + bytes_sent : 11, + last_event : 12, + rows_fetched : 13 }; // For waiting query @@ -134,10 +137,11 @@ if (typeof index == "undefined") { mem_usage : 7, mem_est : 8, progress : 9, - bytes_read : 10, - bytes_sent : 11, - last_event : 12, - rows_fetched : 13 + query_progress : 10, + bytes_read : 11, + bytes_sent : 12, + last_event : 13, + rows_fetched : 14 }; }
