gaoyunhaii commented on a change in pull request #18157: URL: https://github.com/apache/flink/pull/18157#discussion_r785398471
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -44,7 +46,7 @@
     // ------------------------------------------------------------------------

-    private final FSDataOutputStream out;
+    private FSDataOutputStream out;

Review comment:
   Although this is related to the comments below, there should be a way to keep this field final, such as introducing a local variable when creating the stream or narrowing the scope of the try...catch in the constructor.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -62,7 +64,15 @@ public FsCheckpointMetadataOutputStream(
         this.metadataFilePath = checkNotNull(metadataFilePath);
         this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir);

-        this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE);
+        try {
+            RecoverableWriter recoverableWriter = fileSystem.createRecoverableWriter();

Review comment:
   I have a slight concern about compatibility when the `RecoverableWriter` cannot be created: currently the code only falls back to the normal output stream when the FileSystem throws UnsupportedOperationException. However, users may have customized FileSystem implementations that behave differently in unsupported cases. Could we fall back to the normal output stream, with a warning, whenever creating the `RecoverableWriter` fails? For example:

   ```
   RecoverableWriter recoverableWriter = null;
   try {
       recoverableWriter = ...
   } catch (Throwable e) {
       LOG.warn(...)
   }

   if (recoverableWriter != null) {
       // use recoverable writer
   } else {
       // use normal output stream
   }
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -109,13 +119,19 @@ public void close() {

                 try {
                     out.close();
-                    fileSystem.delete(metadataFilePath, false);
+                    if (!isRecoverableStream(out)) {
+                        fileSystem.delete(metadataFilePath, false);
+                    }
                 } catch (Throwable t) {
                     LOG.warn("Could not close the state stream for {}.", metadataFilePath, t);
                 }
             }
         }

+    private boolean isRecoverableStream(FSDataOutputStream out) {

Review comment:
   Relying on conditions and branches in several different places may complicate the implementation. Could you introduce an interface for the two types of implementation? For example:

   ```
   interface MetadataOutputStreamBackend {
       FSDataOutputStream getOutput();
       void commit();
       void close();
   }
   ```

   and have two implementations?
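
To make the last two suggestions more concrete, here is a minimal, self-contained sketch of what the interface with two implementations and the warn-and-fall-back selection could look like. It deliberately uses only `java.nio.file` rather than Flink's `FileSystem` and `RecoverableWriter`, so every class here (`DirectBackend`, `RenamingBackend`, `MetadataBackends`) and the `recoverableWriterSupported` flag are hypothetical stand-ins and not existing Flink APIs. The temp-file-plus-rename backend only imitates the "nothing visible until commit" behavior of a recoverable stream.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical abstraction over the two ways of writing the metadata file. */
interface MetadataOutputStreamBackend {
    OutputStream getOutput();

    /** Finishes the write and publishes the data at the target path. */
    void commit() throws IOException;

    /** Abandons the write if it was not committed; no-op after commit. */
    void close() throws IOException;
}

/** Plain stream: writes straight to the target, so an abort must delete the file. */
final class DirectBackend implements MetadataOutputStreamBackend {
    private final Path target;
    private final OutputStream out;
    private boolean finished;

    DirectBackend(Path target) throws IOException {
        this.target = target;
        // CREATE_NEW fails if the file exists, similar in spirit to NO_OVERWRITE.
        this.out = Files.newOutputStream(target, StandardOpenOption.CREATE_NEW);
    }

    @Override public OutputStream getOutput() { return out; }

    @Override public void commit() throws IOException {
        finished = true;
        out.close();
    }

    @Override public void close() throws IOException {
        if (finished) { return; }
        finished = true;
        out.close();
        Files.deleteIfExists(target); // discard the partially written file
    }
}

/** Stand-in for a recoverable stream: data becomes visible only on commit. */
final class RenamingBackend implements MetadataOutputStreamBackend {
    private final Path target;
    private final Path temp;
    private final OutputStream out;
    private boolean finished;

    RenamingBackend(Path target) throws IOException {
        this.target = target;
        this.temp = target.resolveSibling(target.getFileName() + ".inprogress");
        this.out = Files.newOutputStream(temp, StandardOpenOption.CREATE_NEW);
    }

    @Override public OutputStream getOutput() { return out; }

    @Override public void commit() throws IOException {
        finished = true;
        out.close();
        Files.move(temp, target); // throws if the target already exists
    }

    @Override public void close() throws IOException {
        if (finished) { return; }
        finished = true;
        out.close();
        Files.deleteIfExists(temp); // the target itself was never created
    }
}

final class MetadataBackends {
    /** Chooses a backend once, so callers need no per-call branching. */
    static MetadataOutputStreamBackend create(Path target, boolean recoverableWriterSupported)
            throws IOException {
        MetadataOutputStreamBackend backend = null; // local variable; callers can keep a final field
        try {
            if (!recoverableWriterSupported) {
                throw new UnsupportedOperationException("recoverable writer not supported");
            }
            backend = new RenamingBackend(target);
        } catch (Exception e) {
            // Fall back on any failure, not only UnsupportedOperationException.
            System.err.println("WARN: falling back to the plain output stream: " + e);
        }
        return backend != null ? backend : new DirectBackend(target);
    }
}
```

With this shape, the stream class would hold a single final backend field assigned from `MetadataBackends.create(...)`, and its own `close()` would simply delegate to the backend instead of checking `isRecoverableStream(out)`.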