This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1388be8eb8011eaad45327e2da8671a1ff10844e Author: Riza Suminto <[email protected]> AuthorDate: Fri Oct 20 11:52:35 2023 -0700 IMPALA-12510: Floor PlanFragment.maxParallelism_ at 1 IMPALA-12444 introduced a bug where PlanFragment.maxParallelism_ can be set to 0. This can happen in a scan fragment if the table is empty. The number of scan ranges will be 0, which then propagates to ScanNode.maxScannerThreads_ and PlanFragment.maxParallelism_. This patch fixes it by flooring ScanNode.maxScannerThreads_ and PlanFragment.maxParallelism_ at 1. Testing: - Add select star over an empty table testcase to PlannerTest.testProcessingCost. Change-Id: Ibfa50abfdb9cdb994c5c3d7904b377a25f5b8b97 Reviewed-on: http://gerrit.cloudera.org:8080/20606 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Riza Suminto <[email protected]> --- .../org/apache/impala/planner/PlanFragment.java | 15 ++++++----- .../java/org/apache/impala/planner/ScanNode.java | 11 +++++--- .../queries/PlannerTest/tpcds-processing-cost.test | 31 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 11 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index de708b0a8..b8e5838f4 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -1000,8 +1000,8 @@ public class PlanFragment extends TreeNode<PlanFragment> { * TExecutorGroupSet.num_cores_per_executor)}. * @param parentParallelism Number of instance of parent fragment. 
*/ - protected void traverseEffectiveParallelism( - int minThreadPerNode, int maxThreadPerNode, int parentParallelism) { + protected void traverseEffectiveParallelism(final int minThreadPerNode, + final int maxThreadPerNode, final int parentParallelism) { Preconditions.checkNotNull( rootSegment_, "ProcessingCost Fragment %s has not been computed!", getId()); int nodeStepCount = getNumInstances() % getNumNodes() == 0 ? getNumNodes() : 1; @@ -1058,8 +1058,8 @@ public class PlanFragment extends TreeNode<PlanFragment> { * @return True if it is possible to lower this fragment's parallelism through * ProcessingCost comparison. False if the parallelism should not be changed anymore. */ - private boolean adjustToMaxParallelism(int minThreadPerNode, int maxThreadPerNode, - int parentParallelism, int nodeStepCount) { + private boolean adjustToMaxParallelism(final int minThreadPerNode, + final int maxThreadPerNode, final int parentParallelism, final int nodeStepCount) { int maxThreadAllowed = IntMath.saturatedMultiply(maxThreadPerNode, getNumNodes()); boolean canTryLower = true; int maxScannerThreads = Integer.MAX_VALUE; @@ -1081,6 +1081,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { canTryLower = false; // no need to compute effective parallelism anymore. 
} else { int costBasedMaxParallelism = Math.max(nodeStepCount, getCostBasedMaxParallelism()); + Preconditions.checkState(costBasedMaxParallelism > 0); if (hasUnionNode()) { // We set parallelism of union fragment as a max between its input fragments and @@ -1134,9 +1135,9 @@ public class PlanFragment extends TreeNode<PlanFragment> { collectPlanNodes(Predicates.instanceOf(ScanNode.class), scanNodes); if (!scanNodes.isEmpty()) { Preconditions.checkState(scanNodes.size() == 1); - ScanNode scanNode = scanNodes.get(0); - maxScannerThreads = scanNode.maxScannerThreads_; - maxParallelism_ = Math.min(maxParallelism_, maxScannerThreads); + maxScannerThreads = scanNodes.get(0).maxScannerThreads_; + maxParallelism_ = Math.max(ScanNode.MIN_NUM_SCAN_THREADS, + Math.min(maxParallelism_, maxScannerThreads)); // Prevent caller from lowering parallelism if fragment has ScanNode // because there is no child fragment to compare with. diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index 66ef705ce..36a694296 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -57,6 +57,7 @@ abstract public class ScanNode extends PlanNode { // scan ranges than would have been estimated assuming a uniform distribution. // Used for HDFS and Kudu Scan node estimations. protected static final double SCAN_RANGE_SKEW_FACTOR = 1.2; + protected static final int MIN_NUM_SCAN_THREADS = 1; protected final TupleDescriptor desc_; @@ -85,8 +86,9 @@ abstract public class ScanNode extends PlanNode { protected long tableNumRowsHint_ = -1; // Maximum number of scanner threads after considering number of scan ranges - // and related query options.Calculated at computeScanProcessingCost. - protected int maxScannerThreads_ = -1; + // and related query options. Calculated at computeScanProcessingCost. + // Default to MIN_NUM_SCAN_THREADS. 
+ protected int maxScannerThreads_ = MIN_NUM_SCAN_THREADS; public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) { super(id, desc.getId().asList(), displayName); @@ -368,8 +370,9 @@ abstract public class ScanNode extends PlanNode { // regardless of the core count limit. int maxThreadsPerNode = Math.max(queryOptions.getProcessing_cost_min_threads(), queryOptions.getMax_fragment_instances_per_node()); - maxScannerThreads_ = (int) Math.min(getEffectiveNumScanRanges(), - IntMath.saturatedMultiply(getNumNodes(), maxThreadsPerNode)); + int maxThreadsGlobal = IntMath.saturatedMultiply(getNumNodes(), maxThreadsPerNode); + maxScannerThreads_ = Math.max(MIN_NUM_SCAN_THREADS, + (int) Math.min(getEffectiveNumScanRanges(), maxThreadsGlobal)); long inputCardinality = getInputCardinality(); if (inputCardinality >= 0) { diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test index e4ed15684..b31db836e 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test @@ -1,3 +1,34 @@ +# Regression test for IMPALA-12510: select star on empty table +select * from functional.emptytable; +---- PARALLELPLANS +Max Per-Host Resource Reservation: Memory=4.00MB Threads=2 +Per-Host Resource Estimates: Memory=10MB +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 +| max-parallelism=1 segment-costs=[0] +PLAN-ROOT SINK +| output exprs: functional.emptytable.field, functional.emptytable.f2 +| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=0 +| +01:EXCHANGE [UNPARTITIONED] +| mem-estimate=20.00KB mem-reservation=0B thread-reservation=0 +| tuple-ids=0 row-size=16B cardinality=0 cost=0 +| in 
pipelines: 00(GETNEXT) +| +F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 +Per-Instance Resources: mem-estimate=80.00KB mem-reservation=0B thread-reservation=1 +max-parallelism=1 segment-costs=[0] +00:SCAN HDFS [functional.emptytable, RANDOM] + partitions=0/0 files=0 size=0B + stored statistics: + table: rows=unavailable size=unavailable + partitions: 0/0 rows=0 + columns missing stats: field + extrapolated-rows=disabled max-scan-range-rows=0 + mem-estimate=0B mem-reservation=0B thread-reservation=0 + tuple-ids=0 row-size=16B cardinality=0 cost=0 + in pipelines: 00(GETNEXT) +==== # TPCDS-Q3 select dt.d_year,
