prashantwason commented on a change in pull request #2343:
URL: https://github.com/apache/hudi/pull/2343#discussion_r546119001
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -51,6 +51,7 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
Review comment:
I wanted to do that, but HoodieBackedTableMetadata lives in hudi-common,
which does not depend on Spark.
That would require a new interface or some refactoring. I can handle it in a
separate ticket.
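For illustration only, here is a rough sketch of what such an engine-neutral
interface might look like. Every name in it (EngineSketch, ParallelMapper,
LocalParallelMapper) is hypothetical and does not exist in Hudi; the idea is
just that hudi-common would depend on the interface, while a Spark-backed
implementation would live in hudi-client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical names throughout; nothing here is an existing Hudi API.
class EngineSketch {

  /** Engine-neutral hook that a Spark-free module could depend on. */
  public interface ParallelMapper {
    <I, O> List<O> map(List<I> inputs, Function<I, O> fn, int parallelism);
  }

  /** Local fallback backed by a parallel stream; it ignores the parallelism hint. */
  public static class LocalParallelMapper implements ParallelMapper {
    @Override
    public <I, O> List<O> map(List<I> inputs, Function<I, O> fn, int parallelism) {
      // Collecting an ordered parallel stream to a list preserves encounter order.
      return inputs.parallelStream().map(fn).collect(Collectors.toList());
    }
  }

  public static void main(String[] args) {
    ParallelMapper mapper = new LocalParallelMapper();
    List<Integer> lengths = mapper.map(Arrays.asList("a", "bb", "ccc"), String::length, 2);
    System.out.println(lengths);
  }
}
```

A Spark implementation of the same interface could wrap jsc.parallelize(...)
and map(...), so the metadata reader would never reference Spark classes
directly.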
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
+ /**
+ * Function to find hoodie partitions and list files in them in parallel.
+ *
+ * @param jsc
+ * @param datasetMetaClient
+ * @return Map of partition names to a list of FileStatus for all the files in the partition
+ */
+ private Map<String, List<FileStatus>> parallelFileSystemListing(JavaSparkContext jsc,
+ HoodieTableMetaClient datasetMetaClient) {
+ List<Path> pathsToList = new LinkedList<>();
+ pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+ Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+ while (!pathsToList.isEmpty()) {
+ // List all directories in parallel
+ List<Pair<Path, FileStatus[]>> dirToFileListing =
+ jsc.parallelize(pathsToList, Math.min(pathsToList.size(), jsc.defaultParallelism()))
+ .map(path -> {
+ FileSystem fs = datasetMetaClient.getFs();
+ return Pair.of(path, fs.listStatus(path));
+ }).collect();
+ pathsToList.clear();
+
+ // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
+ // the results.
+ dirToFileListing.forEach(p -> {
+ List<FileStatus> filesInDir = Arrays.stream(p.getRight())
+ .filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
+ .collect(Collectors.toList());
Review comment:
Done.
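For readers following the thread, the level-by-level listing in the hunk
above can be sketched in plain Java. This is a simplified stand-in and not
the PR's code: it uses java.nio.file and parallel streams instead of Hadoop's
FileSystem and Spark, the class and method names are made up, and it assumes
a directory counts as a partition exactly when it contains the marker file
(the quoted hunk is cut off before that check). The marker file name is also
an assumption of the sketch, and no attempt is made to skip metadata folders.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; names are illustrative and not part of Hudi.
class LevelByLevelListing {
  // Assumed marker name; the PR refers to HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE.
  static final String PARTITION_MARKER = ".hoodie_partition_metadata";

  /** Lists the direct children of one directory, closing the stream afterwards. */
  static List<Path> listChildren(Path dir) {
    try (Stream<Path> children = Files.list(dir)) {
      return children.collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Walks the tree one level at a time, listing each level's directories in parallel. */
  static Map<String, List<Path>> listPartitions(Path basePath) {
    Map<String, List<Path>> partitionToFiles = new HashMap<>();
    List<Path> pathsToList = new ArrayList<>();
    pathsToList.add(basePath);

    while (!pathsToList.isEmpty()) {
      // List every directory of the current level in parallel.
      Map<Path, List<Path>> dirToChildren = pathsToList.parallelStream()
          .collect(Collectors.toMap(dir -> dir, LevelByLevelListing::listChildren));
      pathsToList.clear();

      for (Map.Entry<Path, List<Path>> entry : dirToChildren.entrySet()) {
        List<Path> children = entry.getValue();
        boolean isPartition = children.stream()
            .anyMatch(p -> p.getFileName().toString().equals(PARTITION_MARKER));
        if (isPartition) {
          // Record the partition's regular files, excluding the marker itself.
          List<Path> files = children.stream()
              .filter(Files::isRegularFile)
              .filter(p -> !p.getFileName().toString().equals(PARTITION_MARKER))
              .collect(Collectors.toList());
          partitionToFiles.put(basePath.relativize(entry.getKey()).toString(), files);
        } else {
          // Not a partition: queue its subdirectories for the next level.
          children.stream().filter(Files::isDirectory).forEach(pathsToList::add);
        }
      }
    }
    return partitionToFiles;
  }

  public static void main(String[] args) {
    Path base = Paths.get(args.length > 0 ? args[0] : ".");
    listPartitions(base).forEach((partition, files) ->
        System.out.println(partition + " -> " + files.size() + " files"));
  }
}
```

Each pass of the loop issues one batch of listings, so the number of
sequential rounds equals the depth of the directory tree rather than the
number of directories.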
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
+ /**
+ * Function to find hoodie partitions and list files in them in parallel.
+ *
+ * @param jsc
+ * @param datasetMetaClient
+ * @return Map of partition names to a list of FileStatus for all the files in the partition
+ */
+ private Map<String, List<FileStatus>> parallelFileSystemListing(JavaSparkContext jsc,
+ HoodieTableMetaClient datasetMetaClient) {
+ List<Path> pathsToList = new LinkedList<>();
+ pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
+ Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
+
+ while (!pathsToList.isEmpty()) {
+ // List all directories in parallel
+ List<Pair<Path, FileStatus[]>> dirToFileListing =
+ jsc.parallelize(pathsToList, Math.min(pathsToList.size(), jsc.defaultParallelism()))
Review comment:
Done
##########
File path:
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -369,10 +343,56 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
}
});
- LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
+ LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
}
+ /**
+ * Function to find hoodie partitions and list files in them in parallel.
+ *
+ * @param jsc
+ * @param datasetMetaClient
+ * @return Map of partition names to a list of FileStatus for all the files in the partition
+ */
+ private Map<String, List<FileStatus>> parallelFileSystemListing(JavaSparkContext jsc,
Review comment:
Done