This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f2b01c1ddb7d5f002d07fdda12afe9300ac316e8 Author: Riza Suminto <[email protected]> AuthorDate: Wed Mar 22 11:50:35 2023 -0700 IMPALA-12017: Skip memory and cpu limit check if REQUEST_POOL is set Memory and CPU limit checking in executor group selection (Frontend.java) should be skipped if the REQUEST_POOL query option is set. Setting REQUEST_POOL means the user is specifying the pool in which to run the query, regardless of memory and CPU limits. Testing: - Add test cases in test_query_cpu_count_divisor_default Change-Id: I14bf7fe71e2dda1099651b3edf62480e1fdbf845 Reviewed-on: http://gerrit.cloudera.org:8080/19645 Reviewed-by: Wenzhe Zhou <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Riza Suminto <[email protected]> --- .../java/org/apache/impala/service/Frontend.java | 21 ++++-- tests/custom_cluster/test_executor_groups.py | 74 +++++++++++++++++----- 2 files changed, 75 insertions(+), 20 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 28f097f3a..e59bf90bc 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2059,6 +2059,7 @@ public class Frontend { } // Counters about this group set. + int available_cores = expectedTotalCores(group_set); String profileName = "Executor group " + (i + 1); if (group_set.isSetExec_group_name_prefix() && !group_set.getExec_group_name_prefix().isEmpty()) { @@ -2067,6 +2068,7 @@ public class Frontend { TRuntimeProfileNode groupSetProfile = createTRuntimeProfileNode(profileName); addCounter(groupSetProfile, new TCounter(MEMORY_MAX, TUnit.BYTES, group_set.getMax_mem_limit())); + addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores)); FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); // Find out the per host memory estimated from two possible sources. 
@@ -2089,28 +2091,38 @@ public class Frontend { boolean cpuReqSatisfied = true; int scaled_cores_requirement = -1; - int available_cores = -1; if (ProcessingCost.isComputeCost(queryOptions)) { Preconditions.checkState(cores_requirement > 0); scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE, Math.ceil( cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor())); - available_cores = expectedTotalCores(group_set); cpuReqSatisfied = scaled_cores_requirement <= available_cores; - addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores)); addCounter( groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaled_cores_requirement)); addCounter(groupSetProfile, new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cores_requirement)); } - if (memReqSatisfied && cpuReqSatisfied) { + boolean matchFound = false; + if (queryOptions.isSetRequest_pool()) { + if (!default_executor_group) { + Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith( + queryOptions.getRequest_pool())); + } + reason = "query option REQUEST_POOL=" + queryOptions.getRequest_pool() + + " is set. Memory and cpu limit checking is skipped."; + addInfoString(groupSetProfile, VERDICT, reason); + matchFound = true; + } else if (memReqSatisfied && cpuReqSatisfied) { reason = "suitable group found (estimated per-host memory=" + PrintUtils.printBytes(per_host_mem_estimate) + ", estimated cpu cores required=" + cores_requirement + ", scaled cpu cores=" + scaled_cores_requirement + ")"; addInfoString(groupSetProfile, VERDICT, "Match"); + matchFound = true; + } + if (matchFound) { // Set the group name prefix in both the returned query options and // the query context for non default group setup. 
if (!default_executor_group) { @@ -2120,7 +2132,6 @@ public class Frontend { req.query_exec_request.query_ctx.setRequest_pool(namePrefix); } } - break; } diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index 7df6c27bb..4ae476e81 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -22,6 +22,7 @@ from builtins import range from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.util.concurrent_workload import ConcurrentWorkload +import copy import json import logging import os @@ -33,9 +34,12 @@ LOG = logging.getLogger("test_auto_scaling") # Non-trivial query that gets scheduled on all executors within a group. TEST_QUERY = "select count(*) from functional.alltypes where month + random() < 3" -# A query to test Cpu requirement. Estimated memory per host is 37MB. +# A query to test CPU requirement. Estimated memory per host is 37MB. CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;" +# Default query option to use for testing CPU requirement. +CPU_DOP_OPTIONS = {'MT_DOP': '2', 'COMPUTE_PROCESSING_COST': 'true'} + DEFAULT_RESOURCE_POOL = "default-pool" @@ -785,16 +789,16 @@ class TestExecutorGroups(CustomClusterTestSuite): result = self.execute_query_expect_success(self.client, LARGE_QUERY) assert "Executor Group: root.large-group" in str(result.runtime_profile) - # Force to run the large query on the small group should fail + # Force to run the large query on the small group. + # Query should run successfully since exec group memory limit is ignored. self.client.set_configuration({'request_pool': 'small'}) - result = self.execute_query_expect_failure(self.client, LARGE_QUERY) - assert ("The query does not fit largest executor group sets. 
" - "Reason: not enough per-host memory") in str(result) + result = self.execute_query_expect_success(self.client, LARGE_QUERY) + assert ("Verdict: query option REQUEST_POOL=small is set. " + "Memory and cpu limit checking is skipped.") in str(result.runtime_profile) self.client.close() - def _run_with_compute_processing_cost(self, coordinator_test_args, TEST_QUERY, - expected_strings_in_profile): + def _setup_three_exec_group_cluster(self, coordinator_test_args): # The path to resources directory which contains the admission control config files. RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources") @@ -840,37 +844,77 @@ class TestExecutorGroups(CustomClusterTestSuite): assert self._get_num_executor_groups(only_healthy=True, exec_group_set_prefix="root.large") == 1 - # assert that 'expected_profile' exist in query profile - self.execute_query_expect_success(self.client, 'SET MT_DOP=2;') - self.execute_query_expect_success(self.client, 'SET COMPUTE_PROCESSING_COST=1;') - result = self.execute_query_expect_success(self.client, TEST_QUERY) + def _run_query_and_verify_profile(self, query, query_options, + expected_strings_in_profile, not_expected_in_profile=[]): + """Run 'query' with given 'query_options'. Assert existence of + 'expected_strings_in_profile' and nonexistence of 'not_expected_in_profile' + in query profile. + Caller is reponsible to close self.client at the end of test.""" + for k, v in query_options.items(): + self.execute_query_expect_success(self.client, "SET {}='{}';".format(k, v)) + result = self.execute_query_expect_success(self.client, query) for expected_profile in expected_strings_in_profile: assert expected_profile in str(result.runtime_profile) - self.client.close() + for not_expected in not_expected_in_profile: + assert not_expected not in str(result.runtime_profile) @pytest.mark.execute_serially def test_query_cpu_count_divisor_default(self): # Expect to run the query on the small group by default. 
coordinator_test_args = "" - self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY, + self._setup_three_exec_group_cluster(coordinator_test_args) + self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS, ["Executor Group: root.small-group", "EffectiveParallelism: 5", "ExecutorGroupsConsidered: 2"]) + # Test disabling COMPUTE_PROCESING_COST and not setting REQUEST_POOL + options = copy.deepcopy(CPU_DOP_OPTIONS) + options['COMPUTE_PROCESSING_COST'] = 'false' + self._run_query_and_verify_profile(CPU_TEST_QUERY, options, + ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", + "Verdict: Match"], + ["EffectiveParallelism:", "CpuAsk:"]) + + # Test that REQUEST_POOL will override executor group selection + options['COMPUTE_PROCESSING_COST'] = 'true' + options['REQUEST_POOL'] = 'root.large' + self._run_query_and_verify_profile(CPU_TEST_QUERY, options, + ["Executor Group: root.large-group", + ("Verdict: query option REQUEST_POOL=root.large is set. " + "Memory and cpu limit checking is skipped."), + "EffectiveParallelism: 7", "ExecutorGroupsConsidered: 1"]) + + # Test setting REQUEST_POOL and disabling COMPUTE_PROCESSING_COST + options['COMPUTE_PROCESSING_COST'] = 'false' + options['REQUEST_POOL'] = 'root.large' + self._run_query_and_verify_profile(CPU_TEST_QUERY, options, + ["Executor Group: root.large-group", + ("Verdict: query option REQUEST_POOL=root.large is set. 
" + "Memory and cpu limit checking is skipped."), + "ExecutorGroupsConsidered: 1"], + ["EffectiveParallelism:", "CpuAsk:"]) + + self.client.close() + @pytest.mark.execute_serially def test_query_cpu_count_divisor_two(self): # Expect to run the query on the tiny group coordinator_test_args = "-query_cpu_count_divisor=2 " - self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY, + self._setup_three_exec_group_cluster(coordinator_test_args) + self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS, ["Executor Group: root.tiny-group", "EffectiveParallelism: 3", "ExecutorGroupsConsidered: 1"]) + self.client.close() @pytest.mark.execute_serially def test_query_cpu_count_divisor_fraction(self): # Expect to run the query on the large group coordinator_test_args = "-query_cpu_count_divisor=0.2 " - self._run_with_compute_processing_cost(coordinator_test_args, CPU_TEST_QUERY, + self._setup_three_exec_group_cluster(coordinator_test_args) + self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS, ["Executor Group: root.large-group", "EffectiveParallelism: 7", "ExecutorGroupsConsidered: 3"]) + self.client.close() @pytest.mark.execute_serially def test_per_exec_group_set_metrics(self):
