garyli1019 commented on a change in pull request #3067:
URL: https://github.com/apache/hudi/pull/3067#discussion_r651401752



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns all the incremental write file path statuses with the given 
commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The hadoop conf
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> 
getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, 
HoodieCommitMetadata metadata, FileSystem fs) {
+    return 
metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out the file paths that does not exist, some files may be 
cleaned by
+        // the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking exists of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
+          } catch (IOException e) {
+            LOG.error("Get write status of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        })
+        // filter out crushed files

Review comment:
       Crushed files might cause errors on the query side. How are these 
crushed files produced?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns all the incremental write file path statuses with the given 
commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The hadoop conf
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> 
getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, 
HoodieCommitMetadata metadata, FileSystem fs) {
+    return 
metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out the file paths that does not exist, some files may be 
cleaned by
+        // the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking exists of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
+          } catch (IOException e) {
+            LOG.error("Get write status of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        })
+        // filter out crushed files
+        .filter(fileStatus -> fileStatus.getLen() > 0)
+        .collect(Collectors.toList());
+  }
+
+  public static HoodieCommitMetadata getCommitMetadata(

Review comment:
       ditto

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns all the incremental write file path statuses with the given 
commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The hadoop conf
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> 
getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, 
HoodieCommitMetadata metadata, FileSystem fs) {

Review comment:
       Could you add a Javadoc comment describing this static method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to