This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 4d6ff6fdd IMPALA-11662: Improve 'refresh iceberg_tbl_on_oss'
performance
4d6ff6fdd is described below
commit 4d6ff6fddd77800999e1d6d8ef0de8af5b1684ab
Author: LPL <[email protected]>
AuthorDate: Tue Dec 20 14:29:15 2022 +0800
IMPALA-11662: Improve 'refresh iceberg_tbl_on_oss' performance
As the cost of directory listing on Cloud Storage Systems such as OSS or
S3 is higher than the cost on HDFS, we could create the file descriptors
from the rich metadata provided by Iceberg instead of using
org.apache.hadoop.fs.FileSystem#listFiles. The only thing missing there
is the last_modification_time of the files. But since Iceberg files are
immutable, we could just come up with a special timestamp for these
files.
At the same time, we can also construct file descriptors ourselves
during time travel to reduce the cost of requests with OSS services.
Test:
* existing tests
* test on COS with my local test environment
Change-Id: If2ee8b6b7559e6590698b46ef1d574e55ed52f9a
Reviewed-on: http://gerrit.cloudera.org:8080/19379
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
.../org/apache/impala/catalog/FeIcebergTable.java | 67 ++++++----
.../apache/impala/catalog/FileMetadataLoader.java | 120 +++++++++++-------
.../org/apache/impala/catalog/HdfsPartition.java | 18 +--
.../java/org/apache/impala/catalog/HdfsTable.java | 38 ++++--
.../impala/catalog/IcebergFileMetadataLoader.java | 139 +++++++++++++++++++++
.../org/apache/impala/catalog/IcebergTable.java | 12 +-
.../impala/catalog/ParallelFileMetadataLoader.java | 32 +++--
.../catalog/iceberg/GroupedContentFiles.java | 9 +-
.../apache/impala/planner/IcebergScanPlanner.java | 6 +-
9 files changed, 333 insertions(+), 108 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 043cb25ae..c9c489ab5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -333,6 +333,12 @@ public interface FeIcebergTable extends FeFsTable {
* Utility functions
*/
public static abstract class Utils {
+
+  // We need 'FileStatus#modification_time' to know whether a file has changed;
+  // however, this property cannot be obtained from the Iceberg metadata. But Iceberg
+  // files are immutable, so we can use a special default value.
+ private static final int DEFAULT_MODIFICATION_TIME = 1;
+
/**
* Returns true if FeIcebergTable file format is columnar: parquet or orc
*/
@@ -627,20 +633,28 @@ public interface FeIcebergTable extends FeFsTable {
/**
* Get FileDescriptor by data file location
*/
- public static HdfsPartition.FileDescriptor getFileDescriptor(Path fileLoc,
- Path tableLoc, FeIcebergTable table) throws IOException {
- FileSystem fs = FileSystemUtil.getFileSystemForPath(tableLoc);
- FileStatus fileStatus = fs.getFileStatus(fileLoc);
- return getFileDescriptor(fs, tableLoc, fileStatus, table);
+ public static HdfsPartition.FileDescriptor getFileDescriptor(
+ ContentFile<?> contentFile, FeIcebergTable table) throws IOException {
+ Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
+ new Path(contentFile.path().toString()));
+ FileSystem fsForPath = FileSystemUtil.getFileSystemForPath(fileLoc);
+ FileStatus fileStatus;
+ if (FileSystemUtil.supportsStorageIds(fsForPath)) {
+ fileStatus = createLocatedFileStatus(fileLoc, fsForPath);
+ } else {
+      // For object storage services (e.g. S3A, COS, OSS, etc.), we create the
+      // FileStatus ourselves.
+ fileStatus = createFileStatus(contentFile, fileLoc);
+ }
+ return getFileDescriptor(fsForPath, fileStatus, table);
}
private static HdfsPartition.FileDescriptor getFileDescriptor(FileSystem
fs,
- Path tableLoc, FileStatus fileStatus, FeIcebergTable table)
- throws IOException {
+ FileStatus fileStatus, FeIcebergTable table) throws IOException {
Reference<Long> numUnknownDiskIds = new Reference<>(0L);
String relPath = null;
String absPath = null;
+ Path tableLoc = new Path(table.getIcebergTableLocation());
URI relUri = tableLoc.toUri().relativize(fileStatus.getPath().toUri());
if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
if (Utils.requiresDataFilesInTableLocation(table)) {
@@ -659,7 +673,7 @@ public interface FeIcebergTable extends FeFsTable {
BlockLocation[] locations;
if (fileStatus instanceof LocatedFileStatus) {
- locations = ((LocatedFileStatus)fileStatus).getBlockLocations();
+ locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
} else {
locations = fs.getFileBlockLocations(fileStatus, 0,
fileStatus.getLen());
}
@@ -677,7 +691,7 @@ public interface FeIcebergTable extends FeFsTable {
*/
public static IcebergContentFileStore loadAllPartition(
IcebergTable table, GroupedContentFiles icebergFiles)
- throws IOException, TableLoadingException {
+ throws IOException {
Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap = new
HashMap<>();
Collection<HdfsPartition> partitions =
((HdfsTable)table.getFeFsTable()).partitionMap_.values();
@@ -708,8 +722,7 @@ public interface FeIcebergTable extends FeFsTable {
}
private static Pair<String, HdfsPartition.FileDescriptor> getPathHashAndFd(
- ContentFile contentFile,
- IcebergTable table,
+ ContentFile<?> contentFile, IcebergTable table,
Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap) throws
IOException {
String pathHash = IcebergUtil.getFilePathHash(contentFile);
HdfsPartition.FileDescriptor fd = getOrCreateIcebergFd(
@@ -719,7 +732,7 @@ public interface FeIcebergTable extends FeFsTable {
private static FileDescriptor getOrCreateIcebergFd(IcebergTable table,
Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap,
- ContentFile contentFile) throws IllegalArgumentException, IOException {
+ ContentFile<?> contentFile) throws IllegalArgumentException,
IOException {
Path path = new Path(contentFile.path().toString());
HdfsPartition.FileDescriptor iceFd = null;
if (hdfsFileDescMap.containsKey(path.toUri().getPath())) {
@@ -727,19 +740,16 @@ public interface FeIcebergTable extends FeFsTable {
path.toUri().getPath());
iceFd = fsFd.cloneWithFileMetadata(
IcebergUtil.createIcebergMetadata(table, contentFile));
- return iceFd;
} else {
if (Utils.requiresDataFilesInTableLocation(table)) {
LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive"
- + "file listing results.", path.toString());
+ + "file listing results.", path);
}
- HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(
- new Path(contentFile.path().toString()),
- new Path(table.getIcebergTableLocation()), table);
+ HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(contentFile,
table);
iceFd = fileDesc.cloneWithFileMetadata(
IcebergUtil.createIcebergMetadata(table, contentFile));
- return iceFd;
}
+ return iceFd;
}
/**
@@ -749,8 +759,7 @@ public interface FeIcebergTable extends FeFsTable {
* TODO(IMPALA-11516): Return better partition stats for V2 tables.
*/
public static Map<String, TIcebergPartitionStats> loadPartitionStats(
- IcebergTable table, GroupedContentFiles icebergFiles)
- throws TableLoadingException {
+ IcebergTable table, GroupedContentFiles icebergFiles) {
Map<String, TIcebergPartitionStats> nameToStats = new HashMap<>();
for (ContentFile<?> contentFile : icebergFiles.getAllContentFiles()) {
String name = getPartitionKey(table, contentFile);
@@ -917,7 +926,9 @@ public interface FeIcebergTable extends FeFsTable {
}
public static boolean requiresDataFilesInTableLocation(FeIcebergTable
icebergTable) {
- Map<String, String> properties =
icebergTable.getIcebergApiTable().properties();
+ Table icebergApiTable = icebergTable.getIcebergApiTable();
+ Preconditions.checkNotNull(icebergApiTable);
+ Map<String, String> properties = icebergApiTable.properties();
return !(PropertyUtil.propertyAsBoolean(properties,
TableProperties.OBJECT_STORE_ENABLED,
TableProperties.OBJECT_STORE_ENABLED_DEFAULT)
@@ -928,5 +939,19 @@ public interface FeIcebergTable extends FeFsTable {
|| StringUtils
.isNotEmpty(properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION)));
}
+
+ public static FileStatus createLocatedFileStatus(Path path, FileSystem fs)
+ throws IOException {
+ FileStatus fileStatus = fs.getFileStatus(path);
+ Preconditions.checkState(fileStatus.isFile());
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0L,
+ fileStatus.getLen());
+ return new LocatedFileStatus(fileStatus, blockLocations);
+ }
+
+ public static FileStatus createFileStatus(ContentFile<?> contentFile, Path
path) {
+ return new FileStatus(contentFile.fileSizeInBytes(), false, 0, 0,
+ DEFAULT_MODIFICATION_TIME, path);
+ }
}
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index ee42fab49..9ebc6fa1f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -55,9 +55,9 @@ public class FileMetadataLoader {
private final static Logger LOG =
LoggerFactory.getLogger(FileMetadataLoader.class);
private static final Configuration CONF = new Configuration();
- private final Path partDir_;
- private final boolean recursive_;
- private final ImmutableMap<String, FileDescriptor> oldFdsByRelPath_;
+ protected final Path partDir_;
+ protected final boolean recursive_;
+ protected final ImmutableMap<String, FileDescriptor> oldFdsByPath_;
private final ListMap<TNetworkAddress> hostIndex_;
@Nullable
private final ValidWriteIdList writeIds_;
@@ -66,13 +66,13 @@ public class FileMetadataLoader {
@Nullable
private final HdfsFileFormat fileFormat_;
- private boolean forceRefreshLocations = false;
+ protected boolean forceRefreshLocations = false;
private List<FileDescriptor> loadedFds_;
private List<FileDescriptor> loadedInsertDeltaFds_;
private List<FileDescriptor> loadedDeleteDeltaFds_;
- private LoadStats loadStats_;
- private String debugAction_;
+ protected LoadStats loadStats_;
+ protected String debugAction_;
/**
* @param partDir the dir for which to fetch file metadata
@@ -97,7 +97,7 @@ public class FileMetadataLoader {
partDir_ = Preconditions.checkNotNull(partDir);
recursive_ = recursive;
hostIndex_ = Preconditions.checkNotNull(hostIndex);
- oldFdsByRelPath_ = Maps.uniqueIndex(oldFds, FileDescriptor::getPath);
+ oldFdsByPath_ = Maps.uniqueIndex(oldFds, FileDescriptor::getPath);
writeIds_ = writeIds;
validTxnList_ = validTxnList;
fileFormat_ = fileFormat;
@@ -174,45 +174,30 @@ public class FileMetadataLoader {
// assume that most _can_ be reused, in which case it's faster to _not_
prefetch
// the locations.
boolean listWithLocations = FileSystemUtil.supportsStorageIds(fs) &&
- (oldFdsByRelPath_.isEmpty() || forceRefreshLocations);
+ (oldFdsByPath_.isEmpty() || forceRefreshLocations);
String msg = String.format("%s file metadata%s from path %s",
- oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing",
+ oldFdsByPath_.isEmpty() ? "Loading" : "Refreshing",
listWithLocations ? " with eager location-fetching" : "", partDir_);
LOG.trace(msg);
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
- RemoteIterator<? extends FileStatus> fileStatuses;
- if (listWithLocations) {
- fileStatuses = FileSystemUtil
- .listFiles(fs, partDir_, recursive_, debugAction_);
- } else {
- fileStatuses = FileSystemUtil
- .listStatus(fs, partDir_, recursive_, debugAction_);
-
- // TODO(todd): we could look at the result of listing without
locations, and if
- // we see that a substantial number of the files have changed, it may
be better
- // to go back and re-list with locations vs doing an RPC per file.
- }
+ List<FileStatus> fileStatuses = getFileStatuses(fs, listWithLocations);
+
loadedFds_ = new ArrayList<>();
if (fileStatuses == null) return;
- Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
-
- List<FileStatus> stats = new ArrayList<>();
- while (fileStatuses.hasNext()) {
- stats.add(fileStatuses.next());
- }
+ Reference<Long> numUnknownDiskIds = new Reference<>(0L);
if (writeIds_ != null) {
- stats = AcidUtils.filterFilesForAcidState(stats, partDir_,
validTxnList_,
- writeIds_, loadStats_);
+ fileStatuses = AcidUtils.filterFilesForAcidState(fileStatuses,
partDir_,
+ validTxnList_, writeIds_, loadStats_);
}
if (fileFormat_ == HdfsFileFormat.HUDI_PARQUET) {
- stats = HudiUtil.filterFilesForHudiROPath(stats);
+ fileStatuses = HudiUtil.filterFilesForHudiROPath(fileStatuses);
}
- for (FileStatus fileStatus : stats) {
+ for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
continue;
}
@@ -221,16 +206,10 @@ public class FileMetadataLoader {
++loadStats_.hiddenFiles;
continue;
}
- String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(),
partDir_);
- FileDescriptor fd = oldFdsByRelPath_.get(relPath);
- if (listWithLocations || forceRefreshLocations || fd == null ||
- fd.isChanged(fileStatus)) {
- fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds);
- ++loadStats_.loadedFiles;
- } else {
- ++loadStats_.skippedFiles;
- }
- loadedFds_.add(Preconditions.checkNotNull(fd));;
+
+ FileDescriptor fd = getFileDescriptor(fs, listWithLocations,
numUnknownDiskIds,
+ fileStatus);
+ loadedFds_.add(Preconditions.checkNotNull(fd));
}
if (writeIds_ != null) {
loadedInsertDeltaFds_ = new ArrayList<>();
@@ -250,25 +229,74 @@ public class FileMetadataLoader {
}
}
+ /**
+  * Return the fd created from the given fileStatus, or reuse it from the
cache (oldFdsByPath_).
+ */
+ protected FileDescriptor getFileDescriptor(FileSystem fs, boolean
listWithLocations,
+ Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws
IOException {
+ String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(),
partDir_);
+ FileDescriptor fd = oldFdsByPath_.get(relPath);
+ if (listWithLocations || forceRefreshLocations || fd == null ||
+ fd.isChanged(fileStatus)) {
+ fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds);
+ ++loadStats_.loadedFiles;
+ } else {
+ ++loadStats_.skippedFiles;
+ }
+ return fd;
+ }
+
+ /**
+ * Return located file status list when listWithLocations is true.
+ */
+ protected List<FileStatus> getFileStatuses(FileSystem fs, boolean
listWithLocations)
+ throws IOException {
+ RemoteIterator<? extends FileStatus> fileStatuses;
+ if (listWithLocations) {
+ fileStatuses = FileSystemUtil
+ .listFiles(fs, partDir_, recursive_, debugAction_);
+ } else {
+ fileStatuses = FileSystemUtil
+ .listStatus(fs, partDir_, recursive_, debugAction_);
+ // TODO(todd): we could look at the result of listing without locations,
and if
+ // we see that a substantial number of the files have changed, it may be
better
+ // to go back and re-list with locations vs doing an RPC per file.
+ }
+ if (fileStatuses == null) return null;
+ List<FileStatus> stats = new ArrayList<>();
+ while (fileStatuses.hasNext()) {
+ stats.add(fileStatuses.next());
+ }
+ return stats;
+ }
+
/**
* Create a FileDescriptor for the given FileStatus. If the FS supports
block locations,
* and FileStatus is a LocatedFileStatus (i.e. the location was prefetched)
this uses
* the already-loaded information; otherwise, this may have to remotely look
up the
* locations.
+ * 'absPath' is null except for the Iceberg tables, because datafiles of the
+ * Iceberg tables may not be in the table location.
*/
- private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
- String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
+ protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
+ String relPath, Reference<Long> numUnknownDiskIds, String absPath)
+ throws IOException {
if (!FileSystemUtil.supportsStorageIds(fs)) {
- return FileDescriptor.createWithNoBlocks(fileStatus, relPath, null);
+ return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
}
BlockLocation[] locations;
if (fileStatus instanceof LocatedFileStatus) {
- locations = ((LocatedFileStatus)fileStatus).getBlockLocations();
+ locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
} else {
locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}
return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
- HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds);
+ HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds, absPath);
+ }
+
+ private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
+ String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
+ return createFd(fs, fileStatus, relPath, numUnknownDiskIds, null);
}
/**
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 9d481d540..02e285ba5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -210,13 +210,6 @@ public class HdfsPartition extends CatalogObjectImpl
fbFileBlockOffsets, isEc, absPath));
}
- public static FileDescriptor create(FileStatus fileStatus, String relPath,
- BlockLocation[] blockLocations, ListMap<TNetworkAddress> hostIndex,
boolean isEc,
- Reference<Long> numUnknownDiskIds) throws IOException {
- return create(fileStatus, relPath, blockLocations, hostIndex, isEc,
- numUnknownDiskIds, null);
- }
-
/**
* Creates the file descriptor of a file represented by 'fileStatus' that
* resides in a filesystem that doesn't support the BlockLocation API
(e.g. S3).
@@ -227,7 +220,6 @@ public class HdfsPartition extends CatalogObjectImpl
return new FileDescriptor(
createFbFileDesc(fbb, fileStatus, relPath, null, false, absPath));
}
-
/**
* Serializes the metadata of a file descriptor represented by
'fileStatus' into a
* FlatBuffer using 'fbb' and returns the associated FbFileDesc object.
@@ -235,18 +227,18 @@ public class HdfsPartition extends CatalogObjectImpl
* in the underlying buffer. Can be null if there are no blocks.
*/
private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb,
- FileStatus fileStatus, String relPath, int[] fbFileBlockOffets,
boolean isEc,
+ FileStatus fileStatus, String relPath, int[] fbFileBlockOffsets,
boolean isEc,
String absPath) {
int relPathOffset = fbb.createString(relPath == null ? StringUtils.EMPTY
: relPath);
// A negative block vector offset is used when no block offsets are
specified.
int blockVectorOffset = -1;
- if (fbFileBlockOffets != null) {
- blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb,
fbFileBlockOffets);
+ if (fbFileBlockOffsets != null) {
+ blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb,
fbFileBlockOffsets);
}
int absPathOffset = -1;
if (StringUtils.isNotEmpty(absPath)) absPathOffset =
fbb.createString(absPath);
FbFileDesc.startFbFileDesc(fbb);
- // TODO(todd) rename to RelativePathin the FBS
+ // TODO(todd) rename to RelativePath in the FBS
FbFileDesc.addRelativePath(fbb, relPathOffset);
FbFileDesc.addLength(fbb, fileStatus.getLen());
FbFileDesc.addLastModificationTime(fbb,
fileStatus.getModificationTime());
@@ -261,7 +253,7 @@ public class HdfsPartition extends CatalogObjectImpl
ByteBuffer bb = fbb.dataBuffer().slice();
ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity());
compressedBb.put(bb);
- return FbFileDesc.getRootAsFbFileDesc((ByteBuffer)compressedBb.flip());
+ return FbFileDesc.getRootAsFbFileDesc((ByteBuffer) compressedBb.flip());
}
public String getRelativePath() { return fbFileDescriptor_.relativePath();
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 99a34a877..538e813c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -19,7 +19,6 @@ package org.apache.impala.catalog;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -46,7 +45,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
@@ -60,6 +58,7 @@ import org.apache.impala.analysis.NumericLiteral;
import org.apache.impala.analysis.PartitionKeyValue;
import org.apache.impala.catalog.HdfsPartition.FileBlock;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
@@ -239,6 +238,11 @@ public class HdfsTable extends Table implements FeFsTable {
// Declared as protected to allow third party extension visibility.
protected final Map<Long, HdfsPartition> partitionMap_ = new HashMap<>();
+ // The data and delete files of the table, for Iceberg tables only.
+ private GroupedContentFiles icebergFiles_;
+  // Whether the data and delete files of the Iceberg table can be outside
the table location.
+ private boolean canDataBeOutsideOfTableLocation_;
+
// Map of partition name to HdfsPartition object. Used for speeding up
// table metadata loading. It is only populated if this table object is
stored in
// catalog server.
@@ -329,10 +333,19 @@ public class HdfsTable extends Table implements FeFsTable
{
// to determine if we can skip the table from the topic update.
private long lastVersionSeenByTopicUpdate_ = -1;
+ public void setIcebergFiles(GroupedContentFiles icebergFiles) {
+ icebergFiles_ = icebergFiles;
+ }
+
+ public void setCanDataBeOutsideOfTableLocation(
+ boolean canDataBeOutsideOfTableLocation) {
+ canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation;
+ }
+
// Represents a set of storage-related statistics aggregated at the table or
partition
// level.
public final static class FileMetadataStats {
- // Nuber of files in a table/partition.
+ // Number of files in a table/partition.
public long numFiles;
// Number of blocks in a table/partition.
public long numBlocks;
@@ -382,6 +395,8 @@ public class HdfsTable extends Table implements FeFsTable {
super(msTbl, db, name, owner);
partitionLocationCompressor_ =
new HdfsPartitionLocationCompressor(numClusteringCols_);
+ icebergFiles_ = new GroupedContentFiles();
+ canDataBeOutsideOfTableLocation_ = false;
}
@Override // FeFsTable
@@ -778,7 +793,8 @@ public class HdfsTable extends Table implements FeFsTable {
// have refreshed the top-level table properties without refreshing the
files.
new ParallelFileMetadataLoader(getFileSystem(), partBuilders,
validWriteIds_,
validTxnList, Utils.shouldRecursivelyListPartitions(this),
- getHostIndex(), debugActions, logPrefix).load();
+ getHostIndex(), debugActions, logPrefix, icebergFiles_,
+ canDataBeOutsideOfTableLocation_).load();
// TODO(todd): would be good to log a summary of the loading process:
// - how many block locations did we reuse/load individually/load via batch
@@ -1213,7 +1229,7 @@ public class HdfsTable extends Table implements FeFsTable
{
*/
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl,
- boolean loadParitionFileMetadata, boolean loadTableSchema,
+ boolean loadPartitionFileMetadata, boolean loadTableSchema,
boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate,
@Nullable String debugAction, @Nullable Map<String, Long>
partitionToEventId,
String reason) throws TableLoadingException {
@@ -1247,10 +1263,10 @@ public class HdfsTable extends Table implements
FeFsTable {
if (reuseMetadata) {
// Incrementally update this table's partitions and file metadata
Preconditions.checkState(
- partitionsToUpdate == null || loadParitionFileMetadata);
+ partitionsToUpdate == null || loadPartitionFileMetadata);
storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
if (msTbl.getPartitionKeysSize() == 0) {
- if (loadParitionFileMetadata) {
+ if (loadPartitionFileMetadata) {
storageMetadataLoadTime_ +=
updateUnpartitionedTableFileMd(client,
debugAction);
} else { // Update the single partition stats in case table stats
changes.
@@ -1258,7 +1274,7 @@ public class HdfsTable extends Table implements FeFsTable
{
}
} else {
storageMetadataLoadTime_ += updatePartitionsFromHms(
- client, partitionsToUpdate, loadParitionFileMetadata,
+ client, partitionsToUpdate, loadPartitionFileMetadata,
refreshUpdatedPartitions, partitionToEventId, debugAction);
}
LOG.info("Incrementally loaded table metadata for: " +
getFullName());
@@ -1296,6 +1312,12 @@ public class HdfsTable extends Table implements
FeFsTable {
}
}
+ public void load(IMetaStoreClient client,
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException {
+ load(false, client, msTbl, true, true, false, null, null, null, reason);
+ }
+
/**
* Load Primary Key and Foreign Key information for table. Throws
TableLoadingException
* if the load fails. Declared as protected to allow third party extensions
on this
diff --git
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
new file mode 100644
index 000000000..c365e9cac
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.iceberg.ContentFile;
+import org.apache.impala.catalog.FeIcebergTable.Utils;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.Reference;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+
+/**
+ * Utility for loading the content files metadata of the Iceberg tables.
+ */
+public class IcebergFileMetadataLoader extends FileMetadataLoader {
+ private final GroupedContentFiles icebergFiles_;
+ private final boolean canDataBeOutsideOfTableLocation_;
+
+ public IcebergFileMetadataLoader(Path partDir, boolean recursive,
+ List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+ ValidTxnList validTxnList, ValidWriteIdList writeIds,
+ GroupedContentFiles icebergFiles, boolean
canDataBeOutsideOfTableLocation) {
+ super(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds,
+ HdfsFileFormat.ICEBERG);
+ icebergFiles_ = icebergFiles;
+ canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation;
+ }
+
+ /**
+  * Throws an exception if the path cannot be relativized against the location of
the Iceberg
+  * table and files are not allowed outside the table location.
+ */
+ @Override
+ protected FileDescriptor getFileDescriptor(FileSystem fs, boolean
listWithLocations,
+ Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws
IOException {
+ String relPath = null;
+ String absPath = null;
+ URI relUri = partDir_.toUri().relativize(fileStatus.getPath().toUri());
+ if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
+ if (canDataBeOutsideOfTableLocation_) {
+ absPath = fileStatus.getPath().toString();
+ } else {
+ throw new IOException(String.format("Failed to load Iceberg datafile
%s, because "
+ + "it's outside of the table location",
fileStatus.getPath().toUri()));
+ }
+ } else {
+ relPath = relUri.getPath();
+ }
+
+ String path = Strings.isNullOrEmpty(relPath) ? absPath : relPath;
+ FileDescriptor fd = oldFdsByPath_.get(path);
+ if (listWithLocations || forceRefreshLocations || fd == null ||
+ fd.isChanged(fileStatus)) {
+ fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds, absPath);
+ ++loadStats_.loadedFiles;
+ } else {
+ ++loadStats_.skippedFiles;
+ }
+ return fd;
+ }
+
+ /**
+ * Return file status list based on the data and delete files of the Iceberg
tables.
+ */
+ @Override
+ protected List<FileStatus> getFileStatuses(FileSystem fs, boolean
listWithLocations)
+ throws IOException {
+ if (icebergFiles_.isEmpty()) return null;
+ RemoteIterator<? extends FileStatus> fileStatuses = null;
+  // For the FSs in 'FileSystemUtil#SCHEME_SUPPORT_STORAGE_IDS' (e.g. HDFS,
Ozone,
+  // Alluxio, etc.) the files can be listed with block location information, so
we're going
+  // to get the block locations through 'FileSystemUtil.listFiles'.
+ if (listWithLocations) {
+ fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_,
debugAction_);
+ }
+ Map<Path, FileStatus> nameToFileStatus = Maps.newHashMap();
+ if (fileStatuses != null) {
+ while (fileStatuses.hasNext()) {
+ FileStatus status = fileStatuses.next();
+ nameToFileStatus.put(status.getPath(), status);
+ }
+ }
+
+ List<FileStatus> stats = Lists.newLinkedList();
+ for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
+ Path path = FileSystemUtil.createFullyQualifiedPath(
+ new Path(contentFile.path().toString()));
+ // If data is in the table location, then we can get LocatedFileStatus
from
+ // 'nameToFileStatus'. If 'nameToFileStatus' does not include the
ContentFile, we
+ // try to get LocatedFileStatus based on the specific fs(StorageIds are
supported)
+ // of the actual ContentFile. If the specific fs does not support
StorageIds, then
+ // we create FileStatus directly by the method
+ //
'org.apache.impala.catalog.IcebergFileMetadataLoader.createFileStatus'.
+ if (nameToFileStatus.containsKey(path)) {
+ stats.add(nameToFileStatus.get(path));
+ } else {
+ FileSystem fsForPath = FileSystemUtil.getFileSystemForPath(path);
+ if (FileSystemUtil.supportsStorageIds(fsForPath)) {
+ stats.add(Utils.createLocatedFileStatus(path, fsForPath));
+ } else {
+        // To avoid the cost of directory listing on object storage services
(e.g. S3A,
+        // COS, OSS, etc.), we create the FileStatus ourselves.
+ stats.add(Utils.createFileStatus(contentFile, path));
+ }
+ }
+ }
+ return stats;
+ }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index f66efd828..5b329f121 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -323,7 +323,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
}
/**
- * Loads the metadata of a Iceberg table.
+ * Loads the metadata of an Iceberg table.
* <p>
* Schema and partitioning schemes are loaded directly from Iceberg whereas
column stats
* are loaded from HMS. The function also updates the table schema in HMS in
order to
@@ -353,10 +353,12 @@ public class IcebergTable extends Table implements FeIcebergTable {
icebergParquetRowGroupSize_ =
Utils.getIcebergParquetRowGroupSize(msTbl);
icebergParquetPlainPageSize_ =
Utils.getIcebergParquetPlainPageSize(msTbl);
icebergParquetDictPageSize_ =
Utils.getIcebergParquetDictPageSize(msTbl);
-      hdfsTable_
-          .load(false, msClient, msTable_, true, true, false, null, null,null, reason);
-      GroupedContentFiles icebergFiles = IcebergUtil
-          .getIcebergFiles(this, new ArrayList<>(), /*timeTravelSpec=*/null);
+      GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this,
+          new ArrayList<>(), /*timeTravelSpec=*/null);
+      hdfsTable_.setIcebergFiles(icebergFiles);
+      hdfsTable_.setCanDataBeOutsideOfTableLocation(
+          !Utils.requiresDataFilesInTableLocation(this));
+      hdfsTable_.load(msClient, msTable_, reason);
fileStore_ = Utils.loadAllPartition(this, icebergFiles);
partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
setIcebergTableStats();
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 098fdc8a8..93e65346a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -20,7 +20,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -33,8 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.impala.catalog.FeFsTable.Utils;
+import org.apache.impala.catalog.HdfsPartition.Builder;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
@@ -74,10 +74,19 @@ public class ParallelFileMetadataLoader {
private final FileSystem fs_;
   public ParallelFileMetadataLoader(FileSystem fs,
-      Collection<HdfsPartition.Builder> partBuilders,
+      Collection<Builder> partBuilders,
       ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive,
-      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix)
-      throws CatalogException {
+      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction,
+      String logPrefix) {
+    this(fs, partBuilders, writeIdList, validTxnList, isRecursive, hostIndex, debugAction,
+        logPrefix, new GroupedContentFiles(), false);
+  }
+
+  public ParallelFileMetadataLoader(FileSystem fs,
+      Collection<Builder> partBuilders,
+      ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive,
+      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix,
+      GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) {
if (writeIdList != null || validTxnList != null) {
// make sure that both either both writeIdList and validTxnList are set
or both
// of them are not.
@@ -95,9 +104,16 @@ public class ParallelFileMetadataLoader {
     loaders_ = Maps.newHashMap();
     for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
-      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
-          isRecursive, oldFds, hostIndex, validTxnList, writeIdList,
-          e.getValue().get(0).getFileFormat());
+      FileMetadataLoader loader;
+      HdfsFileFormat format = e.getValue().get(0).getFileFormat();
+      if (format.equals(HdfsFileFormat.ICEBERG)) {
+        loader = new IcebergFileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex,
+            validTxnList, writeIdList, Preconditions.checkNotNull(icebergFiles),
+            canDataBeOutsideOfTableLocation);
+      } else {
+        loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex,
+            validTxnList, writeIdList, format);
+      }
// If there is a cached partition mapped to this path, we recompute the
block
// locations even if the underlying files have not changed.
// This is done to keep the cached block metadata up to date.
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
index 65e06494b..da3aac4da 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -41,6 +40,8 @@ public class GroupedContentFiles {
public List<DataFile> dataFilesWithDeletes = new ArrayList<>();
public Set<DeleteFile> deleteFiles = new HashSet<>();
+ public GroupedContentFiles() { }
+
public GroupedContentFiles(CloseableIterable<FileScanTask> fileScanTasks) {
for (FileScanTask scanTask : fileScanTasks) {
if (scanTask.deletes().isEmpty()) {
@@ -52,7 +53,11 @@ public class GroupedContentFiles {
     }
   }
 
-  public Iterable<ContentFile> getAllContentFiles() {
+  public Iterable<ContentFile<?>> getAllContentFiles() {
     return Iterables.concat(dataFilesWithoutDeletes, dataFilesWithDeletes, deleteFiles);
   }
+
+  public boolean isEmpty() {
+    return Iterables.isEmpty(getAllContentFiles());
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index d3d5f7122..2df5bc329 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -31,7 +31,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.curator.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
@@ -424,10 +423,7 @@ public class IcebergScanPlanner {
     }
     cachehit = false;
     try {
-      fileDesc = FeIcebergTable.Utils.getFileDescriptor(
-          new Path(cf.path().toString()),
-          new Path(getIceTable().getIcebergTableLocation()),
-          getIceTable());
+      fileDesc = FeIcebergTable.Utils.getFileDescriptor(cf, getIceTable());
} catch (IOException ex) {
throw new ImpalaRuntimeException(
"Cannot load file descriptor for " + cf.path(), ex);