This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ed6c19cf0ccef0d55bd9397245f61abc3327e439 Author: Riza Suminto <[email protected]> AuthorDate: Tue May 13 13:38:14 2025 -0700 IMPALA-14071: Refactor helper methods around cardinality bounding There are multiple ways to do cardinality multiplication that also avoid integer overflow. Some helper methods available are: - MathUtil.saturatingMultiplyCardinalities() - PlanNode.checkedMultiply() - LongMath.saturatedMultiply() This patch intends to simplify things by: - Merge MathUtil.saturatingMultiplyCardinalities() with PlanNode.checkedMultiply() into MathUtil.multiplyCardinalities(). - Merge MathUtil.saturatingAddCardinalities() with PlanNode.checkedAdd() into MathUtil.addCardinalities(). - Move PlanNode.smallestValidCardinality() to MathUtil. - Make MathUtil.saturatingMultiply() and MathUtil.saturatingAdd() simple wrappers for LongMath.saturatedMultiply() and LongMath.saturatedAdd() respectively. multiplyCardinalities(), addCardinalities(), and smallestValidCardinality() have cardinality Preconditions checks. Harden cardinality calculation in several places by using multiplyCardinalities() and addCardinalities() accordingly. Added sanity check PlanNode.validateCardinality() that is evaluated right after PlanNode.computeStats() in PlanNode.init(). This ensures that cardinality_ and inputCardinality_ are always valid after PlanNode.computeStats(). Also fixed a bug in ExchangeNode.estimateTotalQueueByteSize() by preventing calculation against negative cardinality or non-positive num nodes. 
Testing: Pass Following FE and EE tests: CardinalityTest MathUtilTest PlannerTest#testSpillableBufferSizing+testResourceRequirements TpcdsCpuCostPlannerTest TpcdsPlannerTest metadata/test_explain.py Change-Id: I505ab11cfa1024feb4ceac4cffe9c3283be228ce Reviewed-on: http://gerrit.cloudera.org:8080/22897 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../java/org/apache/impala/analysis/SortInfo.java | 6 +- .../org/apache/impala/planner/AggregationNode.java | 45 +++++---- .../apache/impala/planner/DistributedPlanner.java | 3 +- .../org/apache/impala/planner/ExchangeNode.java | 6 +- .../org/apache/impala/planner/HBaseScanNode.java | 8 +- .../org/apache/impala/planner/HashJoinNode.java | 5 +- .../org/apache/impala/planner/HdfsScanNode.java | 20 ++-- .../org/apache/impala/planner/HdfsTableSink.java | 5 +- .../apache/impala/planner/IcebergDeleteNode.java | 5 +- .../org/apache/impala/planner/IcebergScanNode.java | 19 +++- .../java/org/apache/impala/planner/JoinNode.java | 15 ++- .../apache/impala/planner/NestedLoopJoinNode.java | 12 +-- .../org/apache/impala/planner/PlanFragment.java | 3 +- .../java/org/apache/impala/planner/PlanNode.java | 54 ++-------- .../java/org/apache/impala/planner/ScanNode.java | 6 ++ .../java/org/apache/impala/planner/SortNode.java | 12 ++- .../org/apache/impala/planner/SubplanNode.java | 5 +- .../java/org/apache/impala/planner/UnionNode.java | 11 ++- .../main/java/org/apache/impala/util/MathUtil.java | 75 ++++++++++---- .../org/apache/impala/planner/CardinalityTest.java | 36 +------ .../java/org/apache/impala/util/MathUtilTest.java | 109 +++++++++++++++++++-- .../queries/PlannerTest/resource-requirements.test | 56 +++++------ .../PlannerTest/spillable-buffer-sizing.test | 40 ++++---- .../tpcds_cpu_cost/tpcds-ddl-iceberg.test | 6 +- .../tpcds_cpu_cost/tpcds-ddl-parquet.test | 2 +- .../queries/QueryTest/explain-level2.test | 2 +- 26 files changed, 332 insertions(+), 234 deletions(-) 
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java index 2b7c6c00e..1b0479041 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java +++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java @@ -21,21 +21,19 @@ import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import org.apache.impala.catalog.ArrayType; import org.apache.impala.catalog.MapType; -import org.apache.impala.catalog.PrimitiveType; import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.TreeNode; -import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.ProcessingCost; import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.util.ExprUtil; +import org.apache.impala.util.MathUtil; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -290,7 +288,7 @@ public class SortInfo { * operator and 'offset' is the value in the 'OFFSET [x]' clause. 
*/ public long estimateTopNMaterializedSize(long cardinality, long offset) { - long totalRows = PlanNode.checkedAdd(cardinality, offset); + long totalRows = MathUtil.addCardinalities(cardinality, offset); return estimateMaterializedSize(totalRows); } diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index cea564663..ac5f8e5f4 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -345,12 +345,12 @@ public class AggregationNode extends PlanNode implements SpillableOperator { groupingExprs.isEmpty(), canCompleteEarly, getLimit()); } } else if (canCompleteEarly) { - aggOutputCard = smallestValidCardinality(aggOutputCard, getLimit()); + aggOutputCard = MathUtil.smallestValidCardinality(aggOutputCard, getLimit()); } aggClassOutputCardinality_.add(aggOutputCard); // IMPALA-2945: Behavior change if estimatePreaggDuplicate is true. - cardinality_ = - checkedAdd(cardinality_, estimatePreaggDuplicate ? aggOutputCard : numGroups); + cardinality_ = MathUtil.addCardinalities( + cardinality_, estimatePreaggDuplicate ? 
aggOutputCard : numGroups); } aggIdx++; } @@ -446,25 +446,24 @@ public class AggregationNode extends PlanNode implements SpillableOperator { if (canCompleteEarly) { Preconditions.checkArgument(limit > -1, "limit must not be negative."); - long limitMultiple = - MathUtil.saturatingMultiplyCardinalities(limit, totalInstances); - long ndvMultiple = - MathUtil.saturatingMultiplyCardinalities(globalNdv, totalInstances); - return smallestValidCardinality( - inputCardinality, smallestValidCardinality(ndvMultiple, limitMultiple)); + long limitMultiple = MathUtil.multiplyCardinalities(limit, totalInstances); + long ndvMultiple = MathUtil.multiplyCardinalities(globalNdv, totalInstances); + return MathUtil.smallestValidCardinality(inputCardinality, + MathUtil.smallestValidCardinality(ndvMultiple, limitMultiple)); } else if (isNonGroupingAggregation) { - return smallestValidCardinality(inputCardinality, - MathUtil.saturatingMultiplyCardinalities(globalNdv, totalInstances)); + return MathUtil.smallestValidCardinality( + inputCardinality, MathUtil.multiplyCardinalities(globalNdv, totalInstances)); } else if (totalInstances > 1 && inputCardinality > globalNdv) { double perInstanceInputCard = Math.ceil((double) inputCardinality / totalInstances); double globalNdvInDouble = (double) globalNdv; double probValExist = 1.0 - Math.pow((globalNdvInDouble - 1.0) / globalNdvInDouble, perInstanceInputCard); double perInstanceNdv = Math.ceil(probValExist * globalNdvInDouble); - long preaggOutputCard = MathUtil.saturatingMultiplyCardinalities( - Math.round(perInstanceNdv), totalInstances); + long preaggOutputCard = + MathUtil.multiplyCardinalities(Math.round(perInstanceNdv), totalInstances); // keep bounding at aggInputCardinality_ max. 
- preaggOutputCard = smallestValidCardinality(inputCardinality, preaggOutputCard); + preaggOutputCard = + MathUtil.smallestValidCardinality(inputCardinality, preaggOutputCard); LOG.trace("inputCardinality={} perInstanceInputCard={} globalNdv={} probValExist={}" + " perInstanceNdv={} preaggOutputCard={}", inputCardinality, perInstanceInputCard, globalNdv, probValExist, perInstanceNdv, @@ -472,7 +471,7 @@ public class AggregationNode extends PlanNode implements SpillableOperator { return preaggOutputCard; } // Input is likely unique already. - return smallestValidCardinality(inputCardinality, globalNdv); + return MathUtil.smallestValidCardinality(inputCardinality, globalNdv); } /** @@ -485,7 +484,7 @@ public class AggregationNode extends PlanNode implements SpillableOperator { if (groupingExprs.isEmpty()) { return NON_GROUPING_AGG_NUM_GROUPS; } else { - return smallestValidCardinality(preaggNumGroup, aggInputCardinality); + return MathUtil.smallestValidCardinality(preaggNumGroup, aggInputCardinality); } } @@ -578,10 +577,10 @@ public class AggregationNode extends PlanNode implements SpillableOperator { Preconditions.checkState( filteredNdv > 0, "filteredNdv must be greater than 0."); ndvBasedNumGroup = - MathUtil.saturatingMultiplyCardinalities(ndvBasedNumGroup, filteredNdv); + MathUtil.multiplyCardinalities(ndvBasedNumGroup, filteredNdv); } long numGroupFromCommonTuple = - smallestValidCardinality(producerCardinality, ndvBasedNumGroup); + MathUtil.smallestValidCardinality(producerCardinality, ndvBasedNumGroup); if (numGroupFromCommonTuple < 0) { // Can not reason about tuple cardinality. 
@@ -600,9 +599,9 @@ public class AggregationNode extends PlanNode implements SpillableOperator { } if (numGroups < 0) return numGroups; for (Long entry : tupleBasedNumGroups) { - numGroups = MathUtil.saturatingMultiplyCardinalities(numGroups, entry); + numGroups = MathUtil.multiplyCardinalities(numGroups, entry); } - return smallestValidCardinality(numGroups, aggInputCardinality); + return MathUtil.smallestValidCardinality(numGroups, aggInputCardinality); } /** @@ -620,7 +619,7 @@ public class AggregationNode extends PlanNode implements SpillableOperator { // Also, worst-case output cardinality is better than an unknown output cardinality. // Note that this will still be -1 (unknown) if both numGroups // and aggInputCardinality is unknown. - return smallestValidCardinality(numGroups, aggInputCardinality); + return MathUtil.smallestValidCardinality(numGroups, aggInputCardinality); } /** @@ -936,8 +935,8 @@ public class AggregationNode extends PlanNode implements SpillableOperator { long aggClassOutputCardinality = estimatePreaggDuplicate ? 
prevAggNode.aggClassOutputCardinality_.get(aggIdx) : prevAggNode.aggClassNumGroups_.get(aggIdx); - inputCardinality = - smallestValidCardinality(inputCardinality, aggClassOutputCardinality); + inputCardinality = MathUtil.smallestValidCardinality( + inputCardinality, aggClassOutputCardinality); } resourceProfiles_.add(computeAggClassResourceProfile( queryOptions, aggInfo, inputCardinality, maxMemoryEstimatePerInstance)); diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index a9824908e..eada3f73a 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -37,6 +37,7 @@ import org.apache.impala.planner.JoinNode.DistributionMode; import org.apache.impala.thrift.TPartitionType; import org.apache.impala.thrift.TVirtualColumnType; import org.apache.impala.util.KuduUtil; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1347,7 +1348,7 @@ public class DistributedPlanner { Preconditions.checkState(node == childSortNode); if (hasLimit) { childSortNode.unsetLimit(); - childSortNode.setLimit(PlanNode.checkedAdd(limit, offset)); + childSortNode.setLimit(MathUtil.addCardinalities(limit, offset)); } childSortNode.setOffset(0); } diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index 11971e151..cb0729f44 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -34,11 +34,11 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TSortInfo; -import org.apache.impala.util.ExprUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
com.google.common.base.Preconditions; +import com.google.common.math.LongMath; /** * Receiver side of a 1:n data stream. Logically, an ExchangeNode consumes the data @@ -314,7 +314,7 @@ public class ExchangeNode extends PlanNode { long estimatedDeferredRPCQueueSize = estimateDeferredRPCQueueSize(queryOptions, numSenders); long estimatedMem = Math.max( - checkedAdd(estimatedTotalQueueByteSize, estimatedDeferredRPCQueueSize), + LongMath.saturatedAdd(estimatedTotalQueueByteSize, estimatedDeferredRPCQueueSize), MIN_ESTIMATE_BYTES); nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem); } @@ -344,7 +344,7 @@ public class ExchangeNode extends PlanNode { // queries without stats. long estimatedTotalQueueByteSize = numQueues * maxQueueByteSize; // Set an upper limit based on estimated cardinality. - if (hasValidStats()) { + if (getCardinality() > -1 && (isBroadcastExchange() || getNumNodes() > 0)) { long totalBytesToReceive = (long) Math.ceil(getAvgRowSize() * getCardinality()); // Assuming no skew in distribution during data shuffling. long bytesToReceivePerExchNode = isBroadcastExchange() ? totalBytesToReceive diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index f642905e2..87e28f776 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -347,14 +347,14 @@ public class HBaseScanNode extends ScanNode { } } } + + // Safe guard for cardinality_ < -1, e.g. when hbase sampling fails and numRows + // in HMS is abnormally set to be < -1. + cardinality_ = Math.max(-1, cardinality_); inputCardinality_ = cardinality_; if (cardinality_ > 0) { cardinality_ = applyConjunctsSelectivity(cardinality_); - } else { - // Safe guard for cardinality_ < -1, e.g. when hbase sampling fails and numRows - // in HMS is abnormally set to be < -1. 
- cardinality_ = Math.max(-1, cardinality_); } cardinality_ = capCardinalityAtLimit(cardinality_); if (LOG.isTraceEnabled()) { diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java index 3b054b71e..9bf082308 100644 --- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java @@ -37,6 +37,7 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.util.BitUtil; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -265,9 +266,7 @@ public class HashJoinNode extends JoinNode implements SpillableOperator { for (Expr eqJoinPredicate: eqJoinConjuncts_) { long rhsPdNdv = getNdv(eqJoinPredicate.getChild(1)); rhsPdNdv = Math.min(rhsPdNdv, rhsCard); - if (rhsPdNdv != -1) { - rhsNdv = PlanNode.checkedMultiply(rhsNdv, rhsPdNdv); - } + if (rhsPdNdv != -1) rhsNdv = MathUtil.multiplyCardinalities(rhsNdv, rhsPdNdv); } // The memory of the data stored in hash table and // the memory of the hash tableās structure diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 80a59ec99..446add178 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1678,18 +1678,21 @@ public class HdfsScanNode extends ScanNode { } // Adjust cardinality for all collections referenced along the tuple's path. 
- if (cardinality_ != -1) { + if (cardinality_ > 0) { for (Type t: desc_.getPath().getMatchedTypes()) { - if (t.isCollectionType()) cardinality_ *= PlannerContext.AVG_COLLECTION_SIZE; + if (t.isCollectionType()) { + cardinality_ = MathUtil.multiplyCardinalities( + cardinality_, PlannerContext.AVG_COLLECTION_SIZE); + } } } - inputCardinality_ = cardinality_; // Sanity check scan node cardinality. if (cardinality_ < -1) { hasCorruptTableStats_ = true; cardinality_ = -1; } + inputCardinality_ = cardinality_; if (cardinality_ > 0) { if (LOG.isTraceEnabled()) { @@ -1760,7 +1763,7 @@ public class HdfsScanNode extends ScanNode { } else if (partNumRows > -1) { // Consider partition with good stats. if (partitionNumRows_ == -1) partitionNumRows_ = 0; - partitionNumRows_ = checkedAdd(partitionNumRows_, partNumRows); + partitionNumRows_ = MathUtil.addCardinalities(partitionNumRows_, partNumRows); ++numPartitionsWithNumRows_; } } @@ -2428,8 +2431,9 @@ public class HdfsScanNode extends ScanNode { Preconditions.checkState(maxScannerThreads == 1); perThreadIoBuffers = 2; } - long perInstanceMemEstimate = checkedMultiply( - checkedMultiply(maxScannerThreads, perThreadIoBuffers), maxIoBufferSize); + long perInstanceMemEstimate = MathUtil.multiplyCardinalities( + MathUtil.multiplyCardinalities(maxScannerThreads, perThreadIoBuffers), + maxIoBufferSize); // Sanity check: the tighter estimation should not exceed the per-host maximum. long perHostUpperBound = getPerHostMemUpperBound(); @@ -2811,8 +2815,8 @@ public class HdfsScanNode extends ScanNode { // This is a simple in-list predicate. 
Preconditions.checkState(statsConjunct.getChildCount() > 1, "InPredicate must have at least two child expressions"); - ndvMult = MathUtil.saturatingMultiplyCardinalities( - ndvMult, statsConjunct.getChildCount() - 1); + ndvMult = + MathUtil.multiplyCardinalities(ndvMult, statsConjunct.getChildCount() - 1); it.remove(); } else if (statsConjunct instanceof IsNullPredicate) { // This is an is-null predicate. diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java index d739768b6..4bfcb656b 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java @@ -39,6 +39,7 @@ import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.thrift.TTableSink; import org.apache.impala.thrift.TTableSinkType; import org.apache.impala.util.BitUtil; +import org.apache.impala.util.MathUtil; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -222,8 +223,8 @@ public class HdfsTableSink extends TableSink { Math.max(1L, inputNode.getCardinality() / numInstances); long perInstanceInputBytes = (long) Math.ceil(perInstanceInputCardinality * inputNode.getAvgRowSize()); - long perInstanceMemReq = - PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, perPartitionMemReq); + long perInstanceMemReq = MathUtil.multiplyCardinalities( + numBufferedPartitionsPerInstance, perPartitionMemReq); perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq); } resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java index 30877e5b0..ab38561b2 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java @@ -35,6 
+35,7 @@ import org.apache.impala.thrift.TIcebergDeleteNode; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,9 +170,7 @@ public class IcebergDeleteNode extends JoinNode { for (Expr eqJoinPredicate : eqJoinConjuncts_) { long rhsPdNdv = getNdv(eqJoinPredicate.getChild(1)); rhsPdNdv = Math.min(rhsPdNdv, rhsCard); - if (rhsPdNdv != -1) { - rhsNdv = PlanNode.checkedMultiply(rhsNdv, rhsPdNdv); - } + if (rhsPdNdv != -1) rhsNdv = MathUtil.multiplyCardinalities(rhsNdv, rhsPdNdv); } // The memory of the data stored in hash table is the file path of the data files diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java index 91754ab4c..b59d51519 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java @@ -41,6 +41,7 @@ import org.apache.impala.common.ThriftSerializationCtx; import org.apache.impala.fb.FbIcebergDataFileFormat; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,19 +130,29 @@ public class IcebergScanNode extends HdfsScanNode { for (FileDescriptor fd : sampledFileDescs) { Preconditions.checkState(fd instanceof IcebergFileDescriptor); IcebergFileDescriptor iceFd = (IcebergFileDescriptor) fd; - cardinality_ += iceFd.getFbFileMetadata().icebergMetadata().recordCount(); + cardinality_ = MathUtil.addCardinalities( + cardinality_, iceFd.getFbFileMetadata().icebergMetadata().recordCount()); } } } else { for (IcebergFileDescriptor fd : fileDescs_) { - cardinality_ += fd.getFbFileMetadata().icebergMetadata().recordCount(); + cardinality_ = 
MathUtil.addCardinalities( + cardinality_, fd.getFbFileMetadata().icebergMetadata().recordCount()); } } // Adjust cardinality for all collections referenced along the tuple's path. - for (Type t: desc_.getPath().getMatchedTypes()) { - if (t.isCollectionType()) cardinality_ *= PlannerContext.AVG_COLLECTION_SIZE; + if (cardinality_ > 0) { + for (Type t : desc_.getPath().getMatchedTypes()) { + if (t.isCollectionType()) { + cardinality_ = MathUtil.multiplyCardinalities( + cardinality_, PlannerContext.AVG_COLLECTION_SIZE); + } + } } + + // Sanity check scan node cardinality. + cardinality_ = Math.max(-1, cardinality_); inputCardinality_ = cardinality_; if (cardinality_ > 0) { diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java index 86597b345..4dba8c643 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java @@ -42,11 +42,11 @@ import org.apache.impala.common.ThriftSerializationCtx; import org.apache.impala.planner.TupleCacheInfo.IneligibilityReason; import org.apache.impala.thrift.TEqJoinCondition; import org.apache.impala.thrift.TExecNodePhase; -import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TJoinDistributionMode; import org.apache.impala.thrift.TJoinNode; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -827,7 +827,7 @@ public abstract class JoinNode extends PlanNode { long leftCard = getChild(0).cardinality_; long rightCard = getChild(1).cardinality_; if (leftCard != -1 && rightCard != -1) { - cardinality_ = checkedMultiply(leftCard, rightCard); + cardinality_ = MathUtil.multiplyCardinalities(leftCard, rightCard); } } @@ -835,6 +835,11 @@ public abstract class JoinNode extends PlanNode { long leftCard = getChild(0).cardinality_; long rightCard 
= getChild(1).cardinality_; switch (joinOp_) { + case INNER_JOIN: { + // Do nothing. It is already handled above. + // This is here just to complete the enum cases. + break; + } case LEFT_SEMI_JOIN: { if (leftCard != -1) { cardinality_ = Math.min(leftCard, cardinality_); @@ -861,7 +866,7 @@ public abstract class JoinNode extends PlanNode { } case FULL_OUTER_JOIN: { if (leftCard != -1 && rightCard != -1) { - long cardinalitySum = checkedAdd(leftCard, rightCard); + long cardinalitySum = MathUtil.addCardinalities(leftCard, rightCard); cardinality_ = Math.max(cardinalitySum, cardinality_); } break; @@ -884,8 +889,8 @@ public abstract class JoinNode extends PlanNode { if (getChild(0).cardinality_ == -1 || getChild(1).cardinality_ == -1) { cardinality_ = -1; } else { - cardinality_ = checkedMultiply(getChild(0).cardinality_, - getChild(1).cardinality_); + cardinality_ = MathUtil.multiplyCardinalities( + getChild(0).cardinality_, getChild(1).cardinality_); } break; } diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java index c15ad2054..d1dbe6d25 100644 --- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java @@ -32,7 +32,7 @@ import org.apache.impala.thrift.TNestedLoopJoinNode; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; -import org.apache.impala.util.ExprUtil; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,12 +91,12 @@ public class NestedLoopJoinNode extends JoinNode { TQueryOptions queryOptions) { // TODO: This seems a bug below that the total data is not divided by numInstances_. 
long perInstanceMemEstimate; - if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1 - || numNodes_ == 0) { + long rhsChildNode = getChild(1).getCardinality(); + if (rhsChildNode == -1 || getChild(1).getAvgRowSize() == -1 || numNodes_ == 0) { perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM; } else { - perInstanceMemEstimate = - (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_); + double memMult = rhsChildNode * getChild(1).avgRowSize_; + perInstanceMemEstimate = (long) Math.ceil(memMult); } ResourceProfile buildProfile = ResourceProfile.noReservation(perInstanceMemEstimate); // Memory requirements for the probe side are minimal - batches are just streamed @@ -113,7 +113,7 @@ public class NestedLoopJoinNode extends JoinNode { // We return the full cost in the first element of the Pair. long probeCardinality = getProbeCardinalityForCosting(); long buildCardinality = Math.max(0, getChild(1).getCardinality()); - long cardProduct = checkedMultiply(probeCardinality, buildCardinality); + long cardProduct = MathUtil.multiplyCardinalities(probeCardinality, buildCardinality); long perInstanceBuildCardinality = (long) Math.ceil(buildCardinality / fragment_.getNumInstancesForCosting()); double totalCost = 0.0F; diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index d994edddb..80c9bd979 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -38,6 +38,7 @@ import org.apache.impala.thrift.TPartitionType; import org.apache.impala.thrift.TPlanFragment; import org.apache.impala.thrift.TPlanFragmentTree; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -638,7 +639,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { if 
(dataPartition_.getPartitionExprs().contains(expr)) { partition = true; } - result = PlanNode.checkedMultiply(result, numDistinct); + result = MathUtil.multiplyCardinalities(result, numDistinct); maxNdv = Math.max(maxNdv, numDistinct); } if (partition) { diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 0da260c60..08918228b 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -59,6 +59,7 @@ import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.thrift.TQueryOptionsHash; import org.apache.impala.util.BitUtil; import org.apache.impala.util.ExprUtil; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.math.LongMath; /** * Each PlanNode represents a single relational operator @@ -632,6 +632,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { public void init(Analyzer analyzer) throws ImpalaException { assignConjuncts(analyzer); computeStats(analyzer); + validateCardinality(); createDefaultSmap(analyzer); } @@ -748,8 +749,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } } + protected void validateCardinality() { Preconditions.checkState(cardinality_ >= -1); } + protected long capCardinalityAtLimit(long cardinality) { - return smallestValidCardinality(cardinality, limit_); + return MathUtil.smallestValidCardinality(cardinality, limit_); } // Default implementation of computing the total data processed in bytes. @@ -949,6 +952,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { /** * Returns true if stats-related variables are valid. + * Only use this for Preconditions. 
*/ public boolean hasValidStats() { return (numNodes_ == -1 || numNodes_ >= 0) && @@ -956,32 +960,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> { (cardinality_ == -1 || cardinality_ >= 0); } - /** - * Computes and returns the sum of two long values. If an overflow occurs, - * the maximum Long value is returned (Long.MAX_VALUE). - */ - public static long checkedAdd(long a, long b) { - try { - return LongMath.checkedAdd(a, b); - } catch (ArithmeticException e) { - LOG.warn("overflow when adding longs: " + a + ", " + b); - return Long.MAX_VALUE; - } - } - - /** - * Computes and returns the product of two cardinalities. If an overflow - * occurs, the maximum Long value is returned (Long.MAX_VALUE). - */ - public static long checkedMultiply(long a, long b) { - try { - return LongMath.checkedMultiply(a, b); - } catch (ArithmeticException e) { - LOG.warn("overflow when multiplying longs: " + a + ", " + b); - return Long.MAX_VALUE; - } - } - /** * Returns true if this plan node can output its first row only after consuming * all rows of all its children. This method is used to group plan nodes @@ -1144,7 +1122,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { for(PlanNode p : children_) { long tmp = p.getCardinality(); if (tmp == -1) return -1; - sum = checkedAdd(sum, tmp); + sum = MathUtil.addCardinalities(sum, tmp); } return sum; } @@ -1413,24 +1391,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> { */ public boolean omitTupleCache() { return false; } - /** - * Return the least between 'cardinality1' and 'cardinality2' - * that is not a negative number (unknown). - * Can return -1 if both number is less than 0. - * Both argument should not be < -1. 
- */ - protected static long smallestValidCardinality(long cardinality1, long cardinality2) { - Preconditions.checkArgument( - cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1); - Preconditions.checkArgument( - cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2); - if (cardinality1 >= 0) { - if (cardinality2 >= 0) return Math.min(cardinality1, cardinality2); - return cardinality1; - } - return Math.max(-1, cardinality2); - } - /** * Return True if Impala Coordinator node has scratch_dirs flag configured and * given 'queryOptions' allows taking spilling to disk into account [when calculating diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index efd6d149a..8089e1508 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -114,6 +114,12 @@ abstract public class ScanNode extends PlanNode { hasHardEstimates_ = !hasScanConjuncts() && !isAccessingCollectionType(); } + @Override + protected void validateCardinality() { + Preconditions.checkState(cardinality_ >= -1); + Preconditions.checkState(inputCardinality_ >= -1); + } + public TupleDescriptor getTupleDesc() { return desc_; } @Override diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index 8ad65ce3f..95f2e0d42 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -35,6 +35,7 @@ import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TSortInfo; import org.apache.impala.thrift.TSortNode; import org.apache.impala.thrift.TSortType; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,7 +126,7 @@ public class SortNode extends PlanNode implements SpillableOperator { PlanNodeId id, PlanNode input, SortInfo 
info, long offset, long limit, boolean includeTies) { long topNBytesLimit = queryOptions.topn_bytes_limit; - long topNCardinality = smallestValidCardinality(input.cardinality_, limit); + long topNCardinality = MathUtil.smallestValidCardinality(input.cardinality_, limit); long estimatedTopNMaterializedSize = info.estimateTopNMaterializedSize(topNCardinality, offset); @@ -221,7 +222,8 @@ public class SortNode extends PlanNode implements SpillableOperator { Preconditions.checkState(!hasLimit()); Preconditions.checkState(!hasOffset()); long topNBytesLimit = analyzer.getQueryOptions().topn_bytes_limit; - long topNCardinality = smallestValidCardinality(getChild(0).cardinality_, limit); + long topNCardinality = + MathUtil.smallestValidCardinality(getChild(0).cardinality_, limit); long estimatedTopNMaterializedSize = info_.estimateTopNMaterializedSize(topNCardinality, offset_); @@ -302,7 +304,8 @@ public class SortNode extends PlanNode implements SpillableOperator { public void computeStats(Analyzer analyzer) { super.computeStats(analyzer); if (isTypeTopN() && includeTies_) { - cardinality_ = smallestValidCardinality(getChild(0).cardinality_, limitWithTies_); + cardinality_ = + MathUtil.smallestValidCardinality(getChild(0).cardinality_, limitWithTies_); } else { cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_); } @@ -312,7 +315,8 @@ public class SortNode extends PlanNode implements SpillableOperator { List<Expr> partExprs = info_.getSortExprs().subList(0, numPartitionExprs_); long partNdv = numPartitionExprs_ == 0 ? 
1 : Expr.getNumDistinctValues(partExprs); if (partNdv >= 0) { - long maxRowsInHeaps = checkedMultiply(partNdv, getPerPartitionLimit()); + long maxRowsInHeaps = + MathUtil.multiplyCardinalities(partNdv, getPerPartitionLimit()); if (cardinality_ < 0 || cardinality_ > maxRowsInHeaps) { cardinality_ = maxRowsInHeaps; } diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java index 09a6ac7d8..9df5e19db 100644 --- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java @@ -27,6 +27,7 @@ import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.MathUtil; import com.google.common.base.Preconditions; @@ -88,8 +89,8 @@ public class SubplanNode extends PlanNode { public void computeStats(Analyzer analyzer) { super.computeStats(analyzer); if (getChild(0).cardinality_ != -1 && getChild(1).cardinality_ != -1) { - cardinality_ = - checkedMultiply(getChild(0).cardinality_, getChild(1).cardinality_); + cardinality_ = MathUtil.multiplyCardinalities( + getChild(0).cardinality_, getChild(1).cardinality_); } else { cardinality_ = -1; } diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java index 7183a6dcf..c25570dec 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java @@ -34,6 +34,7 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TUnionNode; +import org.apache.impala.util.MathUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,7 +126,8 @@ public class UnionNode extends PlanNode { // ignore 
missing child cardinality info in the hope it won't matter enough // to change the planning outcome if (child.cardinality_ >= 0) { - totalChildCardinality = checkedAdd(totalChildCardinality, child.cardinality_); + totalChildCardinality = + MathUtil.addCardinalities(totalChildCardinality, child.cardinality_); haveChildWithCardinality = true; } // Union fragments are scheduled on the union of hosts of all scans in the fragment @@ -139,7 +141,8 @@ public class UnionNode extends PlanNode { // Consider estimate valid if we have at least one child with known cardinality, or // only constant values. if (haveChildWithCardinality || children_.size() == 0) { - cardinality_ = checkedAdd(totalChildCardinality, constExprLists_.size()); + cardinality_ = + MathUtil.addCardinalities(totalChildCardinality, constExprLists_.size()); } else { cardinality_ = -1; } @@ -163,8 +166,8 @@ public class UnionNode extends PlanNode { for (int i = firstMaterializedChildIdx_; i < resultExprLists_.size(); i++) { PlanNode child = children_.get(i); if (child.cardinality_ >= 0) { - totalMaterializedCardinality = - checkedAdd(totalMaterializedCardinality, Math.max(0, child.cardinality_)); + totalMaterializedCardinality = MathUtil.addCardinalities( + totalMaterializedCardinality, Math.max(0, child.cardinality_)); } } long estBytesMaterialized = diff --git a/fe/src/main/java/org/apache/impala/util/MathUtil.java b/fe/src/main/java/org/apache/impala/util/MathUtil.java index 0b2d01557..0091f071f 100644 --- a/fe/src/main/java/org/apache/impala/util/MathUtil.java +++ b/fe/src/main/java/org/apache/impala/util/MathUtil.java @@ -20,40 +20,79 @@ package org.apache.impala.util; import com.google.common.base.Preconditions; import com.google.common.math.LongMath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class MathUtil { + private final static Logger LOG = LoggerFactory.getLogger(MathUtil.class); // Multiply two numbers. 
If the multiply would overflow, return either Long.MIN_VALUE - // (if a xor b is negative) or Long.MAX_VALUE otherwise. The overflow path is not - // optimised at all and may be somewhat slow. + // (if a xor b is negative) or Long.MAX_VALUE otherwise. public static long saturatingMultiply(long a, long b) { + return LongMath.saturatedMultiply(a, b); + } + + // Add two numbers. If the add would overflow, return either Long.MAX_VALUE if both are + // positive or Long.MIN_VALUE if both are negative. + public static long saturatingAdd(long a, long b) { + return LongMath.saturatedAdd(a, b); + } + + /** + * Computes and returns the sum of two cardinality numbers. If an overflow occurs, + * the maximum Long value is returned (Long.MAX_VALUE). + * Both numbers should be valid cardinality numbers (>= -1). + * Return -1 if any argument is -1. + */ + public static long addCardinalities(long cardinality1, long cardinality2) { + Preconditions.checkArgument( + cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1); + Preconditions.checkArgument( + cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2); + if (cardinality1 == -1 || cardinality2 == -1) return -1; try { - return LongMath.checkedMultiply(a, b); + return LongMath.checkedAdd(cardinality1, cardinality2); } catch (ArithmeticException e) { - return a < 0 != b < 0 ? Long.MIN_VALUE : Long.MAX_VALUE; + LOG.warn("overflow when adding longs: " + cardinality1 + ", " + cardinality2); + return Long.MAX_VALUE; } } - // Multiply two cardinality numbers like saturatingMultiply() but with additional - // precondition check that both must be a valid cardinality value. - // Return -1 if any argument is -1. - public static long saturatingMultiplyCardinalities( - long cardinality1, long cardinality2) { + /** + * Computes and returns the product of two cardinality numbers. If an overflow + * occurs, the maximum Long value is returned (Long.MAX_VALUE). + * Both numbers should be valid cardinality numbers (>= -1).
+ * Return -1 if any argument is -1. + */ + public static long multiplyCardinalities(long cardinality1, long cardinality2) { Preconditions.checkArgument( cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1); Preconditions.checkArgument( cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2); if (cardinality1 == -1 || cardinality2 == -1) return -1; - return saturatingMultiply(cardinality1, cardinality2); - } - - // Add two numbers. If the add would overflow, return either Long.MAX_VALUE if both are - // positive or Long.MIN_VALUE if both are negative. The overflow path is not optimised - // at all and may be somewhat slow. - public static long saturatingAdd(long a, long b) { try { - return LongMath.checkedAdd(a, b); + return LongMath.checkedMultiply(cardinality1, cardinality2); } catch (ArithmeticException e) { - return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE; + LOG.warn("overflow when multiplying longs: " + cardinality1 + ", " + cardinality2); + return Long.MAX_VALUE; + } + } + + /** + * Return the lesser of 'cardinality1' and 'cardinality2' + * that is not a negative number (unknown). + * Can return -1 if both numbers are less than 0. + * Neither argument should be < -1.
+ */ + public static long smallestValidCardinality(long cardinality1, long cardinality2) { + Preconditions.checkArgument( + cardinality1 >= -1, "cardinality1 is invalid: %s", cardinality1); + Preconditions.checkArgument( + cardinality2 >= -1, "cardinality2 is invalid: %s", cardinality2); + if (cardinality1 >= 0) { + if (cardinality2 >= 0) return Math.min(cardinality1, cardinality2); + return cardinality1; } + return Math.max(-1, cardinality2); } } diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java index 6ca710cd4..8b760b39c 100644 --- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java +++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java @@ -1313,30 +1313,6 @@ public class CardinalityTest extends PlannerTestBase { expected * CARDINALITY_TOLERANCE); } - @Test - public void testSmallestValidCardinality() { - long[] validCard = {0, 1, Long.MAX_VALUE}; - long unknown = -1; - - // Case 1: both argument is valid. - for (long c1 : validCard) { - for (long c2 : validCard) { - assertEquals(c1 + " vs " + c2, Math.min(c1, c2), - PlanNode.smallestValidCardinality(c1, c2)); - } - } - // Case 2: One argument is valid, the other is unknown. - for (long c : validCard) { - assertEquals( - c + " vs " + unknown, c, PlanNode.smallestValidCardinality(c, unknown)); - assertEquals( - unknown + " vs " + c, c, PlanNode.smallestValidCardinality(unknown, c)); - } - // Case 3: both argument is unknown. 
- assertEquals(unknown + " vs " + unknown, unknown, - PlanNode.smallestValidCardinality(unknown, unknown)); - } - @Test public void testEstimatePreaggCardinality() { List<Long> positiveLong = Arrays.asList(1L, 2L, 5L, 10L, 100L, 1000L, Long.MAX_VALUE); @@ -1360,7 +1336,7 @@ public class CardinalityTest extends PlannerTestBase { assertTrue(message + ", expect=0", outputCard == 0); } else if (inputCard == -1 || totalInstances == 1) { long leastInputVsNdv = - PlanNode.smallestValidCardinality(inputCard, globalNdv); + MathUtil.smallestValidCardinality(inputCard, globalNdv); assertTrue( message + ", expect=" + leastInputVsNdv, outputCard == leastInputVsNdv); } else { @@ -1373,10 +1349,9 @@ public class CardinalityTest extends PlannerTestBase { message = String.format( pattern, totalInstances, globalNdv, inputCard, true, false, -1, outputCard); assertTrue(message + ", expect>=0", outputCard >= 0); - long allDuplicate = - MathUtil.saturatingMultiplyCardinalities(globalNdv, totalInstances); + long allDuplicate = MathUtil.multiplyCardinalities(globalNdv, totalInstances); long leastInputVsAllDuplicate = - PlanNode.smallestValidCardinality(inputCard, allDuplicate); + MathUtil.smallestValidCardinality(inputCard, allDuplicate); if (allDuplicate < inputCard) { assertTrue(message + ", expect=" + leastInputVsAllDuplicate, outputCard == leastInputVsAllDuplicate); @@ -1390,10 +1365,9 @@ public class CardinalityTest extends PlannerTestBase { message = String.format(pattern, totalInstances, globalNdv, inputCard, false, true, limit, outputCard); assertTrue(message + ", expect>=0", outputCard >= 0); - long allAtLimit = - MathUtil.saturatingMultiplyCardinalities(totalInstances, limit); + long allAtLimit = MathUtil.multiplyCardinalities(totalInstances, limit); long leastOfAll = - PlanNode.smallestValidCardinality(allAtLimit, leastInputVsAllDuplicate); + MathUtil.smallestValidCardinality(allAtLimit, leastInputVsAllDuplicate); assertTrue(message + ", expect=" + leastOfAll, outputCard == 
leastOfAll); } } diff --git a/fe/src/test/java/org/apache/impala/util/MathUtilTest.java b/fe/src/test/java/org/apache/impala/util/MathUtilTest.java index 581ecf145..58d8de743 100644 --- a/fe/src/test/java/org/apache/impala/util/MathUtilTest.java +++ b/fe/src/test/java/org/apache/impala/util/MathUtilTest.java @@ -19,6 +19,8 @@ package org.apache.impala.util; import static org.junit.Assert.*; +import com.google.common.math.LongMath; + import org.junit.Test; /** @@ -45,27 +47,42 @@ public class MathUtilTest { } @Test - public void testSaturatingMultiplyCardinality() { + public void testPlanNodeMultiplyCardinalities() { long[] validCard = {0, 1, 2, Long.MAX_VALUE / 2, Long.MAX_VALUE}; long unknown = -1; + long invalid = -2; // Case 1: both argument is valid. for (long c1 : validCard) { for (long c2 : validCard) { - assertEquals(c1 + " * " + c2, MathUtil.saturatingMultiply(c1, c2), - MathUtil.saturatingMultiplyCardinalities(c1, c2)); + assertEquals(c1 + " * " + c2, LongMath.saturatedMultiply(c1, c2), + MathUtil.multiplyCardinalities(c1, c2)); } } // Case 2: One argument is valid, the other is unknown. for (long c : validCard) { - assertEquals(c + " * " + unknown, unknown, - MathUtil.saturatingMultiplyCardinalities(c, unknown)); - assertEquals(unknown + " * " + c, unknown, - MathUtil.saturatingMultiplyCardinalities(unknown, c)); + assertEquals( + c + " * " + unknown, unknown, MathUtil.multiplyCardinalities(c, unknown)); + assertEquals( + unknown + " * " + c, unknown, MathUtil.multiplyCardinalities(unknown, c)); } // Case 3: both argument is unknown. assertEquals(unknown + " * " + unknown, unknown, - MathUtil.saturatingMultiplyCardinalities(unknown, unknown)); + MathUtil.multiplyCardinalities(unknown, unknown)); + // Case 4: one argument is invalid. 
+ try { + MathUtil.multiplyCardinalities(invalid, 1); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cardinality1 is invalid: -2")); + } + // Case 5: first argument is unknown and second argument is invalid. + try { + MathUtil.multiplyCardinalities(unknown, invalid); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cardinality2 is invalid: -2")); + } } @Test @@ -82,4 +99,80 @@ public class MathUtilTest { assertEquals(Long.MAX_VALUE, MathUtil.saturatingAdd(Long.MAX_VALUE - 10, 11)); assertEquals(Long.MAX_VALUE, MathUtil.saturatingAdd(Long.MAX_VALUE, Long.MAX_VALUE / 2)); } + + @Test + public void testPlanNodeAddCardinalities() { + long[] validCard = {0, 1, 2, Long.MAX_VALUE / 2, Long.MAX_VALUE}; + long unknown = -1; + long invalid = -2; + + // Case 1: both argument is valid. + for (long c1 : validCard) { + for (long c2 : validCard) { + assertEquals(c1 + " * " + c2, LongMath.saturatedAdd(c1, c2), + MathUtil.addCardinalities(c1, c2)); + } + } + // Case 2: One argument is valid, the other is unknown. + for (long c : validCard) { + assertEquals(c + " * " + unknown, unknown, MathUtil.addCardinalities(c, unknown)); + assertEquals(unknown + " * " + c, unknown, MathUtil.addCardinalities(unknown, c)); + } + // Case 3: both argument is unknown. + assertEquals( + unknown + " * " + unknown, unknown, MathUtil.addCardinalities(unknown, unknown)); + // Case 4: one argument is invalid. + try { + MathUtil.addCardinalities(invalid, 1); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cardinality1 is invalid: -2")); + } + // Case 5: first argument is unknown and second argument is invalid. 
+ try { + MathUtil.addCardinalities(unknown, invalid); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cardinality2 is invalid: -2")); + } + } + + @Test + public void testSmallestValidCardinality() { + long[] validCard = {0, 1, Long.MAX_VALUE}; + long unknown = -1; + long invalid = -2; + + // Case 1: both argument is valid. + for (long c1 : validCard) { + for (long c2 : validCard) { + assertEquals(c1 + " vs " + c2, Math.min(c1, c2), + MathUtil.smallestValidCardinality(c1, c2)); + } + } + // Case 2: One argument is valid, the other is unknown. + for (long c : validCard) { + assertEquals( + c + " vs " + unknown, c, MathUtil.smallestValidCardinality(c, unknown)); + assertEquals( + unknown + " vs " + c, c, MathUtil.smallestValidCardinality(unknown, c)); + } + // Case 3: both argument is unknown. + assertEquals(unknown + " vs " + unknown, unknown, + MathUtil.smallestValidCardinality(unknown, unknown)); + // Case 4: one argument is invalid. + try { + MathUtil.smallestValidCardinality(invalid, 1); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cardinality1 is invalid: -2")); + } + // Case 5: first argument is unknown and second argument is invalid. 
+ try { + MathUtil.smallestValidCardinality(unknown, invalid); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("cardinality2 is invalid: -2")); + } + } } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index 9731bf44a..2c0b7c156 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -975,19 +975,19 @@ PLAN-ROOT SINK in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=12.00MB Threads=3 -Per-Host Resource Estimates: Memory=75MB +Per-Host Resource Estimates: Memory=85MB WARNING: The following tables are missing relevant table and/or column statistics. tpch_avro.orders Analyzed query: SELECT * FROM tpch_avro.orders F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=10.27MB mem-reservation=4.00MB thread-reservation=1 +| Per-Host Resources: mem-estimate=20.27MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: tpch_avro.orders.o_orderkey, tpch_avro.orders.o_custkey, tpch_avro.orders.o_orderstatus, tpch_avro.orders.o_totalprice, tpch_avro.orders.o_orderdate, tpch_avro.orders.o_orderpriority, tpch_avro.orders.o_clerk, tpch_avro.orders.o_shippriority, tpch_avro.orders.o_comment | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] -| mem-estimate=275.97KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.27MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=88B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1029,19 +1029,19 @@ PLAN-ROOT SINK in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=12.00MB 
Threads=3 -Per-Host Resource Estimates: Memory=42MB +Per-Host Resource Estimates: Memory=52MB WARNING: The following tables are missing relevant table and/or column statistics. tpch_rc.customer Analyzed query: SELECT * FROM tpch_rc.customer F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=10.08MB mem-reservation=4.00MB thread-reservation=1 +| Per-Host Resources: mem-estimate=20.08MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: tpch_rc.customer.c_custkey, tpch_rc.customer.c_name, tpch_rc.customer.c_address, tpch_rc.customer.c_nationkey, tpch_rc.customer.c_phone, tpch_rc.customer.c_acctbal, tpch_rc.customer.c_mktsegment, tpch_rc.customer.c_comment | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] -| mem-estimate=81.92KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.08MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=78B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1058,19 +1058,19 @@ Per-Host Resources: mem-estimate=32.32MB mem-reservation=8.00MB thread-reservati in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=12.00MB Threads=2 -Per-Host Resource Estimates: Memory=42MB +Per-Host Resource Estimates: Memory=52MB WARNING: The following tables are missing relevant table and/or column statistics. 
tpch_rc.customer Analyzed query: SELECT * FROM tpch_rc.customer F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=10.08MB mem-reservation=4.00MB thread-reservation=1 +| Per-Instance Resources: mem-estimate=20.08MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: tpch_rc.customer.c_custkey, tpch_rc.customer.c_name, tpch_rc.customer.c_address, tpch_rc.customer.c_nationkey, tpch_rc.customer.c_phone, tpch_rc.customer.c_acctbal, tpch_rc.customer.c_mktsegment, tpch_rc.customer.c_comment | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] -| mem-estimate=81.92KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.08MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=78B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1112,19 +1112,19 @@ PLAN-ROOT SINK in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=12.00MB Threads=3 -Per-Host Resource Estimates: Memory=27MB +Per-Host Resource Estimates: Memory=37MB WARNING: The following tables are missing relevant table and/or column statistics. 
tpcds_seq_snap.web_returns Analyzed query: SELECT * FROM tpcds_seq_snap.web_returns F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=10.11MB mem-reservation=4.00MB thread-reservation=1 +| Per-Host Resources: mem-estimate=20.11MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: tpcds_seq_snap.web_returns.wr_returned_date_sk, tpcds_seq_snap.web_returns.wr_returned_time_sk, tpcds_seq_snap.web_returns.wr_item_sk, tpcds_seq_snap.web_returns.wr_refunded_customer_sk, tpcds_seq_snap.web_returns.wr_refunded_cdemo_sk, tpcds_seq_snap.web_returns.wr_refunded_hdemo_sk, tpcds_seq_snap.web_returns.wr_refunded_addr_sk, tpcds_seq_snap.web_returns.wr_returning_customer_sk, tpcds_seq_snap.web_returns.wr_returning_cdemo_sk, tpcds_seq_snap.web_returns.wr_returning [...] | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] -| mem-estimate=107.90KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=104B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1141,19 +1141,19 @@ Per-Host Resources: mem-estimate=16.42MB mem-reservation=8.00MB thread-reservati in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=12.00MB Threads=2 -Per-Host Resource Estimates: Memory=27MB +Per-Host Resource Estimates: Memory=37MB WARNING: The following tables are missing relevant table and/or column statistics. 
tpcds_seq_snap.web_returns Analyzed query: SELECT * FROM tpcds_seq_snap.web_returns F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=10.11MB mem-reservation=4.00MB thread-reservation=1 +| Per-Instance Resources: mem-estimate=20.11MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: tpcds_seq_snap.web_returns.wr_returned_date_sk, tpcds_seq_snap.web_returns.wr_returned_time_sk, tpcds_seq_snap.web_returns.wr_item_sk, tpcds_seq_snap.web_returns.wr_refunded_customer_sk, tpcds_seq_snap.web_returns.wr_refunded_cdemo_sk, tpcds_seq_snap.web_returns.wr_refunded_hdemo_sk, tpcds_seq_snap.web_returns.wr_refunded_addr_sk, tpcds_seq_snap.web_returns.wr_returning_customer_sk, tpcds_seq_snap.web_returns.wr_returning_cdemo_sk, tpcds_seq_snap.web_returns.wr_returning [...] | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] -| mem-estimate=107.90KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=104B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1325,19 +1325,19 @@ PLAN-ROOT SINK in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=4.09MB Threads=3 -Per-Host Resource Estimates: Memory=27MB +Per-Host Resource Estimates: Memory=37MB WARNING: The following tables are missing relevant table and/or column statistics. 
functional.alltypesmixedformat Analyzed query: SELECT * FROM functional.alltypesmixedformat F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=10.25MB mem-reservation=4.00MB thread-reservation=1 +| Per-Host Resources: mem-estimate=20.25MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: functional.alltypesmixedformat.id, functional.alltypesmixedformat.bool_col, functional.alltypesmixedformat.tinyint_col, functional.alltypesmixedformat.smallint_col, functional.alltypesmixedformat.int_col, functional.alltypesmixedformat.bigint_col, functional.alltypesmixedformat.float_col, functional.alltypesmixedformat.double_col, functional.alltypesmixedformat.date_string_col, functional.alltypesmixedformat.string_col, functional.alltypesmixedformat.timestamp_col, funct [...] | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] -| mem-estimate=251.97KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.25MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=80B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1355,19 +1355,19 @@ Per-Host Resources: mem-estimate=16.33MB mem-reservation=88.00KB thread-reservat in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=4.17MB Threads=3 -Per-Host Resource Estimates: Memory=43MB +Per-Host Resource Estimates: Memory=53MB WARNING: The following tables are missing relevant table and/or column statistics. 
functional.alltypesmixedformat Analyzed query: SELECT * FROM functional.alltypesmixedformat F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=10.33MB mem-reservation=4.00MB thread-reservation=1 +| Per-Instance Resources: mem-estimate=20.33MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: functional.alltypesmixedformat.id, functional.alltypesmixedformat.bool_col, functional.alltypesmixedformat.tinyint_col, functional.alltypesmixedformat.smallint_col, functional.alltypesmixedformat.int_col, functional.alltypesmixedformat.bigint_col, functional.alltypesmixedformat.float_col, functional.alltypesmixedformat.double_col, functional.alltypesmixedformat.date_string_col, functional.alltypesmixedformat.string_col, functional.alltypesmixedformat.timestamp_col, funct [...] | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 01:EXCHANGE [UNPARTITIONED] -| mem-estimate=335.97KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.33MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=80B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1431,7 +1431,7 @@ Per-Host Resources: mem-estimate=592.00KB mem-reservation=0B thread-reservation= table: rows=unavailable columns: unavailable mem-estimate=256.00KB mem-reservation=0B thread-reservation=0 - tuple-ids=0 row-size=80B cardinality=14.23K + tuple-ids=0 row-size=80B cardinality=14.30K in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=4.00MB Threads=2 @@ -4396,7 +4396,7 @@ PLAN-ROOT SINK in pipelines: 02(GETNEXT) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=253.75MB Threads=11 -Per-Host Resource Estimates: Memory=2.72GB +Per-Host Resource Estimates: Memory=2.73GB WARNING: The following tables are missing relevant table and/or column statistics. 
tpch_avro.customer Analyzed query: SELECT c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, @@ -4441,7 +4441,7 @@ Per-Host Resources: mem-estimate=138.23MB mem-reservation=34.00MB thread-reserva | in pipelines: 02(GETNEXT) | F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=2.23GB mem-reservation=144.75MB thread-reservation=1 runtime-filters-memory=4.00MB +Per-Host Resources: mem-estimate=2.24GB mem-reservation=144.75MB thread-reservation=1 runtime-filters-memory=4.00MB 08:AGGREGATE [STREAMING] | output: sum(l_quantity) | group by: c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice @@ -4497,7 +4497,7 @@ Per-Host Resources: mem-estimate=2.23GB mem-reservation=144.75MB thread-reservat | in pipelines: 02(GETNEXT), 00(OPEN) | |--12:EXCHANGE [BROADCAST] -| | mem-estimate=23.98KB mem-reservation=0B thread-reservation=0 +| | mem-estimate=10.02MB mem-reservation=0B thread-reservation=0 | | tuple-ids=0 row-size=20B cardinality=unavailable | | in pipelines: 00(GETNEXT) | | @@ -4558,7 +4558,7 @@ Per-Host Resources: mem-estimate=91.23MB mem-reservation=11.00MB thread-reservat in pipelines: 02(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=477.75MB Threads=16 -Per-Host Resource Estimates: Memory=3.26GB +Per-Host Resource Estimates: Memory=3.27GB Analyzed query: SELECT c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, sum(l_quantity) FROM tpch_avro.customer, tpch.orders, tpch.lineitem LEFT SEMI JOIN (SELECT l_orderkey FROM tpch.lineitem GROUP BY l_orderkey HAVING @@ -4665,7 +4665,7 @@ Per-Instance Resources: mem-estimate=139.95MB mem-reservation=34.00MB thread-res | in pipelines: 02(GETNEXT), 00(OPEN) | |--F09:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3 -| | Per-Instance Resources: mem-estimate=2.00GB mem-reservation=69.00MB thread-reservation=1 runtime-filters-memory=1.00MB +| | Per-Instance Resources: mem-estimate=2.01GB mem-reservation=69.00MB thread-reservation=1 
runtime-filters-memory=1.00MB | JOIN BUILD | | join-table-id=01 plan-id=02 cohort-id=01 | | build expressions: c_custkey @@ -4673,7 +4673,7 @@ Per-Instance Resources: mem-estimate=139.95MB mem-reservation=34.00MB thread-res | | mem-estimate=2.00GB mem-reservation=68.00MB spill-buffer=2.00MB thread-reservation=0 | | | 12:EXCHANGE [BROADCAST] -| | mem-estimate=23.98KB mem-reservation=0B thread-reservation=0 +| | mem-estimate=10.02MB mem-reservation=0B thread-reservation=0 | | tuple-ids=0 row-size=20B cardinality=unavailable | | in pipelines: 00(GETNEXT) | | diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test index a6b05f870..c60bddd0e 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test @@ -501,25 +501,25 @@ from functional_parquet.alltypes left join functional_parquet.alltypestiny on alltypes.id = alltypestiny.id ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=38.17MB Threads=5 -Per-Host Resource Estimates: Memory=2.04GB +Per-Host Resource Estimates: Memory=2.06GB WARNING: The following tables are missing relevant table and/or column statistics. 
functional_parquet.alltypes, functional_parquet.alltypestiny Analyzed query: SELECT /* +straight_join */ * FROM functional_parquet.alltypes LEFT OUTER JOIN functional_parquet.alltypestiny ON alltypes.id = alltypestiny.id F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=10.49MB mem-reservation=4.00MB thread-reservation=1 +| Per-Host Resources: mem-estimate=20.49MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: functional_parquet.alltypes.id, functional_parquet.alltypes.bool_col, functional_parquet.alltypes.tinyint_col, functional_parquet.alltypes.smallint_col, functional_parquet.alltypes.int_col, functional_parquet.alltypes.bigint_col, functional_parquet.alltypes.float_col, functional_parquet.alltypes.double_col, functional_parquet.alltypes.date_string_col, functional_parquet.alltypes.string_col, functional_parquet.alltypes.timestamp_col, functional_parquet.alltypes.year, func [...] | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=503.95KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.49MB mem-reservation=0B thread-reservation=0 | tuple-ids=0,1N row-size=160B cardinality=unavailable | in pipelines: 00(GETNEXT) | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 -Per-Host Resources: mem-estimate=2.02GB mem-reservation=34.09MB thread-reservation=2 +Per-Host Resources: mem-estimate=2.03GB mem-reservation=34.09MB thread-reservation=2 02:HASH JOIN [LEFT OUTER JOIN, BROADCAST] | hash predicates: alltypes.id = alltypestiny.id | fk/pk conjuncts: assumed fk/pk @@ -528,7 +528,7 @@ Per-Host Resources: mem-estimate=2.02GB mem-reservation=34.09MB thread-reservati | in pipelines: 00(GETNEXT), 01(OPEN) | |--03:EXCHANGE [BROADCAST] -| | mem-estimate=251.92KB mem-reservation=0B thread-reservation=0 +| | mem-estimate=10.25MB mem-reservation=0B thread-reservation=0 | | tuple-ids=1 row-size=80B cardinality=unavailable | | in 
pipelines: 01(GETNEXT) | | @@ -557,20 +557,20 @@ Per-Host Resources: mem-estimate=2.02GB mem-reservation=34.09MB thread-reservati in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=72.34MB Threads=6 -Per-Host Resource Estimates: Memory=2.08GB +Per-Host Resource Estimates: Memory=2.10GB WARNING: The following tables are missing relevant table and/or column statistics. functional_parquet.alltypestiny Analyzed query: SELECT /* +straight_join */ * FROM functional_parquet.alltypes LEFT OUTER JOIN functional_parquet.alltypestiny ON alltypes.id = alltypestiny.id F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=10.98MB mem-reservation=4.00MB thread-reservation=1 +| Per-Instance Resources: mem-estimate=20.98MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: functional_parquet.alltypes.id, functional_parquet.alltypes.bool_col, functional_parquet.alltypes.tinyint_col, functional_parquet.alltypes.smallint_col, functional_parquet.alltypes.int_col, functional_parquet.alltypes.bigint_col, functional_parquet.alltypes.float_col, functional_parquet.alltypes.double_col, functional_parquet.alltypes.date_string_col, functional_parquet.alltypes.string_col, functional_parquet.alltypes.timestamp_col, functional_parquet.alltypes.year, func [...] 
| mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=1007.95KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.98MB mem-reservation=0B thread-reservation=0 | tuple-ids=0,1N row-size=160B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -585,14 +585,14 @@ Per-Instance Resources: mem-estimate=16.66MB mem-reservation=88.00KB thread-rese | in pipelines: 00(GETNEXT), 01(OPEN) | |--F03:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 -| | Per-Instance Resources: mem-estimate=2.00GB mem-reservation=68.00MB thread-reservation=1 +| | Per-Instance Resources: mem-estimate=2.01GB mem-reservation=68.00MB thread-reservation=1 | JOIN BUILD | | join-table-id=00 plan-id=01 cohort-id=01 | | build expressions: alltypestiny.id | | mem-estimate=2.00GB mem-reservation=68.00MB spill-buffer=2.00MB thread-reservation=0 | | | 03:EXCHANGE [BROADCAST] -| | mem-estimate=335.92KB mem-reservation=0B thread-reservation=0 +| | mem-estimate=10.33MB mem-reservation=0B thread-reservation=0 | | tuple-ids=1 row-size=80B cardinality=unavailable | | in pipelines: 01(GETNEXT) | | @@ -1011,25 +1011,25 @@ from functional_parquet.alltypestiny group by string_col ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=72.01MB Threads=4 -Per-Host Resource Estimates: Memory=282MB +Per-Host Resource Estimates: Memory=302MB WARNING: The following tables are missing relevant table and/or column statistics. 
functional_parquet.alltypestiny Analyzed query: SELECT string_col, count(*) FROM functional_parquet.alltypestiny GROUP BY string_col F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Host Resources: mem-estimate=10.07MB mem-reservation=4.00MB thread-reservation=1 +| Per-Host Resources: mem-estimate=20.07MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: string_col, count(*) | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=71.99KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.07MB mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=20B cardinality=unavailable | in pipelines: 03(GETNEXT) | F01:PLAN FRAGMENT [HASH(string_col)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=128.09MB mem-reservation=34.00MB thread-reservation=1 +Per-Host Resources: mem-estimate=138.07MB mem-reservation=34.00MB thread-reservation=1 03:AGGREGATE [FINALIZE] | output: count:merge(*) | group by: string_col @@ -1038,7 +1038,7 @@ Per-Host Resources: mem-estimate=128.09MB mem-reservation=34.00MB thread-reserva | in pipelines: 03(GETNEXT), 00(OPEN) | 02:EXCHANGE [HASH(string_col)] -| mem-estimate=71.99KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.07MB mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=20B cardinality=unavailable | in pipelines: 00(GETNEXT) | @@ -1063,25 +1063,25 @@ Per-Host Resources: mem-estimate=144.28MB mem-reservation=34.01MB thread-reserva in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=140.02MB Threads=5 -Per-Host Resource Estimates: Memory=555MB +Per-Host Resource Estimates: Memory=585MB WARNING: The following tables are missing relevant table and/or column statistics. 
functional_parquet.alltypestiny Analyzed query: SELECT string_col, count(*) FROM functional_parquet.alltypestiny GROUP BY string_col F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=10.09MB mem-reservation=4.00MB thread-reservation=1 +| Per-Instance Resources: mem-estimate=20.09MB mem-reservation=4.00MB thread-reservation=1 PLAN-ROOT SINK | output exprs: string_col, count(*) | mem-estimate=10.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 | 04:EXCHANGE [UNPARTITIONED] -| mem-estimate=95.99KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.09MB mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=20B cardinality=unavailable | in pipelines: 03(GETNEXT) | F01:PLAN FRAGMENT [HASH(string_col)] hosts=3 instances=4 -Per-Instance Resources: mem-estimate=128.09MB mem-reservation=34.00MB thread-reservation=1 +Per-Instance Resources: mem-estimate=138.09MB mem-reservation=34.00MB thread-reservation=1 03:AGGREGATE [FINALIZE] | output: count:merge(*) | group by: string_col @@ -1090,7 +1090,7 @@ Per-Instance Resources: mem-estimate=128.09MB mem-reservation=34.00MB thread-res | in pipelines: 03(GETNEXT), 00(OPEN) | 02:EXCHANGE [HASH(string_col)] -| mem-estimate=95.99KB mem-reservation=0B thread-reservation=0 +| mem-estimate=10.09MB mem-reservation=0B thread-reservation=0 | tuple-ids=1 row-size=20B cardinality=unavailable | in pipelines: 00(GETNEXT) | diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test index d7224986b..e54e15005 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test @@ -748,9 +748,9 @@ select *, ss_item_sk as part_col from tpcds_seq_snap.store_sales ---- PARALLELPLANS Max 
Per-Host Resource Reservation: Memory=102.00MB Threads=13 -Per-Host Resource Estimates: Memory=381MB +Per-Host Resource Estimates: Memory=391MB F01:PLAN FRAGMENT [HASH(ss_item_sk)] hosts=10 instances=10 -| Per-Instance Resources: mem-estimate=140.19MB mem-reservation=6.00MB thread-reservation=1 +| Per-Instance Resources: mem-estimate=150.19MB mem-reservation=6.00MB thread-reservation=1 | max-parallelism=10 segment-costs=[0, 3954235] WRITE TO HDFS [tpcds_partitioned_parquet_snap.store_sales_without_stats, OVERWRITE=false, PARTITION-KEYS=(ss_item_sk)] | output exprs: ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, ss_net_profit, ss_sold_date_sk, ss_item_sk @@ -763,7 +763,7 @@ WRITE TO HDFS [tpcds_partitioned_parquet_snap.store_sales_without_stats, OVERWRI | in pipelines: 02(GETNEXT), 00(OPEN) | 01:EXCHANGE [HASH(ss_item_sk)] -| mem-estimate=12.19MB mem-reservation=0B thread-reservation=0 +| mem-estimate=22.19MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=100B cardinality=unavailable cost=0 | in pipelines: 00(GETNEXT) | diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test index 95b3ddb47..d573fbdfa 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test @@ -780,7 +780,7 @@ WRITE TO HDFS [tpcds_partitioned_parquet_snap.store_sales_without_stats, OVERWRI | in pipelines: 02(GETNEXT), 00(OPEN) | 01:EXCHANGE [HASH(ss_item_sk)] -| mem-estimate=12.19MB mem-reservation=0B 
thread-reservation=0 +| mem-estimate=22.19MB mem-reservation=0B thread-reservation=0 | tuple-ids=0 row-size=100B cardinality=unavailable cost=0 | in pipelines: 00(GETNEXT) | diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test index 2c62d965e..937939745 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test +++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test @@ -81,7 +81,7 @@ from functional_avro.alltypes t1 left outer join functional_avro.alltypes t3 on (t2.id = t3.id) where t1.month = 1 and t2.year = 2009 and t3.bool_col = false ---- RESULTS: VERIFY_IS_SUBSET -'Per-Host Resource Estimates: Memory=4.05GB' +'Per-Host Resource Estimates: Memory=4.07GB' 'WARNING: The following tables are missing relevant table and/or column statistics.' 'functional_avro.alltypes, functional_parquet.alltypessmall' ====
