This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 85d77b908b12ae3d3f48ed5d49f38fb3832edc4e Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Thu Nov 20 17:52:32 2025 +0100 IMPALA-13756: Fix Iceberg V2 count(*) optimization for complex queries We optimize plain count(*) queries on Iceberg tables the following way: AGGREGATE COUNT(*) | UNION ALL / \ / \ / \ SCAN all ANTI JOIN datafiles / \ without / \ deletes SCAN SCAN datafiles deletes || rewrite || \/ ArithmeticExpr: LHS + RHS / \ / \ / \ record_count AGGREGATE of all COUNT(*) datafiles | without ANTI JOIN deletes / \ / \ SCAN SCAN datafiles deletes This optimization consists of two parts: 1 Rewriting the count(*) expression to count(*) + "record_count" (of data files without deletes) 2 In IcebergScanPlanner we only need to construct the right side of the original UNION ALL operator, i.e.: ANTI JOIN / \ / \ SCAN SCAN datafiles deletes SelectStmt decides whether we can do the count(*) optimization, and if so, does the following: 1: SelectStmt sets 'TotalRecordsNumV2' in the analyzer, then during the expression rewrite phase the CountStarToConstRule rewrites the count(*) to count(*) + record_count 2: SelectStmt sets "OptimizeCountStarForIcebergV2" in the query context, then IcebergScanPlanner creates the plan accordingly. This mechanism works for simple queries, but can turn on count(*) optimization in IcebergScanPlanner for all Iceberg V2 tables in complex queries, even if only one subquery enables count(*) optimization during analysis. With this patch the following things change: 1: We introduce IcebergV2CountStarAccumulator, which we use instead of the ArithmeticExpr. So after rewrite we still know if count(*) optimization should be enabled for the planner. 2: Instead of using the query context, we pass the information to the IcebergScanPlanner via the TableRef object. 
Testing * e2e tests Change-Id: I1940031298eb634aa82c3d32bbbf16bce8eaf874 Reviewed-on: http://gerrit.cloudera.org:8080/23705 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Zoltan Borok-Nagy <[email protected]> --- common/thrift/Query.thrift | 1 + .../analysis/IcebergV2CountStarAccumulator.java | 31 +++++ .../org/apache/impala/analysis/SelectStmt.java | 9 +- .../java/org/apache/impala/analysis/TableRef.java | 13 ++ .../apache/impala/planner/IcebergScanPlanner.java | 5 +- .../impala/rewrite/CountStarToConstRule.java | 9 +- .../apache/impala/analysis/AnalyzeStmtsTest.java | 2 +- ...2-count-star-optimization-in-complex-query.test | 135 +++++++++++++++++++++ tests/query_test/test_iceberg.py | 4 + 9 files changed, 197 insertions(+), 12 deletions(-) diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 09be199dc..26750f5a1 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -989,6 +989,7 @@ struct TQueryCtx { // True if the query is transactional for Kudu table. 29: required bool is_kudu_transactional = false + // DEPRECATED by IMPALA-13756. // True if the query can be optimized for Iceberg V2 table. 30: required bool optimize_count_star_for_iceberg_v2 = false diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergV2CountStarAccumulator.java b/fe/src/main/java/org/apache/impala/analysis/IcebergV2CountStarAccumulator.java new file mode 100644 index 000000000..c6f261885 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergV2CountStarAccumulator.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +/// For plain count(*) queries against Iceberg V2 tables we don't need to calculate +/// the number of records in data files without corresponding delete files, as this +/// can be retrieved from metadata. The backend will only calculate the number of records +/// in data files that have corresponding delete files, then this expression simply +/// adds the two numbers together. +public class IcebergV2CountStarAccumulator extends ArithmeticExpr { + public IcebergV2CountStarAccumulator(Expr expr, + long numRowsInDataFilesWithoutDeletes) { + super(Operator.ADD, expr, NumericLiteral.create( + numRowsInDataFilesWithoutDeletes)); + } +} \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java index d152769c8..03c3f8e23 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java @@ -1552,16 +1552,21 @@ public class SelectStmt extends QueryStmt { private void optimizePlainCountStarQueryV2(TableRef tableRef, FeIcebergTable table) throws AnalysisException { + boolean alreadyOptimized = false; for (SelectListItem selectItem : getSelectList().getItems()) { Expr expr = selectItem.getExpr(); if (expr == null) return; if (expr.isConstant()) continue; + if (expr instanceof IcebergV2CountStarAccumulator) { + alreadyOptimized = true; + continue; + } if (!FunctionCallExpr.isCountStarFunctionCallExpr(expr)) return; } long num = 
Utils.getRecordCountV2(table, tableRef.getTimeTravelSpec()); if (num > 0) { - analyzer_.getQueryCtx().setOptimize_count_star_for_iceberg_v2(true); - analyzer_.setTotalRecordsNumV2(num); + tableRef.setOptimizeCountStarForIcebergV2(true); + if (!alreadyOptimized) analyzer_.setTotalRecordsNumV2(num); } } diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java index c372d93c2..0265d7a88 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java @@ -167,6 +167,9 @@ public class TableRef extends StmtNode { // Used only in PARTIAL optimization mode, otherwise it is null. private List<IcebergFileDescriptor> selectedDataFilesWithoutDeletesForOptimize_; + // True if IcebergScanPlanner can do count(*) optimization for this table ref. + private boolean optimizeCountStarForIcebergV2_ = false; + // END: Members that need to be reset() ///////////////////////////////////////// @@ -271,6 +274,7 @@ public class TableRef extends StmtNode { zippingUnnestType_ = other.zippingUnnestType_; selectedDataFilesWithoutDeletesForOptimize_ = other.selectedDataFilesWithoutDeletesForOptimize_; + optimizeCountStarForIcebergV2_ = other.optimizeCountStarForIcebergV2_; } @Override @@ -393,6 +397,14 @@ public class TableRef extends StmtNode { return exposeNestedColumnsByTableMaskView_; } + public void setOptimizeCountStarForIcebergV2(boolean b) { + optimizeCountStarForIcebergV2_ = b; + } + + public boolean optimizeCountStarForIcebergV2() { + return optimizeCountStarForIcebergV2_; + } + public void setHidden(boolean isHidden) { isHidden_ = isHidden; } public boolean isHidden() { return isHidden_; } @@ -812,6 +824,7 @@ public class TableRef extends StmtNode { desc_ = null; if (timeTravelSpec_ != null) timeTravelSpec_.reset(); selectedDataFilesWithoutDeletesForOptimize_ = null; + optimizeCountStarForIcebergV2_ = false; } public boolean 
isTableMaskingView() { return false; } diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index f68a5ba32..76b16f166 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -247,8 +247,7 @@ public class IcebergScanPlanner { private PlanNode createIcebergScanPlanImpl() throws ImpalaException { if (noDeleteFiles()) { - Preconditions.checkState( - !ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2()); + Preconditions.checkState(!tblRef_.optimizeCountStarForIcebergV2()); // If there are no delete files we can just create a single SCAN node. Preconditions.checkState(dataFilesWithDeletes_.isEmpty()); PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_, @@ -269,7 +268,7 @@ public class IcebergScanPlanner { // If the count star query can be optimized for Iceberg V2 table, the number of rows // of all DataFiles without corresponding DeleteFiles can be calculated by Iceberg // meta files, it's added using ArithmeticExpr. - if (ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2()) return joinNode; + if (tblRef_.optimizeCountStarForIcebergV2()) return joinNode; // All data files has corresponding delete files, so we just return an ANTI JOIN // between all data files and all delete files. 
diff --git a/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java index dc1fa0ef9..23d8850cf 100644 --- a/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java +++ b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java @@ -18,12 +18,10 @@ package org.apache.impala.rewrite; import org.apache.impala.analysis.Analyzer; -import org.apache.impala.analysis.ArithmeticExpr; -import org.apache.impala.analysis.ArithmeticExpr.Operator; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.analysis.IcebergV2CountStarAccumulator; import org.apache.impala.analysis.LiteralExpr; -import org.apache.impala.analysis.NumericLiteral; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; @@ -55,7 +53,7 @@ import org.apache.impala.common.AnalysisException; * || * \/ * - * ArithmeticExpr(ADD) + * IcebergV2CountStarAccumulator(ADD) * / \ * / \ * / \ @@ -82,8 +80,7 @@ public enum CountStarToConstRule implements ExprRewriteRule { analyzer.getTotalRecordsNumV1()), Type.BIGINT); } else if (analyzer.canRewriteCountStartForV2()) { expr.setRewritten(true); - return new ArithmeticExpr(Operator.ADD, expr, NumericLiteral.create( - analyzer.getTotalRecordsNumV2())); + return new IcebergV2CountStarAccumulator(expr, analyzer.getTotalRecordsNumV2()); } else { return expr; } diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index a9b450856..91e3cb2cd 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -4692,7 +4692,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest { testNumberOfMembers(ValuesStmt.class, 0); // Also check TableRefs. 
- testNumberOfMembers(TableRef.class, 31); + testNumberOfMembers(TableRef.class, 32); testNumberOfMembers(BaseTableRef.class, 0); testNumberOfMembers(InlineViewRef.class, 10); } diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-count-star-optimization-in-complex-query.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-count-star-optimization-in-complex-query.test new file mode 100644 index 000000000..59c81efe7 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-count-star-optimization-in-complex-query.test @@ -0,0 +1,135 @@ +==== +---- QUERY +-- Reference table +CREATE TABLE expected_merge_exclude_columns (id integer,msg string,color string) +STORED AS PARQUET; + +INSERT INTO expected_merge_exclude_columns VALUES + (1, 'hello', 'blue'), + (2, 'goodbye', 'green'), + (3, 'anyway', 'purple'); + +-- base table +CREATE TABLE merge_exclude_columns STORED BY ICEBERG AS + SELECT CAST(1 as integer) as id, 'hello' as msg, 'blue' as color + UNION ALL + SELECT CAST(2 as integer) as id, 'goodbye' as msg, 'red' as color; + +-- incremental update table +CREATE TABLE merge_exclude_columns_tmp STORED BY ICEBERG as + select CAST(1 as integer) as id, 'hey' as msg, 'blue' as color + union all + select CAST(2 as integer) as id, 'yo' as msg, 'green' as color + union all + select CAST(3 as integer) as id, 'anyway' as msg, 'purple' as color; + +-- Do the MERGE +MERGE INTO merge_exclude_columns as dest + USING merge_exclude_columns_tmp as source + ON (source.id = dest.id) + WHEN MATCHED THEN UPDATE SET + id = source.id, color = source.color + WHEN NOT MATCHED THEN INSERT + (id, msg, color) + VALUES + (source.id, source.msg, source.color); +==== +---- QUERY +-- Table merge_exclude_columns and table expected_merge_exclude_columns should +-- have the same data so this query should return 0, 0 +-- First 0: With the help of the EXCEPT set operator we calculate the number of +-- records that differ. 
+-- Second 0: With plain count(*) queries verify that the tables have the same number +-- of records. +-- In the RUNTIME_PROFILE we expect: +-- "output exprs: count(*) - count(*) + CAST(3 AS BIGINT), count(*)'" +-- Let's break them down: +-- "count(*) - count(*) + CAST(3 AS BIGINT)": +-- "count(*) : count(*) of expected_merge_exclude_columns. +-- "count(*) + CAST(3 AS BIGINT)": adjusted count(*) expr for table merge_exclude_columns, +-- this table has 3 rows in a data file without deletes, it +-- is the data file added by the MERGE statement. +-- "count(*)": count(*) in view 'diff_count'. +with diff_count as ( + SELECT + 1 as id, + COUNT(*) as num_missing FROM ( + (SELECT color, id, msg FROM expected_merge_exclude_columns EXCEPT + SELECT color, id, msg FROM merge_exclude_columns) + UNION ALL + (SELECT color, id, msg FROM merge_exclude_columns EXCEPT + SELECT color, id, msg FROM expected_merge_exclude_columns) + ) as a +), table_a as ( + SELECT COUNT(*) as num_rows FROM expected_merge_exclude_columns +), table_b as ( + SELECT COUNT(*) as num_rows FROM merge_exclude_columns +), row_count_diff as ( + select + 1 as id, + table_a.num_rows - table_b.num_rows as difference + from table_a, table_b +) +select + row_count_diff.difference as row_count_difference, + diff_count.num_missing as num_mismatched +from row_count_diff +join diff_count using (id); +---- RESULTS +0,0 +---- TYPES +BIGINT,BIGINT +---- RUNTIME_PROFILE: + | output exprs: count(*) - count(*) + CAST(3 AS BIGINT), count(*) +==== +---- QUERY +-- Let's have modifications in the reference table as well. +ALTER TABLE expected_merge_exclude_columns CONVERT TO ICEBERG; +UPDATE expected_merge_exclude_columns SET color = 'GREEN' WHERE id = 2; +UPDATE expected_merge_exclude_columns SET color = 'green' WHERE id = 2; + +-- Verify that the results are still correct when the different count(*) exprs +-- need different adjustments. 
+-- In the RUNTIME_PROFILE we expect: +-- "output exprs: count(*) + CAST(1 AS BIGINT) - count(*) + CAST(3 AS BIGINT), count(*)'" +-- Let's break them down: +-- "count(*) + CAST(1 AS BIGINT) - count(*) + CAST(3 AS BIGINT)": +-- "count(*) + CAST(1 AS BIGINT)": adjusted count(*) expr for expected_merge_exclude_columns, +-- this table has 1 row in a data file without deletes, it +-- is the data file added by the last UPDATE. +-- "count(*) + CAST(3 AS BIGINT)": adjusted count(*) expr for table merge_exclude_columns, +-- this table has 3 rows in a data file without deletes, it +-- is the data file added by the MERGE statement. +-- "count(*)": count(*) in view 'diff_count'. +with diff_count as ( + SELECT + 1 as id, + COUNT(*) as num_missing FROM ( + (SELECT color, id, msg FROM expected_merge_exclude_columns EXCEPT + SELECT color, id, msg FROM merge_exclude_columns) + UNION ALL + (SELECT color, id, msg FROM merge_exclude_columns EXCEPT + SELECT color, id, msg FROM expected_merge_exclude_columns) + ) as a +), table_a as ( + SELECT COUNT(*) as num_rows FROM expected_merge_exclude_columns +), table_b as ( + SELECT COUNT(*) as num_rows FROM merge_exclude_columns +), row_count_diff as ( + select + 1 as id, + table_a.num_rows - table_b.num_rows as difference + from table_a, table_b +) +select + row_count_diff.difference as row_count_difference, + diff_count.num_missing as num_mismatched +from row_count_diff +join diff_count using (id); +---- RESULTS +0,0 +---- TYPES +BIGINT,BIGINT +---- RUNTIME_PROFILE: + | output exprs: count(*) + CAST(1 AS BIGINT) - count(*) + CAST(3 AS BIGINT), count(*) +==== diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 762fb4d50..be72fac13 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1597,6 +1597,10 @@ class TestIcebergV2Table(IcebergTestSuite): self.run_test_case('QueryTest/iceberg-v2-plain-count-star-optimization', vector) + def 
test_count_star_optimization_in_complex_query(self, vector, unique_database): + self.run_test_case('QueryTest/iceberg-v2-count-star-optimization-in-complex-query', + vector, unique_database) + @SkipIfDockerizedCluster.internal_hostname @SkipIf.hardcoded_uris def test_read_position_deletes(self, vector):
