This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d6ff6fdd IMPALA-11662: Improve 'refresh iceberg_tbl_on_oss' 
performance
4d6ff6fdd is described below

commit 4d6ff6fddd77800999e1d6d8ef0de8af5b1684ab
Author: LPL <[email protected]>
AuthorDate: Tue Dec 20 14:29:15 2022 +0800

    IMPALA-11662: Improve 'refresh iceberg_tbl_on_oss' performance
    
    As the cost of directory listing on Cloud Storage Systems such as OSS or
    S3 is higher than the cost on HDFS, we could create the file descriptors
    from the rich metadata provided by Iceberg instead of using
    org.apache.hadoop.fs.FileSystem#listFiles. The only thing missing there
    is the last_modification_time of the files. But since Iceberg files are
    immutable, we could just come up with a special timestamp for these
    files.
    
    At the same time, we can also construct file descriptors ourselves
    during time travel to reduce the cost of requests with OSS services.
    
    Test:
     * existing tests
     * test on COS with my local test environment
    
    Change-Id: If2ee8b6b7559e6590698b46ef1d574e55ed52f9a
    Reviewed-on: http://gerrit.cloudera.org:8080/19379
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
 .../org/apache/impala/catalog/FeIcebergTable.java  |  67 ++++++----
 .../apache/impala/catalog/FileMetadataLoader.java  | 120 +++++++++++-------
 .../org/apache/impala/catalog/HdfsPartition.java   |  18 +--
 .../java/org/apache/impala/catalog/HdfsTable.java  |  38 ++++--
 .../impala/catalog/IcebergFileMetadataLoader.java  | 139 +++++++++++++++++++++
 .../org/apache/impala/catalog/IcebergTable.java    |  12 +-
 .../impala/catalog/ParallelFileMetadataLoader.java |  32 +++--
 .../catalog/iceberg/GroupedContentFiles.java       |   9 +-
 .../apache/impala/planner/IcebergScanPlanner.java  |   6 +-
 9 files changed, 333 insertions(+), 108 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 043cb25ae..c9c489ab5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -333,6 +333,12 @@ public interface FeIcebergTable extends FeFsTable {
    * Utility functions
    */
   public static abstract class Utils {
+
+    // We need 'FileStatus#modification_time' to know whether the file is 
changed,
+    // meanwhile this property cannot be obtained from the Iceberg metadata. 
But Iceberg
+    // files are immutable, so we can use a special default value.
+    private static final int DEFAULT_MODIFICATION_TIME = 1;
+
     /**
      * Returns true if FeIcebergTable file format is columnar: parquet or orc
      */
@@ -627,20 +633,28 @@ public interface FeIcebergTable extends FeFsTable {
     /**
      * Get FileDescriptor by data file location
      */
-    public static HdfsPartition.FileDescriptor getFileDescriptor(Path fileLoc,
-        Path tableLoc, FeIcebergTable table) throws IOException {
-      FileSystem fs = FileSystemUtil.getFileSystemForPath(tableLoc);
-      FileStatus fileStatus = fs.getFileStatus(fileLoc);
-      return getFileDescriptor(fs, tableLoc, fileStatus, table);
+    public static HdfsPartition.FileDescriptor getFileDescriptor(
+        ContentFile<?> contentFile, FeIcebergTable table) throws IOException {
+      Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
+          new Path(contentFile.path().toString()));
+      FileSystem fsForPath = FileSystemUtil.getFileSystemForPath(fileLoc);
+      FileStatus fileStatus;
+      if (FileSystemUtil.supportsStorageIds(fsForPath)) {
+        fileStatus = createLocatedFileStatus(fileLoc, fsForPath);
+      } else {
+        // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus 
ourselves.
+        fileStatus = createFileStatus(contentFile, fileLoc);
+      }
+      return getFileDescriptor(fsForPath, fileStatus, table);
     }
 
     private static HdfsPartition.FileDescriptor getFileDescriptor(FileSystem 
fs,
-        Path tableLoc, FileStatus fileStatus, FeIcebergTable table)
-        throws IOException {
+        FileStatus fileStatus, FeIcebergTable table) throws IOException {
       Reference<Long> numUnknownDiskIds = new Reference<>(0L);
 
       String relPath = null;
       String absPath = null;
+      Path tableLoc = new Path(table.getIcebergTableLocation());
       URI relUri = tableLoc.toUri().relativize(fileStatus.getPath().toUri());
       if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
         if (Utils.requiresDataFilesInTableLocation(table)) {
@@ -659,7 +673,7 @@ public interface FeIcebergTable extends FeFsTable {
 
       BlockLocation[] locations;
       if (fileStatus instanceof LocatedFileStatus) {
-        locations = ((LocatedFileStatus)fileStatus).getBlockLocations();
+        locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
       } else {
         locations = fs.getFileBlockLocations(fileStatus, 0, 
fileStatus.getLen());
       }
@@ -677,7 +691,7 @@ public interface FeIcebergTable extends FeFsTable {
      */
     public static IcebergContentFileStore loadAllPartition(
         IcebergTable table, GroupedContentFiles icebergFiles)
-        throws IOException, TableLoadingException {
+        throws IOException {
       Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap = new 
HashMap<>();
       Collection<HdfsPartition> partitions =
           ((HdfsTable)table.getFeFsTable()).partitionMap_.values();
@@ -708,8 +722,7 @@ public interface FeIcebergTable extends FeFsTable {
     }
 
     private static Pair<String, HdfsPartition.FileDescriptor> getPathHashAndFd(
-        ContentFile contentFile,
-        IcebergTable table,
+        ContentFile<?> contentFile, IcebergTable table,
         Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap) throws 
IOException {
       String pathHash = IcebergUtil.getFilePathHash(contentFile);
       HdfsPartition.FileDescriptor fd = getOrCreateIcebergFd(
@@ -719,7 +732,7 @@ public interface FeIcebergTable extends FeFsTable {
 
     private static FileDescriptor getOrCreateIcebergFd(IcebergTable table,
         Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap,
-        ContentFile contentFile) throws IllegalArgumentException, IOException {
+        ContentFile<?> contentFile) throws IllegalArgumentException, 
IOException {
       Path path = new Path(contentFile.path().toString());
       HdfsPartition.FileDescriptor iceFd = null;
       if (hdfsFileDescMap.containsKey(path.toUri().getPath())) {
@@ -727,19 +740,16 @@ public interface FeIcebergTable extends FeFsTable {
             path.toUri().getPath());
         iceFd = fsFd.cloneWithFileMetadata(
             IcebergUtil.createIcebergMetadata(table, contentFile));
-        return iceFd;
       } else {
         if (Utils.requiresDataFilesInTableLocation(table)) {
           LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive"
-           + "file listing results.", path.toString());
+           + "file listing results.", path);
         }
-        HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(
-            new Path(contentFile.path().toString()),
-            new Path(table.getIcebergTableLocation()), table);
+        HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(contentFile, 
table);
         iceFd = fileDesc.cloneWithFileMetadata(
             IcebergUtil.createIcebergMetadata(table, contentFile));
-        return iceFd;
       }
+      return iceFd;
     }
 
     /**
@@ -749,8 +759,7 @@ public interface FeIcebergTable extends FeFsTable {
      * TODO(IMPALA-11516): Return better partition stats for V2 tables.
      */
     public static Map<String, TIcebergPartitionStats> loadPartitionStats(
-        IcebergTable table, GroupedContentFiles icebergFiles)
-        throws TableLoadingException {
+        IcebergTable table, GroupedContentFiles icebergFiles) {
       Map<String, TIcebergPartitionStats> nameToStats = new HashMap<>();
       for (ContentFile<?> contentFile : icebergFiles.getAllContentFiles()) {
         String name = getPartitionKey(table, contentFile);
@@ -917,7 +926,9 @@ public interface FeIcebergTable extends FeFsTable {
     }
 
     public static boolean requiresDataFilesInTableLocation(FeIcebergTable 
icebergTable) {
-      Map<String, String> properties = 
icebergTable.getIcebergApiTable().properties();
+      Table icebergApiTable = icebergTable.getIcebergApiTable();
+      Preconditions.checkNotNull(icebergApiTable);
+      Map<String, String> properties = icebergApiTable.properties();
       return !(PropertyUtil.propertyAsBoolean(properties,
           TableProperties.OBJECT_STORE_ENABLED,
           TableProperties.OBJECT_STORE_ENABLED_DEFAULT)
@@ -928,5 +939,19 @@ public interface FeIcebergTable extends FeFsTable {
           || StringUtils
           
.isNotEmpty(properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION)));
     }
+
+    public static FileStatus createLocatedFileStatus(Path path, FileSystem fs)
+        throws IOException {
+      FileStatus fileStatus = fs.getFileStatus(path);
+      Preconditions.checkState(fileStatus.isFile());
+      BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, 0L,
+          fileStatus.getLen());
+      return new LocatedFileStatus(fileStatus, blockLocations);
+    }
+
+    public static FileStatus createFileStatus(ContentFile<?> contentFile, Path 
path) {
+      return new FileStatus(contentFile.fileSizeInBytes(), false, 0, 0,
+          DEFAULT_MODIFICATION_TIME, path);
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index ee42fab49..9ebc6fa1f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -55,9 +55,9 @@ public class FileMetadataLoader {
   private final static Logger LOG = 
LoggerFactory.getLogger(FileMetadataLoader.class);
   private static final Configuration CONF = new Configuration();
 
-  private final Path partDir_;
-  private final boolean recursive_;
-  private final ImmutableMap<String, FileDescriptor> oldFdsByRelPath_;
+  protected final Path partDir_;
+  protected final boolean recursive_;
+  protected final ImmutableMap<String, FileDescriptor> oldFdsByPath_;
   private final ListMap<TNetworkAddress> hostIndex_;
   @Nullable
   private final ValidWriteIdList writeIds_;
@@ -66,13 +66,13 @@ public class FileMetadataLoader {
   @Nullable
   private final HdfsFileFormat fileFormat_;
 
-  private boolean forceRefreshLocations = false;
+  protected boolean forceRefreshLocations = false;
 
   private List<FileDescriptor> loadedFds_;
   private List<FileDescriptor> loadedInsertDeltaFds_;
   private List<FileDescriptor> loadedDeleteDeltaFds_;
-  private LoadStats loadStats_;
-  private String debugAction_;
+  protected LoadStats loadStats_;
+  protected String debugAction_;
 
   /**
    * @param partDir the dir for which to fetch file metadata
@@ -97,7 +97,7 @@ public class FileMetadataLoader {
     partDir_ = Preconditions.checkNotNull(partDir);
     recursive_ = recursive;
     hostIndex_ = Preconditions.checkNotNull(hostIndex);
-    oldFdsByRelPath_ = Maps.uniqueIndex(oldFds, FileDescriptor::getPath);
+    oldFdsByPath_ = Maps.uniqueIndex(oldFds, FileDescriptor::getPath);
     writeIds_ = writeIds;
     validTxnList_ = validTxnList;
     fileFormat_ = fileFormat;
@@ -174,45 +174,30 @@ public class FileMetadataLoader {
     // assume that most _can_ be reused, in which case it's faster to _not_ 
prefetch
     // the locations.
     boolean listWithLocations = FileSystemUtil.supportsStorageIds(fs) &&
-        (oldFdsByRelPath_.isEmpty() || forceRefreshLocations);
+        (oldFdsByPath_.isEmpty() || forceRefreshLocations);
 
     String msg = String.format("%s file metadata%s from path %s",
-        oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing",
+        oldFdsByPath_.isEmpty() ? "Loading" : "Refreshing",
         listWithLocations ? " with eager location-fetching" : "", partDir_);
     LOG.trace(msg);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
-      RemoteIterator<? extends FileStatus> fileStatuses;
-      if (listWithLocations) {
-        fileStatuses = FileSystemUtil
-            .listFiles(fs, partDir_, recursive_, debugAction_);
-      } else {
-        fileStatuses = FileSystemUtil
-            .listStatus(fs, partDir_, recursive_, debugAction_);
-
-        // TODO(todd): we could look at the result of listing without 
locations, and if
-        // we see that a substantial number of the files have changed, it may 
be better
-        // to go back and re-list with locations vs doing an RPC per file.
-      }
+      List<FileStatus> fileStatuses = getFileStatuses(fs, listWithLocations);
+
       loadedFds_ = new ArrayList<>();
       if (fileStatuses == null) return;
 
-      Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
-
-      List<FileStatus> stats = new ArrayList<>();
-      while (fileStatuses.hasNext()) {
-        stats.add(fileStatuses.next());
-      }
+      Reference<Long> numUnknownDiskIds = new Reference<>(0L);
 
       if (writeIds_ != null) {
-        stats = AcidUtils.filterFilesForAcidState(stats, partDir_, 
validTxnList_,
-            writeIds_, loadStats_);
+        fileStatuses = AcidUtils.filterFilesForAcidState(fileStatuses, 
partDir_,
+            validTxnList_, writeIds_, loadStats_);
       }
 
       if (fileFormat_ == HdfsFileFormat.HUDI_PARQUET) {
-        stats = HudiUtil.filterFilesForHudiROPath(stats);
+        fileStatuses = HudiUtil.filterFilesForHudiROPath(fileStatuses);
       }
 
-      for (FileStatus fileStatus : stats) {
+      for (FileStatus fileStatus : fileStatuses) {
         if (fileStatus.isDirectory()) {
           continue;
         }
@@ -221,16 +206,10 @@ public class FileMetadataLoader {
           ++loadStats_.hiddenFiles;
           continue;
         }
-        String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(), 
partDir_);
-        FileDescriptor fd = oldFdsByRelPath_.get(relPath);
-        if (listWithLocations || forceRefreshLocations || fd == null ||
-            fd.isChanged(fileStatus)) {
-          fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds);
-          ++loadStats_.loadedFiles;
-        } else {
-          ++loadStats_.skippedFiles;
-        }
-        loadedFds_.add(Preconditions.checkNotNull(fd));;
+
+        FileDescriptor fd = getFileDescriptor(fs, listWithLocations, 
numUnknownDiskIds,
+            fileStatus);
+        loadedFds_.add(Preconditions.checkNotNull(fd));
       }
       if (writeIds_ != null) {
         loadedInsertDeltaFds_ = new ArrayList<>();
@@ -250,25 +229,74 @@ public class FileMetadataLoader {
     }
   }
 
+  /**
+   * Return fd created by the given fileStatus or from the 
cache (oldFdsByPath_).
+   */
+  protected FileDescriptor getFileDescriptor(FileSystem fs, boolean 
listWithLocations,
+      Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws 
IOException {
+    String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(), 
partDir_);
+    FileDescriptor fd = oldFdsByPath_.get(relPath);
+    if (listWithLocations || forceRefreshLocations || fd == null ||
+        fd.isChanged(fileStatus)) {
+      fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds);
+      ++loadStats_.loadedFiles;
+    } else {
+      ++loadStats_.skippedFiles;
+    }
+    return fd;
+  }
+
+  /**
+   * Return located file status list when listWithLocations is true.
+   */
+  protected List<FileStatus> getFileStatuses(FileSystem fs, boolean 
listWithLocations)
+      throws IOException {
+    RemoteIterator<? extends FileStatus> fileStatuses;
+    if (listWithLocations) {
+      fileStatuses = FileSystemUtil
+          .listFiles(fs, partDir_, recursive_, debugAction_);
+    } else {
+      fileStatuses = FileSystemUtil
+          .listStatus(fs, partDir_, recursive_, debugAction_);
+      // TODO(todd): we could look at the result of listing without locations, 
and if
+      // we see that a substantial number of the files have changed, it may be 
better
+      // to go back and re-list with locations vs doing an RPC per file.
+    }
+    if (fileStatuses == null) return null;
+    List<FileStatus> stats = new ArrayList<>();
+    while (fileStatuses.hasNext()) {
+      stats.add(fileStatuses.next());
+    }
+    return stats;
+  }
+
   /**
    * Create a FileDescriptor for the given FileStatus. If the FS supports 
block locations,
    * and FileStatus is a LocatedFileStatus (i.e. the location was prefetched) 
this uses
    * the already-loaded information; otherwise, this may have to remotely look 
up the
    * locations.
+   * 'absPath' is null except for the Iceberg tables, because datafiles of the
+   * Iceberg tables may not be in the table location.
    */
-  private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
-      String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
+  protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
+      String relPath, Reference<Long> numUnknownDiskIds, String absPath)
+      throws IOException {
     if (!FileSystemUtil.supportsStorageIds(fs)) {
-      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, null);
+      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
     }
     BlockLocation[] locations;
     if (fileStatus instanceof LocatedFileStatus) {
-      locations = ((LocatedFileStatus)fileStatus).getBlockLocations();
+      locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
     } else {
       locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
     }
     return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
-        HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds);
+        HdfsShim.isErasureCoded(fileStatus), numUnknownDiskIds, absPath);
+  }
+
+  private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
+      String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
+    return createFd(fs, fileStatus, relPath, numUnknownDiskIds, null);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 9d481d540..02e285ba5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -210,13 +210,6 @@ public class HdfsPartition extends CatalogObjectImpl
           fbFileBlockOffsets, isEc, absPath));
     }
 
-    public static FileDescriptor create(FileStatus fileStatus, String relPath,
-        BlockLocation[] blockLocations, ListMap<TNetworkAddress> hostIndex, 
boolean isEc,
-        Reference<Long> numUnknownDiskIds) throws IOException {
-      return create(fileStatus, relPath, blockLocations, hostIndex, isEc,
-          numUnknownDiskIds, null);
-    }
-
     /**
      * Creates the file descriptor of a file represented by 'fileStatus' that
      * resides in a filesystem that doesn't support the BlockLocation API 
(e.g. S3).
@@ -227,7 +220,6 @@ public class HdfsPartition extends CatalogObjectImpl
       return new FileDescriptor(
           createFbFileDesc(fbb, fileStatus, relPath, null, false, absPath));
     }
-
     /**
      * Serializes the metadata of a file descriptor represented by 
'fileStatus' into a
      * FlatBuffer using 'fbb' and returns the associated FbFileDesc object.
@@ -235,18 +227,18 @@ public class HdfsPartition extends CatalogObjectImpl
      * in the underlying buffer. Can be null if there are no blocks.
      */
     private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb,
-        FileStatus fileStatus, String relPath, int[] fbFileBlockOffets, 
boolean isEc,
+        FileStatus fileStatus, String relPath, int[] fbFileBlockOffsets, 
boolean isEc,
         String absPath) {
       int relPathOffset = fbb.createString(relPath == null ? StringUtils.EMPTY 
: relPath);
       // A negative block vector offset is used when no block offsets are 
specified.
       int blockVectorOffset = -1;
-      if (fbFileBlockOffets != null) {
-        blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, 
fbFileBlockOffets);
+      if (fbFileBlockOffsets != null) {
+        blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, 
fbFileBlockOffsets);
       }
       int absPathOffset = -1;
       if (StringUtils.isNotEmpty(absPath)) absPathOffset = 
fbb.createString(absPath);
       FbFileDesc.startFbFileDesc(fbb);
-      // TODO(todd) rename to RelativePathin the FBS
+      // TODO(todd) rename to RelativePath in the FBS
       FbFileDesc.addRelativePath(fbb, relPathOffset);
       FbFileDesc.addLength(fbb, fileStatus.getLen());
       FbFileDesc.addLastModificationTime(fbb, 
fileStatus.getModificationTime());
@@ -261,7 +253,7 @@ public class HdfsPartition extends CatalogObjectImpl
       ByteBuffer bb = fbb.dataBuffer().slice();
       ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity());
       compressedBb.put(bb);
-      return FbFileDesc.getRootAsFbFileDesc((ByteBuffer)compressedBb.flip());
+      return FbFileDesc.getRootAsFbFileDesc((ByteBuffer) compressedBb.flip());
     }
 
     public String getRelativePath() { return fbFileDescriptor_.relativePath(); 
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 99a34a877..538e813c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -19,7 +19,6 @@ package org.apache.impala.catalog;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -46,7 +45,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
@@ -60,6 +58,7 @@ import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -239,6 +238,11 @@ public class HdfsTable extends Table implements FeFsTable {
   // Declared as protected to allow third party extension visibility.
   protected final Map<Long, HdfsPartition> partitionMap_ = new HashMap<>();
 
+  // The data and delete files of the table, for Iceberg tables only.
+  private GroupedContentFiles icebergFiles_;
+  // Whether the data and delete files of the Iceberg tables can be outside 
the location.
+  private boolean canDataBeOutsideOfTableLocation_;
+
   // Map of partition name to HdfsPartition object. Used for speeding up
   // table metadata loading. It is only populated if this table object is 
stored in
   // catalog server.
@@ -329,10 +333,19 @@ public class HdfsTable extends Table implements FeFsTable 
{
   // to determine if we can skip the table from the topic update.
   private long lastVersionSeenByTopicUpdate_ = -1;
 
+  public void setIcebergFiles(GroupedContentFiles icebergFiles) {
+    icebergFiles_ = icebergFiles;
+  }
+
+  public void setCanDataBeOutsideOfTableLocation(
+      boolean canDataBeOutsideOfTableLocation) {
+    canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation;
+  }
+
   // Represents a set of storage-related statistics aggregated at the table or 
partition
   // level.
   public final static class FileMetadataStats {
-    // Nuber of files in a table/partition.
+    // Number of files in a table/partition.
     public long numFiles;
     // Number of blocks in a table/partition.
     public long numBlocks;
@@ -382,6 +395,8 @@ public class HdfsTable extends Table implements FeFsTable {
     super(msTbl, db, name, owner);
     partitionLocationCompressor_ =
         new HdfsPartitionLocationCompressor(numClusteringCols_);
+    icebergFiles_ = new GroupedContentFiles();
+    canDataBeOutsideOfTableLocation_ = false;
   }
 
   @Override // FeFsTable
@@ -778,7 +793,8 @@ public class HdfsTable extends Table implements FeFsTable {
     // have refreshed the top-level table properties without refreshing the 
files.
     new ParallelFileMetadataLoader(getFileSystem(), partBuilders, 
validWriteIds_,
         validTxnList, Utils.shouldRecursivelyListPartitions(this),
-        getHostIndex(), debugActions, logPrefix).load();
+        getHostIndex(), debugActions, logPrefix, icebergFiles_,
+        canDataBeOutsideOfTableLocation_).load();
 
     // TODO(todd): would be good to log a summary of the loading process:
     // - how many block locations did we reuse/load individually/load via batch
@@ -1213,7 +1229,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
    */
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl,
-      boolean loadParitionFileMetadata, boolean loadTableSchema,
+      boolean loadPartitionFileMetadata, boolean loadTableSchema,
       boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate,
       @Nullable String debugAction, @Nullable Map<String, Long> 
partitionToEventId,
       String reason) throws TableLoadingException {
@@ -1247,10 +1263,10 @@ public class HdfsTable extends Table implements 
FeFsTable {
         if (reuseMetadata) {
           // Incrementally update this table's partitions and file metadata
           Preconditions.checkState(
-              partitionsToUpdate == null || loadParitionFileMetadata);
+              partitionsToUpdate == null || loadPartitionFileMetadata);
           storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
           if (msTbl.getPartitionKeysSize() == 0) {
-            if (loadParitionFileMetadata) {
+            if (loadPartitionFileMetadata) {
               storageMetadataLoadTime_ += 
updateUnpartitionedTableFileMd(client,
                   debugAction);
             } else {  // Update the single partition stats in case table stats 
changes.
@@ -1258,7 +1274,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
             }
           } else {
             storageMetadataLoadTime_ += updatePartitionsFromHms(
-                client, partitionsToUpdate, loadParitionFileMetadata,
+                client, partitionsToUpdate, loadPartitionFileMetadata,
                 refreshUpdatedPartitions, partitionToEventId, debugAction);
           }
           LOG.info("Incrementally loaded table metadata for: " + 
getFullName());
@@ -1296,6 +1312,12 @@ public class HdfsTable extends Table implements 
FeFsTable {
     }
   }
 
+  public void load(IMetaStoreClient client,
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
+    load(false, client, msTbl, true, true, false, null, null, null, reason);
+  }
+
   /**
    * Load Primary Key and Foreign Key information for table. Throws 
TableLoadingException
    * if the load fails. Declared as protected to allow third party extensions 
on this
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
new file mode 100644
index 000000000..c365e9cac
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.iceberg.ContentFile;
+import org.apache.impala.catalog.FeIcebergTable.Utils;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.Reference;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+
+/**
+ * Utility for loading the content files metadata of the Iceberg tables.
+ */
+public class IcebergFileMetadataLoader extends FileMetadataLoader {
+  private final GroupedContentFiles icebergFiles_;
+  private final boolean canDataBeOutsideOfTableLocation_;
+
+  public IcebergFileMetadataLoader(Path partDir, boolean recursive,
+      List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+      ValidTxnList validTxnList, ValidWriteIdList writeIds,
+      GroupedContentFiles icebergFiles, boolean 
canDataBeOutsideOfTableLocation) {
+    super(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds,
+        HdfsFileFormat.ICEBERG);
+    icebergFiles_ = icebergFiles;
+    canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation;
+  }
+
+  /**
+   * Throw exception if the path fails to relativize based on the location of 
the Iceberg
+   * tables, and files are not allowed outside the table location.
+   */
+  @Override
+  protected FileDescriptor getFileDescriptor(FileSystem fs, boolean 
listWithLocations,
+      Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws 
IOException {
+    String relPath = null;
+    String absPath = null;
+    URI relUri = partDir_.toUri().relativize(fileStatus.getPath().toUri());
+    if (relUri.isAbsolute() || relUri.getPath().startsWith(Path.SEPARATOR)) {
+      if (canDataBeOutsideOfTableLocation_) {
+        absPath = fileStatus.getPath().toString();
+      } else {
+        throw new IOException(String.format("Failed to load Iceberg datafile 
%s, because "
+            + "it's outside of the table location", 
fileStatus.getPath().toUri()));
+      }
+    } else {
+      relPath = relUri.getPath();
+    }
+
+    String path = Strings.isNullOrEmpty(relPath) ? absPath : relPath;
+    FileDescriptor fd = oldFdsByPath_.get(path);
+    if (listWithLocations || forceRefreshLocations || fd == null ||
+        fd.isChanged(fileStatus)) {
+      fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds, absPath);
+      ++loadStats_.loadedFiles;
+    } else {
+      ++loadStats_.skippedFiles;
+    }
+    return fd;
+  }
+
+  /**
+   * Return file status list based on the data and delete files of the Iceberg 
tables.
+   */
+  @Override
+  protected List<FileStatus> getFileStatuses(FileSystem fs, boolean 
listWithLocations)
+      throws IOException {
+    if (icebergFiles_.isEmpty()) return null;
+    RemoteIterator<? extends FileStatus> fileStatuses = null;
+    // For the FSs in 'FileSystemUtil#SCHEME_SUPPORT_STORAGE_IDS' (e.g. HDFS, 
Ozone,
+    // Alluxio, etc.) we ensure the file with block location information, so 
we're going
+    // to get the block information through 'FileSystemUtil.listFiles'.
+    if (listWithLocations) {
+      fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_, 
debugAction_);
+    }
+    Map<Path, FileStatus> nameToFileStatus = Maps.newHashMap();
+    if (fileStatuses != null) {
+      while (fileStatuses.hasNext()) {
+        FileStatus status = fileStatuses.next();
+        nameToFileStatus.put(status.getPath(), status);
+      }
+    }
+
+    List<FileStatus> stats = Lists.newLinkedList();
+    for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
+      Path path = FileSystemUtil.createFullyQualifiedPath(
+          new Path(contentFile.path().toString()));
+      // If data is in the table location, then we can get LocatedFileStatus 
from
+      // 'nameToFileStatus'. If 'nameToFileStatus' does not include the 
ContentFile, we
+      // try to get LocatedFileStatus based on the specific fs(StorageIds are 
supported)
+      // of the actual ContentFile. If the specific fs does not support 
StorageIds, then
+      // we create FileStatus directly by the method
+      // 
'org.apache.impala.catalog.IcebergFileMetadataLoader.createFileStatus'.
+      if (nameToFileStatus.containsKey(path)) {
+        stats.add(nameToFileStatus.get(path));
+      } else {
+        FileSystem fsForPath = FileSystemUtil.getFileSystemForPath(path);
+        if (FileSystemUtil.supportsStorageIds(fsForPath)) {
+          stats.add(Utils.createLocatedFileStatus(path, fsForPath));
+        } else {
+          // To avoid the cost of directory listing on OSS service (e.g. S3A, 
COS, OSS,
+          // etc), we create FileStatus ourselves.
+          stats.add(Utils.createFileStatus(contentFile, path));
+        }
+      }
+    }
+    return stats;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index f66efd828..5b329f121 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -323,7 +323,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
   }
 
   /**
-   * Loads the metadata of a Iceberg table.
+   * Loads the metadata of an Iceberg table.
    * <p>
    * Schema and partitioning schemes are loaded directly from Iceberg whereas 
column stats
    * are loaded from HMS. The function also updates the table schema in HMS in 
order to
@@ -353,10 +353,12 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
         icebergParquetRowGroupSize_ = 
Utils.getIcebergParquetRowGroupSize(msTbl);
         icebergParquetPlainPageSize_ = 
Utils.getIcebergParquetPlainPageSize(msTbl);
         icebergParquetDictPageSize_ = 
Utils.getIcebergParquetDictPageSize(msTbl);
-        hdfsTable_
-            .load(false, msClient, msTable_, true, true, false, null, 
null,null, reason);
-        GroupedContentFiles icebergFiles = IcebergUtil
-            .getIcebergFiles(this, new ArrayList<>(), /*timeTravelSpec=*/null);
+        GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this,
+            new ArrayList<>(), /*timeTravelSpec=*/null);
+        hdfsTable_.setIcebergFiles(icebergFiles);
+        hdfsTable_.setCanDataBeOutsideOfTableLocation(
+            !Utils.requiresDataFilesInTableLocation(this));
+        hdfsTable_.load(msClient, msTable_, reason);
         fileStore_ = Utils.loadAllPartition(this, icebergFiles);
         partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
         setIcebergTableStats();
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 098fdc8a8..93e65346a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -20,7 +20,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -33,8 +32,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.impala.catalog.FeFsTable.Utils;
+import org.apache.impala.catalog.HdfsPartition.Builder;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
@@ -74,10 +74,19 @@ public class ParallelFileMetadataLoader {
   private final FileSystem fs_;
 
   public ParallelFileMetadataLoader(FileSystem fs,
-      Collection<HdfsPartition.Builder> partBuilders,
+      Collection<Builder> partBuilders,
       ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean 
isRecursive,
-      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String 
logPrefix)
-      throws CatalogException {
+      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction,
+      String logPrefix) {
+    this(fs, partBuilders, writeIdList, validTxnList, isRecursive, hostIndex, 
debugAction,
+        logPrefix, new GroupedContentFiles(), false);
+  }
+
+  public ParallelFileMetadataLoader(FileSystem fs,
+      Collection<Builder> partBuilders,
+      ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean 
isRecursive,
+      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String 
logPrefix,
+      GroupedContentFiles icebergFiles, boolean 
canDataBeOutsideOfTableLocation) {
     if (writeIdList != null || validTxnList != null) {
       // make sure that both either both writeIdList and validTxnList are set 
or both
       // of them are not.
@@ -95,9 +104,16 @@ public class ParallelFileMetadataLoader {
     loaders_ = Maps.newHashMap();
     for (Map.Entry<Path, List<HdfsPartition.Builder>> e : 
partsByPath_.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
-      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
-          isRecursive, oldFds, hostIndex, validTxnList, writeIdList,
-          e.getValue().get(0).getFileFormat());
+      FileMetadataLoader loader;
+      HdfsFileFormat format = e.getValue().get(0).getFileFormat();
+      if (format.equals(HdfsFileFormat.ICEBERG)) {
+        loader = new IcebergFileMetadataLoader(e.getKey(), isRecursive, 
oldFds, hostIndex,
+            validTxnList, writeIdList, 
Preconditions.checkNotNull(icebergFiles),
+            canDataBeOutsideOfTableLocation);
+      } else {
+        loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, 
hostIndex,
+            validTxnList, writeIdList, format);
+      }
       // If there is a cached partition mapped to this path, we recompute the 
block
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
index 65e06494b..da3aac4da 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
@@ -41,6 +40,8 @@ public class GroupedContentFiles {
   public List<DataFile> dataFilesWithDeletes = new ArrayList<>();
   public Set<DeleteFile> deleteFiles = new HashSet<>();
 
+  public GroupedContentFiles() { }
+
   public GroupedContentFiles(CloseableIterable<FileScanTask> fileScanTasks) {
     for (FileScanTask scanTask : fileScanTasks) {
       if (scanTask.deletes().isEmpty()) {
@@ -52,7 +53,11 @@ public class GroupedContentFiles {
     }
   }
 
-  public Iterable<ContentFile> getAllContentFiles() {
+  public Iterable<ContentFile<?>> getAllContentFiles() {
     return Iterables.concat(dataFilesWithoutDeletes, dataFilesWithDeletes, 
deleteFiles);
   }
+
+  public boolean isEmpty() {
+    return Iterables.isEmpty(getAllContentFiles());
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index d3d5f7122..2df5bc329 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -31,7 +31,6 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
@@ -424,10 +423,7 @@ public class IcebergScanPlanner {
       }
       cachehit = false;
       try {
-        fileDesc = FeIcebergTable.Utils.getFileDescriptor(
-            new Path(cf.path().toString()),
-            new Path(getIceTable().getIcebergTableLocation()),
-            getIceTable());
+        fileDesc = FeIcebergTable.Utils.getFileDescriptor(cf, getIceTable());
       } catch (IOException ex) {
         throw new ImpalaRuntimeException(
             "Cannot load file descriptor for " + cf.path(), ex);


Reply via email to