This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ed6c19cf0ccef0d55bd9397245f61abc3327e439
Author: Riza Suminto <[email protected]>
AuthorDate: Tue May 13 13:38:14 2025 -0700

    IMPALA-14071: Refactor helper methods around cardinality bounding
    
    There are multiple ways to do cardinality multiplication that also avoid
    integer overflow. Some of the available helper methods are:
    - MathUtil.saturatingMultiplyCardinalities()
    - PlanNode.checkedMultiply()
    - LongMath.saturatedMultiply()
    
    This patch intends to simplify things by:
    - Merging MathUtil.saturatingMultiplyCardinalities() with
      PlanNode.checkedMultiply() into MathUtil.multiplyCardinalities().
    - Merging MathUtil.saturatingAddCardinalities() with
      PlanNode.checkedAdd() into MathUtil.addCardinalities().
    - Moving PlanNode.smallestValidCardinality() to MathUtil.
    - Making MathUtil.saturatingMultiply() and MathUtil.saturatingAdd() simple
      wrappers for LongMath.saturatedMultiply() and LongMath.saturatedAdd()
      respectively.
    
    multiplyCardinalities(), addCardinalities(), and
    smallestValidCardinality() all have cardinality Preconditions checks.
    
    Harden cardinality calculation in several places by using
    multiplyCardinalities() and addCardinalities() accordingly. Added sanity
    check PlanNode.validateCardinality() that is evaluated right after
    PlanNode.computeStats() (in PlanNode.init()). This ensures that cardinality_
    and inputCardinality_ are always valid after PlanNode.computeStats().
    
    Also fixed a bug in ExchangeNode.estimateTotalQueueByteSize() so that it no
    longer calculates against a negative cardinality or a non-positive number
    of nodes.
    
    Testing:
    Passed the following FE and EE tests:
    CardinalityTest
    MathUtilTest
    PlannerTest#testSpillableBufferSizing+testResourceRequirements
    TpcdsCpuCostPlannerTest
    TpcdsPlannerTest
    metadata/test_explain.py
    
    Change-Id: I505ab11cfa1024feb4ceac4cffe9c3283be228ce
    Reviewed-on: http://gerrit.cloudera.org:8080/22897
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/analysis/SortInfo.java  |   6 +-
 .../org/apache/impala/planner/AggregationNode.java |  45 +++++----
 .../apache/impala/planner/DistributedPlanner.java  |   3 +-
 .../org/apache/impala/planner/ExchangeNode.java    |   6 +-
 .../org/apache/impala/planner/HBaseScanNode.java   |   8 +-
 .../org/apache/impala/planner/HashJoinNode.java    |   5 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  20 ++--
 .../org/apache/impala/planner/HdfsTableSink.java   |   5 +-
 .../apache/impala/planner/IcebergDeleteNode.java   |   5 +-
 .../org/apache/impala/planner/IcebergScanNode.java |  19 +++-
 .../java/org/apache/impala/planner/JoinNode.java   |  15 ++-
 .../apache/impala/planner/NestedLoopJoinNode.java  |  12 +--
 .../org/apache/impala/planner/PlanFragment.java    |   3 +-
 .../java/org/apache/impala/planner/PlanNode.java   |  54 ++--------
 .../java/org/apache/impala/planner/ScanNode.java   |   6 ++
 .../java/org/apache/impala/planner/SortNode.java   |  12 ++-
 .../org/apache/impala/planner/SubplanNode.java     |   5 +-
 .../java/org/apache/impala/planner/UnionNode.java  |  11 ++-
 .../main/java/org/apache/impala/util/MathUtil.java |  75 ++++++++++----
 .../org/apache/impala/planner/CardinalityTest.java |  36 +------
 .../java/org/apache/impala/util/MathUtilTest.java  | 109 +++++++++++++++++++--
 .../queries/PlannerTest/resource-requirements.test |  56 +++++------
 .../PlannerTest/spillable-buffer-sizing.test       |  40 ++++----
 .../tpcds_cpu_cost/tpcds-ddl-iceberg.test          |   6 +-
 .../tpcds_cpu_cost/tpcds-ddl-parquet.test          |   2 +-
 .../queries/QueryTest/explain-level2.test          |   2 +-
 26 files changed, 332 insertions(+), 234 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java 
b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 2b7c6c00e..1b0479041 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -21,21 +21,19 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.MapType;
-import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.TreeNode;
-import org.apache.impala.planner.PlanNode;
 import org.apache.impala.planner.ProcessingCost;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.ExprUtil;
+import org.apache.impala.util.MathUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -290,7 +288,7 @@ public class SortInfo {
    * operator and 'offset' is the value in the 'OFFSET [x]' clause.
    */
   public long estimateTopNMaterializedSize(long cardinality, long offset) {
-    long totalRows = PlanNode.checkedAdd(cardinality, offset);
+    long totalRows = MathUtil.addCardinalities(cardinality, offset);
     return estimateMaterializedSize(totalRows);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index cea564663..ac5f8e5f4 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -345,12 +345,12 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
                 groupingExprs.isEmpty(), canCompleteEarly, getLimit());
           }
         } else if (canCompleteEarly) {
-          aggOutputCard = smallestValidCardinality(aggOutputCard, getLimit());
+          aggOutputCard = MathUtil.smallestValidCardinality(aggOutputCard, 
getLimit());
         }
         aggClassOutputCardinality_.add(aggOutputCard);
         // IMPALA-2945: Behavior change if estimatePreaggDuplicate is true.
-        cardinality_ =
-            checkedAdd(cardinality_, estimatePreaggDuplicate ? aggOutputCard : 
numGroups);
+        cardinality_ = MathUtil.addCardinalities(
+            cardinality_, estimatePreaggDuplicate ? aggOutputCard : numGroups);
       }
       aggIdx++;
     }
@@ -446,25 +446,24 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
 
     if (canCompleteEarly) {
       Preconditions.checkArgument(limit > -1, "limit must not be negative.");
-      long limitMultiple =
-          MathUtil.saturatingMultiplyCardinalities(limit, totalInstances);
-      long ndvMultiple =
-          MathUtil.saturatingMultiplyCardinalities(globalNdv, totalInstances);
-      return smallestValidCardinality(
-          inputCardinality, smallestValidCardinality(ndvMultiple, 
limitMultiple));
+      long limitMultiple = MathUtil.multiplyCardinalities(limit, 
totalInstances);
+      long ndvMultiple = MathUtil.multiplyCardinalities(globalNdv, 
totalInstances);
+      return MathUtil.smallestValidCardinality(inputCardinality,
+          MathUtil.smallestValidCardinality(ndvMultiple, limitMultiple));
     } else if (isNonGroupingAggregation) {
-      return smallestValidCardinality(inputCardinality,
-          MathUtil.saturatingMultiplyCardinalities(globalNdv, totalInstances));
+      return MathUtil.smallestValidCardinality(
+          inputCardinality, MathUtil.multiplyCardinalities(globalNdv, 
totalInstances));
     } else if (totalInstances > 1 && inputCardinality > globalNdv) {
       double perInstanceInputCard = Math.ceil((double) inputCardinality / 
totalInstances);
       double globalNdvInDouble = (double) globalNdv;
       double probValExist = 1.0
           - Math.pow((globalNdvInDouble - 1.0) / globalNdvInDouble, 
perInstanceInputCard);
       double perInstanceNdv = Math.ceil(probValExist * globalNdvInDouble);
-      long preaggOutputCard = MathUtil.saturatingMultiplyCardinalities(
-          Math.round(perInstanceNdv), totalInstances);
+      long preaggOutputCard =
+          MathUtil.multiplyCardinalities(Math.round(perInstanceNdv), 
totalInstances);
       // keep bounding at aggInputCardinality_ max.
-      preaggOutputCard = smallestValidCardinality(inputCardinality, 
preaggOutputCard);
+      preaggOutputCard =
+          MathUtil.smallestValidCardinality(inputCardinality, 
preaggOutputCard);
       LOG.trace("inputCardinality={} perInstanceInputCard={} globalNdv={} 
probValExist={}"
               + " perInstanceNdv={} preaggOutputCard={}",
           inputCardinality, perInstanceInputCard, globalNdv, probValExist, 
perInstanceNdv,
@@ -472,7 +471,7 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
       return preaggOutputCard;
     }
     // Input is likely unique already.
-    return smallestValidCardinality(inputCardinality, globalNdv);
+    return MathUtil.smallestValidCardinality(inputCardinality, globalNdv);
   }
 
   /**
@@ -485,7 +484,7 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
     if (groupingExprs.isEmpty()) {
       return NON_GROUPING_AGG_NUM_GROUPS;
     } else {
-      return smallestValidCardinality(preaggNumGroup, aggInputCardinality);
+      return MathUtil.smallestValidCardinality(preaggNumGroup, 
aggInputCardinality);
     }
   }
 
@@ -578,10 +577,10 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
             Preconditions.checkState(
                 filteredNdv > 0, "filteredNdv must be greater than 0.");
             ndvBasedNumGroup =
-                MathUtil.saturatingMultiplyCardinalities(ndvBasedNumGroup, 
filteredNdv);
+                MathUtil.multiplyCardinalities(ndvBasedNumGroup, filteredNdv);
           }
           long numGroupFromCommonTuple =
-              smallestValidCardinality(producerCardinality, ndvBasedNumGroup);
+              MathUtil.smallestValidCardinality(producerCardinality, 
ndvBasedNumGroup);
 
           if (numGroupFromCommonTuple < 0) {
             // Can not reason about tuple cardinality.
@@ -600,9 +599,9 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
     }
     if (numGroups < 0) return numGroups;
     for (Long entry : tupleBasedNumGroups) {
-      numGroups = MathUtil.saturatingMultiplyCardinalities(numGroups, entry);
+      numGroups = MathUtil.multiplyCardinalities(numGroups, entry);
     }
-    return smallestValidCardinality(numGroups, aggInputCardinality);
+    return MathUtil.smallestValidCardinality(numGroups, aggInputCardinality);
   }
 
   /**
@@ -620,7 +619,7 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
     // Also, worst-case output cardinality is better than an unknown output 
cardinality.
     // Note that this will still be -1 (unknown) if both numGroups
     // and aggInputCardinality is unknown.
-    return smallestValidCardinality(numGroups, aggInputCardinality);
+    return MathUtil.smallestValidCardinality(numGroups, aggInputCardinality);
   }
 
   /**
@@ -936,8 +935,8 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
         long aggClassOutputCardinality = estimatePreaggDuplicate ?
             prevAggNode.aggClassOutputCardinality_.get(aggIdx) :
             prevAggNode.aggClassNumGroups_.get(aggIdx);
-        inputCardinality =
-            smallestValidCardinality(inputCardinality, 
aggClassOutputCardinality);
+        inputCardinality = MathUtil.smallestValidCardinality(
+            inputCardinality, aggClassOutputCardinality);
       }
       resourceProfiles_.add(computeAggClassResourceProfile(
           queryOptions, aggInfo, inputCardinality, 
maxMemoryEstimatePerInstance));
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index a9824908e..eada3f73a 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -37,6 +37,7 @@ import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TVirtualColumnType;
 import org.apache.impala.util.KuduUtil;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1347,7 +1348,7 @@ public class DistributedPlanner {
       Preconditions.checkState(node == childSortNode);
       if (hasLimit) {
         childSortNode.unsetLimit();
-        childSortNode.setLimit(PlanNode.checkedAdd(limit, offset));
+        childSortNode.setLimit(MathUtil.addCardinalities(limit, offset));
       }
       childSortNode.setOffset(0);
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java 
b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 11971e151..cb0729f44 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -34,11 +34,11 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
-import org.apache.impala.util.ExprUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.math.LongMath;
 
 /**
  * Receiver side of a 1:n data stream. Logically, an ExchangeNode consumes the 
data
@@ -314,7 +314,7 @@ public class ExchangeNode extends PlanNode {
     long estimatedDeferredRPCQueueSize = 
estimateDeferredRPCQueueSize(queryOptions,
         numSenders);
     long estimatedMem = Math.max(
-        checkedAdd(estimatedTotalQueueByteSize, estimatedDeferredRPCQueueSize),
+        LongMath.saturatedAdd(estimatedTotalQueueByteSize, 
estimatedDeferredRPCQueueSize),
         MIN_ESTIMATE_BYTES);
     nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem);
   }
@@ -344,7 +344,7 @@ public class ExchangeNode extends PlanNode {
     // queries without stats.
     long estimatedTotalQueueByteSize = numQueues * maxQueueByteSize;
     // Set an upper limit based on estimated cardinality.
-    if (hasValidStats()) {
+    if (getCardinality() > -1 && (isBroadcastExchange() || getNumNodes() > 0)) 
{
       long totalBytesToReceive = (long) Math.ceil(getAvgRowSize() * 
getCardinality());
       // Assuming no skew in distribution during data shuffling.
       long bytesToReceivePerExchNode = isBroadcastExchange() ? 
totalBytesToReceive
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index f642905e2..87e28f776 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -347,14 +347,14 @@ public class HBaseScanNode extends ScanNode {
         }
       }
     }
+
+    // Safe guard for cardinality_ < -1, e.g. when hbase sampling fails and 
numRows
+    // in HMS is abnormally set to be < -1.
+    cardinality_ = Math.max(-1, cardinality_);
     inputCardinality_ = cardinality_;
 
     if (cardinality_ > 0) {
       cardinality_ = applyConjunctsSelectivity(cardinality_);
-    } else {
-      // Safe guard for cardinality_ < -1, e.g. when hbase sampling fails and 
numRows
-      // in HMS is abnormally set to be < -1.
-      cardinality_ = Math.max(-1, cardinality_);
     }
     cardinality_ = capCardinalityAtLimit(cardinality_);
     if (LOG.isTraceEnabled()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 3b054b71e..9bf082308 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -37,6 +37,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.util.BitUtil;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -265,9 +266,7 @@ public class HashJoinNode extends JoinNode implements 
SpillableOperator {
       for (Expr eqJoinPredicate: eqJoinConjuncts_) {
         long rhsPdNdv = getNdv(eqJoinPredicate.getChild(1));
         rhsPdNdv = Math.min(rhsPdNdv, rhsCard);
-        if (rhsPdNdv != -1) {
-          rhsNdv = PlanNode.checkedMultiply(rhsNdv, rhsPdNdv);
-        }
+        if (rhsPdNdv != -1) rhsNdv = MathUtil.multiplyCardinalities(rhsNdv, 
rhsPdNdv);
       }
       // The memory of the data stored in hash table and
       // the memory of the hash table‘s structure
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 80a59ec99..446add178 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1678,18 +1678,21 @@ public class HdfsScanNode extends ScanNode {
     }
 
     // Adjust cardinality for all collections referenced along the tuple's 
path.
-    if (cardinality_ != -1) {
+    if (cardinality_ > 0) {
       for (Type t: desc_.getPath().getMatchedTypes()) {
-        if (t.isCollectionType()) cardinality_ *= 
PlannerContext.AVG_COLLECTION_SIZE;
+        if (t.isCollectionType()) {
+          cardinality_ = MathUtil.multiplyCardinalities(
+              cardinality_, PlannerContext.AVG_COLLECTION_SIZE);
+        }
       }
     }
-    inputCardinality_ = cardinality_;
 
     // Sanity check scan node cardinality.
     if (cardinality_ < -1) {
       hasCorruptTableStats_ = true;
       cardinality_ = -1;
     }
+    inputCardinality_ = cardinality_;
 
     if (cardinality_ > 0) {
       if (LOG.isTraceEnabled()) {
@@ -1760,7 +1763,7 @@ public class HdfsScanNode extends ScanNode {
       } else if (partNumRows > -1) {
         // Consider partition with good stats.
         if (partitionNumRows_ == -1) partitionNumRows_ = 0;
-        partitionNumRows_ = checkedAdd(partitionNumRows_, partNumRows);
+        partitionNumRows_ = MathUtil.addCardinalities(partitionNumRows_, 
partNumRows);
         ++numPartitionsWithNumRows_;
       }
     }
@@ -2428,8 +2431,9 @@ public class HdfsScanNode extends ScanNode {
       Preconditions.checkState(maxScannerThreads == 1);
       perThreadIoBuffers = 2;
     }
-    long perInstanceMemEstimate = checkedMultiply(
-        checkedMultiply(maxScannerThreads, perThreadIoBuffers), 
maxIoBufferSize);
+    long perInstanceMemEstimate = MathUtil.multiplyCardinalities(
+        MathUtil.multiplyCardinalities(maxScannerThreads, perThreadIoBuffers),
+        maxIoBufferSize);
 
     // Sanity check: the tighter estimation should not exceed the per-host 
maximum.
     long perHostUpperBound = getPerHostMemUpperBound();
@@ -2811,8 +2815,8 @@ public class HdfsScanNode extends ScanNode {
           // This is a simple in-list predicate.
           Preconditions.checkState(statsConjunct.getChildCount() > 1,
               "InPredicate must have at least two child expressions");
-          ndvMult = MathUtil.saturatingMultiplyCardinalities(
-              ndvMult, statsConjunct.getChildCount() - 1);
+          ndvMult =
+              MathUtil.multiplyCardinalities(ndvMult, 
statsConjunct.getChildCount() - 1);
           it.remove();
         } else if (statsConjunct instanceof IsNullPredicate) {
           // This is an is-null predicate.
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index d739768b6..4bfcb656b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -39,6 +39,7 @@ import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 import org.apache.impala.util.BitUtil;
+import org.apache.impala.util.MathUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -222,8 +223,8 @@ public class HdfsTableSink extends TableSink {
           Math.max(1L, inputNode.getCardinality() / numInstances);
       long perInstanceInputBytes =
           (long) Math.ceil(perInstanceInputCardinality * 
inputNode.getAvgRowSize());
-      long perInstanceMemReq =
-          PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, 
perPartitionMemReq);
+      long perInstanceMemReq = MathUtil.multiplyCardinalities(
+          numBufferedPartitionsPerInstance, perPartitionMemReq);
       perInstanceMemEstimate = Math.min(perInstanceInputBytes, 
perInstanceMemReq);
     }
     resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
index 30877e5b0..ab38561b2 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
@@ -35,6 +35,7 @@ import org.apache.impala.thrift.TIcebergDeleteNode;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,9 +170,7 @@ public class IcebergDeleteNode extends JoinNode {
     for (Expr eqJoinPredicate : eqJoinConjuncts_) {
       long rhsPdNdv = getNdv(eqJoinPredicate.getChild(1));
       rhsPdNdv = Math.min(rhsPdNdv, rhsCard);
-      if (rhsPdNdv != -1) {
-        rhsNdv = PlanNode.checkedMultiply(rhsNdv, rhsPdNdv);
-      }
+      if (rhsPdNdv != -1) rhsNdv = MathUtil.multiplyCardinalities(rhsNdv, 
rhsPdNdv);
     }
 
     // The memory of the data stored in hash table is the file path of the 
data files
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index 91754ab4c..b59d51519 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -41,6 +41,7 @@ import org.apache.impala.common.ThriftSerializationCtx;
 import org.apache.impala.fb.FbIcebergDataFileFormat;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,19 +130,29 @@ public class IcebergScanNode extends HdfsScanNode {
         for (FileDescriptor fd : sampledFileDescs) {
           Preconditions.checkState(fd instanceof IcebergFileDescriptor);
           IcebergFileDescriptor iceFd = (IcebergFileDescriptor) fd;
-          cardinality_ += 
iceFd.getFbFileMetadata().icebergMetadata().recordCount();
+          cardinality_ = MathUtil.addCardinalities(
+              cardinality_, 
iceFd.getFbFileMetadata().icebergMetadata().recordCount());
         }
       }
     } else {
       for (IcebergFileDescriptor fd : fileDescs_) {
-        cardinality_ += fd.getFbFileMetadata().icebergMetadata().recordCount();
+        cardinality_ = MathUtil.addCardinalities(
+            cardinality_, 
fd.getFbFileMetadata().icebergMetadata().recordCount());
       }
     }
 
     // Adjust cardinality for all collections referenced along the tuple's 
path.
-    for (Type t: desc_.getPath().getMatchedTypes()) {
-      if (t.isCollectionType()) cardinality_ *= 
PlannerContext.AVG_COLLECTION_SIZE;
+    if (cardinality_ > 0) {
+      for (Type t : desc_.getPath().getMatchedTypes()) {
+        if (t.isCollectionType()) {
+          cardinality_ = MathUtil.multiplyCardinalities(
+              cardinality_, PlannerContext.AVG_COLLECTION_SIZE);
+        }
+      }
     }
+
+    // Sanity check scan node cardinality.
+    cardinality_ = Math.max(-1, cardinality_);
     inputCardinality_ = cardinality_;
 
     if (cardinality_ > 0) {
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 86597b345..4dba8c643 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -42,11 +42,11 @@ import org.apache.impala.common.ThriftSerializationCtx;
 import org.apache.impala.planner.TupleCacheInfo.IneligibilityReason;
 import org.apache.impala.thrift.TEqJoinCondition;
 import org.apache.impala.thrift.TExecNodePhase;
-import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TJoinDistributionMode;
 import org.apache.impala.thrift.TJoinNode;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -827,7 +827,7 @@ public abstract class JoinNode extends PlanNode {
       long leftCard = getChild(0).cardinality_;
       long rightCard = getChild(1).cardinality_;
       if (leftCard != -1 && rightCard != -1) {
-        cardinality_ = checkedMultiply(leftCard, rightCard);
+        cardinality_ = MathUtil.multiplyCardinalities(leftCard, rightCard);
       }
     }
 
@@ -835,6 +835,11 @@ public abstract class JoinNode extends PlanNode {
     long leftCard = getChild(0).cardinality_;
     long rightCard = getChild(1).cardinality_;
     switch (joinOp_) {
+      case INNER_JOIN: {
+        // Do nothing. It is already handled above.
+        // This is here just to complete the enum cases.
+        break;
+      }
       case LEFT_SEMI_JOIN: {
         if (leftCard != -1) {
           cardinality_ = Math.min(leftCard, cardinality_);
@@ -861,7 +866,7 @@ public abstract class JoinNode extends PlanNode {
       }
       case FULL_OUTER_JOIN: {
         if (leftCard != -1 && rightCard != -1) {
-          long cardinalitySum = checkedAdd(leftCard, rightCard);
+          long cardinalitySum = MathUtil.addCardinalities(leftCard, rightCard);
           cardinality_ = Math.max(cardinalitySum, cardinality_);
         }
         break;
@@ -884,8 +889,8 @@ public abstract class JoinNode extends PlanNode {
         if (getChild(0).cardinality_ == -1 || getChild(1).cardinality_ == -1) {
           cardinality_ = -1;
         } else {
-          cardinality_ = checkedMultiply(getChild(0).cardinality_,
-              getChild(1).cardinality_);
+          cardinality_ = MathUtil.multiplyCardinalities(
+              getChild(0).cardinality_, getChild(1).cardinality_);
         }
         break;
       }
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index c15ad2054..d1dbe6d25 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -32,7 +32,7 @@ import org.apache.impala.thrift.TNestedLoopJoinNode;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
-import org.apache.impala.util.ExprUtil;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,12 +91,12 @@ public class NestedLoopJoinNode extends JoinNode {
       TQueryOptions queryOptions) {
     // TODO: This seems a bug below that the total data is not divided by 
numInstances_.
     long perInstanceMemEstimate;
-    if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
-        || numNodes_ == 0) {
+    long rhsChildNode = getChild(1).getCardinality();
+    if (rhsChildNode == -1 || getChild(1).getAvgRowSize() == -1 || numNodes_ 
== 0) {
       perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
     } else {
-      perInstanceMemEstimate =
-          (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_);
+      double memMult = rhsChildNode * getChild(1).avgRowSize_;
+      perInstanceMemEstimate = (long) Math.ceil(memMult);
     }
     ResourceProfile buildProfile = 
ResourceProfile.noReservation(perInstanceMemEstimate);
     // Memory requirements for the probe side are minimal - batches are just 
streamed
@@ -113,7 +113,7 @@ public class NestedLoopJoinNode extends JoinNode {
     // We return the full cost in the first element of the Pair.
     long probeCardinality = getProbeCardinalityForCosting();
     long buildCardinality = Math.max(0, getChild(1).getCardinality());
-    long cardProduct = checkedMultiply(probeCardinality, buildCardinality);
+    long cardProduct = MathUtil.multiplyCardinalities(probeCardinality, 
buildCardinality);
     long perInstanceBuildCardinality =
         (long) Math.ceil(buildCardinality / 
fragment_.getNumInstancesForCosting());
     double totalCost = 0.0F;
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index d994edddb..80c9bd979 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -38,6 +38,7 @@ import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPlanFragmentTree;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -638,7 +639,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       if (dataPartition_.getPartitionExprs().contains(expr)) {
         partition = true;
       }
-      result = PlanNode.checkedMultiply(result, numDistinct);
+      result = MathUtil.multiplyCardinalities(result, numDistinct);
       maxNdv = Math.max(maxNdv, numDistinct);
     }
     if (partition) {
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 0da260c60..08918228b 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -59,6 +59,7 @@ import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TQueryOptionsHash;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.ExprUtil;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.math.LongMath;
 
 /**
  * Each PlanNode represents a single relational operator
@@ -632,6 +632,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public void init(Analyzer analyzer) throws ImpalaException {
     assignConjuncts(analyzer);
     computeStats(analyzer);
+    validateCardinality();
     createDefaultSmap(analyzer);
   }
 
@@ -748,8 +749,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     }
   }
 
+  protected void validateCardinality() { Preconditions.checkState(cardinality_ 
>= -1); }
+
   protected long capCardinalityAtLimit(long cardinality) {
-    return smallestValidCardinality(cardinality, limit_);
+    return MathUtil.smallestValidCardinality(cardinality, limit_);
   }
 
   // Default implementation of computing the total data processed in bytes.
@@ -949,6 +952,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
 
   /**
    * Returns true if stats-related variables are valid.
+   * Only use this for Preconditions.
    */
   public boolean hasValidStats() {
     return (numNodes_ == -1 || numNodes_ >= 0) &&
@@ -956,32 +960,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
            (cardinality_ == -1 || cardinality_ >= 0);
   }
 
-  /**
-   * Computes and returns the sum of two long values. If an overflow occurs,
-   * the maximum Long value is returned (Long.MAX_VALUE).
-   */
-  public static long checkedAdd(long a, long b) {
-    try {
-      return LongMath.checkedAdd(a, b);
-    } catch (ArithmeticException e) {
-      LOG.warn("overflow when adding longs: " + a + ", " + b);
-      return Long.MAX_VALUE;
-    }
-  }
-
-  /**
-   * Computes and returns the product of two cardinalities. If an overflow
-   * occurs, the maximum Long value is returned (Long.MAX_VALUE).
-   */
-  public static long checkedMultiply(long a, long b) {
-    try {
-      return LongMath.checkedMultiply(a, b);
-    } catch (ArithmeticException e) {
-      LOG.warn("overflow when multiplying longs: " + a + ", " + b);
-      return Long.MAX_VALUE;
-    }
-  }
-
   /**
    * Returns true if this plan node can output its first row only after 
consuming
    * all rows of all its children. This method is used to group plan nodes
@@ -1144,7 +1122,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> 
{
     for(PlanNode p : children_) {
       long tmp = p.getCardinality();
       if (tmp == -1) return -1;
-      sum = checkedAdd(sum, tmp);
+      sum = MathUtil.addCardinalities(sum, tmp);
     }
     return sum;
   }
@@ -1413,24 +1391,6 @@ abstract public class PlanNode extends 
TreeNode<PlanNode> {
    */
   public boolean omitTupleCache() { return false; }
 
-  /**
-   * Return the least between 'cardinality1' and 'cardinality2'
-   * that is not a negative number (unknown).
-   * Can return -1 if both number is less than 0.
-   * Both argument should not be < -1.
-   */
-  protected static long smallestValidCardinality(long cardinality1, long 
cardinality2) {
-    Preconditions.checkArgument(
-        cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1);
-    Preconditions.checkArgument(
-        cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2);
-    if (cardinality1 >= 0) {
-      if (cardinality2 >= 0) return Math.min(cardinality1, cardinality2);
-      return cardinality1;
-    }
-    return Math.max(-1, cardinality2);
-  }
-
   /**
    * Return True if Impala Coordinator node has scratch_dirs flag configured 
and
    * given 'queryOptions' allows taking spilling to disk into account [when 
calculating
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index efd6d149a..8089e1508 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -114,6 +114,12 @@ abstract public class ScanNode extends PlanNode {
     hasHardEstimates_ = !hasScanConjuncts() && !isAccessingCollectionType();
   }
 
+  @Override
+  protected void validateCardinality() {
+    super.validateCardinality(); // base check: cardinality_ >= -1
+    Preconditions.checkState(inputCardinality_ >= -1); // -1 means unknown
+  }
+
   public TupleDescriptor getTupleDesc() { return desc_; }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java 
b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 8ad65ce3f..95f2e0d42 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -35,6 +35,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
 import org.apache.impala.thrift.TSortNode;
 import org.apache.impala.thrift.TSortType;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,7 +126,7 @@ public class SortNode extends PlanNode implements 
SpillableOperator {
       PlanNodeId id, PlanNode input, SortInfo info, long offset, long limit,
       boolean includeTies) {
     long topNBytesLimit = queryOptions.topn_bytes_limit;
-    long topNCardinality = smallestValidCardinality(input.cardinality_, limit);
+    long topNCardinality = 
MathUtil.smallestValidCardinality(input.cardinality_, limit);
     long estimatedTopNMaterializedSize =
         info.estimateTopNMaterializedSize(topNCardinality, offset);
 
@@ -221,7 +222,8 @@ public class SortNode extends PlanNode implements 
SpillableOperator {
     Preconditions.checkState(!hasLimit());
     Preconditions.checkState(!hasOffset());
     long topNBytesLimit = analyzer.getQueryOptions().topn_bytes_limit;
-    long topNCardinality = smallestValidCardinality(getChild(0).cardinality_, 
limit);
+    long topNCardinality =
+        MathUtil.smallestValidCardinality(getChild(0).cardinality_, limit);
     long estimatedTopNMaterializedSize =
         info_.estimateTopNMaterializedSize(topNCardinality, offset_);
 
@@ -302,7 +304,8 @@ public class SortNode extends PlanNode implements 
SpillableOperator {
   public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     if (isTypeTopN() && includeTies_) {
-      cardinality_ = smallestValidCardinality(getChild(0).cardinality_, 
limitWithTies_);
+      cardinality_ =
+          MathUtil.smallestValidCardinality(getChild(0).cardinality_, 
limitWithTies_);
     } else {
       cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_);
     }
@@ -312,7 +315,8 @@ public class SortNode extends PlanNode implements 
SpillableOperator {
       List<Expr> partExprs = info_.getSortExprs().subList(0, 
numPartitionExprs_);
       long partNdv = numPartitionExprs_ == 0 ? 1 : 
Expr.getNumDistinctValues(partExprs);
       if (partNdv >= 0) {
-        long maxRowsInHeaps = checkedMultiply(partNdv, getPerPartitionLimit());
+        long maxRowsInHeaps =
+            MathUtil.multiplyCardinalities(partNdv, getPerPartitionLimit());
         if (cardinality_ < 0 || cardinality_ > maxRowsInHeaps) {
           cardinality_ = maxRowsInHeaps;
         }
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java 
b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index 09a6ac7d8..9df5e19db 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -27,6 +27,7 @@ import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.MathUtil;
 
 import com.google.common.base.Preconditions;
 
@@ -88,8 +89,8 @@ public class SubplanNode extends PlanNode {
   public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     if (getChild(0).cardinality_ != -1 && getChild(1).cardinality_ != -1) {
-      cardinality_ =
-          checkedMultiply(getChild(0).cardinality_, getChild(1).cardinality_);
+      cardinality_ = MathUtil.multiplyCardinalities(
+          getChild(0).cardinality_, getChild(1).cardinality_);
     } else {
       cardinality_ = -1;
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java 
b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index 7183a6dcf..c25570dec 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -34,6 +34,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TUnionNode;
+import org.apache.impala.util.MathUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,7 +126,8 @@ public class UnionNode extends PlanNode {
       // ignore missing child cardinality info in the hope it won't matter 
enough
       // to change the planning outcome
       if (child.cardinality_ >= 0) {
-        totalChildCardinality = checkedAdd(totalChildCardinality, 
child.cardinality_);
+        totalChildCardinality =
+            MathUtil.addCardinalities(totalChildCardinality, 
child.cardinality_);
         haveChildWithCardinality = true;
       }
       // Union fragments are scheduled on the union of hosts of all scans in 
the fragment
@@ -139,7 +141,8 @@ public class UnionNode extends PlanNode {
     // Consider estimate valid if we have at least one child with known 
cardinality, or
     // only constant values.
     if (haveChildWithCardinality || children_.size() == 0) {
-      cardinality_ = checkedAdd(totalChildCardinality, constExprLists_.size());
+      cardinality_ =
+          MathUtil.addCardinalities(totalChildCardinality, 
constExprLists_.size());
     } else {
       cardinality_ = -1;
     }
@@ -163,8 +166,8 @@ public class UnionNode extends PlanNode {
     for (int i = firstMaterializedChildIdx_; i < resultExprLists_.size(); i++) 
{
       PlanNode child = children_.get(i);
       if (child.cardinality_ >= 0) {
-        totalMaterializedCardinality =
-            checkedAdd(totalMaterializedCardinality, Math.max(0, 
child.cardinality_));
+        totalMaterializedCardinality = MathUtil.addCardinalities(
+            totalMaterializedCardinality, Math.max(0, child.cardinality_));
       }
     }
     long estBytesMaterialized =
diff --git a/fe/src/main/java/org/apache/impala/util/MathUtil.java 
b/fe/src/main/java/org/apache/impala/util/MathUtil.java
index 0b2d01557..0091f071f 100644
--- a/fe/src/main/java/org/apache/impala/util/MathUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MathUtil.java
@@ -20,40 +20,79 @@ package org.apache.impala.util;
 import com.google.common.base.Preconditions;
 import com.google.common.math.LongMath;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class MathUtil {
+  private final static Logger LOG = LoggerFactory.getLogger(MathUtil.class);
 
   // Multiply two numbers. If the multiply would overflow, return either 
Long.MIN_VALUE
-  // (if a xor b is negative) or Long.MAX_VALUE otherwise. The overflow path 
is not
-  // optimised at all and may be somewhat slow.
+  // (if a xor b is negative) or Long.MAX_VALUE otherwise.
   public static long saturatingMultiply(long a, long b) {
+    return LongMath.saturatedMultiply(a, b);
+  }
+
+  // Add two numbers. If the add would overflow, return either Long.MAX_VALUE 
if both are
+  // positive or Long.MIN_VALUE if both are negative.
+  public static long saturatingAdd(long a, long b) {
+    return LongMath.saturatedAdd(a, b);
+  }
+
+  /**
+   * Computes and returns the sum of two cardinality numbers. If an overflow 
occurs,
+   * the maximum Long value is returned (Long.MAX_VALUE).
+   * Both numbers must be valid cardinalities (>= -1).
+   * Returns -1 if either argument is -1.
+   */
+  public static long addCardinalities(long cardinality1, long cardinality2) {
+    Preconditions.checkArgument(
+        cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1);
+    Preconditions.checkArgument(
+        cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2);
+    if (cardinality1 == -1 || cardinality2 == -1) return -1;
     try {
-      return LongMath.checkedMultiply(a, b);
+      return LongMath.checkedAdd(cardinality1, cardinality2);
     } catch (ArithmeticException e) {
-      return a < 0 != b < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
+      LOG.warn("overflow when adding longs: " + cardinality1 + ", " + 
cardinality2);
+      return Long.MAX_VALUE;
     }
   }
 
-  // Multiply two cardinality numbers like saturatingMultiply() but with 
additional
-  // precondition check that both must be a valid cardinality value.
-  // Return -1 if any argument is -1.
-  public static long saturatingMultiplyCardinalities(
-      long cardinality1, long cardinality2) {
+  /**
+   * Computes and returns the product of two cardinality numbers. If an 
overflow
+   * occurs, the maximum Long value is returned (Long.MAX_VALUE).
+   * Both numbers must be valid cardinalities (>= -1).
+   * Returns -1 if either argument is -1.
+   */
+  public static long multiplyCardinalities(long cardinality1, long 
cardinality2) {
     Preconditions.checkArgument(
         cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1);
     Preconditions.checkArgument(
         cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2);
     if (cardinality1 == -1 || cardinality2 == -1) return -1;
-    return saturatingMultiply(cardinality1, cardinality2);
-  }
-
-  // Add two numbers. If the add would overflow, return either Long.MAX_VALUE 
if both are
-  // positive or Long.MIN_VALUE if both are negative. The overflow path is not 
optimised
-  // at all and may be somewhat slow.
-  public static long saturatingAdd(long a, long b) {
     try {
-      return LongMath.checkedAdd(a, b);
+      return LongMath.checkedMultiply(cardinality1, cardinality2);
     } catch (ArithmeticException e) {
-      return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
+      LOG.warn("overflow when multiplying longs: " + cardinality1 + ", " + 
cardinality2);
+      return Long.MAX_VALUE;
+    }
+  }
+
+  /**
+   * Returns the smaller of 'cardinality1' and 'cardinality2', ignoring any
+   * argument that is negative (unknown).
+   * Returns -1 if both arguments are unknown.
+   * Neither argument may be less than -1.
+   */
+  public static long smallestValidCardinality(long cardinality1, long 
cardinality2) {
+    Preconditions.checkArgument(
+        cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1);
+    Preconditions.checkArgument(
+        cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2);
+    if (cardinality1 >= 0) {
+      if (cardinality2 >= 0) return Math.min(cardinality1, cardinality2);
+      return cardinality1;
     }
+    return Math.max(-1, cardinality2);
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java 
b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
index 6ca710cd4..8b760b39c 100644
--- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
@@ -1313,30 +1313,6 @@ public class CardinalityTest extends PlannerTestBase {
         expected * CARDINALITY_TOLERANCE);
   }
 
-  @Test
-  public void testSmallestValidCardinality() {
-    long[] validCard = {0, 1, Long.MAX_VALUE};
-    long unknown = -1;
-
-    // Case 1: both argument is valid.
-    for (long c1 : validCard) {
-      for (long c2 : validCard) {
-        assertEquals(c1 + " vs " + c2, Math.min(c1, c2),
-            PlanNode.smallestValidCardinality(c1, c2));
-      }
-    }
-    // Case 2: One argument is valid, the other is unknown.
-    for (long c : validCard) {
-      assertEquals(
-          c + " vs " + unknown, c, PlanNode.smallestValidCardinality(c, 
unknown));
-      assertEquals(
-          unknown + " vs " + c, c, PlanNode.smallestValidCardinality(unknown, 
c));
-    }
-    // Case 3: both argument is unknown.
-    assertEquals(unknown + " vs " + unknown, unknown,
-        PlanNode.smallestValidCardinality(unknown, unknown));
-  }
-
   @Test
   public void testEstimatePreaggCardinality() {
     List<Long> positiveLong = Arrays.asList(1L, 2L, 5L, 10L, 100L, 1000L, 
Long.MAX_VALUE);
@@ -1360,7 +1336,7 @@ public class CardinalityTest extends PlannerTestBase {
             assertTrue(message + ", expect=0", outputCard == 0);
           } else if (inputCard == -1 || totalInstances == 1) {
             long leastInputVsNdv =
-                PlanNode.smallestValidCardinality(inputCard, globalNdv);
+                MathUtil.smallestValidCardinality(inputCard, globalNdv);
             assertTrue(
                 message + ", expect=" + leastInputVsNdv, outputCard == 
leastInputVsNdv);
           } else {
@@ -1373,10 +1349,9 @@ public class CardinalityTest extends PlannerTestBase {
           message = String.format(
               pattern, totalInstances, globalNdv, inputCard, true, false, -1, 
outputCard);
           assertTrue(message + ", expect>=0", outputCard >= 0);
-          long allDuplicate =
-              MathUtil.saturatingMultiplyCardinalities(globalNdv, 
totalInstances);
+          long allDuplicate = MathUtil.multiplyCardinalities(globalNdv, 
totalInstances);
           long leastInputVsAllDuplicate =
-              PlanNode.smallestValidCardinality(inputCard, allDuplicate);
+              MathUtil.smallestValidCardinality(inputCard, allDuplicate);
           if (allDuplicate < inputCard) {
             assertTrue(message + ", expect=" + leastInputVsAllDuplicate,
                 outputCard == leastInputVsAllDuplicate);
@@ -1390,10 +1365,9 @@ public class CardinalityTest extends PlannerTestBase {
             message = String.format(pattern, totalInstances, globalNdv, 
inputCard, false,
                 true, limit, outputCard);
             assertTrue(message + ", expect>=0", outputCard >= 0);
-            long allAtLimit =
-                MathUtil.saturatingMultiplyCardinalities(totalInstances, 
limit);
+            long allAtLimit = MathUtil.multiplyCardinalities(totalInstances, 
limit);
             long leastOfAll =
-                PlanNode.smallestValidCardinality(allAtLimit, 
leastInputVsAllDuplicate);
+                MathUtil.smallestValidCardinality(allAtLimit, 
leastInputVsAllDuplicate);
             assertTrue(message + ", expect=" + leastOfAll, outputCard == 
leastOfAll);
           }
         }
diff --git a/fe/src/test/java/org/apache/impala/util/MathUtilTest.java 
b/fe/src/test/java/org/apache/impala/util/MathUtilTest.java
index 581ecf145..58d8de743 100644
--- a/fe/src/test/java/org/apache/impala/util/MathUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/util/MathUtilTest.java
@@ -19,6 +19,8 @@ package org.apache.impala.util;
 
 import static org.junit.Assert.*;
 
+import com.google.common.math.LongMath;
+
 import org.junit.Test;
 
 /**
@@ -45,27 +47,42 @@ public class MathUtilTest {
   }
 
   @Test
-  public void testSaturatingMultiplyCardinality() {
+  public void testPlanNodeMultiplyCardinalities() {
     long[] validCard = {0, 1, 2, Long.MAX_VALUE / 2, Long.MAX_VALUE};
     long unknown = -1;
+    long invalid = -2;
 
     // Case 1: both argument is valid.
     for (long c1 : validCard) {
       for (long c2 : validCard) {
-        assertEquals(c1 + " * " + c2, MathUtil.saturatingMultiply(c1, c2),
-            MathUtil.saturatingMultiplyCardinalities(c1, c2));
+        assertEquals(c1 + " * " + c2, LongMath.saturatedMultiply(c1, c2),
+            MathUtil.multiplyCardinalities(c1, c2));
       }
     }
     // Case 2: One argument is valid, the other is unknown.
     for (long c : validCard) {
-      assertEquals(c + " * " + unknown, unknown,
-          MathUtil.saturatingMultiplyCardinalities(c, unknown));
-      assertEquals(unknown + " * " + c, unknown,
-          MathUtil.saturatingMultiplyCardinalities(unknown, c));
+      assertEquals(
+          c + " * " + unknown, unknown, MathUtil.multiplyCardinalities(c, 
unknown));
+      assertEquals(
+          unknown + " * " + c, unknown, 
MathUtil.multiplyCardinalities(unknown, c));
     }
     // Case 3: both argument is unknown.
     assertEquals(unknown + " * " + unknown, unknown,
-        MathUtil.saturatingMultiplyCardinalities(unknown, unknown));
+        MathUtil.multiplyCardinalities(unknown, unknown));
+    // Case 4: one argument is invalid.
+    try {
+      MathUtil.multiplyCardinalities(invalid, 1);
+      fail("Expected IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("cardinality1 is invalid: -2"));
+    }
+    // Case 5: first argument is unknown and second argument is invalid.
+    try {
+      MathUtil.multiplyCardinalities(unknown, invalid);
+      fail("Expected IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("cardinality2 is invalid: -2"));
+    }
   }
 
   @Test
@@ -82,4 +99,80 @@ public class MathUtilTest {
     assertEquals(Long.MAX_VALUE, MathUtil.saturatingAdd(Long.MAX_VALUE - 10, 
11));
     assertEquals(Long.MAX_VALUE, MathUtil.saturatingAdd(Long.MAX_VALUE, 
Long.MAX_VALUE / 2));
   }
+
+  @Test
+  public void testPlanNodeAddCardinalities() {
+    long[] validCard = {0, 1, 2, Long.MAX_VALUE / 2, Long.MAX_VALUE};
+    long unknown = -1;
+    long invalid = -2;
+
+    // Case 1: both arguments are valid; expect the saturated sum.
+    for (long c1 : validCard) {
+      for (long c2 : validCard) {
+        assertEquals(c1 + " + " + c2, LongMath.saturatedAdd(c1, c2),
+            MathUtil.addCardinalities(c1, c2));
+      }
+    }
+    // Case 2: one argument is valid, the other is unknown; expect unknown.
+    for (long c : validCard) {
+      assertEquals(c + " + " + unknown, unknown, MathUtil.addCardinalities(c, 
unknown));
+      assertEquals(unknown + " + " + c, unknown, 
MathUtil.addCardinalities(unknown, c));
+    }
+    // Case 3: both arguments are unknown; expect unknown.
+    assertEquals(
+        unknown + " + " + unknown, unknown, MathUtil.addCardinalities(unknown, 
unknown));
+    // Case 4: first argument is invalid (< -1); must throw.
+    try {
+      MathUtil.addCardinalities(invalid, 1);
+      fail("Expected IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("cardinality1 is invalid: -2"));
+    }
+    // Case 5: first argument is unknown, second is invalid; must still throw.
+    try {
+      MathUtil.addCardinalities(unknown, invalid);
+      fail("Expected IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("cardinality2 is invalid: -2"));
+    }
+  }
+
+  @Test
+  public void testSmallestValidCardinality() {
+    long[] validCard = {0, 1, Long.MAX_VALUE};
+    long unknown = -1;
+    long invalid = -2;
+
+    // Case 1: both arguments are valid; expect the smaller one.
+    for (long c1 : validCard) {
+      for (long c2 : validCard) {
+        assertEquals(c1 + " vs " + c2, Math.min(c1, c2),
+            MathUtil.smallestValidCardinality(c1, c2));
+      }
+    }
+    // Case 2: one argument valid, the other unknown; expect the valid one.
+    for (long c : validCard) {
+      assertEquals(
+          c + " vs " + unknown, c, MathUtil.smallestValidCardinality(c, 
unknown));
+      assertEquals(
+          unknown + " vs " + c, c, MathUtil.smallestValidCardinality(unknown, 
c));
+    }
+    // Case 3: both arguments are unknown; expect unknown.
+    assertEquals(unknown + " vs " + unknown, unknown,
+        MathUtil.smallestValidCardinality(unknown, unknown));
+    // Case 4: first argument is invalid (< -1); must throw.
+    try {
+      MathUtil.smallestValidCardinality(invalid, 1);
+      fail("Expected IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("cardinality1 is invalid: -2"));
+    }
+    // Case 5: first argument is unknown, second is invalid; must still throw.
+    try {
+      MathUtil.smallestValidCardinality(unknown, invalid);
+      fail("Expected IllegalArgumentException");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("cardinality2 is invalid: -2"));
+    }
+  }
 }
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 9731bf44a..2c0b7c156 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -975,19 +975,19 @@ PLAN-ROOT SINK
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=12.00MB Threads=3
-Per-Host Resource Estimates: Memory=75MB
+Per-Host Resource Estimates: Memory=85MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 tpch_avro.orders
 Analyzed query: SELECT * FROM tpch_avro.orders
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=10.27MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Host Resources: mem-estimate=20.27MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: tpch_avro.orders.o_orderkey, tpch_avro.orders.o_custkey, 
tpch_avro.orders.o_orderstatus, tpch_avro.orders.o_totalprice, 
tpch_avro.orders.o_orderdate, tpch_avro.orders.o_orderpriority, 
tpch_avro.orders.o_clerk, tpch_avro.orders.o_shippriority, 
tpch_avro.orders.o_comment
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 01:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=275.97KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.27MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=88B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1029,19 +1029,19 @@ PLAN-ROOT SINK
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=12.00MB Threads=3
-Per-Host Resource Estimates: Memory=42MB
+Per-Host Resource Estimates: Memory=52MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 tpch_rc.customer
 Analyzed query: SELECT * FROM tpch_rc.customer
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=10.08MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Host Resources: mem-estimate=20.08MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: tpch_rc.customer.c_custkey, tpch_rc.customer.c_name, 
tpch_rc.customer.c_address, tpch_rc.customer.c_nationkey, 
tpch_rc.customer.c_phone, tpch_rc.customer.c_acctbal, 
tpch_rc.customer.c_mktsegment, tpch_rc.customer.c_comment
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 01:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=81.92KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.08MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=78B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1058,19 +1058,19 @@ Per-Host Resources: mem-estimate=32.32MB 
mem-reservation=8.00MB thread-reservati
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=12.00MB Threads=2
-Per-Host Resource Estimates: Memory=42MB
+Per-Host Resource Estimates: Memory=52MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 tpch_rc.customer
 Analyzed query: SELECT * FROM tpch_rc.customer
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=10.08MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Instance Resources: mem-estimate=20.08MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: tpch_rc.customer.c_custkey, tpch_rc.customer.c_name, 
tpch_rc.customer.c_address, tpch_rc.customer.c_nationkey, 
tpch_rc.customer.c_phone, tpch_rc.customer.c_acctbal, 
tpch_rc.customer.c_mktsegment, tpch_rc.customer.c_comment
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 01:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=81.92KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.08MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=78B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1112,19 +1112,19 @@ PLAN-ROOT SINK
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=12.00MB Threads=3
-Per-Host Resource Estimates: Memory=27MB
+Per-Host Resource Estimates: Memory=37MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 tpcds_seq_snap.web_returns
 Analyzed query: SELECT * FROM tpcds_seq_snap.web_returns
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=10.11MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Host Resources: mem-estimate=20.11MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: tpcds_seq_snap.web_returns.wr_returned_date_sk, 
tpcds_seq_snap.web_returns.wr_returned_time_sk, 
tpcds_seq_snap.web_returns.wr_item_sk, 
tpcds_seq_snap.web_returns.wr_refunded_customer_sk, 
tpcds_seq_snap.web_returns.wr_refunded_cdemo_sk, 
tpcds_seq_snap.web_returns.wr_refunded_hdemo_sk, 
tpcds_seq_snap.web_returns.wr_refunded_addr_sk, 
tpcds_seq_snap.web_returns.wr_returning_customer_sk, 
tpcds_seq_snap.web_returns.wr_returning_cdemo_sk, 
tpcds_seq_snap.web_returns.wr_returning [...]
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 01:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=107.90KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=104B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1141,19 +1141,19 @@ Per-Host Resources: mem-estimate=16.42MB 
mem-reservation=8.00MB thread-reservati
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=12.00MB Threads=2
-Per-Host Resource Estimates: Memory=27MB
+Per-Host Resource Estimates: Memory=37MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 tpcds_seq_snap.web_returns
 Analyzed query: SELECT * FROM tpcds_seq_snap.web_returns
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=10.11MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Instance Resources: mem-estimate=20.11MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: tpcds_seq_snap.web_returns.wr_returned_date_sk, 
tpcds_seq_snap.web_returns.wr_returned_time_sk, 
tpcds_seq_snap.web_returns.wr_item_sk, 
tpcds_seq_snap.web_returns.wr_refunded_customer_sk, 
tpcds_seq_snap.web_returns.wr_refunded_cdemo_sk, 
tpcds_seq_snap.web_returns.wr_refunded_hdemo_sk, 
tpcds_seq_snap.web_returns.wr_refunded_addr_sk, 
tpcds_seq_snap.web_returns.wr_returning_customer_sk, 
tpcds_seq_snap.web_returns.wr_returning_cdemo_sk, 
tpcds_seq_snap.web_returns.wr_returning [...]
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 01:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=107.90KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=104B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1325,19 +1325,19 @@ PLAN-ROOT SINK
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=4.09MB Threads=3
-Per-Host Resource Estimates: Memory=27MB
+Per-Host Resource Estimates: Memory=37MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional.alltypesmixedformat
 Analyzed query: SELECT * FROM functional.alltypesmixedformat
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=10.25MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Host Resources: mem-estimate=20.25MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: functional.alltypesmixedformat.id, 
functional.alltypesmixedformat.bool_col, 
functional.alltypesmixedformat.tinyint_col, 
functional.alltypesmixedformat.smallint_col, 
functional.alltypesmixedformat.int_col, 
functional.alltypesmixedformat.bigint_col, 
functional.alltypesmixedformat.float_col, 
functional.alltypesmixedformat.double_col, 
functional.alltypesmixedformat.date_string_col, 
functional.alltypesmixedformat.string_col, 
functional.alltypesmixedformat.timestamp_col, funct [...]
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 01:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=251.97KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.25MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=80B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1355,19 +1355,19 @@ Per-Host Resources: mem-estimate=16.33MB 
mem-reservation=88.00KB thread-reservat
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=4.17MB Threads=3
-Per-Host Resource Estimates: Memory=43MB
+Per-Host Resource Estimates: Memory=53MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional.alltypesmixedformat
 Analyzed query: SELECT * FROM functional.alltypesmixedformat
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=10.33MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Instance Resources: mem-estimate=20.33MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: functional.alltypesmixedformat.id, 
functional.alltypesmixedformat.bool_col, 
functional.alltypesmixedformat.tinyint_col, 
functional.alltypesmixedformat.smallint_col, 
functional.alltypesmixedformat.int_col, 
functional.alltypesmixedformat.bigint_col, 
functional.alltypesmixedformat.float_col, 
functional.alltypesmixedformat.double_col, 
functional.alltypesmixedformat.date_string_col, 
functional.alltypesmixedformat.string_col, 
functional.alltypesmixedformat.timestamp_col, funct [...]
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 01:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=335.97KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.33MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=80B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1431,7 +1431,7 @@ Per-Host Resources: mem-estimate=592.00KB 
mem-reservation=0B thread-reservation=
      table: rows=unavailable
      columns: unavailable
    mem-estimate=256.00KB mem-reservation=0B thread-reservation=0
-   tuple-ids=0 row-size=80B cardinality=14.23K
+   tuple-ids=0 row-size=80B cardinality=14.30K
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
@@ -4396,7 +4396,7 @@ PLAN-ROOT SINK
    in pipelines: 02(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=253.75MB Threads=11
-Per-Host Resource Estimates: Memory=2.72GB
+Per-Host Resource Estimates: Memory=2.73GB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 tpch_avro.customer
 Analyzed query: SELECT c_name, c_custkey, o_orderkey, o_orderdate, 
o_totalprice,
@@ -4441,7 +4441,7 @@ Per-Host Resources: mem-estimate=138.23MB 
mem-reservation=34.00MB thread-reserva
 |  in pipelines: 02(GETNEXT)
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=2.23GB mem-reservation=144.75MB 
thread-reservation=1 runtime-filters-memory=4.00MB
+Per-Host Resources: mem-estimate=2.24GB mem-reservation=144.75MB 
thread-reservation=1 runtime-filters-memory=4.00MB
 08:AGGREGATE [STREAMING]
 |  output: sum(l_quantity)
 |  group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice
@@ -4497,7 +4497,7 @@ Per-Host Resources: mem-estimate=2.23GB 
mem-reservation=144.75MB thread-reservat
 |  in pipelines: 02(GETNEXT), 00(OPEN)
 |
 |--12:EXCHANGE [BROADCAST]
-|  |  mem-estimate=23.98KB mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=0 row-size=20B cardinality=unavailable
 |  |  in pipelines: 00(GETNEXT)
 |  |
@@ -4558,7 +4558,7 @@ Per-Host Resources: mem-estimate=91.23MB 
mem-reservation=11.00MB thread-reservat
    in pipelines: 02(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=477.75MB Threads=16
-Per-Host Resource Estimates: Memory=3.26GB
+Per-Host Resource Estimates: Memory=3.27GB
 Analyzed query: SELECT c_name, c_custkey, o_orderkey, o_orderdate, 
o_totalprice,
 sum(l_quantity) FROM tpch_avro.customer, tpch.orders, tpch.lineitem LEFT SEMI
 JOIN (SELECT l_orderkey FROM tpch.lineitem GROUP BY l_orderkey HAVING
@@ -4665,7 +4665,7 @@ Per-Instance Resources: mem-estimate=139.95MB 
mem-reservation=34.00MB thread-res
 |  in pipelines: 02(GETNEXT), 00(OPEN)
 |
 |--F09:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-|  |  Per-Instance Resources: mem-estimate=2.00GB mem-reservation=69.00MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  Per-Instance Resources: mem-estimate=2.01GB mem-reservation=69.00MB 
thread-reservation=1 runtime-filters-memory=1.00MB
 |  JOIN BUILD
 |  |  join-table-id=01 plan-id=02 cohort-id=01
 |  |  build expressions: c_custkey
@@ -4673,7 +4673,7 @@ Per-Instance Resources: mem-estimate=139.95MB 
mem-reservation=34.00MB thread-res
 |  |  mem-estimate=2.00GB mem-reservation=68.00MB spill-buffer=2.00MB 
thread-reservation=0
 |  |
 |  12:EXCHANGE [BROADCAST]
-|  |  mem-estimate=23.98KB mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=0 row-size=20B cardinality=unavailable
 |  |  in pipelines: 00(GETNEXT)
 |  |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
index a6b05f870..c60bddd0e 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
@@ -501,25 +501,25 @@ from functional_parquet.alltypes
     left join functional_parquet.alltypestiny on alltypes.id = alltypestiny.id
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=38.17MB Threads=5
-Per-Host Resource Estimates: Memory=2.04GB
+Per-Host Resource Estimates: Memory=2.06GB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional_parquet.alltypes, functional_parquet.alltypestiny
 Analyzed query: SELECT /* +straight_join */ * FROM functional_parquet.alltypes
 LEFT OUTER JOIN functional_parquet.alltypestiny ON alltypes.id = 
alltypestiny.id
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=10.49MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Host Resources: mem-estimate=20.49MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: functional_parquet.alltypes.id, 
functional_parquet.alltypes.bool_col, functional_parquet.alltypes.tinyint_col, 
functional_parquet.alltypes.smallint_col, functional_parquet.alltypes.int_col, 
functional_parquet.alltypes.bigint_col, functional_parquet.alltypes.float_col, 
functional_parquet.alltypes.double_col, 
functional_parquet.alltypes.date_string_col, 
functional_parquet.alltypes.string_col, 
functional_parquet.alltypes.timestamp_col, functional_parquet.alltypes.year, 
func [...]
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=503.95KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.49MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1N row-size=160B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=2.02GB mem-reservation=34.09MB 
thread-reservation=2
+Per-Host Resources: mem-estimate=2.03GB mem-reservation=34.09MB 
thread-reservation=2
 02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
 |  hash predicates: alltypes.id = alltypestiny.id
 |  fk/pk conjuncts: assumed fk/pk
@@ -528,7 +528,7 @@ Per-Host Resources: mem-estimate=2.02GB 
mem-reservation=34.09MB thread-reservati
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--03:EXCHANGE [BROADCAST]
-|  |  mem-estimate=251.92KB mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=10.25MB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=80B cardinality=unavailable
 |  |  in pipelines: 01(GETNEXT)
 |  |
@@ -557,20 +557,20 @@ Per-Host Resources: mem-estimate=2.02GB 
mem-reservation=34.09MB thread-reservati
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=72.34MB Threads=6
-Per-Host Resource Estimates: Memory=2.08GB
+Per-Host Resource Estimates: Memory=2.10GB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional_parquet.alltypestiny
 Analyzed query: SELECT /* +straight_join */ * FROM functional_parquet.alltypes
 LEFT OUTER JOIN functional_parquet.alltypestiny ON alltypes.id = 
alltypestiny.id
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=10.98MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Instance Resources: mem-estimate=20.98MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: functional_parquet.alltypes.id, 
functional_parquet.alltypes.bool_col, functional_parquet.alltypes.tinyint_col, 
functional_parquet.alltypes.smallint_col, functional_parquet.alltypes.int_col, 
functional_parquet.alltypes.bigint_col, functional_parquet.alltypes.float_col, 
functional_parquet.alltypes.double_col, 
functional_parquet.alltypes.date_string_col, 
functional_parquet.alltypes.string_col, 
functional_parquet.alltypes.timestamp_col, functional_parquet.alltypes.year, 
func [...]
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=1007.95KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.98MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1N row-size=160B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -585,14 +585,14 @@ Per-Instance Resources: mem-estimate=16.66MB 
mem-reservation=88.00KB thread-rese
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--F03:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-|  |  Per-Instance Resources: mem-estimate=2.00GB mem-reservation=68.00MB 
thread-reservation=1
+|  |  Per-Instance Resources: mem-estimate=2.01GB mem-reservation=68.00MB 
thread-reservation=1
 |  JOIN BUILD
 |  |  join-table-id=00 plan-id=01 cohort-id=01
 |  |  build expressions: alltypestiny.id
 |  |  mem-estimate=2.00GB mem-reservation=68.00MB spill-buffer=2.00MB 
thread-reservation=0
 |  |
 |  03:EXCHANGE [BROADCAST]
-|  |  mem-estimate=335.92KB mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=10.33MB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=80B cardinality=unavailable
 |  |  in pipelines: 01(GETNEXT)
 |  |
@@ -1011,25 +1011,25 @@ from functional_parquet.alltypestiny
 group by string_col
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=72.01MB Threads=4
-Per-Host Resource Estimates: Memory=282MB
+Per-Host Resource Estimates: Memory=302MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional_parquet.alltypestiny
 Analyzed query: SELECT string_col, count(*) FROM 
functional_parquet.alltypestiny
 GROUP BY string_col
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=10.07MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Host Resources: mem-estimate=20.07MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: string_col, count(*)
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=71.99KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.07MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=20B cardinality=unavailable
 |  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(string_col)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=128.09MB mem-reservation=34.00MB 
thread-reservation=1
+Per-Host Resources: mem-estimate=138.07MB mem-reservation=34.00MB 
thread-reservation=1
 03:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  group by: string_col
@@ -1038,7 +1038,7 @@ Per-Host Resources: mem-estimate=128.09MB 
mem-reservation=34.00MB thread-reserva
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(string_col)]
-|  mem-estimate=71.99KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.07MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=20B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
@@ -1063,25 +1063,25 @@ Per-Host Resources: mem-estimate=144.28MB 
mem-reservation=34.01MB thread-reserva
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=140.02MB Threads=5
-Per-Host Resource Estimates: Memory=555MB
+Per-Host Resource Estimates: Memory=585MB
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional_parquet.alltypestiny
 Analyzed query: SELECT string_col, count(*) FROM 
functional_parquet.alltypestiny
 GROUP BY string_col
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=10.09MB mem-reservation=4.00MB 
thread-reservation=1
+|  Per-Instance Resources: mem-estimate=20.09MB mem-reservation=4.00MB 
thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: string_col, count(*)
 |  mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=95.99KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.09MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=20B cardinality=unavailable
 |  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(string_col)] hosts=3 instances=4
-Per-Instance Resources: mem-estimate=128.09MB mem-reservation=34.00MB 
thread-reservation=1
+Per-Instance Resources: mem-estimate=138.09MB mem-reservation=34.00MB 
thread-reservation=1
 03:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  group by: string_col
@@ -1090,7 +1090,7 @@ Per-Instance Resources: mem-estimate=128.09MB 
mem-reservation=34.00MB thread-res
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(string_col)]
-|  mem-estimate=95.99KB mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.09MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=20B cardinality=unavailable
 |  in pipelines: 00(GETNEXT)
 |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
index d7224986b..e54e15005 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
@@ -748,9 +748,9 @@ select *, ss_item_sk as part_col
 from tpcds_seq_snap.store_sales
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=102.00MB Threads=13
-Per-Host Resource Estimates: Memory=381MB
+Per-Host Resource Estimates: Memory=391MB
 F01:PLAN FRAGMENT [HASH(ss_item_sk)] hosts=10 instances=10
-|  Per-Instance Resources: mem-estimate=140.19MB mem-reservation=6.00MB 
thread-reservation=1
+|  Per-Instance Resources: mem-estimate=150.19MB mem-reservation=6.00MB 
thread-reservation=1
 |  max-parallelism=10 segment-costs=[0, 3954235]
 WRITE TO HDFS [tpcds_partitioned_parquet_snap.store_sales_without_stats, 
OVERWRITE=false, PARTITION-KEYS=(ss_item_sk)]
 |  output exprs: ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, 
ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, 
ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, 
ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, 
ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, 
ss_net_profit, ss_sold_date_sk, ss_item_sk
@@ -763,7 +763,7 @@ WRITE TO HDFS 
[tpcds_partitioned_parquet_snap.store_sales_without_stats, OVERWRI
 |  in pipelines: 02(GETNEXT), 00(OPEN)
 |
 01:EXCHANGE [HASH(ss_item_sk)]
-|  mem-estimate=12.19MB mem-reservation=0B thread-reservation=0
+|  mem-estimate=22.19MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=100B cardinality=unavailable cost=0
 |  in pipelines: 00(GETNEXT)
 |
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
index 95b3ddb47..d573fbdfa 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
@@ -780,7 +780,7 @@ WRITE TO HDFS 
[tpcds_partitioned_parquet_snap.store_sales_without_stats, OVERWRI
 |  in pipelines: 02(GETNEXT), 00(OPEN)
 |
 01:EXCHANGE [HASH(ss_item_sk)]
-|  mem-estimate=12.19MB mem-reservation=0B thread-reservation=0
+|  mem-estimate=22.19MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=100B cardinality=unavailable cost=0
 |  in pipelines: 00(GETNEXT)
 |
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test 
b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index 2c62d965e..937939745 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -81,7 +81,7 @@ from functional_avro.alltypes t1
   left outer join functional_avro.alltypes t3 on (t2.id = t3.id)
 where t1.month = 1 and t2.year = 2009 and t3.bool_col = false
 ---- RESULTS: VERIFY_IS_SUBSET
-'Per-Host Resource Estimates: Memory=4.05GB'
+'Per-Host Resource Estimates: Memory=4.07GB'
 'WARNING: The following tables are missing relevant table and/or column 
statistics.'
 'functional_avro.alltypes, functional_parquet.alltypessmall'
 ====

Reply via email to