This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 89a48b80a2689115d6bd90bf4312a452ae11477a Author: Riza Suminto <[email protected]> AuthorDate: Thu Aug 24 11:30:37 2023 -0700 IMPALA-12444: Fix minimum parallelism bug in scan fragment The scan fragment did not follow PROCESSING_COST_MIN_THREADS set by the user even when the total scan ranges allowed doing so. This patch fixes the issue by exposing ScanNode.maxScannerThreads_ to PlanFragment.adjustToMaxParallelism(). By using ScanNode.maxScannerThreads_ as an upper bound, ScanNode does not need to artificially lower the ProcessingCost if maxScannerThreads_ is lower than the minimum parallelism dictated by the original ProcessingCost. Thus, the synthetic ProcessingCost logic in the ScanNode class is revised to only apply if the input cardinality is unknown (-1). This patch also makes the following adjustments: - Remove some dead code in Frontend.java and PlanFragment.java. - Add a sanity check such that PROCESSING_COST_MIN_THREADS <= MAX_FRAGMENT_INSTANCES_PER_NODE. - Tidy up test_query_cpu_count_divisor_default to reduce the number of SET queries. Testing: - Update test_query_cpu_count_divisor_default to ensure that PROCESSING_COST_MIN_THREADS is respected by the scan fragment and an error is returned if PROCESSING_COST_MIN_THREADS is greater than MAX_FRAGMENT_INSTANCES_PER_NODE. - Pass test_executor_groups.py. 
Change-Id: I69e5a80146d4ac41de5ef406fc2bdceffe3ec394 Reviewed-on: http://gerrit.cloudera.org:8080/20475 Reviewed-by: Kurt Deschler <[email protected]> Reviewed-by: Wenzhe Zhou <[email protected]> Tested-by: Riza Suminto <[email protected]> --- .../java/org/apache/impala/analysis/Analyzer.java | 2 + .../org/apache/impala/planner/CostingSegment.java | 2 +- .../org/apache/impala/planner/PlanFragment.java | 24 ++++----- .../java/org/apache/impala/planner/ScanNode.java | 37 +++++++------- .../java/org/apache/impala/service/Frontend.java | 28 +++++------ tests/custom_cluster/test_executor_groups.py | 57 ++++++++-------------- 6 files changed, 63 insertions(+), 87 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 1e024d525..224be7859 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -580,6 +580,8 @@ public class Analyzer { // singleton. private int numExecutorsForPlanning_ = -1; + // Number of available cores per executor node. + // Set by Frontend.java. private int availableCoresPerNode_ = -1; // Cache of KuduTables opened for this query. (map from table name to kudu table) diff --git a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java index 27ea37300..fed37bf29 100644 --- a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java +++ b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java @@ -192,7 +192,7 @@ public class CostingSegment extends TreeNode<CostingSegment> { nodeStepCount, minParallelism, maxParallelism, producerCost, cost_); newParallelism = Math.max(newParallelism, cost_.getNumInstancesExpected()); Preconditions.checkState(newParallelism <= maxParallelism, - "originalParallelism=" + originalParallelism + ". 
newParallelism=" + getRootId() + " originalParallelism=" + originalParallelism + ". newParallelism=" + newParallelism + " > maxParallelism=" + maxParallelism); if (LOG.isTraceEnabled()) { diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index 44b52f152..de708b0a8 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -1062,6 +1062,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { int parentParallelism, int nodeStepCount) { int maxThreadAllowed = IntMath.saturatedMultiply(maxThreadPerNode, getNumNodes()); boolean canTryLower = true; + int maxScannerThreads = Integer.MAX_VALUE; // Compute selectedParallelism as the maximum allowed parallelism. int selectedParallelism = getNumInstances(); @@ -1099,13 +1100,12 @@ public class PlanFragment extends TreeNode<PlanFragment> { if (!scanNodes.isEmpty()) { // The existence of scan node may justify an increase of parallelism for this // union fargment, but it should be capped at costBasedMaxParallelism. 
- long maxRangesPerScanNode = 1; + maxScannerThreads = 1; for (ScanNode scanNode : scanNodes) { - maxRangesPerScanNode = - Math.max(maxRangesPerScanNode, scanNode.getEffectiveNumScanRanges()); + maxScannerThreads = Math.max(maxScannerThreads, scanNode.maxScannerThreads_); } - maxParallelism_ = Math.max(maxParallelism_, - (int) Math.min(costBasedMaxParallelism, maxRangesPerScanNode)); + maxParallelism_ = Math.max( + maxParallelism_, Math.min(costBasedMaxParallelism, maxScannerThreads)); } if (maxParallelism_ > maxThreadAllowed) { @@ -1135,8 +1135,8 @@ public class PlanFragment extends TreeNode<PlanFragment> { if (!scanNodes.isEmpty()) { Preconditions.checkState(scanNodes.size() == 1); ScanNode scanNode = scanNodes.get(0); - maxParallelism_ = - (int) Math.min(maxParallelism_, scanNode.getEffectiveNumScanRanges()); + maxScannerThreads = scanNode.maxScannerThreads_; + maxParallelism_ = Math.min(maxParallelism_, maxScannerThreads); // Prevent caller from lowering parallelism if fragment has ScanNode // because there is no child fragment to compare with. @@ -1153,7 +1153,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { getNumInstances(), selectedParallelism, "Follow maxThreadPerNode."); } } else { - if (maxParallelism_ < minParallelism && scanNodes.isEmpty()) { + if (maxParallelism_ < minParallelism && minParallelism < maxScannerThreads) { maxParallelism_ = minParallelism; canTryLower = false; if (LOG.isTraceEnabled()) { @@ -1177,7 +1177,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { // Initialize this fragment's parallelism to the selectedParallelism. 
setAdjustedInstanceCount(selectedParallelism); - return canTryLower; + return canTryLower && selectedParallelism > 1; } private boolean hasUnionNode() { @@ -1186,12 +1186,6 @@ public class PlanFragment extends TreeNode<PlanFragment> { return !nodes.isEmpty(); } - private boolean hasScanNode() { - List<ScanNode> nodes = Lists.newArrayList(); - collectPlanNodes(Predicates.instanceOf(ScanNode.class), nodes); - return !nodes.isEmpty(); - } - /** * Compute {@link CoreCount} of this fragment and populate it into 'fragmentCoreState'. * @param fragmentCoreState A map holding per-fragment core state. diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index a2bcda486..66ef705ce 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -84,6 +84,10 @@ abstract public class ScanNode extends PlanNode { // Refer to the comment of 'TableRef.tableNumRowsHint_' protected long tableNumRowsHint_ = -1; + // Maximum number of scanner threads after considering number of scan ranges + // and related query options.Calculated at computeScanProcessingCost. + protected int maxScannerThreads_ = -1; + public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) { super(id, desc.getId().asList(), displayName); desc_ = desc; @@ -358,39 +362,38 @@ abstract public class ScanNode extends PlanNode { */ protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) { Preconditions.checkArgument(queryOptions.isCompute_processing_cost()); - ProcessingCost cardinalityBasedCost = - ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(), - ExprUtil.computeExprsTotalCost(conjuncts_), rowMaterializationCost()); // maxThread calculation below intentionally does not include core count from // executor group config. This is to allow scan fragment parallelism to scale // regardless of the core count limit. 
int maxThreadsPerNode = Math.max(queryOptions.getProcessing_cost_min_threads(), queryOptions.getMax_fragment_instances_per_node()); - int maxScannerThreads = (int) Math.min(getEffectiveNumScanRanges(), + maxScannerThreads_ = (int) Math.min(getEffectiveNumScanRanges(), IntMath.saturatedMultiply(getNumNodes(), maxThreadsPerNode)); - - if (getInputCardinality() == 0) { - Preconditions.checkState(cardinalityBasedCost.getTotalCost() == 0, - "Scan is empty but cost is non-zero."); + long inputCardinality = getInputCardinality(); + + if (inputCardinality >= 0) { + ProcessingCost cardinalityBasedCost = + ProcessingCost.basicCost(getDisplayLabel(), inputCardinality, + ExprUtil.computeExprsTotalCost(conjuncts_), rowMaterializationCost()); + if (inputCardinality == 0) { + Preconditions.checkState(cardinalityBasedCost.getTotalCost() == 0, + "Scan is empty but cost is non-zero."); + } return cardinalityBasedCost; - } else if (maxScannerThreads < cardinalityBasedCost.getTotalCost() - / BackendConfig.INSTANCE.getMinProcessingPerThread()) { - // Input cardinality is unknown or cost is too high compared to maxScanThreads - // Return synthetic ProcessingCost based on maxScanThreads. + } else { + // Input cardinality is unknown. Return synthetic ProcessingCost based on + // maxScannerThreads_. long syntheticCardinality = - Math.max(1, Math.min(getInputCardinality(), maxScannerThreads)); + Math.max(1, Math.min(inputCardinality, maxScannerThreads_)); long syntheticPerRowCost = LongMath.saturatedMultiply( Math.max(1, BackendConfig.INSTANCE.getMinProcessingPerThread() / syntheticCardinality), - maxScannerThreads); + maxScannerThreads_); return ProcessingCost.basicCost( getDisplayLabel(), syntheticCardinality, 0, syntheticPerRowCost); } - - // None of the conditions above apply. 
- return cardinalityBasedCost; } /** diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 2821dfc5e..0d2153656 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -180,6 +180,7 @@ import org.apache.impala.thrift.TGetTableHistoryResult; import org.apache.impala.thrift.TGetTableHistoryResultItem; import org.apache.impala.thrift.TGrantRevokePrivParams; import org.apache.impala.thrift.TGrantRevokeRoleParams; +import org.apache.impala.thrift.TImpalaQueryOptions; import org.apache.impala.thrift.TLineageGraph; import org.apache.impala.thrift.TLoadDataReq; import org.apache.impala.thrift.TLoadDataResp; @@ -257,10 +258,8 @@ public class Frontend { private static final String VERDICT = "Verdict"; private static final String MEMORY_MAX = "MemoryMax"; private static final String MEMORY_ASK = "MemoryAsk"; - private static final String MEMORY_ASK_UNBOUNDED = "MemoryAskUnbounded"; private static final String CPU_MAX = "CpuMax"; private static final String CPU_ASK = "CpuAsk"; - private static final String CPU_ASK_UNBOUNDED = "CpuAskUnbounded"; /** * Plan-time context that allows capturing various artifacts created @@ -2074,8 +2073,6 @@ public class Frontend { int attempt = 0; int lastExecutorGroupTotalCores = expectedTotalCores(executorGroupSetsToUse.get(num_executor_group_sets - 1)); - long memoryAskUnbounded = -1; - int cpuAskUnbounded = -1; int i = 0; while (i < num_executor_group_sets) { group_set = executorGroupSetsToUse.get(i); @@ -2175,6 +2172,16 @@ public class Frontend { int scaled_cores_requirement = -1; if (isComputeCost) { Preconditions.checkState(cores_requirement > 0); + if (queryOptions.getProcessing_cost_min_threads() + > queryOptions.getMax_fragment_instances_per_node()) { + throw new AnalysisException( + TImpalaQueryOptions.PROCESSING_COST_MIN_THREADS.name() + " (" + + 
queryOptions.getProcessing_cost_min_threads() + + ") can not be larger than " + + TImpalaQueryOptions.MAX_FRAGMENT_INSTANCES_PER_NODE.name() + " (" + + queryOptions.getMax_fragment_instances_per_node() + ")."); + } + scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE, Math.ceil( cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor())); @@ -2184,19 +2191,6 @@ public class Frontend { groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaled_cores_requirement)); addCounter(groupSetProfile, new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cores_requirement)); - - if (memoryAskUnbounded > 0) { - addCounter(groupSetProfile, - new TCounter(MEMORY_ASK_UNBOUNDED, TUnit.BYTES, - LongMath.saturatedMultiply( - expectedNumExecutor(group_set), memoryAskUnbounded))); - memoryAskUnbounded = -1; - } - if (cpuAskUnbounded > 0) { - addCounter(groupSetProfile, - new TCounter(CPU_ASK_UNBOUNDED, TUnit.UNIT, cpuAskUnbounded)); - cpuAskUnbounded = -1; - } } boolean matchFound = false; diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index af94e68f0..0ba856597 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -994,10 +994,8 @@ class TestExecutorGroups(CustomClusterTestSuite): "Memory and cpu limit checking is skipped."), "EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"]) - # Test setting REQUEST_POOL and disabling COMPUTE_PROCESSING_COST - self._set_query_options({ - 'COMPUTE_PROCESSING_COST': 'false', - 'REQUEST_POOL': 'root.large'}) + # Test setting REQUEST_POOL=root.large and disabling COMPUTE_PROCESSING_COST + self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'}) self._run_query_and_verify_profile(CPU_TEST_QUERY, ["Query Options (set by configuration): REQUEST_POOL=root.large", "Executor Group: root.large-group", @@ -1024,26 +1022,13 @@ class TestExecutorGroups(CustomClusterTestSuite): ["Executor Group: root.large-group", 
"ExecutorGroupsConsidered: 3", "Verdict: Match", "CpuAsk: 12"]) - # ENABLE_REPLAN=false should force query to run in tiny group, but high scan - # parallelism will cause it to exceed the admission control slots. + # ENABLE_REPLAN=false should force query to run in first group (tiny). self._set_query_options({'ENABLE_REPLAN': 'false'}) - result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY) - status = ("Rejected query from pool root.tiny: number of admission control slots " - r"needed \(10\) on backend '.*' is greater than total slots available 8. " - "Reduce mt_dop to less than 8 to ensure that the query can execute.") - assert re.search(status, str(result)) - - # ENABLE_REPLAN=false and MAX_FRAGMENT_INSTANCES_PER_NODE=4 should allow query to run - # in tiny group. - self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'}) - self._run_query_and_verify_profile(CPU_TEST_QUERY, + self._run_query_and_verify_profile(TEST_QUERY, ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", "Verdict: Assign to first group because query option ENABLE_REPLAN=false"]) - - # Unset both ENABLE_REPLAN and MAX_FRAGMENT_INSTANCES_PER_NODE - self._set_query_options({ - 'ENABLE_REPLAN': '', - 'MAX_FRAGMENT_INSTANCES_PER_NODE': ''}) + # Unset ENABLE_REPLAN. + self._set_query_options({'ENABLE_REPLAN': ''}) # Trivial query should be assigned to tiny group by Frontend. # Backend may decide to run it in coordinator only. @@ -1065,33 +1050,31 @@ class TestExecutorGroups(CustomClusterTestSuite): ["Executor Group:"]) # Test combination of PROCESSING_COST_MIN_THREADS and MAX_FRAGMENT_INSTANCES_PER_NODE. 
- self._set_query_options({ - 'PROCESSING_COST_MIN_THREADS': '1', - 'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'}) + self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'}) self._run_query_and_verify_profile(GROUPING_TEST_QUERY, ["Executor Group: root.large-group", "EffectiveParallelism: 9", "ExecutorGroupsConsidered: 3"]) - self._set_query_options({ - 'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'}) + self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'}) self._run_query_and_verify_profile(GROUPING_TEST_QUERY, ["Executor Group: root.large-group", "EffectiveParallelism: 12", "ExecutorGroupsConsidered: 3"]) - self._set_query_options({ - 'PROCESSING_COST_MIN_THREADS': '3', - 'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'}) + self._set_query_options({'PROCESSING_COST_MIN_THREADS': '2'}) self._run_query_and_verify_profile(GROUPING_TEST_QUERY, - ["Executor Group: root.large-group", "EffectiveParallelism: 9", + ["Executor Group: root.large-group", "EffectiveParallelism: 12", "ExecutorGroupsConsidered: 3"]) - self._set_query_options({ - 'PROCESSING_COST_MIN_THREADS': '2', - 'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'}) + self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'}) self._run_query_and_verify_profile(GROUPING_TEST_QUERY, - ["Executor Group: root.small-group", "EffectiveParallelism: 2", + ["Executor Group: root.small-group", "EffectiveParallelism: 4", "ExecutorGroupsConsidered: 2"]) + self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'}) + result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY) + status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than " + r"MAX_FRAGMENT_INSTANCES_PER_NODE \(1\).") + assert re.search(status, str(result)) # Unset PROCESSING_COST_MIN_THREADS and MAX_FRAGMENT_INSTANCES_PER_NODE. 
self._set_query_options({ - 'PROCESSING_COST_MIN_THREADS': '', - 'MAX_FRAGMENT_INSTANCES_PER_NODE': ''}) + 'MAX_FRAGMENT_INSTANCES_PER_NODE': '', + 'PROCESSING_COST_MIN_THREADS': ''}) # BEGIN testing count queries # Test optimized count star query with 1824 scan ranges assign to small group. @@ -1222,7 +1205,7 @@ class TestExecutorGroups(CustomClusterTestSuite): # Check resource pools on the Web queries site and admission site self._verify_query_num_for_resource_pool("root.small", 10) - self._verify_query_num_for_resource_pool("root.tiny", 6) + self._verify_query_num_for_resource_pool("root.tiny", 5) self._verify_query_num_for_resource_pool("root.large", 12) self._verify_total_admitted_queries("root.small", 11) self._verify_total_admitted_queries("root.tiny", 8)
