This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3181fe18006e392e0ce3f2f48fe285569ccfd148 Author: Joe McDonnell <[email protected]> AuthorDate: Fri Mar 14 09:24:14 2025 -0700 IMPALA-13437 (part 1): Compute processing cost before TupleCachePlanner This is a preparatory change for cost-based placement for TupleCacheNodes. It reorders planning so that the processing cost and filtered cardinality are calculated before running the TupleCachePlanner. This computes the processing cost when enable_tuple_cache=true. It also displays the cost information in the explain plan output when enable_tuple_cache=true. This does not impact the adjustment of fragment parallelism, which continues to be controlled by the compute_processing_cost option. This uses the processing cost to calculate a cumulative processing cost in the TupleCacheInfo. The cumulative cost is the total processing cost of everything below that point in the plan, including nodes in other fragments. It is an indicator of how much processing a cache hit could avoid. This does not accumulate the cost when merging the TupleCacheInfo due to a runtime filter, as that cost is not actually being avoided. This also computes the estimated serialized size for the TupleCacheNode based on the filtered cardinality and the row size. 
Testing: - Ran a core job Change-Id: If78f5d002b0e079eef1eece612f0d4fefde545c7 Reviewed-on: http://gerrit.cloudera.org:8080/23164 Reviewed-by: Yida Wu <[email protected]> Reviewed-by: Michael Smith <[email protected]> Tested-by: Michael Smith <[email protected]> --- .../java/org/apache/impala/planner/DataSink.java | 2 +- .../org/apache/impala/planner/PlanFragment.java | 6 +- .../java/org/apache/impala/planner/PlanNode.java | 5 +- .../java/org/apache/impala/planner/Planner.java | 28 ++++++-- .../org/apache/impala/planner/TupleCacheInfo.java | 80 ++++++++++++++++++++++ .../org/apache/impala/planner/TupleCacheNode.java | 18 +++-- .../java/org/apache/impala/service/Frontend.java | 6 +- 7 files changed, 122 insertions(+), 23 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java b/fe/src/main/java/org/apache/impala/planner/DataSink.java index 97571207d..dfbf9f2d2 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java @@ -60,7 +60,7 @@ public abstract class DataSink { if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { output.append(detailPrefix); output.append(resourceProfile_.getExplainString()); - if (queryOptions.isCompute_processing_cost()) { + if (Planner.isProcessingCostAvailable(queryOptions)) { // Show processing cost total. 
output.append(" cost="); if (processingCost_.isValid()) { diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index 80c9bd979..1589c6710 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -744,14 +744,14 @@ public class PlanFragment extends TreeNode<PlanFragment> { */ public String getFragmentHeaderString(String firstLinePrefix, String detailPrefix, TQueryOptions queryOptions, TExplainLevel explainLevel) { - boolean isComputeCost = queryOptions.isCompute_processing_cost(); + boolean adjustsInstanceCount = queryOptions.isCompute_processing_cost(); boolean useMTFragment = Planner.useMTFragment(queryOptions); StringBuilder builder = new StringBuilder(); builder.append(String.format("%s%s:PLAN FRAGMENT [%s]", firstLinePrefix, fragmentId_.toString(), dataPartition_.getExplainString())); builder.append(PrintUtils.printNumHosts(" ", getNumNodes())); builder.append(PrintUtils.printNumInstances(" ", getNumInstances())); - if (isComputeCost && originalInstanceCount_ != getNumInstances()) { + if (adjustsInstanceCount && originalInstanceCount_ != getNumInstances()) { builder.append(" (adjusted from " + originalInstanceCount_ + ")"); } builder.append("\n"); @@ -807,7 +807,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { builder.append(perInstanceExplainString); builder.append("\n"); } - if (isComputeCost && rootSegment_ != null + if (Planner.isProcessingCostAvailable(queryOptions) && rootSegment_ != null && explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { // Print processing cost. 
builder.append(detailPrefix); diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 27116e7a6..acbbbff7f 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -388,7 +388,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { expBuilder.append(nodeResourceProfile_.getExplainString()); expBuilder.append("\n"); - if (queryOptions.isCompute_processing_cost() && processingCost_.isValid() + if (Planner.isProcessingCostAvailable(queryOptions) && processingCost_.isValid() && detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) { // Print processing cost. expBuilder.append(processingCost_.getExplainString(detailPrefix, false)); @@ -421,7 +421,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { } else { expBuilder.append(PrintUtils.printEstCardinality(cardinality_)); } - if (queryOptions.isCompute_processing_cost()) { + if (Planner.isProcessingCostAvailable(queryOptions)) { // Show processing cost total. expBuilder.append(" cost="); if (processingCost_.isValid()) { @@ -1412,6 +1412,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // Leaf node, add query options hash. 
tupleCacheInfo_.hashThrift("Query options hash", queryOptsHash); } + tupleCacheInfo_.calculateCostInformation(this); tupleCacheInfo_.finalizeHash(); LOG.trace("Hash for {}:", this); for (HashTraceElement elem : tupleCacheInfo_.getHashTraces()) { diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 5c5db03c0..5da40d760 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -166,6 +166,7 @@ public class Planner { ctx_.getTimeline().markEvent("Runtime filters computed"); checkAndOverrideMinmaxFilterThresholdAndLevel(ctx_.getQueryOptions()); + reduceCardinalityByRuntimeFilter(rootFragment, ctx_); } rootFragment.verifyTree(); @@ -308,7 +309,8 @@ public class Planner { * return a single-node, distributed, or parallel plan depending on the query and * configuration. */ - public List<PlanFragment> createPlans() throws ImpalaException { + public List<PlanFragment> createPlans(TQueryExecRequest result) + throws ImpalaException { List<PlanFragment> distrPlan = createPlanFragments(); Preconditions.checkNotNull(distrPlan); if (useParallelPlan(ctx_)) { @@ -318,11 +320,17 @@ public class Planner { } else { distrPlan = Collections.singletonList(distrPlan.get(0)); } + + // Compute the processing cost and adjust fragment parallelism + // The TupleCachePlanner uses the processing cost and fragment parallelism, so + // this needs to happen first. + computeProcessingCost(distrPlan, result, ctx_); + // TupleCachePlanner comes last, because it needs to compute the eligibility of // various locations in the PlanNode tree. Runtime filters and other modifications // to the tree can change this, so this comes after all those modifications are // complete. 
- if (useTupleCache(ctx_)) { + if (useTupleCache(ctx_.getQueryOptions())) { TupleCachePlanner cachePlanner = new TupleCachePlanner(ctx_); distrPlan = cachePlanner.createPlans(distrPlan); ctx_.getTimeline().markEvent("Tuple caching plan created"); @@ -345,8 +353,15 @@ public class Planner { } // Return true if ENABLE_TUPLE_CACHE=true - public static boolean useTupleCache(PlannerContext planCtx) { - return planCtx.getQueryOptions().isEnable_tuple_cache(); + public static boolean useTupleCache(TQueryOptions queryOptions) { + return queryOptions.isEnable_tuple_cache(); + } + + // Return true if the processing cost has been computed. This is true for + // COMPUTE_PROCESSING_COST=true and ENABLE_TUPLE_CACHE=true + public static boolean isProcessingCostAvailable(TQueryOptions queryOptions) { + return queryOptions.isCompute_processing_cost() || + useTupleCache(queryOptions); } /** @@ -548,12 +563,11 @@ public class Planner { * computation. */ public static void reduceCardinalityByRuntimeFilter( - List<PlanFragment> planRoots, PlannerContext planCtx) { + PlanFragment rootFragment, PlannerContext planCtx) { double reductionScale = planCtx.getRootAnalyzer() .getQueryOptions() .getRuntime_filter_cardinality_reduction_scale(); if (reductionScale <= 0) return; - PlanFragment rootFragment = planRoots.get(0); Stack<PlanNode> nodeStack = new Stack<>(); rootFragment.getPlanRoot().reduceCardinalityByRuntimeFilter( nodeStack, reductionScale); @@ -572,7 +586,7 @@ public class Planner { List<PlanFragment> postOrderFragments = new ArrayList<>(); boolean testCostCalculation = queryOptions.isEnable_replan() && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan()); - if (queryOptions.isCompute_processing_cost() || testCostCalculation) { + if (isProcessingCostAvailable(queryOptions) || testCostCalculation) { postOrderFragments = rootFragment.getNodesPostOrder(); for (PlanFragment fragment : postOrderFragments) { fragment.computeCostingSegment(queryOptions); diff --git 
a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java index f9a6bdbcc..b3873facc 100644 --- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java +++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java @@ -34,7 +34,9 @@ import org.apache.impala.analysis.TupleId; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; import org.apache.impala.common.IdGenerator; +import org.apache.impala.common.PrintUtils; import org.apache.impala.common.ThriftSerializationCtx; +import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TFileSplitGeneratorSpec; import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocationList; @@ -166,6 +168,14 @@ public class TupleCacheInfo { private boolean finalized_ = false; private String finalizedHashString_ = null; + // Cumulative processing cost from all nodes that feed into this node, including nodes + // from other fragments (e.g. the build side of a join). + private long cumulativeProcessingCost_ = 0; + + // Estimated size of the result at this location. This is the row size multiplied by + // the filtered cardinality. 
+ private long estimatedSerializedSize_ = -1; + public TupleCacheInfo(DescriptorTable descTbl) { ineligibilityReasons_ = EnumSet.noneOf(IneligibilityReason.class); descriptorTable_ = descTbl; @@ -221,6 +231,58 @@ public class TupleCacheInfo { finalized_ = true; } + public long getCumulativeProcessingCost() { + Preconditions.checkState(isEligible(), + "TupleCacheInfo only has cost information if it is cache eligible."); + Preconditions.checkState(finalized_, "TupleCacheInfo not finalized"); + return cumulativeProcessingCost_; + } + + public long getEstimatedSerializedSize() { + Preconditions.checkState(isEligible(), + "TupleCacheInfo only has cost information if it is cache eligible."); + Preconditions.checkState(finalized_, "TupleCacheInfo not finalized"); + return estimatedSerializedSize_; + } + + /** + * Calculate the tuple cache cost information for this plan node. This must be called + * with the matching PlanNode for this TupleCacheInfo. This pulls in any information + * from the PlanNode or from any children recursively. This cost information is used + * for planning decisions. It is also displayed in the explain plan output for + * debugging. + */ + public void calculateCostInformation(PlanNode thisPlanNode) { + Preconditions.checkState(!finalized_, + "TupleCacheInfo is finalized and can't be modified"); + Preconditions.checkState(isEligible(), + "TupleCacheInfo only calculates cost information if it is cache eligible."); + Preconditions.checkState(thisPlanNode.getTupleCacheInfo() == this, + "calculateCostInformation() must be called with its enclosing PlanNode"); + + // This was already called on our children, which are known to be eligible. + // Pull in the information from our children. + for (PlanNode child : thisPlanNode.getChildren()) { + cumulativeProcessingCost_ += + child.getTupleCacheInfo().getCumulativeProcessingCost(); + // If the child is from a different fragment (e.g. 
the build side of a hash join), + // incorporate the cost of the sink + if (child.getFragment() != thisPlanNode.getFragment()) { + cumulativeProcessingCost_ += + child.getFragment().getSink().getProcessingCost().getTotalCost(); + } + } + cumulativeProcessingCost_ += thisPlanNode.getProcessingCost().getTotalCost(); + + // If there are stats, compute the estimated serialized size. If there are no stats + // (i.e. cardinality == -1), then there is nothing to do. + if (thisPlanNode.getFilteredCardinality() > -1) { + long cardinality = thisPlanNode.getFilteredCardinality(); + estimatedSerializedSize_ = (long) Math.round( + ExchangeNode.getAvgSerializedRowSize(thisPlanNode) * cardinality); + } + } + /** * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is * ineligible, then this is marked ineligible and there is no need to calculate @@ -499,6 +561,24 @@ public class TupleCacheInfo { return builder.toString(); } + /** + * Produce explain output describing the cost information for this tuple cache location + */ + public String getCostExplainString(String detailPrefix) { + StringBuilder output = new StringBuilder(); + output.append(detailPrefix + "estimated serialized size: "); + if (estimatedSerializedSize_ > -1) { + output.append(PrintUtils.printBytes(estimatedSerializedSize_)); + } else { + output.append("unavailable"); + } + output.append("\n"); + output.append(detailPrefix + "cumulative processing cost: "); + output.append(getCumulativeProcessingCost()); + output.append("\n"); + return output.toString(); + } + /** * Construct a comma separated list of the ineligibility reasons. 
*/ diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java index 98818648e..71a807ae8 100644 --- a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java +++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java @@ -45,32 +45,37 @@ public class TupleCacheNode extends PlanNode { protected boolean displayCorrectnessCheckingInfo_; protected boolean skipCorrectnessVerification_; protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>(); + protected final TupleCacheInfo childTupleCacheInfo_; public TupleCacheNode(PlanNodeId id, PlanNode child, boolean displayCorrectnessCheckingInfo) { super(id, "TUPLE CACHE"); addChild(child); cardinality_ = child.getCardinality(); + if (child.getFilteredCardinality() != cardinality_) { + setFilteredCardinality(child.getFilteredCardinality()); + } limit_ = child.limit_; - TupleCacheInfo childCacheInfo = child.getTupleCacheInfo(); - Preconditions.checkState(childCacheInfo.isEligible()); - compileTimeKey_ = childCacheInfo.getHashString(); + childTupleCacheInfo_ = child.getTupleCacheInfo(); + Preconditions.checkState(childTupleCacheInfo_.isEligible()); + compileTimeKey_ = childTupleCacheInfo_.getHashString(); // If there is variability due to a streaming agg, skip the correctness verification // for this location. 
- skipCorrectnessVerification_ = childCacheInfo.getStreamingAggVariability(); + skipCorrectnessVerification_ = childTupleCacheInfo_.getStreamingAggVariability(); displayCorrectnessCheckingInfo_ = displayCorrectnessCheckingInfo; - for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) { + for (HdfsScanNode scanNode : childTupleCacheInfo_.getInputScanNodes()) { // Inputs into the tuple cache need to use deterministic scan range assignment scanNode.setDeterministicScanRangeAssignment(true); inputScanNodeIds_.add(scanNode.getId().asInt()); } + computeTupleIds(); } @Override public void init(Analyzer analyzer) throws ImpalaException { super.init(analyzer); - computeTupleIds(); + Preconditions.checkState(conjuncts_.isEmpty()); } @Override @@ -128,6 +133,7 @@ public class TupleCacheNode extends PlanNode { inputScanNodeIds_.stream().map(Object::toString).collect(Collectors.toList()); output.append(detailPrefix + "input scan node ids: " + String.join(",", input_scan_node_ids_strs) + "\n"); + output.append(childTupleCacheInfo_.getCostExplainString(detailPrefix)); return output.toString(); } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index c7f5e198f..d37e357d7 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2031,16 +2031,14 @@ public class Frontend { */ private TQueryExecRequest createExecRequest( Planner planner, PlanCtx planCtx) throws ImpalaException { + TQueryExecRequest result = new TQueryExecRequest(); TQueryCtx queryCtx = planner.getQueryCtx(); - List<PlanFragment> planRoots = planner.createPlans(); + List<PlanFragment> planRoots = planner.createPlans(result); if (planCtx.planCaptureRequested()) { planCtx.plan_ = planRoots; } // Compute resource requirements of the final plans. 
- TQueryExecRequest result = new TQueryExecRequest(); - Planner.reduceCardinalityByRuntimeFilter(planRoots, planner.getPlannerCtx()); - Planner.computeProcessingCost(planRoots, result, planner.getPlannerCtx()); Planner.computeResourceReqs(planRoots, queryCtx, result, planner.getPlannerCtx(), planner.getAnalysisResult().isQueryStmt());
