danny0405 commented on a change in pull request #3067:
URL: https://github.com/apache/hudi/pull/3067#discussion_r651408416
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
public static void clean(String path) {
PROFILES.remove(path);
}
+
+ /**
+ * Returns all the incremental write file path statuses with the given commits metadata.
+ *
+ * @param basePath Table base path
+ * @param hadoopConf The hadoop conf
+ * @param metadataList The commits metadata
+ * @return the file statuses array
+ */
+ public static FileStatus[] getWritePathsOfInstants(
+ Path basePath,
+ Configuration hadoopConf,
+ List<HoodieCommitMetadata> metadataList) {
+ FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+ return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+ .flatMap(Collection::stream).toArray(FileStatus[]::new);
+ }
+
+ private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+ return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+ .map(org.apache.hadoop.fs.Path::new)
+ // filter out the file paths that does not exist, some files may be cleaned by
+ // the cleaner.
+ .filter(path -> {
+ try {
+ return fs.exists(path);
+ } catch (IOException e) {
+ LOG.error("Checking exists of path: {} error", path);
+ throw new HoodieException(e);
+ }
+ }).map(path -> {
+ try {
+ return fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOG.error("Get write status of path: {} error", path);
+ throw new HoodieException(e);
+ }
+ })
+ // filter out crushed files
Review comment:
The write should not affect the read. This code was added a long time ago,
when a committed file (merge handle) could later be modified by the
subsequent modification instants. In the first version, the write handle was
not closed until the checkpoint success event was received (this has since
been changed), and a merge handle may be empty if close is never invoked.
We can still keep the filtering to make the read robust.
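
To illustrate the kind of defensive filtering discussed here, below is a minimal sketch. It assumes that the files to skip are zero-length files (the filter following the `// filter out crushed files` line is not shown in the quoted hunk, so this is a guess), and it uses `java.nio.file` rather than the Hadoop `FileSystem` API so that it runs standalone. The class name `NonEmptyFileFilter`, the method `keepReadable`, and the file names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the read-side filtering; not the Hudi implementation.
public class NonEmptyFileFilter {

  /** Keeps only the paths that exist as regular files and have a non-zero length. */
  static List<Path> keepReadable(List<Path> candidates) {
    return candidates.stream()
        // Drop paths that no longer exist, e.g. files removed by a cleaner.
        .filter(Files::isRegularFile)
        // Drop empty files, e.g. a handle that was never closed (assumption).
        .filter(path -> {
          try {
            return Files.size(path) > 0;
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("write-paths");
    Path full = Files.write(dir.resolve("full.parquet"), new byte[] {1, 2, 3});
    Path empty = Files.createFile(dir.resolve("empty.parquet"));
    Path missing = dir.resolve("cleaned.parquet"); // never created
    // Only the non-empty, existing file should survive the filter.
    System.out.println(keepReadable(List.of(full, empty, missing)));
  }
}
```

Keeping both checks on the read side means the reader tolerates missing or empty files regardless of how the writer behaves.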