This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c9aed342f54494d2f0b5e5aada472d6af6697adc Author: Riza Suminto <[email protected]> AuthorDate: Wed Jul 12 15:20:56 2023 -0700 IMPALA-12281: Disallow unsetting REQUEST_POOL if it is set by client IMPALA-12056 enabled child queries to unset REQUEST_POOL if it was set by Frontend.java as part of executor group selection. However, the implementation failed to call setRequest_pool_set_by_frontend(false) when REQUEST_POOL is explicitly set by the client request through impala-shell configuration. This causes child queries to always unset REQUEST_POOL if the parent query was executed via impala-shell. This patch fixes the issue by checking the query options that come from the client. This patch also tidies up null and empty REQUEST_POOL checking by using StringUtils.isNotEmpty(). Testing: - Add a testcase in test_query_cpu_count_divisor_default Change-Id: Ib5036859d51bc64f568da405f730c8f3ffebb742 Reviewed-on: http://gerrit.cloudera.org:8080/20189 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Riza Suminto <[email protected]> --- .../java/org/apache/impala/service/Frontend.java | 22 +++++++---- tests/custom_cluster/test_executor_groups.py | 45 +++++++++++++++++----- 2 files changed, 50 insertions(+), 17 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 18365487d..b2124a1a4 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -1943,7 +1944,8 @@ public class Frontend { // If defined, request_pool can be a suffix of the group name prefix.
For example // group_set_prefix = root.queue1 // request_pool = queue1 - if (request_pool != null && !e.getExec_group_name_prefix().endsWith(request_pool)) { + if (StringUtils.isNotEmpty(request_pool) + && !e.getExec_group_name_prefix().endsWith(request_pool)) { continue; } TExecutorGroupSet new_entry = new TExecutorGroupSet(e); @@ -1965,7 +1967,8 @@ public class Frontend { } result.add(new_entry); } - if (executorGroupSets.size() > 0 && result.size() == 0 && request_pool != null) { + if (executorGroupSets.size() > 0 && result.size() == 0 + && StringUtils.isNotEmpty(request_pool)) { throw new AnalysisException("Request pool: " + request_pool + " does not map to any known executor group set."); } @@ -2018,6 +2021,9 @@ public class Frontend { TQueryOptions queryOptions = queryCtx.client_request.getQuery_options(); boolean enable_replan = queryOptions.isEnable_replan(); + final boolean clientSetRequestPool = queryOptions.isSetRequest_pool(); + Preconditions.checkState( + !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty()); List<TExecutorGroupSet> originalExecutorGroupSets = ExecutorMembershipSnapshot.getAllExecutorGroupSets(); @@ -2131,7 +2137,7 @@ public class Frontend { } if (notScalable) { - setGroupNamePrefix(default_executor_group, req, group_set); + setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); addInfoString( groupSetProfile, VERDICT, "Assign to first group because " + reason); FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); @@ -2187,7 +2193,7 @@ public class Frontend { } boolean matchFound = false; - if (queryOptions.isSetRequest_pool()) { + if (clientSetRequestPool) { if (!default_executor_group) { Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith( queryOptions.getRequest_pool())); @@ -2216,7 +2222,7 @@ public class Frontend { FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); if (matchFound) { - setGroupNamePrefix(default_executor_group, req, group_set); 
+ setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); break; } @@ -2267,14 +2273,14 @@ public class Frontend { return req; } - private static void setGroupNamePrefix( - boolean default_executor_group, TExecRequest req, TExecutorGroupSet group_set) { + private static void setGroupNamePrefix(boolean default_executor_group, + boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet group_set) { // Set the group name prefix in both the returned query options and // the query context for non default group setup. if (!default_executor_group) { String namePrefix = group_set.getExec_group_name_prefix(); req.query_options.setRequest_pool(namePrefix); - req.setRequest_pool_set_by_frontend(true); + req.setRequest_pool_set_by_frontend(!clientSetRequestPool); if (req.query_exec_request != null) { req.query_exec_request.query_ctx.setRequest_pool(namePrefix); } diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index c42c06c15..dd8693d47 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -870,7 +870,9 @@ class TestExecutorGroups(CustomClusterTestSuite): exec_group_set_prefix="root.large") == 1 def _set_query_options(self, query_options): - """Set query options""" + """Set query options by running it as an SQL statement. + To mimic impala-shell behavior, use self.client.set_configuration() instead. + """ for k, v in query_options.items(): self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v)) @@ -904,9 +906,11 @@ class TestExecutorGroups(CustomClusterTestSuite): @UniqueDatabase.parametrize(sync_ddl=True) @pytest.mark.execute_serially def test_query_cpu_count_divisor_default(self, unique_database): - # Expect to run the query on the small group by default. 
coordinator_test_args = "" self._setup_three_exec_group_cluster(coordinator_test_args) + self.client.clear_configuration() + + # Expect to run the query on the small group by default. self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'}) self._run_query_and_verify_profile(CPU_TEST_QUERY, ["Executor Group: root.small-group", "EffectiveParallelism: 11", @@ -957,22 +961,36 @@ class TestExecutorGroups(CustomClusterTestSuite): self._run_query_and_verify_profile(compute_stats_query, ["ExecutorGroupsConsidered: 1", "Verdict: Assign to first group because query is not auto-scalable"], - ["Executor Group:"]) + ["Query Options (set by configuration): REQUEST_POOL=", + "Executor Group:"]) self._verify_total_admitted_queries("root.small", 4) self._verify_total_admitted_queries("root.large", 2) - # Test that child queries follow REQUEST_POOL that was set by client. + # Test that child queries follow REQUEST_POOL that is set through client + # configuration. Two child queries should all run in root.small. + self.client.set_configuration({'REQUEST_POOL': 'root.small'}) + self._run_query_and_verify_profile(compute_stats_query, + ["Query Options (set by configuration): REQUEST_POOL=root.small", + "ExecutorGroupsConsidered: 1", + "Verdict: Assign to first group because query is not auto-scalable"], + ["Executor Group:"]) + self._verify_total_admitted_queries("root.small", 6) + self.client.clear_configuration() + + # Test that child queries follow REQUEST_POOL that is set through SQL statement. # Two child queries should all run in root.large. 
self._set_query_options({'REQUEST_POOL': 'root.large'}) self._run_query_and_verify_profile(compute_stats_query, - ["ExecutorGroupsConsidered: 1", + ["Query Options (set by configuration): REQUEST_POOL=root.large", + "ExecutorGroupsConsidered: 1", "Verdict: Assign to first group because query is not auto-scalable"], ["Executor Group:"]) self._verify_total_admitted_queries("root.large", 4) # Test that REQUEST_POOL will override executor group selection self._run_query_and_verify_profile(CPU_TEST_QUERY, - ["Executor Group: root.large-group", + ["Query Options (set by configuration): REQUEST_POOL=root.large", + "Executor Group: root.large-group", ("Verdict: query option REQUEST_POOL=root.large is set. " "Memory and cpu limit checking is skipped."), "EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"]) @@ -982,7 +1000,8 @@ class TestExecutorGroups(CustomClusterTestSuite): 'COMPUTE_PROCESSING_COST': 'false', 'REQUEST_POOL': 'root.large'}) self._run_query_and_verify_profile(CPU_TEST_QUERY, - ["Executor Group: root.large-group", + ["Query Options (set by configuration): REQUEST_POOL=root.large", + "Executor Group: root.large-group", ("Verdict: query option REQUEST_POOL=root.large is set. " "Memory and cpu limit checking is skipped."), "ExecutorGroupsConsidered: 1"], @@ -993,6 +1012,14 @@ class TestExecutorGroups(CustomClusterTestSuite): 'REQUEST_POOL': '', 'COMPUTE_PROCESSING_COST': 'true'}) + # Test that empty REQUEST_POOL should have no impact. + self.client.set_configuration({'REQUEST_POOL': ''}) + self._run_query_and_verify_profile(CPU_TEST_QUERY, + ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2", + "Verdict: Match"], + ["Query Options (set by configuration): REQUEST_POOL="]) + self.client.clear_configuration() + # Test that GROUPING_TEST_QUERY will get assigned to the large group. 
self._run_query_and_verify_profile(GROUPING_TEST_QUERY, ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", @@ -1160,10 +1187,10 @@ class TestExecutorGroups(CustomClusterTestSuite): # END testing insert + MAX_FS_WRITER # Check resource pools on the Web queries site and admission site - self._verify_query_num_for_resource_pool("root.small", 4) + self._verify_query_num_for_resource_pool("root.small", 7) self._verify_query_num_for_resource_pool("root.tiny", 4) self._verify_query_num_for_resource_pool("root.large", 12) - self._verify_total_admitted_queries("root.small", 5) + self._verify_total_admitted_queries("root.small", 8) self._verify_total_admitted_queries("root.tiny", 6) self._verify_total_admitted_queries("root.large", 16)
