This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 37e409059437279c960eba71b3bce69ffbd65f2e Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Wed Feb 5 18:38:38 2025 +0100 IMPALA-13737: Directly load file metadata via IcebergFileMetadataLoader Currently we let HdfsTable drive file metadata loading of Iceberg tables. To have better control over file loading, IcebergTable should use IcebergFileMetadataLoader directly. The underlying HdfsTable can be empty, which will make it easier to remove this dependency completely. Also, it solves the de-duplication of file descriptors in Local Catalog mode. This patch also clarifies the responsibilities of IcebergFileMetadataLoader and IcebergContentFileStore. The former is in charge of loading the file descriptors and decorating them with Iceberg metadata. The latter is only responsible for grouping and storing them in an efficient manner. This patch removes the dependency of IcebergContentFileStore on FeIcebergTable, which will make the REST Catalog implementation cleaner. Measurements (Thanks to Gabor Kaszab for the numbers) As mentioned above, this patch de-duplicates the file descriptors in local catalog mode. I.e. it greatly reduces the memory footprint (IMPALA-11265) in the Coordinator when local catalog is being used. The measured table had 110k files, 16400 partitions, 1000 manifests, 1000 snapshots. 
The memory footprint: Before this patch: 107MB After this patch: 74MB Testing: * no new functionalities added, existing tests should work Change-Id: Iaf7e23ec21b65036b47edadcb4cbe4b64be3baee Reviewed-on: http://gerrit.cloudera.org:8080/22458 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/analysis/OptimizeStmt.java | 4 +- .../org/apache/impala/catalog/FeIcebergTable.java | 17 ++++-- .../apache/impala/catalog/FileMetadataLoader.java | 13 +++-- .../java/org/apache/impala/catalog/HdfsTable.java | 42 ++++++------- .../impala/catalog/IcebergContentFileStore.java | 68 ++++++++-------------- .../impala/catalog/IcebergFileMetadataLoader.java | 49 ++++++++-------- .../org/apache/impala/catalog/IcebergTable.java | 19 +++--- .../impala/catalog/ParallelFileMetadataLoader.java | 21 +------ .../apache/impala/planner/IcebergScanPlanner.java | 7 ++- .../java/org/apache/impala/util/IcebergUtil.java | 23 ++++---- .../impala/catalog/FileMetadataLoaderTest.java | 55 +++++++++-------- 11 files changed, 148 insertions(+), 170 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java index 3fecd954b..de120dc56 100644 --- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java @@ -238,7 +238,9 @@ public class OptimizeStmt extends DmlStatementBase { GroupedContentFiles selectedContentFiles = new GroupedContentFiles(); selectedContentFiles.dataFilesWithoutDeletes = contentFiles; IcebergContentFileStore selectedFiles = - new IcebergContentFileStore(iceTable, selectedContentFiles); + new IcebergContentFileStore(iceTable.getIcebergApiTable(), + iceTable.getContentFileStore().getDataFilesWithoutDeletes(), + selectedContentFiles); return selectedFiles.getDataFilesWithoutDeletes(); } diff --git 
a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index 8acc4ca41..c6597ef80 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -708,7 +708,9 @@ public interface FeIcebergTable extends FeFsTable { * Get FileDescriptor by data file location */ public static FileDescriptor getFileDescriptor( - ContentFile<?> contentFile, FeIcebergTable table) throws IOException { + ContentFile<?> contentFile, Table iceApiTable, + boolean requiresDataFilesInTableLocation, + ListMap<TNetworkAddress> hostIndex) throws IOException { Path fileLoc = FileSystemUtil.createFullyQualifiedPath( new Path(contentFile.path().toString())); FileSystem fsForPath = FileSystemUtil.getFileSystemForPath(fileLoc); @@ -719,19 +721,22 @@ public interface FeIcebergTable extends FeFsTable { // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves. 
fileStatus = createFileStatus(contentFile, fileLoc); } - return getFileDescriptor(fsForPath, fileStatus, table); + return getFileDescriptor(fsForPath, fileStatus, iceApiTable, + requiresDataFilesInTableLocation, hostIndex); } private static FileDescriptor getFileDescriptor(FileSystem fs, - FileStatus fileStatus, FeIcebergTable table) throws IOException { + FileStatus fileStatus, Table iceApiTable, + boolean requiresDataFilesInTableLocation, + ListMap<TNetworkAddress> hostIndex) throws IOException { Reference<Long> numUnknownDiskIds = new Reference<>(0L); String absPath = null; - Path tableLoc = new Path(table.getIcebergTableLocation()); + Path tableLoc = new Path(iceApiTable.location()); String relPath = FileSystemUtil.relativizePathNoThrow( fileStatus.getPath(), tableLoc); if (relPath == null) { - if (Utils.requiresDataFilesInTableLocation(table)) { + if (requiresDataFilesInTableLocation) { throw new RuntimeException(fileStatus.getPath() + " is outside of the Iceberg table location " + tableLoc); } @@ -751,7 +756,7 @@ public interface FeIcebergTable extends FeFsTable { } return FileDescriptor.create(fileStatus, relPath, locations, - table.getHostIndex(), fileStatus.isEncrypted(), fileStatus.isErasureCoded(), + hostIndex, fileStatus.isEncrypted(), fileStatus.isErasureCoded(), numUnknownDiskIds, absPath); } diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java index f94114d9a..a6e0534cc 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java @@ -96,9 +96,10 @@ public class FileMetadataLoader { * @param fileFormat if non-null and equal to HdfsFileFormat.HUDI_PARQUET, * this loader will filter files based on Hudi's HoodieROTablePathFilter method */ - public FileMetadataLoader(Path partDir, boolean recursive, List<FileDescriptor> oldFds, - ListMap<TNetworkAddress> hostIndex, 
@Nullable ValidTxnList validTxnList, - @Nullable ValidWriteIdList writeIds, @Nullable HdfsFileFormat fileFormat) { + public FileMetadataLoader(Path partDir, boolean recursive, + Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, + @Nullable ValidTxnList validTxnList, @Nullable ValidWriteIdList writeIds, + @Nullable HdfsFileFormat fileFormat) { // Either both validTxnList and writeIds are null, or none of them. Preconditions.checkState((validTxnList == null && writeIds == null) || (validTxnList != null && writeIds != null)); @@ -116,9 +117,9 @@ public class FileMetadataLoader { TOTAL_TASKS.incrementAndGet(); } - public FileMetadataLoader(Path partDir, boolean recursive, List<FileDescriptor> oldFds, - ListMap<TNetworkAddress> hostIndex, @Nullable ValidTxnList validTxnList, - @Nullable ValidWriteIdList writeIds) { + public FileMetadataLoader(Path partDir, boolean recursive, + Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, + @Nullable ValidTxnList validTxnList, @Nullable ValidWriteIdList writeIds) { this(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds, null); } diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 0498cef0e..141f38286 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -58,7 +58,6 @@ import org.apache.impala.analysis.NullLiteral; import org.apache.impala.analysis.NumericLiteral; import org.apache.impala.analysis.PartitionKeyValue; import org.apache.impala.catalog.events.MetastoreEventsProcessor; -import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.Pair; @@ -244,10 +243,11 @@ public class HdfsTable extends Table implements FeFsTable { // Declared as protected to allow third party extension 
visibility. protected final Map<Long, HdfsPartition> partitionMap_ = new HashMap<>(); - // The data and delete files of the table, for Iceberg tables only. - private GroupedContentFiles icebergFiles_; - // Whether the data and delete files of The Iceberg tables can be outside the location. - private boolean canDataBeOutsideOfTableLocation_; + // Whether to skip file metadata loading. Iceberg tables take care of file metadata + // loading themselves, so it is true iff this HdfsTable is part of an + // IcebergTable object. + // TODO(IMPALA-13734/IMPALA-13738): Remove this flag. + private boolean skipIcebergFileMetadataLoading_ = false; // Map of partition name to HdfsPartition object. Used for speeding up // table metadata loading. It is only populated if this table object is stored in @@ -339,13 +339,8 @@ public class HdfsTable extends Table implements FeFsTable { // to determine if we can skip the table from the topic update. private long lastVersionSeenByTopicUpdate_ = -1; - public void setIcebergFiles(GroupedContentFiles icebergFiles) { - icebergFiles_ = icebergFiles; - } - - public void setCanDataBeOutsideOfTableLocation( - boolean canDataBeOutsideOfTableLocation) { - canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation; + public void setSkipIcebergFileMetadataLoading(boolean skipIcebergFileMetadataLoading) { + skipIcebergFileMetadataLoading_ = skipIcebergFileMetadataLoading; } // Represents a set of storage-related statistics aggregated at the table or partition @@ -425,8 +420,6 @@ public class HdfsTable extends Table implements FeFsTable { super(msTbl, db, name, owner); partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(numClusteringCols_); - icebergFiles_ = new GroupedContentFiles(); - canDataBeOutsideOfTableLocation_ = false; } @Override // FeFsTable @@ -814,15 +807,18 @@ public class HdfsTable extends Table implements FeFsTable { isRefresh ? 
"Refreshing" : "Loading", partBuilders.size(), getFullName()); - // Actually load the partitions. - // TODO(IMPALA-8406): if this fails to load files from one or more partitions, then - // we'll throw an exception here and end up bailing out of whatever catalog operation - // we're in the middle of. This could cause a partial metadata update -- eg we may - // have refreshed the top-level table properties without refreshing the files. - new ParallelFileMetadataLoader(getFileSystem(), partBuilders, validWriteIds_, - validTxnList, Utils.shouldRecursivelyListPartitions(this), - getHostIndex(), debugActions, logPrefix, icebergFiles_, - canDataBeOutsideOfTableLocation_).load(); + + if (!skipIcebergFileMetadataLoading_) { + // Actually load the partitions. + // TODO(IMPALA-8406): if this fails to load files from one or more partitions, then + // we'll throw an exception here and end up bailing out of whatever catalog + // operation we're in the middle of. This could cause a partial metadata update + // -- eg we may have refreshed the top-level table properties without refreshing the + // files. 
+ new ParallelFileMetadataLoader(getFileSystem(), partBuilders, validWriteIds_, + validTxnList, Utils.shouldRecursivelyListPartitions(this), + getHostIndex(), debugActions, logPrefix).load(); + } // TODO(todd): would be good to log a summary of the loading process: // - how many block locations did we reuse/load individually/load via batch diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java index 562a85d50..1878eab12 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java @@ -20,10 +20,8 @@ package org.apache.impala.catalog; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,12 +33,13 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FileContent; +import org.apache.iceberg.Table; import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.Pair; import org.apache.impala.fb.FbFileDesc; import org.apache.impala.fb.FbFileMetadata; import org.apache.impala.fb.FbIcebergDataFileFormat; +import org.apache.impala.fb.FbIcebergMetadata; import org.apache.impala.thrift.THdfsFileDesc; import org.apache.impala.thrift.TIcebergContentFileStore; import org.apache.impala.thrift.TNetworkAddress; @@ -191,38 +190,36 @@ public class IcebergContentFileStore { public IcebergContentFileStore() {} public IcebergContentFileStore( - FeIcebergTable icebergTable, GroupedContentFiles icebergFiles) { - Preconditions.checkNotNull(icebergTable); + Table iceApiTable, List<FileDescriptor> fileDescriptors, + 
GroupedContentFiles icebergFiles) { + Preconditions.checkNotNull(iceApiTable); + Preconditions.checkNotNull(fileDescriptors); Preconditions.checkNotNull(icebergFiles); Map<String, FileDescriptor> hdfsFileDescMap = new HashMap<>(); - Collection<? extends FeFsPartition> partitions = - FeCatalogUtils.loadAllPartitions(icebergTable.getFeFsTable()); - for (FeFsPartition partition : partitions) { - for (FileDescriptor fileDesc : partition.getFileDescriptors()) { - Path path = new Path(fileDesc.getAbsolutePath(icebergTable.getHdfsBaseDir())); - hdfsFileDescMap.put(path.toUri().getPath(), fileDesc); - } + for (FileDescriptor fileDesc : fileDescriptors) { + Path path = new Path(fileDesc.getAbsolutePath(iceApiTable.location())); + hdfsFileDescMap.put(path.toUri().getPath(), fileDesc); } for (DataFile dataFile : icebergFiles.dataFilesWithoutDeletes) { Pair<String, EncodedFileDescriptor> pathHashAndFd = - getPathHashAndFd(icebergTable, dataFile, hdfsFileDescMap); + getPathHashAndFd(dataFile, hdfsFileDescMap); dataFilesWithoutDeletes_.add(pathHashAndFd.first, pathHashAndFd.second); } for (DataFile dataFile : icebergFiles.dataFilesWithDeletes) { Pair<String, EncodedFileDescriptor> pathHashAndFd = - getPathHashAndFd(icebergTable, dataFile, hdfsFileDescMap); + getPathHashAndFd(dataFile, hdfsFileDescMap); dataFilesWithDeletes_.add(pathHashAndFd.first, pathHashAndFd.second); } for (DeleteFile deleteFile : icebergFiles.positionDeleteFiles) { Pair<String, EncodedFileDescriptor> pathHashAndFd = - getPathHashAndFd(icebergTable, deleteFile, hdfsFileDescMap); + getPathHashAndFd(deleteFile, hdfsFileDescMap); positionDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second); } for (DeleteFile deleteFile : icebergFiles.equalityDeleteFiles) { Pair<String, EncodedFileDescriptor> pathHashAndFd = - getPathHashAndFd(icebergTable, deleteFile, hdfsFileDescMap); + getPathHashAndFd(deleteFile, hdfsFileDescMap); equalityDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second); } } @@ -291,11 
+288,10 @@ public class IcebergContentFileStore { public boolean hasOrc() { return hasOrc_; } public boolean hasParquet() { return hasParquet_; } - private void updateFileFormats(FbFileMetadata icebergMetadata) { + private void updateFileFormats(FbIcebergMetadata icebergMetadata) { Preconditions.checkNotNull(icebergMetadata); - Preconditions.checkNotNull(icebergMetadata.icebergMetadata()); - byte fileFormat = icebergMetadata.icebergMetadata().fileFormat(); + byte fileFormat = icebergMetadata.fileFormat(); if (fileFormat == FbIcebergDataFileFormat.PARQUET) { hasParquet_ = true; } else if (fileFormat == FbIcebergDataFileFormat.ORC) { @@ -306,37 +302,22 @@ public class IcebergContentFileStore { } private Pair<String, EncodedFileDescriptor> getPathHashAndFd( - FeIcebergTable icebergTable, - ContentFile<?> contentFile, - Map<String, FileDescriptor> hdfsFileDescMap) { + ContentFile<?> contentFile, Map<String, FileDescriptor> hdfsFileDescMap) { return new Pair<>( IcebergUtil.getFilePathHash(contentFile), - getOrCreateIcebergFd(icebergTable, hdfsFileDescMap, contentFile)); + getIcebergFd(hdfsFileDescMap, contentFile)); } - private EncodedFileDescriptor getOrCreateIcebergFd( - FeIcebergTable icebergTable, + private EncodedFileDescriptor getIcebergFd( Map<String, FileDescriptor> hdfsFileDescMap, ContentFile<?> contentFile) { Path path = new Path(contentFile.path().toString()); - FileDescriptor fileDesc = null; - if (hdfsFileDescMap.containsKey(path.toUri().getPath())) { - fileDesc = hdfsFileDescMap.get(path.toUri().getPath()); - } else { - if (FeIcebergTable.Utils.requiresDataFilesInTableLocation(icebergTable)) { - LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive" - + "file listing results.", path); - } - try { - fileDesc = FeIcebergTable.Utils.getFileDescriptor(contentFile, icebergTable); - } catch (IOException e) { - throw new IllegalArgumentException(e.getMessage()); - } - } - Preconditions.checkNotNull(fileDesc); + FileDescriptor fileDesc = 
hdfsFileDescMap.get(path.toUri().getPath()); - FbFileMetadata icebergMetadata = - IcebergUtil.createIcebergMetadata(icebergTable, contentFile); + FbFileMetadata fileMetadata = fileDesc.getFbFileMetadata(); + Preconditions.checkState(fileMetadata != null); + FbIcebergMetadata icebergMetadata = fileMetadata.icebergMetadata(); + Preconditions.checkState(icebergMetadata != null); updateFileFormats(icebergMetadata); @@ -357,9 +338,6 @@ public class IcebergContentFileStore { return ret; } - // TODO IMPALA-11265: After converting to/from thrift the byte arrays representing the - // file descriptors won't be shared between the HdfsTable and IcebergContentFileStore. - // This redundancy causes unnecessary JVM heap memory usage on the coordinator. public static IcebergContentFileStore fromThrift(TIcebergContentFileStore tFileStore, List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> hostIndex) { diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java index d4f6edcf2..af338372e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java @@ -53,6 +53,7 @@ import org.apache.impala.common.Reference; import org.apache.impala.common.Pair; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.util.IcebergUtil; import org.apache.impala.util.ListMap; import org.apache.impala.util.ThreadNameAnnotator; @@ -70,32 +71,32 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { // invalid value. private final int NEW_FILES_THRESHOLD_DEFAULT = 100; + private final org.apache.iceberg.Table iceTbl_; + // If there are more new files than 'newFilesThreshold_', we should fall back // to regular file metadata loading. 
private final int newFilesThreshold_; private final GroupedContentFiles icebergFiles_; - private final boolean canDataBeOutsideOfTableLocation_; + private final boolean requiresDataFilesInTableLocation_; private boolean useParallelListing_; - public IcebergFileMetadataLoader(Path partDir, boolean recursive, - List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, - ValidTxnList validTxnList, ValidWriteIdList writeIds, - GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) { - this(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds, - icebergFiles, canDataBeOutsideOfTableLocation, + public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl, + Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, + GroupedContentFiles icebergFiles, boolean requiresDataFilesInTableLocation) { + this(iceTbl, oldFds, hostIndex, icebergFiles, requiresDataFilesInTableLocation, BackendConfig.INSTANCE.icebergReloadNewFilesThreshold()); } - public IcebergFileMetadataLoader(Path partDir, boolean recursive, - List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, - ValidTxnList validTxnList, ValidWriteIdList writeIds, - GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation, + public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl, + Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, + GroupedContentFiles icebergFiles, boolean requiresDataFilesInTableLocation, int newFilesThresholdParam) { - super(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds, - HdfsFileFormat.ICEBERG); + super(FileSystemUtil.createFullyQualifiedPath(new Path(iceTbl.location())), true, + oldFds, hostIndex, null, null, HdfsFileFormat.ICEBERG); + iceTbl_ = iceTbl; icebergFiles_ = icebergFiles; - canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation; + requiresDataFilesInTableLocation_ = requiresDataFilesInTableLocation; if (newFilesThresholdParam >= 0) { newFilesThreshold_ = 
newFilesThresholdParam; } else { @@ -131,9 +132,9 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem(); for (ContentFile<?> contentFile : newContentFiles) { FileSystem fsForPath = fsForTable; - // If canDataBeOutsideOfTableLocation is not true, we assume that the file system + // If requiresDataFilesInTableLocation_ is true, we assume that the file system // for all ContentFiles is the same as fsForTable - if (canDataBeOutsideOfTableLocation_) { + if (!requiresDataFilesInTableLocation_) { Path path = new Path(contentFile.path().toString()); fsForPath = path.toUri().getScheme() != null ? FileSystemUtil.getFileSystemForPath(path) : defaultFs; @@ -218,14 +219,16 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { String absPath = null; String relPath = FileSystemUtil.relativizePathNoThrow(stat.getPath(), partDir_); if (relPath == null) { - if (canDataBeOutsideOfTableLocation_) { - absPath = stat.getPath().toString(); - } else { + if (requiresDataFilesInTableLocation_) { throw new IOException(String.format("Failed to load Iceberg datafile %s, because " + "it's outside of the table location", stat.getPath().toUri())); + } else { + absPath = stat.getPath().toString(); } } - return createFd(fs, stat, relPath, numUnknownDiskIds, absPath); + FileDescriptor fsFd = createFd(fs, stat, relPath, numUnknownDiskIds, absPath); + return fsFd.cloneWithFileMetadata( + IcebergUtil.createIcebergMetadata(iceTbl_, contentFile)); } /** @@ -293,11 +296,11 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { new Path(contentFile.path().toString())); String lookupPath = FileSystemUtil.relativizePathNoThrow(contentFilePath, partDir_); if (lookupPath == null) { - if (canDataBeOutsideOfTableLocation_) { - lookupPath = contentFilePath.toString(); - } else { + if (requiresDataFilesInTableLocation_) { throw new IOException(String.format("Failed to load Iceberg datafile %s, because " + 
"it's outside of the table location", contentFilePath)); + } else { + lookupPath = contentFilePath.toString(); } } return oldFdsByPath_.get(lookupPath); diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 525525592..31730c66b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -20,12 +20,15 @@ package org.apache.impala.catalog; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -443,19 +446,21 @@ public class IcebergTable extends Table implements FeIcebergTable { GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this, new ArrayList<>(), /*timeTravelSpec=*/null); catalogTimeline.markEvent("Loaded Iceberg files"); - hdfsTable_.setIcebergFiles(icebergFiles); - hdfsTable_.setCanDataBeOutsideOfTableLocation( - !Utils.requiresDataFilesInTableLocation(this)); + hdfsTable_.setSkipIcebergFileMetadataLoading(true); hdfsTable_.load(reuseMetadata, msClient, msTable_, reason, catalogTimeline); - fileStore_ = new IcebergContentFileStore(this, icebergFiles); + IcebergFileMetadataLoader loader = new IcebergFileMetadataLoader( + icebergApiTable_, + fileStore_ == null ? 
Collections.emptyList() : fileStore_.getAllFiles(), + getHostIndex(), Preconditions.checkNotNull(icebergFiles), + Utils.requiresDataFilesInTableLocation(this)); + loader.load(); + fileStore_ = new IcebergContentFileStore( + icebergApiTable_, loader.getLoadedFds(), icebergFiles); partitionStats_ = Utils.loadPartitionStats(this, icebergFiles); setIcebergTableStats(); loadAllColumnStats(msClient, catalogTimeline); applyPuffinNdvStats(catalogTimeline); setAvroSchema(msClient, msTbl, fileStore_, catalogTimeline); - - // We no longer need to keep Iceberg's content files in memory. - hdfsTable_.setIcebergFiles(null); } catch (Exception e) { throw new IcebergTableLoadingException("Error loading metadata for Iceberg table " + icebergTableLocation_, e); diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java index b3362da03..6f35f146b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.impala.catalog.HdfsPartition.Builder; -import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Pair; import org.apache.impala.service.BackendConfig; @@ -81,15 +80,6 @@ public class ParallelFileMetadataLoader { ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive, @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix) { - this(fs, partBuilders, writeIdList, validTxnList, isRecursive, hostIndex, debugAction, - logPrefix, new GroupedContentFiles(), false); - } - - public ParallelFileMetadataLoader(FileSystem fs, - Collection<Builder> partBuilders, - ValidWriteIdList 
writeIdList, ValidTxnList validTxnList, boolean isRecursive, - @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix, - GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) { if (writeIdList != null || validTxnList != null) { // make sure that both either both writeIdList and validTxnList are set or both // of them are not. @@ -109,14 +99,9 @@ public class ParallelFileMetadataLoader { List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors(); FileMetadataLoader loader; HdfsFileFormat format = e.getValue().get(0).getFileFormat(); - if (format.equals(HdfsFileFormat.ICEBERG)) { - loader = new IcebergFileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex, - validTxnList, writeIdList, Preconditions.checkNotNull(icebergFiles), - canDataBeOutsideOfTableLocation); - } else { - loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex, - validTxnList, writeIdList, format); - } + Preconditions.checkState(!HdfsFileFormat.ICEBERG.equals(format)); + loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex, + validTxnList, writeIdList, format); // If there is a cached partition mapped to this path, we recompute the block // locations even if the underlying files have not changed. // This is done to keep the cached block metadata up to date. 
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index 7e20f0f5a..8cd74c7ff 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -730,7 +730,10 @@ public class IcebergScanPlanner { } cachehit = false; try { - fileDesc = FeIcebergTable.Utils.getFileDescriptor(cf, getIceTable()); + fileDesc = FeIcebergTable.Utils.getFileDescriptor(cf, + getIceTable().getIcebergApiTable(), + FeIcebergTable.Utils.requiresDataFilesInTableLocation(getIceTable()), + getIceTable().getHostIndex()); } catch (IOException ex) { throw new ImpalaRuntimeException( "Cannot load file descriptor for " + cf.path(), ex); @@ -741,7 +744,7 @@ public class IcebergScanPlanner { } // Add file descriptor to the cache. fileDesc = fileDesc.cloneWithFileMetadata( - IcebergUtil.createIcebergMetadata(getIceTable(), cf)); + IcebergUtil.createIcebergMetadata(getIceTable().getIcebergApiTable(), cf)); fileStore.addOldFileDescriptor(pathHash, fileDesc); } return new Pair<>(fileDesc, cachehit); diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java index 5ad64f71e..78a99a72f 100644 --- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java +++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java @@ -817,6 +817,7 @@ public class IcebergUtil { public static Object getPartitionValue(Type type, TIcebergPartitionTransformType transformType, String stringValue) throws ImpalaRuntimeException { + // This needs to be consistent with createPartitionTransformValue(). String HIVE_NULL = MetaStoreUtil.DEFAULT_NULL_PARTITION_KEY_VALUE; if (stringValue == null || stringValue.equals(HIVE_NULL)) return null; @@ -1040,15 +1041,14 @@ public class IcebergUtil { } /** - * Extracts metadata from Iceberg data file object 'df'. 
Such metadata is the file + * Extracts metadata from Iceberg data file object 'cf'. Such metadata is the file * format of the data file, also the partition information the data file belongs. * It creates a flatbuffer so it can be passed between machines and processes without * further de/serialization. */ - public static FbFileMetadata createIcebergMetadata(FeIcebergTable feTbl, - ContentFile cf) { + public static FbFileMetadata createIcebergMetadata(Table iceApiTbl, ContentFile cf) { FlatBufferBuilder fbb = new FlatBufferBuilder(1); - int iceOffset = createIcebergMetadata(feTbl, fbb, cf); + int iceOffset = createIcebergMetadata(iceApiTbl, fbb, cf); fbb.finish(FbFileMetadata.createFbFileMetadata(fbb, iceOffset)); ByteBuffer bb = fbb.dataBuffer().slice(); ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity()); @@ -1056,12 +1056,12 @@ public class IcebergUtil { return FbFileMetadata.getRootAsFbFileMetadata((ByteBuffer)compressedBb.flip()); } - private static int createIcebergMetadata(FeIcebergTable feTbl, FlatBufferBuilder fbb, + private static int createIcebergMetadata(Table iceApiTbl, FlatBufferBuilder fbb, ContentFile cf) { int partKeysOffset = -1; - PartitionSpec spec = feTbl.getIcebergApiTable().specs().get(cf.specId()); + PartitionSpec spec = iceApiTbl.specs().get(cf.specId()); if (spec != null && !spec.fields().isEmpty()) { - partKeysOffset = createPartitionKeys(feTbl, fbb, spec, cf); + partKeysOffset = createPartitionKeys(fbb, spec, cf); } int eqFieldIdsOffset = -1; List<Integer> eqFieldIds = cf.equalityFieldIds(); @@ -1101,18 +1101,18 @@ public class IcebergUtil { return FbIcebergMetadata.endFbIcebergMetadata(fbb); } - private static int createPartitionKeys(FeIcebergTable feTbl, FlatBufferBuilder fbb, + private static int createPartitionKeys(FlatBufferBuilder fbb, PartitionSpec spec, ContentFile cf) { Preconditions.checkState(spec.fields().size() == cf.partition().size()); int[] partitionKeyOffsets = new int[spec.fields().size()]; for (int i = 0; i < 
spec.fields().size(); ++i) { partitionKeyOffsets[i] = - createPartitionTransformValue(feTbl, fbb, spec, cf, i); + createPartitionTransformValue(fbb, spec, cf, i); } return FbIcebergMetadata.createPartitionKeysVector(fbb, partitionKeyOffsets); } - private static int createPartitionTransformValue(FeIcebergTable feTbl, + private static int createPartitionTransformValue( FlatBufferBuilder fbb, PartitionSpec spec, ContentFile cf, int fieldIndex) { PartitionField field = spec.fields().get(fieldIndex); Pair<Byte, Integer> transform = getFbTransform(spec.schema(), field); @@ -1123,7 +1123,8 @@ public class IcebergUtil { if (partValue != null) { partValueString = partValue.toString(); } else { - partValueString = feTbl.getNullPartitionKeyValue(); + // This needs to be consistent with getPartitionValue(). + partValueString = MetaStoreUtil.DEFAULT_NULL_PARTITION_KEY_VALUE; } valueOffset = fbb.createString(partValueString); } diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java index 30dc8c75b..397f79d74 100644 --- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java @@ -121,7 +121,7 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml1.load(); assertEquals(20, fml1.getStats().loadedFiles); assertEquals(0, fml1.getStats().skippedFiles); @@ -136,7 +136,7 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ 
true); fml2.load(); assertEquals(20, fml2.getStats().loadedFiles); assertEquals(0, fml2.getStats().skippedFiles); @@ -155,13 +155,13 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml1.load(); IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ fml1.getLoadedFds(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml1Refresh.load(); assertFalse(fml1Refresh.useParallelListing()); assertEquals(0, fml1Refresh.getStats().loadedFiles); @@ -178,13 +178,13 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml2.load(); IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ fml2.getLoadedFds(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml2Refresh.load(); assertFalse(fml2Refresh.useParallelListing()); assertEquals(0, fml2Refresh.getStats().loadedFiles); @@ -205,13 +205,13 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml1.load(); IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ 
fml1.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml1Refresh.load(); assertFalse(fml1Refresh.useParallelListing()); assertEquals(10, fml1Refresh.getStats().loadedFiles); @@ -221,13 +221,13 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml2.load(); IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ fml2.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml2Refresh.load(); assertFalse(fml2Refresh.useParallelListing()); assertEquals(10, fml2Refresh.getStats().loadedFiles); @@ -241,13 +241,13 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml1.load(); IcebergFileMetadataLoader fml1ForceRefresh = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ fml1.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false, 10); + /* requiresDataFilesInTableLocation = */ true, 10); fml1ForceRefresh.setForceRefreshBlockLocations(true); fml1ForceRefresh.load(); assertTrue(fml1ForceRefresh.useParallelListing()); @@ -255,7 +255,7 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ fml1.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false, 
10); + /* requiresDataFilesInTableLocation = */ true, 10); fml1Refresh.setForceRefreshBlockLocations(false); fml1Refresh.load(); assertFalse(fml1Refresh.useParallelListing()); @@ -263,13 +263,13 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ fml1.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false, 10); + /* requiresDataFilesInTableLocation = */ true, 10); fml1Refresh10.load(); assertFalse(fml1Refresh10.useParallelListing()); IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_partitioned", /* oldFds = */ fml1.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false, 9); + /* requiresDataFilesInTableLocation = */ true, 9); fml1Refresh9.load(); assertTrue(fml1Refresh9.useParallelListing()); @@ -277,23 +277,23 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); fml2.load(); IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ fml2.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false); + /* requiresDataFilesInTableLocation = */ true); IcebergFileMetadataLoader fml2Refresh10 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_non_partitioned", /* oldFds = */ fml2.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false, 10); + /* requiresDataFilesInTableLocation = */ true, 10); fml2Refresh10.load(); assertFalse(fml2Refresh10.useParallelListing()); IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog, "functional_parquet", 
"iceberg_non_partitioned", /* oldFds = */ fml2.getLoadedFds().subList(0, 10), - /* canDataBeOutsideOfTableLocation = */ false, 9); + /* requiresDataFilesInTableLocation = */ true, 9); fml2Refresh9.load(); assertTrue(fml2Refresh9.useParallelListing()); } @@ -305,13 +305,13 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_multiple_storage_locations", /* oldFds = */ Collections.emptyList(), - /* canDataBeOutsideOfTableLocation = */ true); + /* requiresDataFilesInTableLocation = */ false); fml1.load(); IcebergFileMetadataLoader fml1Refresh1 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_multiple_storage_locations", /* oldFds = */ fml1.getLoadedFds().subList(0, 1), - /* canDataBeOutsideOfTableLocation = */ true); + /* requiresDataFilesInTableLocation = */ false); fml1Refresh1.load(); assertFalse(fml1Refresh1.useParallelListing()); assertEquals(5, fml1Refresh1.getStats().loadedFiles); @@ -321,7 +321,7 @@ public class FileMetadataLoaderTest { IcebergFileMetadataLoader fml1Refresh5 = getLoaderForIcebergTable(catalog, "functional_parquet", "iceberg_multiple_storage_locations", /* oldFds = */ fml1.getLoadedFds().subList(0, 5), - /* canDataBeOutsideOfTableLocation = */ true); + /* requiresDataFilesInTableLocation = */ false); fml1Refresh5.load(); assertFalse(fml1Refresh5.useParallelListing()); assertEquals(1, fml1Refresh5.getStats().loadedFiles); @@ -331,15 +331,15 @@ public class FileMetadataLoaderTest { private IcebergFileMetadataLoader getLoaderForIcebergTable( CatalogServiceCatalog catalog, String dbName, String tblName, - List<FileDescriptor> oldFds, boolean canDataBeOutsideOfTableLocation) + List<FileDescriptor> oldFds, boolean requiresDataFilesInTableLocation) throws CatalogException { return getLoaderForIcebergTable(catalog, dbName, tblName, oldFds, - canDataBeOutsideOfTableLocation, -1); + requiresDataFilesInTableLocation, -1); } private 
IcebergFileMetadataLoader getLoaderForIcebergTable( CatalogServiceCatalog catalog, String dbName, String tblName, - List<FileDescriptor> oldFds, boolean canDataBeOutsideOfTableLocation, + List<FileDescriptor> oldFds, boolean requiresDataFilesInTableLocation, int newFilesThreshold) throws CatalogException { ListMap<TNetworkAddress> hostIndex = new ListMap<>(); @@ -348,9 +348,8 @@ public class FileMetadataLoaderTest { Path location = new Path(iceT.getLocation()); GroupedContentFiles iceFiles = IcebergUtil.getIcebergFiles(iceT, /*predicates=*/Collections.emptyList(), /*timeTravelSpec=*/null); - return new IcebergFileMetadataLoader(location, /* recursive=*/true, - oldFds, hostIndex, null, null, iceFiles, canDataBeOutsideOfTableLocation, - newFilesThreshold); + return new IcebergFileMetadataLoader(iceT.getIcebergApiTable(), + oldFds, hostIndex, iceFiles, requiresDataFilesInTableLocation, newFilesThreshold); } private FileMetadataLoader getLoaderForAcidTable(
