This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 72f047715fe [HUDI-6476] Improve the performance of
getAllPartitionPaths (#9121)
72f047715fe is described below
commit 72f047715fe8f2ad9ff19a31728fbfb761fbe0d9
Author: Wechar Yu <[email protected]>
AuthorDate: Wed Jul 5 12:14:24 2023 +0800
[HUDI-6476] Improve the performance of getAllPartitionPaths (#9121)
Currently Hudi will list all status of files in hudi table directory, which
can be avoided to improve the performance of #getAllPartitionPaths, especially
for the non-partitioned table with many files. What we change in this patch:
* reduce a stage in getPartitionPathWithPathPrefix()
* only check directory to find the PartitionMetadata
* avoid listStatus of .hoodie/.hoodie_partition_metadata
---
.../metadata/FileSystemBackedTableMetadata.java | 52 +++++++++-------------
1 file changed, 22 insertions(+), 30 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 69c237d6684..6a6f46a65ef 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Implementation of {@link HoodieTableMetadata} based file-system-backed
table metadata.
@@ -106,42 +107,33 @@ public class FileSystemBackedTableMetadata implements
HoodieTableMetadata {
// TODO: Get the parallelism from HoodieWriteConfig
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM,
pathsToList.size());
- // List all directories in parallel
+ // List all directories in parallel:
+ // if current directory contains PartitionMetadata, add it to result
+ // if current directory does not contain PartitionMetadata, add its
subdirectory to queue to be processed.
engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing all
partitions with prefix " + relativePathPrefix);
- List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList,
path -> {
+ // result below holds a list of pair. first entry in the pair optionally
holds the deduced list of partitions.
+ // and second entry holds optionally a directory path to be processed
further.
+ List<Pair<Option<String>, Option<Path>>> result =
engineContext.flatMap(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
- return Arrays.stream(fileSystem.listStatus(path));
+ if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
+ return
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(new
Path(datasetBasePath), path)), Option.empty()));
+ }
+ return Arrays.stream(fileSystem.listStatus(path, p -> {
+ try {
+ return fileSystem.isDirectory(p) &&
!p.getName().equals(HoodieTableMetaClient.METAFOLDER_NAME);
+ } catch (IOException e) {
+ // noop
+ }
+ return false;
+ })).map(status -> Pair.of(Option.empty(),
Option.of(status.getPath())));
}, listingParallelism);
pathsToList.clear();
- // if current dictionary contains PartitionMetadata, add it to result
- // if current dictionary does not contain PartitionMetadata, add it to
queue to be processed.
- int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM,
dirToFileListing.size());
- if (!dirToFileListing.isEmpty()) {
- // result below holds a list of pair. first entry in the pair
optionally holds the deduced list of partitions.
- // and second entry holds optionally a directory path to be processed
further.
- engineContext.setJobStatus(this.getClass().getSimpleName(),
"Processing listed partitions");
- List<Pair<Option<String>, Option<Path>>> result =
engineContext.map(dirToFileListing, fileStatus -> {
- FileSystem fileSystem =
fileStatus.getPath().getFileSystem(hadoopConf.get());
- if (fileStatus.isDirectory()) {
- if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem,
fileStatus.getPath())) {
- return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new
Path(datasetBasePath), fileStatus.getPath())), Option.empty());
- } else if
(!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
{
- return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
- }
- } else if
(fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
{
- String partitionName = FSUtils.getRelativePartitionPath(new
Path(datasetBasePath), fileStatus.getPath().getParent());
- return Pair.of(Option.of(partitionName), Option.empty());
- }
- return Pair.of(Option.empty(), Option.empty());
- }, fileListingParallelism);
-
- partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent()).map(entry -> entry.getKey().get())
- .collect(Collectors.toList()));
+ partitionPaths.addAll(result.stream().filter(entry ->
entry.getKey().isPresent()).map(entry -> entry.getKey().get())
+ .collect(Collectors.toList()));
- pathsToList.addAll(result.stream().filter(entry ->
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
- .collect(Collectors.toList()));
- }
+ pathsToList.addAll(result.stream().filter(entry ->
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+ .collect(Collectors.toList()));
}
return partitionPaths;
}