This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d63ae9a32572bf9cd87bcc16f482c4219cbafb16
Author: Peter Rozsa <[email protected]>
AuthorDate: Thu Nov 9 10:43:04 2023 +0100

    IMPALA-12299: Parallelize file listings of Iceberg tables on HDFS/Ozone
    
    This change replaces the single-threaded file metadata listing of
    Iceberg datafiles with a pool-based multithreaded solution. The
    thread-pool size is calculated based on the filesystem's type, and it's
    maximized through MAX_HDFS_PARTITIONS_PARALLEL_LOAD and
    MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD. The parallel tasks are created
    from the parent directories of the datafiles; this guarantees that every
    datafile is listed.
    
    Manually executed benchmarks with following properties:
     - 280.000 partitions, 1 file each (worst case)
     - Thread pool size is 5 (default value for HDFS)
     - Used minicluster setup as a test-bench
    
    The results showed 3-4x improvement for getFileStatuses():
     - Self-time of getFileStatuses: 6.599 ms vs 25.399 ms
     - Query time: 16.37 s vs 34.00 s
    
    Tests: exhaustive test suite ran
    
    Change-Id: Ic5ca7e873f4ad0cc8dab6a77b62e05d965b4a76d
    Reviewed-on: http://gerrit.cloudera.org:8080/20700
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/IcebergFileMetadataLoader.java  | 64 ++++++++++++++++++----
 .../impala/catalog/ParallelFileMetadataLoader.java | 14 ++---
 2 files changed, 59 insertions(+), 19 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index e0540baa7..293257b42 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.catalog;
 
+import static org.apache.impala.catalog.ParallelFileMetadataLoader.createPool;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -24,9 +26,18 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -188,21 +199,11 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
   protected List<FileStatus> getFileStatuses(FileSystem fs, boolean 
listWithLocations)
       throws IOException {
     if (icebergFiles_.isEmpty()) return null;
-    RemoteIterator<? extends FileStatus> fileStatuses = null;
     // For the FSs in 'FileSystemUtil#SCHEME_SUPPORT_STORAGE_IDS' (e.g. HDFS, 
Ozone,
     // Alluxio, etc.) we ensure the file with block location information, so 
we're going
     // to get the block information through 'FileSystemUtil.listFiles'.
-    if (listWithLocations) {
-      fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_, 
debugAction_);
-    }
-    Map<Path, FileStatus> nameToFileStatus = Maps.newHashMap();
-    if (fileStatuses != null) {
-      while (fileStatuses.hasNext()) {
-        FileStatus status = fileStatuses.next();
-        nameToFileStatus.put(status.getPath(), status);
-      }
-    }
-
+    Map<Path, FileStatus> nameToFileStatus = Collections.emptyMap();
+    if (listWithLocations) nameToFileStatus = parallelListing(fs);
     List<FileStatus> stats = Lists.newLinkedList();
     for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
       Path path = FileSystemUtil.createFullyQualifiedPath(
@@ -229,6 +230,45 @@ public class IcebergFileMetadataLoader extends 
FileMetadataLoader {
     return stats;
   }
 
+  private Map<Path, FileStatus> parallelListing(FileSystem fs) throws 
IOException {
+    String logPrefix = "Parallel Iceberg file metadata listing";
+    ExecutorService pool = createPool(icebergFiles_.size(), fs, logPrefix);
+    final Set<Path> partitionPaths;
+    Map<Path, FileStatus> nameToFileStatus = Maps.newConcurrentMap();
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix)) {
+      partitionPaths = icebergFilesByPartition();
+      List<Future<Void>> tasks =
+          partitionPaths.stream()
+              .map(path -> pool.submit(() -> listingTask(fs, path, 
nameToFileStatus)))
+              .collect(Collectors.toList());
+      for (Future<Void> task : tasks) { task.get(); }
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException(String.format("%s: failed to load paths.", 
logPrefix), e);
+    } finally {
+      pool.shutdown();
+    }
+    return nameToFileStatus;
+  }
+
+  private Set<Path> icebergFilesByPartition() {
+    return 
StreamSupport.stream(icebergFiles_.getAllContentFiles().spliterator(), false)
+        .map(contentFile -> new 
Path(String.valueOf(contentFile.path())).getParent())
+        .collect(Collectors.toSet());
+  }
+
+  private Void listingTask(FileSystem fs, Path partitionPath,
+      Map<Path, FileStatus> nameToFileStatus) throws IOException {
+    RemoteIterator<? extends FileStatus> remoteIterator =
+        FileSystemUtil.listFiles(fs, partitionPath, recursive_, debugAction_);
+    Map<Path, FileStatus> perThreadMapping = new HashMap<>();
+    while (remoteIterator.hasNext()) {
+      FileStatus status = remoteIterator.next();
+      perThreadMapping.put(status.getPath(), status);
+    }
+    nameToFileStatus.putAll(perThreadMapping);
+    return null;
+  }
+
   @VisibleForTesting
   boolean shouldReuseOldFds() throws IOException {
     if (oldFdsByPath_ == null || oldFdsByPath_.isEmpty()) return false;
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 93e65346a..f0fb5871a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -169,7 +169,7 @@ public class ParallelFileMetadataLoader {
     if (loaders_.isEmpty()) return;
 
     int failedLoadTasks = 0;
-    ExecutorService pool = createPool();
+    ExecutorService pool = createPool(loaders_.size(), fs_, logPrefix_);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) {
       List<Pair<FileMetadataLoader, Future<Void>>> futures =
           new ArrayList<>(loaders_.size());
@@ -215,18 +215,18 @@ public class ParallelFileMetadataLoader {
    * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC 
implementation
    * (HADOOP-14558) on both the server and the client side.
    */
-  private ExecutorService createPool() {
-    int numLoaders = loaders_.size();
+  public static ExecutorService createPool(
+      int numLoaders, FileSystem fs, String logPrefix) {
     Preconditions.checkState(numLoaders > 0);
-    int poolSize = FileSystemUtil.supportsStorageIds(fs_) ?
-        MAX_HDFS_PARTITIONS_PARALLEL_LOAD : 
MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
+    int poolSize = FileSystemUtil.supportsStorageIds(fs) ?
+        MAX_HDFS_PARTITIONS_PARALLEL_LOAD :
+        MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
     // Thread pool size need not exceed the number of paths to be loaded.
     poolSize = Math.min(numLoaders, poolSize);
-
     if (poolSize == 1) {
       return MoreExecutors.newDirectExecutorService();
     } else {
-      LOG.info(logPrefix_ + " using a thread pool of size {}", poolSize);
+      LOG.info("{} using a thread pool of size {}", logPrefix, poolSize);
       return Executors.newFixedThreadPool(poolSize);
     }
   }

Reply via email to