This is an automated email from the ASF dual-hosted git repository. asherman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 66efe50d15f63696debd1e19a53482e5b0f013ab Author: Andrew Sherman <[email protected]> AuthorDate: Mon Dec 19 16:09:29 2022 -0800 IMPALA-10893: Use old schema during iceberg time travel. Before this change the schema used during Iceberg Time Travel was the current schema of the table. With this change we will use the schema from the point specified by the Time Travel parameters. The parameters used by an Iceberg Time Travel query are part of the FROM clause of the query. Previously analysis of the Time Travel parameters took place after the table Path was resolved, at which point some schema information is cached. In order to use the old schema during Iceberg time travel, however, we need to ensure that the version of the Table that is used is always the version specified by the Time Travel parameters. To do this we have to move the analysis of the Time Travel parameters inside the code that resolves the Path. Add a new implementation of FeIcebergTable that represents an Iceberg table involved in Time Travel. This is implemented by embedding a reference to the base Iceberg Table. All methods that are not Time Travel related are delegated to the base table. The Time Travel related methods use the historic Iceberg schema. TESTING: - Add a new file iceberg_util.py to hold the snapshot utility code that was developed for the in-progress IMPALA-11482. - Extend the existing Iceberg Time Travel tests to check the schema. - Add a test that shows time travel working with column masking. The column masking configuration is not tightly coupled to the schema so it is possible to mask historical columns. 
Change-Id: I7cbef6e20bbb567e517744fb1f34d880970399ab Reviewed-on: http://gerrit.cloudera.org:8080/19380 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../java/org/apache/impala/analysis/Analyzer.java | 45 +- .../org/apache/impala/analysis/BaseTableRef.java | 4 +- .../apache/impala/analysis/DescriptorTable.java | 17 +- .../java/org/apache/impala/analysis/TableRef.java | 17 +- .../org/apache/impala/analysis/TimeTravelSpec.java | 21 +- .../org/apache/impala/catalog/FeIcebergTable.java | 2 + .../impala/catalog/IcebergPositionDeleteTable.java | 7 + .../org/apache/impala/catalog/IcebergTable.java | 6 +- .../impala/catalog/IcebergTimeTravelTable.java | 593 +++++++++++++++++++++ .../impala/catalog/iceberg/IcebergCtasTarget.java | 6 + .../impala/catalog/local/LocalIcebergTable.java | 5 + .../apache/impala/analysis/AnalyzeStmtsTest.java | 3 +- tests/authorization/test_ranger.py | 116 ++++ tests/common/iceberg_test_suite.py | 40 +- tests/query_test/test_iceberg.py | 147 +++-- tests/util/iceberg_util.py | 107 ++++ 16 files changed, 1037 insertions(+), 99 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 9328b5172..44d4dcb4c 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -51,10 +51,12 @@ import org.apache.impala.catalog.FeDataSourceTable; import org.apache.impala.catalog.FeDb; import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeHBaseTable; +import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeIncompleteTable; import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; +import org.apache.impala.catalog.IcebergTimeTravelTable; import org.apache.impala.catalog.KuduTable; import 
org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.StructField; @@ -851,7 +853,8 @@ public class Analyzer { List<String> rawPath = tableRef.getPath(); Path resolvedPath = null; try { - resolvedPath = resolvePathWithMasking(rawPath, PathType.TABLE_REF); + resolvedPath = + resolvePathWithMasking(rawPath, PathType.TABLE_REF, tableRef.timeTravelSpec_); } catch (AnalysisException e) { // Register privilege requests to prefer reporting an authorization error over // an analysis error. We should not accidentally reveal the non-existence of a @@ -1171,7 +1174,12 @@ public class Analyzer { */ public Path resolvePathWithMasking(List<String> rawPath, PathType pathType) throws AnalysisException, TableLoadingException { - Path resolvedPath = resolvePath(rawPath, pathType); + return this.resolvePathWithMasking(rawPath, pathType, null); + } + + public Path resolvePathWithMasking(List<String> rawPath, PathType pathType, + TimeTravelSpec timeTravelSpec) throws AnalysisException, TableLoadingException { + Path resolvedPath = resolvePath(rawPath, pathType, timeTravelSpec); // Skip normal resolution cases that don't relate to nested types. if (pathType == PathType.TABLE_REF) { if (resolvedPath.destTable() != null || !resolvedPath.isRootedAtTuple()) { @@ -1209,7 +1217,7 @@ public class Analyzer { tableMaskingView.getUnMaskedTableRef() instanceof BaseTableRef); // Resolve rawPath inside the table masking view to point to the real table. 
Path maskedPath = tableMaskingView.inlineViewAnalyzer_.resolvePath( - rawPath, pathType); + rawPath, pathType, timeTravelSpec); maskedPath.setPathBeforeMasking(resolvedPath); return maskedPath; } @@ -1239,6 +1247,11 @@ public class Analyzer { */ public Path resolvePath(List<String> rawPath, PathType pathType) throws AnalysisException, TableLoadingException { + return this.resolvePath(rawPath, pathType, null); + } + + public Path resolvePath(List<String> rawPath, PathType pathType, + TimeTravelSpec timeTravelSpec) throws AnalysisException, TableLoadingException { // We only allow correlated references in predicates of a subquery. boolean resolveInAncestors = false; if (pathType == PathType.TABLE_REF || pathType == PathType.ANY) { @@ -1249,11 +1262,12 @@ public class Analyzer { // Convert all path elements to lower case. List<String> lcRawPath = Lists.newArrayListWithCapacity(rawPath.size()); for (String s: rawPath) lcRawPath.add(s.toLowerCase()); - return resolvePath(lcRawPath, pathType, resolveInAncestors); + return resolvePath(lcRawPath, pathType, resolveInAncestors, timeTravelSpec); } private Path resolvePath(List<String> rawPath, PathType pathType, - boolean resolveInAncestors) throws AnalysisException, TableLoadingException { + boolean resolveInAncestors, TimeTravelSpec timeTravelSpec) + throws AnalysisException, TableLoadingException { // List of all candidate paths with different roots. Paths in this list are initially // unresolved and may be illegal with respect to the pathType. List<Path> candidates = getTupleDescPaths(rawPath); @@ -1287,6 +1301,23 @@ public class Analyzer { // Ignore to allow path resolution to continue. } if (tbl != null) { + if (timeTravelSpec != null) { + if (!(tbl instanceof FeIcebergTable)) { + throw new AnalysisException(String.format( + "FOR %s AS OF clause is only supported for Iceberg tables. " + + "%s is not an Iceberg table.", + timeTravelSpec.getKind() == TimeTravelSpec.Kind.TIME_AS_OF ? 
+ "SYSTEM_TIME" : + "SYSTEM_VERSION", + tbl.getFullName())); + } + timeTravelSpec.analyze(this); + + FeIcebergTable rootTable = (FeIcebergTable) tbl; + IcebergTimeTravelTable timeTravelTable = + new IcebergTimeTravelTable(rootTable, timeTravelSpec); + tbl = timeTravelTable; + } candidates.add(new Path(tbl, rawPath.subList(tblNameIdx + 1, rawPath.size()))); } } @@ -1296,7 +1327,7 @@ public class Analyzer { Path result = resolvePaths(rawPath, candidates, pathType, errors); if (result == null && resolveInAncestors && hasAncestors()) { LOG.trace("Resolve in ancestors"); - result = getParentAnalyzer().resolvePath(rawPath, pathType, true); + result = getParentAnalyzer().resolvePath(rawPath, pathType, true, timeTravelSpec); } if (result == null) { Preconditions.checkState(!errors.isEmpty()); @@ -1336,7 +1367,7 @@ public class Analyzer { /** * Resolves the given paths and checks them for legality and ambiguity. Returns the * single legal path resolution if it exists, null otherwise. - * Populates 'errors' with a a prioritized list of error messages starting with the + * Populates 'errors' with a prioritized list of error messages starting with the * most relevant one. The list contains at least one error message if null is returned. 
*/ private Path resolvePaths(List<String> rawPath, List<Path> paths, PathType pathType, diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java index 2c76aa57c..327d2c607 100644 --- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java @@ -62,11 +62,11 @@ public class BaseTableRef extends TableRef { if (isAnalyzed_) return; analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_, requireGrantOption_); + analyzeTimeTravel(analyzer); desc_ = analyzer.registerTableRef(this); isAnalyzed_ = true; - analyzer.checkTableCapability(getTable(), Analyzer.OperationType.ANY); + Analyzer.checkTableCapability(getTable(), Analyzer.OperationType.ANY); analyzeTableSample(analyzer); - analyzeTimeTravel(analyzer); analyzeHints(analyzer); analyzeJoin(analyzer); analyzeSkipHeaderLineCount(); diff --git a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java index 399eddac5..95ba0d084 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java +++ b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.impala.catalog.ArrayType; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; +import org.apache.impala.catalog.IcebergTimeTravelTable; import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Type; @@ -193,7 +194,7 @@ public class DescriptorTable { // Verify table level consistency in the same query by checking that references to // the same Table refer to the same table instance. 
FeTable checkTable = referencedTables.get(tblName); - Preconditions.checkState(checkTable == null || table == checkTable); + Preconditions.checkState(checkTable == null || isSameTableRef(table, checkTable)); if (tableId == null) { tableId = nextTableId_++; tableIdMap.put(table, tableId); @@ -220,6 +221,20 @@ public class DescriptorTable { return result; } + /** + * @return true if the two tables are the same. + * For Iceberg Time Travel tables we compare the underlying base table. + */ + private boolean isSameTableRef(FeTable first, FeTable second) { + if (first instanceof IcebergTimeTravelTable) { + first = ((IcebergTimeTravelTable) first).getBase(); + } + if (second instanceof IcebergTimeTravelTable) { + second = ((IcebergTimeTravelTable) second).getBase(); + } + return first == second; + } + public TDescriptorTableSerialized toSerializedThrift() throws ImpalaException { TDescriptorTableSerialized result = new TDescriptorTableSerialized(); TDescriptorTable desc_tbl = toThrift(); diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java index 64c1f7050..d94008795 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.impala.analysis.TimeTravelSpec.Kind; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.FeFsTable; @@ -275,7 +274,7 @@ public class TableRef extends StmtNode { } /** - * Creates and returns a empty TupleDescriptor registered with the analyzer + * Creates and returns an empty TupleDescriptor registered with the analyzer * based on the resolvedPath_. * This method is called from the analyzer when registering this table reference. 
*/ @@ -482,15 +481,9 @@ public class TableRef extends StmtNode { } protected void analyzeTimeTravel(Analyzer analyzer) throws AnalysisException { + // We are analyzing the time travel spec before we know the table type, so we + // cannot check if the table supports time travel. if (timeTravelSpec_ != null) { - if (!(getTable() instanceof FeIcebergTable)) { - throw new AnalysisException(String.format( - "FOR %s AS OF clause is only supported for Iceberg tables. " + - "%s is not an Iceberg table.", - timeTravelSpec_.getKind() == Kind.TIME_AS_OF ? "SYSTEM_TIME" : - "SYSTEM_VERSION", - getTable().getFullName())); - } timeTravelSpec_.analyze(analyzer); } } @@ -860,11 +853,11 @@ public class TableRef extends StmtNode { other.timeTravelSpec_ = timeTravelSpec_; // Clear properties. Don't clear aliases_ since it's still used in resolving slots // in the query block of 'other'. + // Don't clear timeTravelSpec_ as it is still relevant. onClause_ = null; usingColNames_ = null; joinOp_ = null; joinHints_ = new ArrayList<>(); tableHints_ = new ArrayList<>(); - timeTravelSpec_ = null; } -} +} \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/analysis/TimeTravelSpec.java b/fe/src/main/java/org/apache/impala/analysis/TimeTravelSpec.java index e89e9c53a..08e4ae05f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TimeTravelSpec.java +++ b/fe/src/main/java/org/apache/impala/analysis/TimeTravelSpec.java @@ -55,6 +55,9 @@ public class TimeTravelSpec extends StmtNode { // A time string represents the asOfMicros_ for the query option TIMEZONE private String timeString_; + // Flag to show that analysis has been done + private boolean analyzed_; + public Kind getKind() { return kind_; } public long getAsOfVersion() { return asOfVersion_; } @@ -82,15 +85,30 @@ public class TimeTravelSpec extends StmtNode { @Override public void analyze(Analyzer analyzer) throws AnalysisException { + if (analyzed_) return; switch (kind_) { case TIME_AS_OF: 
analyzeTimeBased(analyzer); break; case VERSION_AS_OF: analyzeVersionBased(analyzer); break; } + analyzed_ = true; } private void analyzeTimeBased(Analyzer analyzer) throws AnalysisException { Preconditions.checkNotNull(asOfExpr_); - asOfExpr_.analyze(analyzer); + try { + asOfExpr_.analyze(analyzer); + } catch (AnalysisException e) { + if (e.getMessage().contains("Could not resolve column/field reference")) { + // If the AS_OF expr is not a simple constant it will need table information + // that is not yet available as the analysis of the table is not yet + // complete. If this happens we know it is not a constant expr, so construct + // a better error message. + throw new AnalysisException( + "FOR SYSTEM_TIME AS OF <expression> must be a constant expression: " + + toSql()); + } + throw e; + } if (!asOfExpr_.isConstant()) { throw new AnalysisException( "FOR SYSTEM_TIME AS OF <expression> must be a constant expression: " + toSql()); @@ -118,7 +136,6 @@ public class TimeTravelSpec extends StmtNode { throw new AnalysisException( "Invalid TIMESTAMP expression: " + ie.getMessage(), ie); } - } private void analyzeVersionBased(Analyzer analyzer) throws AnalysisException { diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index fb1489640..21b79f285 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -300,6 +300,8 @@ public interface FeIcebergTable extends FeFsTable { return false; } + THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag); + /** * Returns the current snapshot id of the Iceberg API table if it exists, otherwise * returns -1. 
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java index 89f1da79d..d5929ab36 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java @@ -27,6 +27,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.analysis.IcebergPartitionSpec; import org.apache.impala.thrift.TColumnStats; import org.apache.impala.thrift.TCompressionCodec; +import org.apache.impala.thrift.THdfsTable; import org.apache.impala.thrift.TIcebergCatalog; import org.apache.impala.thrift.TIcebergFileFormat; import org.apache.impala.thrift.TIcebergPartitionStats; @@ -176,4 +177,10 @@ public class IcebergPositionDeleteTable extends VirtualTable implements FeIceber public int getDefaultPartitionSpecId() { return -1; } + + @Override + public THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag) { + throw new IllegalStateException("not implemented here"); + } + } diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index a7333d342..77e7b49c0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -136,10 +136,10 @@ public class IcebergTable extends Table implements FeIcebergTable { public static final int V2_FILE_PATH_FIELD_ID = 2147483546; public static final int V2_POS_FIELD_ID = 2147483545; - // Iceberg catalog type dependend on table properties + // Iceberg catalog type dependent on table properties private TIcebergCatalog icebergCatalog_; - // Iceberg file format dependend on table properties + // Iceberg file format dependent on table properties private TIcebergFileFormat icebergFileFormat_; // Iceberg parquet compression codec dependent on table properties @@ 
-500,7 +500,7 @@ public class IcebergTable extends Table implements FeIcebergTable { return desc; } - private THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag) { + public THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag) { THdfsTable hdfsTable = hdfsTable_.getTHdfsTable(ThriftObjectType.FULL, null); if (updatePartitionFlag) { // Iceberg table only has one THdfsPartition, we set this partition diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTimeTravelTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTimeTravelTable.java new file mode 100644 index 000000000..6b7967858 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTimeTravelTable.java @@ -0,0 +1,593 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.catalog; + +import static org.apache.impala.analysis.TimeTravelSpec.Kind.TIME_AS_OF; +import static org.apache.impala.analysis.TimeTravelSpec.Kind.VERSION_AS_OF; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.iceberg.Schema; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.impala.analysis.IcebergPartitionSpec; +import org.apache.impala.analysis.LiteralExpr; +import org.apache.impala.analysis.TableName; +import org.apache.impala.analysis.TimeTravelSpec; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.FileSystemUtil; +import org.apache.impala.thrift.TCatalogObjectType; +import org.apache.impala.thrift.TColumnDescriptor; +import org.apache.impala.thrift.TCompressionCodec; +import org.apache.impala.thrift.THdfsTable; +import org.apache.impala.thrift.TIcebergCatalog; +import org.apache.impala.thrift.TIcebergFileFormat; +import org.apache.impala.thrift.TIcebergPartitionStats; +import org.apache.impala.thrift.TImpalaTableType; +import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.thrift.TResultSet; +import org.apache.impala.thrift.TSortingOrder; +import org.apache.impala.thrift.TTableDescriptor; +import org.apache.impala.thrift.TTableStats; +import org.apache.impala.thrift.TTableType; +import org.apache.impala.util.IcebergSchemaConverter; +import org.apache.impala.util.ListMap; +import org.apache.thrift.TException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * Represents an Iceberg Table involved in Time Travel. + * This is implemented by embedding a reference to the base Iceberg Table. 
+ * All methods that are not Time Travel delegated are delegated to the base table. + */ +public class IcebergTimeTravelTable + extends ForwardingFeIcebergTable implements FeIcebergTable { + // The base table to which non-Time Travel related methods are delegated + private final FeIcebergTable base_; + + // The Time Travel parameters that control the schema for the table. + private final TimeTravelSpec timeTravelSpec_; + + // colsByPos[i] refers to the ith column in the table. The first numClusteringCols are + // the clustering columns. + protected final ArrayList<Column> colsByPos_ = new ArrayList<>(); + + // map from lowercase column name to Column object. + protected final Map<String, Column> colsByName_ = new HashMap<>(); + + // Type of this table (array of struct) that mirrors the columns. Useful for analysis. + protected final ArrayType type_ = new ArrayType(new StructType()); + + public IcebergTimeTravelTable(FeIcebergTable base, TimeTravelSpec timeTravelSpec) + throws AnalysisException { + super(base); + base_ = base; + timeTravelSpec_ = timeTravelSpec; + this.readSchema(); + } + + public FeIcebergTable getBase() { return base_; } + + /** + * Initialize the columns from the schema corresponding to the time travel + * specification. + */ + private void readSchema() throws AnalysisException { + org.apache.iceberg.Table icebergApiTable = getIcebergApiTable(); + Schema icebergSchema; + if (timeTravelSpec_.getKind() == VERSION_AS_OF) { + long snapshotId = timeTravelSpec_.getAsOfVersion(); + icebergSchema = SnapshotUtil.schemaFor(icebergApiTable, snapshotId, null); + } else { + Preconditions.checkState(timeTravelSpec_.getKind() == TIME_AS_OF); + long timestampMillis = timeTravelSpec_.getAsOfMillis(); + try { + icebergSchema = SnapshotUtil.schemaFor(icebergApiTable, null, timestampMillis); + } catch (IllegalArgumentException e) { + // Use time with local TZ in exception so that it's clearer. 
+ throw new IllegalArgumentException( + "Cannot find a snapshot older than " + timeTravelSpec_.toTimeString()); + } + } + try { + for (Column col : IcebergSchemaConverter.convertToImpalaSchema(icebergSchema)) { + addColumn(col); + } + } catch (TableLoadingException e) { + throw new AnalysisException("Could not create iceberg schema.", e); + } + } + + @Override + public List<Column> getColumnsInHiveOrder() { + Preconditions.checkState(base_.getNumClusteringCols() == 0); + return colsByPos_; + } + + @Override + public List<String> getColumnNames() { + return Column.toColumnNames(colsByPos_); + } + + @Override + public List<Column> getColumns() { + return colsByPos_; + } + + @Override + public List<Column> getClusteringColumns() { + return Collections.emptyList(); + } + + @Override + public Column getColumn(String name) { + return colsByName_.get(name.toLowerCase()); + } + + @Override // FeTable + public List<Column> getNonClusteringColumns() { + return colsByPos_; + } + + @Override + public TTableDescriptor toThriftDescriptor( + int tableId, Set<Long> referencedPartitions) { + TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.ICEBERG_TABLE, + getTColumnDescriptors(), 0, getName(), getDb().getName()); + desc.setIcebergTable(Utils.getTIcebergTable(this)); + desc.setHdfsTable(transfromToTHdfsTable(false)); + return desc; + } + + /** + * Returns a list of thrift column descriptors ordered by position. 
+ */ + public List<TColumnDescriptor> getTColumnDescriptors() { + return FeCatalogUtils.getTColumnDescriptors(this); + } + + public ArrayType getType() { return type_; } + + public void addColumn(Column col) { + Preconditions.checkState(col instanceof IcebergColumn); + IcebergColumn iCol = (IcebergColumn) col; + colsByPos_.add(iCol); + colsByName_.put(iCol.getName().toLowerCase(), col); + + ((StructType) type_.getItemType()) + .addField(new IcebergStructField( + col.getName(), col.getType(), col.getComment(), iCol.getFieldId())); + } + + @Override + public THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag) { + return base_.transfromToTHdfsTable(updatePartitionFlag); + } +} + +/** + * A forwarding class for FeIcebergTable. + * This is just boilerplate code that delegates all methods to base. + * See Effective Java: "Favor composition over inheritance." + */ +class ForwardingFeIcebergTable implements FeIcebergTable { + private final FeIcebergTable base; + + public ForwardingFeIcebergTable(FeIcebergTable base) { this.base = base; } + + @Override + public FileSystem getFileSystem() throws CatalogException { + return base.getFileSystem(); + } + + public static FileSystem getFileSystem(Path filePath) throws CatalogException { + return FeFsTable.getFileSystem(filePath); + } + + @Override + public List<String> getPrimaryKeyColumnNames() throws TException { + return base.getPrimaryKeyColumnNames(); + } + + @Override + public boolean isPartitioned() { + return base.isPartitioned(); + } + + @Override + public List<String> getForeignKeysSql() throws TException { + return base.getForeignKeysSql(); + } + + @Override + public int parseSkipHeaderLineCount(StringBuilder error) { + return base.parseSkipHeaderLineCount(error); + } + + @Override + public int getSortByColumnIndex(String col_name) { + return base.getSortByColumnIndex(col_name); + } + + @Override + public boolean isLeadingSortByColumn(String col_name) { + return base.isLeadingSortByColumn(col_name); + } + + 
@Override + public boolean isSortByColumn(String col_name) { + return base.isSortByColumn(col_name); + } + + @Override + public TSortingOrder getSortOrderForSortByColumn() { + return base.getSortOrderForSortByColumn(); + } + + @Override + public boolean IsLexicalSortByColumn() { + return base.IsLexicalSortByColumn(); + } + + @Override + public IcebergContentFileStore getContentFileStore() { + return base.getContentFileStore(); + } + + @Override + public Map<String, TIcebergPartitionStats> getIcebergPartitionStats() { + return base.getIcebergPartitionStats(); + } + + @Override + public FeFsTable getFeFsTable() { + return base.getFeFsTable(); + } + + @Override + public TIcebergCatalog getIcebergCatalog() { + return base.getIcebergCatalog(); + } + + @Override + public org.apache.iceberg.Table getIcebergApiTable() { + return base.getIcebergApiTable(); + } + + @Override + public String getIcebergCatalogLocation() { + return base.getIcebergCatalogLocation(); + } + + @Override + public TIcebergFileFormat getIcebergFileFormat() { + return base.getIcebergFileFormat(); + } + + @Override + public TCompressionCodec getIcebergParquetCompressionCodec() { + return base.getIcebergParquetCompressionCodec(); + } + + @Override + public long getIcebergParquetRowGroupSize() { + return base.getIcebergParquetRowGroupSize(); + } + + @Override + public long getIcebergParquetPlainPageSize() { + return base.getIcebergParquetPlainPageSize(); + } + + @Override + public long getIcebergParquetDictPageSize() { + return base.getIcebergParquetDictPageSize(); + } + + @Override + public String getIcebergTableLocation() { + return base.getIcebergTableLocation(); + } + + @Override + public List<IcebergPartitionSpec> getPartitionSpecs() { + return base.getPartitionSpecs(); + } + + @Override + public IcebergPartitionSpec getDefaultPartitionSpec() { + return base.getDefaultPartitionSpec(); + } + + @Override + public int getDefaultPartitionSpecId() { + return base.getDefaultPartitionSpecId(); + } + + 
@Override + public Schema getIcebergSchema() { + return base.getIcebergSchema(); + } + + @Override + public boolean isCacheable() { + return base.isCacheable(); + } + + @Override + public boolean isLocationCacheable() { + return base.isLocationCacheable(); + } + + @Override + public boolean isMarkedCached() { + return base.isMarkedCached(); + } + + @Override + public String getLocation() { + return base.getLocation(); + } + + @Override + public String getNullPartitionKeyValue() { + return base.getNullPartitionKeyValue(); + } + + @Override + public String getHdfsBaseDir() { + return base.getHdfsBaseDir(); + } + + @Override + public FileSystemUtil.FsType getFsType() { + return base.getFsType(); + } + + @Override + public long getTotalHdfsBytes() { + return base.getTotalHdfsBytes(); + } + + @Override + public boolean usesAvroSchemaOverride() { + return base.usesAvroSchemaOverride(); + } + + @Override + public Set<HdfsFileFormat> getFileFormats() { + return base.getFileFormats(); + } + + @Override + public boolean hasWriteAccessToBaseDir() { + return base.hasWriteAccessToBaseDir(); + } + + @Override + public String getFirstLocationWithoutWriteAccess() { + return base.getFirstLocationWithoutWriteAccess(); + } + + @Override + public TResultSet getTableStats() { + return base.getTableStats(); + } + + @Override + public Collection<? extends PrunablePartition> getPartitions() { + return base.getPartitions(); + } + + @Override + public Set<Long> getPartitionIds() { + return base.getPartitionIds(); + } + + @Override + public Map<Long, ? extends PrunablePartition> getPartitionMap() { + return base.getPartitionMap(); + } + + @Override + public TreeMap<LiteralExpr, Set<Long>> getPartitionValueMap(int col) { + return base.getPartitionValueMap(col); + } + + @Override + public Set<Long> getNullPartitionIds(int colIdx) { + return base.getNullPartitionIds(colIdx); + } + + @Override + public List<? 
extends FeFsPartition> loadPartitions(Collection<Long> ids) { + return base.loadPartitions(ids); + } + + @Override + public SqlConstraints getSqlConstraints() { + return base.getSqlConstraints(); + } + + @Override + public ListMap<TNetworkAddress> getHostIndex() { + return base.getHostIndex(); + } + + @Override + public boolean isComputedPartitionColumn(Column col) { + return base.isComputedPartitionColumn(col); + } + + @Override + public THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag) { + return base.transfromToTHdfsTable(updatePartitionFlag); + } + + @Override + public long snapshotId() { + return base.snapshotId(); + } + + @Override + public void setIcebergTableStats() { + FeIcebergTable.super.setIcebergTableStats(); + } + + @Override + public boolean isLoaded() { + return base.isLoaded(); + } + + @Override + public Table getMetaStoreTable() { + return base.getMetaStoreTable(); + } + + @Override + public String getStorageHandlerClassName() { + return base.getStorageHandlerClassName(); + } + + @Override + public TCatalogObjectType getCatalogObjectType() { + return base.getCatalogObjectType(); + } + + @Override + public String getName() { + return base.getName(); + } + + @Override + public String getFullName() { + return base.getFullName(); + } + + @Override + public TableName getTableName() { + return base.getTableName(); + } + + @Override + public TImpalaTableType getTableType() { + return base.getTableType(); + } + + @Override + public String getTableComment() { + return base.getTableComment(); + } + + @Override + public List<Column> getColumns() { + return base.getColumns(); + } + + @Override + public List<VirtualColumn> getVirtualColumns() { + return base.getVirtualColumns(); + } + + @Override + public List<Column> getColumnsInHiveOrder() { + return base.getColumnsInHiveOrder(); + } + + @Override + public List<String> getColumnNames() { + return base.getColumnNames(); + } + + @Override + public List<Column> getClusteringColumns() { + return 
base.getClusteringColumns(); + } + + @Override + public List<Column> getNonClusteringColumns() { + return base.getNonClusteringColumns(); + } + + @Override + public int getNumClusteringCols() { + return base.getNumClusteringCols(); + } + + @Override + public boolean isClusteringColumn(Column c) { + return base.isClusteringColumn(c); + } + + @Override + public Column getColumn(String name) { + return base.getColumn(name); + } + + @Override + public ArrayType getType() { + return base.getType(); + } + + @Override + public FeDb getDb() { + return base.getDb(); + } + + @Override + public long getNumRows() { + return base.getNumRows(); + } + + @Override + public TTableStats getTTableStats() { + return base.getTTableStats(); + } + + @Override + public TTableDescriptor toThriftDescriptor( + int tableId, Set<Long> referencedPartitions) { + return base.toThriftDescriptor(tableId, referencedPartitions); + } + + @Override + public long getWriteId() { + return base.getWriteId(); + } + + @Override + public ValidWriteIdList getValidWriteIds() { + return base.getValidWriteIds(); + } + + @Override + public String getOwnerUser() { + return base.getOwnerUser(); + } +} diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java index 8732e6fb6..678e95667 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java @@ -253,6 +253,12 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable return iceSchema_; } + @Override + public THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag) { + throw new IllegalStateException("not implemented here"); + } + + @Override public long snapshotId() { return -1; diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java index b0b2a489f..8395a4f8d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java @@ -257,6 +257,11 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable { return desc; } + @Override + public THdfsTable transfromToTHdfsTable(boolean updatePartitionFlag) { + return this.transfromToTHdfsTable(); + } + private THdfsTable transfromToTHdfsTable() { Map<Long, THdfsPartition> idToPartition = new HashMap<>(); // LocalFsTable transformed from iceberg table only has one partition diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index 3f3f799ed..ebad509d1 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -5001,7 +5001,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest { iceT); TblsAnalyzeOk("select * from $TBL for system_time as of now() + interval 3 days", iceT); - TblsAnalyzeOk("select * from $TBL for system_version as of 123456", iceT); + // Use a legal snapshot id '93996984692289973' from the testdata. 
+ TblsAnalyzeOk("select * from $TBL for system_version as of 93996984692289973", iceT); TblsAnalysisError("select * from $TBL for system_time as of 42", iceT, "FOR SYSTEM_TIME AS OF <expression> must be a timestamp type"); diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index 2463b57bd..95e64cf43 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -33,6 +33,7 @@ from tests.common.test_dimensions import (create_client_protocol_dimension, from tests.util.hdfs_util import NAMENODE from tests.util.calculation_util import get_random_id from tests.util.filesystem_utils import WAREHOUSE_PREFIX, WAREHOUSE +from tests.util.iceberg_util import get_snapshots ADMIN = "admin" RANGER_AUTH = ("admin", "admin") @@ -1817,6 +1818,121 @@ class TestRanger(CustomClusterTestSuite): for i in range(policy_cnt): TestRanger._remove_policy(unique_name + str(i)) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) + def test_iceberg_time_travel_with_masking(self, unique_name): + """When we do a time travel query on an iceberg table we will use the schema from + the time of the snapshot. Make sure this works when column masking is being used.""" + user = getuser() + admin_client = self.create_impala_client() + short_table_name = "ice_1" + unique_database = unique_name + "_db" + tbl_name = unique_database + "." 
+ short_table_name + + try: + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database), user=ADMIN) + admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + admin_client.execute("create table {0} (a int, b string, c int) stored as iceberg" + .format(tbl_name), user=ADMIN) + admin_client.execute("insert into {0} values (1, 'one', 1)".format(tbl_name), + user=ADMIN) + admin_client.execute("alter table {0} drop column a".format(tbl_name), user=ADMIN) + admin_client.execute("insert into {0} values ('two', 2)".format(tbl_name), + user=ADMIN) + admin_client.execute("grant select on database {0} to user {1} with " + "grant option".format(unique_database, user), + user=ADMIN) + admin_client.execute("grant insert on database {0} to user {1} with " + "grant option".format(unique_database, user), + user=ADMIN) + + snapshots = get_snapshots(admin_client, tbl_name, expected_result_size=2) + # Create two versions of a simple query based on the two snapshot ids. + query = "select * from {0} FOR SYSTEM_VERSION AS OF {1}" + # The first query is for the data after the first insert. + first_time_travel_query = query.format(tbl_name, snapshots[0].get_snapshot_id()) + # The expected results of the first query. + first_query_columns = ['A', 'B', 'C'] + first_results_unmasked = ['1\tone\t1'] + first_results_masked_b = ['1\tNULL\t1'] # Column 'B' is masked to NULL. + first_results_masked_c = ['1\tone\tNULL'] # Column 'C' is masked to NULL. + + # Second query is for the data after the second insert, when the column has gone. + second_time_travel_query = query.format(tbl_name, snapshots[1].get_snapshot_id()) + # The expected results of the second query, depending on masking. + second_query_columns = ['B', 'C'] + second_results_unmasked = ['one\t1', 'two\t2'] + second_results_masked_b = ['NULL\t1', 'NULL\t2'] # Column 'B' masked to NULL. + second_results_masked_c = ['one\tNULL', 'two\tNULL'] # Column 'C' masked to NULL. 
+ + # Run queries without column masking. + results = self.client.execute(first_time_travel_query) + assert results.column_labels == first_query_columns + assert results.data == first_results_unmasked + + results = self.client.execute(second_time_travel_query) + assert results.column_labels == second_query_columns + assert len(results.data) == len(second_results_unmasked) + for row in second_results_unmasked: + assert row in results.data + + try: + # Mask column C to null. + TestRanger._add_column_masking_policy( + unique_name, user, unique_database, short_table_name, "C", "MASK_NULL") + admin_client.execute("refresh authorization", user=ADMIN) + + # Run the time travel queries again, time travel should work, but column + # 'C' is masked. + results = self.client.execute(first_time_travel_query) + assert results.column_labels == first_query_columns + assert results.data == first_results_masked_c + + results = self.client.execute(second_time_travel_query) + assert results.column_labels == second_query_columns + assert len(results.data) == len(second_results_masked_c) + for row in second_results_masked_c: + assert row in results.data + finally: + # Remove the masking policy. + TestRanger._remove_policy(unique_name) + admin_client.execute("refresh authorization", user=ADMIN) + + # Run the queries again without masking as we are here. + results = self.client.execute(first_time_travel_query) + assert results.column_labels == first_query_columns + assert results.data == first_results_unmasked + + results = self.client.execute(second_time_travel_query) + assert results.column_labels == second_query_columns + assert len(results.data) == len(second_results_unmasked) + for row in second_results_unmasked: + assert row in results.data + + try: + # Mask column B to null. 
+ TestRanger._add_column_masking_policy( + unique_name, user, unique_database, short_table_name, "B", "MASK_NULL") + admin_client.execute("refresh authorization", user=ADMIN) + + # Run the time travel queries again, time travel should work, but column + # 'B' is masked. + results = self.client.execute(first_time_travel_query) + assert results.column_labels == first_query_columns + assert results.data == first_results_masked_b + + results = self.client.execute(second_time_travel_query) + assert results.column_labels == second_query_columns + for row in second_results_masked_b: + assert row in results.data + finally: + TestRanger._remove_policy(unique_name) + finally: + admin_client.execute("drop database if exists {0} cascade".format(unique_database), + user=ADMIN) + @pytest.mark.execute_serially @SkipIfFS.hive @SkipIfHive2.ranger_auth diff --git a/tests/common/iceberg_test_suite.py b/tests/common/iceberg_test_suite.py index b9a8f76ac..f4ac06622 100644 --- a/tests/common/iceberg_test_suite.py +++ b/tests/common/iceberg_test_suite.py @@ -18,18 +18,11 @@ import datetime from tests.common.impala_test_suite import ImpalaTestSuite +from tests.util.iceberg_util import parse_timestamp, get_snapshots class IcebergTestSuite(ImpalaTestSuite): - @classmethod - def quote(cls, s): - return "'{0}'".format(s) - - @classmethod - def cast_ts(cls, ts): - return "CAST({0} as timestamp)".format(cls.quote(ts)) - @classmethod def execute_query_ts(cls, impalad_client, query): """Executes the given query then returns the time when it finished.""" @@ -41,26 +34,21 @@ class IcebergTestSuite(ImpalaTestSuite): """Executes DESCRIBE HISTORY <tbl> FROM through the given client. 
Verifies if the result snapshots are newer than the provided timestamp and checks the expected number of results.""" - query = "DESCRIBE HISTORY {0} FROM {1};".format(tbl_name, cls.cast_ts(ts)) - data = impalad_client.execute(query) - assert len(data.data) == expected_result_size - for i in range(len(data.data)): - result_ts_dt = cls.parse_timestamp(data.data[i].split('\t')[0]) - assert result_ts_dt >= ts - - @classmethod - def parse_timestamp(cls, ts_string): - """The client can receive the timestamp in two formats, if the timestamp has - fractional seconds "yyyy-MM-dd HH:mm:ss.SSSSSSSSS" pattern is used, otherwise - "yyyy-MM-dd HH:mm:ss". Additionally, Python's datetime library cannot handle - nanoseconds, therefore in that case the timestamp has to be trimmed.""" - if len(ts_string.split('.')) > 1: - return datetime.datetime.strptime(ts_string[:-3], '%Y-%m-%d %H:%M:%S.%f') - else: - return datetime.datetime.strptime(ts_string, '%Y-%m-%d %H:%M:%S') + snapshots = get_snapshots(impalad_client, tbl_name, ts_start=ts, + expected_result_size=expected_result_size) + for snapshot in snapshots: + assert snapshot.get_creation_time() >= ts + + def expect_results_between(cls, impalad_client, tbl_name, ts_start, ts_end, + expected_result_size): + snapshots = get_snapshots(impalad_client, tbl_name, ts_start=ts_start, + ts_end=ts_end, expected_result_size=expected_result_size) + for snapshot in snapshots: + assert snapshot.get_creation_time() >= ts_start + assert snapshot.get_creation_time() <= ts_end @classmethod def impala_now(cls, impalad_client): now_data = impalad_client.execute("select now()") - now_data_ts_dt = cls.parse_timestamp(now_data.data[0]) + now_data_ts_dt = parse_timestamp(now_data.data[0]) return now_data_ts_dt diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 5addca6ec..062a6cc12 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -38,9 +38,11 @@ from tests.common.file_utils import ( 
from tests.shell.util import run_impala_shell_cmd from tests.util.filesystem_utils import get_fs_path, IS_HDFS from tests.util.get_parquet_metadata import get_parquet_metadata +from tests.util.iceberg_util import cast_ts, quote, parse_timestamp LOG = logging.getLogger(__name__) + class TestIcebergTable(IcebergTestSuite): """Tests related to Iceberg tables.""" @@ -63,7 +65,7 @@ class TestIcebergTable(IcebergTestSuite): def test_alter_iceberg_tables(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-alter', vector, use_db=unique_database) - def test_expire_snapshots(self, vector, unique_database): + def test_expire_snapshots(self, unique_database): tbl_name = unique_database + ".expire_snapshots" # We are setting the TIMEZONE query option in this test, so let's create a local @@ -85,17 +87,17 @@ class TestIcebergTable(IcebergTestSuite): self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4) # Expire the oldest snapshot and test that the oldest one was expired expire_q = "alter table {0} execute expire_snapshots({1})" - impalad_client.execute(expire_q.format(tbl_name, self.cast_ts(ts_1))) + impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3) # Expire with a timestamp in which the interval does not touch existing snapshot - impalad_client.execute(expire_q.format(tbl_name, self.cast_ts(ts_1))) + impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3) # Expire all, but retain 1 impalad_client.execute(expire_q.format(tbl_name, - self.cast_ts(datetime.datetime.now()))) + cast_ts(datetime.datetime.now()))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1) # Change number of retained snapshots, then expire all @@ -104,7 +106,7 @@ class TestIcebergTable(IcebergTestSuite): impalad_client.execute(insert_q) 
impalad_client.execute(insert_q) impalad_client.execute(expire_q.format(tbl_name, - self.cast_ts(datetime.datetime.now()))) + cast_ts(datetime.datetime.now()))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2) # Check that timezone is interpreted in local timezone controlled by query option @@ -115,7 +117,7 @@ class TestIcebergTable(IcebergTestSuite): impalad_client.execute("SET TIMEZONE='Europe/Budapest'") impalad_client.execute(insert_q) impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") - impalad_client.execute(expire_q.format(tbl_name, self.cast_ts(ts_tokyo))) + impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo))) self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1) def test_truncate_iceberg_tables(self, vector, unique_database): @@ -225,8 +227,8 @@ class TestIcebergTable(IcebergTestSuite): tbl_name = unique_database + ".iceberg_multi_snapshots" self.client.execute("""create table {0} (i int) stored as iceberg tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name)) - result = self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name)) - result = self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name)) + self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name)) + self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name)) result = self.client.execute("DESCRIBE HISTORY {0}".format(tbl_name)) assert(len(result.data) == 2) first_snapshot = result.data[0].split("\t") @@ -240,16 +242,16 @@ class TestIcebergTable(IcebergTestSuite): # Check "is_current_ancestor" column. 
assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE") - def test_describe_history_params(self, vector, unique_database): + def test_describe_history_params(self, unique_database): tbl_name = unique_database + ".describe_history" def expect_results_between(ts_start, ts_end, expected_result_size): query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format( - tbl_name, self.cast_ts(ts_start), self.cast_ts(ts_end)) + tbl_name, cast_ts(ts_start), cast_ts(ts_end)) data = impalad_client.execute(query) assert len(data.data) == expected_result_size for i in range(len(data.data)): - result_ts_dt = self.parse_timestamp(data.data[i].split('\t')[0]) + result_ts_dt = parse_timestamp(data.data[i].split('\t')[0]) assert result_ts_dt >= ts_start and result_ts_dt <= ts_end # We are setting the TIMEZONE query option in this test, so let's create a local @@ -296,14 +298,18 @@ class TestIcebergTable(IcebergTestSuite): # Interpreting Budapest time in Tokyo time points to the past. self.expect_num_snapshots_from(impalad_client, tbl_name, now_budapest, 4) - def test_time_travel(self, vector, unique_database): + def test_time_travel(self, unique_database): tbl_name = unique_database + ".time_travel" - def expect_results(query, expected_results): + def expect_results(query, expected_results, expected_cols): data = impalad_client.execute(query) assert len(data.data) == len(expected_results) for r in expected_results: assert r in data.data + expected_col_labels = expected_cols['labels'] + expected_col_types = expected_cols['types'] + assert data.column_labels == expected_col_labels + assert data.column_types == expected_col_types def expect_for_count_star(query, expected): data = impalad_client.execute(query) @@ -312,20 +318,20 @@ class TestIcebergTable(IcebergTestSuite): assert "NumRowGroups" not in data.runtime_profile assert "NumFileMetadataRead" not in data.runtime_profile - def expect_results_t(ts, expected_results): + def expect_results_t(ts, expected_results, 
expected_cols): expect_results( "select * from {0} for system_time as of {1}".format(tbl_name, ts), - expected_results) + expected_results, expected_cols) def expect_for_count_star_t(ts, expected): expect_for_count_star( "select count(*) from {0} for system_time as of {1}".format(tbl_name, ts), expected) - def expect_results_v(snapshot_id, expected_results): + def expect_results_v(snapshot_id, expected_results, expected_cols): expect_results( "select * from {0} for system_version as of {1}".format(tbl_name, snapshot_id), - expected_results) + expected_results, expected_cols) def expect_for_count_star_v(snapshot_id, expected): expect_for_count_star( @@ -333,12 +339,6 @@ class TestIcebergTable(IcebergTestSuite): tbl_name, snapshot_id), expected) - def quote(s): - return "'{0}'".format(s) - - def cast_ts(ts): - return "CAST({0} as timestamp)".format(quote(ts)) - def get_snapshots(): data = impalad_client.execute("describe history {0}".format(tbl_name)) ret = list() @@ -365,60 +365,100 @@ class TestIcebergTable(IcebergTestSuite): time.sleep(5) ts_4 = self.execute_query_ts(impalad_client, "insert into {0} values (100)" .format(tbl_name)) + ts_no_ss = self.execute_query_ts(impalad_client, + "alter table {0} add column {1} bigint" + .format(tbl_name, "j")) + ts_5 = self.execute_query_ts(impalad_client, "insert into {0} (i,j) values (3, 103)" + .format(tbl_name)) + + # Descriptions of the different schemas we expect to see as Time Travel queries + # use the schema from the specified time or snapshot. + # + # When the schema is just the 'J' column. + j_cols = { + 'labels': ['J'], + 'types': ['BIGINT'] + } + # When the schema is just the 'I' column. + i_cols = { + 'labels': ['I'], + 'types': ['INT'] + } + # When the schema is the 'I' and 'J' columns. + ij_cols = { + 'labels': ['I', 'J'], + 'types': ['INT', 'BIGINT'] + } + # Query table as of timestamps. 
- expect_results_t("now()", ['100']) - expect_results_t(self.quote(ts_1), ['1']) - expect_results_t(self.quote(ts_2), ['1', '2']) - expect_results_t(self.quote(ts_3), []) - expect_results_t(self.cast_ts(ts_3) + " + interval 1 seconds", []) - expect_results_t(self.quote(ts_4), ['100']) - expect_results_t(self.cast_ts(ts_4) + " - interval 5 seconds", []) + expect_results_t("now()", ['100\tNULL', '3\t103'], ij_cols) + expect_results_t(quote(ts_1), ['1'], i_cols) + expect_results_t(quote(ts_2), ['1', '2'], i_cols) + expect_results_t(quote(ts_3), [], i_cols) + expect_results_t(cast_ts(ts_3) + " + interval 1 seconds", [], i_cols) + expect_results_t(quote(ts_4), ['100'], i_cols) + expect_results_t(cast_ts(ts_4) + " - interval 5 seconds", [], i_cols) + # There is no new snapshot created by the schema change between ts_4 and ts_no_ss. + # So at ts_no_ss we see the schema as of ts_4 + expect_results_t(quote(ts_no_ss), ['100'], i_cols) + expect_results_t(quote(ts_5), ['100\tNULL', '3\t103'], ij_cols) # Future queries return the current snapshot. - expect_results_t(self.cast_ts(ts_4) + " + interval 1 hours", ['100']) + expect_results_t(cast_ts(ts_5) + " + interval 1 hours", ['100\tNULL', '3\t103'], + ij_cols) + # Query table as of snapshot IDs. 
snapshots = get_snapshots() - expect_results_v(snapshots[0], ['1']) - expect_results_v(snapshots[1], ['1', '2']) - expect_results_v(snapshots[2], []) - expect_results_v(snapshots[3], ['100']) + expect_results_v(snapshots[0], ['1'], i_cols) + expect_results_v(snapshots[1], ['1', '2'], i_cols) + expect_results_v(snapshots[2], [], i_cols) + expect_results_v(snapshots[3], ['100'], i_cols) + expect_results_v(snapshots[4], ['100\tNULL', '3\t103'], ij_cols) # Test of plain count star optimization # 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile - expect_for_count_star_t("now()", '1') + expect_for_count_star_t("now()", '2') expect_for_count_star_t(quote(ts_1), '1') expect_for_count_star_t(quote(ts_2), '2') expect_for_count_star_t(quote(ts_3), '0') expect_for_count_star_t(cast_ts(ts_3) + " + interval 1 seconds", '0') expect_for_count_star_t(quote(ts_4), '1') expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0') - expect_for_count_star_t(cast_ts(ts_4) + " + interval 1 hours", '1') + expect_for_count_star_t(cast_ts(ts_5), '2') + expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2') expect_for_count_star_v(snapshots[0], '1') expect_for_count_star_v(snapshots[1], '2') expect_for_count_star_v(snapshots[2], '0') expect_for_count_star_v(snapshots[3], '1') + expect_for_count_star_v(snapshots[4], '2') # SELECT diff expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' MINUS SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format( tbl=tbl_name, ts_new=ts_2, ts_old=ts_1), - ['2']) + ['2'], i_cols) expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new} MINUS SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format( tbl=tbl_name, v_new=snapshots[1], v_old=snapshots[0]), - ['2']) - # Mix SYSTEM_TIME ans SYSTEM_VERSION + ['2'], i_cols) + # Mix SYSTEM_TIME and SYSTEM_VERSION expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new} MINUS SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF 
'{ts_old}'""".format( tbl=tbl_name, v_new=snapshots[1], ts_old=ts_1), - ['2']) + ['2'], i_cols) expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' MINUS SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format( tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0]), - ['2']) + ['2'], i_cols) + expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}' + MINUS + SELECT *, NULL FROM {tbl} FOR SYSTEM_TIME + AS OF '{ts_old}'""".format( + tbl=tbl_name, ts_new=ts_5, ts_old=ts_4), + ['3\t103'], ij_cols) # Query old snapshot try: @@ -435,22 +475,38 @@ class TestIcebergTable(IcebergTestSuite): except Exception as e: assert "Cannot find snapshot with ID 42" in str(e) + # Go back to one column + impalad_client.execute("alter table {0} drop column i".format(tbl_name)) + + # Test that deleted column is not selectable. + try: + impalad_client.execute("SELECT i FROM {0}".format(tbl_name)) + assert False # Exception must be thrown + except Exception as e: + assert "Could not resolve column/field reference: 'i'" in str(e) + + # Back at ts_2 the deleted 'I' column is there + expect_results("SELECT * FROM {0} FOR SYSTEM_TIME AS OF '{1}'". + format(tbl_name, ts_2), ['1', '2'], i_cols) + expect_results("SELECT i FROM {0} FOR SYSTEM_TIME AS OF '{1}'". + format(tbl_name, ts_2), ['1', '2'], i_cols) + # Check that timezone is interpreted in local timezone controlled by query option # TIMEZONE impalad_client.execute("truncate table {0}".format(tbl_name)) impalad_client.execute("insert into {0} values (1111)".format(tbl_name)) impalad_client.execute("SET TIMEZONE='Europe/Budapest'") now_budapest = impala_now() - expect_results_t(self.quote(now_budapest), ['1111']) + expect_results_t(quote(now_budapest), ['1111'], j_cols) # Let's switch to Tokyo time. Tokyo time is always greater than Budapest time. 
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'") now_tokyo = impala_now() - expect_results_t(self.quote(now_tokyo), ['1111']) + expect_results_t(quote(now_tokyo), ['1111'], j_cols) try: # Interpreting Budapest time in Tokyo time points to the past when the table # didn't exist. - expect_results_t(self.quote(now_budapest), []) + expect_results_t(quote(now_budapest), [], j_cols) assert False except Exception as e: assert "Cannot find a snapshot older than" in str(e) @@ -906,6 +962,7 @@ class TestIcebergTable(IcebergTestSuite): def test_avro_file_format(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-avro', vector, unique_database) + class TestIcebergV2Table(IcebergTestSuite): """Tests related to Iceberg V2 tables.""" diff --git a/tests/util/iceberg_util.py b/tests/util/iceberg_util.py new file mode 100644 index 000000000..9418638e3 --- /dev/null +++ b/tests/util/iceberg_util.py @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import datetime + + +class Snapshot(object): + """Encapsulate an Iceberg Snapshot""" + + def __init__(self, raw_string): + """Construct a snapshot from a row of output from DESCRIBE HISTORY. 
+ The columns, separated by tabs are: + creation_time, snapshot_id, parent_id, is_current_ancestor + for example (using a space not a tab): + 2020-08-30 22:58:08.440000000 8270633197658268308 NULL TRUE + """ + parts = raw_string.split("\t") + self._creation_time = parse_timestamp(parts[0]) + self._snapshot_id = int(parts[1]) + parent_id_str = parts[2] + if parent_id_str == "NULL": + self._parent_id = None + else: + self._parent_id = int(parent_id_str) + is_ancestor_str = parts[3] + if is_ancestor_str == "TRUE": + self._is_current_ancestor = True + elif is_ancestor_str == "FALSE": + self._is_current_ancestor = False + else: + raise ValueError("unexpected is_current_ancestor value {0}".format(is_ancestor_str)) + + def __str__(self): + return "<Snapshot>: id={0} creation={1} parent={2} is_current_ancestor={3}".format( + self._snapshot_id, self._creation_time, self._parent_id, + self._is_current_ancestor) + + def get_creation_time(self): + return self._creation_time + + def get_snapshot_id(self): + return self._snapshot_id + + def get_parent_id(self): + return self._parent_id + + def is_current_ancestor(self): + return self._is_current_ancestor + + +def parse_timestamp(ts_string): + """The client can receive the timestamp in two formats, if the timestamp has + fractional seconds "yyyy-MM-dd HH:mm:ss.SSSSSSSSS" pattern is used, otherwise + "yyyy-MM-dd HH:mm:ss". Additionally, Python's datetime library cannot handle + nanoseconds, therefore in that case the timestamp has to be trimmed.""" + if len(ts_string.split('.')) > 1: + return datetime.datetime.strptime(ts_string[:-3], '%Y-%m-%d %H:%M:%S.%f') + else: + return datetime.datetime.strptime(ts_string, '%Y-%m-%d %H:%M:%S') + + +def quote(s): + return "'{0}'".format(s) + + +def cast_ts(ts): + return "CAST({0} as timestamp)".format(quote(ts)) + + +def get_snapshots(impalad_client, tbl_name, ts_start=None, ts_end=None, + expected_result_size=None, user=None): + """Executes DESCRIBE HISTORY <tbl> through the given client. 
+ If ts_start is set, and ts_end is not, then start_ts is used as the FROM parameter of + DESCRIBE HISTORY. + If both ts_start and ts_end are set, then they are used as parameters to + DESCRIBE HISTORY BETWEEN. + If expected_result_size is set then, this is used to check the number of results. + The output is converted to a list of Snapshot objects, which are returned.""" + if ts_start and ts_end: + query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format( + tbl_name, cast_ts(ts_start), cast_ts(ts_end)) + elif ts_start: + query = "DESCRIBE HISTORY {0} FROM {1};".format(tbl_name, cast_ts(ts_start)) + else: + query = "DESCRIBE HISTORY {0};".format(tbl_name) + rows = impalad_client.execute(query, user) + if expected_result_size: + assert len(rows.data) == expected_result_size, \ + "got unexpected number of snapshots {0} when expected {1}".format( + len(rows.data), expected_result_size) + results = [] + for snapshot_str in rows.data: + results.append(Snapshot(snapshot_str)) + return results
