This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 89a48b80a2689115d6bd90bf4312a452ae11477a
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Aug 24 11:30:37 2023 -0700

    IMPALA-12444: Fix minimum parallelism bug in scan fragment
    
    Scan fragment did not follow PROCESSING_COST_MIN_THREADS set by user
    even if total scan ranges allow it to do so. This patch fixes the issue by
    exposing ScanNode.maxScannerThreads_ to
    PlanFragment.adjustToMaxParallelism(). By using
    ScanNode.maxScannerThreads_ as an upper bound, ScanNode does not need to
    artificially lower ProcessingCost if maxScannerThreads_ is lower than
    minimum parallelism dictated by the original ProcessingCost. Thus, the
    synthetic ProcessingCost logic in ScanNode class is revised to only
    apply if input cardinality is unknown (-1).
    
    This patch also does the following adjustments:
    - Remove some dead code in Frontend.java and PlanFragment.java.
    - Add sanity check such that PROCESSING_COST_MIN_THREADS <=
      MAX_FRAGMENT_INSTANCES_PER_NODE.
    - Tidy up test_query_cpu_count_divisor_default to reduce the number of
      SET queries.
    
    Testing:
    - Update test_query_cpu_count_divisor_default to ensure that
      PROCESSING_COST_MIN_THREADS is respected by scan fragment and error
      is returned if PROCESSING_COST_MIN_THREADS is greater than
      MAX_FRAGMENT_INSTANCES_PER_NODE.
    - Pass test_executor_groups.py.
    
    Change-Id: I69e5a80146d4ac41de5ef406fc2bdceffe3ec394
    Reviewed-on: http://gerrit.cloudera.org:8080/20475
    Reviewed-by: Kurt Deschler <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Riza Suminto <[email protected]>
---
 .../java/org/apache/impala/analysis/Analyzer.java  |  2 +
 .../org/apache/impala/planner/CostingSegment.java  |  2 +-
 .../org/apache/impala/planner/PlanFragment.java    | 24 ++++-----
 .../java/org/apache/impala/planner/ScanNode.java   | 37 +++++++-------
 .../java/org/apache/impala/service/Frontend.java   | 28 +++++------
 tests/custom_cluster/test_executor_groups.py       | 57 ++++++++--------------
 6 files changed, 63 insertions(+), 87 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java 
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 1e024d525..224be7859 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -580,6 +580,8 @@ public class Analyzer {
     // singleton.
     private int numExecutorsForPlanning_ = -1;
 
+    // Number of available cores per executor node.
+    // Set by Frontend.java.
     private int availableCoresPerNode_ = -1;
 
     // Cache of KuduTables opened for this query. (map from table name to kudu 
table)
diff --git a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java 
b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
index 27ea37300..fed37bf29 100644
--- a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
+++ b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
@@ -192,7 +192,7 @@ public class CostingSegment extends 
TreeNode<CostingSegment> {
         nodeStepCount, minParallelism, maxParallelism, producerCost, cost_);
     newParallelism = Math.max(newParallelism, cost_.getNumInstancesExpected());
     Preconditions.checkState(newParallelism <= maxParallelism,
-        "originalParallelism=" + originalParallelism + ". newParallelism="
+        getRootId() + " originalParallelism=" + originalParallelism + ". 
newParallelism="
             + newParallelism + " > maxParallelism=" + maxParallelism);
 
     if (LOG.isTraceEnabled()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 44b52f152..de708b0a8 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -1062,6 +1062,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       int parentParallelism, int nodeStepCount) {
     int maxThreadAllowed = IntMath.saturatedMultiply(maxThreadPerNode, 
getNumNodes());
     boolean canTryLower = true;
+    int maxScannerThreads = Integer.MAX_VALUE;
 
     // Compute selectedParallelism as the maximum allowed parallelism.
     int selectedParallelism = getNumInstances();
@@ -1099,13 +1100,12 @@ public class PlanFragment extends 
TreeNode<PlanFragment> {
         if (!scanNodes.isEmpty()) {
           // The existence of scan node may justify an increase of parallelism 
for this
           // union fargment, but it should be capped at 
costBasedMaxParallelism.
-          long maxRangesPerScanNode = 1;
+          maxScannerThreads = 1;
           for (ScanNode scanNode : scanNodes) {
-            maxRangesPerScanNode =
-                Math.max(maxRangesPerScanNode, 
scanNode.getEffectiveNumScanRanges());
+            maxScannerThreads = Math.max(maxScannerThreads, 
scanNode.maxScannerThreads_);
           }
-          maxParallelism_ = Math.max(maxParallelism_,
-              (int) Math.min(costBasedMaxParallelism, maxRangesPerScanNode));
+          maxParallelism_ = Math.max(
+              maxParallelism_, Math.min(costBasedMaxParallelism, 
maxScannerThreads));
         }
 
         if (maxParallelism_ > maxThreadAllowed) {
@@ -1135,8 +1135,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         if (!scanNodes.isEmpty()) {
           Preconditions.checkState(scanNodes.size() == 1);
           ScanNode scanNode = scanNodes.get(0);
-          maxParallelism_ =
-              (int) Math.min(maxParallelism_, 
scanNode.getEffectiveNumScanRanges());
+          maxScannerThreads = scanNode.maxScannerThreads_;
+          maxParallelism_ = Math.min(maxParallelism_, maxScannerThreads);
 
           // Prevent caller from lowering parallelism if fragment has ScanNode
           // because there is no child fragment to compare with.
@@ -1153,7 +1153,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
                 getNumInstances(), selectedParallelism, "Follow 
maxThreadPerNode.");
           }
         } else {
-          if (maxParallelism_ < minParallelism && scanNodes.isEmpty()) {
+          if (maxParallelism_ < minParallelism && minParallelism < 
maxScannerThreads) {
             maxParallelism_ = minParallelism;
             canTryLower = false;
             if (LOG.isTraceEnabled()) {
@@ -1177,7 +1177,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
     // Initialize this fragment's parallelism to the selectedParallelism.
     setAdjustedInstanceCount(selectedParallelism);
-    return canTryLower;
+    return canTryLower && selectedParallelism > 1;
   }
 
   private boolean hasUnionNode() {
@@ -1186,12 +1186,6 @@ public class PlanFragment extends TreeNode<PlanFragment> 
{
     return !nodes.isEmpty();
   }
 
-  private boolean hasScanNode() {
-    List<ScanNode> nodes = Lists.newArrayList();
-    collectPlanNodes(Predicates.instanceOf(ScanNode.class), nodes);
-    return !nodes.isEmpty();
-  }
-
   /**
    * Compute {@link CoreCount} of this fragment and populate it into 
'fragmentCoreState'.
    * @param fragmentCoreState A map holding per-fragment core state.
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index a2bcda486..66ef705ce 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -84,6 +84,10 @@ abstract public class ScanNode extends PlanNode {
   // Refer to the comment of 'TableRef.tableNumRowsHint_'
   protected long tableNumRowsHint_ = -1;
 
+  // Maximum number of scanner threads after considering number of scan ranges
+  // and related query options.Calculated at computeScanProcessingCost.
+  protected int maxScannerThreads_ = -1;
+
   public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) {
     super(id, desc.getId().asList(), displayName);
     desc_ = desc;
@@ -358,39 +362,38 @@ abstract public class ScanNode extends PlanNode {
    */
   protected ProcessingCost computeScanProcessingCost(TQueryOptions 
queryOptions) {
     Preconditions.checkArgument(queryOptions.isCompute_processing_cost());
-    ProcessingCost cardinalityBasedCost =
-        ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(),
-            ExprUtil.computeExprsTotalCost(conjuncts_), 
rowMaterializationCost());
 
     // maxThread calculation below intentionally does not include core count 
from
     // executor group config. This is to allow scan fragment parallelism to 
scale
     // regardless of the core count limit.
     int maxThreadsPerNode = 
Math.max(queryOptions.getProcessing_cost_min_threads(),
         queryOptions.getMax_fragment_instances_per_node());
-    int maxScannerThreads = (int) Math.min(getEffectiveNumScanRanges(),
+    maxScannerThreads_ = (int) Math.min(getEffectiveNumScanRanges(),
         IntMath.saturatedMultiply(getNumNodes(), maxThreadsPerNode));
-
-    if (getInputCardinality() == 0) {
-      Preconditions.checkState(cardinalityBasedCost.getTotalCost() == 0,
-          "Scan is empty but cost is non-zero.");
+    long inputCardinality = getInputCardinality();
+
+    if (inputCardinality >= 0) {
+      ProcessingCost cardinalityBasedCost =
+          ProcessingCost.basicCost(getDisplayLabel(), inputCardinality,
+              ExprUtil.computeExprsTotalCost(conjuncts_), 
rowMaterializationCost());
+      if (inputCardinality == 0) {
+        Preconditions.checkState(cardinalityBasedCost.getTotalCost() == 0,
+            "Scan is empty but cost is non-zero.");
+      }
       return cardinalityBasedCost;
-    } else if (maxScannerThreads < cardinalityBasedCost.getTotalCost()
-            / BackendConfig.INSTANCE.getMinProcessingPerThread()) {
-      // Input cardinality is unknown or cost is too high compared to 
maxScanThreads
-      // Return synthetic ProcessingCost based on maxScanThreads.
+    } else {
+      // Input cardinality is unknown. Return synthetic ProcessingCost based on
+      // maxScannerThreads_.
       long syntheticCardinality =
-          Math.max(1, Math.min(getInputCardinality(), maxScannerThreads));
+          Math.max(1, Math.min(inputCardinality, maxScannerThreads_));
       long syntheticPerRowCost = LongMath.saturatedMultiply(
           Math.max(1,
               BackendConfig.INSTANCE.getMinProcessingPerThread() / 
syntheticCardinality),
-          maxScannerThreads);
+          maxScannerThreads_);
 
       return ProcessingCost.basicCost(
           getDisplayLabel(), syntheticCardinality, 0, syntheticPerRowCost);
     }
-
-    // None of the conditions above apply.
-    return cardinalityBasedCost;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 2821dfc5e..0d2153656 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -180,6 +180,7 @@ import org.apache.impala.thrift.TGetTableHistoryResult;
 import org.apache.impala.thrift.TGetTableHistoryResultItem;
 import org.apache.impala.thrift.TGrantRevokePrivParams;
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
+import org.apache.impala.thrift.TImpalaQueryOptions;
 import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TLoadDataResp;
@@ -257,10 +258,8 @@ public class Frontend {
   private static final String VERDICT = "Verdict";
   private static final String MEMORY_MAX = "MemoryMax";
   private static final String MEMORY_ASK = "MemoryAsk";
-  private static final String MEMORY_ASK_UNBOUNDED = "MemoryAskUnbounded";
   private static final String CPU_MAX = "CpuMax";
   private static final String CPU_ASK = "CpuAsk";
-  private static final String CPU_ASK_UNBOUNDED = "CpuAskUnbounded";
 
   /**
    * Plan-time context that allows capturing various artifacts created
@@ -2074,8 +2073,6 @@ public class Frontend {
     int attempt = 0;
     int lastExecutorGroupTotalCores =
         expectedTotalCores(executorGroupSetsToUse.get(num_executor_group_sets 
- 1));
-    long memoryAskUnbounded = -1;
-    int cpuAskUnbounded = -1;
     int i = 0;
     while (i < num_executor_group_sets) {
       group_set = executorGroupSetsToUse.get(i);
@@ -2175,6 +2172,16 @@ public class Frontend {
       int scaled_cores_requirement = -1;
       if (isComputeCost) {
         Preconditions.checkState(cores_requirement > 0);
+        if (queryOptions.getProcessing_cost_min_threads()
+            > queryOptions.getMax_fragment_instances_per_node()) {
+          throw new AnalysisException(
+              TImpalaQueryOptions.PROCESSING_COST_MIN_THREADS.name() + " ("
+              + queryOptions.getProcessing_cost_min_threads()
+              + ") can not be larger than "
+              + TImpalaQueryOptions.MAX_FRAGMENT_INSTANCES_PER_NODE.name() + " 
("
+              + queryOptions.getMax_fragment_instances_per_node() + ").");
+        }
+
         scaled_cores_requirement = (int) Math.min(Integer.MAX_VALUE,
             Math.ceil(
                 cores_requirement / 
BackendConfig.INSTANCE.getQueryCpuCountDivisor()));
@@ -2184,19 +2191,6 @@ public class Frontend {
             groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, 
scaled_cores_requirement));
         addCounter(groupSetProfile,
             new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, 
cores_requirement));
-
-        if (memoryAskUnbounded > 0) {
-          addCounter(groupSetProfile,
-              new TCounter(MEMORY_ASK_UNBOUNDED, TUnit.BYTES,
-                  LongMath.saturatedMultiply(
-                      expectedNumExecutor(group_set), memoryAskUnbounded)));
-          memoryAskUnbounded = -1;
-        }
-        if (cpuAskUnbounded > 0) {
-          addCounter(groupSetProfile,
-              new TCounter(CPU_ASK_UNBOUNDED, TUnit.UNIT, cpuAskUnbounded));
-          cpuAskUnbounded = -1;
-        }
       }
 
       boolean matchFound = false;
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index af94e68f0..0ba856597 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -994,10 +994,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
           "Memory and cpu limit checking is skipped."),
          "EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"])
 
-    # Test setting REQUEST_POOL and disabling COMPUTE_PROCESSING_COST
-    self._set_query_options({
-      'COMPUTE_PROCESSING_COST': 'false',
-      'REQUEST_POOL': 'root.large'})
+    # Test setting REQUEST_POOL=root.large and disabling 
COMPUTE_PROCESSING_COST
+    self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Query Options (set by configuration): REQUEST_POOL=root.large",
          "Executor Group: root.large-group",
@@ -1024,26 +1022,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
           "Verdict: Match", "CpuAsk: 12"])
 
-    # ENABLE_REPLAN=false should force query to run in tiny group, but high 
scan
-    # parallelism will cause it to exceed the admission control slots.
+    # ENABLE_REPLAN=false should force query to run in first group (tiny).
     self._set_query_options({'ENABLE_REPLAN': 'false'})
-    result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
-    status = ("Rejected query from pool root.tiny: number of admission control 
slots "
-        r"needed \(10\) on backend '.*' is greater than total slots available 
8. "
-        "Reduce mt_dop to less than 8 to ensure that the query can execute.")
-    assert re.search(status, str(result))
-
-    # ENABLE_REPLAN=false and MAX_FRAGMENT_INSTANCES_PER_NODE=4 should allow 
query to run
-    # in tiny group.
-    self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
-    self._run_query_and_verify_profile(CPU_TEST_QUERY,
+    self._run_query_and_verify_profile(TEST_QUERY,
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query option 
ENABLE_REPLAN=false"])
-
-    # Unset both ENABLE_REPLAN and MAX_FRAGMENT_INSTANCES_PER_NODE
-    self._set_query_options({
-      'ENABLE_REPLAN': '',
-      'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})
+    # Unset ENABLE_REPLAN.
+    self._set_query_options({'ENABLE_REPLAN': ''})
 
     # Trivial query should be assigned to tiny group by Frontend.
     # Backend may decide to run it in coordinator only.
@@ -1065,33 +1050,31 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ["Executor Group:"])
 
     # Test combination of PROCESSING_COST_MIN_THREADS and 
MAX_FRAGMENT_INSTANCES_PER_NODE.
-    self._set_query_options({
-      'PROCESSING_COST_MIN_THREADS': '1',
-      'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'})
+    self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "EffectiveParallelism: 9",
          "ExecutorGroupsConsidered: 3"])
-    self._set_query_options({
-      'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
+    self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "EffectiveParallelism: 12",
          "ExecutorGroupsConsidered: 3"])
-    self._set_query_options({
-      'PROCESSING_COST_MIN_THREADS': '3',
-      'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
+    self._set_query_options({'PROCESSING_COST_MIN_THREADS': '2'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
-        ["Executor Group: root.large-group", "EffectiveParallelism: 9",
+        ["Executor Group: root.large-group", "EffectiveParallelism: 12",
          "ExecutorGroupsConsidered: 3"])
-    self._set_query_options({
-      'PROCESSING_COST_MIN_THREADS': '2',
-      'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'})
+    self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
-        ["Executor Group: root.small-group", "EffectiveParallelism: 2",
+        ["Executor Group: root.small-group", "EffectiveParallelism: 4",
          "ExecutorGroupsConsidered: 2"])
+    self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
+    result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
+    status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than "
+              r"MAX_FRAGMENT_INSTANCES_PER_NODE \(1\).")
+    assert re.search(status, str(result))
     # Unset PROCESSING_COST_MIN_THREADS and MAX_FRAGMENT_INSTANCES_PER_NODE.
     self._set_query_options({
-      'PROCESSING_COST_MIN_THREADS': '',
-      'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})
+      'MAX_FRAGMENT_INSTANCES_PER_NODE': '',
+      'PROCESSING_COST_MIN_THREADS': ''})
 
     # BEGIN testing count queries
     # Test optimized count star query with 1824 scan ranges assign to small 
group.
@@ -1222,7 +1205,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
 
     # Check resource pools on the Web queries site and admission site
     self._verify_query_num_for_resource_pool("root.small", 10)
-    self._verify_query_num_for_resource_pool("root.tiny", 6)
+    self._verify_query_num_for_resource_pool("root.tiny", 5)
     self._verify_query_num_for_resource_pool("root.large", 12)
     self._verify_total_admitted_queries("root.small", 11)
     self._verify_total_admitted_queries("root.tiny", 8)

Reply via email to