This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f58b9fb2a6f22d6085fab6a5623bc48e717d38ea
Author: Gabor Kaszab <[email protected]>
AuthorDate: Tue Feb 4 16:53:21 2025 +0100

    IMPALA-13739: Part2: Introduce IcebergFileDescriptor
    
    FileDescriptor is currently used by both HdfsTable and IcebergTable. It
    has two flatbuffers internally; however, one of them is only relevant
    for Iceberg tables. With this change we move the Iceberg part of
    FileDescriptor into a separate class called IcebergFileDescriptor.
    
    Change-Id: Id08baf4db32197bab74178777c4ba3fec98c2451
    Reviewed-on: http://gerrit.cloudera.org:8080/22456
    Reviewed-by: Daniel Becker <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/analysis/OptimizeStmt.java   |  11 +-
 .../java/org/apache/impala/analysis/TableRef.java  |   9 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |   7 +-
 .../org/apache/impala/catalog/FileDescriptor.java  |  50 +++------
 .../apache/impala/catalog/FileMetadataLoader.java  |   6 +-
 .../impala/catalog/IcebergContentFileStore.java    |  87 ++++++----------
 .../apache/impala/catalog/IcebergDeleteTable.java  |  11 +-
 .../impala/catalog/IcebergEqualityDeleteTable.java |   8 +-
 .../impala/catalog/IcebergFileDescriptor.java      | 100 ++++++++++++++++++
 .../impala/catalog/IcebergFileMetadataLoader.java  |  34 +++++--
 .../impala/catalog/IcebergPositionDeleteTable.java |   9 +-
 .../org/apache/impala/catalog/IcebergTable.java    |   2 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |   6 +-
 .../org/apache/impala/planner/IcebergScanNode.java |  19 ++--
 .../apache/impala/planner/IcebergScanPlanner.java  | 112 ++++++++++++---------
 .../impala/catalog/FileMetadataLoaderTest.java     |  30 +++---
 .../catalog/IcebergContentFileStoreTest.java       |  11 +-
 17 files changed, 301 insertions(+), 211 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
index de120dc56..23b766917 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -23,9 +23,8 @@ import org.apache.iceberg.DataFile;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeIcebergTable;
-import org.apache.impala.catalog.FileDescriptor;
-import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.IcebergContentFileStore;
+import org.apache.impala.catalog.IcebergFileDescriptor;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ByteUnits;
@@ -220,7 +219,7 @@ public class OptimizeStmt extends DmlStatementBase {
         mode_ = filterResult.getOptimizationMode();
 
         if (mode_ == TIcebergOptimizationMode.PARTIAL) {
-          List<FileDescriptor> selectedDataFilesWithoutDeletes =
+          List<IcebergFileDescriptor> selectedDataFilesWithoutDeletes =
               dataFilesWithoutDeletesToFileDescriptors(
                   filterResult.getSelectedFilesWithoutDeletes(), iceTable);
           
tableRef_.setSelectedDataFilesForOptimize(selectedDataFilesWithoutDeletes);
@@ -232,7 +231,7 @@ public class OptimizeStmt extends DmlStatementBase {
     }
   }
 
-  private List<FileDescriptor> dataFilesWithoutDeletesToFileDescriptors(
+  private List<IcebergFileDescriptor> dataFilesWithoutDeletesToFileDescriptors(
       List<DataFile> contentFiles, FeIcebergTable iceTable)
       throws IOException, ImpalaRuntimeException {
     GroupedContentFiles selectedContentFiles = new GroupedContentFiles();
@@ -244,8 +243,8 @@ public class OptimizeStmt extends DmlStatementBase {
     return selectedFiles.getDataFilesWithoutDeletes();
   }
 
-  private void collectAbsolutePaths(List<FileDescriptor> selectedFiles) {
-    for (FileDescriptor fileDesc : selectedFiles) {
+  private void collectAbsolutePaths(List<IcebergFileDescriptor> selectedFiles) 
{
+    for (IcebergFileDescriptor fileDesc : selectedFiles) {
       org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(
           fileDesc.getAbsolutePath(((FeIcebergTable) 
table_).getHdfsBaseDir()));
       selectedIcebergFilePaths_.add(path.toUri().toString());
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java 
b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index 7a6160058..a4091928b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -33,8 +33,7 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.FileDescriptor;
-import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.IcebergFileDescriptor;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.rewrite.ExprRewriter;
@@ -166,7 +165,7 @@ public class TableRef extends StmtNode {
 
   // Iceberg data files without deletes selected for OPTIMIZE from this table 
ref.
   // Used only in PARTIAL optimization mode, otherwise it is null.
-  private List<FileDescriptor> selectedDataFilesWithoutDeletesForOptimize_;
+  private List<IcebergFileDescriptor> 
selectedDataFilesWithoutDeletesForOptimize_;
 
   // END: Members that need to be reset()
   /////////////////////////////////////////
@@ -851,11 +850,11 @@ public class TableRef extends StmtNode {
     return res;
   }
 
-  public void setSelectedDataFilesForOptimize(List<FileDescriptor> fileDescs) {
+  public void setSelectedDataFilesForOptimize(List<IcebergFileDescriptor> 
fileDescs) {
     selectedDataFilesWithoutDeletesForOptimize_ = fileDescs;
   }
 
-  public List<FileDescriptor> getSelectedDataFilesForOptimize() {
+  public List<IcebergFileDescriptor> getSelectedDataFilesForOptimize() {
     return selectedDataFilesWithoutDeletesForOptimize_;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index c6597ef80..d66bc3e5a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -707,7 +707,7 @@ public interface FeIcebergTable extends FeFsTable {
     /**
      * Get FileDescriptor by data file location
      */
-    public static FileDescriptor getFileDescriptor(
+    public static FileDescriptor getHdfsFileDescriptor(
         ContentFile<?> contentFile, Table iceApiTable,
         boolean requiresDataFilesInTableLocation,
         ListMap<TNetworkAddress> hostIndex) throws IOException {
@@ -721,11 +721,12 @@ public interface FeIcebergTable extends FeFsTable {
         // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus 
ourselves.
         fileStatus = createFileStatus(contentFile, fileLoc);
       }
-      return getFileDescriptor(fsForPath, fileStatus, iceApiTable,
+      return getHdfsFileDescriptor(fsForPath, fileStatus, iceApiTable,
           requiresDataFilesInTableLocation, hostIndex);
     }
 
-    private static FileDescriptor getFileDescriptor(FileSystem fs,
+    private static FileDescriptor getHdfsFileDescriptor(
+        FileSystem fs,
         FileStatus fileStatus, Table iceApiTable,
         boolean requiresDataFilesInTableLocation,
         ListMap<TNetworkAddress> hostIndex) throws IOException {
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java 
b/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java
index d8ac36add..aa611585f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileDescriptor.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.flatbuffers.FlatBufferBuilder;
 
@@ -37,7 +38,6 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.fb.FbCompression;
 import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.fb.FbFileDesc;
-import org.apache.impala.fb.FbFileMetadata;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.ListMap;
@@ -55,35 +55,34 @@ public class FileDescriptor implements 
Comparable<FileDescriptor> {
   // Internal representation of a file descriptor using a FlatBuffer.
   private final FbFileDesc fbFileDescriptor_;
 
-  // Internal representation of additional file metadata, e.g. Iceberg 
metadata.
-  private final FbFileMetadata fbFileMetadata_;
-
-  private FileDescriptor(FbFileDesc fileDescData) {
-    fbFileDescriptor_ = fileDescData;
-    fbFileMetadata_ = null;
-  }
-
-  public FileDescriptor(FbFileDesc fileDescData, FbFileMetadata fileMetadata) {
+  protected FileDescriptor(FbFileDesc fileDescData) {
     fbFileDescriptor_ = fileDescData;
-    fbFileMetadata_ = fileMetadata;
   }
 
   public static FileDescriptor fromThrift(THdfsFileDesc desc) {
     ByteBuffer bb = ByteBuffer.wrap(desc.getFile_desc_data());
-    if (desc.isSetFile_metadata()) {
-      ByteBuffer bbMd = ByteBuffer.wrap(desc.getFile_metadata());
-      return new FileDescriptor(FbFileDesc.getRootAsFbFileDesc(bb),
-          FbFileMetadata.getRootAsFbFileMetadata(bbMd));
-    }
+    Preconditions.checkState(!desc.isSetFile_metadata());
     return new FileDescriptor(FbFileDesc.getRootAsFbFileDesc(bb));
   }
 
+  public THdfsFileDesc toThrift() {
+    THdfsFileDesc fd = new THdfsFileDesc();
+    ByteBuffer bb = fbFileDescriptor_.getByteBuffer();
+    fd.setFile_desc_data(bb);
+    return fd;
+  }
+
   /**
    * Clone the descriptor, but change the replica indexes to reference the new 
host
    * index 'dstIndex' instead of the original index 'origIndex'.
    */
   public FileDescriptor cloneWithNewHostIndex(
       List<TNetworkAddress> origIndex, ListMap<TNetworkAddress> dstIndex) {
+    return new FileDescriptor(fbFileDescWithNewHostIndex(origIndex, dstIndex));
+  }
+
+  protected FbFileDesc fbFileDescWithNewHostIndex(
+      List<TNetworkAddress> origIndex, ListMap<TNetworkAddress> dstIndex) {
     // First clone the flatbuffer with no changes.
     ByteBuffer oldBuf = fbFileDescriptor_.getByteBuffer();
     ByteBuffer newBuf = ByteBuffer.allocate(oldBuf.remaining());
@@ -103,11 +102,8 @@ public class FileDescriptor implements 
Comparable<FileDescriptor> {
         it.mutateReplicaHostIdxs(j, FileBlock.makeReplicaIdx(isCached, 
newHostIdx));
       }
     }
-    return new FileDescriptor(cloned, fbFileMetadata_);
-  }
 
-  public FileDescriptor cloneWithFileMetadata(FbFileMetadata fileMetadata) {
-    return new FileDescriptor(fbFileDescriptor_, fileMetadata);
+    return cloned;
   }
 
   /**
@@ -277,20 +273,6 @@ public class FileDescriptor implements 
Comparable<FileDescriptor> {
     return fbFileDescriptor_;
   }
 
-  public FbFileMetadata getFbFileMetadata() {
-    return fbFileMetadata_;
-  }
-
-  public THdfsFileDesc toThrift() {
-    THdfsFileDesc fd = new THdfsFileDesc();
-    ByteBuffer bb = fbFileDescriptor_.getByteBuffer();
-    fd.setFile_desc_data(bb);
-    if (fbFileMetadata_ != null) {
-      fd.setFile_metadata(fbFileMetadata_.getByteBuffer());
-    }
-    return fd;
-  }
-
   @Override
   public String toString() {
     int numFileBlocks = getNumFileBlocks();
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index a6e0534cc..6298f5bac 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -61,7 +61,7 @@ public class FileMetadataLoader {
 
   protected final Path partDir_;
   protected final boolean recursive_;
-  protected final ImmutableMap<String, FileDescriptor> oldFdsByPath_;
+  protected final ImmutableMap<String, ? extends FileDescriptor> oldFdsByPath_;
   private final ListMap<TNetworkAddress> hostIndex_;
   @Nullable
   private final ValidWriteIdList writeIds_;
@@ -97,7 +97,7 @@ public class FileMetadataLoader {
    *   this loader will filter files based on Hudi's HoodieROTablePathFilter 
method
    */
   public FileMetadataLoader(Path partDir, boolean recursive,
-      Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+      Iterable<? extends FileDescriptor> oldFds, ListMap<TNetworkAddress> 
hostIndex,
       @Nullable ValidTxnList validTxnList, @Nullable ValidWriteIdList writeIds,
       @Nullable HdfsFileFormat fileFormat) {
     // Either both validTxnList and writeIds are null, or none of them.
@@ -118,7 +118,7 @@ public class FileMetadataLoader {
   }
 
   public FileMetadataLoader(Path partDir, boolean recursive,
-      Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+      Iterable<? extends FileDescriptor> oldFds, ListMap<TNetworkAddress> 
hostIndex,
       @Nullable ValidTxnList validTxnList, @Nullable ValidWriteIdList 
writeIds) {
     this(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds, null);
   }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
index 1878eab12..2a3caa5cf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
@@ -66,41 +66,17 @@ public class IcebergContentFileStore {
     }
   }
 
-  protected static FileDescriptor decode(EncodedFileDescriptor encodedFd) {
+  protected static IcebergFileDescriptor decode(EncodedFileDescriptor 
encodedFd) {
+    Preconditions.checkNotNull(encodedFd.fileDesc_);
     Preconditions.checkNotNull(encodedFd.fileMetadata_);
 
-    return new FileDescriptor(
+    return new IcebergFileDescriptor(
         FbFileDesc.getRootAsFbFileDesc(ByteBuffer.wrap(encodedFd.fileDesc_)),
-        FbFileMetadata.getRootAsFbFileMetadata(
-            ByteBuffer.wrap(encodedFd.fileMetadata_))) {
-      // Whenever getting the FileDescriptor for the same path hash it will be 
a
-      // different FileDescriptor object. As a result the default equals() and
-      // hashCode() functions can't be used to judge equality between these
-      // FileDescriptors.
-      @Override
-      public boolean equals(Object obj) {
-        if (obj == null) return false;
-        if (!(obj instanceof FileDescriptor)) return false;
-        FileDescriptor otherFD = (FileDescriptor) obj;
-        if ((this.getFbFileMetadata() == null && otherFD.getFbFileMetadata() 
!= null) ||
-            (this.getFbFileMetadata() != null && otherFD.getFbFileMetadata() 
== null)) {
-          return false;
-        }
-        return this.getFbFileDescriptor().getByteBuffer().array() ==
-            otherFD.getFbFileDescriptor().getByteBuffer().array() &&
-            this.getFbFileMetadata().getByteBuffer().array() ==
-                otherFD.getFbFileMetadata().getByteBuffer().array();
-      }
-
-      @Override
-      public int hashCode() {
-        return getAbsolutePath().hashCode();
-      }
-    };
+        
FbFileMetadata.getRootAsFbFileMetadata(ByteBuffer.wrap(encodedFd.fileMetadata_)));
   }
 
   // Function to convert from a FileDescriptor to an EncodedFileDescriptor.
-  protected static EncodedFileDescriptor encode(FileDescriptor fd) {
+  protected static EncodedFileDescriptor encode(IcebergFileDescriptor fd) {
     return new EncodedFileDescriptor(
         encodeFB(fd.getFbFileDescriptor()),
         encodeFB(fd.getFbFileMetadata()));
@@ -132,7 +108,7 @@ public class IcebergContentFileStore {
       return false;
     }
 
-    public FileDescriptor get(String pathHash) {
+    public IcebergFileDescriptor get(String pathHash) {
       if (!fileDescMap_.containsKey(pathHash)) return null;
       return decode(fileDescMap_.get(pathHash));
     }
@@ -141,7 +117,7 @@ public class IcebergContentFileStore {
       return fileDescList_.size();
     }
 
-    List<FileDescriptor> getList() {
+    List<IcebergFileDescriptor> getList() {
       return Lists.transform(fileDescList_, fd -> 
IcebergContentFileStore.decode(fd));
     }
 
@@ -160,7 +136,7 @@ public class IcebergContentFileStore {
         List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> 
hostIndex) {
       MapListContainer ret = new MapListContainer();
       for (Map.Entry<String, THdfsFileDesc> entry : thriftMap.entrySet()) {
-        FileDescriptor fd = FileDescriptor.fromThrift(entry.getValue());
+        IcebergFileDescriptor fd = 
IcebergFileDescriptor.fromThrift(entry.getValue());
         Preconditions.checkNotNull(fd);
         if (networkAddresses != null) {
           Preconditions.checkNotNull(hostIndex);
@@ -190,76 +166,77 @@ public class IcebergContentFileStore {
   public IcebergContentFileStore() {}
 
   public IcebergContentFileStore(
-      Table iceApiTable, List<FileDescriptor> fileDescriptors,
+      Table iceApiTable, List<IcebergFileDescriptor> fileDescriptors,
       GroupedContentFiles icebergFiles) {
     Preconditions.checkNotNull(iceApiTable);
     Preconditions.checkNotNull(fileDescriptors);
     Preconditions.checkNotNull(icebergFiles);
 
-    Map<String, FileDescriptor> hdfsFileDescMap = new HashMap<>();
+    Map<String, IcebergFileDescriptor> fileDescMap = new HashMap<>();
     for (FileDescriptor fileDesc : fileDescriptors) {
+      Preconditions.checkState(fileDesc instanceof IcebergFileDescriptor);
       Path path = new Path(fileDesc.getAbsolutePath(iceApiTable.location()));
-      hdfsFileDescMap.put(path.toUri().getPath(), fileDesc);
+      fileDescMap.put(path.toUri().getPath(), (IcebergFileDescriptor)fileDesc);
     }
 
     for (DataFile dataFile : icebergFiles.dataFilesWithoutDeletes) {
       Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(dataFile, hdfsFileDescMap);
+          getPathHashAndFd(dataFile, fileDescMap);
       dataFilesWithoutDeletes_.add(pathHashAndFd.first, pathHashAndFd.second);
     }
     for (DataFile dataFile : icebergFiles.dataFilesWithDeletes) {
       Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(dataFile, hdfsFileDescMap);
+          getPathHashAndFd(dataFile, fileDescMap);
       dataFilesWithDeletes_.add(pathHashAndFd.first, pathHashAndFd.second);
     }
     for (DeleteFile deleteFile : icebergFiles.positionDeleteFiles) {
       Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(deleteFile, hdfsFileDescMap);
+          getPathHashAndFd(deleteFile, fileDescMap);
       positionDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second);
     }
     for (DeleteFile deleteFile : icebergFiles.equalityDeleteFiles) {
       Pair<String, EncodedFileDescriptor> pathHashAndFd =
-          getPathHashAndFd(deleteFile, hdfsFileDescMap);
+          getPathHashAndFd(deleteFile, fileDescMap);
       equalityDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second);
     }
   }
 
   // This is only invoked during time travel, when we are querying a snapshot 
that has
   // data files which have been removed since.
-  public void addOldFileDescriptor(String pathHash, FileDescriptor desc) {
+  public void addOldFileDescriptor(String pathHash, IcebergFileDescriptor 
desc) {
     oldFileDescMap_.put(pathHash, encode(desc));
   }
 
-  public FileDescriptor getDataFileDescriptor(String pathHash) {
-    FileDescriptor desc = dataFilesWithoutDeletes_.get(pathHash);
+  public IcebergFileDescriptor getDataFileDescriptor(String pathHash) {
+    IcebergFileDescriptor desc = dataFilesWithoutDeletes_.get(pathHash);
     if (desc != null) return desc;
     return dataFilesWithDeletes_.get(pathHash);
   }
 
-  public FileDescriptor getDeleteFileDescriptor(String pathHash) {
-    FileDescriptor ret = positionDeleteFiles_.get(pathHash);
+  public IcebergFileDescriptor getDeleteFileDescriptor(String pathHash) {
+    IcebergFileDescriptor ret = positionDeleteFiles_.get(pathHash);
     if (ret != null) return ret;
     return equalityDeleteFiles_.get(pathHash);
   }
 
-  public FileDescriptor getOldFileDescriptor(String pathHash) {
+  public IcebergFileDescriptor getOldFileDescriptor(String pathHash) {
     if (!oldFileDescMap_.containsKey(pathHash)) return null;
     return decode(oldFileDescMap_.get(pathHash));
   }
 
-  public List<FileDescriptor> getDataFilesWithoutDeletes() {
+  public List<IcebergFileDescriptor> getDataFilesWithoutDeletes() {
     return dataFilesWithoutDeletes_.getList();
   }
 
-  public List<FileDescriptor> getDataFilesWithDeletes() {
+  public List<IcebergFileDescriptor> getDataFilesWithDeletes() {
     return dataFilesWithDeletes_.getList();
   }
 
-  public List<FileDescriptor> getPositionDeleteFiles() {
+  public List<IcebergFileDescriptor> getPositionDeleteFiles() {
     return positionDeleteFiles_.getList();
   }
 
-  public List<FileDescriptor> getEqualityDeleteFiles() {
+  public List<IcebergFileDescriptor> getEqualityDeleteFiles() {
     return equalityDeleteFiles_.getList();
   }
 
@@ -270,7 +247,7 @@ public class IcebergContentFileStore {
            equalityDeleteFiles_.getNumFiles();
   }
 
-  public Iterable<FileDescriptor> getAllFiles() {
+  public Iterable<IcebergFileDescriptor> getAllFiles() {
     return Iterables.concat(
         dataFilesWithoutDeletes_.getList(),
         dataFilesWithDeletes_.getList(),
@@ -278,7 +255,7 @@ public class IcebergContentFileStore {
         equalityDeleteFiles_.getList());
   }
 
-  public Iterable<FileDescriptor> getAllDataFiles() {
+  public Iterable<IcebergFileDescriptor> getAllDataFiles() {
     return Iterables.concat(
         dataFilesWithoutDeletes_.getList(),
         dataFilesWithDeletes_.getList());
@@ -302,17 +279,17 @@ public class IcebergContentFileStore {
   }
 
   private Pair<String, EncodedFileDescriptor> getPathHashAndFd(
-      ContentFile<?> contentFile, Map<String, FileDescriptor> hdfsFileDescMap) 
{
+      ContentFile<?> contentFile, Map<String, IcebergFileDescriptor> 
fileDescMap) {
     return new Pair<>(
         IcebergUtil.getFilePathHash(contentFile),
-        getIcebergFd(hdfsFileDescMap, contentFile));
+        getIcebergFd(fileDescMap, contentFile));
   }
 
   private EncodedFileDescriptor getIcebergFd(
-      Map<String, FileDescriptor> hdfsFileDescMap,
+      Map<String, IcebergFileDescriptor> fileDescMap,
       ContentFile<?> contentFile) {
     Path path = new Path(contentFile.path().toString());
-    FileDescriptor fileDesc = hdfsFileDescMap.get(path.toUri().getPath());
+    IcebergFileDescriptor fileDesc = fileDescMap.get(path.toUri().getPath());
 
     FbFileMetadata fileMetadata = fileDesc.getFbFileMetadata();
     Preconditions.checkState(fileMetadata != null);
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergDeleteTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergDeleteTable.java
index a5ff5c51b..3f41799d1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergDeleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergDeleteTable.java
@@ -43,11 +43,14 @@ public abstract class IcebergDeleteTable extends 
VirtualTable implements FeIcebe
     protected final static int INVALID_MAP_VALUE_ID = -1;
 
     protected FeIcebergTable baseTable_;
-    protected Set<FileDescriptor> deleteFiles_;
+    protected Set<IcebergFileDescriptor> deleteFiles_;
     protected long deleteRecordsCount_;
 
-    public IcebergDeleteTable(FeIcebergTable baseTable, String name,
-        Set<FileDescriptor> deleteFiles, long deleteRecordsCount) {
+    public IcebergDeleteTable(
+        FeIcebergTable baseTable,
+        String name,
+        Set<IcebergFileDescriptor> deleteFiles,
+        long deleteRecordsCount) {
       super(baseTable.getMetaStoreTable(), baseTable.getDb(), name,
           baseTable.getOwnerUser());
       baseTable_ = baseTable;
@@ -65,7 +68,7 @@ public abstract class IcebergDeleteTable extends VirtualTable 
implements FeIcebe
     @Override
     public TTableStats getTTableStats() {
       long totalBytes = 0;
-      for (FileDescriptor df : deleteFiles_) {
+      for (IcebergFileDescriptor df : deleteFiles_) {
           totalBytes += df.getFileLength();
       }
       TTableStats ret = new TTableStats(getNumRows());
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergEqualityDeleteTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergEqualityDeleteTable.java
index 0503bb21f..f6ae7c532 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergEqualityDeleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergEqualityDeleteTable.java
@@ -34,8 +34,12 @@ import org.apache.iceberg.types.Types.NestedField;
  */
 public class IcebergEqualityDeleteTable extends IcebergDeleteTable  {
 
-  public IcebergEqualityDeleteTable(FeIcebergTable baseTable, String name,
-      Set<FileDescriptor> deleteFiles, List<Integer> equalityIds, long 
deleteRecordsCount)
+  public IcebergEqualityDeleteTable(
+      FeIcebergTable baseTable,
+      String name,
+      Set<IcebergFileDescriptor> deleteFiles,
+      List<Integer> equalityIds,
+      long deleteRecordsCount)
       throws ImpalaRuntimeException {
     super(baseTable, name, deleteFiles, deleteRecordsCount);
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileDescriptor.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileDescriptor.java
new file mode 100644
index 000000000..652a492d1
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileDescriptor.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.fb.FbFileDesc;
+import org.apache.impala.fb.FbFileMetadata;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class IcebergFileDescriptor extends FileDescriptor {
+  // Internal representation of Iceberg metadata.
+  private final FbFileMetadata fbFileMetadata_;
+
+  public IcebergFileDescriptor(FbFileDesc fileDescData, FbFileMetadata 
fileMetadata) {
+    super(fileDescData);
+
+    Preconditions.checkNotNull(fileMetadata);
+    Preconditions.checkNotNull(fileMetadata.icebergMetadata());
+    fbFileMetadata_ = fileMetadata;
+  }
+
+  public static IcebergFileDescriptor fromThrift(THdfsFileDesc desc) {
+    Preconditions.checkState(desc.isSetFile_metadata());
+
+    ByteBuffer bb = ByteBuffer.wrap(desc.getFile_desc_data());
+    ByteBuffer bbMd = ByteBuffer.wrap(desc.getFile_metadata());
+
+    return new IcebergFileDescriptor(
+        FbFileDesc.getRootAsFbFileDesc(bb),
+        FbFileMetadata.getRootAsFbFileMetadata(bbMd));
+  }
+
+  @Override
+  public THdfsFileDesc toThrift() {
+    THdfsFileDesc fd = super.toThrift();
+    fd.setFile_metadata(fbFileMetadata_.getByteBuffer());
+    return fd;
+  }
+
+  public static IcebergFileDescriptor cloneWithFileMetadata(
+      FileDescriptor fd, FbFileMetadata fileMetadata) {
+    return new IcebergFileDescriptor(fd.getFbFileDescriptor(), fileMetadata);
+  }
+
+  @Override
+  public IcebergFileDescriptor cloneWithNewHostIndex(
+      List<TNetworkAddress> origIndex, ListMap<TNetworkAddress> dstIndex) {
+    return new IcebergFileDescriptor(
+        fbFileDescWithNewHostIndex(origIndex, dstIndex), fbFileMetadata_);
+  }
+
+  public FbFileMetadata getFbFileMetadata() {
+    return fbFileMetadata_;
+  }
+
+  // The default equals() and hashCode() functions aren't sufficient for
+  // IcebergFileDescriptors. The reason is that these are stored in
+  // IcebergContentFileStore in an encoded format and every 'get' operation on 
the store
+  // returns a different object for the same file descriptor. Instead, this 
function
+  // checks for the equality of the underlying byte array representations.
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof IcebergFileDescriptor)) return false;
+
+    IcebergFileDescriptor otherFD = (IcebergFileDescriptor) obj;
+    Preconditions.checkNotNull(this.getFbFileMetadata());
+    Preconditions.checkNotNull(otherFD.getFbFileMetadata());
+
+    return this.getFbFileDescriptor().getByteBuffer().array() ==
+        otherFD.getFbFileDescriptor().getByteBuffer().array() &&
+        this.getFbFileMetadata().getByteBuffer().array() ==
+            otherFD.getFbFileMetadata().getByteBuffer().array();
+  }
+
+  @Override
+  public int hashCode() {
+    return getAbsolutePath().hashCode();
+  }
+}
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index af338372e..ac56f8d12 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -22,6 +22,7 @@ import static 
org.apache.impala.catalog.ParallelFileMetadataLoader.createPool;
 import static org.apache.impala.catalog.ParallelFileMetadataLoader.getPoolSize;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -42,8 +43,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.iceberg.ContentFile;
 import org.apache.impala.catalog.FeIcebergTable.Utils;
 import org.apache.impala.catalog.HdfsTable.FileMetadataStats;
@@ -82,14 +81,14 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
   private boolean useParallelListing_;
 
   public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl,
-      Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+      Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> 
hostIndex,
       GroupedContentFiles icebergFiles, boolean 
requiresDataFilesInTableLocation) {
     this(iceTbl, oldFds, hostIndex, icebergFiles, 
requiresDataFilesInTableLocation,
         BackendConfig.INSTANCE.icebergReloadNewFilesThreshold());
   }
 
   public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl,
-      Iterable<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
+      Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> 
hostIndex,
       GroupedContentFiles icebergFiles, boolean 
requiresDataFilesInTableLocation,
       int newFilesThresholdParam) {
     super(FileSystemUtil.createFullyQualifiedPath(new 
Path(iceTbl.location())), true,
@@ -117,6 +116,14 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     }
   }
 
+  public List<IcebergFileDescriptor> getLoadedIcebergFds() {
+    Preconditions.checkState(loadedFds_ != null,
+        "Must have successfully loaded first");
+    return loadedFds_.stream()
+        .map(fd -> (IcebergFileDescriptor)fd)
+        .collect(Collectors.toList());
+  }
+
   private void loadInternal() throws CatalogException, IOException {
     loadedFds_ = new ArrayList<>();
     loadStats_ = new LoadStats(partDir_);
@@ -144,7 +151,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
       if (FileSystemUtil.supportsStorageIds(fsForPath)) {
         filesSupportsStorageIds.add(Pair.create(fsForPath, contentFile));
       } else {
-        FileDescriptor fd = createFd(fsForPath, contentFile, null, null);
+        IcebergFileDescriptor fd = createFd(fsForPath, contentFile, null, 
null);
         loadedFds_.add(fd);
         fileMetadataStats_.accumulate(fd);
         ++loadStats_.loadedFiles;
@@ -164,7 +171,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
       Path path = FileSystemUtil.createFullyQualifiedPath(
           new Path(contentFileInfo.getSecond().path().toString()));
       FileStatus stat = nameToFileStatus.get(path);
-      FileDescriptor fd = createFd(contentFileInfo.getFirst(),
+      IcebergFileDescriptor fd = createFd(contentFileInfo.getFirst(),
           contentFileInfo.getSecond(), stat, numUnknownDiskIds);
       loadedFds_.add(fd);
       fileMetadataStats_.accumulate(fd);
@@ -203,7 +210,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     return newContentFiles;
   }
 
-  private FileDescriptor createFd(FileSystem fs, ContentFile<?> contentFile,
+  private IcebergFileDescriptor createFd(FileSystem fs, ContentFile<?> 
contentFile,
       FileStatus stat, Reference<Long> numUnknownDiskIds) throws IOException {
     if (stat == null) {
       Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
@@ -226,8 +233,9 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
         absPath = stat.getPath().toString();
       }
     }
-    FileDescriptor fsFd = createFd(fs, stat, relPath, numUnknownDiskIds, 
absPath);
-    return fsFd.cloneWithFileMetadata(
+
+    return IcebergFileDescriptor.cloneWithFileMetadata(
+        createFd(fs, stat, relPath, numUnknownDiskIds, absPath),
         IcebergUtil.createIcebergMetadata(iceTbl_, contentFile));
   }
 
@@ -291,7 +299,7 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     return null;
   }
 
-  FileDescriptor getOldFd(ContentFile<?> contentFile) throws IOException {
+  IcebergFileDescriptor getOldFd(ContentFile<?> contentFile) throws 
IOException {
     Path contentFilePath = FileSystemUtil.createFullyQualifiedPath(
         new Path(contentFile.path().toString()));
     String lookupPath = FileSystemUtil.relativizePathNoThrow(contentFilePath, 
partDir_);
@@ -303,6 +311,10 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
         lookupPath = contentFilePath.toString();
       }
     }
-    return oldFdsByPath_.get(lookupPath);
+    FileDescriptor fd = oldFdsByPath_.get(lookupPath);
+    if (fd == null) return null;
+
+    Preconditions.checkState(fd instanceof IcebergFileDescriptor);
+    return (IcebergFileDescriptor) fd;
   }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
index 06ed85aff..2988edd34 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java
@@ -37,9 +37,12 @@ public class IcebergPositionDeleteTable extends 
IcebergDeleteTable  {
         new TColumnStats());
   }
 
-  public IcebergPositionDeleteTable(FeIcebergTable baseTable, String name,
-      Set<FileDescriptor> deleteFiles,
-      long deleteRecordsCount, TColumnStats filePathsStats) {
+  public IcebergPositionDeleteTable(
+      FeIcebergTable baseTable,
+      String name,
+      Set<IcebergFileDescriptor> deleteFiles,
+      long deleteRecordsCount,
+      TColumnStats filePathsStats) {
     super(baseTable, name, deleteFiles, deleteRecordsCount);
     Column filePath = new IcebergColumn(FILE_PATH_COLUMN, Type.STRING, 
/*comment=*/"",
         colsByPos_.size(), IcebergTable.V2_FILE_PATH_FIELD_ID, 
INVALID_MAP_KEY_ID,
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 31730c66b..7a20d88bf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -455,7 +455,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
             Utils.requiresDataFilesInTableLocation(this));
         loader.load();
         fileStore_ = new IcebergContentFileStore(
-            icebergApiTable_, loader.getLoadedFds(), icebergFiles);
+            icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles);
         partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
         setIcebergTableStats();
         loadAllColumnStats(msClient, catalogTimeline);
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 396db73ba..b16f48a14 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -60,6 +60,7 @@ import org.apache.impala.catalog.FileBlock;
 import org.apache.impala.catalog.FileDescriptor;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergFileDescriptor;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Table;
@@ -1535,8 +1536,9 @@ public class HdfsScanNode extends ScanNode {
         hdfsFileSplit.setIs_encrypted(fileDesc.getIsEncrypted());
         hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc());
         scanRange.setHdfs_file_split(hdfsFileSplit);
-        if (fileDesc.getFbFileMetadata() != null) {
-          
scanRange.setFile_metadata(fileDesc.getFbFileMetadata().getByteBuffer());
+        if (fileDesc instanceof IcebergFileDescriptor) {
+          scanRange.setFile_metadata(
+              
((IcebergFileDescriptor)fileDesc).getFbFileMetadata().getByteBuffer());
         }
         TScanRangeLocationList scanRangeLocations = new 
TScanRangeLocationList();
         scanRangeLocations.scan_range = scanRange;
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index 5cd64aebf..4eaae76fb 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -35,6 +35,7 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FileDescriptor;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergFileDescriptor;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.ThriftSerializationCtx;
@@ -56,7 +57,7 @@ public class IcebergScanNode extends HdfsScanNode {
   // List of files needed to be scanned by this scan node. The list is sorted 
in case of
   // partitioned tables, so partition range scans are scheduled more evenly.
   // See IMPALA-12765 for details.
-  private List<FileDescriptor> fileDescs_;
+  private List<IcebergFileDescriptor> fileDescs_;
 
   // Indicates that the files in 'fileDescs_' are sorted.
   private boolean filesAreSorted_ = false;
@@ -81,14 +82,14 @@ public class IcebergScanNode extends HdfsScanNode {
   private final PlanNodeId deleteFileScanNodeId;
 
   public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
-      MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs,
+      MultiAggregateInfo aggInfo, List<IcebergFileDescriptor> fileDescs,
       List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, long 
snapshotId) {
     this(id, tblRef, conjuncts, aggInfo, fileDescs, nonIdentityConjuncts,
         skippedConjuncts, null, snapshotId);
   }
 
   public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
-      MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs,
+      MultiAggregateInfo aggInfo, List<IcebergFileDescriptor> fileDescs,
       List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts, PlanNodeId 
deleteId,
       long snapshotId) {
     super(id, tblRef.getDesc(), conjuncts,
@@ -127,11 +128,13 @@ public class IcebergScanNode extends HdfsScanNode {
     if (sampledFiles_ != null) {
       for (List<FileDescriptor> sampledFileDescs : sampledFiles_.values()) {
         for (FileDescriptor fd : sampledFileDescs) {
-          cardinality_ += 
fd.getFbFileMetadata().icebergMetadata().recordCount();
+          Preconditions.checkState(fd instanceof IcebergFileDescriptor);
+          IcebergFileDescriptor iceFd = (IcebergFileDescriptor) fd;
+          cardinality_ += 
iceFd.getFbFileMetadata().icebergMetadata().recordCount();
         }
       }
     } else {
-      for (FileDescriptor fd : fileDescs_) {
+      for (IcebergFileDescriptor fd : fileDescs_) {
         cardinality_ += fd.getFbFileMetadata().icebergMetadata().recordCount();
       }
     }
@@ -179,14 +182,14 @@ public class IcebergScanNode extends HdfsScanNode {
     if (limit != -1) {
       long cnt = 0;
       List<FileDescriptor> ret = new ArrayList<>();
-      for (FileDescriptor fd : fileDescs_) {
+      for (IcebergFileDescriptor fd : fileDescs_) {
         if (cnt == limit) break;
         ret.add(fd);
         ++cnt;
       }
       return ret;
     } else {
-      return fileDescs_;
+      return new ArrayList<>(fileDescs_);
     }
   }
 
@@ -267,7 +270,7 @@ public class IcebergScanNode extends HdfsScanNode {
     boolean hasParquet = false;
     boolean hasOrc = false;
     boolean hasAvro = false;
-    for (FileDescriptor fileDesc : fileDescs_) {
+    for (IcebergFileDescriptor fileDesc : fileDescs_) {
       byte fileFormat = 
fileDesc.getFbFileMetadata().icebergMetadata().fileFormat();
       if (fileFormat == FbIcebergDataFileFormat.PARQUET) {
         hasParquet = true;
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 8cd74c7ff..1a20f487f 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -64,6 +64,7 @@ import org.apache.impala.catalog.FileDescriptor;
 import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.IcebergContentFileStore;
 import org.apache.impala.catalog.IcebergEqualityDeleteTable;
+import org.apache.impala.catalog.IcebergFileDescriptor;
 import org.apache.impala.catalog.IcebergPositionDeleteTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
@@ -114,14 +115,14 @@ public class IcebergScanPlanner {
   private List<Expr> nonIdentityConjuncts_ = new ArrayList<>();
 
   // Containers for different groupings of file descriptors.
-  private List<FileDescriptor> dataFilesWithoutDeletes_ = new ArrayList<>();
-  private List<FileDescriptor> dataFilesWithDeletes_ = new ArrayList<>();
-  private Set<FileDescriptor> positionDeleteFiles_ = new HashSet<>();
+  private List<IcebergFileDescriptor> dataFilesWithoutDeletes_ = new 
ArrayList<>();
+  private List<IcebergFileDescriptor> dataFilesWithDeletes_ = new 
ArrayList<>();
+  private Set<IcebergFileDescriptor> positionDeleteFiles_ = new HashSet<>();
 
   // Holds all the equalityFieldIds from the equality delete file descriptors 
involved in
   // this query.
   private Set<Integer> allEqualityFieldIds_ = new HashSet<>();
-  private Map<List<Integer>, Set<FileDescriptor>> equalityIdsToDeleteFiles_ =
+  private Map<List<Integer>, Set<IcebergFileDescriptor>> 
equalityIdsToDeleteFiles_ =
       new HashMap<>();
 
 
@@ -468,7 +469,7 @@ public class IcebergScanPlanner {
         getOrderedEqualityFieldIds(equalityDeletesRecordCount_);
     JoinNode joinNode = null;
     for (List<Integer> equalityIds : orderedEqualityFieldIds) {
-      Set<FileDescriptor> equalityDeleteFiles =
+      Set<IcebergFileDescriptor> equalityDeleteFiles =
           equalityIdsToDeleteFiles_.get(equalityIds);
       Preconditions.checkState(equalityDeleteFiles != null &&
           !equalityDeleteFiles.isEmpty());
@@ -563,14 +564,15 @@ public class IcebergScanPlanner {
         if (residualExpr != null && !(residualExpr instanceof True)) {
           residualExpressions_.add(residualExpr);
         }
-        Pair<FileDescriptor, Boolean> fileDesc = 
getFileDescriptor(fileScanTask.file());
+        Pair<IcebergFileDescriptor, Boolean> fileDesc =
+            getFileDescriptor(fileScanTask.file());
         if (!fileDesc.second) ++dataFilesCacheMisses;
         if (fileScanTask.deletes().isEmpty()) {
           dataFilesWithoutDeletes_.add(fileDesc.first);
         } else {
           dataFilesWithDeletes_.add(fileDesc.first);
           for (DeleteFile delFile : fileScanTask.deletes()) {
-            Pair<FileDescriptor, Boolean> delFileDesc = 
getFileDescriptor(delFile);
+            Pair<IcebergFileDescriptor, Boolean> delFileDesc = 
getFileDescriptor(delFile);
             if (!delFileDesc.second) ++dataFilesCacheMisses;
             if (delFile.content() == FileContent.EQUALITY_DELETES) {
               addEqualityDeletesAndIds(delFileDesc.first);
@@ -594,7 +596,7 @@ public class IcebergScanPlanner {
     updateDeleteStatistics();
   }
 
-  private void addEqualityDeletesAndIds(FileDescriptor fd) {
+  private void addEqualityDeletesAndIds(IcebergFileDescriptor fd) {
     FbIcebergMetadata fileMetadata = fd.getFbFileMetadata().icebergMetadata();
     Preconditions.checkState(fileMetadata.equalityFieldIdsLength() > 0,
         "Equality delete file doesn't have equality field IDs: " + 
fd.toString());
@@ -609,10 +611,10 @@ public class IcebergScanPlanner {
     equalityIdsToDeleteFiles_.get(eqFieldIdList).add(fd);
   }
 
-  private void initEqualityIds(List<FileDescriptor> equalityDeleteFiles) {
+  private void initEqualityIds(List<IcebergFileDescriptor> 
equalityDeleteFiles) {
     Preconditions.checkState(allEqualityFieldIds_.isEmpty());
     Preconditions.checkState(equalityIdsToDeleteFiles_.isEmpty());
-    for (FileDescriptor fd : equalityDeleteFiles) addEqualityDeletesAndIds(fd);
+    for (IcebergFileDescriptor fd : equalityDeleteFiles) 
addEqualityDeletesAndIds(fd);
   }
 
   private void filterConjuncts() {
@@ -653,19 +655,19 @@ public class IcebergScanPlanner {
   }
 
   private void updateDeleteStatistics() {
-    for (FileDescriptor fd : dataFilesWithDeletes_) {
+    for (IcebergFileDescriptor fd : dataFilesWithDeletes_) {
       updateDataFilesWithDeletesStatistics(fd);
     }
-    for (FileDescriptor fd : positionDeleteFiles_) {
+    for (IcebergFileDescriptor fd : positionDeleteFiles_) {
       updatePositionDeleteFilesStatistics(fd);
     }
-    for (Map.Entry<List<Integer>, Set<FileDescriptor>> entry :
+    for (Map.Entry<List<Integer>, Set<IcebergFileDescriptor>> entry :
         equalityIdsToDeleteFiles_.entrySet()) {
       updateEqualityDeleteFilesStatistics(entry.getKey(), entry.getValue());
     }
   }
 
-  private void updateDataFilesWithDeletesStatistics(FileDescriptor fd) {
+  private void updateDataFilesWithDeletesStatistics(IcebergFileDescriptor fd) {
     String path = fd.getAbsolutePath(getIceTable().getLocation());
     long pathSize = path.length();
     dataFilesWithDeletesSumPaths_ += pathSize;
@@ -674,14 +676,15 @@ public class IcebergScanPlanner {
     }
   }
 
-  private void updatePositionDeleteFilesStatistics(FileDescriptor fd) {
+  private void updatePositionDeleteFilesStatistics(IcebergFileDescriptor fd) {
     positionDeletesRecordCount_ += getRecordCount(fd);
   }
 
-  private void updateEqualityDeleteFilesStatistics(List<Integer> equalityIds,
-      Set<FileDescriptor> fileDescriptors) {
+  private void updateEqualityDeleteFilesStatistics(
+      List<Integer> equalityIds,
+      Set<IcebergFileDescriptor> fileDescriptors) {
     long numRecords = 0;
-    for (FileDescriptor fd : fileDescriptors) {
+    for (IcebergFileDescriptor fd : fileDescriptors) {
       numRecords += getRecordCount(fd);
       equalityDeleteSequenceNumbers_.add(
           fd.getFbFileMetadata().icebergMetadata().dataSequenceNumber());
@@ -689,7 +692,7 @@ public class IcebergScanPlanner {
     equalityDeletesRecordCount_.put(equalityIds, numRecords);
   }
 
-  private long getRecordCount(FileDescriptor fd) {
+  private long getRecordCount(IcebergFileDescriptor fd) {
     long recordCount = fd.getFbFileMetadata().icebergMetadata().recordCount();
     // 'record_count' is a required field for Iceberg data files, but let's 
still
     // prepare for the case when a compute engine doesn't fill it.
@@ -708,46 +711,57 @@ public class IcebergScanPlanner {
     return colStats;
   }
 
-  private Pair<FileDescriptor, Boolean> getFileDescriptor(ContentFile cf)
+  /*
+   * Returns an IcebergFileDescriptor and an indicator whether it was found in 
the cache.
+   * True for cache hit and false for cache miss.
+   */
+  private Pair<IcebergFileDescriptor, Boolean> 
getFileDescriptor(ContentFile<?> cf)
       throws ImpalaRuntimeException {
-    boolean cachehit = true;
     String pathHash = IcebergUtil.getFilePathHash(cf);
     IcebergContentFileStore fileStore = getIceTable().getContentFileStore();
-    FileDescriptor fileDesc = cf.content() == FileContent.DATA ?
+
+    IcebergFileDescriptor iceFileDesc = cf.content() == FileContent.DATA ?
         fileStore.getDataFileDescriptor(pathHash) :
         fileStore.getDeleteFileDescriptor(pathHash);
+    if (iceFileDesc != null) {
+      return new Pair<>(iceFileDesc, true);
+    }
 
-    if (fileDesc == null) {
-      if (tblRef_.getTimeTravelSpec() == null) {
-        // We should always find the data files in the cache when not doing 
time travel.
-        throw new ImpalaRuntimeException("Cannot find file in cache: " + 
cf.path()
-            + " with snapshot id: " + getIceTable().snapshotId());
-      }
-      // We can still find the file descriptor among the old file descriptors.
-      fileDesc = fileStore.getOldFileDescriptor(pathHash);
-      if (fileDesc != null) {
-        return new Pair<>(fileDesc, true);
-      }
-      cachehit = false;
-      try {
-        fileDesc = FeIcebergTable.Utils.getFileDescriptor(cf,
-            getIceTable().getIcebergApiTable(),
-            
FeIcebergTable.Utils.requiresDataFilesInTableLocation(getIceTable()),
-            getIceTable().getHostIndex());
-      } catch (IOException ex) {
-        throw new ImpalaRuntimeException(
-            "Cannot load file descriptor for " + cf.path(), ex);
-      }
-      if (fileDesc == null) {
+    if (tblRef_.getTimeTravelSpec() == null) {
+      // We should always find the data files in the cache when not doing time 
travel.
+      throw new ImpalaRuntimeException("Cannot find file in cache: " + 
cf.path()
+          + " with snapshot id: " + getIceTable().snapshotId());
+    }
+    // We can still find the file descriptor among the old file descriptors.
+    iceFileDesc = fileStore.getOldFileDescriptor(pathHash);
+    if (iceFileDesc != null) {
+      return new Pair<>(iceFileDesc, true);
+    }
+
+    FileDescriptor hdfsFileDesc = null;
+    try {
+      hdfsFileDesc = FeIcebergTable.Utils.getHdfsFileDescriptor(cf,
+          getIceTable().getIcebergApiTable(),
+          FeIcebergTable.Utils.requiresDataFilesInTableLocation(getIceTable()),
+          getIceTable().getHostIndex());
+      if (hdfsFileDesc == null) {
         throw new ImpalaRuntimeException(
             "Cannot load file descriptor for: " + cf.path());
       }
-      // Add file descriptor to the cache.
-      fileDesc = fileDesc.cloneWithFileMetadata(
-          
IcebergUtil.createIcebergMetadata(getIceTable().getIcebergApiTable(), cf));
-      fileStore.addOldFileDescriptor(pathHash, fileDesc);
+    } catch (IOException ex) {
+      throw new ImpalaRuntimeException(
+          "Cannot load file descriptor for " + cf.path(), ex);
     }
-    return new Pair<>(fileDesc, cachehit);
+
+    Preconditions.checkNotNull(hdfsFileDesc);
+
+    // Add file descriptor to the cache.
+    iceFileDesc = IcebergFileDescriptor.cloneWithFileMetadata(
+        hdfsFileDesc,
+        IcebergUtil.createIcebergMetadata(getIceTable().getIcebergApiTable(), 
cf));
+    fileStore.addOldFileDescriptor(pathHash, iceFileDesc);
+
+    return new Pair<>(iceFileDesc, false);
   }
 
   /**
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java 
b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 397f79d74..a645c1081 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -160,7 +160,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedFds(),
+        /* oldFds = */ fml1.getLoadedIcebergFds(),
         /* requiresDataFilesInTableLocation = */ true);
     fml1Refresh.load();
     assertFalse(fml1Refresh.useParallelListing());
@@ -183,7 +183,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedFds(),
+        /* oldFds = */ fml2.getLoadedIcebergFds(),
         /* requiresDataFilesInTableLocation = */ true);
     fml2Refresh.load();
     assertFalse(fml2Refresh.useParallelListing());
@@ -210,7 +210,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true);
     fml1Refresh.load();
     assertFalse(fml1Refresh.useParallelListing());
@@ -226,7 +226,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true);
     fml2Refresh.load();
     assertFalse(fml2Refresh.useParallelListing());
@@ -246,7 +246,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml1ForceRefresh = 
getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true, 10);
     fml1ForceRefresh.setForceRefreshBlockLocations(true);
     fml1ForceRefresh.load();
@@ -254,7 +254,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true, 10);
     fml1Refresh.setForceRefreshBlockLocations(false);
     fml1Refresh.load();
@@ -262,13 +262,13 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true, 10);
     fml1Refresh10.load();
     assertFalse(fml1Refresh10.useParallelListing());
     IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true, 9);
     fml1Refresh9.load();
     assertTrue(fml1Refresh9.useParallelListing());
@@ -282,17 +282,17 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true);
     IcebergFileMetadataLoader fml2Refresh10 = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true, 10);
     fml2Refresh10.load();
     assertFalse(fml2Refresh10.useParallelListing());
     IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedFds().subList(0, 10),
+        /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true, 9);
     fml2Refresh9.load();
     assertTrue(fml2Refresh9.useParallelListing());
@@ -310,7 +310,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml1Refresh1 = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_multiple_storage_locations",
-        /* oldFds = */ fml1.getLoadedFds().subList(0, 1),
+        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 1),
         /* requiresDataFilesInTableLocation = */ false);
     fml1Refresh1.load();
     assertFalse(fml1Refresh1.useParallelListing());
@@ -320,7 +320,7 @@ public class FileMetadataLoaderTest {
 
     IcebergFileMetadataLoader fml1Refresh5 = getLoaderForIcebergTable(catalog,
         "functional_parquet", "iceberg_multiple_storage_locations",
-        /* oldFds = */ fml1.getLoadedFds().subList(0, 5),
+        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 5),
         /* requiresDataFilesInTableLocation = */ false);
     fml1Refresh5.load();
     assertFalse(fml1Refresh5.useParallelListing());
@@ -331,7 +331,7 @@ public class FileMetadataLoaderTest {
 
   private IcebergFileMetadataLoader getLoaderForIcebergTable(
       CatalogServiceCatalog catalog, String dbName, String tblName,
-      List<FileDescriptor> oldFds, boolean requiresDataFilesInTableLocation)
+      List<IcebergFileDescriptor> oldFds, boolean 
requiresDataFilesInTableLocation)
       throws CatalogException {
     return getLoaderForIcebergTable(catalog, dbName, tblName, oldFds,
         requiresDataFilesInTableLocation, -1);
@@ -339,7 +339,7 @@ public class FileMetadataLoaderTest {
 
   private IcebergFileMetadataLoader getLoaderForIcebergTable(
       CatalogServiceCatalog catalog, String dbName, String tblName,
-      List<FileDescriptor> oldFds, boolean requiresDataFilesInTableLocation,
+      List<IcebergFileDescriptor> oldFds, boolean 
requiresDataFilesInTableLocation,
       int newFilesThreshold)
       throws CatalogException {
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java 
b/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java
index 3eed0e7fb..e0a150aa7 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java
@@ -48,15 +48,6 @@ public class IcebergContentFileStoreTest {
   @After
   public void cleanUp() { catalog_.close(); }
 
-  @Test
-  public void testDecodeWithoutIcebergMetadata() {
-    expectedException.expect(NullPointerException.class);
-
-    FileDescriptor fileDesc = new FileDescriptor(createFbFileDesc(),null);
-
-    IcebergContentFileStore.decode(IcebergContentFileStore.encode(fileDesc));
-  }
-
   private FbFileDesc createFbFileDesc() {
     FlatBufferBuilder fbb = new FlatBufferBuilder(1);
     fbb.finish(
@@ -74,7 +65,7 @@ public class IcebergContentFileStoreTest {
         "iceberg_v2_partitioned_position_deletes_orc");
     assertTrue(iceTbl.getContentFileStore().getNumFiles() > 0);
 
-    for (FileDescriptor fileDesc : iceTbl.getContentFileStore().getAllFiles()) 
{
+    for (IcebergFileDescriptor fileDesc : 
iceTbl.getContentFileStore().getAllFiles()) {
       FileDescriptor serdeFileDesc =
           
IcebergContentFileStore.decode(IcebergContentFileStore.encode(fileDesc));
       assertTrue(fileDesc != serdeFileDesc);

Reply via email to