JingsongLi commented on a change in pull request #12452: URL: https://github.com/apache/flink/pull/12452#discussion_r435778349
########## File path: flink-formats/flink-hadoop-bulk/src/test/resources/log4j2-test.properties ########## @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF Review comment: OFF? ########## File path: flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java ########## @@ -142,14 +150,21 @@ public int getVersion() { throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported."); } Review comment: I think you can add some comments in `HadoopPathBasedPartFileWriter` to explain what we store in state. ########## File path: flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java ########## @@ -96,12 +107,19 @@ private void rename(boolean assertFileExists) throws IOException { } } - private Path generateInProgressFilePath() { + private Path generateInProgressFilePath() throws IOException { checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute"); + FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration); + Path parent = targetFilePath.getParent(); String name = targetFilePath.getName(); - return new Path(parent, "." + name + ".inprogress"); + while (true) { + Path candidate = new Path(parent, "." 
+ name + ".inprogress" + UUID.randomUUID().toString()); Review comment: `".inprogress"` -> `".inprogress."` ########## File path: flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java ########## @@ -142,14 +150,21 @@ public int getVersion() { throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported."); } - Path path = ((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath(); + HadoopPathBasedPendingFileRecoverable hadoopRecoverable = + (HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable; + Path path = hadoopRecoverable.getPath(); + Path inProgressPath = hadoopRecoverable.getInProgressPath(); + byte[] pathBytes = path.toUri().toString().getBytes(CHARSET); + byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET); - byte[] targetBytes = new byte[8 + pathBytes.length]; + byte[] targetBytes = new byte[8 + pathBytes.length + 4 + inProgressBytes.length]; Review comment: This can just be `12 + pathBytes.length + inProgressBytes.length`. ########## File path: flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java ########## @@ -103,21 +103,29 @@ public void commitAfterRecovery() throws IOException { public PendingFileRecoverable getRecoverable() { return new HadoopPathBasedPendingFileRecoverable( - fileCommitter.getTargetFilePath()); + fileCommitter.getTargetFilePath(), + fileCommitter.getInProgressFilePath()); } } @VisibleForTesting static class HadoopPathBasedPendingFileRecoverable implements PendingFileRecoverable { private final Path path; - public HadoopPathBasedPendingFileRecoverable(Path path) { + private final Path inProgressPath; + + public HadoopPathBasedPendingFileRecoverable(Path path, Path inProgressPath) { Review comment: The parameter names can be made consistent with `HadoopFsRecoverable`: `Path targetFile, Path tempFile`. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org