This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new bf7c2088d IMPALA-11986: (part 1) Optimize partition key scans for Iceberg tables
bf7c2088d is described below

commit bf7c2088dd5495a763ff9a381970f99e6101cd4b
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Feb 17 18:24:59 2026 +0100

    IMPALA-11986: (part 1) Optimize partition key scans for Iceberg tables
    
    This patch optimizes queries that only scan IDENTITY-partitioned
    columns. The optimization applies only if:
    * All materialized aggregate expressions have distinct semantics
      (e.g. MIN, MAX, NDV). In other words, this optimization will work
      for COUNT(DISTINCT c) but not COUNT(c).
    * All materialized columns are IDENTITY-partitioned in all partition
      specs (this can be relaxed later)
    
    If the above conditions are met, then each data file (without deletes)
    produces only a single record. The rest of the table (data files with
    deletes and delete files) is scanned normally.
    
    Testing:
    * added e2e tests
    
    Change-Id: I32f78ee60ac4a410e91cf0e858199dd39d2e9afe
    Reviewed-on: http://gerrit.cloudera.org:8080/23985
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/planner/HdfsScanNode.java    |   2 +-
 .../org/apache/impala/planner/IcebergScanNode.java |  11 +-
 .../apache/impala/planner/IcebergScanPlanner.java  |  62 +++++++--
 .../QueryTest/iceberg-partition-key-scans.test     | 140 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |   3 +
 5 files changed, 206 insertions(+), 12 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index f2e598c06..256c36d1f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -268,7 +268,7 @@ public class HdfsScanNode extends ScanNode {
 
   // True if this is a scan that only returns partition keys and is only required to
   // return at least one of each of the distinct values of the partition keys.
-  private final boolean isPartitionKeyScan_;
+  protected final boolean isPartitionKeyScan_;
 
   // Conjuncts that can be evaluated while materializing the items (tuples) of
   // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index ed8e5808c..d6a67a0bd 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -89,19 +89,20 @@ public class IcebergScanNode extends HdfsScanNode {
   public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
       MultiAggregateInfo aggInfo, List<IcebergFileDescriptor> fileDescs,
       int numPartitions,
-      List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, long 
snapshotId) {
+      List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, long 
snapshotId,
+      boolean isPartitionKeyScan) {
     this(id, tblRef, conjuncts, aggInfo, fileDescs, numPartitions, 
nonIdentityConjuncts,
-        skippedConjuncts, null, snapshotId);
+        skippedConjuncts, null, snapshotId, isPartitionKeyScan);
   }
 
   public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
       MultiAggregateInfo aggInfo, List<IcebergFileDescriptor> fileDescs,
       int numPartitions,
       List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, PlanNodeId 
deleteId,
-      long snapshotId) {
+      long snapshotId, boolean isPartitionKeyScan) {
     super(id, tblRef.getDesc(), conjuncts,
         
getIcebergPartition(((FeIcebergTable)tblRef.getTable()).getFeFsTable()), tblRef,
-        aggInfo, null, false);
+        aggInfo, null, isPartitionKeyScan);
     // Hdfs table transformed from iceberg table only has one partition
     Preconditions.checkState(partitions_.size() == 1);
 
@@ -143,6 +144,8 @@ public class IcebergScanNode extends HdfsScanNode {
               cardinality_, 
iceFd.getFbFileMetadata().icebergMetadata().recordCount());
         }
       }
+    } else if (isPartitionKeyScan_) {
+      cardinality_ = fileDescs_.size();
     } else {
       for (IcebergFileDescriptor fd : fileDescs_) {
         cardinality_ = MathUtil.addCardinalities(
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 2d0bbd87f..c470029ef 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -52,6 +52,7 @@ import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.BinaryPredicate.Operator;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.IcebergExpressionCollector;
+import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.MultiAggregateInfo;
 import org.apache.impala.analysis.SlotDescriptor;
@@ -247,6 +248,7 @@ public class IcebergScanPlanner {
   }
 
   private PlanNode createIcebergScanPlanImpl() throws ImpalaException {
+    boolean isPartitionKeyScan = IsPartitionKeyScan();
     if (noDeleteFiles()) {
       Preconditions.checkState(!tblRef_.optimizeCountStarForIcebergV2());
       // If there are no delete files we can just create a single SCAN node.
@@ -255,7 +257,7 @@ public class IcebergScanPlanner {
           aggInfo_, dataFilesWithoutDeletes_,
           getIceTable().getContentFileStore().getNumPartitions(),
           nonIdentityConjuncts_,
-          getSkippedConjuncts(), snapshotId_);
+          getSkippedConjuncts(), snapshotId_, isPartitionKeyScan);
       ret.init(analyzer_);
       return ret;
     }
@@ -280,7 +282,7 @@ public class IcebergScanPlanner {
     IcebergScanNode dataScanNode = new IcebergScanNode(
         ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, 
dataFilesWithoutDeletes_,
         getIceTable().getContentFileStore().getNumPartitions(),
-        nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_);
+        nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_, 
isPartitionKeyScan);
     dataScanNode.init(analyzer_);
     List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
         SlotRef::new).collect(Collectors.toList());
@@ -299,6 +301,48 @@ public class IcebergScanPlanner {
     return unionNode;
   }
 
+  private boolean IsPartitionKeyScan() {
+    if (tblRef_.optimizeCountStarForIcebergV2()) return false;
+    boolean allAggsDistinct = aggInfo_ != null && aggInfo_.hasAllDistinctAgg();
+    if (!allAggsDistinct) return false;
+    TupleDescriptor tDesc = tblRef_.getDesc();
+    if (!tDesc.hasMaterializedSlots()) return true;
+
+    FeIcebergTable iceTable = getIceTable();
+    for (SlotDescriptor slotDesc: tDesc.getSlots()) {
+      if (!slotDesc.isMaterialized()) continue;
+      IcebergColumn column = (IcebergColumn) slotDesc.getColumn();
+      if (column == null) continue;
+      // We check all partition specs here. We are a bit stricter than necessary,
+      // because old partition specs might no longer have any data.
+      // TODO: later we could group data files (without deletes) into categories:
+      // - files eligible for partition key scan
+      // - files non-eligible for partition key scans
+      // Then we could do the following plan:
+      //              UNION  ALL
+      //           /       |      \
+      //         /         |        \
+      //       /           |          \
+      //  PARTITION     SCAN         ICEBERG
+      //  KEY          WITHOUT       DELETE
+      //  SCAN         DELETES        NODE
+      //                              /  \
+      //                             /    \
+      //                           SCAN   SCAN
+      //                           data   delete
+      //                           files  files
+      // Later PARTITION KEY SCAN could be a UNION NODE that produces the partition keys,
+      // see SingleNodePlanner.createOptimizedPartitionUnionNode().
+      for (IcebergPartitionSpec spec : iceTable.getPartitionSpecs()) {
+        if (IcebergUtil.getPartitionTransformType(column, spec) !=
+            TIcebergPartitionTransformType.IDENTITY) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   private PlanNode createPositionJoinNode() throws ImpalaException {
     Preconditions.checkState(positionDeletesRecordCount_ != 0);
     Preconditions.checkState(dataFilesWithDeletesSumPaths_ != 0);
@@ -320,7 +364,8 @@ public class IcebergScanPlanner {
     IcebergScanNode dataScanNode = new IcebergScanNode(
         dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
         getIceTable().getContentFileStore().getNumPartitions(),
-        nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId, 
snapshotId_);
+        nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId, 
snapshotId_,
+        false /*isPartitionKeyScan*/);
     dataScanNode.init(analyzer_);
     IcebergScanNode deleteScanNode = new IcebergScanNode(
         deleteScanNodeId,
@@ -331,7 +376,8 @@ public class IcebergScanPlanner {
         getIceTable().getContentFileStore().getNumPartitions(),
         Collections.emptyList(), /*nonIdentityConjuncts*/
         Collections.emptyList(), /*skippedConjuncts*/
-        snapshotId_);
+        snapshotId_,
+        false /*isPartitionKeyScan*/);
     deleteScanNode.init(analyzer_);
 
     // Now let's create the JOIN node
@@ -533,7 +579,8 @@ public class IcebergScanPlanner {
       IcebergScanNode dataScanNode = new IcebergScanNode(
           dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
           getIceTable().getContentFileStore().getNumPartitions(),
-          nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_);
+          nonIdentityConjuncts_, getSkippedConjuncts(), snapshotId_,
+          false /*isPartitionKeyScan*/);
       addAllSlotsForEqualityDeletes(tblRef_);
       dataScanNode.init(analyzer_);
 
@@ -573,8 +620,9 @@ public class IcebergScanPlanner {
           Lists.newArrayList(equalityDeleteFiles),
           getIceTable().getContentFileStore().getNumPartitions(),
           Collections.emptyList(), /*nonIdentityConjuncts*/
-          Collections.emptyList(),
-          snapshotId_); /*skippedConjuncts*/
+          Collections.emptyList(), /*skippedConjuncts*/
+          snapshotId_,
+          false /*isPartitionKeyScan*/);
       deleteScanNode.init(analyzer_);
 
       Pair<List<BinaryPredicate>, List<Expr>> equalityJoinConjuncts =
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-key-scans.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-key-scans.test
new file mode 100644
index 000000000..6bbff42b0
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-key-scans.test
@@ -0,0 +1,140 @@
+====
+---- QUERY
+CREATE TABLE ice_store_sales PARTITIONED BY SPEC (ss_store_sk)
+STORED BY ICEBERG
+AS SELECT * FROM tpcds_parquet.store_sales;
+====
+---- QUERY
+select distinct ss_store_sk
+from ice_store_sales;
+---- RESULTS
+1
+2
+8
+4
+10
+7
+NULL
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+   partition key scan
+   tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+select count(distinct ss_store_sk)
+from ice_store_sales;
+---- RESULTS
+6
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+   partition key scan
+   tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+select min(ss_store_sk), max(ss_store_sk)
+from ice_store_sales;
+---- RESULTS
+1,10
+---- TYPES
+INT,INT
+---- RUNTIME_PROFILE
+   partition key scan
+   tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+# Partition key scan optimization cannot be applied with non-partition column in select list.
+select min(ss_store_sk), max(ss_store_sk), max(ss_sold_date_sk)
+from ice_store_sales;
+---- RESULTS
+1,10,2452642
+---- TYPES
+INT,INT,INT
+---- RUNTIME_PROFILE
+   tuple-ids=0 row-size=8B cardinality=2.88M
+====
+---- QUERY
+select distinct typeof(ss_store_sk)
+from ice_store_sales;
+---- RESULTS
+'INT'
+---- TYPES
+STRING
+---- RUNTIME_PROFILE
+   partition key scan
+   tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+select distinct ss_store_sk
+from ice_store_sales
+where ss_store_sk % 2 = 0;
+---- RESULTS
+2
+8
+4
+10
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+   partition key scan
+   tuple-ids=0 row-size=4B cardinality=1
+====
+---- QUERY
+select count(*) from (select distinct ss_store_sk from ice_store_sales limit 
3) v;
+---- RESULTS
+3
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+    partition key scan
+    tuple-ids=0 row-size=4B cardinality=7
+aggregation(SUM, NumPages): 7
+====
+---- QUERY
+DELETE FROM ice_store_sales WHERE ss_store_sk = 1
+AND ss_sold_date_sk % 199 = 0;
+====
+---- QUERY
+select distinct ss_store_sk
+from ice_store_sales;
+---- RESULTS
+1
+2
+8
+4
+10
+7
+NULL
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+   partition key scan
+   tuple-ids=0 row-size=24B cardinality=6
+   |     tuple-ids=0 row-size=24B cardinality=456.90K
+====
+---- QUERY
+ALTER TABLE ice_store_sales SET PARTITION SPEC (ss_sold_date_sk);
+====
+---- QUERY
+# Now partition key scan optimization cannot be applied on 'ss_store_sk'.
+select distinct ss_store_sk
+from ice_store_sales;
+---- RESULTS
+1
+2
+8
+4
+10
+7
+NULL
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+   tuple-ids=0 row-size=24B cardinality=2.42M
+   |     tuple-ids=0 row-size=24B cardinality=456.90K
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index ba56794a4..c3ab38f58 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2264,6 +2264,9 @@ class TestIcebergV2Table(IcebergTestSuite):
             tbl_name, second_snapshot.get_snapshot_id()))
     assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile
 
+  def test_partition_key_scans(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-partition-key-scans', vector, 
unique_database)
+
   def test_table_repair(self, unique_database):
     tbl_name = 'tbl_with_removed_files'
     db_tbl = unique_database + "." + tbl_name

Reply via email to