This is an automated email from the ASF dual-hosted git repository. jasonmfehr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1d640905912944ea05deaa3453cb6a85013b2e54 Author: Csaba Ringhofer <[email protected]> AuthorDate: Sun Jun 8 10:12:14 2025 +0200 IMPALA-14123: Allow forcing predicate push down to Iceberg Since IMPALA-11591 Impala tries to avoid pushing down predicates to Iceberg unless it is necessary (timetravel) or is likely to be useful (at least 1 partition column is involved in predicates). While this makes planning faster, it may miss opportunities to skip files during planning. This patch adds table property impala.iceberg.push_down_hint that expects a comma-separated list of column names and leads to push down to Iceberg when there is a predicate on any of these columns. Users can set this manually, while in the future Impala or other tools may be able to set it automatically, e.g. during COMPUTE STATS if there are many files with non-overlapping min/max stats for a given column. Note that in most cases when Iceberg can skip files the Parquet/ORC scanner would also skip most of the data based on stat filtering. The benefit of doing it during planning is reading fewer footers and a "smaller" query plan. 
Change-Id: I8eb4ab5204c20b3991fdf305d7317f4023904a0f Reviewed-on: http://gerrit.cloudera.org:8080/22995 Reviewed-by: Csaba Ringhofer <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../apache/impala/planner/IcebergScanPlanner.java | 75 +++++++++++++++++----- .../iceberg-predicate-push-down-hint.test | 36 +++++++++++ tests/query_test/test_iceberg.py | 4 ++ 3 files changed, 99 insertions(+), 16 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index b7ef9b6d6..ef739f4d7 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -164,6 +164,9 @@ public class IcebergScanPlanner { private final InMemoryMetricsReporter metricsReporter_ = new InMemoryMetricsReporter(); + // Predicates on columns in this set are always pushed down to Iceberg. + private final Set<String> columnsWithPushDownHint_ = new HashSet<>(); + public IcebergScanPlanner(Analyzer analyzer, PlannerContext ctx, TableRef iceTblRef, List<Expr> conjuncts, MultiAggregateInfo aggInfo) throws ImpalaException { @@ -174,6 +177,8 @@ public class IcebergScanPlanner { tblRef_ = iceTblRef; conjuncts_ = conjuncts; aggInfo_ = aggInfo; + + initPushDownHint(); extractIcebergConjuncts(); snapshotId_ = IcebergUtil.getSnapshotId(getIceTable(), tblRef_.getTimeTravelSpec()); } @@ -845,7 +850,7 @@ public class IcebergScanPlanner { * please refer: https://iceberg.apache.org/spec/#scan-planning */ private void extractIcebergConjuncts() throws ImpalaException { - boolean isPartitionColumnIncluded = false; + boolean pushDownToIceberg = false; Map<SlotId, SlotDescriptor> idToSlotDesc = new HashMap<>(); // Track identity conjuncts by their index in conjuncts_. // The array values are initialized to false. 
@@ -855,14 +860,17 @@ public class IcebergScanPlanner { } for (int i = 0; i < conjuncts_.size(); i++) { Expr expr = conjuncts_.get(i); - if (isPartitionColumnIncluded(expr, idToSlotDesc)) { - isPartitionColumnIncluded = true; - if (isIdentityPartitionIncluded(expr, idToSlotDesc)) { + List<IcebergColumn> cols = getColumnsInExpr(expr, idToSlotDesc); + if (isPartitionColumnIncluded(cols, idToSlotDesc)) { + pushDownToIceberg = true; + if (isIdentityPartitionIncluded(cols, idToSlotDesc)) { identityConjunctIndex[i] = true; } + } else if (hasPushDownHint(cols, idToSlotDesc)) { + pushDownToIceberg = true; } } - if (!isPartitionColumnIncluded) { + if (!pushDownToIceberg) { // No partition conjuncts, i.e. every conjunct is non-identity conjunct. nonIdentityConjuncts_ = conjuncts_; return; @@ -879,27 +887,43 @@ public class IcebergScanPlanner { } } - private boolean isPartitionColumnIncluded(Expr expr, + private boolean isPartitionColumnIncluded(List<IcebergColumn> cols, Map<SlotId, SlotDescriptor> idToSlotDesc) { - return hasPartitionTransformType(expr, idToSlotDesc, + return hasPartitionTransformType(cols, idToSlotDesc, transformType -> transformType != TIcebergPartitionTransformType.VOID); } - private boolean isIdentityPartitionIncluded(Expr expr, + private void initPushDownHint() { + String HINT_KEY = "impala.iceberg.push_down_hint"; + String hint = + tblRef_.getDesc().getTable().getMetaStoreTable().getParameters().get(HINT_KEY); + if (hint != null) { + for (String col: hint.split(",")) { + columnsWithPushDownHint_.add(col.toLowerCase()); + } + } + } + + private boolean hasPushDownHint(List<IcebergColumn> cols, Map<SlotId, SlotDescriptor> idToSlotDesc) { - return hasPartitionTransformType(expr, idToSlotDesc, - transformType -> transformType == TIcebergPartitionTransformType.IDENTITY); + for (IcebergColumn col: cols) { + // TODO: what to do if some cols in the pred are in the hint, some are not? 
+ // TODO: numerical values could be considered as field id instead of col name + // column names are already in lower case + if (columnsWithPushDownHint_.contains(col.getName())) return true; + } + return false; } - private boolean hasPartitionTransformType(Expr expr, - Map<SlotId, SlotDescriptor> idToSlotDesc, - Predicate<TIcebergPartitionTransformType> pred) { - List<TupleId> tupleIds = Lists.newArrayList(); + private List<IcebergColumn> getColumnsInExpr(Expr expr, + Map<SlotId, SlotDescriptor> idToSlotDesc) { + List<IcebergColumn> result = Lists.newArrayList(); List<SlotId> slotIds = Lists.newArrayList(); + List<TupleId> tupleIds = Lists.newArrayList(); expr.getIds(tupleIds, slotIds); - if (tupleIds.size() != 1) return false; - if (!tupleIds.get(0).equals(tblRef_.getDesc().getId())) return false; + if (tupleIds.size() != 1) return result; + if (!tupleIds.get(0).equals(tblRef_.getDesc().getId())) return result; for (SlotId sId : slotIds) { SlotDescriptor slotDesc = idToSlotDesc.get(sId); @@ -908,11 +932,30 @@ public class IcebergScanPlanner { if (col == null) continue; Preconditions.checkState(col instanceof IcebergColumn); IcebergColumn iceCol = (IcebergColumn)col; + result.add(iceCol); + } + return result; + } + + private boolean isIdentityPartitionIncluded(List<IcebergColumn> cols, + Map<SlotId, SlotDescriptor> idToSlotDesc) { + return hasPartitionTransformType(cols, idToSlotDesc, + transformType -> transformType == TIcebergPartitionTransformType.IDENTITY); + } + + private boolean hasPartitionTransformType(List<IcebergColumn> cols, + Map<SlotId, SlotDescriptor> idToSlotDesc, + Predicate<TIcebergPartitionTransformType> pred) { + for (IcebergColumn iceCol : cols) { TIcebergPartitionTransformType transformType = IcebergUtil.getPartitionTransformType( iceCol, getIceTable().getDefaultPartitionSpec()); if (pred.test(transformType)) { + // TODO: the semantics of the function are not clear - shouldn't + // it only return true if ALL cols in the expr have the given + 
// partition type? Compound predicates with OR can have multiple + // columns. return true; } } diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-predicate-push-down-hint.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-predicate-push-down-hint.test new file mode 100644 index 000000000..b45d4a0a8 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-predicate-push-down-hint.test @@ -0,0 +1,36 @@ +==== +---- QUERY +# File not filtered during planning as there is no predicate on partition column. +create table t (a int, b int, c int) stored as iceberg; +insert into t values (1, 1, 1); +select count(*) from t where a = 0; +---- TYPES +BIGINT +---- RESULTS +0 +---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 1 +aggregation(SUM, NumStatsFilteredRowGroups): 1 +==== +---- QUERY +# File filtered during planning if there is a predicate on a column in hint. +alter table t set tblproperties ("impala.iceberg.push_down_hint"="a,b"); +select count(*) from t where a = 0; +---- TYPES +BIGINT +---- RESULTS +0 +---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 0 +==== +---- QUERY +# File not filtered during planning if there are predicates only on columns not in hint. 
+select count(*) from t where c = 0; +---- TYPES +BIGINT +---- RESULTS +0 +---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 1 +aggregation(SUM, NumStatsFilteredRowGroups): 1 +==== \ No newline at end of file diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 141808c67..7ead8d178 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -2156,6 +2156,10 @@ class TestIcebergV2Table(IcebergTestSuite): files_result = check_output(["hdfs", "dfs", "-ls", table_location]) assert "Found 1 items" in files_result + def test_predicate_push_down_hint(self, vector, unique_database): + self.run_test_case('QueryTest/iceberg-predicate-push-down-hint', vector, + use_db=unique_database) + # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note, that most # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it
