This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7e0feb4a8e436ddbf618bd0e090dcc24c3563fc4
Author: Gabor Kaszab <[email protected]>
AuthorDate: Thu Feb 16 16:57:44 2023 +0100

    IMPALA-11701 Part1: Don't push down predicates to scanner if already 
applied by Iceberg
    
    We push down predicates to Iceberg, which uses them to filter out files
    when getting the results of planFiles(). Using the
    FileScanTask.residual() function we can find out if we have to use
    the predicates to further filter the rows of the given files or if
    Iceberg has already performed all the filtering.
    Basically if we only filter on IDENTITY-partition columns then Iceberg
    can filter the files and using these filters in Impala wouldn't filter
    any more rows from the output (assuming that no partition evolution was
    performed on the table).
    
    An additional benefit of not pushing down no-op predicates to the
    scanner is that we can potentially materialize less slots.
    For example:
    
    SELECT count(1) from iceberg_tbl where part_col = 10;
    
    Another additional benefit comes with count(*) queries. If all the
    predicates are skipped from being pushed to Impala's scanner for a
    count(*) query then the Parquet scanner can go to an optimized path
    where it uses stats instead of reading actual data to answer the query.
    
    In the above query Iceberg filters the files using the predicate on
    a partition column and then there won't be any need to materialize
    'part_col' in Impala, nor to push down the 'part_col = 10' predicate.
    
    Note, this is an all or nothing approach, meaning that assuming N
    number of predicates we either push down all predicates to the scanner
    or none of them. There is room for improvement to identify a subset
    of the predicates that we still have to push down to the scanner.
    However, for this we'd need a mapping between Impala predicates and the
    predicates returned by Iceberg's FileScanTask.residual() function that
    would significantly increase the complexity of the relevant code.
    
    Testing:
      - Some existing tests needed some extra care as they were checking
        for predicates being pushed down to the scanner, but with this
        patch not all of them are pushed down. For these tests I added some
        extra predicates to ensure that all of the predicates are pushed
        down to the scanner.
      - Added a new planner test suite for checking how predicate push down
        works with Iceberg tables.
    
    Change-Id: Icfa80ce469cecfcfbcd0dcb595a6b04b7027285b
    Reviewed-on: http://gerrit.cloudera.org:8080/19534
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/planner/HdfsScanNode.java    |  17 ++-
 .../org/apache/impala/planner/IcebergScanNode.java |  20 +++-
 .../apache/impala/planner/IcebergScanPlanner.java  |  51 +++++++--
 .../org/apache/impala/planner/PlannerTest.java     |   9 ++
 .../functional/functional_schema_template.sql      |  26 +++++
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/PlannerTest/iceberg-predicates.test    |  77 ++++++++++++++
 .../queries/PlannerTest/iceberg-v2-tables.test     | 114 ++++++++++++++-------
 .../queries/PlannerTest/tablesample.test           |  14 +--
 .../QueryTest/iceberg-in-predicate-push-down.test  |  43 ++++++--
 .../QueryTest/iceberg-partitioned-insert.test      |  27 ++++-
 .../iceberg-plain-count-star-optimization.test     |   7 +-
 12 files changed, 331 insertions(+), 75 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 626c7c41c..71eab659c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1203,7 +1203,9 @@ public class HdfsScanNode extends ScanNode {
       long partitionNumRows = partition.getNumRows();
 
       analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
-      fileFormats_.add(partition.getFileFormat());
+      if (partition.getFileFormat() != HdfsFileFormat.ICEBERG) {
+        fileFormats_.add(partition.getFileFormat());
+      }
       if (!partition.getFileFormat().isParquetBased()) {
         allParquet = false;
       }
@@ -1387,7 +1389,7 @@ public class HdfsScanNode extends ScanNode {
         || partition.getFileFormat() == HdfsFileFormat.ORC)) {
       // IMPALA-8834 introduced the optimization for partition key scan by 
generating
       // one scan range for each HDFS file. With Parquet and ORC, we start 
with the last
-      // block is to get a scan range that contains a file footer for 
short-circuiting.
+      // block to get a scan range that contains a file footer for 
short-circuiting.
       i = fileDesc.getNumFileBlocks() - 1;
     }
     for (; i < fileDesc.getNumFileBlocks(); ++i) {
@@ -1973,6 +1975,9 @@ public class HdfsScanNode extends ScanNode {
       if (isPartitionKeyScan_) {
         output.append(detailPrefix + "partition key scan\n");
       }
+
+      String derivedExplain = getDerivedExplainString(detailPrefix, 
detailLevel);
+      if (!derivedExplain.isEmpty()) output.append(derivedExplain);
     }
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(getStatsExplainString(detailPrefix)).append("\n");
@@ -2016,6 +2021,14 @@ public class HdfsScanNode extends ScanNode {
     return output.toString();
   }
 
+  // Overriding this function can be used to add extra information to the 
explain string
+  // of the HDFS Scan node from the derived classes (e.g. IcebergScanNode). 
Each new line
in the output should be appended to 'indentPrefix' to have the correct 
indentation.
+  protected String getDerivedExplainString(
+      String indentPrefix, TExplainLevel detailLevel) {
+    return "";
+  }
+
   // Helper method that prints min max original conjuncts by tuple descriptor.
   private String getMinMaxOriginalConjunctsExplainString(
       String prefix, TExplainLevel detailLevel) {
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index 4551355d5..c2a028aac 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -36,9 +36,9 @@ import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.Type;
-import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergDataFileFormat;
+import org.apache.impala.thrift.TExplainLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,15 +52,20 @@ public class IcebergScanNode extends HdfsScanNode {
   private final static Logger LOG = 
LoggerFactory.getLogger(IcebergScanNode.class);
 
   private List<FileDescriptor> fileDescs_;
+
   // Conjuncts on columns not involved in IDENTITY-partitioning. Subset of 
'conjuncts_',
   // but this does not include conjuncts on IDENTITY-partitioned columns, 
because such
   // conjuncts have already been pushed to Iceberg to filter out 
partitions/files, so
   // they don't have further selectivity on the surviving files.
   private List<Expr> nonIdentityConjuncts_;
 
+  // Conjuncts that will be skipped from pushing down to the scan node because 
Iceberg
+  // already applied them and they won't filter any further rows.
+  private List<Expr> skippedConjuncts_;
+
   public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
       MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs,
-      List<Expr> nonIdentityConjuncts)
+      List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts)
       throws ImpalaRuntimeException {
     super(id, tblRef.getDesc(), conjuncts,
         
getIcebergPartition(((FeIcebergTable)tblRef.getTable()).getFeFsTable()), tblRef,
@@ -90,6 +95,7 @@ public class IcebergScanNode extends HdfsScanNode {
     if (hasParquet) fileFormats_.add(HdfsFileFormat.PARQUET);
     if (hasOrc) fileFormats_.add(HdfsFileFormat.ORC);
     if (hasAvro) fileFormats_.add(HdfsFileFormat.AVRO);
+    this.skippedConjuncts_ = skippedConjuncts;
   }
 
   /**
@@ -216,4 +222,14 @@ public class IcebergScanNode extends HdfsScanNode {
     result.put(sampledPartitionMetadata, sampleFiles);
     return result;
   }
+
+  @Override
+  protected String getDerivedExplainString(
+      String indentPrefix, TExplainLevel detailLevel) {
+    if (!skippedConjuncts_.isEmpty()) {
+      return indentPrefix + String.format("skipped Iceberg predicates: %s\n",
+          Expr.getExplainString(skippedConjuncts_, detailLevel));
+    }
+    return "";
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 29accb498..806cdf62e 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.True;
 import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
@@ -106,8 +107,14 @@ public class IcebergScanPlanner {
   private List<Expr> conjuncts_;
   private MultiAggregateInfo aggInfo_;
 
-  // Exprs in icebergConjuncts_ converted to Expression.
+  // Iceberg compatible expressions that are pushed down to Iceberg for query 
planning.
   private final List<Expression> icebergPredicates_ = new ArrayList<>();
+  // The Impala representation of the expressions in 'icebergPredicates_'
+  private final List<Expr> impalaIcebergPredicates_ = new ArrayList<>();
+  // Indicates whether we have to push down 'impalaIcebergPredicates_' to 
Impala's scan
+  // node, or whether Iceberg has already done the partition pruning and no 
further rows could be
+  // skipped using these filters.
+  private boolean canSkipPushingDownIcebergPredicates_ = false;
 
   private List<FileDescriptor> dataFilesWithoutDeletes_ = new ArrayList<>();
   private List<FileDescriptor> dataFilesWithDeletes_ = new ArrayList<>();
@@ -135,14 +142,15 @@ public class IcebergScanPlanner {
   }
 
   public PlanNode createIcebergScanPlan() throws ImpalaException {
-    analyzer_.materializeSlots(conjuncts_);
-
     if (!needIcebergForPlanning()) {
+      analyzer_.materializeSlots(conjuncts_);
       setFileDescriptorsBasedOnFileStore();
       return createIcebergScanPlanImpl();
     }
 
     filterFileDescriptors();
+    filterConjuncts();
+    analyzer_.materializeSlots(conjuncts_);
     return createIcebergScanPlanImpl();
   }
 
@@ -180,7 +188,8 @@ public class IcebergScanPlanner {
       // If there are no delete files we can just create a single SCAN node.
       Preconditions.checkState(dataFilesWithDeletes_.isEmpty());
       PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, 
conjuncts_,
-          aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_);
+          aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_,
+          getSkippedConjuncts());
       ret.init(analyzer_);
       return ret;
     }
@@ -199,7 +208,7 @@ public class IcebergScanPlanner {
     // can just create a SCAN node for these and do a UNION ALL with the ANTI 
JOIN.
     IcebergScanNode dataScanNode = new IcebergScanNode(
         ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, 
dataFilesWithoutDeletes_,
-        nonIdentityConjuncts_);
+        nonIdentityConjuncts_, getSkippedConjuncts());
     dataScanNode.init(analyzer_);
     List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
         entry -> new SlotRef(entry)).collect(Collectors.toList());
@@ -233,12 +242,16 @@ public class IcebergScanPlanner {
     addDeletePositionSlots(deleteDeltaRef);
     IcebergScanNode dataScanNode = new IcebergScanNode(
         dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
-        nonIdentityConjuncts_);
+        nonIdentityConjuncts_, getSkippedConjuncts());
     dataScanNode.init(analyzer_);
     IcebergScanNode deleteScanNode = new IcebergScanNode(
-        deleteScanNodeId, deleteDeltaRef, 
/*conjuncts=*/Collections.emptyList(),
-        aggInfo_, Lists.newArrayList(deleteFiles_),
-        /*nonIdentityConjuncts=*/Collections.emptyList());
+        deleteScanNodeId,
+        deleteDeltaRef,
+        Collections.emptyList(), /*conjuncts*/
+        aggInfo_,
+        Lists.newArrayList(deleteFiles_),
+        Collections.emptyList(), /*nonIdentityConjuncts*/
+        Collections.emptyList()); /*skippedConjuncts*/
     deleteScanNode.init(analyzer_);
 
     // Now let's create the JOIN node
@@ -336,10 +349,15 @@ public class IcebergScanPlanner {
   private void filterFileDescriptors() throws ImpalaException {
     TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
 
+    canSkipPushingDownIcebergPredicates_ = true;
     try (CloseableIterable<FileScanTask> fileScanTasks = IcebergUtil.planFiles(
-        getIceTable(), icebergPredicates_, timeTravelSpec)) {
+        getIceTable(), new ArrayList<Expression>(icebergPredicates_), 
timeTravelSpec)) {
       long dataFilesCacheMisses = 0;
       for (FileScanTask fileScanTask : fileScanTasks) {
+        Expression residualExpr = fileScanTask.residual();
+        if (residualExpr != null && !(residualExpr instanceof True)) {
+          canSkipPushingDownIcebergPredicates_ = false;
+        }
         Pair<FileDescriptor, Boolean> fileDesc = 
getFileDescriptor(fileScanTask.file());
         if (!fileDesc.second) ++dataFilesCacheMisses;
         if (fileScanTask.deletes().isEmpty()) {
@@ -360,7 +378,6 @@ public class IcebergScanPlanner {
           }
         }
       }
-
       if (dataFilesCacheMisses > 0) {
         Preconditions.checkState(timeTravelSpec != null);
         LOG.info("File descriptors had to be loaded on demand during time 
travel: " +
@@ -374,6 +391,17 @@ public class IcebergScanPlanner {
     updateDeleteStatistics();
   }
 
+  private void filterConjuncts() {
+    if (canSkipPushingDownIcebergPredicates_) {
+      conjuncts_.removeAll(impalaIcebergPredicates_);
+    }
+  }
+
+  private List<Expr> getSkippedConjuncts() {
+    if (!canSkipPushingDownIcebergPredicates_) return Collections.emptyList();
+    return impalaIcebergPredicates_;
+  }
+
   private void updateDeleteStatistics() {
     for (FileDescriptor fd : dataFilesWithDeletes_) {
       updateDataFilesWithDeletesStatistics(fd);
@@ -584,6 +612,7 @@ public class IcebergScanPlanner {
     Expression predicate = convertIcebergPredicate(expr);
     if (predicate != null) {
       icebergPredicates_.add(predicate);
+      impalaIcebergPredicates_.add(expr);
       LOG.debug("Push down the predicate: " + predicate + " to iceberg");
       return true;
     }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 54eb40393..6b4eba927 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1264,6 +1264,15 @@ public class PlannerTest extends PlannerTestBase {
             
PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
   }
 
+  /**
+   * Checks exercising predicate pushdown with Iceberg tables.
+   */
+  @Test
+  public void testIcebergPredicates() {
+    runPlannerTestFile("iceberg-predicates", "functional_parquet",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
   /**
    * Check that Iceberg V2 table scans work as expected.
    */
diff --git a/testdata/datasets/functional/functional_schema_template.sql 
b/testdata/datasets/functional/functional_schema_template.sql
index 507600762..90e842ea8 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3256,6 +3256,32 @@ hadoop fs -put -f 
${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/i
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+iceberg_partition_evolution
+---- CREATE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+(id int, int_col int, string_col string, date_string_col string, year int, 
month int)
+PARTITIONED BY SPEC (year, truncate(4, date_string_col))
+STORED AS ICEBERG;
+---- DEPENDENT_LOAD
+# We can use 'date_string_col' as it is once IMPALA-11954 is done.
+INSERT INTO {db_name}{db_suffix}.iceberg_partition_evolution
+    SELECT id, int_col, string_col, regexp_replace(date_string_col, '/', ''), 
year, month
+    FROM {db_name}{db_suffix}.alltypes;
+ALTER TABLE {db_name}{db_suffix}.iceberg_partition_evolution
+    SET PARTITION SPEC (year, truncate(4, date_string_col), month);
+INSERT INTO {db_name}{db_suffix}.iceberg_partition_evolution
+    SELECT
+        cast(id + 7300 as int),
+        int_col,
+        string_col,
+        regexp_replace(date_string_col, '/', ''),
+        year,
+        month
+    FROM {db_name}{db_suffix}.alltypes;
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 airports_orc
 ---- CREATE
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
diff --git a/testdata/datasets/functional/schema_constraints.csv 
b/testdata/datasets/functional/schema_constraints.csv
index 4c23e23dd..ee8bee76b 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -78,6 +78,7 @@ table_name:iceberg_alltypes_part, constraint:restrict_to, 
table_format:parquet/n
 table_name:iceberg_alltypes_part_orc, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_legacy_partition_schema_evolution, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_legacy_partition_schema_evolution_orc, 
constraint:restrict_to, table_format:parquet/none/none
+table_name:iceberg_partition_evolution, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_timestamp_part, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_timestamptz_part, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_uppercase_col, constraint:restrict_to, 
table_format:parquet/none/none
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test
new file mode 100644
index 000000000..a682813a0
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test
@@ -0,0 +1,77 @@
+# A predicate on a partition that is present both before and after partition 
evolution is
+# not pushed down to scan node because Iceberg already filtered out the 
associated rows.
+# Additionally, the slot associated with this predicate is not materialized.
+SELECT id, int_col, string_col from iceberg_partition_evolution where year = 
2010;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+   HDFS partitions=1/1 files=730 size=1.25MB
+   skipped Iceberg predicates: `year` = 2010
+   row-size=20B cardinality=7.30K
+====
+# A predicate on a partition that is introduced by partition evolution is 
pushed down to
+# the scan node. Also the associated slot is materialized.
+SELECT id, int_col, string_col from iceberg_partition_evolution where month = 
1;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+   HDFS partitions=1/1 files=124 size=216.63KB
+   predicates: `month` = 1
+   row-size=24B cardinality=1.24K
+====
+# The predicates that couldn't be pushed to Iceberg are pushed down to the 
scan node,
+# while the ones that are pushed to Iceberg could be skipped from pushing down 
to
+# Impala's scan node if they won't filter any further rows.
+SELECT id, int_col, string_col from iceberg_partition_evolution where year = 
2010 and power(id, 3) > 1000;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+   HDFS partitions=1/1 files=730 size=1.25MB
+   predicates: power(id, 3) > 1000
+   skipped Iceberg predicates: `year` = 2010
+   row-size=20B cardinality=730
+====
+# Here both predicates are pushed to Iceberg and also to Impala's scan node. 
However,
+# here is a room for optimisation as we could skip pushing down 'year' to the 
scan node
+# as it won't filter further rows.
+SELECT id, int_col, string_col from iceberg_partition_evolution where year = 
2010 and id > 1000;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+   HDFS partitions=1/1 files=730 size=1.25MB
+   predicates: `year` = 2010, id > 1000
+   row-size=24B cardinality=730
+====
+# If we have predicates on partition columns with non-identity transform that 
could not
+# be pushed to Iceberg then all the predicates are also pushed to Impala's 
scan node.
+# However, here is a room for optimisation as we could skip pushing down 
'year' to the
+# scan node as it won't filter further rows.
+SELECT * FROM iceberg_partition_evolution
+WHERE year = 2010 AND date_string_col='061610';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+   HDFS partitions=1/1 files=2 size=3.49KB
+   predicates: `year` = 2010, date_string_col = '061610'
+   row-size=40B cardinality=2
+====
+# Checks when all the predicates are skipped in a count(*) query then the 
relevant
+# optimization kicks in for Parquet scanners.
+SELECT COUNT(*) FROM functional_parquet.iceberg_partitioned WHERE action = 
'click';
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_parquet.iceberg_partitioned.stats: 
num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.85KB
+   skipped Iceberg predicates: action = 'click'
+   row-size=8B cardinality=6
+====
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
index 74315a408..30c45b779 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
@@ -809,12 +809,12 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> i
    row-size=36B cardinality=4
 ====
-select * from iceberg_v2_partitioned_position_deletes where action = 
'download';
+select * from iceberg_v2_partitioned_position_deletes where action = 
'download' and id > 0;
 ---- PLAN
 PLAN-ROOT SINK
 |
 02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  row-size=64B cardinality=6
+|  row-size=64B cardinality=1
 |
 |--01:SCAN HDFS 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 
functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=1 size=3.18KB
@@ -822,15 +822,15 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: action = 'download'
-   row-size=64B cardinality=6
+   predicates: id > 0, action = 'download'
+   row-size=64B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |
 02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
-|  row-size=64B cardinality=6
+|  row-size=64B cardinality=1
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
@@ -840,8 +840,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: action = 'download'
-   row-size=64B cardinality=6
+   predicates: id > 0, action = 'download'
+   row-size=64B cardinality=1
 ====
 select * from iceberg_v2_partitioned_position_deletes
 where action = 'download' and user = 'Lisa';
@@ -878,14 +878,14 @@ PLAN-ROOT SINK
    predicates: `user` = 'Lisa', action = 'download'
    row-size=64B cardinality=1
 ====
-select event_time, action from iceberg_partitioned where action = 'click' or 
action = 'view';
+select event_time, action from iceberg_partitioned where (action = 'click' or 
action = 'view') and id > 0;
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: action IN ('click', 'view')
-   row-size=28B cardinality=14
+   predicates: id > 0, action IN ('click', 'view')
+   row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -893,17 +893,17 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: action IN ('click', 'view')
-   row-size=28B cardinality=14
+   predicates: id > 0, action IN ('click', 'view')
+   row-size=32B cardinality=1
 ====
-select event_time, action from iceberg_partitioned where action in ('click', 
'view');
+select event_time, action from iceberg_partitioned where action in ('click', 
'view') and id > 0;
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: action IN ('click', 'view')
-   row-size=28B cardinality=14
+   predicates: id > 0, action IN ('click', 'view')
+   row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -911,17 +911,17 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: action IN ('click', 'view')
-   row-size=28B cardinality=14
+   predicates: id > 0, action IN ('click', 'view')
+   row-size=32B cardinality=1
 ====
-select event_time, action from iceberg_partitioned where 
event_time='2020-01-01 11:00:00' or action = 'click';
+select event_time, action from iceberg_partitioned where 
(event_time='2020-01-01 11:00:00' or action = 'click') and id > 0;
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=6 size=6.85KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click'
-   row-size=28B cardinality=6
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
= 'click')
+   row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -929,17 +929,17 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=6 size=6.85KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click'
-   row-size=28B cardinality=6
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
= 'click')
+   row-size=32B cardinality=1
 ====
-select event_time, action from iceberg_partitioned where 
event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view';
+select event_time, action from iceberg_partitioned where 
(event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view') and 
id > 0;
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 
'click' OR action = 'view'
-   row-size=28B cardinality=14
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
= 'click' OR action = 'view')
+   row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -947,17 +947,17 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 
'click' OR action = 'view'
-   row-size=28B cardinality=14
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
= 'click' OR action = 'view')
+   row-size=32B cardinality=1
 ====
-select event_time, action from iceberg_partitioned where 
event_time='2020-01-01 11:00:00' or action in ('click', 'view');
+select event_time, action from iceberg_partitioned where 
(event_time='2020-01-01 11:00:00' or action in ('click', 'view')) and id > 0;
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN 
('click', 'view')
-   row-size=28B cardinality=14
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
IN ('click', 'view'))
+   row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -965,17 +965,17 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN 
('click', 'view')
-   row-size=28B cardinality=14
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
IN ('click', 'view'))
+   row-size=32B cardinality=1
 ====
-select event_time, action from iceberg_partitioned where 
event_time='2020-01-01 11:00:00' or action > 'a';
+select event_time, action from iceberg_partitioned where 
(event_time='2020-01-01 11:00:00' or action > 'a') and id > 0;
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=20 size=22.90KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a'
-   row-size=28B cardinality=20
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
> 'a')
+   row-size=32B cardinality=2
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -983,16 +983,17 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=20 size=22.90KB
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a'
-   row-size=28B cardinality=20
+   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action 
> 'a')
+   row-size=32B cardinality=2
 ====
+# All predicates are pushed down to Iceberg and won't filter any further rows. 
Skip pushing it to Scan node.
 select event_time, action from iceberg_partitioned where 
event_time='2020-01-01 11:00:00';
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=0 size=0B
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
+   skipped Iceberg predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
    row-size=28B cardinality=0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -1001,6 +1002,41 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=0 size=0B
-   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
+   skipped Iceberg predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
    row-size=28B cardinality=0
 ====
+# Similar as above but on a table with positional deletes on all data files.
+select * from iceberg_v2_partitioned_position_deletes where action = 
'download';
+---- PLAN
+PLAN-ROOT SINK
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=64B cardinality=6
+|
+|--01:SCAN HDFS 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 
functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=207B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   skipped Iceberg predicates: action = 'download'
+   row-size=64B cardinality=6
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=64B cardinality=6
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 
functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=207B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   skipped Iceberg predicates: action = 'download'
+   row-size=64B cardinality=6
+====
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 201f72371..d4c347c76 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -266,7 +266,7 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=44B cardinality=3
    in pipelines: 00(GETNEXT)
 ====
-# Sampling Iceberg tables. Count(*) is not optimized.
+# Sampling Iceberg tables. Count(*) is optimized.
 select count(*) from functional_parquet.iceberg_non_partitioned tablesample 
system(10) repeatable(1234)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -276,7 +276,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
-|  output: count(*)
+|  output: sum_init_zero(functional_parquet.iceberg_non_partitioned.stats: 
num_rows)
 |  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB 
thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -288,7 +288,7 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=6
    mem-estimate=32.00MB mem-reservation=8.00KB thread-reservation=1
-   tuple-ids=0 row-size=0B cardinality=3
+   tuple-ids=0 row-size=8B cardinality=20
    in pipelines: 00(GETNEXT)
 ====
 # Sampling partitioned Iceberg tables.
@@ -313,7 +313,7 @@ PLAN-ROOT SINK
 # Sampling Iceberg tables with predicates. Predicate pushdown to Iceberg 
happens
 # before sampling (similarly to static partition pruning).
 select * from functional_parquet.iceberg_partitioned tablesample system(50) 
repeatable(1234)
-where action = 'click'
+where action = 'click' and id > 0
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=68.00MB mem-reservation=4.03MB 
thread-reservation=2
@@ -323,13 +323,13 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=4 size=4.57KB
-   predicates: action = 'click'
+   predicates: id > CAST(0 AS INT), action = 'click'
    stored statistics:
      table: rows=20 size=22.90KB
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=5
-   parquet statistics predicates: action = 'click'
-   parquet dictionary predicates: action = 'click'
+   parquet statistics predicates: id > CAST(0 AS INT), action = 'click'
+   parquet dictionary predicates: id > CAST(0 AS INT), action = 'click'
    mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=44B cardinality=4
    in pipelines: 00(GETNEXT)
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
index 5e1bee9ea..fcec4cb69 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
@@ -47,26 +47,43 @@ STRING, STRING, STRING, STRING
 ---- QUERY
 # Start testing predicate push-down for int column
 # The IN predicate matches all row group
+# Note, the filter on a non-partition col is needed in the below tests because 
without it Iceberg
+# could do the filtering for us and predicate pushdown to the scanners 
wouldn't be needed.
 select
   count(1)
 from
   ice_pred_pd1
 where
-  col_i in (0, 1, 2);
+  col_i in (0, 1, 2) and col_bi > 0;
 ---- RESULTS
 9
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 3
 ====
 ---- QUERY
-# The IN predicate matches two row group
+# Filtering only on a partition column is done by Iceberg and in Impala we can 
get the results
+# simply using file metadata.
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i in (0, 1, 2);
+---- RESULTS
+9
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
+====
+---- QUERY
+# The IN predicate matches two row groups
 # InList: constant expr
 select
   count(1)
 from
   ice_pred_pd1
 where
-  col_i in (ceil(-0.1), 1 * 2);
+  col_i in (ceil(-0.1), 1 * 2) and col_bi > 0;
 ---- RESULTS
 6
 ---- RUNTIME_PROFILE
@@ -79,7 +96,7 @@ select
 from
   ice_pred_pd1
 where
-  col_i in (0);
+  col_i in (0) and col_bi > 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
@@ -92,7 +109,7 @@ select
 from
   ice_pred_pd1
 where
-  col_i in (-1, 3);
+  col_i in (-1, 3) and col_bi > 0;
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
@@ -422,13 +439,27 @@ select
 from
   ice_pred_pd1
 where
-  col_i not in (0, 1) and col_i >= 0;
+  col_i not in (0, 1) and col_i >= 0 and col_bi > 0;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
+# NOT_IN could be answered using file metadata if only partition cols are 
included
+select
+  count(1)
+from
+  ice_pred_pd1
+where
+  col_i not in (0, 1) and col_i >= 0;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 1
+====
+---- QUERY
 # NOT_IN does not work because col_dt is not the partition column
 select
   count(1)
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index faeef6e4f..a1b955a94 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -173,11 +173,23 @@ aggregation(SUM, NumRowGroups): 0
 aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
+# When filtering only by a partition column, Iceberg can do the filtering and there is no 
need to read data in Impala.
 select count(*) from ice_bigints
 where i = 0 and j = 0;
 ---- RESULTS
 1217
 ---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+aggregation(SUM, NumFileMetadataRead): 1
+====
+---- QUERY
+# When columns other than partition columns are involved in the filtering, Impala 
has to read data to answer the query.
+select count(*) from ice_bigints
+where i = 0 and j = 0 and k >= 0;
+---- RESULTS
+1217
+---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 1
 aggregation(SUM, RowsRead): 1217
 ====
@@ -276,7 +288,8 @@ where bool_col = true;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 4
 ====
 ---- QUERY
 select count(*) from alltypes_part
@@ -286,7 +299,8 @@ where float_col = 0;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 4
 ====
 ---- QUERY
 select count(*) from alltypes_part
@@ -296,7 +310,8 @@ where double_col = 0;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 4
 ====
 ---- QUERY
 select count(*) from alltypes_part
@@ -306,7 +321,8 @@ where date_col = '2009-01-01';
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 2
 ====
 ---- QUERY
 select count(*) from alltypes_part
@@ -316,7 +332,8 @@ where string_col = '0';
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 4
 ====
 ---- QUERY
 # 'timestamp_col' is not a partitioning column, so min/max stats will not be 
used to
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
index 5a3a19656..daeb1e440 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
@@ -144,6 +144,7 @@ explain select 123, count(*), 321 from ice_tbl;
 'SMALLINT)'
 ====
 ---- QUERY
+# Filtering by a partition column results in Iceberg doing the filtering 
instead of Impala.
 select
   count(*)
 from
@@ -153,8 +154,8 @@ where
 ---- RESULTS
 4
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 2
-aggregation(SUM, NumFileMetadataRead): 0
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 2
 ====
 ---- QUERY
 select
@@ -232,4 +233,4 @@ BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
 aggregation(SUM, NumFileMetadataRead): 0
-====
\ No newline at end of file
+====


Reply via email to