This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 711797e7fbda6f30fc49d91e30ad6ab31a4f4a69
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Aug 29 09:37:20 2025 +0200

    IMPALA-14349: Encode FileDescriptors in time in loading Iceberg Tables
    
    With this patch we create Iceberg file descriptors from
    LocatedFileStatus objects during IcebergFileMetadataLoader's
    parallelListing(). This has the following benefits:
     * We parallelize the creation of Iceberg file descriptor objects
     * We don't need to maintain a large hash map with all the
       LocatedFileStatus objects at once. Now we only need to keep a few
       LocatedFileStatus objects per partition in memory while we are
       converting them to Iceberg file descriptors. I.e., the GC is free to
       destroy the LocatedFileStatus objects we don't use anymore.
    
    This patch retires startup flag 'iceberg_reload_new_files_threshold'.
    Since IMPALA-13254 we only list partitions that have new data files,
    and we load them in parallel, i.e. efficient incremental table loading
    is already covered. From that point the startup flag only added
    unnecessary code complexity.
    
    Measurements
    
    I created two tables (from tpcds.store_sales) to measure table loading
    times for large tables:
    
    Table #1:
      PARTITIONED BY SPEC(ss_item_sk, BUCKET(5, ss_sold_time_sk))
      partitions: 107818
      files: 754726
    
    Table #2:
      PARTITIONED BY SPEC(ss_item_sk)
      partitions: 18000
      files: 504224
    
    Time taken in IcebergFileMetadataLoader.load() during full table reload:
    +----------+-------+------+---------+
    |          | Base  | New  | Speedup |
    +----------+-------+------+---------+
    | Table #1 | 17.3s | 8.1s |    2.14 |
    | Table #2 |  7.8s | 4.3s |     1.8 |
    +----------+-------+------+---------+
    
    I measured incremental table loading only for Table #2 (since there are
    more files per partition, this is the worse scenario for the new code, as
    it only uses file listings, and each new file was created in a separate
    partition).
    
    Time taken in IcebergFileMetadataLoader.load() during incremental table
    reload:
    +------------+------+------+---------+
    | #new files | Base | New  | Speedup |
    +------------+------+------+---------+
    |          1 | 1.4s | 1.6s |     0.9 |
    |        100 | 1.5s | 1.9s |     0.8 |
    |        200 | 1.5s | 1.5s |       1 |
    +------------+------+------+---------+
    
    We lose a few tenths of a second, but I think the simplified code
    justifies it.
    
    Testing:
     * some tests were updated because we don't have
       startup flag 'iceberg_reload_new_files_threshold' anymore
    
    Change-Id: Ia1c2a7119d76db7ce7c43caec2ccb122a014851b
    Reviewed-on: http://gerrit.cloudera.org:8080/23363
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/common/global-flags.cc                      |   6 +-
 be/src/util/backend-gflag-util.cc                  |   2 -
 common/thrift/BackendGflags.thrift                 |   2 -
 .../apache/impala/catalog/FileMetadataLoader.java  |   7 +-
 .../impala/catalog/IcebergFileMetadataLoader.java  | 234 ++++++++++-----------
 .../org/apache/impala/catalog/IcebergTable.java    |   6 +-
 .../impala/catalog/ParallelFileMetadataLoader.java |   2 +-
 .../org/apache/impala/service/BackendConfig.java   |   4 -
 .../impala/catalog/FileMetadataLoaderTest.java     |  80 +------
 9 files changed, 125 insertions(+), 218 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index e7751e0ab..ee2d5533a 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -416,11 +416,6 @@ DEFINE_int64(update_catalogd_rpc_resend_interval_ms, 100, 
"(Advanced) Interval (
     "with which the statestore resends the update catalogd RPC to a subscriber 
if the "
     "statestore has failed to send the RPC to the subscriber.");
 
-DEFINE_int32(iceberg_reload_new_files_threshold, 100, "(Advanced) If during a 
table "
-    "refresh the number of new files are greater than this, catalogd will use 
a "
-    "recursive file listing to load file metadata. If number of new files are 
less or "
-    "equal to this, catalogd will load the file metadata one by one.");
-
 DEFINE_bool(iceberg_allow_datafiles_in_table_location_only, true, "If true, 
Impala "
     "does not allow Iceberg data file locations outside of the table directory 
during "
     "reads");
@@ -500,6 +495,7 @@ REMOVED_FLAG(enable_partitioned_aggregation);
 REMOVED_FLAG(enable_partitioned_hash_join);
 REMOVED_FLAG(enable_phj_probe_side_filtering);
 REMOVED_FLAG(enable_rm);
+REMOVED_FLAG(iceberg_reload_new_files_threshold);
 REMOVED_FLAG(kerberos_reinit_interval);
 REMOVED_FLAG(ldap_manual_config);
 REMOVED_FLAG(llama_addresses);
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 90385f0f4..39dec440f 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -109,7 +109,6 @@ DECLARE_bool(enable_reload_events);
 DECLARE_string(geospatial_library);
 DECLARE_string(file_metadata_reload_properties);
 DECLARE_string(java_weigher);
-DECLARE_int32(iceberg_reload_new_files_threshold);
 DECLARE_bool(enable_skipping_older_events);
 DECLARE_bool(enable_json_scanner);
 DECLARE_bool(iceberg_allow_datafiles_in_table_location_only);
@@ -494,7 +493,6 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_thrift_rpc_max_message_size(ThriftInternalRpcMaxMessageSize());
   cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor);
   cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm");
-  
cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
   cfg.__set_enable_skipping_older_events(FLAGS_enable_skipping_older_events);
   cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner);
   cfg.__set_iceberg_allow_datafiles_in_table_location_only(
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 445a40dc2..b186009c8 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -260,8 +260,6 @@ struct TBackendGflags {
 
   114: required bool use_jamm_weigher
 
-  115: required i32 iceberg_reload_new_files_threshold
-
   116: required bool enable_skipping_older_events;
 
   117: required bool enable_json_scanner
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index fce49a9fe..749721e6a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -307,14 +307,13 @@ public class FileMetadataLoader {
   protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
       String relPath, Reference<Long> numUnknownDiskIds, String absPath)
       throws IOException {
-    if (!FileSystemUtil.supportsStorageIds(fs)) {
-      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
-    }
     BlockLocation[] locations;
     if (fileStatus instanceof LocatedFileStatus) {
       locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
-    } else {
+    } else if (FileSystemUtil.supportsStorageIds(fs)) {
       locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+    } else {
+      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
     }
     return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
         fileStatus.isEncrypted(), fileStatus.isErasureCoded(), 
numUnknownDiskIds,
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index fbfb95fa9..c51904ccf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -17,32 +17,30 @@
 
 package org.apache.impala.catalog;
 
+import static org.apache.impala.catalog.ParallelFileMetadataLoader.
+    MAX_HDFS_PARTITIONS_PARALLEL_LOAD;
 import static 
org.apache.impala.catalog.ParallelFileMetadataLoader.TOTAL_THREADS;
 import static org.apache.impala.catalog.ParallelFileMetadataLoader.createPool;
-import static org.apache.impala.catalog.ParallelFileMetadataLoader.getPoolSize;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.codahale.metrics.Clock;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.ContentFile;
@@ -50,9 +48,9 @@ import org.apache.impala.catalog.FeFsTable.FileMetadataStats;
 import org.apache.impala.catalog.FeIcebergTable.Utils;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.Reference;
 import org.apache.impala.common.Pair;
-import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
@@ -68,43 +66,20 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
   private final static Logger LOG = LoggerFactory.getLogger(
       IcebergFileMetadataLoader.class);
 
-  // Default value of 'newFilesThreshold_' if the given parameter or startup 
flag have
-  // invalid value.
-  private final int NEW_FILES_THRESHOLD_DEFAULT = 100;
-
   private final org.apache.iceberg.Table iceTbl_;
-
-  // If there are more new files than 'newFilesThreshold_', we should fall back
-  // to regular file metadata loading.
-  private final int newFilesThreshold_;
-
+  private final Path tablePath_;
   private final GroupedContentFiles icebergFiles_;
   private final boolean requiresDataFilesInTableLocation_;
-  private boolean useParallelListing_;
 
   public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl,
       Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> 
hostIndex,
       GroupedContentFiles icebergFiles, boolean 
requiresDataFilesInTableLocation) {
-    this(iceTbl, oldFds, hostIndex, icebergFiles, 
requiresDataFilesInTableLocation,
-        BackendConfig.INSTANCE.icebergReloadNewFilesThreshold());
-  }
-
-  public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl,
-      Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> 
hostIndex,
-      GroupedContentFiles icebergFiles, boolean 
requiresDataFilesInTableLocation,
-      int newFilesThresholdParam) {
     super(iceTbl.location(), true, oldFds, hostIndex, null, null,
         HdfsFileFormat.ICEBERG);
     iceTbl_ = iceTbl;
+    tablePath_ = FileSystemUtil.createFullyQualifiedPath(new 
Path(iceTbl.location()));
     icebergFiles_ = icebergFiles;
     requiresDataFilesInTableLocation_ = requiresDataFilesInTableLocation;
-    if (newFilesThresholdParam >= 0) {
-      newFilesThreshold_ = newFilesThresholdParam;
-    } else {
-      newFilesThreshold_ = NEW_FILES_THRESHOLD_DEFAULT;
-      LOG.warn("Ignoring invalid new files threshold: {} " +
-          "using value: {}", newFilesThresholdParam, newFilesThreshold_);
-    }
   }
 
   @Override
@@ -127,18 +102,17 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
   }
 
   private void loadInternal() throws CatalogException, IOException {
-    Path partPath = FileSystemUtil.createFullyQualifiedPath(new 
Path(partDir_));
     loadedFds_ = new ArrayList<>();
     loadStats_ = new LoadStats(partDir_);
     fileMetadataStats_ = new FileMetadataStats();
 
     // Process the existing Fd ContentFile and return the newly added 
ContentFile
-    Iterable<ContentFile<?>> newContentFiles = 
loadContentFilesWithOldFds(partPath);
+    Iterable<ContentFile<?>> newContentFiles = 
loadContentFilesWithOldFds(tablePath_);
     // Iterate through all the newContentFiles, determine if StorageIds are 
supported,
     // and use different handling methods accordingly.
     // This considers that different ContentFiles are on different FileSystems
-    List<Pair<FileSystem, ContentFile<?>>> filesSupportsStorageIds = 
Lists.newArrayList();
-    FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(partPath);
+    List<ContentFile<?>> filesSupportsStorageIds = Lists.newArrayList();
+    FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(tablePath_);
     FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem();
     for (ContentFile<?> contentFile : newContentFiles) {
       FileSystem fsForPath = fsForTable;
@@ -152,56 +126,28 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
       // If the specific fs does not support StorageIds, then
       // we create FileDescriptor directly
       if (FileSystemUtil.supportsStorageIds(fsForPath)) {
-        filesSupportsStorageIds.add(Pair.create(fsForPath, contentFile));
+        filesSupportsStorageIds.add(contentFile);
       } else {
-        IcebergFileDescriptor fd = createFd(fsForPath, contentFile, null, 
partPath, null);
-        loadedFds_.add(fd);
-        fileMetadataStats_.accumulate(fd);
-        ++loadStats_.loadedFiles;
+        IcebergFileDescriptor fd = createNonLocatedFd(fsForPath, contentFile, 
tablePath_);
+        registerNewlyLoadedFd(fd);
       }
     }
-    // If the number of filesSupportsStorageIds are greater than 
newFilesThreshold,
-    // we will use a recursive file listing to load file metadata. If number 
of new
-    // files are less or equal to this, we will load the metadata of the newly 
added
-    // files one by one
-    useParallelListing_ = filesSupportsStorageIds.size() > newFilesThreshold_;
-    Reference<Long> numUnknownDiskIds = new Reference<>(0L);
-    Map<Path, FileStatus> nameToFileStatus = Collections.emptyMap();
-    if (useParallelListing_) {
-      nameToFileStatus = parallelListing(filesSupportsStorageIds);
-    }
-    for (Pair<FileSystem, ContentFile<?>> contentFileInfo : 
filesSupportsStorageIds) {
-      Path path = FileSystemUtil.createFullyQualifiedPath(
-          new Path(contentFileInfo.getSecond().path().toString()));
-      FileStatus stat = nameToFileStatus.get(path);
-      loadFdFromStorage(contentFileInfo, stat, partPath, numUnknownDiskIds);
+    AtomicLong numUnknownDiskIds = new AtomicLong();
+    List<IcebergFileDescriptor> newFds = 
parallelListing(filesSupportsStorageIds,
+        numUnknownDiskIds);
+    for (IcebergFileDescriptor fd : newFds) {
+      registerNewlyLoadedFd(fd);
     }
-    loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
+    loadStats_.unknownDiskIds += numUnknownDiskIds.get();
     if (LOG.isTraceEnabled()) {
       LOG.trace(loadStats_.debugString());
     }
   }
 
-  private void loadFdFromStorage(Pair<FileSystem, ContentFile<?>> 
contentFileInfo,
-      FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
-      throws CatalogException {
-    try {
-      IcebergFileDescriptor fd = createFd(contentFileInfo.getFirst(),
-          contentFileInfo.getSecond(), stat, partPath, numUnknownDiskIds);
-      loadedFds_.add(fd);
-      ++loadStats_.loadedFiles;
-      fileMetadataStats_.accumulate(fd);
-    } catch (IOException e) {
-      StringWriter w = new StringWriter();
-      e.printStackTrace(new PrintWriter(w));
-      LOG.warn(String.format("Failed to load Iceberg content file: '%s' Caused 
by: %s",
-          contentFileInfo.getSecond().path().toString(), w));
-    }
-  }
-
-  @VisibleForTesting
-  boolean useParallelListing() {
-    return useParallelListing_;
+  private void registerNewlyLoadedFd(IcebergFileDescriptor fd) {
+    loadedFds_.add(fd);
+    fileMetadataStats_.accumulate(fd);
+    ++loadStats_.loadedFiles;
   }
 
   /**
@@ -227,20 +173,38 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     return newContentFiles;
   }
 
-  private IcebergFileDescriptor createFd(FileSystem fs, ContentFile<?> 
contentFile,
+  private IcebergFileDescriptor createNonLocatedFd(FileSystem fs,
+      ContentFile<?> contentFile, Path partPath) throws CatalogException, 
IOException {
+    Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
+        new Path(contentFile.path().toString()));
+    // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus 
ourselves.
+    FileStatus stat = Utils.createFileStatus(contentFile, fileLoc);
+
+    Pair<String, String> absPathRelPath = getAbsPathRelPath(partPath, stat);
+    String absPath = absPathRelPath.first;
+    String relPath = absPathRelPath.second;
+
+    return IcebergFileDescriptor.cloneWithFileMetadata(
+        createFd(fs, stat, relPath, null, absPath),
+        IcebergUtil.createIcebergMetadata(iceTbl_, contentFile));
+  }
+
+  private IcebergFileDescriptor createLocatedFd(ContentFile<?> contentFile,
       FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
       throws CatalogException, IOException {
-    if (stat == null) {
-      Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
-          new Path(contentFile.path().toString()));
-      if (FileSystemUtil.supportsStorageIds(fs)) {
-        stat = Utils.createLocatedFileStatus(fileLoc, fs);
-      } else {
-        // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus 
ourselves.
-        stat = Utils.createFileStatus(contentFile, fileLoc);
-      }
-    }
+    Preconditions.checkState(stat instanceof LocatedFileStatus);
+
+    Pair<String, String> absPathRelPath = getAbsPathRelPath(partPath, stat);
+    String absPath = absPathRelPath.first;
+    String relPath = absPathRelPath.second;
 
+    return IcebergFileDescriptor.cloneWithFileMetadata(
+        createFd(null, stat, relPath, numUnknownDiskIds, absPath),
+        IcebergUtil.createIcebergMetadata(iceTbl_, contentFile));
+  }
+
+  Pair<String, String> getAbsPathRelPath(Path partPath, FileStatus stat)
+      throws TableLoadingException {
     String absPath = null;
     String relPath = FileSystemUtil.relativizePathNoThrow(stat.getPath(), 
partPath);
     if (relPath == null) {
@@ -251,70 +215,100 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
         absPath = stat.getPath().toString();
       }
     }
-
-    return IcebergFileDescriptor.cloneWithFileMetadata(
-        createFd(fs, stat, relPath, numUnknownDiskIds, absPath),
-        IcebergUtil.createIcebergMetadata(iceTbl_, contentFile));
+    return new Pair<>(absPath, relPath);
   }
 
   /**
    * Using a thread pool to perform parallel List operations on the 
FileSystem, this takes
    * into account the situation where multiple FileSystems exist within the 
ContentFiles.
    */
-  private Map<Path, FileStatus> parallelListing(
-      Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) throws 
IOException {
-    final Set<Path> partitionPaths = collectPartitionPaths(contentFiles);
-    if (partitionPaths.size() == 0) return Collections.emptyMap();
+  private List<IcebergFileDescriptor> parallelListing(
+      List<ContentFile<?>> contentFiles,
+      AtomicLong numUnknownDiskIds) throws IOException {
+    final Map<Path, List<ContentFile<?>>> partitionPaths =
+        collectPartitionPaths(contentFiles);
+    if (partitionPaths.isEmpty()) return Collections.emptyList();
+    List<IcebergFileDescriptor> ret = new ArrayList<>();
     String logPrefix = "Parallel Iceberg file metadata listing";
-    // Use the file system type of the table's root path as
-    // the basis for determining the pool size.
-    int poolSize = getPoolSize(partitionPaths.size(),
-        FileSystemUtil.getFileSystemForPath(partDir_));
+    int poolSize = getPoolSize(partitionPaths.size());
     ExecutorService pool = createPool(poolSize, logPrefix);
     TOTAL_THREADS.addAndGet(poolSize);
-    Map<Path, FileStatus> nameToFileStatus = Maps.newConcurrentMap();
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix)) {
       TOTAL_TASKS.addAndGet(partitionPaths.size());
-      List<Future<Void>> tasks =
-          partitionPaths.stream()
-              .map(path -> pool.submit(() -> {
+      List<Future<List<IcebergFileDescriptor>>> tasks =
+          partitionPaths.entrySet().stream()
+              .map(entry -> pool.submit(() -> {
                 try {
-                  return listingTask(path, nameToFileStatus);
+                  return createFdsForPartition(entry.getKey(), 
entry.getValue(),
+                      numUnknownDiskIds);
                 } finally {
                   TOTAL_TASKS.decrementAndGet();
                 }
               }))
               .collect(Collectors.toList());
-      for (Future<Void> task : tasks) { task.get(); }
+      for (Future<List<IcebergFileDescriptor>> task : tasks) {
+        ret.addAll(task.get());
+      }
     } catch (ExecutionException | InterruptedException e) {
       throw new IOException(String.format("%s: failed to load paths.", 
logPrefix), e);
     } finally {
       TOTAL_THREADS.addAndGet(-poolSize);
       pool.shutdown();
     }
-    return nameToFileStatus;
+    return ret;
   }
 
-  private Set<Path> collectPartitionPaths(
-      Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) {
-    return StreamSupport.stream(contentFiles.spliterator(), false)
-        .map(contentFile ->
-            new 
Path(String.valueOf(contentFile.getSecond().path())).getParent())
-        .collect(Collectors.toSet());
+  private Map<Path, List<ContentFile<?>>> collectPartitionPaths(
+      List<ContentFile<?>> contentFiles) {
+    final Clock clock = Clock.defaultClock();
+    long startTime = clock.getTick();
+    Map<Path, List<ContentFile<?>>> ret = contentFiles.stream()
+        .collect(Collectors.groupingBy(
+            cf -> new Path(String.valueOf(cf.path())).getParent(),
+            HashMap::new,
+            Collectors.toList()
+        ));
+    long duration = clock.getTick() - startTime;
+    LOG.info("Collected {} Iceberg content files into {} partitions. Duration: 
{}",
+        contentFiles.size(), ret.size(), PrintUtils.printTimeNs(duration));
+    return ret;
   }
 
-  private Void listingTask(Path partitionPath,
-      Map<Path, FileStatus> nameToFileStatus) throws IOException {
+  /**
+   * Returns thread pool size for listing files in parallel from storage 
systems that
+   * provide block location information.
+   */
+  private static int getPoolSize(int numLoaders) {
+    return Math.min(numLoaders, MAX_HDFS_PARTITIONS_PARALLEL_LOAD);
+  }
+
+  private List<IcebergFileDescriptor> createFdsForPartition(Path partitionPath,
+      List<ContentFile<?>> contentFiles, AtomicLong numUnknownDiskIds)
+      throws IOException, CatalogException {
     FileSystem fs = FileSystemUtil.getFileSystemForPath(partitionPath);
     RemoteIterator<? extends FileStatus> remoteIterator =
         FileSystemUtil.listFiles(fs, partitionPath, recursive_, debugAction_);
-    Map<Path, FileStatus> perThreadMapping = new HashMap<>();
+    Map<Path, FileStatus> pathToFileStatus = new HashMap<>();
     while (remoteIterator.hasNext()) {
       FileStatus status = remoteIterator.next();
-      perThreadMapping.put(status.getPath(), status);
+      pathToFileStatus.put(status.getPath(), status);
+    }
+    List<IcebergFileDescriptor> ret = new ArrayList<>();
+    Reference<Long> localNumUnknownDiskIds = new Reference<>(0L);
+    for (ContentFile<?> contentFile : contentFiles) {
+      Path path = FileSystemUtil.createFullyQualifiedPath(
+          new Path(contentFile.path().toString()));
+      FileStatus stat = pathToFileStatus.get(path);
+      if (stat == null) {
+        LOG.warn(String.format(
+            "Failed to load Iceberg content file: '%s', Not found on storage",
+            contentFile.path().toString()));
+        continue;
+      }
+      ret.add(createLocatedFd(contentFile, stat, tablePath_, 
localNumUnknownDiskIds));
     }
-    nameToFileStatus.putAll(perThreadMapping);
-    return null;
+    numUnknownDiskIds.addAndGet(localNumUnknownDiskIds.getRef());
+    return ret;
   }
 
   IcebergFileDescriptor getOldFd(ContentFile<?> contentFile, Path partPath)
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 6a71f007b..a55068306 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -47,6 +47,7 @@ import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionTransform;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
@@ -513,7 +514,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
           
((BaseTable)icebergApiTable_).operations().current().metadataFileLocation();
       GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this,
           new ArrayList<>(), /*timeTravelSpec=*/null);
-      catalogTimeline.markEvent("Loaded Iceberg files");
+      catalogTimeline.markEvent("Loaded Iceberg content file list");
       // We use IcebergFileMetadataLoader directly to load file metadata, so 
we don't
       // want 'hdfsTable_' to do any file loading.
       hdfsTable_.setSkipIcebergFileMetadataLoading(true);
@@ -527,6 +528,7 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
           getHostIndex(), Preconditions.checkNotNull(icebergFiles),
           Utils.requiresDataFilesInTableLocation(this));
       loader.load();
+      catalogTimeline.markEvent("Loaded Iceberg file descriptors");
       fileStore_ = new IcebergContentFileStore(
           icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles);
       partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
@@ -539,6 +541,8 @@ public class IcebergTable extends Table implements 
FeIcebergTable {
     } finally {
       storageMetadataLoadTime_ = ctxStorageLdTime.stop();
     }
+    LOG.info("Loaded file and block metadata for {}. Time taken: {}",
+        getFullName(), PrintUtils.printTimeNs(storageMetadataLoadTime_));
   }
 
   private boolean canSkipReload() {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 39d42380a..1d5a0c6b6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -62,7 +62,7 @@ public class ParallelFileMetadataLoader {
   private final static Logger LOG = LoggerFactory.getLogger(
       ParallelFileMetadataLoader.class);
 
-  private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
+  public static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
       BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();
   private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
       BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index a2b44df2b..a53e8b0cd 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -432,10 +432,6 @@ public class BackendConfig {
     return backendCfg_.use_jamm_weigher;
   }
 
-  public int icebergReloadNewFilesThreshold() {
-    return backendCfg_.iceberg_reload_new_files_threshold;
-  }
-
   public boolean icebergAllowDatafileInTableLocationOnly() {
     return backendCfg_.iceberg_allow_datafiles_in_table_location_only;
   }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java 
b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 75188e1e5..27d095d26 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -212,7 +212,6 @@ public class FileMetadataLoaderTest {
         /* oldFds = */ fml1.getLoadedIcebergFds(),
         /* requiresDataFilesInTableLocation = */ true);
     fml1Refresh.load();
-    assertFalse(fml1Refresh.useParallelListing());
     assertEquals(0, fml1Refresh.getStats().loadedFiles);
     assertEquals(20, fml1Refresh.getStats().skippedFiles);
     assertEquals(20, fml1Refresh.getLoadedFds().size());
@@ -235,7 +234,6 @@ public class FileMetadataLoaderTest {
         /* oldFds = */ fml2.getLoadedIcebergFds(),
         /* requiresDataFilesInTableLocation = */ true);
     fml2Refresh.load();
-    assertFalse(fml2Refresh.useParallelListing());
     assertEquals(0, fml2Refresh.getStats().loadedFiles);
     assertEquals(20, fml2Refresh.getStats().skippedFiles);
     assertEquals(20, fml2Refresh.getLoadedFds().size());
@@ -262,7 +260,6 @@ public class FileMetadataLoaderTest {
         /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true);
     fml1Refresh.load();
-    assertFalse(fml1Refresh.useParallelListing());
     assertEquals(10, fml1Refresh.getStats().loadedFiles);
     assertEquals(10, fml1Refresh.getStats().skippedFiles);
     assertEquals(20, fml1Refresh.getLoadedFds().size());
@@ -278,75 +275,11 @@ public class FileMetadataLoaderTest {
         /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
         /* requiresDataFilesInTableLocation = */ true);
     fml2Refresh.load();
-    assertFalse(fml2Refresh.useParallelListing());
     assertEquals(10, fml2Refresh.getStats().loadedFiles);
     assertEquals(10, fml2Refresh.getStats().skippedFiles);
     assertEquals(20, fml2Refresh.getLoadedFds().size());
   }
 
-  @Test
-  public void testIcebergNewFilesThreshold() throws IOException, CatalogException {
-    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
-    IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ Collections.emptyList(),
-        /* requiresDataFilesInTableLocation = */ true);
-    fml1.load();
-
-    IcebergFileMetadataLoader fml1ForceRefresh = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
-        /* requiresDataFilesInTableLocation = */ true, 10);
-    fml1ForceRefresh.setForceRefreshBlockLocations(true);
-    fml1ForceRefresh.load();
-    assertTrue(fml1ForceRefresh.useParallelListing());
-
-    IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
-        /* requiresDataFilesInTableLocation = */ true, 10);
-    fml1Refresh.setForceRefreshBlockLocations(false);
-    fml1Refresh.load();
-    assertFalse(fml1Refresh.useParallelListing());
-
-    IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
-        /* requiresDataFilesInTableLocation = */ true, 10);
-    fml1Refresh10.load();
-    assertFalse(fml1Refresh10.useParallelListing());
-    IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_partitioned",
-        /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
-        /* requiresDataFilesInTableLocation = */ true, 9);
-    fml1Refresh9.load();
-    assertTrue(fml1Refresh9.useParallelListing());
-
-
-    IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ Collections.emptyList(),
-        /* requiresDataFilesInTableLocation = */ true);
-    fml2.load();
-
-    IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
-        /* requiresDataFilesInTableLocation = */ true);
-    IcebergFileMetadataLoader fml2Refresh10 = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
-        /* requiresDataFilesInTableLocation = */ true, 10);
-    fml2Refresh10.load();
-    assertFalse(fml2Refresh10.useParallelListing());
-    IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog,
-        "functional_parquet", "iceberg_non_partitioned",
-        /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
-        /* requiresDataFilesInTableLocation = */ true, 9);
-    fml2Refresh9.load();
-    assertTrue(fml2Refresh9.useParallelListing());
-  }
-
   @Test
   public void testIcebergMultipleStorageLocations() throws IOException, CatalogException {
     CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
@@ -362,7 +295,6 @@ public class FileMetadataLoaderTest {
         /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 1),
         /* requiresDataFilesInTableLocation = */ false);
     fml1Refresh1.load();
-    assertFalse(fml1Refresh1.useParallelListing());
     assertEquals(5, fml1Refresh1.getStats().loadedFiles);
     assertEquals(1, fml1Refresh1.getStats().skippedFiles);
     assertEquals(6, fml1Refresh1.getLoadedFds().size());
@@ -372,7 +304,6 @@ public class FileMetadataLoaderTest {
         /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 5),
         /* requiresDataFilesInTableLocation = */ false);
     fml1Refresh5.load();
-    assertFalse(fml1Refresh5.useParallelListing());
     assertEquals(1, fml1Refresh5.getStats().loadedFiles);
     assertEquals(5, fml1Refresh5.getStats().skippedFiles);
     assertEquals(6, fml1Refresh5.getLoadedFds().size());
@@ -382,15 +313,6 @@ public class FileMetadataLoaderTest {
       CatalogServiceCatalog catalog, String dbName, String tblName,
       List<IcebergFileDescriptor> oldFds, boolean requiresDataFilesInTableLocation)
       throws CatalogException {
-    return getLoaderForIcebergTable(catalog, dbName, tblName, oldFds,
-        requiresDataFilesInTableLocation, -1);
-  }
-
-  private IcebergFileMetadataLoader getLoaderForIcebergTable(
-      CatalogServiceCatalog catalog, String dbName, String tblName,
-      List<IcebergFileDescriptor> oldFds, boolean requiresDataFilesInTableLocation,
-      int newFilesThreshold)
-      throws CatalogException {
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     FeIcebergTable iceT = (FeIcebergTable)catalog.getOrLoadTable(
         dbName, tblName, "test", null);
@@ -398,7 +320,7 @@ public class FileMetadataLoaderTest {
     GroupedContentFiles iceFiles = IcebergUtil.getIcebergFiles(iceT,
         /*predicates=*/Collections.emptyList(), /*timeTravelSpec=*/null);
     return new IcebergFileMetadataLoader(iceT.getIcebergApiTable(),
-        oldFds, hostIndex, iceFiles, requiresDataFilesInTableLocation, newFilesThreshold);
+        oldFds, hostIndex, iceFiles, requiresDataFilesInTableLocation);
   }
 
   private FileMetadataLoader getLoaderForAcidTable(


Reply via email to