This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d63ae9a32572bf9cd87bcc16f482c4219cbafb16 Author: Peter Rozsa <[email protected]> AuthorDate: Thu Nov 9 10:43:04 2023 +0100 IMPALA-12299: Parallelize file listings of Iceberg tables on HDFS/Ozone This change replaces the single-threaded file metadata listing of Iceberg datafiles with a pool-based multithreaded solution. The thread-pool size is calculated based on the filesystem's type, and it is capped by MAX_HDFS_PARTITIONS_PARALLEL_LOAD and MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD. The parallel tasks are created from the parent directories of the datafiles; this guarantees that every datafile is listed. Manually executed benchmarks with the following properties: - 280,000 partitions, 1 file each (worst case) - Thread pool size is 5 (default value for HDFS) - Used minicluster setup as a test-bench The results showed 3-4x improvement for getFileStatuses(): - Self-time of getFileStatuses: 6,599 ms vs 25,399 ms - Query time: 16.37 s vs 34.00 s Tests: exhaustive test suite ran Change-Id: Ic5ca7e873f4ad0cc8dab6a77b62e05d965b4a76d Reviewed-on: http://gerrit.cloudera.org:8080/20700 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/IcebergFileMetadataLoader.java | 64 ++++++++++++++++++---- .../impala/catalog/ParallelFileMetadataLoader.java | 14 ++--- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java index e0540baa7..293257b42 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java @@ -17,6 +17,8 @@ package org.apache.impala.catalog; +import static org.apache.impala.catalog.ParallelFileMetadataLoader.createPool; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; 
import com.google.common.base.Strings; @@ -24,9 +26,18 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -188,21 +199,11 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { protected List<FileStatus> getFileStatuses(FileSystem fs, boolean listWithLocations) throws IOException { if (icebergFiles_.isEmpty()) return null; - RemoteIterator<? extends FileStatus> fileStatuses = null; // For the FSs in 'FileSystemUtil#SCHEME_SUPPORT_STORAGE_IDS' (e.g. HDFS, Ozone, // Alluxio, etc.) we ensure the file with block location information, so we're going // to get the block information through 'FileSystemUtil.listFiles'. 
- if (listWithLocations) { - fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_, debugAction_); - } - Map<Path, FileStatus> nameToFileStatus = Maps.newHashMap(); - if (fileStatuses != null) { - while (fileStatuses.hasNext()) { - FileStatus status = fileStatuses.next(); - nameToFileStatus.put(status.getPath(), status); - } - } - + Map<Path, FileStatus> nameToFileStatus = Collections.emptyMap(); + if (listWithLocations) nameToFileStatus = parallelListing(fs); List<FileStatus> stats = Lists.newLinkedList(); for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) { Path path = FileSystemUtil.createFullyQualifiedPath( @@ -229,6 +230,45 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { return stats; } + private Map<Path, FileStatus> parallelListing(FileSystem fs) throws IOException { + String logPrefix = "Parallel Iceberg file metadata listing"; + ExecutorService pool = createPool(icebergFiles_.size(), fs, logPrefix); + final Set<Path> partitionPaths; + Map<Path, FileStatus> nameToFileStatus = Maps.newConcurrentMap(); + try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix)) { + partitionPaths = icebergFilesByPartition(); + List<Future<Void>> tasks = + partitionPaths.stream() + .map(path -> pool.submit(() -> listingTask(fs, path, nameToFileStatus))) + .collect(Collectors.toList()); + for (Future<Void> task : tasks) { task.get(); } + } catch (ExecutionException | InterruptedException e) { + throw new IOException(String.format("%s: failed to load paths.", logPrefix), e); + } finally { + pool.shutdown(); + } + return nameToFileStatus; + } + + private Set<Path> icebergFilesByPartition() { + return StreamSupport.stream(icebergFiles_.getAllContentFiles().spliterator(), false) + .map(contentFile -> new Path(String.valueOf(contentFile.path())).getParent()) + .collect(Collectors.toSet()); + } + + private Void listingTask(FileSystem fs, Path partitionPath, + Map<Path, FileStatus> nameToFileStatus) throws IOException { + 
RemoteIterator<? extends FileStatus> remoteIterator = + FileSystemUtil.listFiles(fs, partitionPath, recursive_, debugAction_); + Map<Path, FileStatus> perThreadMapping = new HashMap<>(); + while (remoteIterator.hasNext()) { + FileStatus status = remoteIterator.next(); + perThreadMapping.put(status.getPath(), status); + } + nameToFileStatus.putAll(perThreadMapping); + return null; + } + @VisibleForTesting boolean shouldReuseOldFds() throws IOException { if (oldFdsByPath_ == null || oldFdsByPath_.isEmpty()) return false; diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java index 93e65346a..f0fb5871a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java @@ -169,7 +169,7 @@ public class ParallelFileMetadataLoader { if (loaders_.isEmpty()) return; int failedLoadTasks = 0; - ExecutorService pool = createPool(); + ExecutorService pool = createPool(loaders_.size(), fs_, logPrefix_); try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) { List<Pair<FileMetadataLoader, Future<Void>>> futures = new ArrayList<>(loaders_.size()); @@ -215,18 +215,18 @@ public class ParallelFileMetadataLoader { * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC implementation * (HADOOP-14558) on both the server and the client side. */ - private ExecutorService createPool() { - int numLoaders = loaders_.size(); + public static ExecutorService createPool( + int numLoaders, FileSystem fs, String logPrefix) { Preconditions.checkState(numLoaders > 0); - int poolSize = FileSystemUtil.supportsStorageIds(fs_) ? - MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD; + int poolSize = FileSystemUtil.supportsStorageIds(fs) ? 
+ MAX_HDFS_PARTITIONS_PARALLEL_LOAD : + MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD; // Thread pool size need not exceed the number of paths to be loaded. poolSize = Math.min(numLoaders, poolSize); - if (poolSize == 1) { return MoreExecutors.newDirectExecutorService(); } else { - LOG.info(logPrefix_ + " using a thread pool of size {}", poolSize); + LOG.info("{} using a thread pool of size {}", logPrefix, poolSize); return Executors.newFixedThreadPool(poolSize); } }
