This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 85d77b908b12ae3d3f48ed5d49f38fb3832edc4e
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Thu Nov 20 17:52:32 2025 +0100

    IMPALA-13756: Fix Iceberg V2 count(*) optimization for complex queries
    
    We optimize plain count(*) queries on Iceberg tables the following way:
    
           AGGREGATE
           COUNT(*)
               |
           UNION ALL
          /        \
         /          \
        /            \
       SCAN all  ANTI JOIN
       datafiles  /      \
       without   /        \
       deletes  SCAN      SCAN
                datafiles deletes
    
                ||
              rewrite
                ||
                \/
    
      ArithmeticExpr: LHS + RHS
          /             \
         /               \
        /                 \
       record_count  AGGREGATE
       of all        COUNT(*)
       datafiles         |
       without       ANTI JOIN
       deletes      /         \
                   /           \
                   SCAN        SCAN
                   datafiles   deletes
    
    This optimization consists of two parts:
     1 Rewriting count(*) expression to count(*) + "record_count" (of data
       files without deletes)
     2 In IcebergScanPlanner we only need to construct the right side of
       the original UNION ALL operator, i.e.:
    
                ANTI JOIN
               /         \
              /           \
             SCAN        SCAN
             datafiles   deletes
    
    SelectStmt decides whether we can do the count(*) optimization, and if
    so, does the following:
    
     1: SelectStmt sets 'TotalRecordsNumV2' in the analyzer, then during the
        expression rewrite phase the CountStarToConstRule rewrites the
        count(*) to count(*) + record_count
     2: SelectStmt sets "OptimizeCountStarForIcebergV2" in the query context
        then IcebergScanPlanner creates plan accordingly.
    
    This mechanism works for simple queries, but in complex queries it can
    turn on count(*) optimization in IcebergScanPlanner for all Iceberg V2
    tables, even if only one subquery enables count(*) optimization during
    analysis.
    
    With this patch the following changes are made:
     1: We introduce IcebergV2CountStarAccumulator which we use instead of
        the ArithmeticExpr. This way, after the rewrite we still know
        whether count(*) optimization should be enabled for the planner.
     2: Instead of using the query context, we pass the information to the
        IcebergScanPlanner via the TableRef object.
    
    Testing
     * e2e tests
    
    Change-Id: I1940031298eb634aa82c3d32bbbf16bce8eaf874
    Reviewed-on: http://gerrit.cloudera.org:8080/23705
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
 common/thrift/Query.thrift                         |   1 +
 .../analysis/IcebergV2CountStarAccumulator.java    |  31 +++++
 .../org/apache/impala/analysis/SelectStmt.java     |   9 +-
 .../java/org/apache/impala/analysis/TableRef.java  |  13 ++
 .../apache/impala/planner/IcebergScanPlanner.java  |   5 +-
 .../impala/rewrite/CountStarToConstRule.java       |   9 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |   2 +-
 ...2-count-star-optimization-in-complex-query.test | 135 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |   4 +
 9 files changed, 197 insertions(+), 12 deletions(-)

diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 09be199dc..26750f5a1 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -989,6 +989,7 @@ struct TQueryCtx {
   // True if the query is transactional for Kudu table.
   29: required bool is_kudu_transactional = false
 
+  // DEPRECATED by IMPALA-13756.
   // True if the query can be optimized for Iceberg V2 table.
   30: required bool optimize_count_star_for_iceberg_v2 = false
 
diff --git 
a/fe/src/main/java/org/apache/impala/analysis/IcebergV2CountStarAccumulator.java
 
b/fe/src/main/java/org/apache/impala/analysis/IcebergV2CountStarAccumulator.java
new file mode 100644
index 000000000..c6f261885
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/analysis/IcebergV2CountStarAccumulator.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+/// For plain count(*) queries against Iceberg V2 tables we don't need to 
calculate
+/// the number of records in data files without corresponding delete files, as 
this
+/// can be retrieved from metadata. The backend will only calculate the number 
of records
+/// in data files that have corresponding delete files, then this expression 
simply
+/// adds the two numbers together.
+public class IcebergV2CountStarAccumulator extends ArithmeticExpr {
+  public IcebergV2CountStarAccumulator(Expr expr,
+      long numRowsInDataFilesWithoutDeletes) {
+    super(Operator.ADD, expr, NumericLiteral.create(
+        numRowsInDataFilesWithoutDeletes));
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index d152769c8..03c3f8e23 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -1552,16 +1552,21 @@ public class SelectStmt extends QueryStmt {
 
   private void optimizePlainCountStarQueryV2(TableRef tableRef, FeIcebergTable 
table)
       throws AnalysisException {
+    boolean alreadyOptimized = false;
     for (SelectListItem selectItem : getSelectList().getItems()) {
       Expr expr = selectItem.getExpr();
       if (expr == null) return;
       if (expr.isConstant()) continue;
+      if (expr instanceof IcebergV2CountStarAccumulator) {
+        alreadyOptimized = true;
+        continue;
+      }
       if (!FunctionCallExpr.isCountStarFunctionCallExpr(expr)) return;
     }
     long num = Utils.getRecordCountV2(table, tableRef.getTimeTravelSpec());
     if (num > 0) {
-      analyzer_.getQueryCtx().setOptimize_count_star_for_iceberg_v2(true);
-      analyzer_.setTotalRecordsNumV2(num);
+      tableRef.setOptimizeCountStarForIcebergV2(true);
+      if (!alreadyOptimized) analyzer_.setTotalRecordsNumV2(num);
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java 
b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index c372d93c2..0265d7a88 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -167,6 +167,9 @@ public class TableRef extends StmtNode {
   // Used only in PARTIAL optimization mode, otherwise it is null.
   private List<IcebergFileDescriptor> 
selectedDataFilesWithoutDeletesForOptimize_;
 
+  // True if IcebergScanPlanner can do count(*) optimization for this table 
ref.
+  private boolean optimizeCountStarForIcebergV2_ = false;
+
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
@@ -271,6 +274,7 @@ public class TableRef extends StmtNode {
     zippingUnnestType_ = other.zippingUnnestType_;
     selectedDataFilesWithoutDeletesForOptimize_ =
         other.selectedDataFilesWithoutDeletesForOptimize_;
+    optimizeCountStarForIcebergV2_ = other.optimizeCountStarForIcebergV2_;
   }
 
   @Override
@@ -393,6 +397,14 @@ public class TableRef extends StmtNode {
     return exposeNestedColumnsByTableMaskView_;
   }
 
+  public void setOptimizeCountStarForIcebergV2(boolean b) {
+    optimizeCountStarForIcebergV2_ = b;
+  }
+
+  public boolean optimizeCountStarForIcebergV2() {
+    return optimizeCountStarForIcebergV2_;
+  }
+
   public void setHidden(boolean isHidden) { isHidden_ = isHidden; }
   public boolean isHidden() { return isHidden_; }
 
@@ -812,6 +824,7 @@ public class TableRef extends StmtNode {
     desc_ = null;
     if (timeTravelSpec_ != null) timeTravelSpec_.reset();
     selectedDataFilesWithoutDeletesForOptimize_ = null;
+    optimizeCountStarForIcebergV2_ = false;
   }
 
   public boolean isTableMaskingView() { return false; }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index f68a5ba32..76b16f166 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -247,8 +247,7 @@ public class IcebergScanPlanner {
 
   private PlanNode createIcebergScanPlanImpl() throws ImpalaException {
     if (noDeleteFiles()) {
-      Preconditions.checkState(
-          !ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2());
+      Preconditions.checkState(!tblRef_.optimizeCountStarForIcebergV2());
       // If there are no delete files we can just create a single SCAN node.
       Preconditions.checkState(dataFilesWithDeletes_.isEmpty());
       PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, 
conjuncts_,
@@ -269,7 +268,7 @@ public class IcebergScanPlanner {
     // If the count star query can be optimized for Iceberg V2 table, the 
number of rows
     // of all DataFiles without corresponding DeleteFiles can be calculated by 
Iceberg
     // meta files, it's added using ArithmeticExpr.
-    if (ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2()) return 
joinNode;
+    if (tblRef_.optimizeCountStarForIcebergV2()) return joinNode;
 
     // All data files has corresponding delete files, so we just return an 
ANTI JOIN
     // between all data files and all delete files.
diff --git 
a/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java 
b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
index dc1fa0ef9..23d8850cf 100644
--- a/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
+++ b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
@@ -18,12 +18,10 @@
 package org.apache.impala.rewrite;
 
 import org.apache.impala.analysis.Analyzer;
-import org.apache.impala.analysis.ArithmeticExpr;
-import org.apache.impala.analysis.ArithmeticExpr.Operator;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.IcebergV2CountStarAccumulator;
 import org.apache.impala.analysis.LiteralExpr;
-import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 
@@ -55,7 +53,7 @@ import org.apache.impala.common.AnalysisException;
  *          ||
  *          \/
  *
- *    ArithmeticExpr(ADD)
+ * IcebergV2CountStarAccumulator(ADD)
  *    /             \
  *   /               \
  *  /                 \
@@ -82,8 +80,7 @@ public enum CountStarToConstRule implements ExprRewriteRule {
           analyzer.getTotalRecordsNumV1()), Type.BIGINT);
     } else if (analyzer.canRewriteCountStartForV2()) {
       expr.setRewritten(true);
-      return new ArithmeticExpr(Operator.ADD, expr, NumericLiteral.create(
-          analyzer.getTotalRecordsNumV2()));
+      return new IcebergV2CountStarAccumulator(expr, 
analyzer.getTotalRecordsNumV2());
     } else {
       return expr;
     }
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index a9b450856..91e3cb2cd 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4692,7 +4692,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     testNumberOfMembers(ValuesStmt.class, 0);
 
     // Also check TableRefs.
-    testNumberOfMembers(TableRef.class, 31);
+    testNumberOfMembers(TableRef.class, 32);
     testNumberOfMembers(BaseTableRef.class, 0);
     testNumberOfMembers(InlineViewRef.class, 10);
   }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-count-star-optimization-in-complex-query.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-count-star-optimization-in-complex-query.test
new file mode 100644
index 000000000..59c81efe7
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-count-star-optimization-in-complex-query.test
@@ -0,0 +1,135 @@
+====
+---- QUERY
+-- Reference table
+CREATE TABLE expected_merge_exclude_columns (id integer,msg string,color 
string)
+STORED AS PARQUET;
+
+INSERT INTO expected_merge_exclude_columns VALUES
+    (1, 'hello',   'blue'),
+    (2, 'goodbye', 'green'),
+    (3, 'anyway',  'purple');
+
+-- base table
+CREATE TABLE merge_exclude_columns STORED BY ICEBERG AS
+  SELECT CAST(1 as integer) as id, 'hello' as msg, 'blue' as color
+  UNION ALL
+  SELECT CAST(2 as integer) as id, 'goodbye' as msg, 'red' as color;
+
+-- incremental update table
+CREATE TABLE merge_exclude_columns_tmp STORED BY ICEBERG as
+  select CAST(1 as integer) as id, 'hey' as msg, 'blue' as color
+  union all
+  select CAST(2 as integer) as id, 'yo' as msg, 'green' as color
+  union all
+  select CAST(3 as integer) as id, 'anyway' as msg, 'purple' as color;
+
+-- Do the MERGE
+MERGE INTO merge_exclude_columns as dest
+  USING merge_exclude_columns_tmp as source
+  ON (source.id = dest.id)
+  WHEN MATCHED THEN UPDATE SET
+    id = source.id, color = source.color
+  WHEN NOT MATCHED THEN INSERT
+    (id, msg, color)
+  VALUES
+    (source.id, source.msg, source.color);
+====
+---- QUERY
+-- Table merge_exclude_columns and table expected_merge_exclude_columns should
+-- have the same data so this query should return 0, 0
+-- First 0:  With the help of the EXCEPT set operator we calculate the number 
of
+--           records that differ.
+-- Second 0: With plain count(*) queries verify that the tables have the same 
number
+--           of records.
+-- In the RUNTIME_PROFILE we expect:
+--  "output exprs: count(*) - count(*) + CAST(3 AS BIGINT), count(*)'"
+--  Let's break them down:
+--    "count(*) - count(*) + CAST(3 AS BIGINT)":
+--      "count(*)                     : count(*) of 
expected_merge_exclude_columns.
+--      "count(*) + CAST(3 AS BIGINT)": adjusted count(*) expr for table 
merge_exclude_columns,
+--                                      this table has 3 rows in a data file 
without deletes, it
+--                                      is the data file added by the MERGE 
statement.
+--    "count(*)": count(*) in view 'diff_count'.
+with diff_count as (
+    SELECT
+        1 as id,
+        COUNT(*) as num_missing FROM (
+            (SELECT color, id, msg FROM expected_merge_exclude_columns EXCEPT
+             SELECT color, id, msg FROM merge_exclude_columns)
+             UNION ALL
+            (SELECT color, id, msg FROM merge_exclude_columns EXCEPT
+             SELECT color, id, msg FROM expected_merge_exclude_columns)
+        ) as a
+), table_a as (
+    SELECT COUNT(*) as num_rows FROM expected_merge_exclude_columns
+), table_b as (
+    SELECT COUNT(*) as num_rows FROM merge_exclude_columns
+), row_count_diff as (
+    select
+        1 as id,
+        table_a.num_rows - table_b.num_rows as difference
+    from table_a, table_b
+)
+select
+    row_count_diff.difference as row_count_difference,
+    diff_count.num_missing as num_mismatched
+from row_count_diff
+join diff_count using (id);
+---- RESULTS
+0,0
+---- TYPES
+BIGINT,BIGINT
+---- RUNTIME_PROFILE:
+  |  output exprs: count(*) - count(*) + CAST(3 AS BIGINT), count(*)
+====
+---- QUERY
+-- Let's have modifications in the reference table as well.
+ALTER TABLE expected_merge_exclude_columns CONVERT TO ICEBERG;
+UPDATE expected_merge_exclude_columns SET color = 'GREEN' WHERE id = 2;
+UPDATE expected_merge_exclude_columns SET color = 'green' WHERE id = 2;
+
+-- Verify that the results are still correct when the different count(*) exprs
+-- need different adjustments.
+-- In the RUNTIME_PROFILE we expect:
+--  "output exprs: count(*) + CAST(1 AS BIGINT) - count(*) + CAST(3 AS 
BIGINT), count(*)'"
+--  Let's break them down:
+--    "count(*) + CAST(1 AS BIGINT) - count(*) + CAST(3 AS BIGINT)":
+--      "count(*) + CAST(1 AS BIGINT)": adjusted count(*) expr for 
expected_merge_exclude_columns,
+--                                      this table has 1 row in a data file 
without deletes, it
+--                                      is the data file added by the last 
UPDATE.
+--      "count(*) + CAST(3 AS BIGINT)": adjusted count(*) expr for table 
merge_exclude_columns,
+--                                      this table has 3 rows in a data file 
without deletes, it
+--                                      is the data file added by the MERGE 
statement.
+--    "count(*)": count(*) in view 'diff_count'.
+with diff_count as (
+    SELECT
+        1 as id,
+        COUNT(*) as num_missing FROM (
+            (SELECT color, id, msg FROM expected_merge_exclude_columns EXCEPT
+             SELECT color, id, msg FROM merge_exclude_columns)
+             UNION ALL
+            (SELECT color, id, msg FROM merge_exclude_columns EXCEPT
+             SELECT color, id, msg FROM expected_merge_exclude_columns)
+        ) as a
+), table_a as (
+    SELECT COUNT(*) as num_rows FROM expected_merge_exclude_columns
+), table_b as (
+    SELECT COUNT(*) as num_rows FROM merge_exclude_columns
+), row_count_diff as (
+    select
+        1 as id,
+        table_a.num_rows - table_b.num_rows as difference
+    from table_a, table_b
+)
+select
+    row_count_diff.difference as row_count_difference,
+    diff_count.num_missing as num_mismatched
+from row_count_diff
+join diff_count using (id);
+---- RESULTS
+0,0
+---- TYPES
+BIGINT,BIGINT
+---- RUNTIME_PROFILE:
+  |  output exprs: count(*) + CAST(1 AS BIGINT) - count(*) + CAST(3 AS 
BIGINT), count(*)
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 762fb4d50..be72fac13 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1597,6 +1597,10 @@ class TestIcebergV2Table(IcebergTestSuite):
       self.run_test_case('QueryTest/iceberg-v2-plain-count-star-optimization',
                          vector)
 
+  def test_count_star_optimization_in_complex_query(self, vector, 
unique_database):
+      
self.run_test_case('QueryTest/iceberg-v2-count-star-optimization-in-complex-query',
+                         vector, unique_database)
+
   @SkipIfDockerizedCluster.internal_hostname
   @SkipIf.hardcoded_uris
   def test_read_position_deletes(self, vector):

Reply via email to