garyli1019 commented on a change in pull request #3067:
URL: https://github.com/apache/hudi/pull/3067#discussion_r651401752
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
public static void clean(String path) {
PROFILES.remove(path);
}
+
+ /**
+ * Returns all the incremental write file path statuses with the given commits metadata.
+ *
+ * @param basePath Table base path
+ * @param hadoopConf The hadoop conf
+ * @param metadataList The commits metadata
+ * @return the file statuses array
+ */
+ public static FileStatus[] getWritePathsOfInstants(
+ Path basePath,
+ Configuration hadoopConf,
+ List<HoodieCommitMetadata> metadataList) {
+ FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+ return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+ .flatMap(Collection::stream).toArray(FileStatus[]::new);
+ }
+
+ private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+ return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+ .map(org.apache.hadoop.fs.Path::new)
+ // filter out the file paths that do not exist; some files may have been
+ // cleaned by the cleaner.
+ .filter(path -> {
+ try {
+ return fs.exists(path);
+ } catch (IOException e) {
+ LOG.error("Checking existence of path {} failed", path);
+ throw new HoodieException(e);
+ }
+ }).map(path -> {
+ try {
+ return fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOG.error("Fetching file status of path {} failed", path);
+ throw new HoodieException(e);
+ }
+ })
+ // filter out crushed files
Review comment:
Crushed files might cause errors on the query side. How are those crushed files produced?
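To make the concern concrete: the patch's `.filter(fileStatus -> fileStatus.getLen() > 0)` drops zero-length ("crushed") files so readers never see them. Below is a standalone sketch of the same existence + non-empty checks using `java.nio.file` on the local filesystem (a stand-in for the Hadoop `FileSystem` API, purely illustrative; the class and method names are invented for this example).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ZeroLengthFilter {

  // Keep only paths that exist and have non-empty content, mirroring the
  // fs.exists(...) and getLen() > 0 checks in the patch.
  static List<Path> nonEmptyPaths(Stream<Path> paths) {
    return paths
        // files may have been removed by the cleaner
        .filter(Files::exists)
        // drop zero-length files, which would error out on the query side
        .filter(p -> {
          try {
            return Files.size(p) > 0;
          } catch (IOException e) {
            throw new RuntimeException("Failed to stat " + p, e);
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("wp");
    Path ok = Files.write(dir.resolve("a.parquet"), new byte[] {1, 2, 3});
    Path empty = Files.createFile(dir.resolve("b.parquet"));
    Path missing = dir.resolve("gone.parquet");
    List<Path> kept = nonEmptyPaths(Stream.of(ok, empty, missing));
    // Only the existing, non-empty file survives both filters.
    System.out.println(kept.size());
  }
}
```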
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
public static void clean(String path) {
PROFILES.remove(path);
}
+
+ /**
+ * Returns all the incremental write file path statuses with the given commits metadata.
+ *
+ * @param basePath Table base path
+ * @param hadoopConf The hadoop conf
+ * @param metadataList The commits metadata
+ * @return the file statuses array
+ */
+ public static FileStatus[] getWritePathsOfInstants(
+ Path basePath,
+ Configuration hadoopConf,
+ List<HoodieCommitMetadata> metadataList) {
+ FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+ return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+ .flatMap(Collection::stream).toArray(FileStatus[]::new);
+ }
+
+ private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+ return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+ .map(org.apache.hadoop.fs.Path::new)
+ // filter out the file paths that do not exist; some files may have been
+ // cleaned by the cleaner.
+ .filter(path -> {
+ try {
+ return fs.exists(path);
+ } catch (IOException e) {
+ LOG.error("Checking existence of path {} failed", path);
+ throw new HoodieException(e);
+ }
+ }).map(path -> {
+ try {
+ return fs.getFileStatus(path);
+ } catch (IOException e) {
+ LOG.error("Fetching file status of path {} failed", path);
+ throw new HoodieException(e);
+ }
+ })
+ // filter out crushed files
+ .filter(fileStatus -> fileStatus.getLen() > 0)
+ .collect(Collectors.toList());
+ }
+
+ public static HoodieCommitMetadata getCommitMetadata(
Review comment:
ditto
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
public static void clean(String path) {
PROFILES.remove(path);
}
+
+ /**
+ * Returns all the incremental write file path statuses with the given commits metadata.
+ *
+ * @param basePath Table base path
+ * @param hadoopConf The hadoop conf
+ * @param metadataList The commits metadata
+ * @return the file statuses array
+ */
+ public static FileStatus[] getWritePathsOfInstants(
+ Path basePath,
+ Configuration hadoopConf,
+ List<HoodieCommitMetadata> metadataList) {
+ FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+ return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+ .flatMap(Collection::stream).toArray(FileStatus[]::new);
+ }
+
+ private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
Review comment:
Add some comments (e.g. a Javadoc) describing this static method?
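One possible shape for the requested comment, as a sketch (the wording is illustrative only, not part of the patch):

```java
/**
 * Collects the statuses of all files written by the given commit.
 *
 * <p>Paths that no longer exist (e.g. removed by the cleaner) and
 * zero-length files are filtered out, since both would break
 * downstream readers.
 *
 * @param basePath Table base path
 * @param metadata The commit metadata to resolve the write paths from
 * @param fs       The filesystem used to check and stat the paths
 * @return the list of valid file statuses written by this commit
 */
```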
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]