This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 72f047715fe [HUDI-6476] Improve the performance of 
getAllPartitionPaths (#9121)
72f047715fe is described below

commit 72f047715fe8f2ad9ff19a31728fbfb761fbe0d9
Author: Wechar Yu <[email protected]>
AuthorDate: Wed Jul 5 12:14:24 2023 +0800

    [HUDI-6476] Improve the performance of getAllPartitionPaths (#9121)
    
    Currently Hudi will list the status of all files in the Hudi table directory, which 
can be avoided to improve the performance of #getAllPartitionPaths, especially 
for non-partitioned tables with many files. What we change in this patch:
    
    * reduce a stage in getPartitionPathWithPathPrefix()
    * only check directory to find the PartitionMetadata
    * avoid listStatus of .hoodie/.hoodie_partition_metadata
---
 .../metadata/FileSystemBackedTableMetadata.java    | 52 +++++++++-------------
 1 file changed, 22 insertions(+), 30 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 69c237d6684..6a6f46a65ef 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Implementation of {@link HoodieTableMetadata} based file-system-backed 
table metadata.
@@ -106,42 +107,33 @@ public class FileSystemBackedTableMetadata implements 
HoodieTableMetadata {
       // TODO: Get the parallelism from HoodieWriteConfig
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, 
pathsToList.size());
 
-      // List all directories in parallel
+      // List all directories in parallel:
+      // if current directory contains PartitionMetadata, add it to result
+      // if current directory does not contain PartitionMetadata, add its 
subdirectories to queue to be processed.
       engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all 
partitions with prefix " + relativePathPrefix);
-      List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, 
path -> {
+      // result below holds a list of pair. first entry in the pair optionally 
holds the deduced list of partitions.
+      // and second entry holds optionally a directory path to be processed 
further.
+      List<Pair<Option<String>, Option<Path>>> result = 
engineContext.flatMap(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return Arrays.stream(fileSystem.listStatus(path));
+        if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
+          return 
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(new 
Path(datasetBasePath), path)), Option.empty()));
+        }
+        return Arrays.stream(fileSystem.listStatus(path, p -> {
+          try {
+            return fileSystem.isDirectory(p) && 
!p.getName().equals(HoodieTableMetaClient.METAFOLDER_NAME);
+          } catch (IOException e) {
+            // noop
+          }
+          return false;
+        })).map(status -> Pair.of(Option.empty(), 
Option.of(status.getPath())));
       }, listingParallelism);
       pathsToList.clear();
 
-      // if current dictionary contains PartitionMetadata, add it to result
-      // if current dictionary does not contain PartitionMetadata, add it to 
queue to be processed.
-      int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, 
dirToFileListing.size());
-      if (!dirToFileListing.isEmpty()) {
-        // result below holds a list of pair. first entry in the pair 
optionally holds the deduced list of partitions.
-        // and second entry holds optionally a directory path to be processed 
further.
-        engineContext.setJobStatus(this.getClass().getSimpleName(), 
"Processing listed partitions");
-        List<Pair<Option<String>, Option<Path>>> result = 
engineContext.map(dirToFileListing, fileStatus -> {
-          FileSystem fileSystem = 
fileStatus.getPath().getFileSystem(hadoopConf.get());
-          if (fileStatus.isDirectory()) {
-            if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, 
fileStatus.getPath())) {
-              return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new 
Path(datasetBasePath), fileStatus.getPath())), Option.empty());
-            } else if 
(!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) 
{
-              return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
-            }
-          } else if 
(fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
 {
-            String partitionName = FSUtils.getRelativePartitionPath(new 
Path(datasetBasePath), fileStatus.getPath().getParent());
-            return Pair.of(Option.of(partitionName), Option.empty());
-          }
-          return Pair.of(Option.empty(), Option.empty());
-        }, fileListingParallelism);
-
-        partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent()).map(entry -> entry.getKey().get())
-            .collect(Collectors.toList()));
+      partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent()).map(entry -> entry.getKey().get())
+          .collect(Collectors.toList()));
 
-        pathsToList.addAll(result.stream().filter(entry -> 
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
-            .collect(Collectors.toList()));
-      }
+      pathsToList.addAll(result.stream().filter(entry -> 
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+          .collect(Collectors.toList()));
     }
     return partitionPaths;
   }

Reply via email to