gaoyunhaii commented on a change in pull request #18157:
URL: https://github.com/apache/flink/pull/18157#discussion_r785398471



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -44,7 +46,7 @@
 
     // ------------------------------------------------------------------------
 
-    private final FSDataOutputStream out;
+    private FSDataOutputStream out;

Review comment:
       Although this relates to the comments below, we should find a way to 
keep this field final, for example by introducing a local variable when creating 
the stream or by narrowing the scope of the try...catch in the constructor. 
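
   A minimal sketch of the local-variable approach, not the actual Flink code: `SketchFileSystem` and `FinalFieldSketch` are hypothetical stand-ins, and plain `OutputStream` replaces `FSDataOutputStream`. The stream is chosen in a local variable and assigned to the final field exactly once.

```java
import java.io.OutputStream;

/** Hypothetical stand-in for a file system; not a Flink interface. */
interface SketchFileSystem {
    /** May throw UnsupportedOperationException if recoverable writes are unavailable. */
    OutputStream createRecoverable();

    OutputStream create();
}

class FinalFieldSketch {
    // The field stays final because it is assigned exactly once, after the try...catch.
    private final OutputStream out;

    FinalFieldSketch(SketchFileSystem fileSystem) {
        OutputStream stream; // local variable absorbs the branching
        try {
            stream = fileSystem.createRecoverable();
        } catch (UnsupportedOperationException e) {
            stream = fileSystem.create();
        }
        this.out = stream;
    }

    OutputStream getOut() {
        return out;
    }
}
```

   The try...catch only covers the creation call, so the rest of the constructor is unaffected by the fallback.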

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -62,7 +64,15 @@ public FsCheckpointMetadataOutputStream(
         this.metadataFilePath = checkNotNull(metadataFilePath);
         this.exclusiveCheckpointDir = checkNotNull(exclusiveCheckpointDir);
 
-        this.out = fileSystem.create(metadataFilePath, WriteMode.NO_OVERWRITE);
+        try {
+            RecoverableWriter recoverableWriter = fileSystem.createRecoverableWriter();

Review comment:
       I have a small concern about compatibility when the `RecoverableWriter` 
cannot be created: currently we only fall back to the normal output stream 
when the FileSystem throws UnsupportedOperationException. However, users 
may have custom FileSystem implementations that signal the unsupported case 
differently. 
   
   Could we fall back to the normal output stream, with a warning, whenever 
creating the `RecoverableWriter` fails? Like
   
   ```
   RecoverableWriter recoverableWriter = null;
   try {
       recoverableWriter = ...
   } catch (Throwable e) {
       // Any failure to create the writer triggers the fallback,
       // not only UnsupportedOperationException.
       LOG.warn(...)
   }
   
   if (recoverableWriter != null) {
       // use recoverable writer
   } else {
       // use normal output stream
   }
   ```
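
   The same idea as a compilable sketch, under stated assumptions: `FallbackSketch`, `WriterFactory`, and `Backend` are hypothetical names, `java.util.logging` stands in for the project's logger, and an `Object` stands in for the `RecoverableWriter`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class FallbackSketch {
    private static final Logger LOG = Logger.getLogger(FallbackSketch.class.getName());

    /** Which kind of stream the caller should open (hypothetical). */
    enum Backend { RECOVERABLE, PLAIN }

    /** Hypothetical stand-in for the call that creates the recoverable writer. */
    interface WriterFactory {
        Object createRecoverableWriter() throws Exception;
    }

    static Backend chooseBackend(WriterFactory factory) {
        Object recoverableWriter = null;
        try {
            recoverableWriter = factory.createRecoverableWriter();
        } catch (Throwable t) {
            // Fall back on any failure, not only UnsupportedOperationException,
            // so custom file systems that fail differently still work.
            LOG.log(Level.WARNING, "Could not create recoverable writer, using plain stream.", t);
        }

        // A null result is treated like a failure.
        return recoverableWriter != null ? Backend.RECOVERABLE : Backend.PLAIN;
    }
}
```

   Catching `Throwable` follows the suggestion above; catching `Exception` would be a narrower alternative if swallowing `Error`s is a concern.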

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
##########
@@ -109,13 +119,19 @@ public void close() {
 
             try {
                 out.close();
-                fileSystem.delete(metadataFilePath, false);
+                if (!isRecoverableStream(out)) {
+                    fileSystem.delete(metadataFilePath, false);
+                }
             } catch (Throwable t) {
                 LOG.warn("Could not close the state stream for {}.", metadataFilePath, t);
             }
         }
     }
 
+    private boolean isRecoverableStream(FSDataOutputStream out) {

Review comment:
       Relying on conditions and branches in several different places might 
complicate the implementation. Could you introduce an interface for the two 
types of implementation? Like
   
   ```
   interface MetadataOutputStreamBackend  {
        
       FSDataOutputStream getOutput();
   
       void commit();
   
       void close();
   }
   ```
   
   and have two implementations?
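
   A rough sketch of how the two implementations could differ, assuming an in-memory `Map` as a stand-in for the file system and plain `OutputStream` in place of `FSDataOutputStream`. `PlainBackend` and `RecoverableBackend` are hypothetical names. The point is that the "delete on abort" decision lives inside each backend instead of an `isRecoverableStream` check in `close()`.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.Map;

/** Mirrors the interface proposed above, with OutputStream as a stand-in type. */
interface MetadataOutputStreamBackend {
    OutputStream getOutput();

    void commit();

    void close();
}

/** Plain stream: the file is visible once opened, so an abort must delete it. */
class PlainBackend implements MetadataOutputStreamBackend {
    private final Map<String, byte[]> files; // stand-in for the file system
    private final String path;
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean committed;

    PlainBackend(Map<String, byte[]> files, String path) {
        this.files = files;
        this.path = path;
        files.put(path, new byte[0]); // the target file exists from the start
    }

    public OutputStream getOutput() { return out; }

    public void commit() {
        files.put(path, out.toByteArray());
        committed = true;
    }

    public void close() {
        if (!committed) {
            files.remove(path); // clean up the partially written file
        }
    }
}

/** Recoverable stream: data is published only on commit, so an abort deletes nothing. */
class RecoverableBackend implements MetadataOutputStreamBackend {
    private final Map<String, byte[]> files;
    private final String path;
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    RecoverableBackend(Map<String, byte[]> files, String path) {
        this.files = files;
        this.path = path;
    }

    public OutputStream getOutput() { return out; }

    public void commit() { files.put(path, out.toByteArray()); }

    public void close() {
        // Nothing was published before commit, so there is no target file to delete.
    }
}
```

   With this split, the owning stream class would only call `commit()` or `close()` on whichever backend it was given.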




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.