This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ce35e81bcaaa7dca3cccee028fea82e36dd941c0
Author: Gabor Kaszab <[email protected]>
AuthorDate: Tue Sep 24 14:50:24 2024 +0200

    IMPALA-11265: Part2: Store Iceberg file descriptors in encoded format
    
    The file descriptors in HdfsPartition are cached as byte arrays to keep
    the memory footprint low. They are transformed into actual
    FileDescriptor objects once queried.
    This patch changes IcebergContentFileStore to similarly use byte arrays
    as an internal representation for file descriptors. Note that file
    descriptors for Iceberg tables have two components: one is the same as in
    HdfsPartition and the other stores Iceberg-specific file metadata in an
    additional byte array.
    
    Measurements and observations:
     - I have a test table that has 110k data files. For this table the JVM
       memory usage in the catalogd was reduced from 80MB to 65MB.
     - Both HdfsPartition.FileDescriptor and IcebergContentFileStore use
       flatbuffers, and in turn byte arrays, to represent file descriptors,
       and these byte arrays are shared between the two places. As a result
       there is no redundancy in storing the file descriptors for both the
       Iceberg and the HDFS table.
     - There is no measurable difference in planning times with this patch.
    
    Change-Id: I9d7794df999bdaf118158eace26cea610f911c0a
    Reviewed-on: http://gerrit.cloudera.org:8080/21869
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Gabor Kaszab <[email protected]>
---
 .../org/apache/impala/analysis/OptimizeStmt.java   |   4 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  74 --------
 .../org/apache/impala/catalog/HdfsPartition.java   |   6 +-
 .../impala/catalog/IcebergContentFileStore.java    | 206 +++++++++++++++++----
 .../org/apache/impala/catalog/IcebergTable.java    |   2 +-
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |   4 +-
 .../apache/impala/planner/IcebergScanPlanner.java  |   2 +
 .../catalog/IcebergContentFileStoreTest.java       | 124 +++++++++++++
 8 files changed, 311 insertions(+), 111 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
index 59f4f9b09..a011e8064 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -236,8 +236,8 @@ public class OptimizeStmt extends DmlStatementBase {
       throws IOException, ImpalaRuntimeException {
     GroupedContentFiles selectedContentFiles = new GroupedContentFiles();
     selectedContentFiles.dataFilesWithoutDeletes = contentFiles;
-    IcebergContentFileStore selectedFiles = FeIcebergTable.Utils
-        .loadAllPartition(iceTable, selectedContentFiles);
+    IcebergContentFileStore selectedFiles =
+        new IcebergContentFileStore(iceTable, selectedContentFiles);
     return selectedFiles.getDataFilesWithoutDeletes();
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index af2f9efdd..6837a140d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -43,8 +43,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -53,7 +51,6 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.impala.analysis.IcebergPartitionField;
@@ -68,7 +65,6 @@ import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.Reference;
 import org.apache.impala.fb.FbFileBlock;
@@ -761,76 +757,6 @@ public interface FeIcebergTable extends FeFsTable {
           numUnknownDiskIds, absPath);
     }
 
-    /**
-     * Returns the FileDescriptors loaded by the internal HdfsTable. To avoid 
returning
-     * the metadata files the resultset is limited to the files that are 
tracked by
-     * Iceberg. Both the HdfsBaseDir and the DataFile path can contain the 
scheme in their
-     * path, using org.apache.hadoop.fs.Path to normalize the paths.
-     */
-    public static IcebergContentFileStore loadAllPartition(
-        FeIcebergTable table, GroupedContentFiles icebergFiles)
-        throws IOException, ImpalaRuntimeException {
-      Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap = new 
HashMap<>();
-      Collection<? extends FeFsPartition> partitions =
-          FeCatalogUtils.loadAllPartitions(table.getFeFsTable());
-      for (FeFsPartition partition : partitions) {
-        for (FileDescriptor fileDesc : partition.getFileDescriptors()) {
-          Path path = new 
Path(fileDesc.getAbsolutePath(table.getHdfsBaseDir()));
-          hdfsFileDescMap.put(path.toUri().getPath(), fileDesc);
-        }
-      }
-      IcebergContentFileStore fileStore = new IcebergContentFileStore();
-      Pair<String, HdfsPartition.FileDescriptor> pathHashAndFd;
-      for (DataFile dataFile : icebergFiles.dataFilesWithoutDeletes) {
-        pathHashAndFd = getPathHashAndFd(dataFile, table, hdfsFileDescMap);
-        fileStore.addDataFileWithoutDeletes(pathHashAndFd.first, 
pathHashAndFd.second);
-      }
-      for (DataFile dataFile : icebergFiles.dataFilesWithDeletes) {
-        pathHashAndFd = getPathHashAndFd(dataFile, table, hdfsFileDescMap);
-        fileStore.addDataFileWithDeletes(pathHashAndFd.first, 
pathHashAndFd.second);
-      }
-      for (DeleteFile deleteFile : icebergFiles.positionDeleteFiles) {
-        pathHashAndFd = getPathHashAndFd(deleteFile, table, hdfsFileDescMap);
-        fileStore.addPositionDeleteFile(pathHashAndFd.first, 
pathHashAndFd.second);
-      }
-      for (DeleteFile deleteFile : icebergFiles.equalityDeleteFiles) {
-        pathHashAndFd = getPathHashAndFd(deleteFile, table, hdfsFileDescMap);
-        fileStore.addEqualityDeleteFile(pathHashAndFd.first, 
pathHashAndFd.second);
-      }
-      return fileStore;
-    }
-
-    private static Pair<String, HdfsPartition.FileDescriptor> getPathHashAndFd(
-        ContentFile<?> contentFile, FeIcebergTable table,
-        Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap) throws 
IOException {
-      String pathHash = IcebergUtil.getFilePathHash(contentFile);
-      HdfsPartition.FileDescriptor fd = getOrCreateIcebergFd(
-          table, hdfsFileDescMap, contentFile);
-      return new Pair<>(pathHash, fd);
-    }
-
-    private static FileDescriptor getOrCreateIcebergFd(FeIcebergTable table,
-        Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap,
-        ContentFile<?> contentFile) throws IllegalArgumentException, 
IOException {
-      Path path = new Path(contentFile.path().toString());
-      HdfsPartition.FileDescriptor iceFd = null;
-      if (hdfsFileDescMap.containsKey(path.toUri().getPath())) {
-        HdfsPartition.FileDescriptor fsFd = hdfsFileDescMap.get(
-            path.toUri().getPath());
-        iceFd = fsFd.cloneWithFileMetadata(
-            IcebergUtil.createIcebergMetadata(table, contentFile));
-      } else {
-        if (Utils.requiresDataFilesInTableLocation(table)) {
-          LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive"
-           + "file listing results.", path);
-        }
-        HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(contentFile, 
table);
-        iceFd = fileDesc.cloneWithFileMetadata(
-            IcebergUtil.createIcebergMetadata(table, contentFile));
-      }
-      return iceFd;
-    }
-
     /**
      * Return the PartitionContent loaded by the internal HdfsTable. To avoid 
returning
      * the metadata files the result set is limited to the files that are 
tracked by
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index e194a05e7..dd454e55d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -123,7 +123,7 @@ public class HdfsPartition extends CatalogObjectImpl
       fbFileMetadata_ = null;
     }
 
-    private FileDescriptor(FbFileDesc fileDescData, FbFileMetadata 
fileMetadata) {
+    public FileDescriptor(FbFileDesc fileDescData, FbFileMetadata 
fileMetadata) {
       fbFileDescriptor_ = fileDescData;
       fbFileMetadata_ = fileMetadata;
     }
@@ -308,6 +308,10 @@ public class HdfsPartition extends CatalogObjectImpl
       return fbFileDescriptor_.fileBlocks(idx);
     }
 
+    public FbFileDesc getFbFileDescriptor() {
+      return fbFileDescriptor_;
+    }
+
     public FbFileMetadata getFbFileMetadata() {
       return fbFileMetadata_;
     }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
index ede9fec25..57ee5f537 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
@@ -18,8 +18,12 @@
 package org.apache.impala.catalog;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,28 +31,102 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.curator.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.common.Pair;
+import org.apache.impala.fb.FbFileDesc;
+import org.apache.impala.fb.FbFileMetadata;
 import org.apache.impala.fb.FbIcebergDataFileFormat;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.TIcebergContentFileStore;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Helper class for storing Iceberg file descriptors. It stores data and 
delete files
  * separately, while also storing file descriptors belonging to earlier 
snapshots.
  */
 public class IcebergContentFileStore {
+  final static Logger LOG = 
LoggerFactory.getLogger(IcebergContentFileStore.class);
+
+  private static class EncodedFileDescriptor {
+    public final byte[] fileDesc_;
+    public final byte[] fileMetadata_;
+
+    public EncodedFileDescriptor(byte[] fDesc, byte[] fMeta) {
+      this.fileDesc_ = fDesc;
+      this.fileMetadata_ = fMeta;
+    }
+  }
+
+  protected static FileDescriptor decode(EncodedFileDescriptor encodedFd) {
+    Preconditions.checkNotNull(encodedFd.fileMetadata_);
+
+    return new FileDescriptor(
+        FbFileDesc.getRootAsFbFileDesc(ByteBuffer.wrap(encodedFd.fileDesc_)),
+        FbFileMetadata.getRootAsFbFileMetadata(
+            ByteBuffer.wrap(encodedFd.fileMetadata_))) {
+      // Whenever getting the FileDescriptor for the same path hash it will be 
a
+      // different FileDescriptor object. As a result the default equals() and
+      // hashCode() functions can't be used to judge equality between these
+      // FileDescriptors.
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == null) return false;
+        if (!(obj instanceof FileDescriptor)) return false;
+        FileDescriptor otherFD = (FileDescriptor) obj;
+        if ((this.getFbFileMetadata() == null && otherFD.getFbFileMetadata() 
!= null) ||
+            (this.getFbFileMetadata() != null && otherFD.getFbFileMetadata() 
== null)) {
+          return false;
+        }
+        return this.getFbFileDescriptor().getByteBuffer().array() ==
+            otherFD.getFbFileDescriptor().getByteBuffer().array() &&
+            this.getFbFileMetadata().getByteBuffer().array() ==
+                otherFD.getFbFileMetadata().getByteBuffer().array();
+      }
+
+      @Override
+      public int hashCode() {
+        return getAbsolutePath().hashCode();
+      }
+    };
+  }
+
+  // Function to convert from a FileDescriptor to an EncodedFileDescriptor.
+  protected static EncodedFileDescriptor encode(FileDescriptor fd) {
+    return new EncodedFileDescriptor(
+        encodeFB(fd.getFbFileDescriptor()),
+        encodeFB(fd.getFbFileMetadata()));
+  }
+
+  private static byte[] encodeFB(com.google.flatbuffers.Table fbObject) {
+    if (fbObject == null) return null;
+
+    ByteBuffer bb = fbObject.getByteBuffer();
+    byte[] arr = bb.array();
+    Preconditions.checkState(bb.arrayOffset() == 0 && bb.remaining() == 
arr.length);
+
+    return arr;
+  }
 
   // Auxiliary class for holding file descriptors in both a map and a list.
   private static class MapListContainer {
     // Key is the ContentFile path hash, value is FileDescriptor transformed 
from DataFile
-    private final Map<String, FileDescriptor> fileDescMap_ = new HashMap<>();
-    private final List<FileDescriptor> fileDescList_ = new ArrayList<>();
+    private final Map<String, EncodedFileDescriptor> fileDescMap_ = new 
HashMap<>();
+    private final List<EncodedFileDescriptor> fileDescList_ = new 
ArrayList<>();
 
     // Adds a file to the map. If this is a new entry, then add it to the list 
as well.
     // Return true if 'desc' was a new entry.
-    public boolean add(String pathHash, FileDescriptor desc) {
+    public boolean add(String pathHash, EncodedFileDescriptor desc) {
       if (fileDescMap_.put(pathHash, desc) == null) {
         fileDescList_.add(desc);
         return true;
@@ -57,21 +135,25 @@ public class IcebergContentFileStore {
     }
 
     public FileDescriptor get(String pathHash) {
-      return fileDescMap_.get(pathHash);
+      if (!fileDescMap_.containsKey(pathHash)) return null;
+      return decode(fileDescMap_.get(pathHash));
     }
 
     public long getNumFiles() {
       return fileDescList_.size();
     }
 
-    List<FileDescriptor> getList() { return fileDescList_; }
+    List<FileDescriptor> getList() {
+      return Lists.transform(fileDescList_, fd -> 
IcebergContentFileStore.decode(fd));
+    }
 
     // It's enough to only convert the map part to thrift.
     Map<String, THdfsFileDesc> toThrift() {
       Map<String, THdfsFileDesc> ret = new HashMap<>();
-      for (Map.Entry<String, HdfsPartition.FileDescriptor> entry :
-          fileDescMap_.entrySet()) {
-        ret.put(entry.getKey(), entry.getValue().toThrift());
+      for (Map.Entry<String, EncodedFileDescriptor> entry : 
fileDescMap_.entrySet()) {
+        ret.put(
+            entry.getKey(),
+            decode(entry.getValue()).toThrift());
       }
       return ret;
     }
@@ -86,7 +168,7 @@ public class IcebergContentFileStore {
           Preconditions.checkNotNull(hostIndex);
           fd = fd.cloneWithNewHostIndex(networkAddresses, hostIndex);
         }
-        ret.add(entry.getKey(), fd);
+        ret.add(entry.getKey(), encode(fd));
       }
       return ret;
     }
@@ -99,7 +181,7 @@ public class IcebergContentFileStore {
   private MapListContainer equalityDeleteFiles_ = new MapListContainer();
 
   // Caches file descriptors loaded during time-travel queries.
-  private final ConcurrentMap<String, FileDescriptor> oldFileDescMap_ =
+  private final ConcurrentMap<String, EncodedFileDescriptor> oldFileDescMap_ =
       new ConcurrentHashMap<>();
 
   // Flags to indicate file formats used in the table.
@@ -109,34 +191,47 @@ public class IcebergContentFileStore {
 
   public IcebergContentFileStore() {}
 
-  public void addDataFileWithoutDeletes(String pathHash, FileDescriptor desc) {
-    if (dataFilesWithoutDeletes_.add(pathHash, desc)) {
-      updateFileFormats(desc);
+  public IcebergContentFileStore(
+      FeIcebergTable icebergTable, GroupedContentFiles icebergFiles) {
+    Preconditions.checkNotNull(icebergTable);
+    Preconditions.checkNotNull(icebergFiles);
+
+    Map<String, FileDescriptor> hdfsFileDescMap = new HashMap<>();
+    Collection<? extends FeFsPartition> partitions =
+        FeCatalogUtils.loadAllPartitions(icebergTable.getFeFsTable());
+    for (FeFsPartition partition : partitions) {
+      for (FileDescriptor fileDesc : partition.getFileDescriptors()) {
+        Path path = new 
Path(fileDesc.getAbsolutePath(icebergTable.getHdfsBaseDir()));
+        hdfsFileDescMap.put(path.toUri().getPath(), fileDesc);
+      }
     }
-  }
 
-  public void addDataFileWithDeletes(String pathHash, FileDescriptor desc) {
-    if (dataFilesWithDeletes_.add(pathHash, desc)) {
-      updateFileFormats(desc);
+    for (DataFile dataFile : icebergFiles.dataFilesWithoutDeletes) {
+      Pair<String, EncodedFileDescriptor> pathHashAndFd =
+          getPathHashAndFd(icebergTable, dataFile, hdfsFileDescMap);
+      dataFilesWithoutDeletes_.add(pathHashAndFd.first, pathHashAndFd.second);
     }
-  }
-
-  public void addPositionDeleteFile(String pathHash, FileDescriptor desc) {
-    if (positionDeleteFiles_.add(pathHash, desc)) {
-      updateFileFormats(desc);
+    for (DataFile dataFile : icebergFiles.dataFilesWithDeletes) {
+      Pair<String, EncodedFileDescriptor> pathHashAndFd =
+          getPathHashAndFd(icebergTable, dataFile, hdfsFileDescMap);
+      dataFilesWithDeletes_.add(pathHashAndFd.first, pathHashAndFd.second);
+    }
+    for (DeleteFile deleteFile : icebergFiles.positionDeleteFiles) {
+      Pair<String, EncodedFileDescriptor> pathHashAndFd =
+          getPathHashAndFd(icebergTable, deleteFile, hdfsFileDescMap);
+      positionDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second);
+    }
+    for (DeleteFile deleteFile : icebergFiles.equalityDeleteFiles) {
+      Pair<String, EncodedFileDescriptor> pathHashAndFd =
+          getPathHashAndFd(icebergTable, deleteFile, hdfsFileDescMap);
+      equalityDeleteFiles_.add(pathHashAndFd.first, pathHashAndFd.second);
     }
-  }
-
-  public void addEqualityDeleteFile(String pathHash, FileDescriptor desc) {
-    Preconditions.checkState(
-        desc.getFbFileMetadata().icebergMetadata().equalityFieldIdsLength() > 
0);
-    if (equalityDeleteFiles_.add(pathHash, desc)) updateFileFormats(desc);
   }
 
   // This is only invoked during time travel, when we are querying a snapshot 
that has
   // data files which have been removed since.
   public void addOldFileDescriptor(String pathHash, FileDescriptor desc) {
-    oldFileDescMap_.put(pathHash, desc);
+    oldFileDescMap_.put(pathHash, encode(desc));
   }
 
   public FileDescriptor getDataFileDescriptor(String pathHash) {
@@ -152,7 +247,8 @@ public class IcebergContentFileStore {
   }
 
   public FileDescriptor getOldFileDescriptor(String pathHash) {
-    return oldFileDescMap_.get(pathHash);
+    if (!oldFileDescMap_.containsKey(pathHash)) return null;
+    return decode(oldFileDescMap_.get(pathHash));
   }
 
   public List<FileDescriptor> getDataFilesWithoutDeletes() {
@@ -196,8 +292,11 @@ public class IcebergContentFileStore {
   public boolean hasOrc() { return hasOrc_; }
   public boolean hasParquet() { return hasParquet_; }
 
-  private void updateFileFormats(FileDescriptor desc) {
-    byte fileFormat = desc.getFbFileMetadata().icebergMetadata().fileFormat();
+  private void updateFileFormats(FbFileMetadata icebergMetadata) {
+    Preconditions.checkNotNull(icebergMetadata);
+    Preconditions.checkNotNull(icebergMetadata.icebergMetadata());
+
+    byte fileFormat = icebergMetadata.icebergMetadata().fileFormat();
     if (fileFormat == FbIcebergDataFileFormat.PARQUET) {
       hasParquet_ = true;
     } else if (fileFormat == FbIcebergDataFileFormat.ORC) {
@@ -207,6 +306,46 @@ public class IcebergContentFileStore {
     }
   }
 
+  private Pair<String, EncodedFileDescriptor> getPathHashAndFd(
+      FeIcebergTable icebergTable,
+      ContentFile<?> contentFile,
+      Map<String, FileDescriptor> hdfsFileDescMap) {
+    return new Pair<>(
+        IcebergUtil.getFilePathHash(contentFile),
+        getOrCreateIcebergFd(icebergTable, hdfsFileDescMap, contentFile));
+  }
+
+  private EncodedFileDescriptor getOrCreateIcebergFd(
+      FeIcebergTable icebergTable,
+      Map<String, FileDescriptor> hdfsFileDescMap,
+      ContentFile<?> contentFile) {
+    Path path = new Path(contentFile.path().toString());
+    FileDescriptor fileDesc = null;
+    if (hdfsFileDescMap.containsKey(path.toUri().getPath())) {
+      fileDesc = hdfsFileDescMap.get(path.toUri().getPath());
+    } else {
+      if (FeIcebergTable.Utils.requiresDataFilesInTableLocation(icebergTable)) 
{
+        LOG.warn("Iceberg file '{}' cannot be found in the HDFS recursive"
+            + "file listing results.", path);
+      }
+      try {
+        fileDesc = FeIcebergTable.Utils.getFileDescriptor(contentFile, 
icebergTable);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(e.getMessage());
+      }
+    }
+    Preconditions.checkNotNull(fileDesc);
+
+    FbFileMetadata icebergMetadata =
+        IcebergUtil.createIcebergMetadata(icebergTable, contentFile);
+
+    updateFileFormats(icebergMetadata);
+
+    return new EncodedFileDescriptor(
+        encodeFB(fileDesc.getFbFileDescriptor()),
+        encodeFB(icebergMetadata));
+  }
+
   public TIcebergContentFileStore toThrift() {
     TIcebergContentFileStore ret = new TIcebergContentFileStore();
     
ret.setPath_hash_to_data_file_without_deletes(dataFilesWithoutDeletes_.toThrift());
@@ -219,6 +358,9 @@ public class IcebergContentFileStore {
     return ret;
   }
 
+  // TODO IMPALA-11265: After converting to/from thrift the byte arrays 
representing the
+  // file descriptors won't be shared between the HdfsTable and 
IcebergContentFileStore.
+  // This redundancy causes unnecessary JVM heap memory usage on the 
coordinator.
   public static IcebergContentFileStore fromThrift(TIcebergContentFileStore 
tFileStore,
       List<TNetworkAddress> networkAddresses,
       ListMap<TNetworkAddress> hostIndex) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 768d3870b..8a7a8bac3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -439,7 +439,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
         hdfsTable_.setCanDataBeOutsideOfTableLocation(
             !Utils.requiresDataFilesInTableLocation(this));
         hdfsTable_.load(reuseMetadata, msClient, msTable_, reason, 
catalogTimeline);
-        fileStore_ = Utils.loadAllPartition(this, icebergFiles);
+        fileStore_ = new IcebergContentFileStore(this, icebergFiles);
         partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
         setIcebergTableStats();
         loadAllColumnStats(msClient, catalogTimeline);
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index 5cb4d163f..2130af0e6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -87,6 +87,7 @@ public class IcebergCtasTarget extends CtasTargetTable 
implements FeIcebergTable
   private String icebergTableLocation_;
   private String icebergCatalogLocation_;
   private HdfsStorageDescriptor hdfsSd_;
+  private final IcebergContentFileStore icebergContentFiles_;
 
   public IcebergCtasTarget(FeDb db, org.apache.hadoop.hive.metastore.api.Table 
msTbl,
         List<ColumnDef> columnDefs, List<String> primaryKeyNames,
@@ -103,6 +104,7 @@ public class IcebergCtasTarget extends CtasTargetTable 
implements FeIcebergTable
     icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTbl);
     icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTbl);
     hdfsSd_ = HdfsStorageDescriptor.fromStorageDescriptor(name_, 
msTable_.getSd());
+    icebergContentFiles_ = new IcebergContentFileStore();
   }
 
   private void createIcebergSchema(List<ColumnDef> columnDefs,
@@ -185,7 +187,7 @@ public class IcebergCtasTarget extends CtasTargetTable 
implements FeIcebergTable
 
   @Override
   public IcebergContentFileStore getContentFileStore() {
-    return new IcebergContentFileStore();
+    return icebergContentFiles_;
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index b51832c66..3d2f76f0d 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -592,6 +592,8 @@ public class IcebergScanPlanner {
 
   private void addEqualityDeletesAndIds(FileDescriptor fd) {
     FbIcebergMetadata fileMetadata = fd.getFbFileMetadata().icebergMetadata();
+    Preconditions.checkState(fileMetadata.equalityFieldIdsLength() > 0,
+        "Equality delete file doesn't have equality field IDs: " + 
fd.toString());
     List<Integer> eqFieldIdList = new ArrayList<>();
     for (int i = 0; i < fileMetadata.equalityFieldIdsLength(); ++i) {
       eqFieldIdList.add(fileMetadata.equalityFieldIds(i));
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java 
b/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java
new file mode 100644
index 000000000..6d335c6bb
--- /dev/null
+++ 
b/fe/src/test/java/org/apache/impala/catalog/IcebergContentFileStoreTest.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.fb.FbFileDesc;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.Test;
+
+public class IcebergContentFileStoreTest {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private CatalogServiceCatalog catalog_;
+
+  @Before
+  public void init() {
+    catalog_ = CatalogServiceTestCatalog.create();
+  }
+
+  @After
+  public void cleanUp() { catalog_.close(); }
+
+  @Test
+  public void testDecodeWithoutIcebergMetadata() {
+    expectedException.expect(NullPointerException.class);
+
+    FileDescriptor fileDesc = new FileDescriptor(createFbFileDesc(),null);
+
+    IcebergContentFileStore.decode(IcebergContentFileStore.encode(fileDesc));
+  }
+
+  private FbFileDesc createFbFileDesc() {
+    FlatBufferBuilder fbb = new FlatBufferBuilder(1);
+    fbb.finish(
+        FbFileDesc.createFbFileDesc(fbb, 0, 10L, (byte) 0, 10000L, 0, false, 
0, false));
+    ByteBuffer bb = fbb.dataBuffer().slice();
+    ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity());
+    compressedBb.put(bb);
+    return FbFileDesc.getRootAsFbFileDesc((ByteBuffer) compressedBb.flip());
+  }
+
+  @Test
+  public void testEncodeDecode() throws Exception {
+    IcebergTable iceTbl = loadIcebergTable(
+        "functional_parquet",
+        "iceberg_v2_partitioned_position_deletes_orc");
+    assertTrue(iceTbl.getContentFileStore().getNumFiles() > 0);
+
+    for (FileDescriptor fileDesc : iceTbl.getContentFileStore().getAllFiles()) 
{
+      FileDescriptor serdeFileDesc =
+          
IcebergContentFileStore.decode(IcebergContentFileStore.encode(fileDesc));
+      assertTrue(fileDesc != serdeFileDesc);
+      assertEquals(serdeFileDesc, fileDesc);
+    }
+  }
+
+  // Verify that the underlying representation of the file descriptors are 
shared between
+  // the Iceberg table and the HdfsTable within the Iceberg table. In other 
words this
+  // checks that storing the file descriptors also in both places doesn't 
result in
+  // redundant JVM memory usage.
+  @Test
+  public void testFileDescriptorsAreShared() throws Exception {
+    IcebergTable iceTbl = loadIcebergTable(
+        "functional_parquet",
+        "iceberg_v2_partitioned_position_deletes");
+    assertTrue(iceTbl.getContentFileStore().getNumFiles() > 0);
+
+    Map<String, FileDescriptor> fileDescsByPath = new HashMap<>();
+    for (FileDescriptor fileDesc : iceTbl.getContentFileStore().getAllFiles()) 
{
+      fileDescsByPath.put(fileDesc.getRelativePath(), fileDesc);
+    }
+
+    assertTrue(iceTbl.getFeFsTable() instanceof HdfsTable);
+    HdfsTable hdfsTable = (HdfsTable) iceTbl.getFeFsTable();
+    for (PrunablePartition partition : hdfsTable.getPartitions()) {
+      assertTrue(partition instanceof HdfsPartition);
+      HdfsPartition hdfsPartition = (HdfsPartition) partition;
+      for (FileDescriptor hdfsFileDesc : hdfsPartition.getFileDescriptors()) {
+        FileDescriptor iceFileDesc = 
fileDescsByPath.get(hdfsFileDesc.getRelativePath());
+        assertNotNull(iceFileDesc);
+        assertNotSame(iceFileDesc, hdfsFileDesc);
+        assertEquals(iceFileDesc.getFbFileDescriptor().getByteBuffer().array(),
+            hdfsFileDesc.getFbFileDescriptor().getByteBuffer().array());
+      }
+    }
+  }
+
+  private IcebergTable loadIcebergTable(String dbName, String tblName) throws 
Exception {
+    Table tbl = catalog_.getOrLoadTable(dbName, tblName,"test",null);
+    assertTrue(tbl instanceof IcebergTable);
+    IcebergTable iceTbl = (IcebergTable) tbl;
+    assertNotNull(iceTbl.getContentFileStore());
+    return iceTbl;
+  }
+}

Reply via email to