This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1dbde2c197a26971aabed7d6bb387aac7c64d83f Author: Riza Suminto <[email protected]> AuthorDate: Mon Feb 5 12:10:27 2024 -0800 IMPALA-12790: Fix overestimation in ScanNode.getInputCardinality ScanNode.getInputCardinality() can overestimate if LIMIT is large. If table cardinality stats exist and it is less than the LIMIT, then the table cardinality should be returned. This patch adds those fixes and more test cases under small-query-opt.test. This also moves tests added by IMPALA-5602 to small-query-opt.test since only PlannerTest.testSmallQueryOptimization runs with EXEC_SINGLE_NODE_ROWS_THRESHOLD > 0. Testing: - Pass FE tests. Change-Id: Icc5b39a7684fb8748185349d0b80baf8dcd6b126 Reviewed-on: http://gerrit.cloudera.org:8080/21005 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../java/org/apache/impala/planner/ScanNode.java | 32 +++- .../impala/util/MaxRowsProcessedVisitor.java | 5 +- .../org/apache/impala/planner/PlannerTest.java | 9 +- .../queries/PlannerTest/data-source-tables.test | 15 -- .../queries/PlannerTest/kudu.test | 29 --- .../queries/PlannerTest/small-query-opt.test | 199 +++++++++++++++++++++ .../queries/PlannerTest/tpcds-processing-cost.test | 61 +++++++ tests/query_test/test_codegen.py | 3 +- 8 files changed, 293 insertions(+), 60 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index ea9b6e17c..0dbef697a 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -324,20 +324,36 @@ abstract public class ScanNode extends PlanNode { return result; } + /** + * Return true if this scan node has limit, no scan conjunct, + * and no storage layer conjunct. Otherwise, return false. + * This is mainly used to determine whether a scan's input cardinality can be bounded by + * the LIMIT clause or not. 
+ */ + public boolean hasSimpleLimit() { + return hasLimit() && !hasScanConjuncts() && !hasStorageLayerConjuncts(); + } + + private long capInputCardinalityWithLimit(long inputCardinality) { + if (hasSimpleLimit()) { + if (inputCardinality < 0) { + return getLimit(); + } else { + return Math.min(getLimit(), inputCardinality); + } + } + return inputCardinality; + } + @Override public long getInputCardinality() { - if (!hasScanConjuncts() && !hasStorageLayerConjuncts() && hasLimit()) { - return getLimit(); - } - return inputCardinality_; + return capInputCardinalityWithLimit(inputCardinality_); } // TODO: merge this with getInputCardinality(). public long getFilteredInputCardinality() { - if (!hasScanConjuncts() && !hasStorageLayerConjuncts() && hasLimit()) { - return getLimit(); - } - return filteredInputCardinality_ > -1 ? filteredInputCardinality_ : inputCardinality_; + return capInputCardinalityWithLimit( + filteredInputCardinality_ > -1 ? filteredInputCardinality_ : inputCardinality_); } @Override diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java index fb10a74f1..de4592a80 100644 --- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java +++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java @@ -50,12 +50,11 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> { boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats(); // In the absence of collection stats, treat scans on collections as if they // have no limit. 
- if (scan.isAccessingCollectionType() || - (missingStats && !(scan.hasLimit() && !scan.hasScanConjuncts() && - !scan.hasStorageLayerConjuncts()))) { + if (scan.isAccessingCollectionType() || (missingStats && !scan.hasSimpleLimit())) { valid_ = false; return; } + Preconditions.checkState(numRows > -1); maxRowsProcessed_ = Math.max(maxRowsProcessed_, numRows); maxRowsProcessedPerNode_ = Math.max(maxRowsProcessedPerNode_, (long)Math.ceil(numRows / (double)numNodes)); diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 8a3d077cc..b28d489eb 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -577,7 +577,11 @@ public class PlannerTest extends PlannerTestBase { public void testSmallQueryOptimization() { TQueryOptions options = new TQueryOptions(); options.setExec_single_node_rows_threshold(8); - runPlannerTestFile("small-query-opt", options); + addTestDb("kudu_planner_test", "Test DB for Kudu Planner."); + addTestTable("CREATE EXTERNAL TABLE kudu_planner_test.no_stats STORED AS KUDU " + + "TBLPROPERTIES ('kudu.table_name' = 'impala::functional_kudu.alltypes');"); + runPlannerTestFile("small-query-opt", options, + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } @Test @@ -679,9 +683,6 @@ public class PlannerTest extends PlannerTestBase { options.unsetEnabled_runtime_filter_types(); options.addToEnabled_runtime_filter_types(TRuntimeFilterType.BLOOM); options.addToEnabled_runtime_filter_types(TRuntimeFilterType.MIN_MAX); - addTestDb("kudu_planner_test", "Test DB for Kudu Planner."); - addTestTable("CREATE EXTERNAL TABLE kudu_planner_test.no_stats STORED AS KUDU " + - "TBLPROPERTIES ('kudu.table_name' = 'impala::functional_kudu.alltypes');"); runPlannerTestFile("kudu", options); } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test index 5fcbdf732..cbbebcd94 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test @@ -103,18 +103,3 @@ PLAN-ROOT SINK | 00:EMPTYSET ==== -# IMPALA-5602: If a query contains predicates that are all pushed to the datasource and -# there is a limit, then the query should not incorrectly run with 'small query' -# optimization. -select * from functional.alltypes_datasource where id = 1 limit 15 ----- DISTRIBUTEDPLAN -PLAN-ROOT SINK -| -01:EXCHANGE [UNPARTITIONED] -| limit: 15 -| -00:SCAN DATA SOURCE [functional.alltypes_datasource] -data source predicates: id = 1 - limit: 15 - row-size=116B cardinality=15 -==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test index ce20acd33..69fb70ce1 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test @@ -511,35 +511,6 @@ INSERT INTO KUDU [default.t] partitions=24/24 files=24 size=478.45KB row-size=89B cardinality=7.30K ==== -# IMPALA-5602: If a query contains predicates that are all pushed to kudu and there is a -# limit, then the query should not incorrectly run with 'small query' optimization. -select * from functional_kudu.alltypesagg where tinyint_col = 9 limit 10; ----- DISTRIBUTEDPLAN -PLAN-ROOT SINK -| -01:EXCHANGE [UNPARTITIONED] -| limit: 10 -| -00:SCAN KUDU [functional_kudu.alltypesagg_idx] - kudu predicates: functional_kudu.alltypesagg_idx.tinyint_col = 9 - limit: 10 - row-size=103B cardinality=10 -==== -# IMPALA-5602: If a query contains predicates that are all pushed to kudu, there is a -# limit, and no table stats, then the query should not incorrectly run with 'small query' -# optimization. 
-select * from kudu_planner_test.no_stats where tinyint_col = 9 limit 10; ---- DISTRIBUTEDPLAN -PLAN-ROOT SINK -| -01:EXCHANGE [UNPARTITIONED] -| limit: 10 -| -00:SCAN KUDU [kudu_planner_test.no_stats] - kudu predicates: tinyint_col = 9 - limit: 10 - row-size=88B cardinality=10 -==== # Insert into an unpartitioned table, shouldn't partition/sort insert into tpch_kudu.nation select * from tpch_parquet.nation diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test index 3817a82f4..9ce15f4ef 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test @@ -449,3 +449,202 @@ PLAN-ROOT SINK partition key scan row-size=4B cardinality=1.82K ==== +# testtbl is an empty table. It does not have stats computed and query has no limit. +# Query should run without 'small query' optimization. +select * from functional.testtbl; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| +00:SCAN HDFS [functional.testtbl] + HDFS partitions=1/1 files=0 size=0B + row-size=24B cardinality=0 +==== +# testtbl is an empty table, but has limit less than 8. +# 'small query' optimization is enabled. +select * from functional.testtbl limit 4; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional.testtbl] + HDFS partitions=1/1 files=0 size=0B + limit: 4 + row-size=24B cardinality=0 +==== +# testtbl does not have stats computed. +# But its input cardinality is estimated to be 0 and query has limit. +# 'small query' optimization is enabled. +select * from functional.testtbl limit 1000; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional.testtbl] + HDFS partitions=1/1 files=0 size=0B + limit: 1000 + row-size=24B cardinality=0 +==== +# functional.tinytable has no stats and is not empty. 
+# Query should run without 'small query' optimization. +select * from functional.tinytable; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| +00:SCAN HDFS [functional.tinytable] + HDFS partitions=1/1 files=1 size=38B + row-size=24B cardinality=2 +==== +# tinytable has no stats, but has limit less than 8. +# 'small query' optimization is enabled. +select * from functional.tinytable limit 4; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional.tinytable] + HDFS partitions=1/1 files=1 size=38B + limit: 4 + row-size=24B cardinality=2 +==== +# tinytable does not have stats computed. +# But its input cardinality is estimated to be 2 and query has limit. +# 'small query' optimization is enabled. +select * from functional.tinytable limit 1000; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional.tinytable] + HDFS partitions=1/1 files=1 size=38B + limit: 1000 + row-size=24B cardinality=2 +==== +# functional_kudu.tinytable has stats and its input cardinality is 3. +# Query should run with 'small query' optimization even without limit. +select * from functional_kudu.tinytable; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN KUDU [functional_kudu.tinytable] + row-size=43B cardinality=3 +==== +# If input cardinality < EXEC_SINGLE_NODE_ROWS_THRESHOLD < limit, +# then the query should run with 'small query' optimization. +select * from functional_kudu.tinytable limit 1000; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN KUDU [functional_kudu.tinytable] + limit: 1000 + row-size=43B cardinality=3 +==== +# kudu_planner_test.no_stats has no stats computed and query has no limit. +# Query should run without 'small query' optimization. +select * from kudu_planner_test.no_stats; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| +00:SCAN KUDU [kudu_planner_test.no_stats] + row-size=88B cardinality=unavailable +==== +# kudu_planner_test.no_stats has no stats computed and +# query has limit less than 8. 
'small query' optimization is enabled. +select * from kudu_planner_test.no_stats limit 7; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN KUDU [kudu_planner_test.no_stats] + limit: 7 + row-size=88B cardinality=7 +==== +# kudu_planner_test.no_stats has no stats computed and query has limit more than 8. +# 'small query' optimization is disabled. +select * from kudu_planner_test.no_stats limit 1000; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 1000 +| +00:SCAN KUDU [kudu_planner_test.no_stats] + limit: 1000 + row-size=88B cardinality=1.00K +==== +# IMPALA-5602: If a query contains predicates that are all pushed to kudu and there is a +# limit, then the query should not incorrectly run with 'small query' optimization. +select * from functional_kudu.alltypesagg where tinyint_col = 9 limit 7; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 7 +| +00:SCAN KUDU [functional_kudu.alltypesagg_idx] + kudu predicates: functional_kudu.alltypesagg_idx.tinyint_col = 9 + limit: 7 + row-size=103B cardinality=7 +==== +# IMPALA-5602: If a query contains predicates that are all pushed to kudu, there is a +# limit, and no table stats, then the query should not incorrectly run with 'small query' +# optimization. +select * from kudu_planner_test.no_stats where tinyint_col = 9 limit 7; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 7 +| +00:SCAN KUDU [kudu_planner_test.no_stats] + kudu predicates: tinyint_col = 9 + limit: 7 + row-size=88B cardinality=0 +==== +# alltypes_datasource has estimated input cardinality of 5000. +# Query should run without 'small query' optimization. +select * from functional.alltypes_datasource; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| +00:SCAN DATA SOURCE [functional.alltypes_datasource] + row-size=116B cardinality=5.00K +==== +# limit is less than 8. 'small query' optimization is enabled. 
+select * from functional.alltypes_datasource limit 7; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:SCAN DATA SOURCE [functional.alltypes_datasource] + limit: 7 + row-size=116B cardinality=7 +==== +# limit is more than 8. 'small query' optimization is disabled. +select * from functional.alltypes_datasource limit 1000; +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 1000 +| +00:SCAN DATA SOURCE [functional.alltypes_datasource] + limit: 1000 + row-size=116B cardinality=1.00K +==== +# IMPALA-5602: If a query contains predicates that are all pushed to the datasource and +# there is a limit, then the query should not incorrectly run with 'small query' +# optimization. +select * from functional.alltypes_datasource where id = 1 limit 7 +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| limit: 7 +| +00:SCAN DATA SOURCE [functional.alltypes_datasource] +data source predicates: id = 1 + limit: 7 + row-size=116B cardinality=7 +==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test index 4c2104f26..4a71ca2a7 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test @@ -139,3 +139,64 @@ max-parallelism=21 segment-costs=[194962048, 2276043] tuple-ids=0 row-size=24B cardinality=2.88M cost=91267504 in pipelines: 00(GETNEXT) ==== +select * from income_band; +---- PARALLELPLANS +Max Per-Host Resource Reservation: Memory=4.02MB Threads=2 +Per-Host Resource Estimates: Memory=20MB +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 +| max-parallelism=1 segment-costs=[61] +PLAN-ROOT SINK +| output exprs: tpcds_partitioned_parquet_snap.income_band.ib_income_band_sk, 
tpcds_partitioned_parquet_snap.income_band.ib_lower_bound, tpcds_partitioned_parquet_snap.income_band.ib_upper_bound +| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=60 +| +01:EXCHANGE [UNPARTITIONED] +| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 +| tuple-ids=0 row-size=12B cardinality=20 cost=1 +| in pipelines: 00(GETNEXT) +| +F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 +Per-Instance Resources: mem-estimate=16.06MB mem-reservation=24.00KB thread-reservation=1 +max-parallelism=1 segment-costs=[50002] +00:SCAN HDFS [tpcds_partitioned_parquet_snap.income_band, RANDOM] + HDFS partitions=1/1 files=1 size=1.21KB + stored statistics: + table: rows=20 size=1.21KB + columns: all + extrapolated-rows=disabled max-scan-range-rows=20 + mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0 + tuple-ids=0 row-size=12B cardinality=20 cost=50001 + in pipelines: 00(GETNEXT) +==== +# Scan cost should be exactly the same as select star without limit. 
+select * from income_band limit 1000000000; +---- PARALLELPLANS +Max Per-Host Resource Reservation: Memory=4.02MB Threads=2 +Per-Host Resource Estimates: Memory=20MB +F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 +| max-parallelism=1 segment-costs=[61] +PLAN-ROOT SINK +| output exprs: tpcds_partitioned_parquet_snap.income_band.ib_income_band_sk, tpcds_partitioned_parquet_snap.income_band.ib_lower_bound, tpcds_partitioned_parquet_snap.income_band.ib_upper_bound +| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=60 +| +01:EXCHANGE [UNPARTITIONED] +| limit: 1000000000 +| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 +| tuple-ids=0 row-size=12B cardinality=20 cost=1 +| in pipelines: 00(GETNEXT) +| +F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 +Per-Instance Resources: mem-estimate=16.06MB mem-reservation=24.00KB thread-reservation=1 +max-parallelism=1 segment-costs=[50002] +00:SCAN HDFS [tpcds_partitioned_parquet_snap.income_band, RANDOM] + HDFS partitions=1/1 files=1 size=1.21KB + stored statistics: + table: rows=20 size=1.21KB + columns: all + extrapolated-rows=disabled max-scan-range-rows=20 + limit: 1000000000 + mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0 + tuple-ids=0 row-size=12B cardinality=20 cost=50001 + in pipelines: 00(GETNEXT) +==== diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py index 81a56b3b0..3ef4ad9c9 100644 --- a/tests/query_test/test_codegen.py +++ b/tests/query_test/test_codegen.py @@ -47,7 +47,8 @@ class TestCodegen(ImpalaTestSuite): def test_select_node_codegen(self, vector): """Test that select node is codegened""" result = self.execute_query('select * from (select * from functional.alltypes ' - 'limit 1000000) t1 where int_col > 10 limit 10') + 'limit 1000000) t1 where int_col > 10 limit 10', + {'disable_codegen_rows_threshold': 7000}) 
exec_options = get_node_exec_options(result.runtime_profile, 1) # Make sure test fails if there are no exec options in the profile for the node assert len(exec_options) > 0
