This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 818057b8758f8b3d3b669f88698033c6f5b79f27
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Nov 6 20:09:28 2024 -0800

    IMPALA-13526: Fix Agg node creation order in DistributedPlanner
    
    Within DistributedPlanner.java, there are several places where the
    Planner needs to insert an extra merge aggregation node. Doing so
    requires transferring HAVING conjuncts from the preaggregation node
    to the merge aggregation node, unsetting the limit, and recomputing
    the stats of the preaggregation node. However, the stats
    recomputation is not done consistently, and an inefficient
    recomputation may happen.
    
    This patch fixes the AggregationNode creation order in
    DistributedPlanner.java so that stats recomputation is done
    consistently and efficiently.
    
    Testing:
    - Pass core tests.
    
    Change-Id: Ica8227fdc46a1ef59bef5ae5424ba3907827411d
    Reviewed-on: http://gerrit.cloudera.org:8080/22046
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Riza Suminto <[email protected]>
---
 .../org/apache/impala/planner/AggregationNode.java | 10 ++++--
 .../apache/impala/planner/DistributedPlanner.java  | 37 +++++++++++++++-------
 .../java/org/apache/impala/planner/PlanNode.java   |  4 +++
 .../queries/PlannerTest/aggregation.test           | 10 +++---
 .../multiple-distinct-materialization.test         |  6 ++--
 5 files changed, 45 insertions(+), 22 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index eda9d1a8f..9c0b7a84a 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -213,17 +213,23 @@ public class AggregationNode extends PlanNode {
 
   @Override
   public void init(Analyzer analyzer) throws InternalException {
+    init(analyzer, null);
+  }
+
+  public void init(Analyzer analyzer, List<Expr> transferredConjuncts)
+      throws InternalException {
     Preconditions.checkState(tupleIds_.size() == aggInfos_.size());
     // Assign conjuncts to the top-most agg in the single-node plan. They are 
transferred
-    // to the proper place in the distributed plan via transferConjuncts().
+    // to the proper place in the distributed plan via transferConjuncts 
argument.
     if (aggPhase_ == multiAggInfo_.getConjunctAssignmentPhase()) {
       conjuncts_.clear();
       // TODO: If this is the transposition phase, then we can push conjuncts 
that
       // reference a single aggregation class down into the aggregators of the
       // previous phase.
       conjuncts_.addAll(multiAggInfo_.collectConjuncts(analyzer, true));
-      conjuncts_ = orderConjunctsByCost(conjuncts_);
     }
+    if (transferredConjuncts != null) conjuncts_.addAll(transferredConjuncts);
+    conjuncts_ = orderConjunctsByCost(conjuncts_);
 
     // Compute the mem layout for both tuples here for simplicity.
     for (AggregateInfo aggInfo : aggInfos_) {
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 0dd8ecb2b..eb17be65e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -1051,13 +1051,21 @@ public class DistributedPlanner {
       node.unsetLimit();
     }
     node.unsetNeedsFinalize();
+    // Move all HAVING predicates to be transferred into mergeAggNode.
+    List<Expr> havingPredicates = node.conjuncts_;
+    node.conjuncts_ = new ArrayList<>();
+    // Recompute stats after setIsPreagg and predicates removal.
+    // Must do this before creating mergeFragment.
+    node.computeStats(ctx_.getRootAnalyzer());
 
-    // place a merge aggregation step in a new fragment
+    // Place the merge aggregation step in a new fragment.
     PlanFragment mergeFragment = createParentFragment(childFragment, 
parentPartition);
     AggregationNode mergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
         mergeFragment.getPlanRoot(), node.getMultiAggInfo(), 
AggPhase.FIRST_MERGE);
-    mergeAggNode.init(ctx_.getRootAnalyzer());
     mergeAggNode.setLimit(limit);
+    // Transfer all HAVING predicates into mergeAggNode and init().
+    // init() is called after setLimit() to avoid calling computeStats() again.
+    mergeAggNode.init(ctx_.getRootAnalyzer(), havingPredicates);
     // Carry the IsNonCorrelatedSclarSubquery_ flag to the merge node. This 
flag is
     // applicable regardless of the partition scheme for the children since it 
is a
     // logical property.
@@ -1069,12 +1077,6 @@ public class DistributedPlanner {
       mergeAggNode.setDisableCodegen(true);
     }
 
-    // HAVING predicates can only be evaluated after the merge agg step
-    node.transferConjuncts(mergeAggNode);
-    // Recompute stats after transferring the conjuncts_ (order is important).
-    node.computeStats(ctx_.getRootAnalyzer());
-    mergeFragment.getPlanRoot().computeStats(ctx_.getRootAnalyzer());
-    mergeAggNode.computeStats(ctx_.getRootAnalyzer());
     // Set new plan root after updating stats.
     mergeFragment.addPlanRoot(mergeAggNode);
 
@@ -1119,6 +1121,9 @@ public class DistributedPlanner {
     } else {
       phase1AggNode.setIntermediateTuple();
       phase1AggNode.setIsPreagg(ctx_);
+      // Recompute stats after setIsPreagg.
+      // Must do this before creating firstMergeFragment.
+      phase1AggNode.computeStats(ctx_.getRootAnalyzer());
 
       DataPartition phase1MergePartition =
           DataPartition.hashPartitioned(phase1PartitionExprs);
@@ -1155,15 +1160,23 @@ public class DistributedPlanner {
       phase2AggNode.setIsPreagg(ctx_);
       phase2MergePartition = 
DataPartition.hashPartitioned(phase2PartitionExprs);
     }
+
+    // Move all HAVING predicates to be transferred into phase2MergeAggNode.
+    List<Expr> havingPredicates = phase2AggNode.conjuncts_;
+    phase2AggNode.conjuncts_ = new ArrayList<>();
+    // Recompute stats after setIsPreagg and predicates removal.
+    // Must do this before creating phase2MergeAggNode.
+    phase2AggNode.computeStats(ctx_.getRootAnalyzer());
+
+    // Place the merge aggregation step in a new fragment.
     PlanFragment secondMergeFragment =
         createParentFragment(firstMergeFragment, phase2MergePartition);
-
     AggregationNode phase2MergeAggNode = new 
AggregationNode(ctx_.getNextNodeId(),
         phase2AggNode, phase2AggNode.getMultiAggInfo(), AggPhase.SECOND_MERGE);
-    phase2MergeAggNode.init(ctx_.getRootAnalyzer());
     phase2MergeAggNode.setLimit(limit);
-    // Transfer having predicates to final merge agg node
-    phase2AggNode.transferConjuncts(phase2MergeAggNode);
+    // Transfer all HAVING predicates into phase2MergeAggNode and init().
+    // init() is called after setLimit() to avoid calling computeStats() again.
+    phase2MergeAggNode.init(ctx_.getRootAnalyzer(), havingPredicates);
     secondMergeFragment.addPlanRoot(phase2MergeAggNode);
     return secondMergeFragment;
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 11ed81064..4c75d896b 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -304,6 +304,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     conjuncts_.addAll(conjuncts);
   }
 
+  /**
+   * Deprecated because it may hide the need to recompute stats after conjunct 
transfer.
+   */
+  @Deprecated
   public void transferConjuncts(PlanNode recipient) {
     recipient.conjuncts_.addAll(conjuncts_);
     conjuncts_.clear();
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index a602b161f..5f6a859da 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -422,7 +422,7 @@ PLAN-ROOT SINK
 04:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: t.bigint_col
-|  row-size=16B cardinality=10
+|  row-size=16B cardinality=20
 |
 06:AGGREGATE
 |  group by: t.bigint_col, int_col
@@ -565,7 +565,7 @@ PLAN-ROOT SINK
 04:AGGREGATE [STREAMING]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
-|  row-size=24B cardinality=10
+|  row-size=24B cardinality=20
 |
 06:AGGREGATE
 |  output: count:merge(smallint_col)
@@ -645,7 +645,7 @@ PLAN-ROOT SINK
 05:AGGREGATE [STREAMING]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
-|  row-size=24B cardinality=10
+|  row-size=24B cardinality=20
 |
 09:AGGREGATE
 |  output: count:merge(smallint_col)
@@ -1978,7 +1978,7 @@ PLAN-ROOT SINK
 02:AGGREGATE [STREAMING]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: bigint_col
-|  row-size=24B cardinality=1
+|  row-size=24B cardinality=10
 |
 05:AGGREGATE
 |  output: count:merge(smallint_col)
@@ -2047,7 +2047,7 @@ PLAN-ROOT SINK
 02:AGGREGATE [STREAMING]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: bigint_col
-|  row-size=24B cardinality=3
+|  row-size=24B cardinality=10
 |
 05:AGGREGATE
 |  output: count:merge(smallint_col)
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test
index d17267e2b..d8dd6c857 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/multiple-distinct-materialization.test
@@ -969,7 +969,7 @@ PLAN-ROOT SINK
 02:AGGREGATE [STREAMING]
 |  output: count(tinyint_col)
 |  group by: date_string_col, timestamp_col
-|  row-size=44B cardinality=1
+|  row-size=44B cardinality=10
 |
 05:AGGREGATE
 |  group by: date_string_col, timestamp_col, tinyint_col
@@ -1030,7 +1030,7 @@ PLAN-ROOT SINK
 02:AGGREGATE [STREAMING]
 |  output: count(tinyint_col), min:merge(string_col), max:merge(string_col)
 |  group by: date_string_col, timestamp_col
-|  row-size=68B cardinality=1
+|  row-size=68B cardinality=3
 |
 05:AGGREGATE
 |  output: min:merge(string_col), max:merge(string_col)
@@ -1093,7 +1093,7 @@ PLAN-ROOT SINK
 02:AGGREGATE [STREAMING]
 |  output: count(smallint_col), max:merge(string_col)
 |  group by: date_string_col, timestamp_col
-|  row-size=56B cardinality=1
+|  row-size=56B cardinality=3
 |
 05:AGGREGATE
 |  output: max:merge(string_col)

Reply via email to