This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1388be8eb8011eaad45327e2da8671a1ff10844e
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Oct 20 11:52:35 2023 -0700

    IMPALA-12510: Floor PlanFragment.maxParallelism_ at 1
    
    IMPALA-12444 introduced a bug where PlanFragment.maxParallelism_ can be
    set to 0. This can happen in a scan fragment if the table is empty: the
    number of scan ranges will be 0, which then propagates to
    ScanNode.maxScannerThreads_ and PlanFragment.maxParallelism_.
    
    This patch fixes it by flooring ScanNode.maxScannerThreads_ and
    PlanFragment.maxParallelism_ at 1.
    
    Testing:
    - Add a test case that runs SELECT * over an empty table to
      PlannerTest.testProcessingCost.
    
    Change-Id: Ibfa50abfdb9cdb994c5c3d7904b377a25f5b8b97
    Reviewed-on: http://gerrit.cloudera.org:8080/20606
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Riza Suminto <[email protected]>
---
 .../org/apache/impala/planner/PlanFragment.java    | 15 ++++++-----
 .../java/org/apache/impala/planner/ScanNode.java   | 11 +++++---
 .../queries/PlannerTest/tpcds-processing-cost.test | 31 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 11 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index de708b0a8..b8e5838f4 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -1000,8 +1000,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
    *                         TExecutorGroupSet.num_cores_per_executor)}.
    * @param parentParallelism Number of instance of parent fragment.
    */
-  protected void traverseEffectiveParallelism(
-      int minThreadPerNode, int maxThreadPerNode, int parentParallelism) {
+  protected void traverseEffectiveParallelism(final int minThreadPerNode,
+      final int maxThreadPerNode, final int parentParallelism) {
     Preconditions.checkNotNull(
         rootSegment_, "ProcessingCost Fragment %s has not been computed!", 
getId());
     int nodeStepCount = getNumInstances() % getNumNodes() == 0 ? getNumNodes() 
: 1;
@@ -1058,8 +1058,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
    * @return True if it is possible to lower this fragment's parallelism 
through
    * ProcessingCost comparison. False if the parallelism should not be changed 
anymore.
    */
-  private boolean adjustToMaxParallelism(int minThreadPerNode, int 
maxThreadPerNode,
-      int parentParallelism, int nodeStepCount) {
+  private boolean adjustToMaxParallelism(final int minThreadPerNode,
+      final int maxThreadPerNode, final int parentParallelism, final int 
nodeStepCount) {
     int maxThreadAllowed = IntMath.saturatedMultiply(maxThreadPerNode, 
getNumNodes());
     boolean canTryLower = true;
     int maxScannerThreads = Integer.MAX_VALUE;
@@ -1081,6 +1081,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       canTryLower = false; // no need to compute effective parallelism anymore.
     } else {
       int costBasedMaxParallelism = Math.max(nodeStepCount, 
getCostBasedMaxParallelism());
+      Preconditions.checkState(costBasedMaxParallelism > 0);
 
       if (hasUnionNode()) {
         // We set parallelism of union fragment as a max between its input 
fragments and
@@ -1134,9 +1135,9 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         collectPlanNodes(Predicates.instanceOf(ScanNode.class), scanNodes);
         if (!scanNodes.isEmpty()) {
           Preconditions.checkState(scanNodes.size() == 1);
-          ScanNode scanNode = scanNodes.get(0);
-          maxScannerThreads = scanNode.maxScannerThreads_;
-          maxParallelism_ = Math.min(maxParallelism_, maxScannerThreads);
+          maxScannerThreads = scanNodes.get(0).maxScannerThreads_;
+          maxParallelism_ = Math.max(ScanNode.MIN_NUM_SCAN_THREADS,
+              Math.min(maxParallelism_, maxScannerThreads));
 
           // Prevent caller from lowering parallelism if fragment has ScanNode
           // because there is no child fragment to compare with.
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 66ef705ce..36a694296 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -57,6 +57,7 @@ abstract public class ScanNode extends PlanNode {
   // scan ranges than would have been estimated assuming a uniform 
distribution.
   // Used for HDFS and Kudu Scan node estimations.
   protected static final double SCAN_RANGE_SKEW_FACTOR = 1.2;
+  protected static final int MIN_NUM_SCAN_THREADS = 1;
 
   protected final TupleDescriptor desc_;
 
@@ -85,8 +86,9 @@ abstract public class ScanNode extends PlanNode {
   protected long tableNumRowsHint_ = -1;
 
   // Maximum number of scanner threads after considering number of scan ranges
-  // and related query options.Calculated at computeScanProcessingCost.
-  protected int maxScannerThreads_ = -1;
+  // and related query options. Calculated at computeScanProcessingCost.
+  // Default to MIN_NUM_SCAN_THREADS.
+  protected int maxScannerThreads_ = MIN_NUM_SCAN_THREADS;
 
   public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) {
     super(id, desc.getId().asList(), displayName);
@@ -368,8 +370,9 @@ abstract public class ScanNode extends PlanNode {
     // regardless of the core count limit.
     int maxThreadsPerNode = 
Math.max(queryOptions.getProcessing_cost_min_threads(),
         queryOptions.getMax_fragment_instances_per_node());
-    maxScannerThreads_ = (int) Math.min(getEffectiveNumScanRanges(),
-        IntMath.saturatedMultiply(getNumNodes(), maxThreadsPerNode));
+    int maxThreadsGlobal = IntMath.saturatedMultiply(getNumNodes(), 
maxThreadsPerNode);
+    maxScannerThreads_ = Math.max(MIN_NUM_SCAN_THREADS,
+        (int) Math.min(getEffectiveNumScanRanges(), maxThreadsGlobal));
     long inputCardinality = getInputCardinality();
 
     if (inputCardinality >= 0) {
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
index e4ed15684..b31db836e 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
@@ -1,3 +1,34 @@
+# Regression test for IMPALA-12510: select star on empty table
+select * from functional.emptytable;
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
+Per-Host Resource Estimates: Memory=10MB
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[0]
+PLAN-ROOT SINK
+|  output exprs: functional.emptytable.field, functional.emptytable.f2
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=0
+|
+01:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=20.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=16B cardinality=0 cost=0
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Instance Resources: mem-estimate=80.00KB mem-reservation=0B 
thread-reservation=1
+max-parallelism=1 segment-costs=[0]
+00:SCAN HDFS [functional.emptytable, RANDOM]
+   partitions=0/0 files=0 size=0B
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/0 rows=0
+     columns missing stats: field
+   extrapolated-rows=disabled max-scan-range-rows=0
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=16B cardinality=0 cost=0
+   in pipelines: 00(GETNEXT)
+====
 # TPCDS-Q3
 select
   dt.d_year,

Reply via email to