This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ce35e81bcaaa7dca3cccee028fea82e36dd941c0 Author: Gabor Kaszab <[email protected]> AuthorDate: Tue Sep 24 14:50:24 2024 +0200 IMPALA-11265: Part2: Store Iceberg file descriptors in encoded format The file descriptors in HdfsPartition are cached as byte arrays to keep the memory footprint low. They are transformed into actual FileDescriptor objects once queried. This patch changes IcebergContentFileStore to similarly use byte arrays as an internal representation for file descriptors. Note that file descriptors for Iceberg tables have 2 components: one is the same as in HdfsPartition and the other stores Iceberg-specific file metadata in an additional byte array. Measurements and observations: - I have a test table that has 110k data files. For this table the JVM memory usage in the catalogd was reduced from 80MB to 65MB. - Both HdfsPartition.FileDescriptor and IcebergContentFileStore use flatbuffers and in turn byte arrays to represent file descriptors and these byte arrays are shared between these 2 places. As a result there is no redundancy in storing the file descriptors both for the Iceberg and the Hdfs table. - There is no measurable difference in planning times with this patch. 
Change-Id: I9d7794df999bdaf118158eace26cea610f911c0a Reviewed-on: http://gerrit.cloudera.org:8080/21869 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Gabor Kaszab <[email protected]> --- .../org/apache/impala/analysis/OptimizeStmt.java | 4 +- .../org/apache/impala/catalog/FeIcebergTable.java | 74 -------- .../org/apache/impala/catalog/HdfsPartition.java | 6 +- .../impala/catalog/IcebergContentFileStore.java | 206 +++++++++++++++++---- .../org/apache/impala/catalog/IcebergTable.java | 2 +- .../impala/catalog/iceberg/IcebergCtasTarget.java | 4 +- .../apache/impala/planner/IcebergScanPlanner.java | 2 + .../catalog/IcebergContentFileStoreTest.java | 124 +++++++++++++ 8 files changed, 311 insertions(+), 111 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java index 59f4f9b09..a011e8064 100644 --- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java @@ -236,8 +236,8 @@ public class OptimizeStmt extends DmlStatementBase { throws IOException, ImpalaRuntimeException { GroupedContentFiles selectedContentFiles = new GroupedContentFiles(); selectedContentFiles.dataFilesWithoutDeletes = contentFiles; - IcebergContentFileStore selectedFiles = FeIcebergTable.Utils - .loadAllPartition(iceTable, selectedContentFiles); + IcebergContentFileStore selectedFiles = + new IcebergContentFileStore(iceTable, selectedContentFiles); return selectedFiles.getDataFilesWithoutDeletes(); } diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index af2f9efdd..6837a140d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -43,8 +43,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; 
import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; @@ -53,7 +51,6 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.impala.analysis.IcebergPartitionField; @@ -68,7 +65,6 @@ import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaRuntimeException; -import org.apache.impala.common.Pair; import org.apache.impala.common.PrintUtils; import org.apache.impala.common.Reference; import org.apache.impala.fb.FbFileBlock; @@ -761,76 +757,6 @@ public interface FeIcebergTable extends FeFsTable { numUnknownDiskIds, absPath); } - /** - * Returns the FileDescriptors loaded by the internal HdfsTable. To avoid returning - * the metadata files the resultset is limited to the files that are tracked by - * Iceberg. Both the HdfsBaseDir and the DataFile path can contain the scheme in their - * path, using org.apache.hadoop.fs.Path to normalize the paths. - */ - public static IcebergContentFileStore loadAllPartition( - FeIcebergTable table, GroupedContentFiles icebergFiles) - throws IOException, ImpalaRuntimeException { - Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap = new HashMap<>(); - Collection<? 
extends FeFsPartition> partitions = - FeCatalogUtils.loadAllPartitions(table.getFeFsTable()); - for (FeFsPartition partition : partitions) { - for (FileDescriptor fileDesc : partition.getFileDescriptors()) { - Path path = new Path(fileDesc.getAbsolutePath(table.getHdfsBaseDir())); - hdfsFileDescMap.put(path.toUri().getPath(), fileDesc); - } - } - IcebergContentFileStore fileStore = new IcebergContentFileStore(); - Pair<String, HdfsPartition.FileDescriptor> pathHashAndFd; - for (DataFile dataFile : icebergFiles.dataFilesWithoutDeletes) { - pathHashAndFd = getPathHashAndFd(dataFile, table, hdfsFileDescMap); - fileStore.addDataFileWithoutDeletes(pathHashAndFd.first, pathHashAndFd.second); - } - for (DataFile dataFile : icebergFiles.dataFilesWithDeletes) { - pathHashAndFd = getPathHashAndFd(dataFile, table, hdfsFileDescMap); - fileStore.addDataFileWithDeletes(pathHashAndFd.first, pathHashAndFd.second); - } - for (DeleteFile deleteFile : icebergFiles.positionDeleteFiles) { - pathHashAndFd = getPathHashAndFd(deleteFile, table, hdfsFileDescMap); - fileStore.addPositionDeleteFile(pathHashAndFd.first, pathHashAndFd.second); - } - for (DeleteFile deleteFile : icebergFiles.equalityDeleteFiles) { - pathHashAndFd = getPathHashAndFd(deleteFile, table, hdfsFileDescMap); - fileStore.addEqualityDeleteFile(pathHashAndFd.first, pathHashAndFd.second); - } - return fileStore; - } - - private static Pair<String, HdfsPartition.FileDescriptor> getPathHashAndFd( - ContentFile<?> contentFile, FeIcebergTable table, - Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap) throws IOException { - String pathHash = IcebergUtil.getFilePathHash(contentFile); - HdfsPartition.FileDescriptor fd = getOrCreateIcebergFd( - table, hdfsFileDescMap, contentFile); - return new Pair<>(pathHash, fd); - } - - private static FileDescriptor getOrCreateIcebergFd(FeIcebergTable table, - Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap, - ContentFile<?> contentFile) throws IllegalArgumentException, 
IOException { - Path path = new Path(contentFile.path().toString()); - HdfsPartition.FileDescriptor iceFd = null; - if (hdfsFileDescMap.containsKey(path.toUri().getPath())) { - HdfsPartition.FileDescriptor fsFd = hdfsFileDescMap.get( - path.toUri().getPath()); - iceFd = fsFd.cloneWithFileMetadata( - IcebergUtil.createIcebergMetadata(table, contentFile)); - } else { - if (Utils.requiresDataFilesInTableLocation(table)) { - LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive" - + "file listing results.", path); - } - HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(contentFile, table); - iceFd = fileDesc.cloneWithFileMetadata( - IcebergUtil.createIcebergMetadata(table, contentFile)); - } - return iceFd; - } - /** * Return the PartitionContent loaded by the internal HdfsTable. To avoid returning * the metadata files the result set is limited to the files that are tracked by diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index e194a05e7..dd454e55d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -123,7 +123,7 @@ public class HdfsPartition extends CatalogObjectImpl fbFileMetadata_ = null; } - private FileDescriptor(FbFileDesc fileDescData, FbFileMetadata fileMetadata) { + public FileDescriptor(FbFileDesc fileDescData, FbFileMetadata fileMetadata) { fbFileDescriptor_ = fileDescData; fbFileMetadata_ = fileMetadata; } @@ -308,6 +308,10 @@ public class HdfsPartition extends CatalogObjectImpl return fbFileDescriptor_.fileBlocks(idx); } + public FbFileDesc getFbFileDescriptor() { + return fbFileDescriptor_; + } + public FbFileMetadata getFbFileMetadata() { return fbFileMetadata_; } diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java index ede9fec25..57ee5f537 
100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java @@ -18,8 +18,12 @@ package org.apache.impala.catalog; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,28 +31,102 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.curator.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.catalog.iceberg.GroupedContentFiles; +import org.apache.impala.common.Pair; +import org.apache.impala.fb.FbFileDesc; +import org.apache.impala.fb.FbFileMetadata; import org.apache.impala.fb.FbIcebergDataFileFormat; import org.apache.impala.thrift.THdfsFileDesc; import org.apache.impala.thrift.TIcebergContentFileStore; import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.util.IcebergUtil; import org.apache.impala.util.ListMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Helper class for storing Iceberg file descriptors. It stores data and delete files * separately, while also storing file descriptors belonging to earlier snapshots. 
*/ public class IcebergContentFileStore { + final static Logger LOG = LoggerFactory.getLogger(IcebergContentFileStore.class); + + private static class EncodedFileDescriptor { + public final byte[] fileDesc_; + public final byte[] fileMetadata_; + + public EncodedFileDescriptor(byte[] fDesc, byte[] fMeta) { + this.fileDesc_ = fDesc; + this.fileMetadata_ = fMeta; + } + } + + protected static FileDescriptor decode(EncodedFileDescriptor encodedFd) { + Preconditions.checkNotNull(encodedFd.fileMetadata_); + + return new FileDescriptor( + FbFileDesc.getRootAsFbFileDesc(ByteBuffer.wrap(encodedFd.fileDesc_)), + FbFileMetadata.getRootAsFbFileMetadata( + ByteBuffer.wrap(encodedFd.fileMetadata_))) { + // Whenever getting the FileDescriptor for the same path hash it will be a + // different FileDescriptor object. As a result the default equals() and + // hashCode() functions can't be used to judge equality between these + // FileDescriptors. + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (!(obj instanceof FileDescriptor)) return false; + FileDescriptor otherFD = (FileDescriptor) obj; + if ((this.getFbFileMetadata() == null && otherFD.getFbFileMetadata() != null) || + (this.getFbFileMetadata() != null && otherFD.getFbFileMetadata() == null)) { + return false; + } + return this.getFbFileDescriptor().getByteBuffer().array() == + otherFD.getFbFileDescriptor().getByteBuffer().array() && + this.getFbFileMetadata().getByteBuffer().array() == + otherFD.getFbFileMetadata().getByteBuffer().array(); + } + + @Override + public int hashCode() { + return getAbsolutePath().hashCode(); + } + }; + } + + // Function to convert from a FileDescriptor to an EncodedFileDescriptor. 
+ protected static EncodedFileDescriptor encode(FileDescriptor fd) { + return new EncodedFileDescriptor( + encodeFB(fd.getFbFileDescriptor()), + encodeFB(fd.getFbFileMetadata())); + } + + private static byte[] encodeFB(com.google.flatbuffers.Table fbObject) { + if (fbObject == null) return null; + + ByteBuffer bb = fbObject.getByteBuffer(); + byte[] arr = bb.array(); + Preconditions.checkState(bb.arrayOffset() == 0 && bb.remaining() == arr.length); + + return arr; + } // Auxiliary class for holding file descriptors in both a map and a list. private static class MapListContainer { // Key is the ContentFile path hash, value is FileDescriptor transformed from DataFile - private final Map<String, FileDescriptor> fileDescMap_ = new HashMap<>(); - private final List<FileDescriptor> fileDescList_ = new ArrayList<>(); + private final Map<String, EncodedFileDescriptor> fileDescMap_ = new HashMap<>(); + private final List<EncodedFileDescriptor> fileDescList_ = new ArrayList<>(); // Adds a file to the map. If this is a new entry, then add it to the list as well. // Return true if 'desc' was a new entry. - public boolean add(String pathHash, FileDescriptor desc) { + public boolean add(String pathHash, EncodedFileDescriptor desc) { if (fileDescMap_.put(pathHash, desc) == null) { fileDescList_.add(desc); return true; @@ -57,21 +135,25 @@ public class IcebergContentFileStore { } public FileDescriptor get(String pathHash) { - return fileDescMap_.get(pathHash); + if (!fileDescMap_.containsKey(pathHash)) return null; + return decode(fileDescMap_.get(pathHash)); } public long getNumFiles() { return fileDescList_.size(); } - List<FileDescriptor> getList() { return fileDescList_; } + List<FileDescriptor> getList() { + return Lists.transform(fileDescList_, fd -> IcebergContentFileStore.decode(fd)); + } // It's enough to only convert the map part to thrift. 
Map<String, THdfsFileDesc> toThrift() { Map<String, THdfsFileDesc> ret = new HashMap<>(); - for (Map.Entry<String, HdfsPartition.FileDescriptor> entry : - fileDescMap_.entrySet()) { - ret.put(entry.getKey(), entry.getValue().toThrift()); + for (Map.Entry<String, EncodedFileDescriptor> entry : fileDescMap_.entrySet()) { + ret.put( + entry.getKey(), + decode(entry.getValue()).toThrift()); } return ret; } @@ -86,7 +168,7 @@ public class IcebergContentFileStore { Preconditions.checkNotNull(hostIndex); fd = fd.cloneWithNewHostIndex(networkAddresses, hostIndex); } - ret.add(entry.getKey(), fd); + ret.add(entry.getKey(), encode(fd)); } return ret; } @@ -99,7 +181,7 @@ public class IcebergContentFileStore { private MapListContainer equalityDeleteFiles_ = new MapListContainer(); // Caches file descriptors loaded during time-travel queries. - private final ConcurrentMap<String, FileDescriptor> oldFileDescMap_ = + private final ConcurrentMap<String, EncodedFileDescriptor> oldFileDescMap_ = new ConcurrentHashMap<>(); // Flags to indicate file formats used in the table. @@ -109,34 +191,47 @@ public class IcebergContentFileStore { public IcebergContentFileStore() {} - public void addDataFileWithoutDeletes(String pathHash, FileDescriptor desc) { - if (dataFilesWithoutDeletes_.add(pathHash, desc)) { - updateFileFormats(desc); + public IcebergContentFileStore( + FeIcebergTable icebergTable, GroupedContentFiles icebergFiles) { + Preconditions.checkNotNull(icebergTable); + Preconditions.checkNotNull(icebergFiles); + + Map<String, FileDescriptor> hdfsFileDescMap = new HashMap<>(); + Collection<? 
extends FeFsPartition> partitions = + FeCatalogUtils.loadAllPartitions(icebergTable.getFeFsTable()); + for (FeFsPartition partition : partitions) { + for (FileDescriptor fileDesc : partition.getFileDescriptors()) { + Path path = new Path(fileDesc.getAbsolutePath(icebergTable.getHdfsBaseDir())); + hdfsFileDescMap.put(path.toUri().getPath(), fileDesc); + } } - } - public void addDataFileWithDeletes(String pathHash, FileDescriptor desc) { - if (dataFilesWithDeletes_.add(pathHash, desc)) { - updateFileFormats(desc); + for (DataFile dataFile : icebergFiles.dataFilesWithoutDeletes) { + Pair<String, EncodedFileDescriptor> pathHashAndFd = + getPathHashAndFd(icebergTable, dataFile, hdfsFileDescMap); + dataFilesWithoutDeletes_.add(pathHashAndFd.first, pathHashAndFd.second); } - } - - public void addPositionDeleteFile(String pathHash, FileDescriptor desc) { - if (positionDeleteFiles_.add(pathHash, desc)) { - updateFileFormats(desc); + for (DataFile dataFile : icebergFiles.dataFilesWithDeletes) { + Pair<String, EncodedFileDescriptor> pathHashAndFd = + getPathHashAndFd(icebergTable, dataFile, hdfsFileDescMap); + dataFilesWithDeletes_.add(pathHashAndFd.first, pathHashAndFd.second); + } + for (DeleteFile deleteFile : icebergFiles.positionDeleteFiles) { + Pair<String, EncodedFileDescriptor> pathHashAndFd = + getPathHashAndFd(icebergTable, deleteFile, hdfsFileDescMap); + positionDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second); + } + for (DeleteFile deleteFile : icebergFiles.equalityDeleteFiles) { + Pair<String, EncodedFileDescriptor> pathHashAndFd = + getPathHashAndFd(icebergTable, deleteFile, hdfsFileDescMap); + equalityDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second); } - } - - public void addEqualityDeleteFile(String pathHash, FileDescriptor desc) { - Preconditions.checkState( - desc.getFbFileMetadata().icebergMetadata().equalityFieldIdsLength() > 0); - if (equalityDeleteFiles_.add(pathHash, desc)) updateFileFormats(desc); } // This is only invoked during 
time travel, when we are querying a snapshot that has // data files which have been removed since. public void addOldFileDescriptor(String pathHash, FileDescriptor desc) { - oldFileDescMap_.put(pathHash, desc); + oldFileDescMap_.put(pathHash, encode(desc)); } public FileDescriptor getDataFileDescriptor(String pathHash) { @@ -152,7 +247,8 @@ public class IcebergContentFileStore { } public FileDescriptor getOldFileDescriptor(String pathHash) { - return oldFileDescMap_.get(pathHash); + if (!oldFileDescMap_.containsKey(pathHash)) return null; + return decode(oldFileDescMap_.get(pathHash)); } public List<FileDescriptor> getDataFilesWithoutDeletes() { @@ -196,8 +292,11 @@ public class IcebergContentFileStore { public boolean hasOrc() { return hasOrc_; } public boolean hasParquet() { return hasParquet_; } - private void updateFileFormats(FileDescriptor desc) { - byte fileFormat = desc.getFbFileMetadata().icebergMetadata().fileFormat(); + private void updateFileFormats(FbFileMetadata icebergMetadata) { + Preconditions.checkNotNull(icebergMetadata); + Preconditions.checkNotNull(icebergMetadata.icebergMetadata()); + + byte fileFormat = icebergMetadata.icebergMetadata().fileFormat(); if (fileFormat == FbIcebergDataFileFormat.PARQUET) { hasParquet_ = true; } else if (fileFormat == FbIcebergDataFileFormat.ORC) { @@ -207,6 +306,46 @@ public class IcebergContentFileStore { } } + private Pair<String, EncodedFileDescriptor> getPathHashAndFd( + FeIcebergTable icebergTable, + ContentFile<?> contentFile, + Map<String, FileDescriptor> hdfsFileDescMap) { + return new Pair<>( + IcebergUtil.getFilePathHash(contentFile), + getOrCreateIcebergFd(icebergTable, hdfsFileDescMap, contentFile)); + } + + private EncodedFileDescriptor getOrCreateIcebergFd( + FeIcebergTable icebergTable, + Map<String, FileDescriptor> hdfsFileDescMap, + ContentFile<?> contentFile) { + Path path = new Path(contentFile.path().toString()); + FileDescriptor fileDesc = null; + if 
(hdfsFileDescMap.containsKey(path.toUri().getPath())) { + fileDesc = hdfsFileDescMap.get(path.toUri().getPath()); + } else { + if (FeIcebergTable.Utils.requiresDataFilesInTableLocation(icebergTable)) { + LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive" + + "file listing results.", path); + } + try { + fileDesc = FeIcebergTable.Utils.getFileDescriptor(contentFile, icebergTable); + } catch (IOException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + Preconditions.checkNotNull(fileDesc); + + FbFileMetadata icebergMetadata = + IcebergUtil.createIcebergMetadata(icebergTable, contentFile); + + updateFileFormats(icebergMetadata); + + return new EncodedFileDescriptor( + encodeFB(fileDesc.getFbFileDescriptor()), + encodeFB(icebergMetadata)); + } + public TIcebergContentFileStore toThrift() { TIcebergContentFileStore ret = new TIcebergContentFileStore(); ret.setPath_hash_to_data_file_without_deletes(dataFilesWithoutDeletes_.toThrift()); @@ -219,6 +358,9 @@ public class IcebergContentFileStore { return ret; } + // TODO IMPALA-11265: After converting to/from thrift the byte arrays representing the + // file descriptors won't be shared between the HdfsTable and IcebergContentFileStore. + // This redundancy causes unnecessary JVM heap memory usage on the coordinator. 
public static IcebergContentFileStore fromThrift(TIcebergContentFileStore tFileStore, List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> hostIndex) { diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 768d3870b..8a7a8bac3 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -439,7 +439,7 @@ public class IcebergTable extends Table implements FeIcebergTable { hdfsTable_.setCanDataBeOutsideOfTableLocation( !Utils.requiresDataFilesInTableLocation(this)); hdfsTable_.load(reuseMetadata, msClient, msTable_, reason, catalogTimeline); - fileStore_ = Utils.loadAllPartition(this, icebergFiles); + fileStore_ = new IcebergContentFileStore(this, icebergFiles); partitionStats_ = Utils.loadPartitionStats(this, icebergFiles); setIcebergTableStats(); loadAllColumnStats(msClient, catalogTimeline); diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java index 5cb4d163f..2130af0e6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java @@ -87,6 +87,7 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable private String icebergTableLocation_; private String icebergCatalogLocation_; private HdfsStorageDescriptor hdfsSd_; + private final IcebergContentFileStore icebergContentFiles_; public IcebergCtasTarget(FeDb db, org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs, List<String> primaryKeyNames, @@ -103,6 +104,7 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTbl); icebergParquetDictPageSize_ = 
Utils.getIcebergParquetDictPageSize(msTbl); hdfsSd_ = HdfsStorageDescriptor.fromStorageDescriptor(name_, msTable_.getSd()); + icebergContentFiles_ = new IcebergContentFileStore(); } private void createIcebergSchema(List<ColumnDef> columnDefs, @@ -185,7 +187,7 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable @Override public IcebergContentFileStore getContentFileStore() { - return new IcebergContentFileStore(); + return icebergContentFiles_; } @Override diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index b51832c66..3d2f76f0d 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -592,6 +592,8 @@ public class IcebergScanPlanner { private void addEqualityDeletesAndIds(FileDescriptor fd) { FbIcebergMetadata fileMetadata = fd.getFbFileMetadata().icebergMetadata(); + Preconditions.checkState(fileMetadata.equalityFieldIdsLength() > 0, + "Equality delete file doesn't have equality field IDs: " + fd.toString()); List<Integer> eqFieldIdList = new ArrayList<>(); for (int i = 0; i < fileMetadata.equalityFieldIdsLength(); ++i) { eqFieldIdList.add(fileMetadata.equalityFieldIds(i)); diff --git a/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java b/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java new file mode 100644 index 000000000..6d335c6bb --- /dev/null +++ b/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; + +import com.google.flatbuffers.FlatBufferBuilder; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.fb.FbFileDesc; +import org.apache.impala.testutil.CatalogServiceTestCatalog; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.Test; + +public class IcebergContentFileStoreTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private CatalogServiceCatalog catalog_; + + @Before + public void init() { + catalog_ = CatalogServiceTestCatalog.create(); + } + + @After + public void cleanUp() { catalog_.close(); } + + @Test + public void testDecodeWithoutIcebergMetadata() { + expectedException.expect(NullPointerException.class); + + FileDescriptor fileDesc = new FileDescriptor(createFbFileDesc(),null); + + IcebergContentFileStore.decode(IcebergContentFileStore.encode(fileDesc)); + } + + private FbFileDesc createFbFileDesc() { + FlatBufferBuilder fbb = new FlatBufferBuilder(1); + fbb.finish( + FbFileDesc.createFbFileDesc(fbb, 0, 10L, 
(byte) 0, 10000L, 0, false, 0, false)); + ByteBuffer bb = fbb.dataBuffer().slice(); + ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity()); + compressedBb.put(bb); + return FbFileDesc.getRootAsFbFileDesc((ByteBuffer) compressedBb.flip()); + } + + @Test + public void testEncodeDecode() throws Exception { + IcebergTable iceTbl = loadIcebergTable( + "functional_parquet", + "iceberg_v2_partitioned_position_deletes_orc"); + assertTrue(iceTbl.getContentFileStore().getNumFiles() > 0); + + for (FileDescriptor fileDesc : iceTbl.getContentFileStore().getAllFiles()) { + FileDescriptor serdeFileDesc = + IcebergContentFileStore.decode(IcebergContentFileStore.encode(fileDesc)); + assertTrue(fileDesc != serdeFileDesc); + assertEquals(serdeFileDesc, fileDesc); + } + } + + // Verify that the underlying representations of the file descriptors are shared between + // the Iceberg table and the HdfsTable within the Iceberg table. In other words this + // checks that storing the file descriptors in both places doesn't result in + // redundant JVM memory usage. 
+ @Test + public void testFileDescriptorsAreShared() throws Exception { + IcebergTable iceTbl = loadIcebergTable( + "functional_parquet", + "iceberg_v2_partitioned_position_deletes"); + assertTrue(iceTbl.getContentFileStore().getNumFiles() > 0); + + Map<String, FileDescriptor> fileDescsByPath = new HashMap<>(); + for (FileDescriptor fileDesc : iceTbl.getContentFileStore().getAllFiles()) { + fileDescsByPath.put(fileDesc.getRelativePath(), fileDesc); + } + + assertTrue(iceTbl.getFeFsTable() instanceof HdfsTable); + HdfsTable hdfsTable = (HdfsTable) iceTbl.getFeFsTable(); + for (PrunablePartition partition : hdfsTable.getPartitions()) { + assertTrue(partition instanceof HdfsPartition); + HdfsPartition hdfsPartition = (HdfsPartition) partition; + for (FileDescriptor hdfsFileDesc : hdfsPartition.getFileDescriptors()) { + FileDescriptor iceFileDesc = fileDescsByPath.get(hdfsFileDesc.getRelativePath()); + assertNotNull(iceFileDesc); + assertNotSame(iceFileDesc, hdfsFileDesc); + assertEquals(iceFileDesc.getFbFileDescriptor().getByteBuffer().array(), + hdfsFileDesc.getFbFileDescriptor().getByteBuffer().array()); + } + } + } + + private IcebergTable loadIcebergTable(String dbName, String tblName) throws Exception { + Table tbl = catalog_.getOrLoadTable(dbName, tblName,"test",null); + assertTrue(tbl instanceof IcebergTable); + IcebergTable iceTbl = (IcebergTable) tbl; + assertNotNull(iceTbl.getContentFileStore()); + return iceTbl; + } +}
