This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1a814f5f01f1f401517f252aeefe906a2ef1c87a
Author: Arnab Karmakar <[email protected]>
AuthorDate: Tue Dec 16 13:07:55 2025 +0530

    IMPALA-14555: Add Iceberg support for SHOW PARTITIONS WHERE

    This patch adds WHERE clause support for SHOW PARTITIONS on Iceberg
    tables, extending the functionality previously added for HDFS tables
    in IMPALA-14065. Iceberg uses manifest-based file/partition filtering
    instead of filesystem traversal.

    This initial implementation supports simple predicates (comparisons,
    IN, BETWEEN, IS NULL, AND/OR) but not functions, subqueries, or
    analytics. For complex queries with functions, use Iceberg metadata
    tables.

    The following is not supported in SHOW PARTITIONS:
      SHOW PARTITIONS functional_parquet.iceberg_partitioned
      WHERE upper(action) = 'CLICK';

    Use the metadata table instead:
      SELECT `partition`
      FROM functional_parquet.iceberg_partitioned.`partitions`
      WHERE upper(`partition`.action) = 'CLICK';

    This change also adds DROP PARTITION coverage for BETWEEN predicates
    on Iceberg partition columns in iceberg-drop-partition.test.

    Testing:
    - Unit tests in AnalyzeDDLTest for valid predicates and error cases
    - Positive test cases for simple predicates, BETWEEN, IN, IS NULL,
      AND, OR, NOT, and boolean literals
    - Negative test cases that reject unsupported functions with clear
      errors
    - DROP PARTITION tests with BETWEEN predicates in
      testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test

    TODO: IMPALA-14675: Support complex predicates (SQL functions,
    non-deterministic functions, aggregates, analytics, subqueries) in
    SHOW PARTITIONS WHERE for Iceberg tables by evaluating them
    per-partition using FeSupport.EvalPredicateBatch(), similar to HDFS
    tables.
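As a rough illustration of what the new code path computes (a hedged sketch in plain Java, not Impala code; `DataFile`, `Stats`, and `partitionStats` are invented stand-ins for the patch's `ContentFile` iteration and `TIcebergPartitionStats` merging), the snippet below filters per-file partition metadata with a predicate and aggregates row and file counts per partition key, mirroring the Partition / Number Of Rows / Number Of Files columns that SHOW PARTITIONS returns:

```java
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Predicate;

public class PartitionStatsSketch {
  // One entry per data file: its partition values and row count.
  record DataFile(Map<String, String> partition, long rowCount) {}

  // Per-partition aggregate, matching SHOW PARTITIONS' output columns
  // "Number Of Rows" and "Number Of Files".
  record Stats(long rows, long files) {}

  // Group matching files by partition key, ordered by key (the patch sorts
  // stats by partition name). A null filter means "return all partitions",
  // analogous to getPartitionStats(table, null).
  static SortedMap<String, Stats> partitionStats(
      List<DataFile> files, Predicate<Map<String, String>> filter) {
    SortedMap<String, Stats> out = new TreeMap<>();
    for (DataFile f : files) {
      if (filter != null && !filter.test(f.partition())) continue;
      // Stand-in for the JSON partition key built by getPartitionKey().
      String key = f.partition().toString();
      Stats prev = out.getOrDefault(key, new Stats(0, 0));
      out.put(key, new Stats(prev.rows() + f.rowCount(), prev.files() + 1));
    }
    return out;
  }

  public static void main(String[] args) {
    List<DataFile> files = List.of(
        new DataFile(Map.of("action", "click"), 3),
        new DataFile(Map.of("action", "click"), 3),
        new DataFile(Map.of("action", "view"), 8));
    // WHERE action = 'click': aggregates the two 'click' files into one row.
    SortedMap<String, Stats> stats =
        partitionStats(files, p -> "click".equals(p.get("action")));
    System.out.println(stats);
  }
}
```

In the real patch the filtering happens earlier and cheaper: the WHERE clause is converted to an Iceberg `Expression` during analysis, and Iceberg's manifest metadata prunes non-matching files before any per-file loop runs.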
    Generated-by: Cursor AI (claude-4.5-sonnet)
    Change-Id: I4c0ee4d171ae939770725d89dc504e13f82a7688
    Reviewed-on: http://gerrit.cloudera.org:8080/23800
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 common/thrift/Frontend.thrift                      |   4 +
 docs/topics/impala_iceberg.xml                     |  46 ++++++
 .../IcebergPartitionExpressionRewriter.java        |  19 +++
 .../org/apache/impala/analysis/ShowStatsStmt.java  | 153 ++++++++++++++----
 .../org/apache/impala/catalog/FeIcebergTable.java  | 115 ++++++++++++--
 .../impala/common/IcebergPredicateConverter.java   |   6 +-
 .../java/org/apache/impala/service/Frontend.java   |  14 +-
 .../org/apache/impala/service/JniFrontend.java     |   4 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  41 ++++-
 .../queries/QueryTest/iceberg-drop-partition.test  |  81 ++++++++++
 .../queries/QueryTest/show-stats.test              | 175 ++++++++++++++++++++-
 11 files changed, 600 insertions(+), 58 deletions(-)

diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 9632c0b7e..8de375f45 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -254,7 +254,11 @@ struct TShowStatsParams {
   2: CatalogObjects.TTableName table_name
   3: optional bool show_column_minmax_stats
   // Optional: filtered partition ids for SHOW PARTITIONS with a WHERE clause.
+  // Only used for HDFS tables.
   4: optional list<i64> filtered_partition_ids
+  // Optional: For Iceberg tables with WHERE clause in SHOW PARTITIONS, this contains
+  // the pre-computed filtered Iceberg partition stats.
+  5: optional Results.TResultSet filtered_iceberg_partition_stats
 }

 // Parameters for DESCRIBE HISTORY command
diff --git a/docs/topics/impala_iceberg.xml b/docs/topics/impala_iceberg.xml
index cac82b10d..684f3acf9 100644
--- a/docs/topics/impala_iceberg.xml
+++ b/docs/topics/impala_iceberg.xml
@@ -474,6 +474,52 @@ CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY (p1 INT, p2 STRING) STORED AS I
     </conbody>
   </concept>

+  <concept id="iceberg_show_partitions">
+    <title>Filtering partition information with SHOW PARTITIONS WHERE</title>
+    <conbody>
+      <p>
+        Since <keyword keyref="impala50"/> Impala supports filtering partition information using the
+        <codeph>SHOW PARTITIONS</codeph> statement with a <codeph>WHERE</codeph> clause for Iceberg tables.
+        This allows you to view only specific partitions that match the given filter criteria.
+      </p>
+      <p>
+        Example:
+        <codeblock>
+SHOW PARTITIONS functional_parquet.iceberg_partitioned WHERE action = 'click';
+SHOW PARTITIONS ice_t WHERE hour(event_time) > 438296 AND status != 'pending';
+SHOW PARTITIONS ice_t WHERE year(date_col) BETWEEN 2020 AND 2023;
+        </codeblock>
+      </p>
+      <p>
+        Limitations:
+        <ul>
+          <li>
+            Only simple predicates on partition columns are supported. Standard SQL functions
+            (e.g., <codeph>rand()</codeph>, <codeph>upper()</codeph>, <codeph>length()</codeph>) are not allowed
+            in the <codeph>WHERE</codeph> clause.
+          </li>
+          <li>
+            Supported predicates include column comparisons, <codeph>BETWEEN</codeph>, <codeph>IN</codeph>,
+            <codeph>IS NULL</codeph>/<codeph>IS NOT NULL</codeph>, and logical operators (<codeph>AND</codeph>,
+            <codeph>OR</codeph>, <codeph>NOT</codeph>).
+          </li>
+          <li>
+            Iceberg partition transforms (<codeph>year()</codeph>, <codeph>month()</codeph>, <codeph>day()</codeph>,
+            <codeph>hour()</codeph>, <codeph>bucket()</codeph>, <codeph>truncate()</codeph>) can be used in predicates.
+ </li> + <li> + For more complex queries with functions or aggregations, use Iceberg metadata tables instead: + <codeblock>SELECT * FROM table_name.partitions WHERE <complex predicate></codeblock> + </li> + </ul> + </p> + <p> + More information about the <codeph>SHOW PARTITIONS</codeph> statement can be found at + <xref href="impala_show.xml#show_partitions"/>. + </p> + </conbody> + </concept> + <concept id="iceberg_inserts"> <title>Inserting data into Iceberg tables</title> <conbody> diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java index c47da3ea1..83325e319 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java @@ -25,6 +25,7 @@ import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.IcebergTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.rewrite.BetweenToCompoundRule; import org.apache.impala.thrift.TIcebergPartitionTransformType; import org.apache.impala.util.IcebergUtil; @@ -57,6 +58,14 @@ class IcebergPartitionExpressionRewriter { * @throws AnalysisException when expression rewrite fails */ public Expr rewrite(Expr expr) throws AnalysisException { + // BoolLiterals don't need rewriting, return as-is + if (expr instanceof BoolLiteral) { + return expr; + } + if (expr instanceof BetweenPredicate) { + BetweenPredicate betweenPredicate = (BetweenPredicate) expr; + return rewrite(betweenPredicate); + } if (expr instanceof BinaryPredicate) { BinaryPredicate binaryPredicate = (BinaryPredicate) expr; return rewrite(binaryPredicate); @@ -85,6 +94,16 @@ class IcebergPartitionExpressionRewriter { "Invalid partition filtering expression: " + expr.toSql()); } + private CompoundPredicate 
rewrite(BetweenPredicate betweenPredicate) + throws AnalysisException { + Expr compoundPredicate = + BetweenToCompoundRule.INSTANCE.apply(betweenPredicate, analyzer_); + if (!compoundPredicate.isAnalyzed()) { + compoundPredicate.analyze(analyzer_); + } + return rewrite((CompoundPredicate) compoundPredicate); + } + private BinaryPredicate rewrite(BinaryPredicate binaryPredicate) throws AnalysisException { Expr term = binaryPredicate.getChild(0); diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java index fef820d9d..e84383ae6 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java @@ -33,9 +33,11 @@ import org.apache.impala.catalog.FeFsPartition; import org.apache.impala.catalog.FeView; import org.apache.impala.catalog.paimon.FePaimonTable; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.IcebergPartitionPredicateConverter; import org.apache.impala.planner.HdfsPartitionPruner; import org.apache.impala.rewrite.ExprRewriter; import org.apache.impala.rewrite.ExtractCompoundVerticalBarExprRule; +import org.apache.impala.thrift.TResultSet; import org.apache.impala.thrift.TShowStatsOp; import org.apache.impala.thrift.TShowStatsParams; @@ -51,12 +53,17 @@ public class ShowStatsStmt extends StatementBase implements SingleTableStmt { protected final TableName tableName_; protected boolean show_column_minmax_stats_ = false; // Optional WHERE predicate for SHOW PARTITIONS. - // ONLY supported for HDFS tables (FeFsTable, excluding FeIcebergTable). - // Iceberg tables use a different partition mechanism and are not supported. + // Supported for HDFS and Iceberg tables. protected Expr whereClause_; + // Computed during analysis if whereClause_ is set for HDFS tables. 
protected List<Long> filteredPartitionIds_ = null; + // For Iceberg tables with WHERE clause, store the pre-computed filtered + // iceberg partition stats. + // This is computed during analysis and serialized through Thrift. + protected TResultSet filteredIcebergPartitionStats_ = null; + // Set during analysis. protected FeTable table_; @@ -180,9 +187,9 @@ public class ShowStatsStmt extends StatementBase implements SingleTableStmt { * the filtered partition IDs. */ private void analyzeWhereClause(Analyzer analyzer) throws AnalysisException { - if (!(table_ instanceof FeFsTable) || table_ instanceof FeIcebergTable) { + if (!(table_ instanceof FeFsTable)) { throw new AnalysisException( - "WHERE clause in SHOW PARTITIONS is only supported for HDFS tables"); + "WHERE clause in SHOW PARTITIONS is only supported for HDFS and Iceberg tables"); } // Disable authorization checks for internal analysis of WHERE clause @@ -224,33 +231,13 @@ public class ShowStatsStmt extends StatementBase implements SingleTableStmt { throw new AnalysisException( "Aggregate functions are not allowed in SHOW PARTITIONS WHERE"); } - // Ensure all conjuncts reference only partition columns. - List<SlotId> partitionSlots = tableRef.getDesc().getPartitionSlots(); - if (!whereClause_.isBoundBySlotIds(partitionSlots)) { - throw new AnalysisException( - "SHOW PARTITIONS WHERE supports only partition columns"); - } - // Prune the partitions using the HdfsPartitionPruner. - HdfsPartitionPruner pruner = new HdfsPartitionPruner(tableRef.getDesc()); - - try { - // Clone the conjuncts because the pruner will modify the original list. - List<Expr> conjunctsCopy = new ArrayList<>(whereClause_.getConjuncts()); - // Pass evalAllFuncs=true to ensure non-deterministic functions are evaluated - // per-partition instead of being skipped. - Pair<List<? 
extends FeFsPartition>, List<Expr>> res = - pruner.prunePartitions(analyzer, conjunctsCopy, true, true, tableRef); - Preconditions.checkState(conjunctsCopy.isEmpty(), - "All conjuncts should be evaluated"); - - // All partitions from the pruner have matched - collect their IDs. - Set<Long> ids = new HashSet<>(); - for (FeFsPartition p : res.first) ids.add(p.getId()); - filteredPartitionIds_ = new ArrayList<>(ids); - } catch (org.apache.impala.common.ImpalaException e) { - throw new AnalysisException( - "Failed to evaluate WHERE clause for SHOW PARTITIONS: " + e.getMessage(), e); + // Handle Iceberg tables separately + if (table_ instanceof FeIcebergTable) { + analyzeIcebergWhereClause(analyzer, (FeIcebergTable) table_); + } else { + // Handle HDFS tables + analyzeHdfsWhereClause(analyzer, tableRef); } } finally { // Re-enable authorization checks @@ -258,11 +245,110 @@ public class ShowStatsStmt extends StatementBase implements SingleTableStmt { } } + /** + * Analyzes the WHERE clause for Iceberg tables and computes filtered partition stats. + * Converts the WHERE expression to an Iceberg Expression and evaluates it against + * the table's manifest files. Following the IMPALA-12243 pattern, we compute results + * during analysis (when Analyzer exists) and serialize the results rather than the + * expression, to avoid recreating QueryContext/Analyzer. + * + * Note: Only simple predicates on partition columns are supported for Iceberg tables. + * Functions (deterministic or non-deterministic), aggregates, analytics, and subqueries + * are not supported. 
For complex queries with functions, use Iceberg metadata tables: + * SHOW PARTITIONS functional_parquet.iceberg_partitioned + * WHERE upper(action) = 'CLICK' + * -- Not supported due to upper() function + * Instead query the metadata table directly: + * SELECT `partition` FROM functional_parquet.iceberg_partitioned.`partitions` + * WHERE upper(`partition`.action) = 'CLICK' + * + * TODO: IMPALA-14675: Add support for more complex predicates by evaluating them + * per-partition using FeSupport.EvalPredicateBatch(), similar to HDFS tables. + */ + private void analyzeIcebergWhereClause(Analyzer analyzer, FeIcebergTable table) + throws AnalysisException { + // Rewrite expressions for Iceberg partition transforms and column references + // This converts Iceberg partition transform functions to IcebergPartitionExpr + // and also handles BETWEEN rewriting internally + IcebergPartitionExpressionRewriter rewriter = + new IcebergPartitionExpressionRewriter(analyzer, + table.getIcebergApiTable().spec()); + + try { + Expr rewrittenExpr = rewriter.rewrite(whereClause_); + rewrittenExpr.analyze(analyzer); + + // Defensive check: all FunctionCallExprs should have been converted by the rewriter + // If any remain, it indicates an internal error in the rewriter logic + Preconditions.checkState(!rewrittenExpr.contains(FunctionCallExpr.class), + "Rewriter should have converted or rejected all function calls, but found: %s", + rewrittenExpr.toSql()); + + // Constant-fold the expression + Expr foldedExpr = analyzer.getConstantFolder().rewrite(rewrittenExpr, analyzer); + + // Convert the Impala expression to an Iceberg expression + // BoolLiterals are handled by the converter and optimized in getPartitionStats + IcebergPartitionPredicateConverter converter = + new IcebergPartitionPredicateConverter(table.getIcebergSchema(), analyzer); + org.apache.iceberg.expressions.Expression icebergExpr = + converter.convert(foldedExpr); + + // Compute the filtered partition stats using the Iceberg 
Expression + filteredIcebergPartitionStats_ = + FeIcebergTable.Utils.getPartitionStats(table, icebergExpr); + + } catch (org.apache.impala.common.ImpalaException e) { + // Catch errors from Iceberg expression conversion or partition stats computation + throw new AnalysisException( + "Invalid partition filtering expression: " + whereClause_.toSql() + + ".\n" + e.getMessage()); + } + } + + /** + * Analyzes the WHERE clause for HDFS tables and computes filtered partition IDs. + */ + private void analyzeHdfsWhereClause(Analyzer analyzer, TableRef tableRef) + throws AnalysisException { + Preconditions.checkArgument(tableRef.getTable() instanceof FeFsTable); + // Ensure all conjuncts reference only partition columns. + List<SlotId> partitionSlots = tableRef.getDesc().getPartitionSlots(); + if (!whereClause_.isBoundBySlotIds(partitionSlots)) { + throw new AnalysisException( + "SHOW PARTITIONS WHERE supports only partition columns"); + } + + // Prune the partitions using the HdfsPartitionPruner. + HdfsPartitionPruner pruner = new HdfsPartitionPruner(tableRef.getDesc()); + + try { + // Clone the conjuncts because the pruner will modify the original list. + List<Expr> conjunctsCopy = new ArrayList<>(whereClause_.getConjuncts()); + // Pass evalAllFuncs=true to ensure non-deterministic functions are evaluated + // per-partition instead of being skipped. + Pair<List<? extends FeFsPartition>, List<Expr>> res = + pruner.prunePartitions(analyzer, conjunctsCopy, true, true, tableRef); + Preconditions.checkState(conjunctsCopy.isEmpty(), + "All conjuncts should be evaluated"); + + // All partitions from the pruner have matched - collect their IDs. 
+ Set<Long> ids = new HashSet<>(); + for (FeFsPartition p : res.first) ids.add(p.getId()); + filteredPartitionIds_ = new ArrayList<>(ids); + } catch (org.apache.impala.common.ImpalaException e) { + throw new AnalysisException( + "Failed to evaluate WHERE clause for SHOW PARTITIONS: " + e.getMessage(), e); + } + } + @Override public void reset() { super.reset(); - // Clear computed partition IDs so they'll be recomputed after re-analysis + // Clear computed partition IDs and filtered stats so they'll be + // recomputed after re-analysis filteredPartitionIds_ = null; + filteredIcebergPartitionStats_ = null; // Reset the whereExpr if it exists if (whereClause_ != null) whereClause_ = whereClause_.reset(); } @@ -290,6 +376,11 @@ public class ShowStatsStmt extends StatementBase implements SingleTableStmt { if (filteredPartitionIds_ != null) { showStatsParam.setFiltered_partition_ids(filteredPartitionIds_); } + // For Iceberg tables with WHERE clause, pass the pre-computed + // filtered partition stats. 
+ if (filteredIcebergPartitionStats_ != null) { + showStatsParam.setFiltered_iceberg_partition_stats(filteredIcebergPartitionStats_); + } return showStatsParam; } } diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index dc2a2b988..7a5e7a01d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -37,6 +37,8 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import javax.annotation.Nullable; + import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.BlockLocation; @@ -47,7 +49,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -521,16 +527,36 @@ public interface FeIcebergTable extends FeFsTable { * Get partition stats for the given fe iceberg table. 
*/ public static TResultSet getPartitionStats(FeIcebergTable table) { - TResultSet result = new TResultSet(); - TResultSetMetadata resultSchema = new TResultSetMetadata(); - result.setSchema(resultSchema); - result.setRows(new ArrayList<>()); + return getPartitionStats(table, null); + } - resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift())); - resultSchema.addToColumns(new TColumn("Number Of Rows", Type.BIGINT.toThrift())); - resultSchema.addToColumns(new TColumn("Number Of Files", Type.BIGINT.toThrift())); + /** + * Get partition stats for the given Iceberg table, optionally filtered by an + * Iceberg Expression. This is used for SHOW PARTITIONS with a WHERE clause. + * @param table the Iceberg table + * @param filterExpr optional Iceberg Expression to filter partitions, + * null for no filtering + */ + public static TResultSet getPartitionStats(FeIcebergTable table, + @Nullable Expression filterExpr) { + TResultSet result = createEmptyPartitionStatsResult(); + + // Check if the filter expression is alwaysTrue or alwaysFalse + if (filterExpr != null) { + if (filterExpr.equals(org.apache.iceberg.expressions.Expressions.alwaysTrue())) { + // Treat alwaysTrue same as null filter - return all partitions + filterExpr = null; + } else if (filterExpr.equals( + org.apache.iceberg.expressions.Expressions.alwaysFalse())) { + // Return empty result for alwaysFalse + return result; + } + } + + // Get the ordered partition stats + Map<String, TIcebergPartitionStats> nameToStats = + getOrderedPartitionStats(table, filterExpr); - Map<String, TIcebergPartitionStats> nameToStats = getOrderedPartitionStats(table); for (Map.Entry<String, TIcebergPartitionStats> partitionInfo : nameToStats .entrySet()) { TResultRowBuilder builder = new TResultRowBuilder(); @@ -543,11 +569,73 @@ public interface FeIcebergTable extends FeFsTable { } /** - * Get partition stats for the given fe iceberg table ordered by partition name. 
+ * Creates an empty TResultSet for partition stats with the correct schema. + * Used when no partitions match the filter or for initializing the result. */ - private static Map<String, TIcebergPartitionStats> getOrderedPartitionStats( - FeIcebergTable table) { - return table.getIcebergPartitionStats() + private static TResultSet createEmptyPartitionStatsResult() { + TResultSet result = new TResultSet(); + TResultSetMetadata resultSchema = new TResultSetMetadata(); + result.setSchema(resultSchema); + result.setRows(new ArrayList<>()); + + resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift())); + resultSchema.addToColumns(new TColumn("Number Of Rows", Type.BIGINT.toThrift())); + resultSchema.addToColumns(new TColumn("Number Of Files", Type.BIGINT.toThrift())); + + return result; + } + + /** + * Get partition stats for the given Iceberg table, optionally filtered by an + * Iceberg Expression, ordered by partition name. + * + * If filterExpr is null, returns stats for all partitions from the cached stats. + * If filterExpr is provided, uses Iceberg's TableScan API to efficiently filter + * content files at the metadata level (using manifest files) before loading them. 
+ * + * @param table the Iceberg table + * @param filterExpr optional Iceberg Expression to filter partitions, + * null for all partitions + * @return Map of partition key to partition stats, ordered by partition key + */ + private static Map<String, TIcebergPartitionStats> + getOrderedPartitionStats(FeIcebergTable table, @Nullable Expression filterExpr) { + // If no filter, return the cached partition stats + if (filterExpr == null) { + return table.getIcebergPartitionStats() + .entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, + (oldValue, newValue) -> oldValue, LinkedHashMap::new)); + } + + Preconditions.checkNotNull(filterExpr); + Map<String, TIcebergPartitionStats> nameToStats = new HashMap<>(); + // Use IcebergUtil.planFiles to retrieve only matching files. + // This uses Iceberg's metadata (manifest files) to skip files that don't match. + try (CloseableIterable<FileScanTask> tasks = IcebergUtil.planFiles(table, + Collections.singletonList(filterExpr), null, null)) { + // Iceberg has already done the filtering at the metadata level. 
+ for (FileScanTask task : tasks) { + ContentFile<?> contentFile = task.file(); + String partitionKey = getPartitionKey(table, contentFile); + nameToStats.put(partitionKey, + mergePartitionStats(nameToStats, contentFile, partitionKey)); + + // Also include delete files for completeness + for (DeleteFile deleteFile : task.deletes()) { + String deletePartitionKey = getPartitionKey(table, deleteFile); + nameToStats.put(deletePartitionKey, + mergePartitionStats(nameToStats, deleteFile, deletePartitionKey)); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to retrieve partition stats", e); + } + + // Return the filtered stats sorted by partition name + return nameToStats .entrySet() .stream() .sorted(Map.Entry.comparingByKey()) @@ -945,7 +1033,8 @@ public interface FeIcebergTable extends FeFsTable { /** * Get iceberg partition from a dataFile and wrapper to a json string */ - public static String getPartitionKey(IcebergTable table, ContentFile<?> contentFile) { + public static String getPartitionKey(FeIcebergTable table, + ContentFile<?> contentFile) { PartitionSpec spec = table.getIcebergApiTable().specs().get(contentFile.specId()); Map<String, String> fieldNameToPartitionValue = new LinkedHashMap<>(); for (int i = 0; i < spec.fields().size(); ++i) { diff --git a/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java b/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java index 8825c8fd9..c42f96f4d 100644 --- a/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java +++ b/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java @@ -63,7 +63,11 @@ public class IcebergPredicateConverter { } public Expression convert(Expr expr) throws ImpalaRuntimeException { - if (expr instanceof BinaryPredicate) { + if (expr instanceof BoolLiteral) { + BoolLiteral boolLiteral = (BoolLiteral) expr; + return boolLiteral.getValue() ? 
Expressions.alwaysTrue() : + Expressions.alwaysFalse(); + } else if (expr instanceof BinaryPredicate) { return convert((BinaryPredicate) expr); } else if (expr instanceof InPredicate) { return convert((InPredicate) expr); diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 35ddee177..6f9d95221 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -1739,13 +1739,14 @@ public class Frontend { * Generate result set and schema for a SHOW TABLE STATS command. */ public TResultSet getTableStats(String dbName, String tableName, TShowStatsOp op, - List<Long> filteredPartitionIds) + List<Long> filteredPartitionIds, @Nullable TResultSet filteredIcebergPartitionStats) throws ImpalaException { RetryTracker retries = new RetryTracker( String.format("fetching table stats from %s.%s", dbName, tableName)); while (true) { try { - return doGetTableStats(dbName, tableName, op, filteredPartitionIds); + return doGetTableStats(dbName, tableName, op, filteredPartitionIds, + filteredIcebergPartitionStats); } catch(InconsistentMetadataFetchException e) { retries.handleRetryOrThrow(e); } @@ -1753,11 +1754,18 @@ public class Frontend { } private TResultSet doGetTableStats(String dbName, String tableName, TShowStatsOp op, - List<Long> filteredPartitionIds) + List<Long> filteredPartitionIds, @Nullable TResultSet filteredIcebergPartitionStats) throws ImpalaException { FeTable table = getCatalog().getTable(dbName, tableName); if (table instanceof FeFsTable) { if (table instanceof FeIcebergTable && op == TShowStatsOp.PARTITIONS) { + // For Iceberg tables with WHERE clause, use the pre-computed filtered stats. + // Following the IMPALA-12243 pattern, we compute results during analysis + // and serialize the results rather than the expression. 
+ if (filteredIcebergPartitionStats != null) { + return filteredIcebergPartitionStats; + } + // No filter - return all partitions return FeIcebergTable.Utils.getPartitionStats((FeIcebergTable) table); } if (table instanceof FeIcebergTable && op == TShowStatsOp.TABLE_STATS) { diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 1997442eb..d4ecb8d6e 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -488,7 +488,9 @@ public class JniFrontend { result = frontend_.getTableStats(params.getTable_name().getDb_name(), params.getTable_name().getTable_name(), params.op, params.isSetFiltered_partition_ids() ? - params.getFiltered_partition_ids() : null); + params.getFiltered_partition_ids() : null, + params.isSetFiltered_iceberg_partition_stats() ? + params.getFiltered_iceberg_partition_stats() : null); } try { TSerializer serializer = new TSerializer(protocolFactory_); diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index ae0529e4f..d8269d053 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -4575,12 +4575,43 @@ public class AnalyzeDDLTest extends FrontendTestBase { // WHERE clause with Kudu table should fail (non-HDFS tables don't support WHERE) AnalysisError("show partitions functional_kudu.alltypes where year = 2009", - "WHERE clause in SHOW PARTITIONS is only supported for HDFS tables"); - - // WHERE clause with Iceberg table should fail + "WHERE clause in SHOW PARTITIONS is only supported for HDFS and Iceberg tables"); + + // Valid WHERE clauses with Iceberg tables + AnalyzesOk("show partitions functional_parquet.iceberg_int_partitioned where i = 1"); + AnalyzesOk("show partitions 
functional_parquet.iceberg_int_partitioned " + + "where i > 1 and j < 30"); + AnalyzesOk("show partitions functional_parquet.iceberg_partitioned " + + "where action = 'click'"); + + // Constant expressions in WHERE clause for Iceberg tables + AnalyzesOk("show partitions functional_parquet.iceberg_int_partitioned where 1 = 1"); + AnalyzesOk("show partitions functional_parquet.iceberg_int_partitioned where 1 = 0"); + AnalyzesOk("show partitions functional_parquet.iceberg_int_partitioned where true"); + AnalyzesOk("show partitions functional_parquet.iceberg_int_partitioned where false"); + + // Functions (deterministic or non-deterministic) are not supported for Iceberg tables + AnalysisError("show partitions functional_parquet.iceberg_int_partitioned " + + "where rand() < 0.5", + "Invalid partition filtering expression: rand() < 0.5.\n" + + "Invalid partition predicate: rand()"); + AnalysisError("show partitions functional_parquet.iceberg_int_partitioned " + + "where length(uuid()) > 30", + "Invalid partition filtering expression: length(uuid()) > 30.\n" + + "Invalid partition predicate: length(uuid())"); + AnalysisError("show partitions functional_parquet.iceberg_int_partitioned " + + "where i = 1 and rand() < 1", + "Invalid partition filtering expression: i = 1 AND rand() < 1.\n" + + "Invalid partition predicate: rand()"); AnalysisError("show partitions functional_parquet.iceberg_int_partitioned " + - "where i = 1", - "WHERE clause in SHOW PARTITIONS is only supported for HDFS tables"); + "where upper(cast(i as string)) = '1'", + "Invalid partition filtering expression: upper(CAST(i AS STRING)) = '1'.\n" + + "Invalid partition predicate: upper(CAST(i AS STRING))"); + + // Iceberg WHERE clause with partition field name should fail + AnalysisError("show partitions functional_parquet.iceberg_partitioned " + + "where event_time_hour = 438296", + "Could not resolve column/field reference: 'event_time_hour'"); // WHERE clause must be a boolean expression 
AnalysisError("show partitions functional.alltypes where year", diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test index 43d18cadc..44f0020ab 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test @@ -415,3 +415,84 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_drop_partition_delete/d ---- TYPES STRING, STRING, STRING, STRING ==== +---- QUERY +# Test BETWEEN predicate in DROP PARTITION +CREATE TABLE iceberg_drop_partition_between_test + (partition_col int, data_col string) +PARTITIONED BY SPEC (identity(partition_col)) +STORED AS ICEBERG; +INSERT INTO iceberg_drop_partition_between_test VALUES (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five'); +==== +---- QUERY +# Check initial partition count +SELECT COUNT(*) FROM $DATABASE.iceberg_drop_partition_between_test.`partitions`; +---- RESULTS +5 +---- TYPES +BIGINT +==== +---- QUERY +# Drop partitions using BETWEEN predicate +ALTER TABLE $DATABASE.iceberg_drop_partition_between_test DROP PARTITION (partition_col BETWEEN 2 AND 4); +---- RESULTS +'Dropped 3 partition(s)' +==== +---- QUERY +# Verify correct partitions were dropped +SELECT COUNT(*) FROM $DATABASE.iceberg_drop_partition_between_test.`partitions`; +---- RESULTS +2 +---- TYPES +BIGINT +==== +---- QUERY +# Verify remaining data - note duplicates due to multiple data files +SELECT DISTINCT partition_col FROM $DATABASE.iceberg_drop_partition_between_test ORDER BY partition_col; +---- RESULTS +1 +5 +---- TYPES +INT +==== +---- QUERY +# Test BETWEEN with partition column in mixed partition spec +CREATE TABLE iceberg_drop_partition_between_mixed_test + (identity_col int, unpart_col string, ts_col timestamp) +PARTITIONED BY SPEC (identity(identity_col), hour(ts_col)) +STORED AS ICEBERG; 
+INSERT INTO iceberg_drop_partition_between_mixed_test VALUES (1, 'one', '2023-01-01 10:00:00'), (2, 'two', '2023-01-01 11:00:00'), (3, 'three', '2023-01-01 12:00:00'), (4, 'four', '2023-01-01 13:00:00'), (5, 'five', '2023-01-01 14:00:00');
+====
+---- QUERY
+# Check initial partition count
+SELECT COUNT(*) FROM $DATABASE.iceberg_drop_partition_between_mixed_test.`partitions`;
+---- RESULTS
+5
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Drop partitions using BETWEEN on identity partition column
+ALTER TABLE $DATABASE.iceberg_drop_partition_between_mixed_test DROP PARTITION (identity_col BETWEEN 2 AND 4);
+---- RESULTS
+'Dropped 3 partition(s)'
+====
+---- QUERY
+# Verify remaining partitions
+SELECT COUNT(*) FROM $DATABASE.iceberg_drop_partition_between_mixed_test.`partitions`;
+---- RESULTS
+2
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Negative test: BETWEEN with transform function call should fail
+# BETWEEN is rewritten to compound predicate early in analysis, before transform functions are resolved
+CREATE TABLE iceberg_drop_partition_between_bucket_test
+  (bucket_col int, data_col string)
+PARTITIONED BY SPEC (bucket(3, bucket_col))
+STORED AS ICEBERG;
+INSERT INTO iceberg_drop_partition_between_bucket_test VALUES (1, 'one'), (2, 'two'), (3, 'three');
+ALTER TABLE $DATABASE.iceberg_drop_partition_between_bucket_test DROP PARTITION (bucket(3, bucket_col) BETWEEN 0 AND 1);
+---- CATCH
+AnalysisException: $DATABASE.bucket() unknown for database $DATABASE. Currently this db has 0 functions.
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-stats.test b/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
index 51a67bd54..0f9bf2a03 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
@@ -460,10 +460,177 @@ STRING_COL, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREM
 STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING, STRING
 ====
 ---- QUERY
-# Negative test: SHOW PARTITIONS WHERE not supported on Iceberg tables
-show partitions functional_parquet.iceberg_int_partitioned where i = 1
+# SHOW PARTITIONS on Iceberg table - basic filter
+show partitions functional_parquet.iceberg_partitioned where action = 'click'
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438297","action":"click"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - multiple filters
+show partitions functional_parquet.iceberg_partitioned where hour(event_time) > 438296 and action != 'download'
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438297","action":"click"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - constant true
+show partitions functional_parquet.iceberg_partitioned where true
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438297","action":"click"}',6,6
+'{"event_time_hour":"438298","action":"download"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - constant false (no results)
+show partitions functional_parquet.iceberg_partitioned where false
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - constant expression (1=0)
+show partitions functional_parquet.iceberg_partitioned where 1 = 0
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - constant expression (1=1)
+show partitions functional_parquet.iceberg_partitioned where 1 = 1
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438297","action":"click"}',6,6
+'{"event_time_hour":"438298","action":"download"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - IN clause
+show partitions functional_parquet.iceberg_partitioned where action in ('view', 'download')
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438298","action":"download"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - BETWEEN
+show partitions functional_parquet.iceberg_partitioned where hour(event_time) between 438296 and 438297
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438297","action":"click"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - IS NOT NULL
+show partitions functional_parquet.iceberg_partitioned where action is not null
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438297","action":"click"}',6,6
+'{"event_time_hour":"438298","action":"download"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - OR conditions
+show partitions functional_parquet.iceberg_partitioned where action = 'view' or hour(event_time) = 438298
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438298","action":"download"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - NOT EQUAL
+show partitions functional_parquet.iceberg_partitioned where action != 'click'
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438298","action":"download"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - complex AND/OR combination
+show partitions functional_parquet.iceberg_partitioned where (action = 'view' and hour(event_time) = 438296) or (hour(event_time) >= 438298)
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438296","action":"view"}',8,8
+'{"event_time_hour":"438298","action":"download"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# Negative test: SHOW PARTITIONS WHERE on Iceberg with non-deterministic function
+show partitions functional_parquet.iceberg_partitioned where rand() < 0.5
+---- CATCH
+Invalid partition filtering expression: rand() < 0.5.
+Invalid partition predicate: rand()
+====
+---- QUERY
+# Negative test: SHOW PARTITIONS WHERE on Iceberg with function (length/uuid)
+show partitions functional_parquet.iceberg_partitioned where length(uuid()) > 30
+---- CATCH
+Invalid partition filtering expression: length(uuid()) > 30.
+Invalid partition predicate: length(uuid())
+====
+---- QUERY
+# Negative test: SHOW PARTITIONS WHERE on Iceberg with partition column and function
+show partitions functional_parquet.iceberg_partitioned where hour(event_time) > 438296 and rand() < 1
+---- CATCH
+Invalid partition filtering expression: HOUR(event_time) > 438296 AND rand() < 1.
+Invalid partition predicate: rand()
+====
+---- QUERY
+# Negative test: SHOW PARTITIONS WHERE on Iceberg with deterministic function (upper/cast)
+show partitions functional_parquet.iceberg_partitioned where upper(cast(action as string)) = 'VIEW'
+---- CATCH
+Invalid partition filtering expression: upper(CAST(action AS STRING)) = 'VIEW'.
+Invalid partition predicate: upper(CAST(action AS STRING))
+====
+---- QUERY
+# SHOW PARTITIONS on Iceberg table - Filtering using derived partition
+show partitions functional_parquet.iceberg_partitioned where hour(event_time) = 438297
+---- LABELS
+Partition, Number Of Rows, Number Of Files
+---- RESULTS
+'{"event_time_hour":"438297","action":"click"}',6,6
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+# Negative test: SHOW PARTITIONS WHERE on Iceberg with partition field name (derived column name)
+show partitions functional_parquet.iceberg_partitioned where event_time_hour = 438296
 ---- CATCH
-WHERE clause in SHOW PARTITIONS is only supported for HDFS tables
+Could not resolve column/field reference: 'event_time_hour'
 ====
 ---- QUERY
 # Negative test: SHOW PARTITIONS WHERE with non-partition column
@@ -499,7 +666,7 @@ Analytic expressions are not allowed in SHOW PARTITIONS WHERE
 # Negative test: SHOW PARTITIONS WHERE on Kudu table
 show partitions functional_kudu.alltypes where year = 2009
 ---- CATCH
-WHERE clause in SHOW PARTITIONS is only supported for HDFS tables
+WHERE clause in SHOW PARTITIONS is only supported for HDFS and Iceberg tables
 ====
 ---- QUERY
 # Negative test: SHOW PARTITIONS WHERE with subquery
