This is an automated email from the ASF dual-hosted git repository.

jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1d640905912944ea05deaa3453cb6a85013b2e54
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Sun Jun 8 10:12:14 2025 +0200

    IMPALA-14123: Allow forcing predicate push down to Iceberg
    
    Since IMPALA-11591, Impala tries to avoid pushing down predicates to
    Iceberg unless it is necessary (timetravel) or is likely to be useful
    (at least 1 partition column is involved in predicates). While this
    makes planning faster, it may miss opportunities to skip files during
    planning.
    
    This patch adds the table property impala.iceberg.push_down_hint, which
    expects a comma-separated list of column names and triggers push
    down to Iceberg when there is a predicate on any of these columns.
    Users can set this manually, while in the future Impala or other tools
    may be able to set it automatically, e.g. during COMPUTE STATS if
    there are many files with non-overlapping min/max stats for a given
    column.
    
    Note that in most cases when Iceberg can skip files, the Parquet/ORC
    scanner would also skip most of the data based on stat filtering. The
    benefit of doing it during planning is reading less footers and a
    "smaller" query plan.
    
    Change-Id: I8eb4ab5204c20b3991fdf305d7317f4023904a0f
    Reviewed-on: http://gerrit.cloudera.org:8080/22995
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/planner/IcebergScanPlanner.java  | 75 +++++++++++++++++-----
 .../iceberg-predicate-push-down-hint.test          | 36 +++++++++++
 tests/query_test/test_iceberg.py                   |  4 ++
 3 files changed, 99 insertions(+), 16 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index b7ef9b6d6..ef739f4d7 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -164,6 +164,9 @@ public class IcebergScanPlanner {
 
   private final InMemoryMetricsReporter metricsReporter_ = new 
InMemoryMetricsReporter();
 
+  // Predicates on columns in this set are always pushed down to Iceberg.
+  private final Set<String> columnsWithPushDownHint_ = new HashSet<>();
+
   public IcebergScanPlanner(Analyzer analyzer, PlannerContext ctx,
       TableRef iceTblRef, List<Expr> conjuncts, MultiAggregateInfo aggInfo)
       throws ImpalaException {
@@ -174,6 +177,8 @@ public class IcebergScanPlanner {
     tblRef_ = iceTblRef;
     conjuncts_ = conjuncts;
     aggInfo_ = aggInfo;
+
+    initPushDownHint();
     extractIcebergConjuncts();
     snapshotId_ = IcebergUtil.getSnapshotId(getIceTable(), 
tblRef_.getTimeTravelSpec());
   }
@@ -845,7 +850,7 @@ public class IcebergScanPlanner {
    * please refer: https://iceberg.apache.org/spec/#scan-planning
    */
   private void extractIcebergConjuncts() throws ImpalaException {
-    boolean isPartitionColumnIncluded = false;
+    boolean pushDownToIceberg = false;
     Map<SlotId, SlotDescriptor> idToSlotDesc = new HashMap<>();
     // Track identity conjuncts by their index in conjuncts_.
     // The array values are initialized to false.
@@ -855,14 +860,17 @@ public class IcebergScanPlanner {
     }
     for (int i = 0; i < conjuncts_.size(); i++) {
       Expr expr = conjuncts_.get(i);
-      if (isPartitionColumnIncluded(expr, idToSlotDesc)) {
-        isPartitionColumnIncluded = true;
-        if (isIdentityPartitionIncluded(expr, idToSlotDesc)) {
+      List<IcebergColumn> cols = getColumnsInExpr(expr, idToSlotDesc);
+      if (isPartitionColumnIncluded(cols, idToSlotDesc)) {
+        pushDownToIceberg = true;
+        if (isIdentityPartitionIncluded(cols, idToSlotDesc)) {
           identityConjunctIndex[i] = true;
         }
+      } else if (hasPushDownHint(cols, idToSlotDesc)) {
+        pushDownToIceberg = true;
       }
     }
-    if (!isPartitionColumnIncluded) {
+    if (!pushDownToIceberg) {
       // No partition conjuncts, i.e. every conjunct is non-identity conjunct.
       nonIdentityConjuncts_ = conjuncts_;
       return;
@@ -879,27 +887,43 @@ public class IcebergScanPlanner {
     }
   }
 
-  private boolean isPartitionColumnIncluded(Expr expr,
+  private boolean isPartitionColumnIncluded(List<IcebergColumn> cols,
       Map<SlotId, SlotDescriptor> idToSlotDesc) {
-    return hasPartitionTransformType(expr, idToSlotDesc,
+    return hasPartitionTransformType(cols, idToSlotDesc,
         transformType -> transformType != TIcebergPartitionTransformType.VOID);
   }
 
-  private boolean isIdentityPartitionIncluded(Expr expr,
+  private void initPushDownHint() {
+    String HINT_KEY = "impala.iceberg.push_down_hint";
+    String hint =
+        
tblRef_.getDesc().getTable().getMetaStoreTable().getParameters().get(HINT_KEY);
+    if (hint != null) {
+      for (String col: hint.split(",")) {
+        columnsWithPushDownHint_.add(col.toLowerCase());
+      }
+    }
+  }
+
+  private boolean hasPushDownHint(List<IcebergColumn> cols,
       Map<SlotId, SlotDescriptor> idToSlotDesc) {
-    return hasPartitionTransformType(expr, idToSlotDesc,
-        transformType -> transformType == 
TIcebergPartitionTransformType.IDENTITY);
+    for (IcebergColumn col: cols) {
+      // TODO: what to do if some cols in the pred are in the hint, some are 
not?
+      // TODO: numerical values could be considered as field id instead of col 
name
+      // column names are already in lower case
+      if (columnsWithPushDownHint_.contains(col.getName())) return true;
+    }
+    return false;
   }
 
-  private boolean hasPartitionTransformType(Expr expr,
-      Map<SlotId, SlotDescriptor> idToSlotDesc,
-      Predicate<TIcebergPartitionTransformType> pred) {
-    List<TupleId> tupleIds = Lists.newArrayList();
+  private List<IcebergColumn> getColumnsInExpr(Expr expr,
+      Map<SlotId, SlotDescriptor> idToSlotDesc) {
+    List<IcebergColumn> result = Lists.newArrayList();
     List<SlotId> slotIds = Lists.newArrayList();
+    List<TupleId> tupleIds = Lists.newArrayList();
     expr.getIds(tupleIds, slotIds);
 
-    if (tupleIds.size() != 1) return false;
-    if (!tupleIds.get(0).equals(tblRef_.getDesc().getId())) return false;
+    if (tupleIds.size() != 1) return result;
+    if (!tupleIds.get(0).equals(tblRef_.getDesc().getId())) return result;
 
     for (SlotId sId : slotIds) {
       SlotDescriptor slotDesc = idToSlotDesc.get(sId);
@@ -908,11 +932,30 @@ public class IcebergScanPlanner {
       if (col == null) continue;
       Preconditions.checkState(col instanceof IcebergColumn);
       IcebergColumn iceCol = (IcebergColumn)col;
+      result.add(iceCol);
+    }
+    return result;
+  }
+
+  private boolean isIdentityPartitionIncluded(List<IcebergColumn> cols,
+      Map<SlotId, SlotDescriptor> idToSlotDesc) {
+    return hasPartitionTransformType(cols, idToSlotDesc,
+        transformType -> transformType == 
TIcebergPartitionTransformType.IDENTITY);
+  }
+
+  private boolean hasPartitionTransformType(List<IcebergColumn> cols,
+      Map<SlotId, SlotDescriptor> idToSlotDesc,
+      Predicate<TIcebergPartitionTransformType> pred) {
+    for (IcebergColumn iceCol : cols) {
       TIcebergPartitionTransformType transformType =
           IcebergUtil.getPartitionTransformType(
               iceCol,
               getIceTable().getDefaultPartitionSpec());
       if (pred.test(transformType)) {
+        // TODO: the semantics of the function are not clear - shouldn't
+        // it only return true if ALL cols in the expr have the given
+        // partition type? Compound predicates with OR can have multiple
+        // columns.
         return true;
       }
     }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-predicate-push-down-hint.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-predicate-push-down-hint.test
new file mode 100644
index 000000000..b45d4a0a8
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-predicate-push-down-hint.test
@@ -0,0 +1,36 @@
+====
+---- QUERY
+# File not filtered during planning as there is no predicate on partition 
column.
+create table t (a int, b int, c int) stored as iceberg;
+insert into t values (1, 1, 1);
+select count(*) from t where a = 0;
+---- TYPES
+BIGINT
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# File filtered during planning if there is a predicate on a column in hint.
+alter table t set tblproperties ("impala.iceberg.push_down_hint"="a,b");
+select count(*) from t where a = 0;
+---- TYPES
+BIGINT
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+# File not filtered during planning if there are predicates only on columns 
not in hint.
+select count(*) from t where c = 0;
+---- TYPES
+BIGINT
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 141808c67..7ead8d178 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2156,6 +2156,10 @@ class TestIcebergV2Table(IcebergTestSuite):
       files_result = check_output(["hdfs", "dfs", "-ls", table_location])
       assert "Found 1 items" in files_result
 
+  def test_predicate_push_down_hint(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector,
+                       use_db=unique_database)
+
 
 # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. 
Note, that most
 # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but 
since it

Reply via email to