This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 29ad046d05869bed7489bc487636e0f64b3328aa Author: Qifan Chen <[email protected]> AuthorDate: Thu Sep 22 16:36:34 2022 -0400 IMPALA-11604 (part 1): Model ProcessingCost for PlanNodes & DataSink This patch augments IMPALA-10992 by establishing a model to allow the weighted total amount of data to process to be used as a new factor in the definition and selection of an executor group. We call this model ProcessingCost. ProcessingCost of a PlanNode/DataSink is a weighted amount of data processed by that node/sink. The basic ProcessingCost is computed with a general formula as follows. ProcessingCost is a pair: PC(D, N), where D = I * (C + M) where D is the weighted amount of data processed I is the input cardinality C is the expression evaluation cost per row. Set to total weight of expression evaluation in node/sink. M is a materialization cost per row. Only used by scan and exchange node. Otherwise, 0. N is the number of instances. Defaults to D / 10000000. In this patch, the weight of each expression evaluation is set to a constant of 1. A description of the computation for each kind of PlanNode/DataSink is given below. 01. AggregationNode: Each AggregateInfo has its C as a sum of grouping expression and aggregate expression and is then assigned a single ProcessingCost individually. These ProcessingCosts are then summed to form the Aggregation node's ProcessingCost; 02. AnalyticEvalNode: C is the sum of the evaluation costs for analytic functions; 03. CardinalityCheckNode: Use the general formula, I = 1; 04. DataSourceScanNode: Follow the formula from the superclass ScanNode; 05. EmptySetNode: I = 0; 06. ExchangeNode: M = (average serialized row size) / 1024 A modification of the general formula when in broadcast mode: D = D * number of receivers; 07. 
HashJoinNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N) build cost = PC(I1 * C(equi-join predicate), N) With I0 and I1 as input cardinality of the probe and build side respectively. If the plan node does not have a separate build, ProcessingCost is the sum of probe cost and build cost. Otherwise, ProcessingCost is equal to probeCost. 08. HbaseScanNode, HdfsScanNode, and KuduScanNode: Follow the formula from the superclass ScanNode; 09. Nested loop join node: When the right child is not a SingularRowSrcNode: probe cost = PC(I0 * C(equiJoin predicate), N) + PC(output cardinality * C(otherJoin predicate), N) build cost = PC(I1 * C(equiJoin predicate), N) When the right child is a SingularRowSrcNode: probe cost = PC(I0, N) build cost = PC(I0 * I1, N) With I0 and I1 as input cardinality of the probe and build side respectively. If the plan node does not have a separate build, ProcessingCost is the sum of probe cost and build cost. Otherwise, ProcessingCost is equal to probeCost. 10. ScanNode: M = (average row size) / 1024; 11. SelectNode: Use the general formula; 12. SingularRowSrcNode: Since the node is involved once per input in nested loop join, the contribution of this node is computed in nested loop join; 13. SortNode: C is the evaluation cost for the sort expression; 14. SubplanNode: C is 1. I is the multiplication of the cardinality of the left and the right child; 15. Union node: C is the cost of result expression evaluation from all non-pass-through children; 16. Unnest node: I is the cardinality of the containing SubplanNode and C is 1. 17. DataStreamSink: M = 1 / num rows per batch. 18. JoinBuildSink: ProcessingCost is the build cost of its associated JoinNode. 19. PlanRootSink: If result spooling is enabled, C is the cost of output expression evaluation. Otherwise, ProcessingCost is zero. 20. TableSink: C is the cost of output expression evaluation. 
TableSink subclasses (including HBaseTableSink, HdfsTableSink, and KuduTableSink) follows the same formula; Part 2 of IMPALA-11604 will implement an algorithm that tries to adjust the number of instances for each fragment by considering their production-consumption ratio, and then finally returns a number representing an ideal CPU core count required for a query to run efficiently. Testing: - Pass FE tests. Co-authored-by: Riza Suminto <[email protected]> Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a Reviewed-on: http://gerrit.cloudera.org:8080/19033 Reviewed-by: Wenzhe Zhou <[email protected]> Reviewed-by: Kurt Deschler <[email protected]> Reviewed-by: Riza Suminto <[email protected]> Tested-by: Riza Suminto <[email protected]> --- .../org/apache/impala/analysis/AggregateInfo.java | 9 + .../java/org/apache/impala/analysis/SortInfo.java | 8 + .../org/apache/impala/planner/AggregationNode.java | 11 + .../apache/impala/planner/AnalyticEvalNode.java | 12 + .../apache/impala/planner/BaseProcessingCost.java | 69 +++++ .../impala/planner/BroadcastProcessingCost.java | 74 +++++ .../impala/planner/CardinalityCheckNode.java | 5 + .../java/org/apache/impala/planner/DataSink.java | 33 ++- .../apache/impala/planner/DataSourceScanNode.java | 5 + .../org/apache/impala/planner/DataStreamSink.java | 12 +- .../org/apache/impala/planner/EmptySetNode.java | 10 + .../org/apache/impala/planner/ExchangeNode.java | 54 +++- .../org/apache/impala/planner/HBaseScanNode.java | 6 +- .../org/apache/impala/planner/HBaseTableSink.java | 6 + .../org/apache/impala/planner/HashJoinNode.java | 27 ++ .../org/apache/impala/planner/HdfsScanNode.java | 5 + .../org/apache/impala/planner/HdfsTableSink.java | 8 +- .../org/apache/impala/planner/JoinBuildSink.java | 8 + .../java/org/apache/impala/planner/JoinNode.java | 21 ++ .../org/apache/impala/planner/KuduScanNode.java | 5 + .../org/apache/impala/planner/KuduTableSink.java | 7 +- .../apache/impala/planner/NestedLoopJoinNode.java | 47 ++++ 
.../java/org/apache/impala/planner/PlanNode.java | 71 ++++- .../org/apache/impala/planner/PlanRootSink.java | 14 + .../org/apache/impala/planner/ProcessingCost.java | 306 +++++++++++++++++++++ .../impala/planner/ScaledProcessingCost.java | 65 +++++ .../java/org/apache/impala/planner/ScanNode.java | 19 ++ .../java/org/apache/impala/planner/SelectNode.java | 5 + .../apache/impala/planner/SingularRowSrcNode.java | 8 + .../java/org/apache/impala/planner/SortNode.java | 6 + .../org/apache/impala/planner/SubplanNode.java | 5 + .../apache/impala/planner/SumProcessingCost.java | 61 ++++ .../java/org/apache/impala/planner/TableSink.java | 8 +- .../java/org/apache/impala/planner/UnionNode.java | 21 ++ .../java/org/apache/impala/planner/UnnestNode.java | 7 +- .../main/java/org/apache/impala/util/ExprUtil.java | 19 ++ 36 files changed, 1033 insertions(+), 24 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java index 865d1d15b..324d64829 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java +++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java @@ -25,6 +25,8 @@ import org.apache.impala.catalog.AggregateFunction; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.InternalException; +import org.apache.impala.planner.ProcessingCost; +import org.apache.impala.util.ExprUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -717,4 +719,11 @@ public class AggregateInfo extends AggregateInfoBase { @Override public AggregateInfo clone() { return new AggregateInfo(this); } + + public ProcessingCost computeProcessingCost(String label, long inputCardinality) { + float weight = ExprUtil.computeExprsTotalCost(getGroupingExprs()) + + ExprUtil.computeExprsTotalCost(getAggregateExprs()); + + return ProcessingCost.basicCost(label, inputCardinality, weight); + } } diff --git 
a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java index 995c686eb..e6d802ede 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java +++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java @@ -26,7 +26,9 @@ import java.util.Set; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.TreeNode; import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.ProcessingCost; import org.apache.impala.thrift.TSortingOrder; +import org.apache.impala.util.ExprUtil; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -313,4 +315,10 @@ public class SortInfo { } return result; } + + public ProcessingCost computeProcessingCost(String label, long inputCardinality) { + float weight = ExprUtil.computeExprsTotalCost(getSortExprs()); + + return ProcessingCost.basicCost(label, inputCardinality, weight); + } } diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index b9741ff7b..e4eea634c 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -77,6 +77,7 @@ public class AggregationNode extends PlanNode { private boolean useStreamingPreagg_ = false; // Resource profiles for each aggregation class. + // Set in computeNodeResourceProfile(). private List<ResourceProfile> resourceProfiles_; // Conservative minimum size of hash table for low-cardinality aggregations. 
@@ -505,6 +506,16 @@ public class AggregationNode extends PlanNode { return output; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = ProcessingCost.zero(); + for (AggregateInfo aggInfo : aggInfos_) { + ProcessingCost aggCost = + aggInfo.computeProcessingCost(getDisplayLabel(), getChild(0).getCardinality()); + processingCost_ = ProcessingCost.sumCost(processingCost_, aggCost); + } + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { resourceProfiles_ = Lists.newArrayListWithCapacity(aggInfos_.size()); diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java index d4fb6abf4..2ab23a643 100644 --- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java @@ -45,6 +45,7 @@ import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.ExprUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -357,6 +358,17 @@ public class AnalyticEvalNode extends PlanNode { return output.toString(); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // The total cost per row is the sum of the evaluation costs for analytic functions, + // partition by equal and order by equal predicate. 'partitionByEq_' and 'orderByEq_' + // are excluded since the input data stream is already partitioned and sorted within + // each partition (see notes on class AnalyticEvalNode in analytic-eval-node.h). 
+ float totalCostToEvalOneRow = ExprUtil.computeExprsTotalCost(analyticFnCalls_); + processingCost_ = ProcessingCost.basicCost( + getDisplayLabel(), getCardinality(), totalCostToEvalOneRow); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { Preconditions.checkNotNull( diff --git a/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java new file mode 100644 index 000000000..3f6de5ffd --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.planner; + +/** + * A basic implementation of {@link ProcessingCost} that takes account expression cost + * and average row size as per-row costing weight. + */ +public class BaseProcessingCost extends ProcessingCost { + private final long cardinality_; + private final float exprsCost_; + private final float materializationCost_; + + public BaseProcessingCost( + long cardinality, float exprsCost, float materializationCost) { + // TODO: materializationCost accommodate ProcessingCost where row width should be + // factor in. 
Currently, ProcessingCost of ScanNode, ExchangeNode, and DataStreamSink + // has row width factored in through materialization parameter here. Investigate if + // other operator need to have its row width factored in as well and whether we should + // have specific 'rowWidth' parameter here. + cardinality_ = cardinality; + exprsCost_ = exprsCost; + materializationCost_ = materializationCost; + } + + private float costFactor() { return exprsCost_ + materializationCost_; } + + @Override + public long getTotalCost() { + // Total cost must be non-negative. + return (long) Math.ceil(Math.max(cardinality_, 0) * costFactor()); + } + + @Override + public boolean isValid() { + return cardinality_ >= 0; + } + + @Override + public ProcessingCost clone() { + return new BaseProcessingCost(cardinality_, exprsCost_, materializationCost_); + } + + @Override + public String getDetails() { + StringBuilder output = new StringBuilder(); + output.append(super.getDetails()); + output.append(" cardinality=") + .append(cardinality_) + .append(" cost-factor=") + .append(costFactor()); + return output.toString(); + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java new file mode 100644 index 000000000..d031ecea6 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.planner; + +import com.google.common.base.Preconditions; + +import org.apache.impala.util.MathUtil; + +import java.util.function.Supplier; + +/** + * Similar as {@link ScaledProcessingCost}, but the multiple (countSupplier) represent + * fragment instance count associated with this ProcessingCost and may change after the + * object construction. + * <p> + * countSupplier must always return positive value. + */ +public class BroadcastProcessingCost extends ProcessingCost { + private final ProcessingCost childProcessingCost_; + + protected BroadcastProcessingCost( + ProcessingCost cost, Supplier<Integer> countSupplier) { + Preconditions.checkArgument( + cost.isValid(), "BroadcastProcessingCost: cost is invalid!"); + childProcessingCost_ = cost; + setNumInstanceExpected(countSupplier); + } + + @Override + public long getTotalCost() { + return MathUtil.saturatingMultiply( + childProcessingCost_.getTotalCost(), getNumInstancesExpected()); + } + + @Override + public boolean isValid() { + return getNumInstancesExpected() > 0; + } + + @Override + public ProcessingCost clone() { + return new BroadcastProcessingCost(childProcessingCost_, numInstanceSupplier_); + } + + @Override + public String getExplainString(String detailPrefix, boolean fullExplain) { + StringBuilder sb = new StringBuilder(); + sb.append(detailPrefix); + sb.append("BroadcastCost("); + sb.append(getNumInstancesExpected()); + sb.append("): "); + sb.append(getDetails()); + if (fullExplain) { + sb.append("\n"); + 
sb.append(childProcessingCost_.getExplainString(detailPrefix + " ", true)); + } + return sb.toString(); + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java index 00cd67fdd..78d32d8ac 100644 --- a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java +++ b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java @@ -78,6 +78,11 @@ public class CardinalityCheckNode extends PlanNode { msg.setCardinality_check_node(cardinalityCheckNode); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = computeDefaultProcessingCost(); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { nodeResourceProfile_ = ResourceProfile.noReservation(0); diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java b/fe/src/main/java/org/apache/impala/planner/DataSink.java index 2cacb4d3d..4b665b4c4 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java @@ -45,6 +45,10 @@ public abstract class DataSink { // set in computeResourceProfile() protected ResourceProfile resourceProfile_ = ResourceProfile.invalid(); + // A total processing cost across all instances of this plan node. + // Set in computeProcessingCost() for a meaningful value. + protected ProcessingCost processingCost_ = ProcessingCost.invalid(); + /** * Return an explain string for the DataSink. Each line of the explain will be prefixed * by "prefix". @@ -56,6 +60,19 @@ public abstract class DataSink { if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { output.append(detailPrefix); output.append(resourceProfile_.getExplainString()); + if (ProcessingCost.isComputeCost(queryOptions)) { + // Show processing cost total. 
+ output.append(" cost="); + if (processingCost_.isValid()) { + output.append(processingCost_.getTotalCost()); + if (explainLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) { + output.append("\n"); + output.append(processingCost_.getExplainString(detailPrefix, false)); + } + } else { + output.append("<invalid>"); + } + } output.append("\n"); } return output.toString(); @@ -107,15 +124,29 @@ public abstract class DataSink { public void setFragment(PlanFragment fragment) { fragment_ = fragment; } public PlanFragment getFragment() { return fragment_; } public ResourceProfile getResourceProfile() { return resourceProfile_; } + public ProcessingCost getProcessingCost() { return processingCost_; } + + public abstract void computeProcessingCost(TQueryOptions queryOptions); /** * Compute the resource profile for an instance of this DataSink. */ public abstract void computeResourceProfile(TQueryOptions queryOptions); + /** + * Set number of rows consumed and produced data fields in processing cost. + */ + public void computeRowConsumptionAndProductionToCost() { + Preconditions.checkState(processingCost_.isValid(), + "Processing cost of DataSink " + fragment_.getId() + ":" + getLabel() + + " is invalid!"); + long inputOutputCardinality = fragment_.getPlanRoot().getCardinality(); + processingCost_.setNumRowToConsume(inputOutputCardinality); + processingCost_.setNumRowToProduce(inputOutputCardinality); + } + /** * Collect all expressions evaluated by this data sink. 
*/ public abstract void collectExprs(List<Expr> exprs); - } diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java index c89b2760f..7cafa1840 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java @@ -332,6 +332,11 @@ public class DataSourceScanNode extends ScanNode { new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex)))); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = computeDefaultProcessingCost(); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // This node fetches a thrift representation of the rows from the data diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java index 06b8f31b0..fcf1f266e 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java @@ -71,9 +71,7 @@ public class DataStreamSink extends DataSink { private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) { int numChannels = outputPartition_.isPartitioned() ? exchNode_.getFragment().getNumInstances() : 1; - long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0 ? - queryOptions.batch_size : - PlanNode.DEFAULT_ROWBATCH_SIZE; + long rowBatchSize = PlanNode.getRowBatchSize(queryOptions); long avgOutboundRowBatchSize = Math.min( (long) Math.ceil(rowBatchSize * exchNode_.getAvgSerializedRowSize(exchNode_)), PlanNode.ROWBATCH_MAX_MEM_USAGE); @@ -86,6 +84,14 @@ public class DataStreamSink extends DataSink { return bufferSize; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // The sending part of the processing cost for the exchange node. 
+ processingCost_ = + ProcessingCost.basicCost(getLabel() + "(" + exchNode_.getDisplayLabel() + ")", + exchNode_.getCardinality(), 0, exchNode_.estimateSerializationCostPerRow()); + } + @Override public void computeResourceProfile(TQueryOptions queryOptions) { long estimatedMem = estimateOutboundRowBatchBuffers(queryOptions); diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java index b39a2ae15..02bdcff1f 100644 --- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java +++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java @@ -59,6 +59,11 @@ public class EmptySetNode extends PlanNode { computeStats(analyzer); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = ProcessingCost.zero(); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate @@ -78,4 +83,9 @@ public class EmptySetNode extends PlanNode { @Override protected boolean displayCardinality(TExplainLevel detailLevel) { return false; } + + @Override + protected boolean isLeafNode() { + return true; + } } diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index c3856956f..44232b53d 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -29,6 +29,7 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TSortInfo; +import org.apache.impala.util.ExprUtil; import com.google.common.base.Preconditions; @@ -66,7 +67,7 @@ public class ExchangeNode extends PlanNode { return mergeInfo_ != null; } - private boolean isBroadcastExchange() { + protected boolean isBroadcastExchange() { // If the output of the sink is not partitioned but 
the target fragment is // partitioned, then the data exchange is broadcast. Preconditions.checkState(!children_.isEmpty()); @@ -178,6 +179,39 @@ public class ExchangeNode extends PlanNode { (exchInput.getTupleIds().size() * PER_TUPLE_SERIALIZATION_OVERHEAD); } + // Return the number of sending instances of this exchange. + public int getNumSenders() { + Preconditions.checkState(!children_.isEmpty()); + Preconditions.checkNotNull(children_.get(0).getFragment()); + return children_.get(0).getFragment().getNumInstances(); + } + + // Return the number of receiving instances of this exchange. + public int getNumReceivers() { + DataSink sink = fragment_.getSink(); + if (sink == null) return 1; + return sink.getFragment().getNumInstances(); + } + + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // The computation for the processing cost for exchange splits into two parts: + // 1. The sending processing cost which is computed in the DataStreamSink of the + // bottom sending fragment; + // 2. The receiving processing cost in the top receiving fragment which is computed + // here. + // Assume serialization and deserialization costs per row are equal. + float conjunctsCost = ExprUtil.computeExprsTotalCost(conjuncts_); + float materializationCost = estimateSerializationCostPerRow(); + processingCost_ = ProcessingCost.basicCost(getDisplayLabel() + "(receiving)", + getChild(0).getCardinality(), conjunctsCost, materializationCost); + + if (isBroadcastExchange()) { + processingCost_ = ProcessingCost.broadcastCost(processingCost_, + () -> getNumReceivers()); + } + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // For non-merging exchanges, one row batch queue is maintained for row @@ -193,9 +227,7 @@ public class ExchangeNode extends PlanNode { // the system load. This makes it difficult to accurately estimate the // memory usage at runtime. 
The following estimates assume that memory usage will // lean towards the soft limits. - Preconditions.checkState(!children_.isEmpty()); - Preconditions.checkNotNull(children_.get(0).getFragment()); - int numSenders = children_.get(0).getFragment().getNumInstances(); + int numSenders = getNumSenders(); long estimatedTotalQueueByteSize = estimateTotalQueueByteSize(numSenders); long estimatedDeferredRPCQueueSize = estimateDeferredRPCQueueSize(queryOptions, numSenders); @@ -205,12 +237,18 @@ public class ExchangeNode extends PlanNode { nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem); } + /** + * Estimate per-row serialization/deserialization cost as 1 per 1KB. + */ + protected float estimateSerializationCostPerRow() { + return (float) getAvgSerializedRowSize(this) / 1024; + } + // Returns the estimated size of the deferred batch queue (in bytes) by // assuming that at least one row batch rpc payload per sender is queued. private long estimateDeferredRPCQueueSize(TQueryOptions queryOptions, int numSenders) { - long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0 - ? queryOptions.batch_size : DEFAULT_ROWBATCH_SIZE; + long rowBatchSize = getRowBatchSize(queryOptions); // Set an upper limit based on estimated cardinality. 
if (getCardinality() > 0) rowBatchSize = Math.min(rowBatchSize, getCardinality()); long avgRowBatchByteSize = Math.min( @@ -251,6 +289,10 @@ public class ExchangeNode extends PlanNode { @Override protected void toThrift(TPlanNode msg) { + if (processingCost_.isValid() && processingCost_ instanceof BroadcastProcessingCost) { + Preconditions.checkState( + getNumReceivers() == processingCost_.getNumInstancesExpected()); + } msg.node_type = TPlanNodeType.EXCHANGE_NODE; msg.exchange_node = new TExchangeNode(); for (TupleId tid: tupleIds_) { diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index 2b764e602..96460b14b 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -59,7 +59,6 @@ import org.apache.impala.thrift.TScanRangeLocation; import org.apache.impala.thrift.TScanRangeLocationList; import org.apache.impala.thrift.TScanRangeSpec; import org.apache.impala.util.BitUtil; -import org.apache.impala.util.ExecutorMembershipSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -655,6 +654,11 @@ public class HBaseScanNode extends ScanNode { } } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = computeScanProcessingCost(queryOptions); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { FeHBaseTable tbl = (FeHBaseTable) desc_.getTable(); diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java index cfbb335e7..abeb27f09 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java @@ -54,6 +54,12 @@ public class HBaseTableSink extends TableSink { return "HBASE WRITER"; } + @Override + public void computeProcessingCost(TQueryOptions 
queryOptions) { + // The processing cost to export rows. + processingCost_ = computeDefaultProcessingCost(); + } + @Override public void computeResourceProfile(TQueryOptions queryOptions) { resourceProfile_ = ResourceProfile.noReservation(0); diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java index 0fe91eed3..7aeee7433 100644 --- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java @@ -37,6 +37,7 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.util.BitUtil; +import org.apache.impala.util.ExprUtil; import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; @@ -314,4 +315,30 @@ public class HashJoinNode extends JoinNode { .setMaxRowBufferBytes(maxRowBufferSize).build(); return Pair.create(probeProfile, buildProfile); } + + @Override + public Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost() { + // TODO: The cost should consider conjuncts_ as well. + // Assume 'eqJoinConjuncts_' will be applied to all rows from lhs and rhs side, + // and 'otherJoinConjuncts_' to the resultant rows. + float eqJoinPredicateEvalCost = ExprUtil.computeExprsTotalCost(eqJoinConjuncts_); + float otherJoinPredicateEvalCost = + ExprUtil.computeExprsTotalCost(otherJoinConjuncts_); + + // Compute the processing cost for lhs. + ProcessingCost probeProcessingCost = + ProcessingCost.basicCost(getDisplayLabel() + " Probe side (eqJoinConjuncts_)", + getChild(0).getCardinality(), eqJoinPredicateEvalCost); + if (otherJoinPredicateEvalCost > 0) { + probeProcessingCost = ProcessingCost.sumCost(probeProcessingCost, + ProcessingCost.basicCost(getDisplayLabel() + " Probe side(otherJoinConjuncts_)", + getCardinality(), otherJoinPredicateEvalCost)); + } + + // Compute the processing cost for rhs. 
+ ProcessingCost buildProcessingCost = + ProcessingCost.basicCost(getDisplayLabel() + " Build side", + getChild(1).getCardinality(), eqJoinPredicateEvalCost); + return Pair.create(probeProcessingCost, buildProcessingCost); + } } diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 9931d5849..6ba0b3c31 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -2108,6 +2108,11 @@ public class HdfsScanNode extends ScanNode { return output.toString(); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = computeScanProcessingCost(queryOptions); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // Update 'useMtScanNode_' before any return cases. It's used in BE. diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java index 76b68a1ff..399e45829 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java @@ -41,10 +41,8 @@ import org.apache.impala.thrift.TTableSinkType; import org.apache.impala.util.BitUtil; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,6 +140,12 @@ public class HdfsTableSink extends TableSink { externalOutputPartitionDepth_ = partitionDepth; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // The processing cost to export rows. 
+ processingCost_ = computeDefaultProcessingCost(); + } + @Override public void computeResourceProfile(TQueryOptions queryOptions) { PlanNode inputNode = fragment_.getPlanRoot(); diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java index c8cc056d0..feed83381 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java @@ -141,11 +141,19 @@ public class JoinBuildSink extends DataSink { joinNode_.getFragment().getNumInstances(); } + public boolean isShared() { return joinNode_.canShareBuild(); } + @Override protected String getLabel() { return "JOIN BUILD"; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // The processing cost of the join's build side. + processingCost_ = joinNode_.computeJoinProcessingCost().second; + } + @Override public void computeResourceProfile(TQueryOptions queryOptions) { resourceProfile_ = joinNode_.computeJoinResourceProfile(queryOptions).second; diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java index f31045235..02d39e991 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java @@ -909,6 +909,19 @@ public abstract class JoinNode extends PlanNode { return result; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + Pair<ProcessingCost, ProcessingCost> probeBuildCost = computeJoinProcessingCost(); + if (hasSeparateBuild()) { + // All build resource consumption is accounted for in the separate builder. + processingCost_ = probeBuildCost.first; + } else { + // Both build and probe resources are accounted for in the node.
+ processingCost_ = + ProcessingCost.sumCost(probeBuildCost.first, probeBuildCost.second); + } + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { Pair<ResourceProfile, ResourceProfile> profiles = @@ -931,6 +944,14 @@ public abstract class JoinNode extends PlanNode { public abstract Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile( TQueryOptions queryOptions); + /** + * Helper method to compute the processing cost for the join that can be + * called from the builder or the join node. Returns a pair of the probe + * processing cost and the build processing cost. + * Does not modify the state of this node. + */ + public abstract Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost(); + /* Helper to return all predicates as a string. */ public String getAllPredicatesAsString(TExplainLevel level) { return "Conjuncts=" + Expr.getExplainString(getConjuncts(), level) diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index b65388ac6..6ec5255cb 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -395,6 +395,11 @@ public class KuduScanNode extends ScanNode { } } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = computeScanProcessingCost(queryOptions); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // The bulk of memory used by Kudu scan node is generally utilized by the diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java index eba08b5ce..52fa82d65 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java @@ -18,7 +18,6 @@ package org.apache.impala.planner; -import java.nio.ByteBuffer; 
import java.util.List; import org.apache.impala.analysis.DescriptorTable; @@ -94,6 +93,12 @@ public class KuduTableSink extends TableSink { return "KUDU WRITER"; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // The processing cost to export rows. + processingCost_ = computeDefaultProcessingCost(); + } + @Override public void computeResourceProfile(TQueryOptions queryOptions) { // The major chunk of memory used by this node is untracked. Part of which diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java index b36cb9fc5..ecd5c03ae 100644 --- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java @@ -31,6 +31,7 @@ import org.apache.impala.thrift.TNestedLoopJoinNode; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.ExprUtil; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -78,6 +79,7 @@ public class NestedLoopJoinNode extends JoinNode { @Override public Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile( TQueryOptions queryOptions) { + // TODO: This seems a bug below that the total data is not divided by numInstances_. long perInstanceMemEstimate; if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1 || numNodes_ == 0) { @@ -92,6 +94,51 @@ public class NestedLoopJoinNode extends JoinNode { return Pair.create(ResourceProfile.noReservation(0), buildProfile); } + @Override + public Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost() { + // TODO: Make this general regardless of SingularRowSrcNode exist or not. + // TODO: The cost should consider conjuncts_ as well. 
+ ProcessingCost probeProcessingCost = ProcessingCost.zero(); + ProcessingCost buildProcessingCost = ProcessingCost.zero(); + if (getChild(1) instanceof SingularRowSrcNode) { + // Compute the processing cost for lhs. + probeProcessingCost = + ProcessingCost.basicCost(getDisplayLabel() + "(c0, singularRowSrc) Probe side", + getChild(0).getCardinality(), 0); + + // Compute the processing cost for rhs. + buildProcessingCost = ProcessingCost.basicCost( + getDisplayLabel() + "(c0, singularRowSrc) Build side per probe", + getChild(1).getCardinality(), 0); + // Multiply by the number of probes + buildProcessingCost = ProcessingCost.scaleCost( + buildProcessingCost, Math.max(0, getChild(0).getCardinality())); + } else { + // Assume 'eqJoinConjuncts_' will be applied to all rows from lhs side, + // and 'otherJoinConjuncts_' to the resultant rows. + float eqJoinPredicateEvalCost = ExprUtil.computeExprsTotalCost(eqJoinConjuncts_); + float otherJoinPredicateEvalCost = + ExprUtil.computeExprsTotalCost(otherJoinConjuncts_); + + // Compute the processing cost for lhs. + probeProcessingCost = ProcessingCost.basicCost( + getDisplayLabel() + "(c0, non-singularRowSrc, eqJoinConjuncts_) Probe side", + getChild(0).getCardinality(), eqJoinPredicateEvalCost); + + probeProcessingCost = ProcessingCost.sumCost(probeProcessingCost, + ProcessingCost.basicCost(getDisplayLabel() + + "(c0, non-singularRowSrc, otherJoinConjuncts_) Probe side", + getCardinality(), otherJoinPredicateEvalCost)); + + // Compute the processing cost for rhs, assuming 'eqJoinConjuncts_' will be applied + // to all rows from rhs side. 
+ buildProcessingCost = ProcessingCost.basicCost( + getDisplayLabel() + "(c0, non-singularRowSrc) Build side", + getChild(1).getCardinality(), eqJoinPredicateEvalCost); + } + return Pair.create(probeProcessingCost, buildProcessingCost); + } + @Override protected String getNodeExplainString(String prefix, String detailPrefix, TExplainLevel detailLevel) { diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index ae19ce278..6dc9709c5 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -28,7 +28,6 @@ import java.util.Set; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; -import org.apache.impala.analysis.CollectionTableRef; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.ExprId; import org.apache.impala.analysis.ExprSubstitutionMap; @@ -49,6 +48,7 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.util.BitUtil; +import org.apache.impala.util.ExprUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,6 +154,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // Runtime filters assigned to this node. protected List<RuntimeFilter> runtimeFilters_ = new ArrayList<>(); + // A total processing cost across all instances of this plan node. + // Gets set correctly in computeProcessingCost(). 
+ protected ProcessingCost processingCost_ = ProcessingCost.invalid(); + protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) { this(id, displayName); tupleIds_.addAll(tupleIds); @@ -235,6 +239,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { public void setAssignedConjuncts(Set<ExprId> conjuncts) { assignedConjuncts_ = conjuncts; } + public ProcessingCost getProcessingCost() { return processingCost_; } /** * Set the limit_ to the given limit_ only if the limit_ hasn't been set, or the new limit_ @@ -342,6 +347,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> { expBuilder.append(nodeResourceProfile_.getExplainString()); expBuilder.append("\n"); + if (ProcessingCost.isComputeCost(queryOptions) && processingCost_.isValid() + && detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) { + // Print processing cost. + expBuilder.append(processingCost_.getExplainString(detailPrefix, false)); + expBuilder.append("\n"); + } + // Print tuple ids, row size and cardinality. expBuilder.append(detailPrefix + "tuple-ids="); for (int i = 0; i < tupleIds_.size(); ++i) { @@ -358,10 +370,19 @@ abstract public class PlanNode extends TreeNode<PlanNode> { if (displayCardinality) { if (detailLevel == TExplainLevel.STANDARD) expBuilder.append(detailPrefix); expBuilder.append("row-size=") - .append(PrintUtils.printBytes(Math.round(avgRowSize_))) - .append(" cardinality=") - .append(PrintUtils.printEstCardinality(cardinality_)) - .append("\n"); + .append(PrintUtils.printBytes(Math.round(avgRowSize_))) + .append(" cardinality=") + .append(PrintUtils.printEstCardinality(cardinality_)); + if (ProcessingCost.isComputeCost(queryOptions)) { + // Show processing cost total. 
+ expBuilder.append(" cost="); + if (processingCost_.isValid()) { + expBuilder.append(processingCost_.getTotalCost()); + } else { + expBuilder.append("<invalid>"); + } + } + expBuilder.append("\n"); } if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { @@ -653,6 +674,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> { return cardinality; } + // Default implementation of computing the total data processed in bytes. + protected ProcessingCost computeDefaultProcessingCost() { + Preconditions.checkState(hasValidStats()); + return ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(), + ExprUtil.computeExprsTotalCost(getConjuncts())); + } + public static long capCardinalityAtLimit(long cardinality, long limit) { return cardinality == -1 ? limit : Math.min(cardinality, limit); } @@ -882,14 +910,41 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } } + public abstract void computeProcessingCost(TQueryOptions queryOptions); + /** * Compute peak resources consumed when executing this PlanNode, initializing - * 'nodeResourceProfile_'. May only be called after this PlanNode has been placed in - * a PlanFragment because the cost computation is dependent on the enclosing fragment's - * data partition. + * 'nodeResourceProfile_' and 'processingCost_'. May only be called after this PlanNode + * has been placed in a PlanFragment because the cost computation is dependent on the + * enclosing fragment's data partition. */ public abstract void computeNodeResourceProfile(TQueryOptions queryOptions); + /** + * Determine whether a PlanNode is a leaf node within the plan tree. + * @return true if a PlanNode is a leaf node within the plan tree. + */ + protected boolean isLeafNode() { return false; } + + /** + * Set number of rows consumed and produced data fields in processing cost. 
+ */ + public void computeRowConsumptionAndProductionToCost() { + Preconditions.checkState(processingCost_.isValid(), + "Processing cost of PlanNode " + getDisplayLabel() + " is invalid!"); + processingCost_.setNumRowToConsume(getInputCardinality()); + processingCost_.setNumRowToProduce(getCardinality()); + } + + /** + * Get row batch size after considering 'batch_size' query option. + */ + protected static long getRowBatchSize(TQueryOptions queryOptions) { + return (queryOptions.isSetBatch_size() && queryOptions.batch_size > 0) ? + queryOptions.batch_size : + PlanNode.DEFAULT_ROWBATCH_SIZE; + } + /** * Wrapper class to represent resource profiles during different phases of execution. */ diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java index 53887b738..133aec186 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java @@ -26,6 +26,7 @@ import org.apache.impala.thrift.TDataSinkType; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanRootSink; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.ExprUtil; import org.apache.log4j.Logger; import com.google.common.base.Preconditions; @@ -67,6 +68,19 @@ public class PlanRootSink extends DataSink { return "ROOT"; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + if (queryOptions.isSpool_query_results() && queryOptions.getScratch_limit() != 0 + && !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) { + // The processing cost to buffer these many rows in root. + processingCost_ = + ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(), + ExprUtil.computeExprsTotalCost(outputExprs_)); + } else { + processingCost_ = ProcessingCost.zero(); + } + } + /** * Computes and sets the {@link ResourceProfile} for this PlanRootSink. 
If result * spooling is disabled, a ResourceProfile is returned with no reservation or buffer diff --git a/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java new file mode 100644 index 000000000..df695bddc --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java @@ -0,0 +1,306 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.planner; + +import com.google.common.base.Preconditions; +import com.google.common.math.LongMath; + +import org.apache.impala.thrift.TQueryOptions; + +import java.math.RoundingMode; +import java.util.List; +import java.util.function.Supplier; + +/** + * A base class that encapsulates processing cost, which models a total cost or amount + * of work shared across all instances of a specific {@link PlanNode}, {@link DataSink}, or + * {@link PlanFragment}.
+ */ +public abstract class ProcessingCost implements Cloneable { + public static ProcessingCost invalid() { return new BaseProcessingCost(-1, 1, 0); } + public static ProcessingCost zero() { return new BaseProcessingCost(0, 1, 0); } + + public static ProcessingCost maxCost(ProcessingCost a, ProcessingCost b) { + return (a.getTotalCost() >= b.getTotalCost()) ? a : b; + } + + public static ProcessingCost sumCost(ProcessingCost a, ProcessingCost b) { + return new SumProcessingCost(a, b); + } + + public static ProcessingCost scaleCost(ProcessingCost cost, long factor) { + return new ScaledProcessingCost(cost, factor); + } + + public static ProcessingCost broadcastCost( + ProcessingCost cost, Supplier<Integer> numInstanceSupplier) { + return new BroadcastProcessingCost(cost, numInstanceSupplier); + } + + protected static void tryAdjustConsumerParallelism(int nodeStepCount, + int minParallelism, int maxParallelism, ProcessingCost producer, + ProcessingCost consumer) { + Preconditions.checkState(consumer.getNumInstancesExpected() > 0); + Preconditions.checkState(producer.getNumInstancesExpected() > 0); + if (producer.getCostPerRowProduced() > 0 + && (consumer.canReducedBy(nodeStepCount, minParallelism, producer) + || (consumer.canIncreaseBy(nodeStepCount, maxParallelism, producer)))) { + // Adjust consumer's concurrency following producer's parallelism and their + // produce-consume rate ratio. 
+ float consProdRatio = consumer.consumerProducerRatio(producer); + int adjustedCount = (int) Math.ceil(consProdRatio + * producer.getNumInstancesExpected() / nodeStepCount) + * nodeStepCount; + final int finalCount = + Math.max(minParallelism, Math.min(maxParallelism, adjustedCount)); + consumer.setNumInstanceExpected(() -> finalCount); + } else if (maxParallelism < consumer.getNumInstancesExpected()) { + consumer.setNumInstanceExpected(() -> maxParallelism); + } + } + + private static ProcessingCost computeValidBaseCost( + long cardinality, float exprsCost, float materializationCost) { + return new BaseProcessingCost( + Math.max(0, cardinality), exprsCost, materializationCost); + } + + public static ProcessingCost basicCost( + String label, long cardinality, float exprsCost, float materializationCost) { + ProcessingCost processingCost = + computeValidBaseCost(cardinality, exprsCost, materializationCost); + processingCost.setLabel(label); + return processingCost; + } + + public static ProcessingCost basicCost( + String label, long cardinality, float exprsCost) { + ProcessingCost processingCost = computeValidBaseCost(cardinality, exprsCost, 0); + processingCost.setLabel(label); + return processingCost; + } + + public static boolean isComputeCost(TQueryOptions queryOptions) { + // TODO: Replace with proper check in IMPALA-11604 part 2. + return false; + } + + /** + * Merge multiple ProcessingCost into a single new ProcessingCost. + * <p> + * The resulting ProcessingCost will have the total cost, number of rows produced, + * and number of rows consumed as a sum of respective properties of all ProcessingCost + * in the given list. Meanwhile, the number of instances expected is the maximum among + * all ProcessingCost in the list. + * + * @param costs list of all ProcessingCost to merge. + * @return A new combined ProcessingCost.
+ */ + protected static ProcessingCost fullMergeCosts(List<ProcessingCost> costs) { + Preconditions.checkNotNull(costs); + Preconditions.checkArgument(!costs.isEmpty()); + + ProcessingCost resultingCost = ProcessingCost.zero(); + long inputCardinality = 0; + long outputCardinality = 0; + int maxProducerParallelism = 1; + for (ProcessingCost cost : costs) { + resultingCost = ProcessingCost.sumCost(resultingCost, cost); + inputCardinality += cost.getNumRowToConsume(); + outputCardinality += cost.getNumRowToProduce(); + maxProducerParallelism = + Math.max(maxProducerParallelism, cost.getNumInstancesExpected()); + } + resultingCost.setNumRowToConsume(inputCardinality); + resultingCost.setNumRowToProduce(outputCardinality); + final int finalProducerParallelism = maxProducerParallelism; + resultingCost.setNumInstanceExpected(() -> finalProducerParallelism); + return resultingCost; + } + + protected Supplier<Integer> numInstanceSupplier_ = null; + private long numRowToProduce_ = 0; + private long numRowToConsume_ = 0; + private String label_ = null; + private boolean isSetNumRowToProduce_ = false; + private boolean isSetNumRowToConsume_ = false; + + public abstract long getTotalCost(); + + public abstract boolean isValid(); + + public abstract ProcessingCost clone(); + + public String getDetails() { + StringBuilder output = new StringBuilder(); + output.append("cost-total=") + .append(getTotalCost()) + .append(" max-instances=") + .append(getNumInstanceMax()); + if (hasAdjustedInstanceCount()) { + output.append(" adj-instances=").append(getNumInstancesExpected()); + } + output.append(" cost/inst=") + .append(getPerInstanceCost()) + .append(" #cons:#prod=") + .append(numRowToConsume_) + .append(":") + .append(numRowToProduce_); + if (isSetNumRowToConsume_ && isSetNumRowToProduce_) { + output.append(" reduction=").append(getReduction()); + } + if (isSetNumRowToConsume_) { + output.append(" cost/cons=").append(getCostPerRowConsumed()); + } + if (isSetNumRowToProduce_) { + 
output.append(" cost/prod=").append(getCostPerRowProduced()); + } + return output.toString(); + } + + public String debugString() { + StringBuilder output = new StringBuilder(); + if (label_ != null) { + output.append(label_); + output.append("="); + } + output.append(this); + return output.toString(); + } + + @Override + public String toString() { + return "{" + getDetails() + "}"; + } + + public String getExplainString(String detailPrefix, boolean fullExplain) { + return detailPrefix + getDetails(); + } + + public void setNumInstanceExpected(Supplier<Integer> countSupplier) { + Preconditions.checkArgument( + countSupplier.get() > 0, "Number of instance must be greater than 0!"); + numInstanceSupplier_ = countSupplier; + } + + public int getNumInstancesExpected() { + return hasAdjustedInstanceCount() ? numInstanceSupplier_.get() : getNumInstanceMax(); + } + + private boolean hasAdjustedInstanceCount() { + return numInstanceSupplier_ != null && numInstanceSupplier_.get() > 0; + } + + private int getNumInstanceMax() { + // TODO: replace minProcessingCostPerThread with backend flag. + long minProcessingCostPerThread = 10000000L; + long maxInstance = LongMath.divide(getTotalCost(), + minProcessingCostPerThread, RoundingMode.CEILING); + if (maxInstance > 0) { + return maxInstance < Integer.MAX_VALUE ? (int) maxInstance : Integer.MAX_VALUE; + } else { + return 1; + } + } + + /** + * Set num rows to produce. + * + * @param numRowToProduce Number of rows to produce by plan node or data sink associated + * with this cost. Assume 0 rows if negative value is given. + */ + public void setNumRowToProduce(long numRowToProduce) { + numRowToProduce_ = Math.max(0, numRowToProduce); + isSetNumRowToProduce_ = true; + } + + /** + * Set num rows to consume. + * + * @param numRowToConsume Number of rows to consume by plan node or data sink associated + * with this cost. Assume 0 rows if negative value is given. 
+ */ + protected void setNumRowToConsume(long numRowToConsume) { + numRowToConsume_ = Math.max(0, numRowToConsume); + isSetNumRowToConsume_ = true; + } + + public void setLabel(String label) { label_ = label; } + public long getNumRowToConsume() { return numRowToConsume_; } + public long getNumRowToProduce() { return numRowToProduce_; } + + private int getPerInstanceCost() { + Preconditions.checkState(getNumInstancesExpected() > 0); + return (int) Math.ceil((float) getTotalCost() / getNumInstancesExpected()); + } + + private float getReduction() { + return (float) numRowToConsume_ / Math.max(1, numRowToProduce_); + } + + private float getCostPerRowProduced() { + return (float) getTotalCost() / Math.max(1, numRowToProduce_); + } + + private float getCostPerRowConsumed() { + return (float) getTotalCost() / Math.max(1, numRowToConsume_); + } + + private float instanceRatio(ProcessingCost other) { + Preconditions.checkState(getNumInstancesExpected() > 0); + return (float) getNumInstancesExpected() / other.getNumInstancesExpected(); + } + + private float consumerProducerRatio(ProcessingCost other) { + return getCostPerRowConsumed() / Math.max(1, other.getCostPerRowProduced()); + } + + private boolean isAtLowestInstanceRatio( + int nodeStepCount, int minParallelism, ProcessingCost other) { + if (getNumInstancesExpected() - nodeStepCount < minParallelism) { + return true; + } else { + float lowerRatio = (float) (getNumInstancesExpected() - nodeStepCount) + / other.getNumInstancesExpected(); + return lowerRatio < consumerProducerRatio(other); + } + } + + private boolean isAtHighestInstanceRatio( + int nodeStepCount, int maxInstance, ProcessingCost other) { + if (getNumInstancesExpected() + nodeStepCount > maxInstance) { + return true; + } else { + float higherRatio = (float) (getNumInstancesExpected() + nodeStepCount) + / other.getNumInstancesExpected(); + return higherRatio > consumerProducerRatio(other); + } + } + + private boolean canReducedBy( + int nodeStepCount, int 
minParallelism, ProcessingCost other) { + return !isAtLowestInstanceRatio(nodeStepCount, minParallelism, other) + && consumerProducerRatio(other) < instanceRatio(other); + } + + private boolean canIncreaseBy( + int nodeStepCount, int maxInstance, ProcessingCost other) { + return !isAtHighestInstanceRatio(nodeStepCount, maxInstance, other) + && consumerProducerRatio(other) > instanceRatio(other); + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java new file mode 100644 index 000000000..421ee9e64 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.planner; + +import com.google.common.base.Preconditions; + +import org.apache.impala.util.MathUtil; + +public class ScaledProcessingCost extends ProcessingCost { + private final ProcessingCost cost_; + private final long multiplier_; + + protected ScaledProcessingCost(ProcessingCost cost, long multiplier) { + Preconditions.checkArgument(cost.isValid(), "ScaledProcessingCost: cost is invalid!"); + Preconditions.checkArgument( + multiplier >= 0, "ScaledProcessingCost: multiplier must be non-negative!"); + cost_ = cost; + multiplier_ = multiplier; + } + + @Override + public long getTotalCost() { + return MathUtil.saturatingMultiply(cost_.getTotalCost(), multiplier_); + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public ProcessingCost clone() { + return new ScaledProcessingCost(cost_, multiplier_); + } + + @Override + public String getExplainString(String detailPrefix, boolean fullExplain) { + StringBuilder sb = new StringBuilder(); + sb.append(detailPrefix); + sb.append("ScaledCost("); + sb.append(multiplier_); + sb.append("): "); + sb.append(getDetails()); + if (fullExplain) { + sb.append("\n"); + sb.append(cost_.getExplainString(detailPrefix + " ", true)); + } + return sb.toString(); + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index d250eb8db..be12b293c 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -40,6 +40,7 @@ import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TScanRangeSpec; import org.apache.impala.thrift.TTableStats; +import org.apache.impala.util.ExprUtil; import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; @@ -343,6 +344,24 @@ abstract public class ScanNode extends PlanNode { } return maxScannerThreads; } + + 
protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) { + return ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(), + ExprUtil.computeExprsTotalCost(conjuncts_), rowMaterializationCost()); + } + + /** + * Estimate per-row cost as 1 per 1KB row size. + * <p> + * This reflect deserialization/copy cost per row. + */ + private float rowMaterializationCost() { return getAvgRowSize() / 1024; } + + @Override + protected boolean isLeafNode() { + return true; + } + /** * Returns true if this node has conjuncts to be evaluated by Impala against the scan * tuple. diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java index fa888f47f..1bde0d41a 100644 --- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java @@ -117,6 +117,11 @@ public class SelectNode extends PlanNode { public void setSelectivity(double value) { selectivity_ = value; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = computeDefaultProcessingCost(); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // The select node initializes a single row-batch which it recycles on every diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java index e9c498417..f37b67c11 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java @@ -23,6 +23,7 @@ import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.util.ExprUtil; import com.google.common.base.Preconditions; @@ -66,6 +67,13 @@ public class SingularRowSrcNode extends 
PlanNode { numInstances_ = containingSubplanNode_.getChild(0).getNumInstances(); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = ProcessingCost.basicCost(getDisplayLabel(), + containingSubplanNode_.getChild(0).getCardinality(), + ExprUtil.computeExprsTotalCost(getConjuncts())); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index 5f12dee45..6d8ad8b03 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -453,6 +453,12 @@ public class SortNode extends PlanNode { return offset_ != 0 ? prefix + "offset: " + Long.toString(offset_) + "\n" : ""; } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = + getSortInfo().computeProcessingCost(getDisplayLabel(), getCardinality()); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { Preconditions.checkState(hasValidStats()); diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java index eb57df628..09a6ac7d8 100644 --- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java @@ -96,6 +96,11 @@ public class SubplanNode extends PlanNode { cardinality_ = capCardinalityAtLimit(cardinality_); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = ProcessingCost.basicCost(getDisplayLabel(), getCardinality(), 0); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate diff --git a/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java 
b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java new file mode 100644 index 000000000..fcbd6dfab --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.planner; + +import com.google.common.base.Preconditions; + +import org.apache.impala.util.MathUtil; + +public class SumProcessingCost extends ProcessingCost { + private final ProcessingCost cost1_; + private final ProcessingCost cost2_; + + protected SumProcessingCost(ProcessingCost cost1, ProcessingCost cost2) { + Preconditions.checkArgument(cost1.isValid(), "SumProcessingCost: cost1 is invalid!"); + Preconditions.checkArgument(cost2.isValid(), "SumProcessingCost: cost2 is invalid!"); + cost1_ = cost1; + cost2_ = cost2; + } + + @Override + public long getTotalCost() { + return MathUtil.saturatingAdd(cost1_.getTotalCost(), cost2_.getTotalCost()); + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public ProcessingCost clone() { + return new SumProcessingCost(cost1_, cost2_); + } + + @Override + public String getExplainString(String detailPrefix, boolean fullExplain) { + StringBuilder output = new StringBuilder(); + output.append(detailPrefix).append("SumCost: ").append(getDetails()); + if (fullExplain) { + String nextPrefix = detailPrefix + " "; + output.append("\n").append(cost1_.getExplainString(nextPrefix, true)); + output.append("\n").append(cost2_.getExplainString(nextPrefix, true)); + } + return output.toString(); + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java index cc12bd995..0f34ddb20 100644 --- a/fe/src/main/java/org/apache/impala/planner/TableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java @@ -17,7 +17,6 @@ package org.apache.impala.planner; -import java.nio.ByteBuffer; import java.util.List; import org.apache.impala.analysis.Expr; @@ -28,6 +27,7 @@ import org.apache.impala.catalog.FeTable; import org.apache.impala.common.Pair; import org.apache.impala.thrift.TSinkAction; import org.apache.impala.thrift.TSortingOrder; +import org.apache.impala.util.ExprUtil; import 
com.google.common.base.Preconditions; @@ -160,4 +160,10 @@ public abstract class TableSink extends DataSink { "Cannot create data sink into table of type: " + table.getClass().getName()); } } + + protected ProcessingCost computeDefaultProcessingCost() { + // TODO: consider including materialization cost into the returned cost. + return ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(), + ExprUtil.computeExprsTotalCost(outputExprs_)); + } } diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java index b1dbe7212..c8f45dd7b 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java @@ -33,6 +33,7 @@ import org.apache.impala.thrift.TPlanNode; import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TUnionNode; +import org.apache.impala.util.ExprUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,6 +149,19 @@ public class UnionNode extends PlanNode { } } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // Compute the cost for materializing child rows and use that to figure out + // the total data processed. Assume the costs of processing pass-through rows are 0. 
+ float totalMaterializeCost = 0; + for (int i = firstMaterializedChildIdx_; i < resultExprLists_.size(); i++) { + totalMaterializeCost += ExprUtil.computeExprsTotalCost(resultExprLists_.get(i)); + } + + processingCost_ = + ProcessingCost.basicCost(getDisplayLabel(), cardinality_, totalMaterializeCost); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // The union node directly returns the rows for children marked as pass @@ -375,4 +389,11 @@ public class UnionNode extends PlanNode { } } } + + @Override + protected boolean isLeafNode() { + // Union node is being scheduled the same as scan node. + // See Scheduler::CreateCollocatedAndScanInstances() in scheduler.cc. + return true; + } } diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java index dd6eb33f5..919d849e3 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java @@ -17,7 +17,6 @@ package org.apache.impala.planner; -import java.util.Comparator; import java.util.List; import org.apache.impala.analysis.Analyzer; @@ -96,6 +95,12 @@ public class UnnestNode extends PlanNode { cardinality_ = capCardinalityAtLimit(cardinality_); } + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + processingCost_ = ProcessingCost.basicCost( + getDisplayLabel(), containingSubplanNode_.getChild(0).getCardinality(), 0); + } + @Override public void computeNodeResourceProfile(TQueryOptions queryOptions) { // TODO: add an estimate diff --git a/fe/src/main/java/org/apache/impala/util/ExprUtil.java b/fe/src/main/java/org/apache/impala/util/ExprUtil.java index 934fabd0b..2b34835da 100644 --- a/fe/src/main/java/org/apache/impala/util/ExprUtil.java +++ b/fe/src/main/java/org/apache/impala/util/ExprUtil.java @@ -27,9 +27,12 @@ import org.apache.impala.analysis.StringLiteral; import org.apache.impala.catalog.Type; import 
org.apache.impala.common.AnalysisException; import org.apache.impala.common.InternalException; +import org.apache.impala.service.BackendConfig; import org.apache.impala.service.FeSupport; import org.apache.impala.thrift.TColumnValue; +import java.util.List; + public class ExprUtil { /** * Converts a UTC timestamp to UNIX microseconds. @@ -102,4 +105,20 @@ public class ExprUtil { toUtcTimestamp.analyze(analyzer); return toUtcTimestamp; } + + // Compute total cost for a list of expressions. Return 0 for a null list. + public static float computeExprsTotalCost(List<? extends Expr> exprs) { + // TODO: Implement the cost for conjunts once the implemetation for + // 'Expr' is in place. + if (exprs == null) return 0; + return exprs.size(); + } + + public static float computeExprCost(Expr e) { + if (e == null) return 0; + return 1; + // TODO Implement a function that can take into consideration of data types, + // expressions and potentially LLVM translation in BE. The function must also + // run fast. + } }
