This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.11.1-rc2-prep in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 58e25ba8563bb45b629fd534eb12423aeb948e6e Author: Sivabalan Narayanan <[email protected]> AuthorDate: Sat Jun 11 16:17:42 2022 -0400 [HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829) --- .../metadata/FileSystemBackedTableMetadata.java | 39 ++++++++++++---------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index b77bb12c49..f029995ba0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -68,13 +68,14 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { @Override public List<String> getAllPartitionPaths() throws IOException { - FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get()); + Path basePath = new Path(datasetBasePath); + FileSystem fs = basePath.getFileSystem(hadoopConf.get()); if (assumeDatePartitioning) { return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath); } List<Path> pathsToList = new CopyOnWriteArrayList<>(); - pathsToList.add(new Path(datasetBasePath)); + pathsToList.add(basePath); List<String> partitionPaths = new CopyOnWriteArrayList<>(); while (!pathsToList.isEmpty()) { @@ -82,27 +83,31 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size()); // List all directories in parallel - List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> { + List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> { FileSystem fileSystem = path.getFileSystem(hadoopConf.get()); - return fileSystem.listStatus(path); + return Pair.of(path, fileSystem.listStatus(path)); }, listingParallelism); pathsToList.clear(); // if current dictionary contains PartitionMetadata, add it to result // if current dictionary does not contain PartitionMetadata, add it to queue - dirToFileListing.stream().flatMap(Arrays::stream).parallel() - .forEach(fileStatus -> { - if (fileStatus.isDirectory()) { - if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) { - partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())); - } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { - pathsToList.add(fileStatus.getPath()); - } - } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) { - String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent()); - partitionPaths.add(partitionName); - } - }); + dirToFileListing.forEach(p -> { + Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel() + .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) + .findFirst()); + + if (partitionMetaFile.isPresent()) { + // Is a partition. + String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft()); + partitionPaths.add(partitionName); + } else { + // Add sub-dirs to the queue + pathsToList.addAll(Arrays.stream(p.getRight()) + .filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) + .map(fileStatus -> fileStatus.getPath()) + .collect(Collectors.toList())); + } + }); } return partitionPaths; }
