This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 47db4fd1f5793ed1e0aaf6004496bf51da8209c6 Author: Noemi Pap-Takacs <[email protected]> AuthorDate: Fri Oct 20 14:09:33 2023 +0200 IMPALA-12412: Support partition evolution in OPTIMIZE statement The OPTIMIZE statement is used to execute table maintenance tasks on Iceberg tables, such as: 1. compacting small files, 2. merging delete deltas, 3. rewriting the table according to the latest schema and partition spec. OptimizeStmt used to serve as an alias for INSERT OVERWRITE. After this change, it works as follows: It creates a source statement that contains all columns of the table. All table content will be rewritten to new data files. After the executors have finished writing, the Catalog calls the RewriteFiles Iceberg API to commit the changes. All previous data and delete files will be excluded from, and all newly written data files will be added to, the next snapshot. The old files remain accessible via time travel to older snapshots of the table. By default, Impala has as many file writers as query fragment instances and therefore can write too many files for unpartitioned tables. For smaller tables this can be limited by setting the MAX_FS_WRITERS Query Option. Authorization: OPTIMIZE TABLE requires ALL privileges. Limitations: All limitations on writing Iceberg tables apply. 
Testing: - E2E tests: - schema evolution - partition evolution - UPDATE/DELETE - time travel - table history - negative tests - Ranger tests for authorization - FE: Planner test: - sorting order - MAX_FS_WRITERS - partitioned exchange - Parser test Change-Id: I65a0c8529d274afff38ccd582f1b8a857716b1b5 Reviewed-on: http://gerrit.cloudera.org:8080/20866 Reviewed-by: Daniel Becker <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/client-request-state.cc | 3 + common/thrift/Types.thrift | 1 + .../apache/impala/analysis/AnalysisContext.java | 2 - .../apache/impala/analysis/IcebergUpdateImpl.java | 19 +-- .../org/apache/impala/analysis/InsertStmt.java | 2 +- .../org/apache/impala/analysis/OptimizeStmt.java | 180 +++++++++++++++++---- .../apache/impala/planner/DistributedPlanner.java | 7 +- .../java/org/apache/impala/planner/Planner.java | 44 ++--- .../org/apache/impala/planner/PlannerContext.java | 9 +- .../java/org/apache/impala/service/Frontend.java | 60 ++++--- .../impala/service/IcebergCatalogOpExecutor.java | 53 +++++- .../java/org/apache/impala/util/IcebergUtil.java | 38 ++++- .../org/apache/impala/analysis/ParserTest.java | 8 + .../org/apache/impala/planner/PlannerTest.java | 12 ++ .../queries/PlannerTest/iceberg-optimize.test | 149 +++++++++++++++++ .../queries/PlannerTest/insert-sort-by-zorder.test | 27 ---- .../queries/QueryTest/iceberg-negative.test | 9 +- .../queries/QueryTest/iceberg-optimize.test | 170 ++++++++++++++----- .../queries/QueryTest/ranger_column_masking.test | 2 +- tests/query_test/test_iceberg.py | 60 +++---- 20 files changed, 616 insertions(+), 239 deletions(-) diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 3deb8ef0c..d68590125 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -1640,6 +1640,9 @@ bool ClientRequestState::CreateIcebergCatalogOps( DCHECK(cat_ice_op->iceberg_data_files_fb.empty()); 
update_catalog = false; } + } else if (ice_finalize_params.operation == TIcebergOperation::OPTIMIZE) { + cat_ice_op->__set_iceberg_data_files_fb( + dml_exec_state->CreateIcebergDataFilesVector()); } if (!update_catalog) query_events_->MarkEvent("No-op Iceberg DML statement"); return update_catalog; diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift index f36ae556f..ee0803f5d 100644 --- a/common/thrift/Types.thrift +++ b/common/thrift/Types.thrift @@ -117,6 +117,7 @@ enum TIcebergOperation { INSERT = 0 DELETE = 1 UPDATE = 2 + OPTIMIZE = 3 } // Level of verboseness for "explain" output. diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java index ec1e34b03..b6cf29129 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java +++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java @@ -307,8 +307,6 @@ public class AnalysisContext { public InsertStmt getInsertStmt() { if (isCreateTableAsSelectStmt()) { return getCreateTableAsSelectStmt().getInsertStmt(); - } else if (isOptimizeStmt()) { - return getOptimizeStmt().getInsertStmt(); } else { Preconditions.checkState(isInsertStmt()); return (InsertStmt) stmt_; diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java index d4a5595ae..a7f455c00 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java @@ -63,30 +63,13 @@ public class IcebergUpdateImpl extends IcebergModifyImpl { public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); deleteTableId_ = analyzer.getDescTbl().addTargetTable(icePosDelTable_); - IcebergUtil.validateIcebergColumnsForInsert(originalTargetTable_); + IcebergUtil.validateIcebergTableForInsert(originalTargetTable_); String updateMode = 
originalTargetTable_.getIcebergApiTable().properties().get( TableProperties.UPDATE_MODE); if (updateMode != null && !updateMode.equals("merge-on-read")) { throw new AnalysisException(String.format("Unsupported update mode: '%s' for " + "Iceberg table: %s", updateMode, originalTargetTable_.getFullName())); } - for (Column c : originalTargetTable_.getColumns()) { - if (c.getType().isComplexType()) { - throw new AnalysisException(String.format("Impala does not support updating " + - "tables with complex types. Table '%s' has column '%s' " + - "with type: %s", originalTargetTable_.getFullName(), c.getName(), - c.getType().toSql())); - } - } - Pair<List<Integer>, TSortingOrder> sortProperties = - AlterTableSetTblProperties.analyzeSortColumns(originalTargetTable_, - originalTargetTable_.getMetaStoreTable().getParameters()); - if (originalTargetTable_.getIcebergFileFormat() != TIcebergFileFormat.PARQUET) { - throw new AnalysisException(String.format("Impala can only write Parquet data " + - "files, while table '%s' expects '%s' data files.", - originalTargetTable_.getFullName(), - originalTargetTable_.getIcebergFileFormat().toString())); - } } @Override diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java index 05d1b9168..82f3cbf4c 100644 --- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java @@ -607,7 +607,7 @@ public class InsertStmt extends DmlStatementBase { } validateBucketTransformForOverwrite(iceTable); } - IcebergUtil.validateIcebergColumnsForInsert(iceTable); + IcebergUtil.validateIcebergTableForInsert(iceTable); } if (isHBaseTable && overwrite_) { diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java index ecbb23b66..0ab6efb69 100644 --- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java +++ 
b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java @@ -18,109 +18,219 @@ package org.apache.impala.analysis; import com.google.common.base.Preconditions; -import org.apache.commons.lang.NotImplementedException; import org.apache.impala.authorization.Privilege; +import org.apache.impala.catalog.Column; import org.apache.impala.catalog.FeIcebergTable; -import org.apache.impala.catalog.FeTable; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.Pair; import org.apache.impala.planner.DataSink; +import org.apache.impala.planner.TableSink; import org.apache.impala.rewrite.ExprRewriter; import org.apache.impala.thrift.TSortingOrder; +import org.apache.impala.util.ExprUtil; +import org.apache.impala.util.IcebergUtil; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** * Representation of an OPTIMIZE statement used to execute table maintenance tasks in * Iceberg tables, such as: * 1. compacting small files, - * 2. merging delete deltas. - * Currently, it executes these tasks as an alias for INSERT OVERWRITE: - * OPTIMIZE TABLE tbl; -->> INSERT OVERWRITE TABLE tbl SELECT * FROM tbl; + * 2. merging delete deltas, + * 3. rewriting the table according to the latest partition spec and schema. */ public class OptimizeStmt extends DmlStatementBase { - // INSERT OVERWRITE statement that this OPTIMIZE statement is translated to. - private InsertStmt insertStmt_; // Target table name as seen by the parser. private final TableName originalTableName_; + + ///////////////////////////////////////// + // BEGIN: Members that need to be reset() + // Target table that should be compacted. May be qualified by analyze(). private TableName tableName_; + private TableRef tableRef_; + // SELECT statement that reads the data that we want to compact. + private SelectStmt sourceStmt_; + // Output expressions that produce the final results to write to the table. + // Set in prepareExpressions(). 
+ // It will contain one Expr for each column of the table. + // The i'th expr produces the i'th column of the table. + private List<Expr> resultExprs_ = new ArrayList<>(); + // For every column of the target table that is referenced in the optional + // 'sort.columns' table property, this list will contain the corresponding + // result expr from 'resultExprs_'. Before insertion, all rows + // will be sorted by these exprs. If the list is empty, no additional sorting by + // non-partitioning columns will be performed. The column list must not contain + // partition columns. + private List<Expr> sortExprs_ = new ArrayList<>(); + private List<Integer> sortColumns_; + private TSortingOrder sortingOrder_ = TSortingOrder.LEXICAL;; + // Exprs corresponding to the partition fields of the table. + protected List<Expr> partitionKeyExprs_ = new ArrayList<>(); + + // END: Members that need to be reset() + ///////////////////////////////////////// public OptimizeStmt(TableName tableName) { tableName_ = tableName; originalTableName_ = tableName_; - List<SelectListItem> selectListItems = new ArrayList<>(); - selectListItems.add(SelectListItem.createStarItem(null)); - SelectList selectList = new SelectList(selectListItems); - List<TableRef> tableRefs = new ArrayList<>(); - tableRefs.add(new TableRef(tableName.toPath(), null)); - QueryStmt queryStmt = new SelectStmt(selectList, new FromClause(tableRefs), null, - null, null, null, null); - insertStmt_ = new InsertStmt(null, tableName, true, null, null, null, queryStmt, - null, false); + } + + private OptimizeStmt(OptimizeStmt other) { + super(other); + tableName_ = other.tableName_; + originalTableName_ = other.originalTableName_; } + @Override + public OptimizeStmt clone() { return new OptimizeStmt(this); } + @Override public void analyze(Analyzer analyzer) throws AnalysisException { if (isAnalyzed()) return; super.analyze(analyzer); - insertStmt_.analyze(analyzer); Preconditions.checkState(table_ == null); if 
(!tableName_.isFullyQualified()) { tableName_ = new TableName(analyzer.getDefaultDb(), tableName_.getTbl()); } + tableRef_ = TableRef.newTableRef(analyzer, tableName_.toPath(), null); + // For OPTIMIZE, ALL privileges are required. table_ = analyzer.getTable(tableName_, Privilege.ALL); - Preconditions.checkState(table_ == insertStmt_.getTargetTable()); + // Check that the referenced table has Iceberg format and is not a view. if (!(table_ instanceof FeIcebergTable)) { throw new AnalysisException("OPTIMIZE is only supported for Iceberg tables."); } + // TODO: IMPALA-12839 Optimizing empty table should be no-op. + if (((FeIcebergTable) table_).getContentFileStore().getNumFiles() == 0) { + throw new AnalysisException(String.format( + "Table '%s' is empty.", table_.getFullName())); + } + IcebergUtil.validateIcebergTableForInsert((FeIcebergTable) table_); + prepareExpressions(analyzer); + createSourceStmt(analyzer); + setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers()); + + // Analyze 'sort.columns' and 'sort.order' table properties and populate + // sortColumns_, sortExprs_, and sortingOrder_. + analyzeSortColumns(); + + // Add target table to descriptor table. 
+ analyzer.getDescTbl().setTargetTable(table_); } @Override public void reset() { super.reset(); tableName_ = originalTableName_; - insertStmt_.reset(); + tableRef_.reset(); + sourceStmt_.reset(); + resultExprs_.clear(); + sortExprs_.clear(); + sortColumns_.clear(); + sortingOrder_ = TSortingOrder.LEXICAL; + partitionKeyExprs_.clear(); } - public InsertStmt getInsertStmt() { return insertStmt_; } + public DataSink createDataSink() { + TableSink tableSink = TableSink.create(table_, TableSink.Op.INSERT, + partitionKeyExprs_, resultExprs_, new ArrayList<>(), false, false, + new Pair<>(sortColumns_, sortingOrder_), -1, null, + maxTableSinks_); + return tableSink; + } + + private void createSourceStmt(Analyzer analyzer) throws AnalysisException { + List<TableRef> tableRefs = Arrays.asList(tableRef_); + List<Column> columns = table_.getColumns(); + List<SelectListItem> selectListItems = new ArrayList<>(); + for (Column col : columns) { + selectListItems.add( + new SelectListItem(createSlotRef(analyzer, col.getName()), null)); + } + SelectList selectList = new SelectList(selectListItems); + sourceStmt_ = new SelectStmt(selectList, new FromClause(tableRefs), null, + null, null, null, null); + sourceStmt_.analyze(analyzer); + sourceStmt_.getSelectList().getItems().addAll( + ExprUtil.exprsAsSelectList(partitionKeyExprs_)); + } + + private void prepareExpressions(Analyzer analyzer) throws AnalysisException { + List<Column> columns = table_.getColumns(); + for (Column col : columns) { + resultExprs_.add(createSlotRef(analyzer, col.getName())); + } + IcebergUtil.populatePartitionExprs(analyzer, null, columns, + resultExprs_, (FeIcebergTable) table_, partitionKeyExprs_, null); + } + + private SlotRef createSlotRef(Analyzer analyzer, String colName) + throws AnalysisException { + List<String> path = Path.createRawPath(tableRef_.getUniqueAlias(), colName); + SlotRef ref = new SlotRef(path); + ref.analyze(analyzer); + return ref; + } + + /** + * Analyzes the 'sort.columns' table 
property if it is set, and populates + * sortColumns_ and sortExprs_. If there are errors during the analysis, this will throw + * an AnalysisException. + */ + private void analyzeSortColumns() throws AnalysisException { + Pair<List<Integer>, TSortingOrder> sortProperties = + AlterTableSetTblProperties.analyzeSortColumns(table_, + table_.getMetaStoreTable().getParameters()); + sortColumns_ = sortProperties.first; + sortingOrder_ = sortProperties.second; + // Assign sortExprs_ based on sortColumns_. + for (Integer colIdx: sortColumns_) sortExprs_.add(resultExprs_.get(colIdx)); + } @Override - public DataSink createDataSink() { - throw new NotImplementedException(); + public String toSql(ToSqlOptions options) { + if (options == ToSqlOptions.DEFAULT) { + return "OPTIMIZE TABLE" + originalTableName_.toSql(); + } + return "OPTIMIZE TABLE" + tableName_.toSql(); + } + + public QueryStmt getQueryStmt() { + return sourceStmt_; } @Override public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) { - throw new NotImplementedException(); + sourceStmt_.substituteResultExprs(smap, analyzer); + resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true); + partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true); + sortExprs_ = Expr.substituteList(sortExprs_, smap, analyzer, true); } @Override - public TSortingOrder getSortingOrder() { - throw new NotImplementedException(); - } + public TSortingOrder getSortingOrder() { return sortingOrder_; } @Override - public List<Expr> getPartitionKeyExprs() { - return insertStmt_.getPartitionKeyExprs(); - } + public List<Expr> getSortExprs() { return sortExprs_; } @Override - public List<Expr> getSortExprs() { - return insertStmt_.getSortExprs(); - } + public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; } + + @Override + public List<Expr> getResultExprs() { return resultExprs_; } @Override public void collectTableRefs(List<TableRef> tblRefs) { - 
insertStmt_.collectTableRefs(tblRefs); + tblRefs.add(new TableRef(tableName_.toPath(), null)); } @Override public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException { Preconditions.checkState(isAnalyzed()); - insertStmt_.rewriteExprs(rewriter); + sourceStmt_.rewriteExprs(rewriter); } - } \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index d10e90db5..48daa6484 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -73,16 +73,13 @@ public class DistributedPlanner { public List<PlanFragment> createPlanFragments( PlanNode singleNodePlan) throws ImpalaException { Preconditions.checkState(!ctx_.isSingleNodeExec()); - AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult(); QueryStmt queryStmt = ctx_.getQueryStmt(); List<PlanFragment> fragments = new ArrayList<>(); - // For inserts or CTAS, unless there is a limit, leave the root fragment + // For DML statements, unless there is a limit, leave the root fragment // partitioned, otherwise merge everything into a single coordinator fragment, // so we can pass it back to the client. 
boolean isPartitioned = false; - if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt() - || analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt()) - && !singleNodePlan.hasLimit()) { + if (ctx_.hasTableSink() && !singleNodePlan.hasLimit()) { Preconditions.checkState(!queryStmt.hasOffset()); isPartitioned = true; } diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 209790868..61031b036 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -169,29 +169,29 @@ public class Planner { createPreInsertSort(insertStmt, rootFragment, ctx_.getRootAnalyzer()); // set up table sink for root fragment rootFragment.setSink(insertStmt.createDataSink()); - } else { - if (ctx_.isUpdate() || ctx_.isDelete()) { - DmlStatementBase stmt; - if (ctx_.isUpdate()) { - stmt = ctx_.getAnalysisResult().getUpdateStmt(); - } else { - stmt = ctx_.getAnalysisResult().getDeleteStmt(); - } - Preconditions.checkNotNull(stmt); - stmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); - if (stmt.getTargetTable() instanceof FeIcebergTable) { - rootFragment = createIcebergDmlPlanFragment( - rootFragment, distributedPlanner, stmt, fragments); - } - // Set up update sink for root fragment - rootFragment.setSink(stmt.createDataSink()); - } else if (ctx_.isQuery()) { - QueryStmt queryStmt = ctx_.getQueryStmt(); - queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); - List<Expr> resultExprs = queryStmt.getResultExprs(); - rootFragment.setSink( - ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs)); + } else if (ctx_.isUpdate() || ctx_.isDelete() || ctx_.isOptimize()) { + DmlStatementBase stmt; + if (ctx_.isUpdate()) { + stmt = ctx_.getAnalysisResult().getUpdateStmt(); + } else if (ctx_.isDelete()) { + stmt = ctx_.getAnalysisResult().getDeleteStmt(); + } else { + stmt = 
ctx_.getAnalysisResult().getOptimizeStmt(); + } + Preconditions.checkNotNull(stmt); + stmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); + if (stmt.getTargetTable() instanceof FeIcebergTable) { + rootFragment = createIcebergDmlPlanFragment( + rootFragment, distributedPlanner, stmt, fragments); } + // Set up update sink for root fragment + rootFragment.setSink(stmt.createDataSink()); + } else if (ctx_.isQuery()) { + QueryStmt queryStmt = ctx_.getQueryStmt(); + queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); + List<Expr> resultExprs = queryStmt.getResultExprs(); + rootFragment.setSink( + ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs)); } // The check for disabling codegen uses estimates of rows per node so must be done diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java index 03418e578..a54e3ae4d 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java +++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java @@ -80,6 +80,8 @@ public class PlannerContext { queryStmt_ = analysisResult.getUpdateStmt().getQueryStmt(); } else if (analysisResult.isDeleteStmt()) { queryStmt_ = analysisResult.getDeleteStmt().getQueryStmt(); + } else if (analysisResult.isOptimizeStmt()) { + queryStmt_ = analysisResult.getOptimizeStmt().getQueryStmt(); } else { queryStmt_ = analysisResult.getQueryStmt(); } @@ -103,18 +105,17 @@ public class PlannerContext { public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); } public PlanFragmentId getNextFragmentId() { return fragmentIdGenerator_.getNextId(); } public boolean isInsertOrCtas() { - //TODO: IMPALA-12412: remove isOptimizeStmt(). 
- return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt() - || analysisResult_.isOptimizeStmt(); + return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt(); } public boolean isInsert() { return analysisResult_.isInsertStmt(); } + public boolean isOptimize() { return analysisResult_.isOptimizeStmt(); } public boolean isCtas() { return analysisResult_.isCreateTableAsSelectStmt(); } public boolean isUpdateOrDelete() { return analysisResult_.isUpdateStmt() || analysisResult_.isDeleteStmt(); } public boolean isQuery() { return analysisResult_.isQueryStmt(); } public boolean hasTableSink() { return isInsertOrCtas() || analysisResult_.isUpdateStmt() - || analysisResult_.isDeleteStmt(); + || analysisResult_.isDeleteStmt() || analysisResult_.isOptimizeStmt(); } public boolean hasSubplan() { return !subplans_.isEmpty(); } public SubplanNode getSubplan() { return subplans_.getFirst(); } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index d030655fd..25a4be16a 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -74,9 +74,9 @@ import org.apache.impala.analysis.CreateDataSrcStmt; import org.apache.impala.analysis.CreateDropRoleStmt; import org.apache.impala.analysis.CreateUdaStmt; import org.apache.impala.analysis.CreateUdfStmt; -import org.apache.impala.analysis.DeleteStmt; import org.apache.impala.analysis.DescribeTableStmt; import org.apache.impala.analysis.DescriptorTable; +import org.apache.impala.analysis.DmlStatementBase; import org.apache.impala.analysis.DropDataSrcStmt; import org.apache.impala.analysis.DropFunctionStmt; import org.apache.impala.analysis.DropStatsStmt; @@ -95,7 +95,6 @@ import org.apache.impala.analysis.StmtMetadataLoader; import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache; import org.apache.impala.analysis.TableName; 
import org.apache.impala.analysis.TruncateStmt; -import org.apache.impala.analysis.UpdateStmt; import org.apache.impala.authentication.saml.ImpalaSamlClient; import org.apache.impala.authorization.AuthorizationChecker; import org.apache.impala.authorization.AuthorizationConfig; @@ -2430,8 +2429,7 @@ public class Frontend { } } if (!analysisResult.isExplainStmt() && - (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt() - || analysisResult.isOptimizeStmt())) { + (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())) { InsertStmt insertStmt = analysisResult.getInsertStmt(); FeTable targetTable = insertStmt.getTargetTable(); if (AcidUtils.isTransactionalTable( @@ -2550,7 +2548,7 @@ public class Frontend { result.query_exec_request.stmt_type = result.stmt_type; // fill in the metadata result.setResult_set_metadata(createQueryResultSetMetadata(analysisResult)); - } else if (analysisResult.isInsertStmt() || analysisResult.isOptimizeStmt() || + } else if (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt()) { // For CTAS the overall TExecRequest statement type is DDL, but the // query_exec_request should be DML @@ -2562,15 +2560,19 @@ public class Frontend { queryCtx, queryExecRequest, analysisResult.getInsertStmt()); } else { Preconditions.checkState( - analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt()); + analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt() || + analysisResult.isOptimizeStmt()); result.stmt_type = TStmtType.DML; result.query_exec_request.stmt_type = TStmtType.DML; if (analysisResult.isDeleteStmt()) { - addFinalizationParamsForDelete(queryCtx, queryExecRequest, - analysisResult.getDeleteStmt()); + addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest, + analysisResult.getDeleteStmt(), TIcebergOperation.DELETE); } else if (analysisResult.isUpdateStmt()) { - addFinalizationParamsForUpdate(queryCtx, queryExecRequest, - analysisResult.getUpdateStmt()); + 
addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest, + analysisResult.getUpdateStmt(), TIcebergOperation.UPDATE); + } else if (analysisResult.isOptimizeStmt()) { + addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest, + analysisResult.getOptimizeStmt(), TIcebergOperation.OPTIMIZE); } } return result; @@ -2634,28 +2636,24 @@ public class Frontend { targetTable, insertStmt.getWriteId(), insertStmt.isOverwrite()); } - private static void addFinalizationParamsForDelete( - TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, DeleteStmt deleteStmt) { - FeTable targetTable = deleteStmt.getTargetTable(); - if (!(targetTable instanceof FeIcebergTable)) return; + /** + * Add the finalize params to the queryExecRequest for a non-INSERT Iceberg DML + * statement: DELETE, UPDATE and OPTIMIZE. + */ + private static void addFinalizationParamsForIcebergModify(TQueryCtx queryCtx, + TQueryExecRequest queryExecRequest, DmlStatementBase dmlStmt, + TIcebergOperation iceOperation) { + Preconditions.checkState(!(dmlStmt instanceof InsertStmt)); + FeTable targetTable = dmlStmt.getTargetTable(); + if (!(targetTable instanceof FeIcebergTable)) return; + if (iceOperation == TIcebergOperation.DELETE) { Preconditions.checkState(targetTable instanceof IcebergPositionDeleteTable); targetTable = ((IcebergPositionDeleteTable)targetTable).getBaseTable(); - TFinalizeParams finalizeParams = addFinalizationParamsForDml( - queryCtx, targetTable, false); - TIcebergDmlFinalizeParams iceFinalizeParams = addFinalizationParamsForIcebergDml( - (FeIcebergTable)targetTable, TIcebergOperation.DELETE); - finalizeParams.setIceberg_params(iceFinalizeParams); - queryExecRequest.setFinalize_params(finalizeParams); - } - - private static void addFinalizationParamsForUpdate( - TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, UpdateStmt updateStmt) { - FeTable targetTable = updateStmt.getTargetTable(); - if (!(targetTable instanceof FeIcebergTable)) return; + } TFinalizeParams 
finalizeParams = addFinalizationParamsForDml( queryCtx, targetTable, false); - TIcebergDmlFinalizeParams iceFinalizeParams = addFinalizationParamsForIcebergDml( - (FeIcebergTable)targetTable, TIcebergOperation.UPDATE); + TIcebergDmlFinalizeParams iceFinalizeParams = + addFinalizationParamsForIcebergDml((FeIcebergTable)targetTable, iceOperation); finalizeParams.setIceberg_params(iceFinalizeParams); queryExecRequest.setFinalize_params(finalizeParams); } @@ -2683,10 +2681,8 @@ public class Frontend { finalizeParams.setWrite_id(writeId); } else if (targetTable instanceof FeIcebergTable) { FeIcebergTable iceTable = (FeIcebergTable)targetTable; - TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams(); - iceFinalizeParams.operation = TIcebergOperation.INSERT; - iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId()); - iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId()); + TIcebergDmlFinalizeParams iceFinalizeParams = + addFinalizationParamsForIcebergDml(iceTable, TIcebergOperation.INSERT); finalizeParams.setIceberg_params(iceFinalizeParams); } else { // TODO: Currently this flag only controls the removal of the query-level staging diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index cc4d68248..05f1d4e2e 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -21,8 +21,9 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Collections; -import org.apache.commons.collections.CollectionUtils; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; @@ -35,6 +36,7 @@ import org.apache.iceberg.ManageSnapshots; import org.apache.iceberg.Metrics; import 
org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -53,6 +55,7 @@ import org.apache.impala.catalog.IcebergTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.TableNotFoundException; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey; +import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.catalog.iceberg.IcebergCatalog; import org.apache.impala.catalog.iceberg.IcebergHiveCatalog; import org.apache.impala.common.ImpalaRuntimeException; @@ -63,7 +66,6 @@ import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams; import org.apache.impala.thrift.TAlterTableExecuteRollbackParams; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TIcebergCatalog; -import org.apache.impala.thrift.TIcebergOperation; import org.apache.impala.thrift.TIcebergOperationParam; import org.apache.impala.thrift.TIcebergPartitionSpec; import org.apache.impala.thrift.TRollbackType; @@ -353,6 +355,7 @@ public class IcebergCatalogOpExecutor { case INSERT: appendFiles(feIcebergTable, txn, icebergOp); break; case DELETE: deleteRows(feIcebergTable, txn, icebergOp); break; case UPDATE: updateRows(feIcebergTable, txn, icebergOp); break; + case OPTIMIZE: rewriteTable(feIcebergTable, txn, icebergOp); break; default: throw new ImpalaRuntimeException( "Unknown Iceberg operation: " + icebergOp.operation); } @@ -360,7 +363,6 @@ public class IcebergCatalogOpExecutor { private static void deleteRows(FeIcebergTable feIcebergTable, Transaction txn, TIcebergOperationParam icebergOp) throws ImpalaRuntimeException { - org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable(); List<ByteBuffer> deleteFilesFb = icebergOp.getIceberg_delete_files_fb(); RowDelta rowDelta = 
txn.newRowDelta(); for (ByteBuffer buf : deleteFilesFb) { @@ -381,7 +383,6 @@ public class IcebergCatalogOpExecutor { private static void updateRows(FeIcebergTable feIcebergTable, Transaction txn, TIcebergOperationParam icebergOp) throws ImpalaRuntimeException { - org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable(); List<ByteBuffer> deleteFilesFb = icebergOp.getIceberg_delete_files_fb(); List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb(); RowDelta rowDelta = txn.newRowDelta(); @@ -458,7 +459,6 @@ public class IcebergCatalogOpExecutor { */ public static void appendFiles(FeIcebergTable feIcebergTable, Transaction txn, TIcebergOperationParam icebergOp) throws ImpalaRuntimeException { - org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable(); List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb(); BatchWrite batchWrite; if (icebergOp.isIs_overwrite()) { @@ -504,11 +504,50 @@ public class IcebergCatalogOpExecutor { nullValueCounts, null, lowerBounds, upperBounds); } + private static void rewriteTable(FeIcebergTable feIcebergTable, Transaction txn, + TIcebergOperationParam icebergOp) throws ImpalaRuntimeException { + GroupedContentFiles contentFiles; + try { + // Get all files from the initial snapshot. + contentFiles = IcebergUtil.getIcebergFilesFromSnapshot( + feIcebergTable, /*predicates=*/Collections.emptyList(), + icebergOp.getInitial_snapshot_id()); + } catch (TableLoadingException e) { + throw new ImpalaRuntimeException(e.getMessage(), e); + } + RewriteFiles rewrite = txn.newRewrite(); + // Delete current data files from table. + for (DataFile dataFile : contentFiles.dataFilesWithDeletes) { + rewrite.deleteFile(dataFile); + } + for (DataFile dataFile : contentFiles.dataFilesWithoutDeletes) { + rewrite.deleteFile(dataFile); + } + // Delete current delete files from table. 
+ for (DeleteFile deleteFile : contentFiles.positionDeleteFiles) { + rewrite.deleteFile(deleteFile); + } + for (DeleteFile deleteFile : contentFiles.equalityDeleteFiles) { + rewrite.deleteFile(deleteFile); + } + // Add newly written files to the table. + List<ByteBuffer> dataFilesToAdd = icebergOp.getIceberg_data_files_fb(); + for (ByteBuffer buf : dataFilesToAdd) { + DataFile dataFile = createDataFile(feIcebergTable, buf); + rewrite.addFile(dataFile); + } + try { + rewrite.validateFromSnapshot(icebergOp.getInitial_snapshot_id()); + rewrite.commit(); + } catch (ValidationException e) { + throw new ImpalaRuntimeException(e.getMessage(), e); + } + } + /** * Creates new snapshot for the iceberg table by deleting all data files. */ - public static void truncateTable(Transaction txn) - throws ImpalaRuntimeException { + public static void truncateTable(Transaction txn) { DeleteFiles delete = txn.newDelete(); delete.deleteFromRowFilter(Expressions.alwaysTrue()); delete.commit(); diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index ad2ec5401..fa3ddd356 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -680,6 +680,24 @@ public class IcebergUtil { return scan; } + public static GroupedContentFiles getIcebergFilesFromSnapshot( + FeIcebergTable table, List<Expression> predicates, long snapshotId) + throws TableLoadingException { + if (table.snapshotId() == -1) { + return new GroupedContentFiles(CloseableIterable.empty()); + } + TableScan scan = table.getIcebergApiTable().newScan(); + scan = scan.useSnapshot(snapshotId); + for (Expression predicate : predicates) { + scan = scan.filter(predicate); + } + try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) { + return new GroupedContentFiles(fileScanTasks); + } catch (IOException e) { + throw new TableLoadingException("Error during reading Iceberg 
manifest files.", e); + } + } + /** * Use ContentFile path to generate 128-bit Murmur3 hash as map key, cached in memory */ @@ -1204,7 +1222,10 @@ public class IcebergUtil { return props; } - public static void validateIcebergColumnsForInsert(FeIcebergTable iceTable) + /** + * Checks the table before insert for unsupported types and file formats. + */ + public static void validateIcebergTableForInsert(FeIcebergTable iceTable) throws AnalysisException { for (Types.NestedField field : iceTable.getIcebergSchema().columns()) { org.apache.iceberg.types.Type iceType = field.type(); @@ -1216,6 +1237,19 @@ public class IcebergUtil { } } } + for (Column c : iceTable.getColumns()) { + if (c.getType().isComplexType()) { + throw new AnalysisException(String.format("Impala does not support writing " + + "tables with complex types. Table '%s' has column '%s' " + + "with type: %s", iceTable.getFullName(), c.getName(), + c.getType().toSql())); + } + } + if (iceTable.getIcebergFileFormat() != TIcebergFileFormat.PARQUET) { + throw new AnalysisException(String.format("Impala can only write Parquet data " + + "files, while table '%s' expects '%s' data files.", + iceTable.getFullName(), iceTable.getIcebergFileFormat().toString())); + } } /** @@ -1237,7 +1271,7 @@ public class IcebergUtil { for (int i = 0; i < selectListExprs.size(); ++i) { IcebergColumn targetColumn = (IcebergColumn)selectExprTargetColumns.get(i); if (targetColumn.getFieldId() != partField.getSourceId()) continue; - // widestTypeExpr is widest type expression for column i + // widestTypeExpr is the widest type expression for column i Expr widestTypeExpr = (widestTypeExprList != null) ? 
widestTypeExprList.get(i) : null; Expr icebergPartitionTransformExpr = getIcebergPartitionTransformExpr(analyzer, diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index d32e8b749..db52e94a3 100755 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -1964,6 +1964,14 @@ public class ParserTest extends FrontendTestBase { ParserError("delete t join f on t.id = f.id"); } + @Test + public void TestOptimize() { + ParsesOk("optimize table t"); + ParserError("optimize t"); + ParserError("optimize table t for system_time as of now()"); + ParserError("optimize table t for system_version as of 12345"); + } + @Test public void TestUse() { ParserError("USE"); diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 7449a1697..61b374c06 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1329,6 +1329,18 @@ public class PlannerTest extends PlannerTestBase { ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } + /** + * Test that OPTIMIZE TABLE statements on Iceberg tables work as expected. + */ + @Test + public void testIcebergOptimize() { + TQueryOptions options = defaultQueryOptions(); + options.setMax_fs_writers(2); + options.setExplain_level(TExplainLevel.EXTENDED); + runPlannerTestFile("iceberg-optimize", "functional_parquet", options, + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); + } + /** * Check that Iceberg metadata table scan plans are as expected. 
*/ diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-optimize.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-optimize.test new file mode 100644 index 000000000..8e45d6d64 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-optimize.test @@ -0,0 +1,149 @@ +# IMPALA-12293: Verify that OPTIMIZE respects the sorting properties. +optimize table iceberg_partition_transforms_zorder +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +| Per-Host Resources: mem-estimate=76.00MB mem-reservation=12.03MB thread-reservation=2 +WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))] +| output exprs: ts, s, i, j +| mem-estimate=44B mem-reservation=0B thread-reservation=0 +| +01:SORT +| order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j +| materialized: year(functional_parquet.iceberg_partition_transforms_zorder.ts), iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) +| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=2 row-size=44B cardinality=1 +| in pipelines: 01(GETNEXT), 00(OPEN) +| +00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder] + HDFS partitions=1/1 files=1 size=1.08KB + Iceberg snapshot id: 1304838939768391952 + stored statistics: + table: rows=1 size=1.08KB + columns: unavailable + extrapolated-rows=disabled max-scan-range-rows=1 + mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1 + tuple-ids=0 row-size=36B cardinality=1 + in pipelines: 00(GETNEXT) +---- DISTRIBUTEDPLAN +F00:PLAN FRAGMENT 
[RANDOM] hosts=1 instances=1 +| Per-Host Resources: mem-estimate=76.00MB mem-reservation=12.03MB thread-reservation=2 +WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))] +| output exprs: ts, s, i, j +| mem-estimate=44B mem-reservation=0B thread-reservation=0 +| +01:SORT +| order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j +| materialized: year(functional_parquet.iceberg_partition_transforms_zorder.ts), iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) +| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=2 row-size=44B cardinality=1 +| in pipelines: 01(GETNEXT), 00(OPEN) +| +00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder, RANDOM] + HDFS partitions=1/1 files=1 size=1.08KB + Iceberg snapshot id: 1304838939768391952 + stored statistics: + table: rows=1 size=1.08KB + columns: unavailable + extrapolated-rows=disabled max-scan-range-rows=1 + mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1 + tuple-ids=0 row-size=36B cardinality=1 + in pipelines: 00(GETNEXT) +==== +# Verify that MAX_FS_WRITERS and the partitioning affect the distributed plan. 
+optimize table iceberg_v2_partitioned_position_deletes +---- PLAN +F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 +| Per-Host Resources: mem-estimate=128.00MB mem-reservation=12.05MB thread-reservation=3 +WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)] +| output exprs: id, user, action, event_time +| mem-estimate=440B mem-reservation=0B thread-reservation=0 +| +03:SORT +| order by: action ASC NULLS LAST +| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=4 row-size=44B cardinality=10 +| in pipelines: 03(GETNEXT), 00(OPEN) +| +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] +| equality predicates: functional_parquet.iceberg_v2_partitioned_position_deletes.file__position = functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos, functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name = functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path +| mem-estimate=692B mem-reservation=692B thread-reservation=0 +| tuple-ids=0 row-size=64B cardinality=10 +| in pipelines: 00(GETNEXT), 01(OPEN) +| +|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] +| HDFS partitions=1/1 files=3 size=9.47KB +| Iceberg snapshot id: 8885697082976537578 +| stored statistics: +| table: rows=10 size=9.47KB +| columns: all +| extrapolated-rows=disabled max-scan-range-rows=3 +| mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1 +| tuple-ids=1 row-size=204B cardinality=10 +| in pipelines: 01(GETNEXT) +| +00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] + HDFS partitions=1/1 files=3 size=3.48KB + Iceberg snapshot id: 8885697082976537578 + stored statistics: + table: rows=20 size=12.95KB + columns missing stats: id, user, action, event_time + 
extrapolated-rows=disabled max-scan-range-rows=6 + mem-estimate=96.00MB mem-reservation=48.00KB thread-reservation=1 + tuple-ids=0 row-size=64B cardinality=20 + in pipelines: 00(GETNEXT) +---- DISTRIBUTEDPLAN +F02:PLAN FRAGMENT [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.action)] hosts=2 instances=2 +| Per-Host Resources: mem-estimate=12.02MB mem-reservation=12.00MB thread-reservation=1 +WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)] +| output exprs: id, user, action, event_time +| mem-estimate=220B mem-reservation=0B thread-reservation=0 +| +05:SORT +| order by: action ASC NULLS LAST +| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0 +| tuple-ids=4 row-size=44B cardinality=10 +| in pipelines: 05(GETNEXT), 00(OPEN) +| +04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.action)] +| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 +| tuple-ids=0 row-size=64B cardinality=10 +| in pipelines: 00(GETNEXT) +| +F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 +Per-Host Resources: mem-estimate=96.53MB mem-reservation=48.22KB thread-reservation=2 +02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] +| equality predicates: functional_parquet.iceberg_v2_partitioned_position_deletes.file__position = functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos, functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name = functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path +| mem-estimate=230B mem-reservation=230B thread-reservation=0 +| tuple-ids=0 row-size=64B cardinality=10 +| in pipelines: 00(GETNEXT), 01(OPEN) +| +|--03:EXCHANGE [DIRECTED] +| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 +| | tuple-ids=1 row-size=204B cardinality=10 +| | in pipelines: 01(GETNEXT) +| | +| F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 
+| Per-Host Resources: mem-estimate=34.44MB mem-reservation=16.00KB thread-reservation=2 +| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete, RANDOM] +| HDFS partitions=1/1 files=3 size=9.47KB +| Iceberg snapshot id: 8885697082976537578 +| stored statistics: +| table: rows=10 size=9.47KB +| columns: all +| extrapolated-rows=disabled max-scan-range-rows=3 +| mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1 +| tuple-ids=1 row-size=204B cardinality=10 +| in pipelines: 01(GETNEXT) +| +00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, RANDOM] + HDFS partitions=1/1 files=3 size=3.48KB + Iceberg snapshot id: 8885697082976537578 + stored statistics: + table: rows=20 size=12.95KB + columns missing stats: id, user, action, event_time + extrapolated-rows=disabled max-scan-range-rows=6 + mem-estimate=96.00MB mem-reservation=48.00KB thread-reservation=1 + tuple-ids=0 row-size=64B cardinality=20 + in pipelines: 00(GETNEXT) +==== + diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test index 7aa8965e3..df27a22f4 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test @@ -390,30 +390,3 @@ WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(b.`year`, runtime filters: RF000 -> b.id row-size=12B cardinality=7.30K ==== -# IMPALA-12293: Verify that OPTIMIZE respects the sorting properties. 
-optimize table functional_parquet.iceberg_partition_transforms_zorder ----- PLAN -WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))] -| -01:SORT -| order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j -| row-size=44B cardinality=1 -| -00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder] - HDFS partitions=1/1 files=1 size=1.08KB - Iceberg snapshot id: 7350750578864730166 - row-size=36B cardinality=1 ----- DISTRIBUTEDPLAN -WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))] -| -02:SORT -| order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j -| row-size=44B cardinality=1 -| -01:EXCHANGE [UNPARTITIONED] -| -00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder] - HDFS partitions=1/1 files=1 size=1.08KB - Iceberg snapshot id: 7350750578864730166 - row-size=36B cardinality=1 -==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test index 6d7cba6f2..7fca94d3d 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test @@ -765,16 +765,17 @@ optimize table non_iceberg_table; AnalysisException: OPTIMIZE is 
only supported for Iceberg tables. ==== ---- QUERY -optimize table iceberg_overwrite_bucket; +optimize table functional_parquet.iceberg_alltypes_part_orc; ---- CATCH -AnalysisException: The Iceberg table has multiple partition specs. This means the outcome of dynamic partition overwrite is unforeseeable. Consider using TRUNCATE then INSERT INTO from the previous snapshot to overwrite your table. +AnalysisException: Impala can only write Parquet data files, while table 'functional_parquet.iceberg_alltypes_part_orc' expects 'ORC' data files. ==== ---- QUERY CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS ICEBERG TBLPROPERTIES ('format-version'='2'); +# Empty table cannot be optimized. TODO: IMPALA-12839 Optimizing empty table should be no-op. optimize table ice_complex; ---- CATCH -AnalysisException: Unable to INSERT into target table ($DATABASE.ice_complex) because the column 'int_array' has a complex type 'ARRAY<INT>' and Impala doesn't support inserting into tables containing complex type columns +AnalysisException: Table '$DATABASE.ice_complex' is empty. ==== ---- QUERY # ICEBERG__DATA__SEQUENCE__NUMBER is not supported for non-Iceberg tables. @@ -796,7 +797,7 @@ AnalysisException: Unsupported update mode: 'copy-on-write' for Iceberg table: $ ---- QUERY update ice_complex set id = id + 1; ---- CATCH -AnalysisException: Impala does not support updating tables with complex types. Table '$DATABASE.ice_complex' has column 'int_array' with type: ARRAY<INT> +AnalysisException: Impala does not support writing tables with complex types. 
Table '$DATABASE.ice_complex' has column 'int_array' with type: ARRAY<INT> ==== ---- QUERY # Cannot update virtual columns diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test index 3912ce67b..4da54a8e4 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test @@ -1,14 +1,38 @@ ==== ---- QUERY -CREATE TABLE ice_optimize (i int, s string) +CREATE TABLE ice_optimize (int_col int, string_col string, bool_col boolean) STORED BY ICEBERG TBLPROPERTIES ('format-version'='2'); ==== ---- QUERY +# Insert a value then delete everything from the table. +INSERT INTO ice_optimize VALUES(1, 'one', true); +DELETE FROM ice_optimize WHERE bool_col=true; +SHOW FILES IN ice_optimize; +---- LABELS +Path,Size,Partition,EC Policy +---- RESULTS: VERIFY_IS_SUBSET +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/delete-.*parq','.*','','$ERASURECODE_POLICY' +---- TYPES +STRING, STRING, STRING, STRING +==== +---- QUERY +# Testing that OPTIMIZE merged the data and delete files and the result is an empty snapshot. +OPTIMIZE TABLE ice_optimize; +SHOW FILES IN ice_optimize; +---- LABELS +Path,Size,Partition,EC Policy +---- RESULTS: VERIFY_IS_EQUAL +---- TYPES +STRING, STRING, STRING, STRING +==== +---- QUERY # Insert rows one by one to write multiple small files. 
-INSERT INTO ice_optimize VALUES(1, 'one'); -INSERT INTO ice_optimize VALUES(2, 'two'); -INSERT INTO ice_optimize VALUES(3, 'three'); +INSERT INTO ice_optimize VALUES(1, 'one', true); +INSERT INTO ice_optimize VALUES(2, 'two', false); +INSERT INTO ice_optimize VALUES(2, 'two', true); +INSERT INTO ice_optimize VALUES(3, 'three', true); SHOW FILES IN ice_optimize; ---- LABELS Path,Size,Partition,EC Policy @@ -16,108 +40,168 @@ Path,Size,Partition,EC Policy row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING, STRING, STRING, STRING ==== ---- QUERY -# OPTIMIZE TABLE should create 1 data file. +# OPTIMIZE TABLE should create 1 data file per executor, resulting in a total of 3 files. 
OPTIMIZE TABLE ice_optimize; SHOW FILES IN ice_optimize; ---- LABELS Path,Size,Partition,EC Policy ---- RESULTS: VERIFY_IS_EQUAL row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING, STRING, STRING, STRING ==== ---- QUERY SELECT * FROM ice_optimize; ---- RESULTS -1,'one' -2,'two' -3,'three' +1,'one',true +2,'two',false +2,'two',true +3,'three',true ---- TYPES -INT,STRING +INT,STRING,BOOLEAN ==== ---- QUERY -DELETE FROM ice_optimize WHERE i = 2; +DELETE FROM ice_optimize WHERE int_col = 2; SHOW FILES IN ice_optimize; ---- LABELS Path,Size,Partition,EC Policy ---- RESULTS: VERIFY_IS_SUBSET row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/delete-.*parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING, STRING, STRING, STRING ==== ---- QUERY -# Checking that the delete file was merged and there is no delete file in the table. +# Checking that MAX_FS_WRITERS has an effect on the number of files written. +# Also checking that the delete file was merged and there is no delete file in the table. 
+SET MAX_FS_WRITERS=1; OPTIMIZE TABLE ice_optimize; +SET MAX_FS_WRITERS=0; SHOW FILES IN ice_optimize; ---- LABELS Path,Size,Partition,EC Policy ---- RESULTS: VERIFY_IS_EQUAL row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY' +---- RESULTS: VERIFY_IS_NOT_IN +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/delete-.*parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING, STRING, STRING, STRING ==== ---- QUERY SELECT * FROM ice_optimize; ---- RESULTS -1,'one' -3,'three' +1,'one',true +3,'three',true ---- TYPES -INT,STRING +INT,STRING,BOOLEAN ==== ---- QUERY # Schema evolution should work and return correct results according to the latest schema. -ALTER TABLE ice_optimize DROP COLUMN s; -ALTER TABLE ice_optimize ADD COLUMN b BOOLEAN; -INSERT INTO ice_optimize VALUES(4, true); +ALTER TABLE ice_optimize DROP COLUMN string_col; +ALTER TABLE ice_optimize ADD COLUMN date_col DATE; +ALTER TABLE ice_optimize ADD COLUMN double_col DOUBLE; +INSERT INTO ice_optimize VALUES((4, false, '2024-01-22', 4.444), (1, false, '2024-02-02', 1.101)); OPTIMIZE TABLE ice_optimize; SELECT * FROM ice_optimize; ---- RESULTS -1,NULL -3,NULL -4,true +1,true,NULL,NULL +3,true,NULL,NULL +4,false,2024-01-22,4.444 +1,false,2024-02-02,1.101 ---- TYPES -INT,BOOLEAN +INT,BOOLEAN,DATE,DOUBLE ==== ---- QUERY -CREATE TABLE ice_optimize_part -PARTITIONED BY(i int) -STORED BY ICEBERG -TBLPROPERTIES ('format-version'='1'); +# OPTIMIZE TABLE should use the latest partition spec and create 1 file per partition. 
+ALTER TABLE ice_optimize SET PARTITION SPEC(int_col); +INSERT INTO ice_optimize VALUES((2, false, '2024-01-22', 2.2), (3, false, '2024-02-22', 3.3)); +OPTIMIZE TABLE ice_optimize; +SHOW FILES IN ice_optimize; +---- LABELS +Path,Size,Partition,EC Policy +---- RESULTS: VERIFY_IS_EQUAL +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/int_col=1.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/int_col=2.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/int_col=3.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/int_col=4.*.0.parq','.*','','$ERASURECODE_POLICY' +---- TYPES +STRING, STRING, STRING, STRING +==== +---- QUERY +SELECT * FROM ice_optimize; +---- RESULTS +1,true,NULL,NULL +3,true,NULL,NULL +4,false,2024-01-22,4.444 +1,false,2024-02-02,1.101 +2,false,2024-01-22,2.2 +3,false,2024-02-22,3.3 +---- TYPES +INT,BOOLEAN,DATE,DOUBLE +==== +---- QUERY +# OPTIMIZE TABLE should handle partition evolution and merge update deltas as well. +ALTER TABLE ice_optimize SET PARTITION SPEC(month(date_col)); +UPDATE ice_optimize SET date_col='2023-01-07' WHERE bool_col=true; +OPTIMIZE TABLE ice_optimize; +SHOW FILES IN ice_optimize; +---- LABELS +Path,Size,Partition,EC Policy +---- RESULTS: VERIFY_IS_EQUAL +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/date_col_month=2023-01/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/date_col_month=2024-01/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/date_col_month=2024-02/.*.0.parq','.*','','$ERASURECODE_POLICY' +---- TYPES +STRING, STRING, STRING, STRING ==== ---- QUERY -# Insert values into each partition to write multiple small files in each. 
-INSERT INTO ice_optimize_part VALUES(1), (2), (3); -INSERT INTO ice_optimize_part VALUES(2), (3); -INSERT INTO ice_optimize_part VALUES(1), (3); -SHOW FILES IN ice_optimize_part; +ALTER TABLE ice_optimize SET PARTITION SPEC(bucket(2,int_col)); +OPTIMIZE TABLE ice_optimize; +SHOW FILES IN ice_optimize; ---- LABELS Path,Size,Partition,EC Policy ---- RESULTS: VERIFY_IS_EQUAL -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/int_col_bucket_2=0/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/int_col_bucket_2=1/.*.0.parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING, STRING, STRING, STRING ==== ---- QUERY -# OPTIMIZE TABLE should create 1 data file per partition. 
-OPTIMIZE TABLE ice_optimize_part; -SHOW FILES IN ice_optimize_part; +ALTER TABLE ice_optimize ADD COLUMN string_col STRING; +INSERT INTO ice_optimize VALUES((10, true, '2024-01-01', 10.1010, 'impala'), (11, true, '2024-01-01', 11.11, 'iceberg')); +ALTER TABLE ice_optimize SET PARTITION SPEC(truncate(2, string_col)); +OPTIMIZE TABLE ice_optimize; +SHOW FILES IN ice_optimize; ---- LABELS Path,Size,Partition,EC Policy ---- RESULTS: VERIFY_IS_EQUAL -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY' -row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/string_col_trunc_2=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/string_col_trunc_2=ic/.*.0.parq','.*','','$ERASURECODE_POLICY' +row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/string_col_trunc_2=im/.*.0.parq','.*','','$ERASURECODE_POLICY' ---- TYPES STRING, STRING, STRING, STRING +==== +---- QUERY +# Make sure that the table content is not modified by OPTIMIZE TABLE. 
+SELECT * FROM ice_optimize; +---- RESULTS +1,true,2023-01-07,NULL,'NULL' +3,true,2023-01-07,NULL,'NULL' +4,false,2024-01-22,4.444,'NULL' +1,false,2024-02-02,1.101,'NULL' +2,false,2024-01-22,2.2,'NULL' +3,false,2024-02-22,3.3,'NULL' +10,true,2024-01-01,10.1010,'impala' +11,true,2024-01-01,11.11,'iceberg' +---- TYPES +INT,BOOLEAN,DATE,DOUBLE,STRING ==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test index f36db71a6..626cfd7a3 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test +++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test @@ -769,7 +769,7 @@ AuthorizationException: User '$USER' does not have privileges to execute 'ALTER' # Optimize statement on masked tables should be blocked, because reading and inserting masked data would result in data loss. optimize table functional_parquet.iceberg_partitioned ---- CATCH -AuthorizationException: User '$USER' does not have privileges to execute 'INSERT' on: functional_parquet.iceberg_partitioned +AuthorizationException: User '$USER' does not have privileges to access: functional_parquet.iceberg_partitioned ==== ---- QUERY # Deletes on masked tables should be blocked. diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index ed34262a6..fec75fe92 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1599,18 +1599,19 @@ class TestIcebergV2Table(IcebergTestSuite): def _test_update_basic_snapshots(self, db): """Verifies that the tables have the expected number of snapshots, and the parent ids match the previous snapshot ids. 
See IMPALA-12708.""" - def validate_snapshots(tbl, expected_snapshots): - tbl_name = "{}.{}".format(db, tbl) - snapshots = get_snapshots(self.client, tbl_name, - expected_result_size=expected_snapshots) - parent_id = None - for s in snapshots: - assert s.get_parent_id() == parent_id - parent_id = s.get_snapshot_id() - - validate_snapshots("single_col", 3) - validate_snapshots("ice_alltypes", 17) - validate_snapshots("ice_id_partitioned", 4) + + self.validate_snapshots(db, "single_col", 3) + self.validate_snapshots(db, "ice_alltypes", 17) + self.validate_snapshots(db, "ice_id_partitioned", 4) + + def validate_snapshots(self, db, tbl, expected_snapshots): + tbl_name = "{}.{}".format(db, tbl) + snapshots = get_snapshots(self.client, tbl_name, + expected_result_size=expected_snapshots) + parent_id = None + for s in snapshots: + assert s.get_parent_id() == parent_id + parent_id = s.get_snapshot_id() def _update_basic_hive_tests(self, db): def get_hive_results(tbl, order_by_col): @@ -1747,36 +1748,23 @@ class TestIcebergV2Table(IcebergTestSuite): "3,true,3,11,1.1,2.222,123.321,2022-05-22,impala"] def test_optimize(self, vector, unique_database): - tbl_name = unique_database + ".optimize_iceberg" - self.execute_query("""create table {0} (i int) - stored as iceberg""".format(tbl_name)) - self.execute_query("insert into {0} values (1);".format(tbl_name)) - self.execute_query("insert into {0} values (2);".format(tbl_name)) - self.execute_query("insert into {0} values (8);".format(tbl_name)) - result_before_opt = self.execute_query("SELECT * FROM {}".format(tbl_name)) - snapshots = get_snapshots(self.client, tbl_name, expected_result_size=3) - snapshot_before = snapshots[2] - - # Check that a new snapshot is created after Iceberg table optimization. 
- self.execute_query("optimize table {0};".format(tbl_name)) - snapshots = get_snapshots(self.client, tbl_name, expected_result_size=4) - snapshot_after = snapshots[3] - assert(snapshot_before.get_creation_time() < snapshot_after.get_creation_time()) - # Check that the last snapshot's parent ID is the snapshot ID before 'OPTIMIZE TABLE'. - assert(snapshot_before.get_snapshot_id() == snapshot_after.get_parent_id()) + self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database) + expected_snapshots = 19 + self.validate_snapshots(unique_database, "ice_optimize", expected_snapshots) - result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name)) - # Check that we get the same result from the table before and after 'OPTIMIZE TABLE'. - assert result_after_opt.data.sort() == result_before_opt.data.sort() + # The last operation was an OPTIMIZE TABLE statement. + # Check that time travel to the previous snapshot returns all results correctly. + tbl_name = unique_database + ".ice_optimize" + snapshots = get_snapshots( + self.client, tbl_name, expected_result_size=expected_snapshots) + snapshot_before_last = snapshots[-2] + result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name)) result_time_travel = self.execute_query( "select * from {0} for system_version as of {1};".format( - tbl_name, snapshot_before.get_snapshot_id())) - # Check that time travel to the previous snapshot returns all results correctly. + tbl_name, snapshot_before_last.get_snapshot_id())) assert result_after_opt.data.sort() == result_time_travel.data.sort() - self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database) - # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note, that most # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it
