This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b1628d864 IMPALA-13622: Fix negative cardinality bug in 
AggregationNode.java
b1628d864 is described below

commit b1628d8644d21c3b45acd8b2715a284cf5e8379b
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Dec 17 21:59:10 2024 -0800

    IMPALA-13622: Fix negative cardinality bug in AggregationNode.java
    
    An incomplete COMPUTE STATS during data loading revealed a bug in
    AggregationNode.java where estimateNumGroups() can return a value less
    than -1.
    
    This patch fixes the bug by implementing
    PlanNode.smallestValidCardinality() and
    MathUtil.saturatingMultiplyCardinalites() (sic; this is the spelling used
    in the code). Both functions validate that their arguments are valid
    cardinality numbers (>= -1). smallestValidCardinality() compares two
    cardinality numbers and returns the smaller non-negative one, or -1
    (unknown) if neither is non-negative. It generalizes and replaces the
    static function PlanNode.capCardinalityAtLimit(long, long).
    saturatingMultiplyCardinalites() adds validation and normalization over
    MathUtil.saturatingMultiply(), returning -1 if either argument is -1.
    
    Reorder the tuple-based estimation logic from IMPALA-13405 so that a
    negative (unknown, -1) estimate is handled properly.
    
    Testing:
    - Added more preconditions in AggregationNode.java.
    - Added CardinalityTest.testSmallestValidCardinality and
      MathUtilTest.testSaturatingMultiplyCardinality.
    - Added test in resource-requirements.test that will consistently fail
      without this fix.
    - Pass testResourceRequirement.
    
    Change-Id: Ib862a010b2094daa2cbdd5d555e46443009672ad
    Reviewed-on: http://gerrit.cloudera.org:8080/22235
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Jason Fehr <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/planner/AggregationNode.java |  73 +--
 .../java/org/apache/impala/planner/PlanNode.java   |  27 +-
 .../java/org/apache/impala/planner/SortNode.java   |   6 +-
 .../main/java/org/apache/impala/util/MathUtil.java |  14 +
 .../org/apache/impala/planner/CardinalityTest.java |  23 +
 .../java/org/apache/impala/util/MathUtilTest.java  |  24 +
 .../queries/PlannerTest/resource-requirements.test | 491 +++++++++++++++++++++
 7 files changed, 614 insertions(+), 44 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 9c0b7a84a..0535f60da 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -297,7 +297,7 @@ public class AggregationNode extends PlanNode {
         numGroups =
             estimateNumGroups(aggInfo.getGroupingExprs(), 
aggInputCardinality_, this);
       }
-      Preconditions.checkState(numGroups >= -1, numGroups);
+      Preconditions.checkState(numGroups >= -1, "numGroups is invalid: %s", 
numGroups);
 
       if (LOG.isTraceEnabled()) {
         LOG.trace("{} aggPhase={} aggInputCardinality={} aggIdx={} 
numGroups={} "
@@ -340,7 +340,7 @@ public class AggregationNode extends PlanNode {
     if (groupingExprs.isEmpty()) {
       return NON_GROUPING_AGG_NUM_GROUPS;
     } else {
-      return lowerNumGroupsByInputCardinality(preaggNumGroup, 
aggInputCardinality);
+      return smallestValidCardinality(preaggNumGroup, aggInputCardinality);
     }
   }
 
@@ -400,7 +400,10 @@ public class AggregationNode extends PlanNode {
       }
     }
 
-    List<Pair<Long, List<Expr>>> knownCardinalities = new ArrayList<>();
+    // List of <outputCardinality, groupingExpressions> pairs.
+    // Note that outputCardinality can be -1.
+    List<Pair<Long, List<Expr>>> knownCardinalitySources = new ArrayList<>();
+
     if (tupleIdToExprs.isEmpty()) {
       Preconditions.checkState(exprsWithUniqueTupleId.size() == 
groupingExprs.size(),
           "Missing expression after TupleId analysis! Expect %s but found %s",
@@ -416,7 +419,7 @@ public class AggregationNode extends PlanNode {
         for (TupleId id : childNode.getTupleIds()) {
           List<Expr> exprs = tupleIdToExprs.get(id);
           if (exprs != null) {
-            knownCardinalities.add(Pair.create(childNode.getCardinality(), 
exprs));
+            
knownCardinalitySources.add(Pair.create(childNode.getCardinality(), exprs));
             tupleIdToExprs.remove(id);
           }
         }
@@ -433,18 +436,36 @@ public class AggregationNode extends PlanNode {
     }
 
     long numGroups = 1;
-    if (!exprsWithUniqueTupleId.isEmpty()) {
-      numGroups = estimateNumGroups(exprsWithUniqueTupleId, 
aggInputCardinality);
-    }
-    for (Pair<Long, List<Expr>> entry : knownCardinalities) {
+    for (Pair<Long, List<Expr>> entry : knownCardinalitySources) {
       // Pick the minimum between NDV multiple of the expressions vs 
cardinality
       // of tuple.
-      long numGroupFromCommonTuple = Math.min(
-          entry.getFirst(), estimateNumGroups(entry.getSecond(), 
aggInputCardinality));
-      numGroups = Math.min(aggInputCardinality,
-          MathUtil.saturatingMultiply(numGroups, numGroupFromCommonTuple));
+      long tupleCard = entry.getFirst();
+      long tupleNdv = estimateNumGroups(entry.getSecond(), 
aggInputCardinality);
+      long numGroupFromCommonTuple = smallestValidCardinality(tupleCard, 
tupleNdv);
+      if (numGroupFromCommonTuple < 0) {
+        // Can not reason about tuple cardinality.
+        // Fallback to the original estimation logic by moving all exprs to
+        // exprsWithUniqueTupleId.
+        exprsWithUniqueTupleId.addAll(entry.getSecond());
+      } else {
+        numGroups =
+            MathUtil.saturatingMultiplyCardinalites(numGroups, 
numGroupFromCommonTuple);
+      }
+    }
+    // numGroups should be non-negative at this point.
+    Preconditions.checkState(numGroups > -1, numGroups);
+
+    if (!exprsWithUniqueTupleId.isEmpty()) {
+      long numGroupsForUniqueTuples =
+          estimateNumGroups(exprsWithUniqueTupleId, aggInputCardinality);
+      if (numGroupsForUniqueTuples < 0) {
+        numGroups = -1;
+      } else {
+        numGroups =
+            MathUtil.saturatingMultiplyCardinalites(numGroups, 
numGroupsForUniqueTuples);
+      }
     }
-    return numGroups;
+    return smallestValidCardinality(numGroups, aggInputCardinality);
   }
 
   /**
@@ -457,26 +478,12 @@ public class AggregationNode extends PlanNode {
     Preconditions.checkArgument(
         !groupingExprs.isEmpty(), "groupingExprs must not be empty");
     long numGroups = Expr.getNumDistinctValues(groupingExprs);
-    if (numGroups == -1) {
-      // A worst-case cardinality_ is better than an unknown cardinality_.
-      // Note that this will still be -1 if the child's cardinality is unknown.
-      return aggInputCardinality;
-    }
-    // We have a valid estimate of the number of groups. Cap it at number of 
input
-    // rows because an aggregation cannot increase the cardinality_.
-    if (aggInputCardinality >= 0) {
-      numGroups = Math.min(aggInputCardinality, numGroups);
-    }
-    return lowerNumGroupsByInputCardinality(numGroups, aggInputCardinality);
-  }
-
-  private static long lowerNumGroupsByInputCardinality(
-      long numGroups, long aggInputCardinality) {
-    if (aggInputCardinality >= 0) {
-      return Math.min(numGroups, aggInputCardinality);
-    } else {
-      return numGroups;
-    }
+    // Return the least & valid cardinality between numGroups vs 
aggInputCardinality.
+    // Grouping aggregation cannot increase output cardinality.
+    // Also, worst-case output cardinality is better than an unknown output 
cardinality.
+    // Note that this will still be -1 (unknown) if both numGroups
+    // and aggInputCardinality is unknown.
+    return smallestValidCardinality(numGroups, aggInputCardinality);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 4c75d896b..3a8d4815b 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -748,10 +748,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   }
 
   protected long capCardinalityAtLimit(long cardinality) {
-    if (hasLimit()) {
-      return capCardinalityAtLimit(cardinality, limit_);
-    }
-    return cardinality;
+    return smallestValidCardinality(cardinality, limit_);
   }
 
   // Default implementation of computing the total data processed in bytes.
@@ -761,10 +758,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         ExprUtil.computeExprsTotalCost(getConjuncts()));
   }
 
-  public static long capCardinalityAtLimit(long cardinality, long limit) {
-    return cardinality == -1 ? limit : Math.min(cardinality, limit);
-  }
-
   /**
    * Call computeMemLayout() for all materialized tuples.
    */
@@ -1410,4 +1403,22 @@ abstract public class PlanNode extends 
TreeNode<PlanNode> {
    * need to explicitly enable it.
    */
   public boolean isTupleCachingImplemented() { return false; }
+
+  /**
+   * Return the least between 'cardinality1' and 'cardinality2'
+   * that is not a negative number (unknown).
+   * Can return -1 if both number is less than 0.
+   * Both argument should not be < -1.
+   */
+  protected static long smallestValidCardinality(long cardinality1, long 
cardinality2) {
+    Preconditions.checkArgument(
+        cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1);
+    Preconditions.checkArgument(
+        cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2);
+    if (cardinality1 >= 0) {
+      if (cardinality2 >= 0) return Math.min(cardinality1, cardinality2);
+      return cardinality1;
+    }
+    return Math.max(-1, cardinality2);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java 
b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 4d120818e..417128359 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -125,7 +125,7 @@ public class SortNode extends PlanNode {
       PlanNodeId id, PlanNode input, SortInfo info, long offset, long limit,
       boolean includeTies) {
     long topNBytesLimit = queryOptions.topn_bytes_limit;
-    long topNCardinality = capCardinalityAtLimit(input.cardinality_, limit);
+    long topNCardinality = smallestValidCardinality(input.cardinality_, limit);
     long estimatedTopNMaterializedSize =
         info.estimateTopNMaterializedSize(topNCardinality, offset);
 
@@ -221,7 +221,7 @@ public class SortNode extends PlanNode {
     Preconditions.checkState(!hasLimit());
     Preconditions.checkState(!hasOffset());
     long topNBytesLimit = analyzer.getQueryOptions().topn_bytes_limit;
-    long topNCardinality = capCardinalityAtLimit(getChild(0).cardinality_, 
limit);
+    long topNCardinality = smallestValidCardinality(getChild(0).cardinality_, 
limit);
     long estimatedTopNMaterializedSize =
         info_.estimateTopNMaterializedSize(topNCardinality, offset_);
 
@@ -302,7 +302,7 @@ public class SortNode extends PlanNode {
   public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     if (isTypeTopN() && includeTies_) {
-      cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_, 
limitWithTies_);
+      cardinality_ = smallestValidCardinality(getChild(0).cardinality_, 
limitWithTies_);
     } else {
       cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_);
     }
diff --git a/fe/src/main/java/org/apache/impala/util/MathUtil.java 
b/fe/src/main/java/org/apache/impala/util/MathUtil.java
index c4029fa59..5af1e023e 100644
--- a/fe/src/main/java/org/apache/impala/util/MathUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MathUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.math.LongMath;
 
 public class MathUtil {
@@ -32,6 +33,19 @@ public class MathUtil {
     }
   }
 
+  // Multiply two cardinality numbers like saturatingMultiply() but with 
additional
+  // precondition check that both must be a valid cardinality value.
+  // Return -1 if any argument is -1.
+  public static long saturatingMultiplyCardinalites(
+      long cardinality1, long cardinality2) {
+    Preconditions.checkArgument(
+        cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1);
+    Preconditions.checkArgument(
+        cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2);
+    if (cardinality1 == -1 || cardinality2 == -1) return -1;
+    return saturatingMultiply(cardinality1, cardinality2);
+  }
+
   // Add two numbers. If the add would overflow, return either Long.MAX_VALUE 
if both are
   // positive or Long.MIN_VALUE if both are negative. The overflow path is not 
optimised
   // at all and may be somewhat slow.
diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java 
b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
index 04fb65cd3..feca60835 100644
--- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
@@ -1311,4 +1311,27 @@ public class CardinalityTest extends PlannerTestBase {
         expected * CARDINALITY_TOLERANCE);
   }
 
+  @Test
+  public void testSmallestValidCardinality() {
+    long[] validCard = {0, 1, Long.MAX_VALUE};
+    long unknown = -1;
+
+    // Case 1: both argument is valid.
+    for (long c1 : validCard) {
+      for (long c2 : validCard) {
+        assertEquals(c1 + " vs " + c2, Math.min(c1, c2),
+            PlanNode.smallestValidCardinality(c1, c2));
+      }
+    }
+    // Case 2: One argument is valid, the other is unknown.
+    for (long c : validCard) {
+      assertEquals(
+          c + " vs " + unknown, c, PlanNode.smallestValidCardinality(c, 
unknown));
+      assertEquals(
+          unknown + " vs " + c, c, PlanNode.smallestValidCardinality(unknown, 
c));
+    }
+    // Case 3: both argument is unknown.
+    assertEquals(unknown + " vs " + unknown, unknown,
+        PlanNode.smallestValidCardinality(unknown, unknown));
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/util/MathUtilTest.java 
b/fe/src/test/java/org/apache/impala/util/MathUtilTest.java
index 612006793..352eb5898 100644
--- a/fe/src/test/java/org/apache/impala/util/MathUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/util/MathUtilTest.java
@@ -44,6 +44,30 @@ public class MathUtilTest {
     assertEquals(Long.MAX_VALUE, MathUtil.saturatingMultiply(Long.MIN_VALUE / 
10, -100));
   }
 
+  @Test
+  public void testSaturatingMultiplyCardinality() {
+    long[] validCard = {0, 1, 2, Long.MAX_VALUE / 2, Long.MAX_VALUE};
+    long unknown = -1;
+
+    // Case 1: both argument is valid.
+    for (long c1 : validCard) {
+      for (long c2 : validCard) {
+        assertEquals(c1 + " * " + c2, MathUtil.saturatingMultiply(c1, c2),
+            MathUtil.saturatingMultiplyCardinalites(c1, c2));
+      }
+    }
+    // Case 2: One argument is valid, the other is unknown.
+    for (long c : validCard) {
+      assertEquals(c + " * " + unknown, unknown,
+          MathUtil.saturatingMultiplyCardinalites(c, unknown));
+      assertEquals(unknown + " * " + c, unknown,
+          MathUtil.saturatingMultiplyCardinalites(unknown, c));
+    }
+    // Case 3: both argument is unknown.
+    assertEquals(unknown + " * " + unknown, unknown,
+        MathUtil.saturatingMultiplyCardinalites(unknown, unknown));
+  }
+
   @Test
   public void testSaturatingAdd() {
     // No overflow
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 8deadd324..e29d53900 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -4252,6 +4252,497 @@ Per-Instance Resources: mem-estimate=88.47MB 
mem-reservation=8.00MB thread-reser
    tuple-ids=2 row-size=16B cardinality=6.00M
    in pipelines: 02(GETNEXT)
 ====
+# IMPALA-13622: TPC-H Q18 with tpch.customer replaced with tpch_avro.customer
+# that does not have stats. This meant to test that none of cardinality 
estimation code
+# for AggregationNode return cardinality or numGroups less than -1.
+# All preconditions in AggregationNode.java should pass.
+ select
+  c_name,
+  c_custkey,
+  o_orderkey,
+  o_orderdate,
+  o_totalprice,
+  sum(l_quantity)
+from
+  tpch_avro.customer,
+  tpch.orders,
+  tpch.lineitem
+where
+  o_orderkey in (
+    select
+      l_orderkey
+    from
+      tpch.lineitem
+    group by
+      l_orderkey
+    having
+      sum(l_quantity) > 300
+    )
+  and c_custkey = o_custkey
+  and o_orderkey = l_orderkey
+group by
+  c_name,
+  c_custkey,
+  o_orderkey,
+  o_orderdate,
+  o_totalprice
+order by
+  o_totalprice desc,
+  o_orderdate
+limit 100
+---- PLAN
+Max Per-Host Resource Reservation: Memory=138.50MB Threads=5
+Per-Host Resource Estimates: Memory=2.46GB
+WARNING: The following tables are missing relevant table and/or column 
statistics.
+tpch_avro.customer
+Analyzed query: SELECT c_name, c_custkey, o_orderkey, o_orderdate, 
o_totalprice,
+sum(l_quantity) FROM tpch_avro.customer, tpch.orders, tpch.lineitem LEFT SEMI
+JOIN (SELECT l_orderkey FROM tpch.lineitem GROUP BY l_orderkey HAVING
+sum(l_quantity) > CAST(300 AS DECIMAL(5,0))) `$a$1` (`$c$1`) ON o_orderkey =
+`$a$1`.`$c$1` WHERE TRUE AND c_custkey = o_custkey AND o_orderkey = l_orderkey
+GROUP BY c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice ORDER BY
+o_totalprice DESC, o_orderdate ASC LIMIT CAST(100 AS TINYINT)
+
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=2.46GB mem-reservation=138.50MB 
thread-reservation=5 runtime-filters-memory=4.00MB
+PLAN-ROOT SINK
+|  output exprs: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, 
sum(l_quantity)
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
+|
+09:TOP-N [LIMIT=100]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|  mem-estimate=7.23KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7 row-size=74B cardinality=100
+|  in pipelines: 09(GETNEXT), 08(OPEN)
+|
+08:AGGREGATE [FINALIZE]
+|  output: sum(l_quantity)
+|  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=6 row-size=74B cardinality=600.12K
+|  in pipelines: 08(GETNEXT), 02(OPEN)
+|
+07:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: o_orderkey = l_orderkey
+|  runtime filters: RF000[bloom] <- l_orderkey
+|  mem-estimate=8.50MB mem-reservation=8.50MB spill-buffer=512.00KB 
thread-reservation=0
+|  tuple-ids=2,1,0 row-size=82B cardinality=600.12K
+|  in pipelines: 02(GETNEXT), 04(OPEN)
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: sum(l_quantity)
+|  |  group by: l_orderkey
+|  |  having: sum(l_quantity) > CAST(300 AS DECIMAL(5,0))
+|  |  mem-estimate=53.68MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  |  tuple-ids=4 row-size=24B cardinality=156.34K
+|  |  in pipelines: 04(GETNEXT), 03(OPEN)
+|  |
+|  03:SCAN HDFS [tpch.lineitem]
+|     HDFS partitions=1/1 files=1 size=718.94MB
+|     stored statistics:
+|       table: rows=6.00M size=718.94MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.07M
+|     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=3 row-size=16B cardinality=6.00M
+|     in pipelines: 03(GETNEXT)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: o_custkey = c_custkey
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF002[bloom] <- c_custkey
+|  mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=2,1,0 row-size=82B cardinality=5.76M
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+|--00:SCAN HDFS [tpch_avro.customer]
+|     HDFS partitions=1/1 files=1 size=23.05MB
+|     stored statistics:
+|       table: rows=unavailable size=23.05MB
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=32.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=0 row-size=20B cardinality=unavailable
+|     in pipelines: 00(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF004[bloom] <- o_orderkey
+|  mem-estimate=113.80MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=2,1 row-size=62B cardinality=5.76M
+|  in pipelines: 02(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpch.orders]
+|     HDFS partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF000[bloom] -> o_orderkey, RF002[bloom] -> o_custkey
+|     stored statistics:
+|       table: rows=1.50M size=162.56MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=1 row-size=46B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+02:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000[bloom] -> tpch.lineitem.l_orderkey, RF004[bloom] -> 
l_orderkey
+   stored statistics:
+     table: rows=6.00M size=718.94MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=1.07M
+   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+   tuple-ids=2 row-size=16B cardinality=6.00M
+   in pipelines: 02(GETNEXT)
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=253.75MB Threads=11
+Per-Host Resource Estimates: Memory=2.71GB
+WARNING: The following tables are missing relevant table and/or column 
statistics.
+tpch_avro.customer
+Analyzed query: SELECT c_name, c_custkey, o_orderkey, o_orderdate, 
o_totalprice,
+sum(l_quantity) FROM tpch_avro.customer, tpch.orders, tpch.lineitem LEFT SEMI
+JOIN (SELECT l_orderkey FROM tpch.lineitem GROUP BY l_orderkey HAVING
+sum(l_quantity) > CAST(300 AS DECIMAL(5,0))) `$a$1` (`$c$1`) ON o_orderkey =
+`$a$1`.`$c$1` WHERE TRUE AND c_custkey = o_custkey AND o_orderkey = l_orderkey
+GROUP BY c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice ORDER BY
+o_totalprice DESC, o_orderdate ASC LIMIT CAST(100 AS TINYINT)
+
+F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, 
sum(l_quantity)
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
+|
+17:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|  limit: 100
+|  mem-estimate=25.26KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7 row-size=74B cardinality=100
+|  in pipelines: 09(GETNEXT)
+|
+F06:PLAN FRAGMENT [HASH(c_name,c_custkey,o_orderkey,o_orderdate,o_totalprice)] 
hosts=3 instances=3
+Per-Host Resources: mem-estimate=138.23MB mem-reservation=34.00MB 
thread-reservation=1
+09:TOP-N [LIMIT=100]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|  mem-estimate=7.23KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7 row-size=74B cardinality=100
+|  in pipelines: 09(GETNEXT), 16(OPEN)
+|
+16:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_quantity)
+|  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=6 row-size=74B cardinality=600.12K
+|  in pipelines: 16(GETNEXT), 02(OPEN)
+|
+15:EXCHANGE [HASH(c_name,c_custkey,o_orderkey,o_orderdate,o_totalprice)]
+|  mem-estimate=10.23MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=6 row-size=74B cardinality=600.12K
+|  in pipelines: 02(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=2.23GB mem-reservation=144.75MB 
thread-reservation=1 runtime-filters-memory=4.00MB
+08:AGGREGATE [STREAMING]
+|  output: sum(l_quantity)
+|  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=6 row-size=74B cardinality=600.12K
+|  in pipelines: 02(GETNEXT)
+|
+07:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
+|  hash predicates: o_orderkey = l_orderkey
+|  runtime filters: RF000[bloom] <- l_orderkey
+|  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB 
thread-reservation=0
+|  tuple-ids=2,1,0 row-size=82B cardinality=600.12K
+|  in pipelines: 02(GETNEXT), 14(OPEN)
+|
+|--14:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(l_quantity)
+|  |  group by: l_orderkey
+|  |  having: sum(l_quantity) > CAST(300 AS DECIMAL(5,0))
+|  |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  |  tuple-ids=4 row-size=24B cardinality=156.34K
+|  |  in pipelines: 14(GETNEXT), 03(OPEN)
+|  |
+|  13:EXCHANGE [HASH(l_orderkey)]
+|  |  mem-estimate=10.08MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=24B cardinality=1.56M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F04:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=139.84MB mem-reservation=42.00MB 
thread-reservation=2
+|  04:AGGREGATE [STREAMING]
+|  |  output: sum(l_quantity)
+|  |  group by: l_orderkey
+|  |  mem-estimate=51.51MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  |  tuple-ids=4 row-size=24B cardinality=1.56M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  03:SCAN HDFS [tpch.lineitem, RANDOM]
+|     HDFS partitions=1/1 files=1 size=718.94MB
+|     stored statistics:
+|       table: rows=6.00M size=718.94MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.07M
+|     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=3 row-size=16B cardinality=6.00M
+|     in pipelines: 03(GETNEXT)
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_custkey = c_custkey
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF002[bloom] <- c_custkey
+|  mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=2,1,0 row-size=82B cardinality=5.76M
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+|--12:EXCHANGE [BROADCAST]
+|  |  mem-estimate=23.98KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=0 row-size=20B cardinality=unavailable
+|  |  in pipelines: 00(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.09MB mem-reservation=8.00MB 
thread-reservation=2
+|  00:SCAN HDFS [tpch_avro.customer, RANDOM]
+|     HDFS partitions=1/1 files=1 size=23.05MB
+|     stored statistics:
+|       table: rows=unavailable size=23.05MB
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=32.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=0 row-size=20B cardinality=unavailable
+|     in pipelines: 00(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF004[bloom] <- o_orderkey
+|  mem-estimate=37.93MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=2,1 row-size=62B cardinality=5.76M
+|  in pipelines: 02(GETNEXT), 01(OPEN)
+|
+|--11:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=10.10MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=46B cardinality=1.50M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=90.59MB mem-reservation=10.00MB 
thread-reservation=2 runtime-filters-memory=2.00MB
+|  01:SCAN HDFS [tpch.orders, RANDOM]
+|     HDFS partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF000[bloom] -> o_orderkey, RF002[bloom] -> o_custkey
+|     stored statistics:
+|       table: rows=1.50M size=162.56MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=1 row-size=46B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+10:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=10.06MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=16B cardinality=6.00M
+|  in pipelines: 02(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=91.23MB mem-reservation=11.00MB 
thread-reservation=2 runtime-filters-memory=3.00MB
+02:SCAN HDFS [tpch.lineitem, RANDOM]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000[bloom] -> tpch.lineitem.l_orderkey, RF004[bloom] -> 
l_orderkey
+   stored statistics:
+     table: rows=6.00M size=718.94MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=1.07M
+   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+   tuple-ids=2 row-size=16B cardinality=6.00M
+   in pipelines: 02(GETNEXT)
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=443.75MB Threads=16
+Per-Host Resource Estimates: Memory=3.23GB
+Analyzed query: SELECT c_name, c_custkey, o_orderkey, o_orderdate, 
o_totalprice,
+sum(l_quantity) FROM tpch_avro.customer, tpch.orders, tpch.lineitem LEFT SEMI
+JOIN (SELECT l_orderkey FROM tpch.lineitem GROUP BY l_orderkey HAVING
+sum(l_quantity) > CAST(300 AS DECIMAL(5,0))) `$a$1` (`$c$1`) ON o_orderkey =
+`$a$1`.`$c$1` WHERE TRUE AND c_custkey = o_custkey AND o_orderkey = l_orderkey
+GROUP BY c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice ORDER BY
+o_totalprice DESC, o_orderdate ASC LIMIT CAST(100 AS TINYINT)
+
+F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.05MB mem-reservation=4.00MB 
thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, 
sum(l_quantity)
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
+|
+17:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|  limit: 100
+|  mem-estimate=48.11KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7 row-size=74B cardinality=100
+|  in pipelines: 09(GETNEXT)
+|
+F06:PLAN FRAGMENT [HASH(c_name,c_custkey,o_orderkey,o_orderdate,o_totalprice)] 
hosts=3 instances=6
+Per-Instance Resources: mem-estimate=138.46MB mem-reservation=34.00MB 
thread-reservation=1
+09:TOP-N [LIMIT=100]
+|  order by: o_totalprice DESC, o_orderdate ASC
+|  mem-estimate=7.23KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7 row-size=74B cardinality=100
+|  in pipelines: 09(GETNEXT), 16(OPEN)
+|
+16:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_quantity)
+|  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=6 row-size=74B cardinality=600.12K
+|  in pipelines: 16(GETNEXT), 02(OPEN)
+|
+15:EXCHANGE [HASH(c_name,c_custkey,o_orderkey,o_orderdate,o_totalprice)]
+|  mem-estimate=10.46MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=6 row-size=74B cardinality=600.12K
+|  in pipelines: 02(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
+Per-Instance Resources: mem-estimate=139.95MB mem-reservation=34.00MB 
thread-reservation=1
+08:AGGREGATE [STREAMING]
+|  output: sum(l_quantity)
+|  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=6 row-size=74B cardinality=600.12K
+|  in pipelines: 02(GETNEXT)
+|
+07:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]
+|  hash-table-id=00
+|  hash predicates: o_orderkey = l_orderkey
+|  mem-estimate=0B mem-reservation=0B spill-buffer=128.00KB 
thread-reservation=0
+|  tuple-ids=2,1,0 row-size=82B cardinality=600.12K
+|  in pipelines: 02(GETNEXT), 14(OPEN)
+|
+|--F08:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
+|  |  Per-Instance Resources: mem-estimate=28.16MB mem-reservation=20.88MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: l_orderkey
+|  |  runtime filters: RF000[bloom] <- l_orderkey
+|  |  mem-estimate=2.88MB mem-reservation=2.88MB spill-buffer=128.00KB 
thread-reservation=0
+|  |
+|  14:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(l_quantity)
+|  |  group by: l_orderkey
+|  |  having: sum(l_quantity) > CAST(300 AS DECIMAL(5,0))
+|  |  mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB 
thread-reservation=0
+|  |  tuple-ids=4 row-size=24B cardinality=156.34K
+|  |  in pipelines: 14(GETNEXT), 03(OPEN)
+|  |
+|  13:EXCHANGE [HASH(l_orderkey)]
+|  |  mem-estimate=10.16MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=24B cardinality=1.56M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F04:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+|  Per-Instance Resources: mem-estimate=122.66MB mem-reservation=42.00MB 
thread-reservation=1
+|  04:AGGREGATE [STREAMING]
+|  |  output: sum(l_quantity)
+|  |  group by: l_orderkey
+|  |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  |  tuple-ids=4 row-size=24B cardinality=1.56M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  03:SCAN HDFS [tpch.lineitem, RANDOM]
+|     HDFS partitions=1/1 files=1 size=718.94MB
+|     stored statistics:
+|       table: rows=6.00M size=718.94MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.07M
+|     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
+|     tuple-ids=3 row-size=16B cardinality=6.00M
+|     in pipelines: 03(GETNEXT)
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=01
+|  hash predicates: o_custkey = c_custkey
+|  fk/pk conjuncts: assumed fk/pk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2,1,0 row-size=82B cardinality=5.76M
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+|--F09:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.00GB mem-reservation=69.00MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: c_custkey
+|  |  runtime filters: RF002[bloom] <- c_custkey
+|  |  mem-estimate=2.00GB mem-reservation=68.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  |
+|  12:EXCHANGE [BROADCAST]
+|  |  mem-estimate=23.98KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=0 row-size=20B cardinality=unavailable
+|  |  in pipelines: 00(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=32.09MB mem-reservation=8.00MB 
thread-reservation=1
+|  00:SCAN HDFS [tpch_avro.customer, RANDOM]
+|     HDFS partitions=1/1 files=1 size=23.05MB
+|     stored statistics:
+|       table: rows=unavailable size=23.05MB
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=32.00MB mem-reservation=8.00MB thread-reservation=0
+|     tuple-ids=0 row-size=20B cardinality=unavailable
+|     in pipelines: 00(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash-table-id=02
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  mem-estimate=0B mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2,1 row-size=62B cardinality=5.76M
+|  in pipelines: 02(GETNEXT), 01(OPEN)
+|
+|--F10:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
+|  |  Per-Instance Resources: mem-estimate=46.10MB mem-reservation=36.00MB 
thread-reservation=1 runtime-filters-memory=2.00MB
+|  JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: o_orderkey
+|  |  runtime filters: RF004[bloom] <- o_orderkey
+|  |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  |
+|  11:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=10.10MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=46B cardinality=1.50M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Shared Resources: mem-estimate=2.00MB mem-reservation=2.00MB 
thread-reservation=0 runtime-filters-memory=2.00MB
+|  Per-Instance Resources: mem-estimate=89.17MB mem-reservation=8.00MB 
thread-reservation=1
+|  01:SCAN HDFS [tpch.orders, RANDOM]
+|     HDFS partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF000[bloom] -> o_orderkey, RF002[bloom] -> o_custkey
+|     stored statistics:
+|       table: rows=1.50M size=162.56MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
+|     tuple-ids=1 row-size=46B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+10:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=10.12MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=16B cardinality=6.00M
+|  in pipelines: 02(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+Per-Host Shared Resources: mem-estimate=3.00MB mem-reservation=3.00MB 
thread-reservation=0 runtime-filters-memory=3.00MB
+Per-Instance Resources: mem-estimate=88.47MB mem-reservation=8.00MB 
thread-reservation=1
+02:SCAN HDFS [tpch.lineitem, RANDOM]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   runtime filters: RF000[bloom] -> tpch.lineitem.l_orderkey, RF004[bloom] -> 
l_orderkey
+   stored statistics:
+     table: rows=6.00M size=718.94MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=1.07M
+   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
+   tuple-ids=2 row-size=16B cardinality=6.00M
+   in pipelines: 02(GETNEXT)
+====
 # Unpartitioned HDFS insert
 insert into table functional.alltypesnopart
 select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,


Reply via email to