This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3181fe18006e392e0ce3f2f48fe285569ccfd148
Author: Joe McDonnell <[email protected]>
AuthorDate: Fri Mar 14 09:24:14 2025 -0700

    IMPALA-13437 (part 1): Compute processing cost before TupleCachePlanner
    
    This is a preparatory change for cost-based placement for
    TupleCacheNodes. It reorders planning so that the processing cost and
    filtered cardinality are calculated before running the TupleCachePlanner.
    This computes the processing cost when enable_tuple_cache=true.
    It also displays the cost information in the explain plan output
    when enable_tuple_cache=true. This does not impact the adjustment
    of fragment parallelism, which continues to be controlled by the
    compute_processing_cost option.
    
    This uses the processing cost to calculate a cumulative processing
    cost in the TupleCacheInfo. This is all of the processing cost below
    this point including other fragments. This is an indicator of how
    much processing a cache hit could avoid. This does not accumulate the
    cost when merging the TupleCacheInfo due to a runtime filter, as that
    cost is not actually being avoided. This also computes the estimated
    serialized size for the TupleCacheNode based on the filtered
    cardinality and the row size.
    
    Testing:
     - Ran a core job
    
    Change-Id: If78f5d002b0e079eef1eece612f0d4fefde545c7
    Reviewed-on: http://gerrit.cloudera.org:8080/23164
    Reviewed-by: Yida Wu <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Michael Smith <[email protected]>
---
 .../java/org/apache/impala/planner/DataSink.java   |  2 +-
 .../org/apache/impala/planner/PlanFragment.java    |  6 +-
 .../java/org/apache/impala/planner/PlanNode.java   |  5 +-
 .../java/org/apache/impala/planner/Planner.java    | 28 ++++++--
 .../org/apache/impala/planner/TupleCacheInfo.java  | 80 ++++++++++++++++++++++
 .../org/apache/impala/planner/TupleCacheNode.java  | 18 +++--
 .../java/org/apache/impala/service/Frontend.java   |  6 +-
 7 files changed, 122 insertions(+), 23 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java b/fe/src/main/java/org/apache/impala/planner/DataSink.java
index 97571207d..dfbf9f2d2 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java
@@ -60,7 +60,7 @@ public abstract class DataSink {
     if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(detailPrefix);
       output.append(resourceProfile_.getExplainString());
-      if (queryOptions.isCompute_processing_cost()) {
+      if (Planner.isProcessingCostAvailable(queryOptions)) {
         // Show processing cost total.
         output.append(" cost=");
         if (processingCost_.isValid()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 80c9bd979..1589c6710 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -744,14 +744,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
    */
   public String getFragmentHeaderString(String firstLinePrefix, String detailPrefix,
       TQueryOptions queryOptions, TExplainLevel explainLevel) {
-    boolean isComputeCost = queryOptions.isCompute_processing_cost();
+    boolean adjustsInstanceCount = queryOptions.isCompute_processing_cost();
     boolean useMTFragment = Planner.useMTFragment(queryOptions);
     StringBuilder builder = new StringBuilder();
     builder.append(String.format("%s%s:PLAN FRAGMENT [%s]", firstLinePrefix,
         fragmentId_.toString(), dataPartition_.getExplainString()));
     builder.append(PrintUtils.printNumHosts(" ", getNumNodes()));
     builder.append(PrintUtils.printNumInstances(" ", getNumInstances()));
-    if (isComputeCost && originalInstanceCount_ != getNumInstances()) {
+    if (adjustsInstanceCount && originalInstanceCount_ != getNumInstances()) {
       builder.append(" (adjusted from " + originalInstanceCount_ + ")");
     }
     builder.append("\n");
@@ -807,7 +807,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       builder.append(perInstanceExplainString);
       builder.append("\n");
     }
-    if (isComputeCost && rootSegment_ != null
+    if (Planner.isProcessingCostAvailable(queryOptions) && rootSegment_ != null
         && explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       // Print processing cost.
       builder.append(detailPrefix);
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 27116e7a6..acbbbff7f 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -388,7 +388,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       expBuilder.append(nodeResourceProfile_.getExplainString());
       expBuilder.append("\n");
 
-      if (queryOptions.isCompute_processing_cost() && processingCost_.isValid()
+      if (Planner.isProcessingCostAvailable(queryOptions) && processingCost_.isValid()
           && detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) {
         // Print processing cost.
         expBuilder.append(processingCost_.getExplainString(detailPrefix, false));
@@ -421,7 +421,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       } else {
         expBuilder.append(PrintUtils.printEstCardinality(cardinality_));
       }
-      if (queryOptions.isCompute_processing_cost()) {
+      if (Planner.isProcessingCostAvailable(queryOptions)) {
         // Show processing cost total.
         expBuilder.append(" cost=");
         if (processingCost_.isValid()) {
@@ -1412,6 +1412,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       // Leaf node, add query options hash.
       tupleCacheInfo_.hashThrift("Query options hash", queryOptsHash);
     }
+    tupleCacheInfo_.calculateCostInformation(this);
     tupleCacheInfo_.finalizeHash();
     LOG.trace("Hash for {}:", this);
     for (HashTraceElement elem : tupleCacheInfo_.getHashTraces()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 5c5db03c0..5da40d760 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -166,6 +166,7 @@ public class Planner {
       ctx_.getTimeline().markEvent("Runtime filters computed");
 
       checkAndOverrideMinmaxFilterThresholdAndLevel(ctx_.getQueryOptions());
+      reduceCardinalityByRuntimeFilter(rootFragment, ctx_);
     }
 
     rootFragment.verifyTree();
@@ -308,7 +309,8 @@ public class Planner {
    * return a single-node, distributed, or parallel plan depending on the query and
    * configuration.
    */
-  public List<PlanFragment> createPlans() throws ImpalaException {
+  public List<PlanFragment> createPlans(TQueryExecRequest result)
+      throws ImpalaException {
     List<PlanFragment> distrPlan = createPlanFragments();
     Preconditions.checkNotNull(distrPlan);
     if (useParallelPlan(ctx_)) {
@@ -318,11 +320,17 @@ public class Planner {
     } else {
       distrPlan = Collections.singletonList(distrPlan.get(0));
     }
+
+    // Compute the processing cost and adjust fragment parallelism
+    // The TupleCachePlanner uses the processing cost and fragment parallelism, so
+    // this needs to happen first.
+    computeProcessingCost(distrPlan, result, ctx_);
+
     // TupleCachePlanner comes last, because it needs to compute the eligibility of
     // various locations in the PlanNode tree. Runtime filters and other modifications
     // to the tree can change this, so this comes after all those modifications are
     // complete.
-    if (useTupleCache(ctx_)) {
+    if (useTupleCache(ctx_.getQueryOptions())) {
       TupleCachePlanner cachePlanner = new TupleCachePlanner(ctx_);
       distrPlan = cachePlanner.createPlans(distrPlan);
       ctx_.getTimeline().markEvent("Tuple caching plan created");
@@ -345,8 +353,15 @@ public class Planner {
   }
 
   // Return true if ENABLE_TUPLE_CACHE=true
-  public static boolean useTupleCache(PlannerContext planCtx) {
-    return planCtx.getQueryOptions().isEnable_tuple_cache();
+  public static boolean useTupleCache(TQueryOptions queryOptions) {
+    return queryOptions.isEnable_tuple_cache();
+  }
+
+  // Return true if the processing cost has been computed. This is true for
+  // COMPUTE_PROCESSING_COST=true or ENABLE_TUPLE_CACHE=true
+  public static boolean isProcessingCostAvailable(TQueryOptions queryOptions) {
+    return queryOptions.isCompute_processing_cost() ||
+        useTupleCache(queryOptions);
   }
 
   /**
@@ -548,12 +563,11 @@ public class Planner {
    * computation.
    */
   public static void reduceCardinalityByRuntimeFilter(
-      List<PlanFragment> planRoots, PlannerContext planCtx) {
+      PlanFragment rootFragment, PlannerContext planCtx) {
     double reductionScale = planCtx.getRootAnalyzer()
                                 .getQueryOptions()
                                 .getRuntime_filter_cardinality_reduction_scale();
     if (reductionScale <= 0) return;
-    PlanFragment rootFragment = planRoots.get(0);
     Stack<PlanNode> nodeStack = new Stack<>();
     rootFragment.getPlanRoot().reduceCardinalityByRuntimeFilter(
         nodeStack, reductionScale);
@@ -572,7 +586,7 @@ public class Planner {
     List<PlanFragment> postOrderFragments = new ArrayList<>();
     boolean testCostCalculation = queryOptions.isEnable_replan()
         && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan());
-    if (queryOptions.isCompute_processing_cost() || testCostCalculation) {
+    if (isProcessingCostAvailable(queryOptions) || testCostCalculation) {
       postOrderFragments = rootFragment.getNodesPostOrder();
       for (PlanFragment fragment : postOrderFragments) {
         fragment.computeCostingSegment(queryOptions);
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index f9a6bdbcc..b3873facc 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -34,7 +34,9 @@ import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.common.IdGenerator;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.ThriftSerializationCtx;
+import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TFileSplitGeneratorSpec;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocationList;
@@ -166,6 +168,14 @@ public class TupleCacheInfo {
   private boolean finalized_ = false;
   private String finalizedHashString_ = null;
 
+  // Cumulative processing cost from all nodes that feed into this node, including nodes
+  // from other fragments (e.g. the build side of a join).
+  private long cumulativeProcessingCost_ = 0;
+
+  // Estimated size of the result at this location. This is the row size multiplied by
+  // the filtered cardinality.
+  private long estimatedSerializedSize_ = -1;
+
   public TupleCacheInfo(DescriptorTable descTbl) {
     ineligibilityReasons_ = EnumSet.noneOf(IneligibilityReason.class);
     descriptorTable_ = descTbl;
@@ -221,6 +231,58 @@ public class TupleCacheInfo {
     finalized_ = true;
   }
 
+  public long getCumulativeProcessingCost() {
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only has cost information if it is cache eligible.");
+    Preconditions.checkState(finalized_, "TupleCacheInfo not finalized");
+    return cumulativeProcessingCost_;
+  }
+
+  public long getEstimatedSerializedSize() {
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only has cost information if it is cache eligible.");
+    Preconditions.checkState(finalized_, "TupleCacheInfo not finalized");
+    return estimatedSerializedSize_;
+  }
+
+  /**
+   * Calculate the tuple cache cost information for this plan node. This must be called
+   * with the matching PlanNode for this TupleCacheInfo. This pulls in any information
+   * from the PlanNode or from any children recursively. This cost information is used
+   * for planning decisions. It is also displayed in the explain plan output for
+   * debugging.
+   */
+  public void calculateCostInformation(PlanNode thisPlanNode) {
+    Preconditions.checkState(!finalized_,
+        "TupleCacheInfo is finalized and can't be modified");
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only calculates cost information if it is cache eligible.");
+    Preconditions.checkState(thisPlanNode.getTupleCacheInfo() == this,
+        "calculateCostInformation() must be called with its enclosing PlanNode");
+
+    // This was already called on our children, which are known to be eligible.
+    // Pull in the information from our children.
+    for (PlanNode child : thisPlanNode.getChildren()) {
+      cumulativeProcessingCost_ +=
+          child.getTupleCacheInfo().getCumulativeProcessingCost();
+      // If the child is from a different fragment (e.g. the build side of a hash join),
+      // incorporate the cost of the sink
+      if (child.getFragment() != thisPlanNode.getFragment()) {
+        cumulativeProcessingCost_ +=
+            child.getFragment().getSink().getProcessingCost().getTotalCost();
+      }
+    }
+    cumulativeProcessingCost_ += thisPlanNode.getProcessingCost().getTotalCost();
+
+    // If there are stats, compute the estimated serialized size. If there are no stats
+    // (i.e. cardinality == -1), then there is nothing to do.
+    if (thisPlanNode.getFilteredCardinality() > -1) {
+      long cardinality = thisPlanNode.getFilteredCardinality();
+      estimatedSerializedSize_ = (long) Math.round(
+          ExchangeNode.getAvgSerializedRowSize(thisPlanNode) * cardinality);
+    }
+  }
+
   /**
    * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
    * ineligible, then this is marked ineligible and there is no need to calculate
@@ -499,6 +561,24 @@ public class TupleCacheInfo {
     return builder.toString();
   }
 
+  /**
+   * Produce explain output describing the cost information for this tuple cache location
+   */
+  public String getCostExplainString(String detailPrefix) {
+    StringBuilder output = new StringBuilder();
+    output.append(detailPrefix + "estimated serialized size: ");
+    if (estimatedSerializedSize_ > -1) {
+      output.append(PrintUtils.printBytes(estimatedSerializedSize_));
+    } else {
+      output.append("unavailable");
+    }
+    output.append("\n");
+    output.append(detailPrefix + "cumulative processing cost: ");
+    output.append(getCumulativeProcessingCost());
+    output.append("\n");
+    return output.toString();
+  }
+
   /**
    * Construct a comma separated list of the ineligibility reasons.
    */
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
index 98818648e..71a807ae8 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
@@ -45,32 +45,37 @@ public class TupleCacheNode extends PlanNode {
   protected boolean displayCorrectnessCheckingInfo_;
   protected boolean skipCorrectnessVerification_;
   protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>();
+  protected final TupleCacheInfo childTupleCacheInfo_;
 
   public TupleCacheNode(PlanNodeId id, PlanNode child,
       boolean displayCorrectnessCheckingInfo) {
     super(id, "TUPLE CACHE");
     addChild(child);
     cardinality_ = child.getCardinality();
+    if (child.getFilteredCardinality() != cardinality_) {
+      setFilteredCardinality(child.getFilteredCardinality());
+    }
     limit_ = child.limit_;
 
-    TupleCacheInfo childCacheInfo = child.getTupleCacheInfo();
-    Preconditions.checkState(childCacheInfo.isEligible());
-    compileTimeKey_ = childCacheInfo.getHashString();
+    childTupleCacheInfo_ = child.getTupleCacheInfo();
+    Preconditions.checkState(childTupleCacheInfo_.isEligible());
+    compileTimeKey_ = childTupleCacheInfo_.getHashString();
     // If there is variability due to a streaming agg, skip the correctness verification
     // for this location.
-    skipCorrectnessVerification_ = childCacheInfo.getStreamingAggVariability();
+    skipCorrectnessVerification_ = childTupleCacheInfo_.getStreamingAggVariability();
     displayCorrectnessCheckingInfo_ = displayCorrectnessCheckingInfo;
-    for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) {
+    for (HdfsScanNode scanNode : childTupleCacheInfo_.getInputScanNodes()) {
       // Inputs into the tuple cache need to use deterministic scan range assignment
       scanNode.setDeterministicScanRangeAssignment(true);
       inputScanNodeIds_.add(scanNode.getId().asInt());
     }
+    computeTupleIds();
   }
 
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {
     super.init(analyzer);
-    computeTupleIds();
+    Preconditions.checkState(conjuncts_.isEmpty());
   }
 
   @Override
@@ -128,6 +133,7 @@ public class TupleCacheNode extends PlanNode {
         inputScanNodeIds_.stream().map(Object::toString).collect(Collectors.toList());
     output.append(detailPrefix + "input scan node ids: " +
         String.join(",", input_scan_node_ids_strs) + "\n");
+    output.append(childTupleCacheInfo_.getCostExplainString(detailPrefix));
     return output.toString();
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index c7f5e198f..d37e357d7 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2031,16 +2031,14 @@ public class Frontend {
    */
   private TQueryExecRequest createExecRequest(
       Planner planner, PlanCtx planCtx) throws ImpalaException {
+    TQueryExecRequest result = new TQueryExecRequest();
     TQueryCtx queryCtx = planner.getQueryCtx();
-    List<PlanFragment> planRoots = planner.createPlans();
+    List<PlanFragment> planRoots = planner.createPlans(result);
     if (planCtx.planCaptureRequested()) {
       planCtx.plan_ = planRoots;
     }
 
     // Compute resource requirements of the final plans.
-    TQueryExecRequest result = new TQueryExecRequest();
-    Planner.reduceCardinalityByRuntimeFilter(planRoots, planner.getPlannerCtx());
-    Planner.computeProcessingCost(planRoots, result, planner.getPlannerCtx());
     Planner.computeResourceReqs(planRoots, queryCtx, result,
         planner.getPlannerCtx(), planner.getAnalysisResult().isQueryStmt());
 

Reply via email to