This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 818057b8758f8b3d3b669f88698033c6f5b79f27 Author: Riza Suminto <[email protected]> AuthorDate: Wed Nov 6 20:09:28 2024 -0800 IMPALA-13526: Fix Agg node creation order in DistributedPlanner Within DistributedPlanner.java, there are several places where the Planner needs to insert an extra merge aggregation node. It requires transferring HAVING conjuncts from the preaggregation node to the merge aggregation node, unsetting the limit, and recomputing the stats of the preaggregation node. However, the stats recompute is not consistently done, and there might be an inefficient recompute happening. This patch fixes the AggregationNode creation order in DistributedPlanner.java so that stats recomputation is done consistently and efficiently. Testing: - Pass core tests. Change-Id: Ica8227fdc46a1ef59bef5ae5424ba3907827411d Reviewed-on: http://gerrit.cloudera.org:8080/22046 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Riza Suminto <[email protected]> --- .../org/apache/impala/planner/AggregationNode.java | 10 ++++-- .../apache/impala/planner/DistributedPlanner.java | 37 +++++++++++++++------- .../java/org/apache/impala/planner/PlanNode.java | 4 +++ .../queries/PlannerTest/aggregation.test | 10 +++--- .../multiple-distinct-materialization.test | 6 ++-- 5 files changed, 45 insertions(+), 22 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index eda9d1a8f..9c0b7a84a 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -213,17 +213,23 @@ public class AggregationNode extends PlanNode { @Override public void init(Analyzer analyzer) throws InternalException { + init(analyzer, null); + } + + public void init(Analyzer analyzer, List<Expr> transferredConjuncts) + throws InternalException { Preconditions.checkState(tupleIds_.size() == aggInfos_.size()); // Assign conjuncts to the 
top-most agg in the single-node plan. They are transferred - // to the proper place in the distributed plan via transferConjuncts(). + // to the proper place in the distributed plan via transferConjuncts argument. if (aggPhase_ == multiAggInfo_.getConjunctAssignmentPhase()) { conjuncts_.clear(); // TODO: If this is the transposition phase, then we can push conjuncts that // reference a single aggregation class down into the aggregators of the // previous phase. conjuncts_.addAll(multiAggInfo_.collectConjuncts(analyzer, true)); - conjuncts_ = orderConjunctsByCost(conjuncts_); } + if (transferredConjuncts != null) conjuncts_.addAll(transferredConjuncts); + conjuncts_ = orderConjunctsByCost(conjuncts_); // Compute the mem layout for both tuples here for simplicity. for (AggregateInfo aggInfo : aggInfos_) { diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 0dd8ecb2b..eb17be65e 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -1051,13 +1051,21 @@ public class DistributedPlanner { node.unsetLimit(); } node.unsetNeedsFinalize(); + // Move all HAVING predicates to be transferred into mergeAggNode. + List<Expr> havingPredicates = node.conjuncts_; + node.conjuncts_ = new ArrayList<>(); + // Recompute stats after setIsPreagg and predicates removal. + // Must do this before creating mergeFragment. + node.computeStats(ctx_.getRootAnalyzer()); - // place a merge aggregation step in a new fragment + // Place the merge aggregation step in a new fragment. 
PlanFragment mergeFragment = createParentFragment(childFragment, parentPartition); AggregationNode mergeAggNode = new AggregationNode(ctx_.getNextNodeId(), mergeFragment.getPlanRoot(), node.getMultiAggInfo(), AggPhase.FIRST_MERGE); - mergeAggNode.init(ctx_.getRootAnalyzer()); mergeAggNode.setLimit(limit); + // Transfer all HAVING predicates into mergeAggNode and init(). + // init() is called after setLimit() to avoid calling computeStats() again. + mergeAggNode.init(ctx_.getRootAnalyzer(), havingPredicates); // Carry the IsNonCorrelatedSclarSubquery_ flag to the merge node. This flag is // applicable regardless of the partition scheme for the children since it is a // logical property. @@ -1069,12 +1077,6 @@ public class DistributedPlanner { mergeAggNode.setDisableCodegen(true); } - // HAVING predicates can only be evaluated after the merge agg step - node.transferConjuncts(mergeAggNode); - // Recompute stats after transferring the conjuncts_ (order is important). - node.computeStats(ctx_.getRootAnalyzer()); - mergeFragment.getPlanRoot().computeStats(ctx_.getRootAnalyzer()); - mergeAggNode.computeStats(ctx_.getRootAnalyzer()); // Set new plan root after updating stats. mergeFragment.addPlanRoot(mergeAggNode); @@ -1119,6 +1121,9 @@ public class DistributedPlanner { } else { phase1AggNode.setIntermediateTuple(); phase1AggNode.setIsPreagg(ctx_); + // Recompute stats after setIsPreagg. + // Must do this before creating firstMergeFragment. + phase1AggNode.computeStats(ctx_.getRootAnalyzer()); DataPartition phase1MergePartition = DataPartition.hashPartitioned(phase1PartitionExprs); @@ -1155,15 +1160,23 @@ public class DistributedPlanner { phase2AggNode.setIsPreagg(ctx_); phase2MergePartition = DataPartition.hashPartitioned(phase2PartitionExprs); } + + // Move all HAVING predicates to be transferred into phase2MergeAggNode. 
+ List<Expr> havingPredicates = phase2AggNode.conjuncts_; + phase2AggNode.conjuncts_ = new ArrayList<>(); + // Recompute stats after setIsPreagg and predicates removal. + // Must do this before creating phase2MergeAggNode. + phase2AggNode.computeStats(ctx_.getRootAnalyzer()); + + // Place the merge aggregation step in a new fragment. PlanFragment secondMergeFragment = createParentFragment(firstMergeFragment, phase2MergePartition); - AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), phase2AggNode, phase2AggNode.getMultiAggInfo(), AggPhase.SECOND_MERGE); - phase2MergeAggNode.init(ctx_.getRootAnalyzer()); phase2MergeAggNode.setLimit(limit); - // Transfer having predicates to final merge agg node - phase2AggNode.transferConjuncts(phase2MergeAggNode); + // Transfer all HAVING predicates into phase2MergeAggNode and init(). + // init() is called after setLimit() to avoid calling computeStats() again. + phase2MergeAggNode.init(ctx_.getRootAnalyzer(), havingPredicates); secondMergeFragment.addPlanRoot(phase2MergeAggNode); return secondMergeFragment; } diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 11ed81064..4c75d896b 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -304,6 +304,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> { conjuncts_.addAll(conjuncts); } + /** + * Deprecated because it may hide the need to recompute stats after conjunct transfer. 
+ */ + @Deprecated public void transferConjuncts(PlanNode recipient) { recipient.conjuncts_.addAll(conjuncts_); conjuncts_.clear(); diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test index a602b161f..5f6a859da 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test @@ -422,7 +422,7 @@ PLAN-ROOT SINK 04:AGGREGATE [STREAMING] | output: count(int_col) | group by: t.bigint_col -| row-size=16B cardinality=10 +| row-size=16B cardinality=20 | 06:AGGREGATE | group by: t.bigint_col, int_col @@ -565,7 +565,7 @@ PLAN-ROOT SINK 04:AGGREGATE [STREAMING] | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col -| row-size=24B cardinality=10 +| row-size=24B cardinality=20 | 06:AGGREGATE | output: count:merge(smallint_col) @@ -645,7 +645,7 @@ PLAN-ROOT SINK 05:AGGREGATE [STREAMING] | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col -| row-size=24B cardinality=10 +| row-size=24B cardinality=20 | 09:AGGREGATE | output: count:merge(smallint_col) @@ -1978,7 +1978,7 @@ PLAN-ROOT SINK 02:AGGREGATE [STREAMING] | output: count(int_col), count:merge(smallint_col) | group by: bigint_col -| row-size=24B cardinality=1 +| row-size=24B cardinality=10 | 05:AGGREGATE | output: count:merge(smallint_col) @@ -2047,7 +2047,7 @@ PLAN-ROOT SINK 02:AGGREGATE [STREAMING] | output: count(int_col), count:merge(smallint_col) | group by: bigint_col -| row-size=24B cardinality=3 +| row-size=24B cardinality=10 | 05:AGGREGATE | output: count:merge(smallint_col) diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test index d17267e2b..d8dd6c857 100644 --- 
a/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test @@ -969,7 +969,7 @@ PLAN-ROOT SINK 02:AGGREGATE [STREAMING] | output: count(tinyint_col) | group by: date_string_col, timestamp_col -| row-size=44B cardinality=1 +| row-size=44B cardinality=10 | 05:AGGREGATE | group by: date_string_col, timestamp_col, tinyint_col @@ -1030,7 +1030,7 @@ PLAN-ROOT SINK 02:AGGREGATE [STREAMING] | output: count(tinyint_col), min:merge(string_col), max:merge(string_col) | group by: date_string_col, timestamp_col -| row-size=68B cardinality=1 +| row-size=68B cardinality=3 | 05:AGGREGATE | output: min:merge(string_col), max:merge(string_col) @@ -1093,7 +1093,7 @@ PLAN-ROOT SINK 02:AGGREGATE [STREAMING] | output: count(smallint_col), max:merge(string_col) | group by: date_string_col, timestamp_col -| row-size=56B cardinality=1 +| row-size=56B cardinality=3 | 05:AGGREGATE | output: max:merge(string_col)
