This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c21e5212b73d4b51cd93e8080de69d3dee7bbfb7 Author: Peter Rozsa <[email protected]> AuthorDate: Thu Jul 27 08:16:01 2023 +0200 IMPALA-12243: Add support for DROP PARTITION for Iceberg tables This change adds support for DROP PARTITION operation for Iceberg tables. Users can now execute 'ALTER TABLE table DROP PARTITION (partition_filtering_expression)' statements where 'partition_filtering_expression' consists of at least one binary predicate, IN predicate, IS NULL predicate or arbitrary logical combination of these, where the predicate's term is a partitioned column decorated with a partition transform, for example: 'day(column)' or simply column. To select partitions with mutating transforms (non-identity transforms), the user must provide the transform in the selection, for example: if column 'a' is partitioned as year(a), the user must filter it by explicitly providing the transform, like (year(a) = "2012"). This is a requirement for Iceberg's planFiles API. If the column is filtered without the transform, the filtering of datafiles with strict projection results in ambiguous results. The binary predicate's operator can be any of the basic comparison operators: =, !=, <, >, <=, >=. The IN and IS NULL predicates can be inverted with the NOT keyword. Only constant literals are accepted currently for these predicates; for example: (identity_string = "string") is accepted, (identity_string = concat(identity_string, "another_string")) is not accepted. 
Tests: - unit tests for IcebergUtil.getDateTransformValue - analyzer tests for negative cases, simple cases - e2e tests for every transform type, schema evolution and rollback Change-Id: I2a768ba2966f570454687e02e4e6d67df46741f9 Reviewed-on: http://gerrit.cloudera.org:8080/20515 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- common/thrift/CatalogObjects.thrift | 10 + common/thrift/JniCatalog.thrift | 3 + .../analysis/AlterTableDropPartitionStmt.java | 118 +++++++- .../impala/analysis/IcebergPartitionExpr.java | 186 ++++++++++++ .../IcebergPartitionExpressionRewriter.java | 198 +++++++++++++ .../impala/analysis/IcebergPartitionField.java | 2 + .../org/apache/impala/analysis/PartitionSet.java | 19 +- .../apache/impala/analysis/PartitionSpecBase.java | 11 +- .../org/apache/impala/catalog/FeIcebergTable.java | 36 ++- .../org/apache/impala/catalog/IcebergTable.java | 3 +- .../impala/catalog/iceberg/IcebergCtasTarget.java | 6 +- .../impala/catalog/local/LocalIcebergTable.java | 3 +- .../common/IcebergPartitionPredicateConverter.java | 56 ++++ .../impala/common/IcebergPredicateConverter.java | 316 +++++++++++++++++++++ .../apache/impala/planner/IcebergScanPlanner.java | 281 ++---------------- .../apache/impala/service/CatalogOpExecutor.java | 11 +- .../impala/service/DescribeResultFactory.java | 6 +- .../impala/service/IcebergCatalogOpExecutor.java | 24 +- .../java/org/apache/impala/util/IcebergUtil.java | 73 ++++- .../org/apache/impala/analysis/AnalyzeDDLTest.java | 49 ++++ .../org/apache/impala/util/IcebergUtilTest.java | 69 ++++- .../queries/QueryTest/iceberg-drop-partition.test | 287 +++++++++++++++++++ .../queries/QueryTest/iceberg-negative.test | 7 +- tests/query_test/test_iceberg.py | 49 ++++ 24 files changed, 1496 insertions(+), 327 deletions(-) diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 891d86f28..116e484cf 100644 --- 
a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -626,6 +626,16 @@ struct TIcebergContentFileStore { 5: optional bool has_orc 6: optional bool has_parquet } +// Represents a drop partition request for Iceberg tables +struct TIcebergDropPartitionRequest { + // List of affected file paths (could be empty if the drop partition + // request can be exchanged with a truncate command) + 1: required list<string> paths + // Indicates whether the request could be exchanged with a truncate command + 2: required bool is_truncate + // Number of affected partitions that will be dropped + 3: required i64 num_partitions +} struct TIcebergTable { // Iceberg file system table location diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift index d4b235770..5297cc4dd 100755 --- a/common/thrift/JniCatalog.thrift +++ b/common/thrift/JniCatalog.thrift @@ -292,6 +292,9 @@ struct TAlterTableDropPartitionParams { // If true, underlying data is purged using -skipTrash 3: required bool purge + + // Summary of partitions to delete for Iceberg tables + 4: optional CatalogObjects.TIcebergDropPartitionRequest iceberg_drop_partition_request } // Parameters for ALTER TABLE ALTER/CHANGE COLUMN commands diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java index cd52a29bf..8e75f077f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java @@ -17,15 +17,34 @@ package org.apache.impala.analysis; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; 
+import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.IcebergPartitionPredicateConverter; +import org.apache.impala.common.IcebergPredicateConverter; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.thrift.TAlterTableDropPartitionParams; import org.apache.impala.thrift.TAlterTableParams; import org.apache.impala.thrift.TAlterTableType; -import com.google.common.base.Preconditions; +import org.apache.impala.thrift.TIcebergDropPartitionRequest; +import org.apache.impala.util.IcebergUtil; /** * Represents an ALTER TABLE DROP PARTITION statement. @@ -38,6 +57,16 @@ public class AlterTableDropPartitionStmt extends AlterTableStmt { // deleted. For example, for HDFS tables it skips the trash mechanism private final boolean purgePartition_; + // File paths selected by Iceberg's partition filtering + private List<String> icebergFilePaths_; + + // Statistics for selected Iceberg partitions + private long numberOfIcebergPartitions_; + + // If every partition is selected by Iceberg's partition filtering, this flag signals + // that a truncate should be executed instead of deleting every file from the metadata. 
+ private boolean isIcebergTruncate_ = false; + public AlterTableDropPartitionStmt(TableName tableName, PartitionSet partitionSet, boolean ifExists, boolean purgePartition) { super(tableName); @@ -64,11 +93,24 @@ public class AlterTableDropPartitionStmt extends AlterTableStmt { public TAlterTableParams toThrift() { TAlterTableParams params = super.toThrift(); params.setAlter_type(TAlterTableType.DROP_PARTITION); - TAlterTableDropPartitionParams addPartParams = new TAlterTableDropPartitionParams(); - addPartParams.setPartition_set(partitionSet_.toThrift()); - addPartParams.setIf_exists(!partitionSet_.getPartitionShouldExist()); - addPartParams.setPurge(purgePartition_); - params.setDrop_partition_params(addPartParams); + TAlterTableDropPartitionParams dropPartParams = new TAlterTableDropPartitionParams(); + dropPartParams.setIf_exists(!partitionSet_.getPartitionShouldExist()); + dropPartParams.setPurge(purgePartition_); + params.setDrop_partition_params(dropPartParams); + if (table_ instanceof FeIcebergTable) { + TIcebergDropPartitionRequest request = new TIcebergDropPartitionRequest(); + request.setIs_truncate(isIcebergTruncate_); + if (isIcebergTruncate_) { + request.setPaths(Collections.emptyList()); + } else { + request.setPaths(icebergFilePaths_); + } + request.num_partitions = numberOfIcebergPartitions_; + dropPartParams.setIceberg_drop_partition_request(request); + dropPartParams.setPartition_set(Collections.emptyList()); + } else { + dropPartParams.setPartition_set(partitionSet_.toThrift()); + } return params; } @@ -79,12 +121,70 @@ public class AlterTableDropPartitionStmt extends AlterTableStmt { if (table instanceof FeKuduTable) { throw new AnalysisException("ALTER TABLE DROP PARTITION is not supported for " + "Kudu tables: " + partitionSet_.toSql()); - } else if (table instanceof FeIcebergTable) { - throw new AnalysisException("ALTER TABLE DROP PARTITION is not supported for " + - "Iceberg tables: " + table.getFullName()); } if (!ifExists_) 
partitionSet_.setPartitionShouldExist(); partitionSet_.setPrivilegeRequirement(Privilege.ALTER); partitionSet_.analyze(analyzer); + + if (table instanceof FeIcebergTable) { analyzeIceberg(analyzer); } + } + + public void analyzeIceberg(Analyzer analyzer) throws AnalysisException { + if (purgePartition_) { + throw new AnalysisException( + "Partition purge is not supported for Iceberg tables"); + } + + FeIcebergTable table = (FeIcebergTable) table_; + // To rewrite transforms and column references + IcebergPartitionExpressionRewriter rewriter = + new IcebergPartitionExpressionRewriter(analyzer, + table.getIcebergApiTable().spec()); + // For Impala expression to Iceberg expression conversion + IcebergPredicateConverter converter = + new IcebergPartitionPredicateConverter(table.getIcebergSchema(), analyzer); + + List<Expression> icebergPartitionExprs = new ArrayList<>(); + for (Expr expr : partitionSet_.getPartitionExprs()) { + expr = rewriter.rewrite(expr); + expr.analyze(analyzer); + analyzer.getConstantFolder().rewrite(expr, analyzer); + try { + icebergPartitionExprs.add(converter.convert(expr)); + } catch (ImpalaException e) { + throw new AnalysisException( + "Invalid partition filtering expression: " + expr.toSql()); + } + } + + try (CloseableIterable<FileScanTask> fileScanTasks = IcebergUtil.planFiles(table, + icebergPartitionExprs, null)) { + icebergFilePaths_ = new ArrayList<>(); + Set<String> icebergPartitionSummary = new HashSet<>(); + for (FileScanTask fileScanTask : fileScanTasks) { + if (fileScanTask.residual().isEquivalentTo(Expressions.alwaysTrue())) { + icebergPartitionSummary.add(fileScanTask.file().partition().toString()); + List<DeleteFile> deleteFiles = fileScanTask.deletes(); + if (!deleteFiles.isEmpty()) { + icebergFilePaths_.addAll(deleteFiles.stream() + .map(deleteFile -> deleteFile.path().toString()).collect( + Collectors.toSet())); + } + icebergFilePaths_.add(fileScanTask.file().path().toString()); + } + } + numberOfIcebergPartitions_ = 
icebergPartitionSummary.size(); + if (icebergFilePaths_.size() == FeIcebergTable.Utils.getTotalNumberOfFiles(table, + null)) { + isIcebergTruncate_ = true; + icebergFilePaths_ = Collections.emptyList(); + } + } catch (IOException | TableLoadingException | ImpalaRuntimeException e) { + throw new AnalysisException("Error loading metadata for Iceberg table", e); + } + if (numberOfIcebergPartitions_ == 0 && !ifExists_) { + throw new AnalysisException( + "No matching partition(s) found"); + } } } diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpr.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpr.java new file mode 100644 index 000000000..20859b25a --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpr.java @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.analysis; + +import java.util.List; +import java.util.Optional; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.IcebergColumn; +import org.apache.impala.catalog.Type; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.thrift.TExprNode; +import org.apache.impala.thrift.TIcebergPartitionTransformType; +import org.apache.impala.util.IcebergUtil; + +/** + * IcebergPartitionExpr represents a partitioned Iceberg table column with its transform + * and its value. This class is used as a temporary state for Iceberg table's partition + * scanning. Later on it gets transformed to an Iceberg Expression. + */ +public class IcebergPartitionExpr extends Expr { + private final IcebergPartitionTransform transform_; + private final SlotRef slotRef_; + private final PartitionSpec partitionSpec_; + + public IcebergPartitionExpr(SlotRef slotRef, PartitionSpec partitionSpec) { + this(new IcebergPartitionTransform(TIcebergPartitionTransformType.IDENTITY), slotRef, + partitionSpec); + } + + protected IcebergPartitionExpr(IcebergPartitionTransform transform, SlotRef slotRef, + PartitionSpec partitionSpec) { + transform_ = transform; + slotRef_ = slotRef; + partitionSpec_ = partitionSpec; + } + + public IcebergPartitionExpr(FunctionCallExpr callExpr, PartitionSpec partitionSpec) + throws AnalysisException { + partitionSpec_ = partitionSpec; + FunctionParams params = callExpr.getParams(); + if (params.size() > 2 && params.size() < 1) { + throw new AnalysisException("Invalid partition predicate: " + callExpr.toSql()); + } + Expr slotRefExpr; + Expr transformParamExpr; + Optional<Integer> transformParam = Optional.empty(); + + if (params.size() == 1) { // transform(column) + slotRefExpr = params.exprs().get(0); + } else { // transform(transform_param, column) + 
transformParamExpr = params.exprs().get(0); + slotRefExpr = params.exprs().get(1); + + if (!(transformParamExpr instanceof LiteralExpr)) { + throw new AnalysisException("Invalid transform parameter: " + transformParamExpr); + } + LiteralExpr literal = (LiteralExpr) transformParamExpr; + String stringValue = literal.getStringValue(); + try { + int parsedInt = Integer.parseInt(stringValue); + transformParam = Optional.of(parsedInt); + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid transform parameter value: " + stringValue); + } + } + + if (!(slotRefExpr instanceof SlotRef)) { + throw new AnalysisException("Invalid partition predicate: " + callExpr.toSql()); + } + + String functionName = callExpr.getFnName().getFunction(); + if (functionName == null) { + List<String> fnNamePath = callExpr.getFnName().getFnNamePath(); + if (fnNamePath.size() > 1) { + throw new AnalysisException("Invalid partition transform: " + fnNamePath); + } + functionName = fnNamePath.get(0); + } + String transformString = functionName.toUpperCase(); + + slotRef_ = (SlotRef) slotRefExpr; + + try{ + if (transformParam.isPresent()) { + transform_ = + IcebergUtil.getPartitionTransform(transformString, transformParam.get()); + } else { + transform_ = IcebergUtil.getPartitionTransform(transformString); + } + } + catch (ImpalaRuntimeException e) { + throw new AnalysisException(e.getMessage()); + } + + if (transform_.getTransformType().equals(TIcebergPartitionTransformType.VOID)) { + throw new AnalysisException( + "VOID transform is not supported for partition selection"); + } + } + + public SlotRef getSlotRef() { return slotRef_; } + + public IcebergPartitionTransform getTransform() { return transform_; } + + @Override + protected void analyzeImpl(Analyzer analyzer) throws AnalysisException { + slotRef_.analyze(analyzer); + transform_.analyze(analyzer); + SlotDescriptor desc = slotRef_.getDesc(); + IcebergColumn icebergColumn = (IcebergColumn)desc.getColumn(); + + 
List<PartitionField> partitionFields = + partitionSpec_.getFieldsBySourceId(icebergColumn.getFieldId()); + if (partitionFields.isEmpty()) { + throw new AnalysisException( + "Partition exprs cannot contain non-partition column(s): " + toSql()); + } + TIcebergPartitionTransformType transformType = transform_.getTransformType(); + if (!transformType.equals(TIcebergPartitionTransformType.IDENTITY)) { + for (PartitionField field : partitionFields) { + TIcebergPartitionTransformType transformTypeFromSpec; + try { + transformTypeFromSpec = + IcebergUtil.getPartitionTransformType(field.transform().toString()); + } + catch (ImpalaRuntimeException e) { + throw new AnalysisException(e.getCause()); + } + if (!transformTypeFromSpec.equals(transformType)) { + throw new AnalysisException( + String.format("Can't filter column '%s' with transform type: '%s'", + slotRef_.toSql(), transformType)); + } + } + } + if (transformType.equals(TIcebergPartitionTransformType.TRUNCATE) + || transformType.equals(TIcebergPartitionTransformType.IDENTITY)) { + type_ = slotRef_.type_; + } + if (transformType.equals(TIcebergPartitionTransformType.YEAR) + || transformType.equals(TIcebergPartitionTransformType.BUCKET) + || transformType.equals(TIcebergPartitionTransformType.MONTH) + || transformType.equals(TIcebergPartitionTransformType.DAY) + || transformType.equals(TIcebergPartitionTransformType.HOUR)) { + type_ = Type.INT; + } + } + + @Override + protected float computeEvalCost() { + return UNKNOWN_COST; + } + + @Override + protected String toSqlImpl(ToSqlOptions options) { + return transform_.toSql(slotRef_.toSql()); + } + + @Override + protected void toThrift(TExprNode msg) { + /* Keeping it empty to avoid constant folding */ + } + + @Override + public Expr clone() { + return new IcebergPartitionExpr(transform_, slotRef_, partitionSpec_); + } +} diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java new file mode 100644 index 000000000..c47da3ea1 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionExpressionRewriter.java @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import com.google.common.base.Preconditions; +import java.util.List; +import java.util.function.Function; +import org.apache.iceberg.PartitionSpec; +import org.apache.impala.catalog.FeIcebergTable; +import org.apache.impala.catalog.IcebergTable; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.thrift.TIcebergPartitionTransformType; +import org.apache.impala.util.IcebergUtil; + +class IcebergPartitionExpressionRewriter { + private final Analyzer analyzer_; + private final org.apache.iceberg.PartitionSpec partitionSpec_; + + public IcebergPartitionExpressionRewriter(Analyzer analyzer, + org.apache.iceberg.PartitionSpec partitionSpec) { + this.analyzer_ = analyzer; + this.partitionSpec_ = partitionSpec; + } + + /** + * Rewrites SlotRefs and FunctionCallExprs as IcebergPartitionExpr. 
SlotRefs targeting a + * columns are rewritten to an IcebergPartitionExpr where the transform type is + * IDENTITY. FunctionExprs are checked whether the function name matches any Iceberg + * transform name, if it matches, then it gets rewritten to an IcebergPartitionExpr + * where the transform is located from the function name, and the parameter (if there's + * any) for the transform is saved as well, and the targeted column's SlotRef is also + * saved in the IcebergPartitionExpr. The resulting IcebergPartitionExpr then replaces + * the original SlotRef/FunctionCallExpr. For Date transforms (year, month, day, hour), + * an implicit conversion happens during the rewriting: string literals formed as + * Iceberg partition values like "2023", "2023-12", ... are parsed and transformed + * automatically to their numeric counterparts. + * + * @param expr incoming expression tree + * @return Expr where SlotRefs and FunctionCallExprs are replaced with + * IcebergPartitionExpr + * @throws AnalysisException when expression rewrite fails + */ + public Expr rewrite(Expr expr) throws AnalysisException { + if (expr instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; + return rewrite(binaryPredicate); + } + if (expr instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + return rewrite(compoundPredicate); + } + if (expr instanceof SlotRef) { + SlotRef slotRef = (SlotRef) expr; + return rewrite(slotRef); + } + if (expr instanceof FunctionCallExpr) { + FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr; + return rewrite(functionCallExpr); + } + if (expr instanceof IsNullPredicate) { + IsNullPredicate isNullPredicate = (IsNullPredicate) expr; + return rewrite(isNullPredicate); + } + if (expr instanceof InPredicate) { + InPredicate isNullPredicate = (InPredicate) expr; + return rewrite(isNullPredicate); + } + throw new AnalysisException( + "Invalid partition filtering expression: " + 
expr.toSql()); + } + + private BinaryPredicate rewrite(BinaryPredicate binaryPredicate) + throws AnalysisException { + Expr term = binaryPredicate.getChild(0); + Expr literal = binaryPredicate.getChild(1); + IcebergPartitionExpr partitionExpr; + if (term instanceof SlotRef) { + partitionExpr = rewrite((SlotRef) term); + binaryPredicate.getChildren().set(0, partitionExpr); + } else if (term instanceof FunctionCallExpr) { + partitionExpr = rewrite((FunctionCallExpr) term); + binaryPredicate.getChildren().set(0, partitionExpr); + } else { + return binaryPredicate; + } + if (!(literal instanceof LiteralExpr)) { + return binaryPredicate; + } + TIcebergPartitionTransformType transformType = partitionExpr.getTransform() + .getTransformType(); + if (!IcebergUtil.isDateTimeTransformType(transformType)) { return binaryPredicate; } + rewriteDateTransformConstants((LiteralExpr) literal, transformType, + numericLiteral -> binaryPredicate.getChildren().set(1, numericLiteral)); + return binaryPredicate; + } + + private InPredicate rewrite(InPredicate inPredicate) + throws AnalysisException { + Expr term = inPredicate.getChild(0); + List<Expr> literals = inPredicate.getChildren() + .subList(1, inPredicate.getChildCount()); + IcebergPartitionExpr partitionExpr; + if (term instanceof SlotRef) { + partitionExpr = rewrite((SlotRef) term); + inPredicate.getChildren().set(0, partitionExpr); + } else if (term instanceof FunctionCallExpr) { + partitionExpr = rewrite((FunctionCallExpr) term); + inPredicate.getChildren().set(0, partitionExpr); + } else { + return inPredicate; + } + TIcebergPartitionTransformType transformType = partitionExpr.getTransform() + .getTransformType(); + for (int i = 0; i < literals.size(); ++i) { + if (!(literals.get(i) instanceof LiteralExpr)) { + return inPredicate; + } + LiteralExpr literal = (LiteralExpr) literals.get(i); + int affectedChildId = i + 1; + if (!IcebergUtil.isDateTimeTransformType(transformType)) { continue; } + 
rewriteDateTransformConstants(literal, transformType, + numericLiteral -> inPredicate.getChildren() + .set(affectedChildId, numericLiteral)); + } + return inPredicate; + } + + private void rewriteDateTransformConstants(LiteralExpr literal, + TIcebergPartitionTransformType transformType, + Function<NumericLiteral, ?> rewrite) { + Preconditions.checkState(IcebergUtil.isDateTimeTransformType(transformType)); + if (transformType.equals(TIcebergPartitionTransformType.YEAR) + && literal instanceof NumericLiteral) { + NumericLiteral numericLiteral = (NumericLiteral) literal; + long longValue = numericLiteral.getLongValue(); + long target = longValue + IcebergUtil.ICEBERG_EPOCH_YEAR; + analyzer_.addWarning(String.format( + "The YEAR transform expects years normalized to %d: %d is targeting year %d", + IcebergUtil.ICEBERG_EPOCH_YEAR, longValue, target)); + return; + } + if (!(literal instanceof StringLiteral)) { return; } + try { + Integer dateTimeTransformValue = IcebergUtil.getDateTimeTransformValue( + transformType, + literal.getStringValue()); + rewrite.apply(NumericLiteral.create(dateTimeTransformValue)); + } catch (ImpalaRuntimeException ignore) {} + } + + private CompoundPredicate rewrite(CompoundPredicate compoundPredicate) + throws AnalysisException { + Expr left = compoundPredicate.getChild(0); + Expr right = compoundPredicate.getChild(1); + compoundPredicate.setChild(0, rewrite(left)); + compoundPredicate.setChild(1, rewrite(right)); + return compoundPredicate; + } + + private IcebergPartitionExpr rewrite(SlotRef slotRef) { + return new IcebergPartitionExpr(slotRef, partitionSpec_); + } + + private IcebergPartitionExpr rewrite(FunctionCallExpr functionCallExpr) + throws AnalysisException { + return new IcebergPartitionExpr(functionCallExpr, partitionSpec_); + } + + private IsNullPredicate rewrite(IsNullPredicate isNullPredicate) + throws AnalysisException { + Expr child = isNullPredicate.getChild(0); + + if (child instanceof SlotRef) { + 
isNullPredicate.getChildren().set(0, rewrite(child)); + } + if (child instanceof FunctionCallExpr) { + isNullPredicate.getChildren() + .set(0, new IcebergPartitionExpr((FunctionCallExpr) child, partitionSpec_)); + } + return isNullPredicate; + } +} diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java index bb77c3699..2e9915a9e 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java @@ -65,6 +65,8 @@ public class IcebergPartitionField extends StmtNode { return sourceId_; } + public int getFieldId_() { return fieldId_; } + public TIcebergPartitionTransformType getTransformType() { return transform_.getTransformType(); } diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java index 375b8997b..d28af7341 100644 --- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java +++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java @@ -17,14 +17,17 @@ package org.apache.impala.analysis; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; - import org.apache.impala.analysis.BinaryPredicate.Operator; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; @@ -32,10 +35,6 @@ import org.apache.impala.common.Reference; import org.apache.impala.planner.HdfsPartitionPruner; import org.apache.impala.thrift.TPartitionKeyValue; -import com.google.common.base.Joiner; -import 
com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - /** * Represents a set of partitions resulting from evaluating a list of partition conjuncts * against a table's partition list. @@ -45,20 +44,20 @@ public class PartitionSet extends PartitionSpecBase { // Result of analysis, null until analysis is complete. private List<? extends FeFsPartition> partitions_; - public PartitionSet(List<Expr> partitionExprs) { this.partitionExprs_ = ImmutableList.copyOf(partitionExprs); } - public List<? extends FeFsPartition> getPartitions() { return partitions_; } + public List<Expr> getPartitionExprs() { return partitionExprs_; } @Override public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); + if(table_ instanceof FeIcebergTable) return; List<Expr> conjuncts = new ArrayList<>(); // Do not register column-authorization requests. analyzer.setEnablePrivChecks(false); - for (Expr e: partitionExprs_) { + for (Expr e : partitionExprs_) { e.analyze(analyzer); e.checkReturnsBool("Partition expr", false); conjuncts.addAll(e.getConjuncts()); @@ -66,7 +65,7 @@ public class PartitionSet extends PartitionSpecBase { TupleDescriptor desc = analyzer.getDescriptor(tableName_.toString()); List<SlotId> partitionSlots = desc.getPartitionSlots(); - for (Expr e: conjuncts) { + for (Expr e : conjuncts) { analyzer.getConstantFolder().rewrite(e, analyzer); // Make sure there are no constant predicates in the partition exprs. 
if (e.isConstant()) { @@ -89,7 +88,7 @@ public class PartitionSet extends PartitionSpecBase { partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, true, null).first; } catch (ImpalaException e) { - if (e instanceof AnalysisException) throw (AnalysisException) e; + if (e instanceof AnalysisException) throw (AnalysisException)e; throw new AnalysisException("Partition expr evaluation failed in the backend.", e); } diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java b/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java index 8f6cd3b1d..723b90db8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java +++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSpecBase.java @@ -18,6 +18,7 @@ package org.apache.impala.analysis; import org.apache.impala.authorization.Privilege; +import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeTable; @@ -83,7 +84,7 @@ public abstract class PartitionSpecBase extends StmtNode { } // Make sure the target table is partitioned. 
- if (table.getMetaStoreTable().getPartitionKeysSize() == 0) { + if (!isPartitioned(table)) { throw new AnalysisException("Table is not partitioned: " + tableName_); } @@ -93,6 +94,14 @@ public abstract class PartitionSpecBase extends StmtNode { nullPartitionKeyValue_ = table_.getNullPartitionKeyValue(); } + private boolean isPartitioned(FeTable table) { + if (table instanceof FeIcebergTable) { + return ((FeIcebergTable) table).isPartitioned(); + } else { + return table.getMetaStoreTable().getPartitionKeysSize() != 0; + } + } + @Override public final String toSql() { return toSql(DEFAULT); diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index 93449fc90..cd2fd0af5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -23,7 +23,6 @@ import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -67,6 +66,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FileSystemUtil; +import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Pair; import org.apache.impala.common.PrintUtils; import org.apache.impala.common.Reference; @@ -545,7 +545,7 @@ public interface FeIcebergTable extends FeFsTable { * @return a list of {@link org.apache.hadoop.hive.metastore.api.FieldSchema} */ public static List<FieldSchema> getPartitionTransformKeys(FeIcebergTable table) - throws TableLoadingException { + throws ImpalaRuntimeException { Table icebergTable = table.getIcebergApiTable(); if (icebergTable.specs().isEmpty()) { @@ -553,7 +553,7 @@ public interface 
FeIcebergTable extends FeFsTable { } PartitionSpec latestSpec = icebergTable.spec(); - HashMap<String, Integer> transformParams = IcebergUtil.getPartitionTransformParams( + Map<String, Integer> transformParams = IcebergUtil.getPartitionTransformParams( latestSpec); List<FieldSchema> fieldSchemaList = Lists.newArrayList(); for (PartitionField field : latestSpec.fields()) { @@ -888,7 +888,7 @@ public interface FeIcebergTable extends FeFsTable { * Get iceberg partition spec by iceberg table metadata */ public static List<IcebergPartitionSpec> loadPartitionSpecByIceberg( - FeIcebergTable table) throws TableLoadingException { + FeIcebergTable table) throws ImpalaRuntimeException { List<IcebergPartitionSpec> ret = new ArrayList<>(); for (PartitionSpec spec : table.getIcebergApiTable().specs().values()) { ret.add(convertPartitionSpec(spec)); @@ -897,9 +897,9 @@ public interface FeIcebergTable extends FeFsTable { } public static IcebergPartitionSpec convertPartitionSpec(PartitionSpec spec) - throws TableLoadingException { - List<IcebergPartitionField> fields = new ArrayList<>();; - HashMap<String, Integer> transformParams = + throws ImpalaRuntimeException { + List<IcebergPartitionField> fields = new ArrayList<>(); + Map<String, Integer> transformParams = IcebergUtil.getPartitionTransformParams(spec); for (PartitionField field : spec.fields()) { fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(), @@ -1052,5 +1052,27 @@ public interface FeIcebergTable extends FeFsTable { return new FileStatus(contentFile.fileSizeInBytes(), false, 0, 0, DEFAULT_MODIFICATION_TIME, path); } + + public static long getTotalNumberOfFiles(FeIcebergTable icebergTable, + TimeTravelSpec travelSpec) + throws ImpalaRuntimeException { + Map<String, String> snapshotSummary = getSnapshotSummary( + icebergTable.getIcebergApiTable(), + travelSpec); + if (snapshotSummary == null) { + throw new ImpalaRuntimeException("Invalid Iceberg snapshot summary"); + } + try { + String 
totalDataFilesProp = snapshotSummary.get( + SnapshotSummary.TOTAL_DATA_FILES_PROP); + String totalDeleteFilesProp = snapshotSummary.getOrDefault( + SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0"); + long totalDataFiles = Long.parseLong(totalDataFilesProp); + long totalDeleteFiles = Long.parseLong(totalDeleteFilesProp); + return totalDataFiles + totalDeleteFiles; + } catch (NumberFormatException e) { + throw new ImpalaRuntimeException("Invalid Iceberg snapshot summary value", e); + } + } } } diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 8ef7793a5..d789e12c5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -465,7 +465,8 @@ public class IcebergTable extends Table implements FeIcebergTable { /** * Load schema and partitioning schemes directly from Iceberg. */ - public void loadSchemaFromIceberg() throws TableLoadingException { + public void loadSchemaFromIceberg() + throws TableLoadingException, ImpalaRuntimeException { loadSchema(); addVirtualColumns(); partitionSpecs_ = Utils.loadPartitionSpecByIceberg(this); diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java index e9806ac73..4edd9271a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java @@ -90,7 +90,7 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable public IcebergCtasTarget(FeDb db, org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs, IcebergPartitionSpec partSpec) - throws CatalogException { + throws CatalogException, ImpalaRuntimeException { super(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); createFsTable(db, msTbl);
createIcebergSchema(columnDefs); @@ -125,11 +125,11 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable } private void createPartitionSpec(IcebergPartitionSpec partSpec) - throws CatalogException { + throws CatalogException, ImpalaRuntimeException { Preconditions.checkState(iceSchema_ != null); PartitionSpec iceSpec = null; try { - // Let's create an Iceberg PartitionSpec with the help of Icebeg from 'partSpec', + // Let's create an Iceberg PartitionSpec with the help of Iceberg from 'partSpec', // then convert it back to an IcebergPartitionSpec. if (partSpec == null) { iceSpec = PartitionSpec.unpartitioned(); diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java index 755055a7e..5fbaa7ca4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java @@ -36,6 +36,7 @@ import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.IcebergContentFileStore; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.local.MetaProvider.TableMetaRef; +import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.thrift.TCompressionCodec; import org.apache.impala.thrift.THdfsPartition; import org.apache.impala.thrift.THdfsTable; @@ -138,7 +139,7 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable { private LocalIcebergTable(LocalDb db, Table msTable, MetaProvider.TableMetaRef ref, LocalFsTable fsTable, ColumnMap cmap, TPartialTableInfo tableInfo, TableParams tableParams, org.apache.iceberg.Table icebergApiTable) - throws TableLoadingException { + throws ImpalaRuntimeException { super(db, msTable, ref, cmap); Preconditions.checkNotNull(tableInfo); diff --git a/fe/src/main/java/org/apache/impala/common/IcebergPartitionPredicateConverter.java 
b/fe/src/main/java/org/apache/impala/common/IcebergPartitionPredicateConverter.java new file mode 100644 index 000000000..1a7a3cf93 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/common/IcebergPartitionPredicateConverter.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.common; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.UnboundTerm; +import org.apache.iceberg.transforms.Transforms; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.IcebergPartitionExpr; +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.IcebergColumn; +import org.apache.impala.thrift.TIcebergPartitionTransformType; + +public class IcebergPartitionPredicateConverter extends IcebergPredicateConverter { + public IcebergPartitionPredicateConverter(Schema schema, Analyzer analyzer) { + super(schema, analyzer); + } + + @Override + protected Term getTerm(Expr expr) throws ImpalaRuntimeException { + if(!(expr instanceof IcebergPartitionExpr)) { + throw new ImpalaRuntimeException("Unsupported expression type: " + expr); + } + IcebergPartitionExpr partitionExpr = (IcebergPartitionExpr) expr; + Column column = getColumnFromSlotRef(partitionExpr.getSlotRef()); + if(!(column instanceof IcebergColumn)){ + throw new ImpalaRuntimeException( + String.format("Invalid column type %s for column: %s", + column.getType(), column)); + } + IcebergColumn icebergColumn = (IcebergColumn) column; + if (partitionExpr.getTransform().getTransformType().equals( + TIcebergPartitionTransformType.IDENTITY)) { + return new Term(Expressions.ref(column.getName()),icebergColumn); + } + return new Term((UnboundTerm<Object>) Expressions.transform(column.getName(), + Transforms.fromString(partitionExpr.getTransform().toSql())), icebergColumn); + } +} diff --git a/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java b/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java new file mode 100644 index 000000000..124d76a31 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/common/IcebergPredicateConverter.java @@ -0,0 +1,316 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.common; + +import com.google.common.base.Preconditions; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expression.Operation; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.expressions.UnboundTerm; +import org.apache.iceberg.types.Types; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.BinaryPredicate; +import org.apache.impala.analysis.BoolLiteral; +import org.apache.impala.analysis.CompoundPredicate; +import org.apache.impala.analysis.DateLiteral; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.InPredicate; +import org.apache.impala.analysis.IsNullPredicate; +import org.apache.impala.analysis.LiteralExpr; +import org.apache.impala.analysis.NumericLiteral; +import org.apache.impala.analysis.SlotDescriptor; +import org.apache.impala.analysis.SlotRef; +import org.apache.impala.analysis.StringLiteral; +import org.apache.impala.catalog.Column; 
+import org.apache.impala.catalog.IcebergColumn; +import org.apache.impala.catalog.PrimitiveType; +import org.apache.impala.catalog.Type; +import org.apache.impala.util.ExprUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergPredicateConverter { + private static final Logger LOG = + LoggerFactory.getLogger(IcebergPredicateConverter.class); + private final Schema schema_; + private final Analyzer analyzer_; + + public IcebergPredicateConverter(Schema schema, Analyzer analyzer) { + this.schema_ = schema; + this.analyzer_ = analyzer; + } + + public Expression convert(Expr expr) throws ImpalaRuntimeException { + if (expr instanceof BinaryPredicate) { + return convert((BinaryPredicate) expr); + } else if (expr instanceof InPredicate) { + return convert((InPredicate) expr); + } else if (expr instanceof IsNullPredicate) { + return convert((IsNullPredicate) expr); + } else if (expr instanceof CompoundPredicate) { + return convert((CompoundPredicate) expr); + } else { + throw new ImpalaRuntimeException(String.format( + "Unsupported expression: %s", expr.toSql())); + } + } + + protected Expression convert(BinaryPredicate predicate) throws ImpalaRuntimeException { + Term term = getTerm(predicate.getChild(0)); + IcebergColumn column = term.referencedColumn_; + + LiteralExpr literal = getSecondChildAsLiteralExpr(predicate); + checkNullLiteral(literal); + Operation op = getOperation(predicate); + Object value = getIcebergValue(column, literal); + + List<Object> literals = Collections.singletonList(value); + return Expressions.predicate(op, term.term_, literals); + } + + protected UnboundPredicate<Object> convert(InPredicate predicate) + throws ImpalaRuntimeException { + Term term = getTerm(predicate.getChild(0)); + IcebergColumn column = term.referencedColumn_; + // Expressions takes a list of values as Objects + List<Object> values = new ArrayList<>(); + for (int i = 1; i < predicate.getChildren().size(); ++i) { + if 
(!Expr.IS_LITERAL.apply(predicate.getChild(i))) { + throw new ImpalaRuntimeException( + String.format("Expression is not a literal: %s", + predicate.getChild(i))); + } + LiteralExpr literal = (LiteralExpr) predicate.getChild(i); + checkNullLiteral(literal); + Object value = getIcebergValue(column, literal); + values.add(value); + } + + // According to the method: + // 'org.apache.iceberg.expressions.InclusiveMetricsEvaluator.MetricsEvalVisitor#notIn' + // Expressions.notIn only works when the push-down column is the partition column + if (predicate.isNotIn()) { + return Expressions.notIn(term.term_, values); + } else { + return Expressions.in(term.term_, values); + } + } + + protected UnboundPredicate<Object> convert(IsNullPredicate predicate) + throws ImpalaRuntimeException { + Term term = getTerm(predicate.getChild(0)); + if (predicate.isNotNull()) { + return Expressions.notNull(term.term_); + } else { + return Expressions.isNull(term.term_); + } + } + + protected Expression convert(CompoundPredicate predicate) + throws ImpalaRuntimeException { + Operation op = getOperation(predicate); + + Expr leftExpr = predicate.getChild(0); + Expression left = convert(leftExpr); + + if (op.equals(Operation.NOT)) { + return Expressions.not(left); + } + + Expr rightExpr = predicate.getChild(1); + Expression right = convert(rightExpr); + + return op.equals(Operation.AND) ? 
Expressions.and(left, right) : + Expressions.or(left, right); + } + + protected void checkNullLiteral(LiteralExpr literal) throws ImpalaRuntimeException { + if (Expr.IS_NULL_LITERAL.apply(literal)) { + throw new ImpalaRuntimeException("Expression can't be NULL literal: " + literal); + } + } + + protected Object getIcebergValue(IcebergColumn column, LiteralExpr literal) + throws ImpalaRuntimeException { + PrimitiveType primitiveType = literal.getType().getPrimitiveType(); + switch (primitiveType) { + case BOOLEAN: return ((BoolLiteral) literal).getValue(); + case TINYINT: + case SMALLINT: + case INT: return ((NumericLiteral) literal).getIntValue(); + case BIGINT: return ((NumericLiteral) literal).getLongValue(); + case FLOAT: return (float) ((NumericLiteral) literal).getDoubleValue(); + case DOUBLE: return ((NumericLiteral) literal).getDoubleValue(); + case STRING: + case DATETIME: + case CHAR: return ((StringLiteral) literal).getUnescapedValue(); + case TIMESTAMP: return getIcebergTsValue(literal, column, schema_); + case DATE: return ((DateLiteral) literal).getValue(); + case DECIMAL: return getIcebergDecimalValue(column, (NumericLiteral) literal); + default: { + throw new ImpalaRuntimeException( + String.format("Unable to parse Iceberg value '%s' for type %s", + literal.getStringValue(), primitiveType)); + } + } + } + + /** + * Returns Iceberg operator by BinaryPredicate operator, or throws + * ImpalaRuntimeException if the operation is not supported by Iceberg.
+ */ + protected Operation getIcebergOperator(BinaryPredicate.Operator op) + throws ImpalaRuntimeException { + switch (op) { + case EQ: return Operation.EQ; + case NE: return Operation.NOT_EQ; + case LE: return Operation.LT_EQ; + case GE: return Operation.GT_EQ; + case LT: return Operation.LT; + case GT: return Operation.GT; + default: + throw new ImpalaRuntimeException( + String.format("Unsupported Impala operator: %s", op.getName())); + } + } + + /** + * Returns Iceberg operator by CompoundPredicate operator, or throws + * ImpalaRuntimeException if the operation is not supported by Iceberg. + */ + protected Operation getIcebergOperator(CompoundPredicate.Operator op) + throws ImpalaRuntimeException { + switch (op) { + case AND: return Operation.AND; + case OR: return Operation.OR; + case NOT: return Operation.NOT; + default: + throw new ImpalaRuntimeException( + String.format("Unsupported Impala operator: %s", op)); + } + } + + protected BigDecimal getIcebergDecimalValue(IcebergColumn column, + NumericLiteral literal) throws ImpalaRuntimeException { + Type colType = column.getType(); + int scale = colType.getDecimalDigits(); + BigDecimal literalValue = literal.getValue(); + + if (literalValue.scale() > scale) { + throw new ImpalaRuntimeException( + String.format("Invalid scale %d for type: %s", literalValue.scale(), + colType.toSql())); + } + // Iceberg DecimalLiteral needs to have the exact same scale.
+ if (literalValue.scale() < scale) { + return literalValue.setScale(scale); + } + return literalValue; + } + + protected Long getIcebergTsValue(LiteralExpr literal, IcebergColumn column, + Schema iceSchema) throws ImpalaRuntimeException { + try { + org.apache.iceberg.types.Type iceType = iceSchema.findType(column.getFieldId()); + Preconditions.checkState(iceType instanceof Types.TimestampType); + Types.TimestampType tsType = (Types.TimestampType) iceType; + if (tsType.shouldAdjustToUTC()) { + return ExprUtil.localTimestampToUnixTimeMicros(analyzer_, literal); + } else { + return ExprUtil.utcTimestampToUnixTimeMicros(analyzer_, literal); + } + } catch (InternalException ex) { + // We cannot interpret the timestamp literal. Maybe the timestamp is invalid, + // or the local timestamp ambiguously converts to UTC due to daylight saving + // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' converts to + // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'. + LOG.warn("Exception occurred during timestamp conversion." + + "\nThis means timestamp predicate is not pushed to Iceberg, let Impala " + + "backend handle it.", ex); + } catch (AnalysisException ignored) {} + throw new ImpalaRuntimeException( + String.format("Unable to parse timestamp value from: %s", + literal.getStringValue())); + } + + protected Column getColumnFromSlotRef(SlotRef slotRef) throws ImpalaRuntimeException { + SlotDescriptor desc = slotRef.getDesc(); + // If predicate contains map/struct, this column would be null + Column column = desc.getColumn(); + if (column == null) { + throw new ImpalaRuntimeException( + "Expressions with complex types can't be converted to Iceberg expressions: " + + slotRef); + } + return column; + } + + + protected LiteralExpr getSecondChildAsLiteralExpr(Expr expr) + throws ImpalaRuntimeException { + if (!(expr.getChild(1) instanceof LiteralExpr)) { + throw new ImpalaRuntimeException( + String.format("Invalid child expression: %s", expr)); + } +
return (LiteralExpr) expr.getChild(1); + } + + protected Operation getOperation(Expr expr) throws ImpalaRuntimeException { + Operation op; + if (expr instanceof BinaryPredicate) { + op = getIcebergOperator(((BinaryPredicate) expr).getOp()); + } else if (expr instanceof CompoundPredicate) { + op = getIcebergOperator(((CompoundPredicate) expr).getOp()); + } else { + throw new ImpalaRuntimeException( + String.format("Invalid expression type: %s", expr.getType())); + } + return op; + } + + protected Term getTerm(Expr expr) throws ImpalaRuntimeException { + if(!(expr instanceof SlotRef)){ + throw new ImpalaRuntimeException( + String.format("Unable to create term from expression: %s", expr.toSql())); + } + Column column = getColumnFromSlotRef((SlotRef) expr); + if(!(column instanceof IcebergColumn)){ + throw new ImpalaRuntimeException( + String.format("Invalid column type %s for column: %s", column.getType(), + column)); + } + + return new Term(Expressions.ref(column.getName()), (IcebergColumn) column); + } + + public static class Term { + public final UnboundTerm<Object> term_; + public final IcebergColumn referencedColumn_; + + public Term(UnboundTerm<Object> term, IcebergColumn referencedColumn){ + term_ = term; + referencedColumn_ = referencedColumn; + } + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index 7baccb2ac..7863d8b67 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -17,8 +17,8 @@ package org.apache.impala.planner; +import com.google.common.base.Preconditions; import java.io.IOException; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -32,47 +32,38 @@ import java.util.Set; import java.util.TreeSet; import java.util.function.Predicate; import java.util.stream.Collectors; - import 
org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ExpressionUtil; import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Expression.Operation; import org.apache.iceberg.expressions.True; -import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Types; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; -import org.apache.impala.analysis.CompoundPredicate; +import org.apache.impala.analysis.BinaryPredicate.Operator; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.IcebergExpressionCollector; import org.apache.impala.analysis.InPredicate; import org.apache.impala.analysis.IsNullPredicate; import org.apache.impala.analysis.JoinOperator; -import org.apache.impala.analysis.LiteralExpr; -import org.apache.impala.catalog.Column; -import org.apache.impala.catalog.ColumnStats; -import org.apache.impala.analysis.BoolLiteral; -import org.apache.impala.analysis.DateLiteral; import org.apache.impala.analysis.MultiAggregateInfo; -import org.apache.impala.analysis.NumericLiteral; import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotId; import org.apache.impala.analysis.SlotRef; -import org.apache.impala.analysis.StringLiteral; import org.apache.impala.analysis.TableRef; import org.apache.impala.analysis.TimeTravelSpec; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; -import org.apache.impala.analysis.BinaryPredicate.Operator; +import org.apache.impala.catalog.Column; +import 
org.apache.impala.catalog.ColumnStats; import org.apache.impala.catalog.FeIcebergTable; +import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.IcebergColumn; import org.apache.impala.catalog.IcebergContentFileStore; import org.apache.impala.catalog.IcebergPositionDeleteTable; @@ -80,23 +71,18 @@ import org.apache.impala.catalog.IcebergTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.VirtualColumn; -import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.IcebergPredicateConverter; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; -import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; import org.apache.impala.planner.JoinNode.DistributionMode; import org.apache.impala.thrift.TColumnStats; import org.apache.impala.thrift.TIcebergPartitionTransformType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TVirtualColumnType; -import org.apache.impala.util.ExprUtil; import org.apache.impala.util.IcebergUtil; - -import com.google.common.base.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -220,7 +206,7 @@ public class IcebergScanPlanner { nonIdentityConjuncts_, getSkippedConjuncts()); dataScanNode.init(analyzer_); List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map( - entry -> new SlotRef(entry)).collect(Collectors.toList()); + SlotRef::new).collect(Collectors.toList()); UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tblRef_.getId(), outputExprs, false); unionNode.addChild(dataScanNode, outputExprs); @@ -404,8 +390,8 @@ public class IcebergScanPlanner { } if (dataFilesCacheMisses > 0) { Preconditions.checkState(timeTravelSpec 
!= null); - LOG.info("File descriptors had to be loaded on demand during time travel: " + - String.valueOf(dataFilesCacheMisses)); + LOG.info("File descriptors had to be loaded on demand during time travel: {}", + dataFilesCacheMisses); } } catch (IOException | TableLoadingException e) { throw new ImpalaRuntimeException(String.format( @@ -506,7 +492,7 @@ public class IcebergScanPlanner { if (tblRef_.getTimeTravelSpec() == null) { // We should always find the data files in the cache when not doing time travel. throw new ImpalaRuntimeException("Cannot find file in cache: " + cf.path() - + " with snapshot id: " + String.valueOf(getIceTable().snapshotId())); + + " with snapshot id: " + getIceTable().snapshotId()); } // We can still find the file descriptor among the old file descriptors. fileDesc = fileStore.getOldFileDescriptor(pathHash); @@ -626,248 +612,21 @@ public class IcebergScanPlanner { } /** - * Returns Iceberg operator by BinaryPredicate operator, or null if the operation - * is not supported by Iceberg. - */ - private Operation getIcebergOperator(BinaryPredicate.Operator op) { - switch (op) { - case EQ: return Operation.EQ; - case NE: return Operation.NOT_EQ; - case LE: return Operation.LT_EQ; - case GE: return Operation.GT_EQ; - case LT: return Operation.LT; - case GT: return Operation.GT; - default: return null; - } - } - - /** - * Returns Iceberg operator by CompoundPredicate operator, or null if the operation - * is not supported by Iceberg. 
- */ - private Operation getIcebergOperator(CompoundPredicate.Operator op) { - switch (op) { - case AND: return Operation.AND; - case OR: return Operation.OR; - case NOT: return Operation.NOT; - default: return null; - } - } - - /** - * Transform impala predicate to iceberg predicate + * Transform Impala predicate to Iceberg predicate */ - private boolean tryConvertIcebergPredicate(Expr expr) - throws ImpalaException { - Expression predicate = convertIcebergPredicate(expr); - if (predicate != null) { + private boolean tryConvertIcebergPredicate(Expr expr) { + IcebergPredicateConverter converter = + new IcebergPredicateConverter(getIceTable().getIcebergSchema(), analyzer_); + try { + Expression predicate = converter.convert(expr); impalaIcebergPredicateMapping_.put(predicate, expr); - LOG.debug("Push down the predicate: " + predicate + " to iceberg"); + LOG.debug("Push down the predicate: {} to iceberg", predicate); return true; } - untranslatedExpressions_.add(expr); - return false; - } - - private Expression convertIcebergPredicate(Expr expr) - throws ImpalaException { - if (expr instanceof BinaryPredicate) { - return convertIcebergPredicate((BinaryPredicate) expr); - } else if (expr instanceof InPredicate) { - return convertIcebergPredicate((InPredicate) expr); - } else if (expr instanceof IsNullPredicate) { - return convertIcebergPredicate((IsNullPredicate) expr); - } else if (expr instanceof CompoundPredicate) { - return convertIcebergPredicate((CompoundPredicate) expr); - } else { - return null; - } - } - - private UnboundPredicate<Object> convertIcebergPredicate(BinaryPredicate predicate) - throws ImpalaException { - Operation op = getIcebergOperator(predicate.getOp()); - if (op == null) { - return null; - } - - // Do not convert if there is an implicit cast - if (!(predicate.getChild(0) instanceof SlotRef)) { - return null; - } - SlotRef ref = (SlotRef) predicate.getChild(0); - - if (!(predicate.getChild(1) instanceof LiteralExpr)) { - return null; - } - 
LiteralExpr literal = (LiteralExpr) predicate.getChild(1); - - // If predicate contains map/struct, this column would be null - Column col = ref.getDesc().getColumn(); - if (col == null) { - return null; - } - - // Cannot push BinaryPredicate with null literal values - if (Expr.IS_NULL_LITERAL.apply(literal)) { - return null; - } - - Object value = getIcebergValue(ref, literal); - if (value == null) { - return null; - } - - return Expressions.predicate(op, col.getName(), value); - } - - private UnboundPredicate<Object> convertIcebergPredicate(InPredicate predicate) - throws ImpalaException { - // Do not convert if there is an implicit cast - if (!(predicate.getChild(0) instanceof SlotRef)) { - return null; - } - SlotRef ref = (SlotRef) predicate.getChild(0); - - // If predicate contains map/struct, this column would be null - Column col = ref.getDesc().getColumn(); - if (col == null) { - return null; - } - - // Expressions takes a list of values as Objects - List<Object> values = new ArrayList<>(); - for (int i = 1; i < predicate.getChildren().size(); ++i) { - if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) { - return null; - } - LiteralExpr literal = (LiteralExpr) predicate.getChild(i); - - // Cannot push IN or NOT_IN predicate with null literal values - if (Expr.IS_NULL_LITERAL.apply(literal)) { - return null; - } - - Object value = getIcebergValue(ref, literal); - if (value == null) { - return null; - } - - values.add(value); - } - - // According to the method: - // 'org.apache.iceberg.expressions.InclusiveMetricsEvaluator.MetricsEvalVisitor#notIn' - // Expressions.notIn only works when the push-down column is the partition column - if (predicate.isNotIn()) - return Expressions.notIn(col.getName(), values); - else { - return Expressions.in(col.getName(), values); - } - } - - private UnboundPredicate<Object> convertIcebergPredicate(IsNullPredicate predicate) { - // Do not convert if there is an implicit cast - if (!(predicate.getChild(0) instanceof SlotRef)) { 
- return null; - } - SlotRef ref = (SlotRef) predicate.getChild(0); - - // If predicate contains map/struct, this column would be null - Column col = ref.getDesc().getColumn(); - if (col == null) { - return null; - } - - if (predicate.isNotNull()) { - return Expressions.notNull(col.getName()); - } else{ - return Expressions.isNull(col.getName()); - } - } - - private Expression convertIcebergPredicate(CompoundPredicate predicate) - throws ImpalaException { - Operation op = getIcebergOperator(predicate.getOp()); - if (op == null) { - return null; - } - - Expression left = convertIcebergPredicate(predicate.getChild(0)); - if (left == null) { - return null; - } - if (op.equals(Operation.NOT)) { - return Expressions.not(left); - } - - Expression right = convertIcebergPredicate(predicate.getChild(1)); - if (right == null) { - return null; - } - return op.equals(Operation.AND) ? Expressions.and(left, right) - : Expressions.or(left, right); - } - - private Object getIcebergValue(SlotRef ref, LiteralExpr literal) - throws ImpalaException { - IcebergColumn iceCol = (IcebergColumn) ref.getDesc().getColumn(); - Schema iceSchema = getIceTable().getIcebergSchema(); - switch (literal.getType().getPrimitiveType()) { - case BOOLEAN: return ((BoolLiteral) literal).getValue(); - case TINYINT: - case SMALLINT: - case INT: return ((NumericLiteral) literal).getIntValue(); - case BIGINT: return ((NumericLiteral) literal).getLongValue(); - case FLOAT: return (float) ((NumericLiteral) literal).getDoubleValue(); - case DOUBLE: return ((NumericLiteral) literal).getDoubleValue(); - case STRING: - case DATETIME: - case CHAR: return ((StringLiteral) literal).getUnescapedValue(); - case TIMESTAMP: return getIcebergTsValue(literal, iceCol, iceSchema); - case DATE: return ((DateLiteral) literal).getValue(); - case DECIMAL: return getIcebergDecimalValue(ref, (NumericLiteral) literal); - default: { - Preconditions.checkState(false, - "Unsupported iceberg type considered for predicate: %s", - 
literal.getType().toSql()); - } - } - return null; - } - - private BigDecimal getIcebergDecimalValue(SlotRef ref, NumericLiteral literal) { - Type colType = ref.getDesc().getColumn().getType(); - int scale = colType.getDecimalDigits(); - BigDecimal literalValue = literal.getValue(); - - if (literalValue.scale() > scale) return null; - // Iceberg DecimalLiteral needs to have the exact same scale. - if (literalValue.scale() < scale) return literalValue.setScale(scale); - return literalValue; - } - - private Object getIcebergTsValue(LiteralExpr literal, - IcebergColumn iceCol, Schema iceSchema) throws AnalysisException { - try { - org.apache.iceberg.types.Type iceType = iceSchema.findType(iceCol.getFieldId()); - Preconditions.checkState(iceType instanceof Types.TimestampType); - Types.TimestampType tsType = (Types.TimestampType) iceType; - if (tsType.shouldAdjustToUTC()) { - return ExprUtil.localTimestampToUnixTimeMicros(analyzer_, literal); - } else { - return ExprUtil.utcTimestampToUnixTimeMicros(analyzer_, literal); - } - } catch (InternalException ex) { - // We cannot interpret the timestamp literal. Maybe the timestamp is invalid, - // or the local timestamp ambigously converts to UTC due to daylight saving - // time backward turn. E.g. '2021-10-31 02:15:00 Europe/Budapest' converts to - // either '2021-10-31 00:15:00 UTC' or '2021-10-31 01:15:00 UTC'. 
- LOG.warn("Exception occurred during timestamp conversion: " + ex.toString() + - "\nThis means timestamp predicate is not pushed to Iceberg, let Impala " + - "backend handle it."); + catch (ImpalaException e) { + untranslatedExpressions_.add(expr); + return false; } - return null; } } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 170e1d94f..f89bfd40f 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -1395,7 +1395,8 @@ public class CatalogOpExecutor { || type == TAlterTableType.ALTER_COLUMN || type == TAlterTableType.SET_PARTITION_SPEC || type == TAlterTableType.SET_TBL_PROPERTIES - || type == TAlterTableType.UNSET_TBL_PROPERTIES; + || type == TAlterTableType.UNSET_TBL_PROPERTIES + || type == TAlterTableType.DROP_PARTITION; } /** @@ -1472,6 +1473,14 @@ public class CatalogOpExecutor { needsToUpdateHms |= !unsetIcebergTblProperties(tbl, params, iceTxn); addSummary(response, "Updated table."); break; + case DROP_PARTITION: + // Metadata change only + needsToUpdateHms = false; + long droppedPartitions = IcebergCatalogOpExecutor.alterTableDropPartition( + iceTxn, params.getDrop_partition_params()); + addSummary( + response, String.format("Dropped %d partition(s)", droppedPartitions)); + break; case REPLACE_COLUMNS: // It doesn't make sense to replace all the columns of an Iceberg table as it // would basically make all existing data inaccessible. 
diff --git a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java index 9f42d5547..888df7b65 100644 --- a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java +++ b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java @@ -33,7 +33,7 @@ import org.apache.impala.catalog.IcebergColumn; import org.apache.impala.catalog.KuduColumn; import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; -import org.apache.impala.catalog.TableLoadingException; +import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.compat.MetastoreShim; import org.apache.impala.thrift.TColumnValue; import org.apache.impala.thrift.TDescribeOutputStyle; @@ -198,8 +198,8 @@ public class DescribeResultFactory { * Hive's MetadataFormatUtils class is used to build the results. filteredColumns is a * list of columns the user is authorized to view. */ - public static TDescribeResult buildDescribeFormattedResult(FeTable table, - List<Column> filteredColumns) throws TableLoadingException { + public static TDescribeResult buildDescribeFormattedResult( + FeTable table, List<Column> filteredColumns) throws ImpalaRuntimeException { TDescribeResult result = new TDescribeResult(); result.results = Lists.newArrayList(); diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index 786e5ff2a..f98289311 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -35,7 +35,6 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; -import org.apache.iceberg.SnapshotManager; import org.apache.iceberg.Table; import 
org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; @@ -57,6 +56,7 @@ import org.apache.impala.catalog.iceberg.IcebergHiveCatalog; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.fb.FbIcebergColumnStats; import org.apache.impala.fb.FbIcebergDataFile; +import org.apache.impala.thrift.TAlterTableDropPartitionParams; import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams; import org.apache.impala.thrift.TAlterTableExecuteRollbackParams; import org.apache.impala.thrift.TColumn; @@ -239,10 +239,26 @@ public class IcebergCatalogOpExecutor { } /** - * Drops a column from a Iceberg table. + * Deletes files related to specific set of partitions */ - public static void dropColumn(Transaction txn, String colName) - throws TableLoadingException, ImpalaRuntimeException { + public static long alterTableDropPartition( + Transaction iceTxn, TAlterTableDropPartitionParams params) { + DeleteFiles deleteFiles = iceTxn.newDelete(); + if (params.iceberg_drop_partition_request.is_truncate) { + deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue()); + } else { + for (String path : params.iceberg_drop_partition_request.paths) { + deleteFiles.deleteFile(path); + } + } + deleteFiles.commit(); + return params.iceberg_drop_partition_request.num_partitions; + } + + /** + * Drops a column from an Iceberg table. 
+ */ + public static void dropColumn(Transaction txn, String colName) { UpdateSchema schema = txn.updateSchema(); schema.deleteColumn(colName); schema.commit(); diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index 71c200dab..f7092d2fb 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -27,6 +27,7 @@ import com.google.common.primitives.Longs; import com.google.flatbuffers.FlatBufferBuilder; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.DateTimeException; import java.time.Instant; import java.time.LocalDateTime; import java.time.OffsetDateTime; @@ -56,6 +57,7 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mr.Catalogs; @@ -101,8 +103,7 @@ import org.apache.impala.thrift.TIcebergPartitionTransformType; @SuppressWarnings("UnstableApiUsage") public class IcebergUtil { - - private static final int ICEBERG_EPOCH_YEAR = 1970; + public static final int ICEBERG_EPOCH_YEAR = 1970; private static final int ICEBERG_EPOCH_MONTH = 1; @SuppressWarnings("unused") private static final int ICEBERG_EPOCH_DAY = 1; @@ -390,28 +391,27 @@ public class IcebergUtil { return pSize; } - public static IcebergPartitionTransform getPartitionTransform( - PartitionField field, HashMap<String, Integer> transformParams) - throws TableLoadingException { + public static IcebergPartitionTransform getPartitionTransform(PartitionField field, + Map<String, Integer> transformParams) throws ImpalaRuntimeException { String type = field.transform().toString(); String transformMappingKey = 
getPartitionTransformMappingKey(field.sourceId(), getPartitionTransformType(type)); return getPartitionTransform(type, transformParams.get(transformMappingKey)); } - public static IcebergPartitionTransform getPartitionTransform(String transformType, - Integer transformParam) throws TableLoadingException { + public static IcebergPartitionTransform getPartitionTransform( + String transformType, Integer transformParam) throws ImpalaRuntimeException { return new IcebergPartitionTransform(getPartitionTransformType(transformType), transformParam); } public static IcebergPartitionTransform getPartitionTransform(String transformType) - throws TableLoadingException { + throws ImpalaRuntimeException { return getPartitionTransform(transformType, null); } - private static TIcebergPartitionTransformType getPartitionTransformType( - String transformType) throws TableLoadingException { + public static TIcebergPartitionTransformType getPartitionTransformType( + String transformType) throws ImpalaRuntimeException { Preconditions.checkNotNull(transformType); transformType = transformType.toUpperCase(); if ("IDENTITY".equals(transformType)) { @@ -428,8 +428,8 @@ public class IcebergUtil { case "YEAR": case "YEARS": return TIcebergPartitionTransformType.YEAR; case "VOID": return TIcebergPartitionTransformType.VOID; default: - throw new TableLoadingException("Unsupported iceberg partition type: " + - transformType); + throw new ImpalaRuntimeException( + "Unsupported Iceberg partition type: " + transformType); } } @@ -449,7 +449,7 @@ public class IcebergUtil { * expose the interface of the transform types outside their package and the only * way to get the transform's parameter is implementing this visitor class. 
*/ - public static HashMap<String, Integer> getPartitionTransformParams(PartitionSpec spec) { + public static Map<String, Integer> getPartitionTransformParams(PartitionSpec spec) { List<Pair<String, Integer>> transformParams = PartitionSpecVisitor.visit( spec, new PartitionSpecVisitor<Pair<String, Integer>>() { @Override @@ -745,6 +745,39 @@ public class IcebergUtil { throw new ImpalaRuntimeException("Unexpected partition transform: " + transformType); } + /** + * Returns the integer representation of date transforms + * @param transformType type of the transform + * @param stringValue date value as a string + * @return Integer representation of a transform value, or an empty optional if the + * parse failed, or the supplied transform is not supported. + */ + public static Integer getDateTimeTransformValue( + TIcebergPartitionTransformType transformType, String stringValue) + throws ImpalaRuntimeException { + try { + switch (transformType) { + case YEAR: return parseYearToTransformYear(stringValue); + case MONTH: return parseMonthToTransformMonth(stringValue); + case DAY: return parseDayToTransformMonth(stringValue); + case HOUR: return parseHourToTransformHour(stringValue); + } + } catch (NumberFormatException | DateTimeException | IllegalStateException e) { + throw new ImpalaRuntimeException( + String.format("Unable to parse value '%s' as '%s' transform value", stringValue, + transformType)); + } + throw new ImpalaRuntimeException("Unexpected partition transform: " + transformType); + } + + public static boolean isDateTimeTransformType( + TIcebergPartitionTransformType transformType) { + return transformType.equals(TIcebergPartitionTransformType.YEAR) + || transformType.equals(TIcebergPartitionTransformType.MONTH) + || transformType.equals(TIcebergPartitionTransformType.DAY) + || transformType.equals(TIcebergPartitionTransformType.HOUR); + } + /** * In the partition path years are represented naturally, e.g. 1984. 
However, we need * to convert it to an integer which represents the years from 1970. So, for 1984 the @@ -769,13 +802,23 @@ public class IcebergUtil { return years * 12 + months; } + /** + * In the partition path days are represented as <year>-<month>-<day>, e.g. 2023-12-12. + * This functions converts this string to an integer which represents the days from + * '1970-01-01' with the help of Iceberg's type converter. + */ + private static Integer parseDayToTransformMonth(String monthStr) { + Literal<Integer> days = Literal.of(monthStr).to(Types.DateType.get()); + return days.value(); + } + /** * In the partition path hours are represented as <year>-<month>-<day>-<hour>, e.g. * 1970-01-01-01. We need to convert it to a single integer which represents the hours * from '1970-01-01 00:00:00'. */ private static Integer parseHourToTransformHour(String hourStr) { - final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + final OffsetDateTime epoch = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); String[] parts = hourStr.split("-", -1); Preconditions.checkState(parts.length == 4); int year = Integer.parseInt(parts[0]); @@ -785,7 +828,7 @@ public class IcebergUtil { OffsetDateTime datetime = OffsetDateTime.of( LocalDateTime.of(year, month, day, hour, /*minute=*/0), ZoneOffset.UTC); - return (int)ChronoUnit.HOURS.between(EPOCH, datetime); + return (int) ChronoUnit.HOURS.between(epoch, datetime); } public static TCompressionCodec parseParquetCompressionCodec( diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java old mode 100755 new mode 100644 index 28e88a1a1..88f9dbf09 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -325,6 +325,55 @@ public class AnalyzeDDLTest extends FrontendTestBase { AnalysisError("alter table functional.alltypes add " + "partition(year=2050, 
month=10) location ' '", "URI path cannot be empty."); + + // Iceberg DROP PARTITION + String partitioned = + "alter table functional_parquet.iceberg_partitioned drop partition"; + String evolution = + "alter table functional_parquet.iceberg_partition_evolution drop partition"; + String nonPartitioned = + "alter table functional_parquet.iceberg_non_partitioned drop partition"; + + AnalysisError(nonPartitioned + "(user = 'Alan')", + "Table is not partitioned: functional_parquet.iceberg_non_partitioned"); + AnalysisError(partitioned + "(action = 'Foo')", + "No matching partition(s) found"); + AnalysisError(partitioned + "(user = 'Alan')", + "Partition exprs cannot contain non-partition column(s): `user`"); + AnalysisError(partitioned + "(user = 'Alan' or user = 'Lisa' and id > 10)", + "Partition exprs cannot contain non-partition column(s): `user`"); + AnalysisError(partitioned + "(void(action) = 'click')", + "VOID transform is not supported for partition selection"); + AnalysisError(partitioned + "(day(action) = 'Alan')", + "Can't filter column 'action' with transform type: 'DAY'"); + AnalysisError(partitioned + "(action = action)", + "Invalid partition filtering expression: action = action"); + AnalysisError(partitioned + "(action = action and action = 'click' " + + "or hour(event_time) > '2020-01-01-01')", + "Invalid partition filtering expression: " + + "action = action AND action = 'click' OR HOUR(event_time) > 438289"); + AnalysisError( + partitioned + "(action)", "Invalid partition filtering expression: action"); + AnalysisError(partitioned + "(2)", "Invalid partition filtering expression: 2"); + AnalysisError(partitioned + "(truncate(action))", + "BUCKET and TRUNCATE partition transforms should have a parameter"); + AnalysisError(partitioned + "(truncate('string', action))", + "Invalid transform parameter value: string"); + AnalysisError(partitioned + "(truncate(1, 2, action))", + "Invalid partition predicate: truncate(1, 2, action)"); + 
AnalysisError(partitioned + " (action = 'click') purge", + "Partition purge is not supported for Iceberg tables"); + + AnalyzesOk(partitioned + "(hour(event_time) > '2020-01-01-01')"); + AnalyzesOk(partitioned + "(hour(event_time) < '2020-02-01-01')"); + AnalyzesOk(partitioned + "(hour(event_time) = '2020-01-01-9')"); + AnalyzesOk(partitioned + "(hour(event_time) = '2020-01-01-9', action = 'click')"); + AnalyzesOk(partitioned + "(action = 'click')"); + AnalyzesOk(partitioned + "(action = 'click' or action = 'download')"); + AnalyzesOk(partitioned + "(action in ('click', 'download'))"); + AnalyzesOk(partitioned + "(hour(event_time) in ('2020-01-01-9', '2020-01-01-1'))"); + AnalyzesOk(evolution + "(truncate(4,date_string_col,4) = '1231')"); + AnalyzesOk(evolution + "(month = 12)"); } @Test diff --git a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java index 3a3d0031c..3482c7d41 100644 --- a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java +++ b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java @@ -23,6 +23,7 @@ import static org.apache.impala.thrift.TIcebergCatalog.CATALOGS; import static org.apache.impala.thrift.TIcebergCatalog.HADOOP_CATALOG; import static org.apache.impala.thrift.TIcebergCatalog.HADOOP_TABLES; import static org.apache.impala.thrift.TIcebergCatalog.HIVE_CATALOG; +import static org.apache.impala.util.IcebergUtil.getDateTimeTransformValue; import static org.apache.impala.util.IcebergUtil.getFilePathHash; import static org.apache.impala.util.IcebergUtil.getIcebergFileFormat; import static org.apache.impala.util.IcebergUtil.getPartitionTransform; @@ -53,7 +54,6 @@ import org.apache.impala.analysis.IcebergPartitionTransform; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.IcebergColumn; import org.apache.impala.catalog.IcebergTable; -import org.apache.impala.catalog.TableLoadingException; import 
org.apache.impala.catalog.Type; import org.apache.impala.catalog.iceberg.IcebergCatalog; import org.apache.impala.catalog.iceberg.IcebergCatalogs; @@ -70,6 +70,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.HashMap; +import java.util.Map; import java.util.List; /** @@ -202,7 +203,7 @@ public class IcebergUtilTest { try { transform = getPartitionTransform( partitionTransform.transformName, partitionTransform.parameter); - } catch (TableLoadingException t) { + } catch (ImpalaRuntimeException t) { fail("Transform " + partitionTransform + " caught unexpected " + t); } assertNotNull(transform); @@ -223,7 +224,7 @@ public class IcebergUtilTest { /* IcebergPartitionTransform transform = */ getPartitionTransform( partitionTransform.transformName, partitionTransform.parameter); fail("Transform " + partitionTransform + " should have got exception"); - } catch (TableLoadingException t) { + } catch (ImpalaRuntimeException t) { // OK, fall through } } @@ -239,7 +240,7 @@ public class IcebergUtilTest { try { transform = getPartitionTransform( partitionTransform.transformName, partitionTransform.parameter); - } catch (TableLoadingException t) { + } catch (ImpalaRuntimeException t) { fail("Transform " + partitionTransform + " caught unexpected " + t); } assertNotNull(transform); @@ -271,7 +272,7 @@ public class IcebergUtilTest { int numBuckets = 128; PartitionSpec partitionSpec = PartitionSpec.builderFor(SCHEMA).bucket("i", numBuckets).build(); - HashMap<String, Integer> partitionTransformParams = + Map<String, Integer> partitionTransformParams = getPartitionTransformParams(partitionSpec); assertNotNull(partitionTransformParams); String expectedKey = "1_BUCKET"; @@ -325,6 +326,64 @@ public class IcebergUtilTest { } } + /** + * Unit test for getDateTransformValue + */ + @Test + public void testGetDateTransformValue() { + + assertThrows(() -> getDateTimeTransformValue(TIcebergPartitionTransformType.IDENTITY, + "string")); + assertThrows( + () -> 
getDateTimeTransformValue(TIcebergPartitionTransformType.BUCKET, "string")); + assertThrows(() -> getDateTimeTransformValue(TIcebergPartitionTransformType.TRUNCATE, + "string")); + assertThrows( + () -> getDateTimeTransformValue(TIcebergPartitionTransformType.VOID, "string")); + + assertThrows( + () -> getDateTimeTransformValue(TIcebergPartitionTransformType.YEAR, "2023-12")); + assertThrows( + () -> getDateTimeTransformValue(TIcebergPartitionTransformType.MONTH, "2023")); + assertThrows(() -> getDateTimeTransformValue(TIcebergPartitionTransformType.DAY, + "2023-12-12-1")); + assertThrows( + () -> getDateTimeTransformValue(TIcebergPartitionTransformType.HOUR, "2023-12")); + + try { + int yearTransformValidString = + getDateTimeTransformValue(TIcebergPartitionTransformType.YEAR, "2023"); + int monthTransformValidString = + getDateTimeTransformValue(TIcebergPartitionTransformType.MONTH, "2023-12"); + int dayTransformValidString = + getDateTimeTransformValue(TIcebergPartitionTransformType.DAY, "2023-12-12"); + int hourTransformValidString = + getDateTimeTransformValue(TIcebergPartitionTransformType.HOUR, "2023-12-12-1"); + + assertEquals(53, yearTransformValidString); + assertEquals(647, monthTransformValidString); + assertEquals(19703, dayTransformValidString); + assertEquals(472873, hourTransformValidString); + + } catch (ImpalaRuntimeException e) { + fail(String.format("Unexpected parse error: %s", e)); + } + } + + interface DateTimeTransformCallable { + Integer call() throws ImpalaRuntimeException; + } + + private void assertThrows(DateTimeTransformCallable function) { + try { + function.call(); + } catch (ImpalaRuntimeException e) { + assertEquals(ImpalaRuntimeException.class, e.getClass()); + return; + } + fail(); + } + /** * Holder class for testing Partition transforms. 
*/ diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test new file mode 100644 index 000000000..15d7c089f --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-drop-partition.test @@ -0,0 +1,287 @@ +==== +---- QUERY +# Failing implicit string to hour cast +alter table iceberg_all_partitions drop partition (hour(hour_timestamp) = "2012-12-12"); +---- CATCH +AnalysisException: operands of type INT and STRING are not comparable: HOUR(hour_timestamp) = '2012-12-12' +==== +---- QUERY +# Failing implicit string to day cast +alter table iceberg_all_partitions drop partition (day(day_date) = "2012-12"); +---- CATCH +AnalysisException: operands of type INT and STRING are not comparable: DAY(day_date) = '2012-12' +==== +---- QUERY +# Failing implicit string to month cast +alter table iceberg_all_partitions drop partition (month(month_date) = "2012"); +---- CATCH +AnalysisException: operands of type INT and STRING are not comparable: MONTH(month_date) = '2012' +==== +---- QUERY +# Failing implicit string to year cast +alter table iceberg_all_partitions drop partition (year(year_date) = "2012-12-12-20"); +---- CATCH +AnalysisException: operands of type INT and STRING are not comparable: YEAR(year_date) = '2012-12-12-20' +==== +---- QUERY +INSERT INTO iceberg_all_partitions(identity_boolean) VALUES (true); +INSERT INTO iceberg_all_partitions(identity_int) VALUES (1); +INSERT INTO iceberg_all_partitions(identity_bigint) VALUES (1); +INSERT INTO iceberg_all_partitions(identity_float) VALUES (1.0); +INSERT INTO iceberg_all_partitions(identity_double) VALUES (1.0); +INSERT INTO iceberg_all_partitions(identity_decimal) VALUES (1); +INSERT INTO iceberg_all_partitions(identity_date) VALUES ('2000-12-12'); +INSERT INTO iceberg_all_partitions(identity_string) VALUES ("string-transform-omitted"); +INSERT INTO 
iceberg_all_partitions(identity_string) VALUES ("string-transform-set"); +INSERT INTO iceberg_all_partitions(identity_string) VALUES ("string"), ("another-string"); +INSERT INTO iceberg_all_partitions(identity_string) VALUES ("string"), ("another-string"); +INSERT INTO iceberg_all_partitions(bucket_int) VALUES (100), (200); +INSERT INTO iceberg_all_partitions(bucket_bigint) VALUES (100); +INSERT INTO iceberg_all_partitions(bucket_decimal) VALUES (10); +INSERT INTO iceberg_all_partitions(bucket_date) VALUES ("1526-01-12"); +INSERT INTO iceberg_all_partitions(bucket_string) VALUES ("string"); +INSERT INTO iceberg_all_partitions(bucket_timestamp) VALUES ("1583-04-02 03:00:00"); +INSERT INTO iceberg_all_partitions(truncate_int) VALUES (131072); +INSERT INTO iceberg_all_partitions(truncate_bigint) VALUES (68719476736); +INSERT INTO iceberg_all_partitions(truncate_decimal) VALUES (100000.1234567891); +INSERT INTO iceberg_all_partitions(truncate_string) VALUES ('thisisalongstring'); +INSERT INTO iceberg_all_partitions(year_date) VALUES ('2077-05-06'); +INSERT INTO iceberg_all_partitions(month_date) VALUES ('2023-12-01'); +INSERT INTO iceberg_all_partitions(day_date) VALUES ('2023-12-01'); +INSERT INTO iceberg_all_partitions(year_timestamp) VALUES ('2023-12-02 00:00:00'); +INSERT INTO iceberg_all_partitions(month_timestamp) VALUES ('2023-12-02 00:00:00'); +INSERT INTO iceberg_all_partitions(day_timestamp) VALUES ('2023-03-02 00:00:00'); +INSERT INTO iceberg_all_partitions(hour_timestamp) VALUES ('2023-06-02 00:00:00'); +INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES ('string-hour','2023-03-02 00:00:00'); +INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES ('another-string-hour', '2023-03-02 00:00:00'); +INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES ('another-string-hour', '2023-03-02 10:00:00'); +INSERT INTO iceberg_all_partitions(identity_string, hour_timestamp) VALUES ('string-hour', 
'2023-03-02 10:00:00'); +INSERT INTO iceberg_all_partitions(identity_string, identity_int) VALUES ('string-comma', 567); +INSERT INTO iceberg_all_partitions(identity_string, identity_int) VALUES ('string-comma', 568); +INSERT INTO iceberg_all_partitions(identity_int) VALUES (NULL); +==== +---- QUERY +# Number of partitions before DROP PARTITION queries +SELECT COUNT(1) FROM $DATABASE.iceberg_all_partitions.`partitions`; +---- RESULTS +36 +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP IF EXISTS PARTITION (identity_boolean = false) +---- RESULTS +'Dropped 0 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_boolean = false) +---- CATCH +No matching partition(s) found +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_boolean = true) +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_int = 1) +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_bigint = 1) +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_float < 3.0) +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_double > 0.0) +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_decimal < 3); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_date = '2000-12-12'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_string = "string-transform-omitted"); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity(identity_string) = "string-transform-set"); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY 
+ALTER TABLE iceberg_all_partitions DROP PARTITION (identity(identity_string) = "another-string"); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity(identity_string) = "string"); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_int) in (1,2)); +---- RESULTS +'Dropped 2 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_bigint) = 1); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_decimal) = 3); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_date) = 0); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_timestamp) = 1); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (bucket(5, bucket_string) = 1); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, truncate_int) = 131070); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, truncate_bigint) = 68719476735); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, truncate_decimal) = 100000.1234567890); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (truncate(5, truncate_string) = 'thisi'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (year(year_date) = '2077'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +# Checkpoint for remaining partitions +SELECT COUNT(1) FROM 
$DATABASE.iceberg_all_partitions.`partitions`; +---- RESULTS +13 +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (month(month_date) = '2023-12'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (day(day_date) = '2023-12-01'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (year(year_timestamp) = '2023'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (month(month_timestamp) = '2023-12'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (day(day_timestamp) = '2023-03-02'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (hour(hour_timestamp) = '2023-06-02-0'); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_string in ('string-hour', 'another-string-hour') and hour(hour_timestamp) = '2023-03-02-10'); +---- RESULTS +'Dropped 2 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (hour(hour_timestamp) < '2030-03-02-10'); +---- RESULTS +'Dropped 2 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_string = "string-comma", identity_int in (567, 568)); +---- RESULTS +'Dropped 2 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_all_partitions DROP PARTITION (identity_int IS NULL); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +# Number of partitions after DROP PARTITION queries +SELECT count(1) FROM $DATABASE.iceberg_all_partitions.`partitions`; +---- RESULTS +0 +==== +---- QUERY +# Partition evolution +CREATE TABLE iceberg_drop_partition_evolution(identity_int int, unpartitioned_int_to_identity_int int, year_date_col_to_month_date_col date) +PARTITIONED BY SPEC(identity(identity_int), 
year(year_date_col_to_month_date_col)) STORED AS ICEBERG; +INSERT INTO iceberg_drop_partition_evolution VALUES (1, 2, "2023-10-11"); +ALTER TABLE iceberg_drop_partition_evolution SET PARTITION SPEC(identity(identity_int), identity(unpartitioned_int_to_identity_int), year(year_date_col_to_month_date_col)); +INSERT INTO iceberg_drop_partition_evolution VALUES (1, 2, "2023-01-11"); +ALTER TABLE iceberg_drop_partition_evolution DROP PARTITION (unpartitioned_int_to_identity_int = 2); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +ALTER TABLE iceberg_drop_partition_evolution SET PARTITION SPEC(identity(identity_int), month(year_date_col_to_month_date_col)); +ALTER TABLE iceberg_drop_partition_evolution DROP PARTITION (unpartitioned_int_to_identity_int = 2); +---- CATCH +AnalysisException: Partition exprs cannot contain non-partition column(s): unpartitioned_int_to_identity_int +==== +---- QUERY +INSERT INTO iceberg_drop_partition_evolution VALUES (1, 2, "2023-11-11"); +ALTER TABLE iceberg_drop_partition_evolution DROP PARTITION (month(year_date_col_to_month_date_col) = "2023-11"); +---- RESULTS +'Dropped 1 partition(s)' +==== +---- QUERY +# Dropping delete files +CREATE TABLE iceberg_drop_partition_delete(identity_int int, unpartitioned_int int) +PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG TBLPROPERTIES('format-version'='2'); +INSERT INTO iceberg_drop_partition_delete VALUES (1,2); +INSERT INTO iceberg_drop_partition_delete VALUES (2,1); +DELETE FROM iceberg_drop_partition_delete WHERE identity_int = 1; +ALTER TABLE iceberg_drop_partition_delete DROP PARTITION (identity_int = 1); +SHOW FILES IN iceberg_drop_partition_delete; +---- RESULTS +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_drop_partition_delete/data/identity_int=2/.*_data.*.parq','.*','','$ERASURECODE_POLICY' +---- TYPES +STRING, STRING, STRING, STRING +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test index ef8f7fe30..6f70d510c 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test @@ -347,11 +347,6 @@ ALTER TABLE iceberg_table_hadoop_catalog ADD PARTITION(fake_col='fake_value'); AnalysisException: ALTER TABLE ADD PARTITION is not supported for Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog ==== ---- QUERY -ALTER TABLE iceberg_table_hadoop_catalog DROP PARTITION(fake_col='fake_value'); ----- CATCH -AnalysisException: ALTER TABLE DROP PARTITION is not supported for Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog -==== ----- QUERY ALTER TABLE iceberg_table_hadoop_catalog RECOVER PARTITIONS; ---- CATCH AnalysisException: ALTER TABLE RECOVER PARTITIONS is not supported on Iceberg tables: $DATABASE.iceberg_table_hadoop_catalog @@ -446,7 +441,7 @@ CREATE TABLE iceberg_wrong_partition (i int) PARTITIONED BY SPEC (wrong(i)) STORED AS ICEBERG; ---- CATCH -Unsupported iceberg partition type: WRONG +Unsupported Iceberg partition type: WRONG ==== ---- QUERY CREATE TABLE iceberg_wrong_parquet_row_group_size1 ( i int) diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 58b3ad14c..2ae967d24 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1173,6 +1173,55 @@ class TestIcebergTable(IcebergTestSuite): assert len(data.data) == 1 assert data.data[0] == '1' + def test_drop_partition(self, vector, unique_database): + create_table_stmt = """CREATE TABLE {}.iceberg_all_partitions + (identity_boolean boolean, identity_int int, identity_bigint bigint, + identity_float float, identity_double double, identity_decimal decimal(20,10), + identity_date date, identity_timestamp timestamp, identity_string string, + bucket_int int, bucket_bigint bigint, bucket_decimal decimal(20,10), + bucket_date date, 
bucket_timestamp timestamp, bucket_string string, + truncate_int int, truncate_bigint bigint, truncate_decimal decimal(20,10), + truncate_string string, year_date date, year_timestamp timestamp, + month_date date, month_timestamp timestamp, day_date date, + day_timestamp timestamp, hour_timestamp timestamp) + PARTITIONED BY SPEC + (identity(identity_boolean), identity(identity_int), identity(identity_bigint), + identity(identity_float), identity(identity_double), identity(identity_decimal), + identity(identity_date), identity(identity_string), bucket(5,bucket_int), + bucket(5,bucket_bigint), bucket(5,bucket_decimal), bucket(5,bucket_date), + bucket(5,bucket_timestamp), bucket(5,bucket_string), truncate(5,truncate_int), + truncate(5,truncate_bigint), truncate(5,truncate_decimal), + truncate(5,truncate_string), year(year_date), year(year_timestamp), + month(month_date), month(month_timestamp), day(day_date), day(day_timestamp), + hour(hour_timestamp)) + STORED AS ICEBERG""".format(unique_database) + self.execute_query(create_table_stmt) + self.run_test_case('QueryTest/iceberg-drop-partition', vector, + use_db=unique_database) + + def test_rollback_after_drop_partition(self, vector, unique_database): + table_name = "iceberg_drop_partition_rollback" + qualified_table_name = "{}.{}".format(unique_database, table_name) + create_table_stmt = """CREATE TABLE {}(identity_int int, unpartitioned_int int) + PARTITIONED BY SPEC (identity_int) STORED AS ICEBERG""".format(qualified_table_name) + insert_into_stmt = """INSERT INTO {} values(1, 2)""".format(qualified_table_name) + drop_partition_stmt = """ALTER TABLE {} DROP PARTITION (identity_int = 1)""".format( + qualified_table_name) + + self.execute_query(create_table_stmt) + self.execute_query(insert_into_stmt) + self.execute_query(drop_partition_stmt) + + snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=2) + rollback = """ALTER TABLE {} EXECUTE ROLLBACK ({})""".format( + qualified_table_name, 
snapshots[0].get_snapshot_id()) + # Rollback before DROP PARTITION + self.execute_query(rollback) + snapshots = get_snapshots(self.client, qualified_table_name, expected_result_size=3) + assert snapshots[0].get_snapshot_id() == snapshots[2].get_snapshot_id() + assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id() + assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time() + class TestIcebergV2Table(IcebergTestSuite): """Tests related to Iceberg V2 tables."""
