This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ac8ffa9125fc3be6f4ceb958ea4930b7834f9292 Author: Riza Suminto <[email protected]> AuthorDate: Mon Dec 18 15:47:27 2023 -0800 IMPALA-12654: Add query option QUERY_CPU_COUNT_DIVISOR IMPALA-11604 adds a hidden backend flag named query_cpu_count_divisor to allow oversubscribing CPU cores more than what is available in the executor group set. This patch adds a query option with the same name and function so that CPU core matching can be tuned for individual queries. The query option takes precedence over the flag. Testing: - Add test case in test_executor_groups.py and query-options-test.cc Change-Id: I34ab47bd67509a02790c3caedb3fde4d1b6eaa78 Reviewed-on: http://gerrit.cloudera.org:8080/20819 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/query-option-parser.h | 18 +++++++++++++++ be/src/service/query-options-test.cc | 17 ++++++++++++++ be/src/service/query-options.cc | 7 ++++++ be/src/service/query-options.h | 4 +++- be/src/util/backend-gflag-util.cc | 1 + common/thrift/ImpalaService.thrift | 9 ++++++++ common/thrift/Query.thrift | 3 +++ .../java/org/apache/impala/service/Frontend.java | 11 +++++---- tests/custom_cluster/test_executor_groups.py | 26 +++++++++++++++++++--- 9 files changed, 88 insertions(+), 8 deletions(-) diff --git a/be/src/service/query-option-parser.h b/be/src/service/query-option-parser.h index b87246668..4d31ae416 100644 --- a/be/src/service/query-option-parser.h +++ b/be/src/service/query-option-parser.h @@ -69,6 +69,16 @@ class QueryOptionValidator { return Status::OK(); } + static inline Status ExclusiveLowerBound( + TImpalaQueryOptions::type option, const T value, const T lower) { + if (value <= lower) { + std::stringstream ss; + ss << "Value must be greater than " << lower << ", actual value: " << value; + return CreateValidationErrorStatus(option, ss.str()); + } + return Status::OK(); + } + static inline Status NotEquals( TImpalaQueryOptions::type option, const T value, const T other) { if (value == other) { @@ -156,6 +166,14 @@ class QueryOptionParser { return QueryOptionValidator<T>::InclusiveLowerBound(option, *result, lower); } + template <typename T> + static Status ParseAndCheckExclusiveLowerBound(TImpalaQueryOptions::type option, + const std::string& value, const T lower, T* result) { + Status status = Parse(option, value, result); + RETURN_IF_ERROR(status); + return QueryOptionValidator<T>::ExclusiveLowerBound(option, *result, lower); + } + template <typename T> static Status ParseAndCheckNonNegative( TImpalaQueryOptions::type option, const std::string& value, T* result) { diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index 66cea4991..75bd1867a 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -428,6 +428,23 @@ TEST(QueryOptions, SetSpecialOptions) { TestError("8191"); // default value of FLAGS_min_buffer_size is 8KB TestOk("64KB", 64 * 1024); } + // QUERY_CPU_COUNT_DIVISOR should be greater than 0.0. + { + OptionDef<double> key_def = MAKE_OPTIONDEF(query_cpu_count_divisor); + auto TestOk = MakeTestOkFn(options, key_def); + auto TestError = MakeTestErrFn(options, key_def); + TestOk("0.5", 0.5); + TestOk("0.0000000001", 0.0000000001); + TestOk("0.999999999", 0.999999999); + TestOk(" 0.9", 0.9); + TestOk("1", 1.0); + TestOk("1.1", 1.1); + TestOk("1000.00", 1000.0); + TestError("0"); + TestError("-1"); + TestError("-0.1"); + TestError("Not a number!"); + } } TEST(QueryOptions, ParseQueryOptions) { diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index ec6dfce5e..0cf88b904 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1195,6 +1195,13 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_max_num_filters_aggregated_per_host(int32_t_val); break; } + case TImpalaQueryOptions::QUERY_CPU_COUNT_DIVISOR: { + double double_val = 0.0f; + RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckExclusiveLowerBound<double>( + option, value, 0.0, &double_val)); + query_options->__set_query_cpu_count_divisor(double_val); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index d31116d3f..defa0dd7f 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // time we add or remove a query option to/from the enum TImpalaQueryOptions. #define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \ - TImpalaQueryOptions::MAX_NUM_FILTERS_AGGREGATED_PER_HOST + 1); \ + TImpalaQueryOptions::QUERY_CPU_COUNT_DIVISOR + 1); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \ @@ -321,6 +321,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE, TQueryOptionLevel::DEVELOPMENT) \ QUERY_OPT_FN(max_num_filters_aggregated_per_host, MAX_NUM_FILTERS_AGGREGATED_PER_HOST, \ TQueryOptionLevel::DEVELOPMENT) \ + QUERY_OPT_FN(query_cpu_count_divisor, \ + QUERY_CPU_COUNT_DIVISOR, TQueryOptionLevel::ADVANCED) \ ; /// Enforce practical limits on some query options to avoid undesired query state. diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 8c349aac3..64f2fe0be 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -210,6 +210,7 @@ DEFINE_string(ignored_dir_prefix_list, ".,_tmp.,_spark_metadata", " skip in loading file metadata."); DEFINE_double_hidden(query_cpu_count_divisor, 1.0, + "(Deprecated) this is now deprecated in favor of query option with the same name. " "(Advance) Divide the CPU requirement of a query to fit the total available CPU in " "the executor group. For example, setting value 2 will fit the query with CPU " "requirement 2X to an executor group with total available CPU X. Note that setting " diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 3276c2785..d308463dc 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -907,6 +907,15 @@ enum TImpalaQueryOptions { // ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting 1, 0, or negative value // will disable the intermediate aggregator feature. Default to -1 (disabled). MAX_NUM_FILTERS_AGGREGATED_PER_HOST = 172 + + // Divide the CPU requirement of a query to fit the total available CPU in + // the executor group. For example, setting value 2 will fit the query with CPU + // requirement 2X to an executor group with total available CPU X. Note that setting + // with a fractional value less than 1 effectively multiplies the query CPU + // requirement. A valid value is > 0.0. + // If this query option is not set, value of backend flag --query_cpu_count_divisor + // (default to 1.0) will be picked up instead. + QUERY_CPU_COUNT_DIVISOR = 173 } // The summary of a DML statement. diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 794301257..16d348e6e 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -693,6 +693,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 173: optional i32 max_num_filters_aggregated_per_host = -1 + + // See comment in ImpalaService.thrift + 174: optional double query_cpu_count_divisor } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 7be0861a1..31f9a570d 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2080,9 +2080,13 @@ public class Frontend { planCtx.compilationState_.captureState(); boolean isComputeCost = queryOptions.isCompute_processing_cost(); + double cpuCountDivisor = BackendConfig.INSTANCE.getQueryCpuCountDivisor(); if (isComputeCost) { + if (queryOptions.isSetQuery_cpu_count_divisor()) { + cpuCountDivisor = queryOptions.getQuery_cpu_count_divisor(); + } FrontendProfile.getCurrent().setToCounter(CPU_COUNT_DIVISOR, TUnit.DOUBLE_VALUE, - Double.doubleToLongBits(BackendConfig.INSTANCE.getQueryCpuCountDivisor())); + Double.doubleToLongBits(cpuCountDivisor)); } TExecutorGroupSet group_set = null; @@ -2199,9 +2203,8 @@ public class Frontend { + queryOptions.getMax_fragment_instances_per_node() + ")."); } - scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE, - Math.ceil( - cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor())); + scaled_cores_requirement = (int) Math.min( + Integer.MAX_VALUE, Math.ceil(cores_requirement / cpuCountDivisor)); cpuReqSatisfied = scaled_cores_requirement <= available_cores; addCounter( diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index 6718e4572..e7a9e382c 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -1221,11 +1221,31 @@ class TestExecutorGroups(CustomClusterTestSuite): self._run_query_and_verify_profile(CPU_TEST_QUERY, ["Executor Group: root.small-group", "CpuAsk: 6", "EffectiveParallelism: 11", - "ExecutorGroupsConsidered: 2"]) + "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"]) + + # Test that QUERY_CPU_COUNT_DIVISOR option can override + # query_cpu_count_divisor flag. + self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '1.0'}) + self._run_query_and_verify_profile(CPU_TEST_QUERY, + ["Executor Group: root.small-group", + "CpuAsk: 11", "EffectiveParallelism: 11", + "CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2"]) + self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '0.5'}) + self._run_query_and_verify_profile(CPU_TEST_QUERY, + ["Executor Group: root.large-group", + "CpuAsk: 22", "EffectiveParallelism: 11", + "CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3"]) + self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '2.0'}) + self._run_query_and_verify_profile(CPU_TEST_QUERY, + ["Executor Group: root.small-group", + "CpuAsk: 6", "EffectiveParallelism: 11", + "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"]) # Check resource pools on the Web queries site and admission site - self._verify_query_num_for_resource_pool("root.small", 1) - self._verify_total_admitted_queries("root.small", 1) + self._verify_query_num_for_resource_pool("root.small", 3) + self._verify_query_num_for_resource_pool("root.large", 1) + self._verify_total_admitted_queries("root.small", 3) + self._verify_total_admitted_queries("root.large", 1) @pytest.mark.execute_serially def test_query_cpu_count_divisor_fraction(self):
