JingsongLi commented on a change in pull request #12452:
URL: https://github.com/apache/flink/pull/12452#discussion_r435778349



##########
File path: 
flink-formats/flink-hadoop-bulk/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF

Review comment:
       OFF?

##########
File path: 
flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
##########
@@ -142,14 +150,21 @@ public int getVersion() {
                                throw new UnsupportedOperationException("Only 
HadoopPathBasedPendingFileRecoverable is supported.");
                        }
 

Review comment:
       I think you can add some comments to explain in 
`HadoopPathBasedPartFileWriter`, what we store in state.

##########
File path: 
flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java
##########
@@ -96,12 +107,19 @@ private void rename(boolean assertFileExists) throws 
IOException {
                }
        }
 
-       private Path generateInProgressFilePath() {
+       private Path generateInProgressFilePath() throws IOException {
                checkArgument(targetFilePath.isAbsolute(), "Target file must be 
absolute");
 
+               FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), 
configuration);
+
                Path parent = targetFilePath.getParent();
                String name = targetFilePath.getName();
 
-               return new Path(parent, "." + name + ".inprogress");
+               while (true) {
+                       Path candidate = new Path(parent, "." + name + 
".inprogress" + UUID.randomUUID().toString());

Review comment:
       `".inprogress"` -> `".inprogress."`

##########
File path: 
flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
##########
@@ -142,14 +150,21 @@ public int getVersion() {
                                throw new UnsupportedOperationException("Only 
HadoopPathBasedPendingFileRecoverable is supported.");
                        }
 
-                       Path path = ((HadoopPathBasedPendingFileRecoverable) 
pendingFileRecoverable).getPath();
+                       HadoopPathBasedPendingFileRecoverable hadoopRecoverable 
=
+                               (HadoopPathBasedPendingFileRecoverable) 
pendingFileRecoverable;
+                       Path path = hadoopRecoverable.getPath();
+                       Path inProgressPath = 
hadoopRecoverable.getInProgressPath();
+
                        byte[] pathBytes = 
path.toUri().toString().getBytes(CHARSET);
+                       byte[] inProgressBytes = 
inProgressPath.toUri().toString().getBytes(CHARSET);
 
-                       byte[] targetBytes = new byte[8 + pathBytes.length];
+                       byte[] targetBytes = new byte[8 + pathBytes.length + 4 
+ inProgressBytes.length];

Review comment:
       just `12 + pathBytes.length + inProgressBytes.length`

##########
File path: 
flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
##########
@@ -103,21 +103,29 @@ public void commitAfterRecovery() throws IOException {
 
                public PendingFileRecoverable getRecoverable() {
                        return new HadoopPathBasedPendingFileRecoverable(
-                               fileCommitter.getTargetFilePath());
+                               fileCommitter.getTargetFilePath(),
+                               fileCommitter.getInProgressFilePath());
                }
        }
 
        @VisibleForTesting
        static class HadoopPathBasedPendingFileRecoverable implements 
PendingFileRecoverable {
                private final Path path;
 
-               public HadoopPathBasedPendingFileRecoverable(Path path) {
+               private final Path inProgressPath;
+
+               public HadoopPathBasedPendingFileRecoverable(Path path, Path 
inProgressPath) {

Review comment:
       Can be consistent with `HadoopFsRecoverable`, `Path targetFile, Path 
tempFile`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to