This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ade98362c8efd07c4e526b33dec4f6809ea92fb9 Author: zhangyifan27 <[email protected]> AuthorDate: Thu Feb 22 16:34:24 2024 +0800 IMPALA-12834: Add number of concurrent queries to profile This patch adds profile info string for the number of currently running queries of the executor group on which the query is scheduled, to diagnose potential performance issues due to resource limits. Testing: - Add an e2e test to verify the information appears in profile Change-Id: I8389215b60022b39e7d171d6fc2418acca7c0658 Reviewed-on: http://gerrit.cloudera.org:8080/21063 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/admission-controller.cc | 12 +++++++++++ be/src/scheduling/admission-controller.h | 5 +++++ tests/custom_cluster/test_admission_controller.py | 26 +++++++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 6d53d1f03..7f448e115 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -185,6 +185,8 @@ const string AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON = const string AdmissionController::PROFILE_INFO_KEY_ADMITTED_MEM = "Cluster Memory Admitted"; const string AdmissionController::PROFILE_INFO_KEY_EXECUTOR_GROUP = "Executor Group"; +const string AdmissionController::PROFILE_INFO_KEY_EXECUTOR_GROUP_QUERY_LOAD = + "Number of running queries in designated executor group when admitted"; const string AdmissionController::PROFILE_INFO_KEY_STALENESS_WARNING = "Admission control state staleness"; const string AdmissionController::PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME = @@ -2268,6 +2270,8 @@ void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued, bool is_t PROFILE_INFO_KEY_ADMITTED_MEM, PrintBytes(state->GetClusterMemoryToAdmit())); state->summary_profile()->AddInfoString( PROFILE_INFO_KEY_EXECUTOR_GROUP, 
state->executor_group()); + state->summary_profile()->AddInfoString(PROFILE_INFO_KEY_EXECUTOR_GROUP_QUERY_LOAD, + std::to_string(GetExecGroupQueryLoad(state->executor_group()))); // We may have admitted based on stale information. Include a warning in the profile // if this this may be the case. int64_t time_since_update_ms; @@ -2721,4 +2725,12 @@ void AdmissionController::UpdateExecGroupMetric( } } +int64_t AdmissionController::GetExecGroupQueryLoad(const string& grp_name) { + auto entry = exec_group_query_load_map_.find(grp_name); + if (entry != exec_group_query_load_map_.end()) { + return entry->second->GetValue(); + } + return 0; +} + } // namespace impala diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 64f0e139a..6c6841107 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -339,6 +339,7 @@ class AdmissionController { static const std::string PROFILE_INFO_KEY_LAST_QUEUED_REASON; static const std::string PROFILE_INFO_KEY_ADMITTED_MEM; static const std::string PROFILE_INFO_KEY_EXECUTOR_GROUP; + static const std::string PROFILE_INFO_KEY_EXECUTOR_GROUP_QUERY_LOAD; static const std::string PROFILE_INFO_KEY_STALENESS_WARNING; static const std::string PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME; @@ -1167,6 +1168,10 @@ class AdmissionController { /// admitted or released. void UpdateExecGroupMetric(const string& grp_name, int64_t delta); + /// Returns the query load for the given executor group. + /// If no query load information is available, returns 0. + int64_t GetExecGroupQueryLoad(const string& grp_name); + /// A helper type to glue information together to compute the topN queries out of <n> /// topM queries through a priority queue. Each object of the type represents a query. 
/// diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 174f263f3..4743bad65 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -905,6 +905,32 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): finally: client.close() + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1, + pool_max_mem=1024 * 1024 * 1024), statestored_args=_STATESTORED_ARGS) + def test_concurrent_queries(self): + """Test that the number of running queries appears in the profile when the query is + successfully admitted.""" + # A trivial coordinator only query is scheduled on the empty group which does not + # exist in the cluster. + result = self.execute_query_expect_success(self.client, "select 1") + assert "Executor Group: empty group (using coordinator only)" \ + in result.runtime_profile + assert "Number of running queries in designated executor group when admitted: 0" \ + in result.runtime_profile + # Two queries run concurrently in the default pool. + sleep_query = "select * from functional.alltypesagg where id < sleep(1000)" + query = "select * from functional.alltypesagg" + sleep_query_handle = self.client.execute_async(sleep_query) + self.client.wait_for_admission_control(sleep_query_handle) + self._wait_for_change_to_profile(sleep_query_handle, + "Admission result: Admitted immediately") + result = self.execute_query_expect_success(self.client, query) + assert "Executor Group: default" in result.runtime_profile + assert "Number of running queries in designated executor group when admitted: 2" \ + in result.runtime_profile + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
