AlexYinHan commented on code in PR #25924:
URL: https://github.com/apache/flink/pull/25924#discussion_r1920209858


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,44 @@ public ForStResourceContainer(
         this.metricGroup = metricGroup;
     }
 
+    private static boolean isDbPathUnderCheckpointPath(
+            @Nullable CheckpointStorageAccess checkpointStorageAccess,
+            @Nullable Path dbRemotePath) {
+        if (dbRemotePath == null) {
+            return false;
+        }
+
+        // For checkpoint other than FsCheckpointStorageAccess, we treat it as 'DB path not under
+        // checkpoint path', since we cannot reuse checkpoint files in such case.
+        if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+                || checkpointStorageAccess instanceof FsMergingCheckpointStorageAccess) {

Review Comment:
   Done



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,45 @@ public ForStResourceContainer(
         this.metricGroup = metricGroup;
     }
 
+    private static boolean isDbPathUnderCheckpointPath(
+            @Nullable CheckpointStorageAccess checkpointStorageAccess,
+            @Nullable Path dbRemotePath) {
+        if (dbRemotePath == null) {
+            return false;
+        }
+
+        // For checkpoint other than FsCheckpointStorageAccess, we treat it as 'DB path not under
+        // checkpoint path', since we cannot reuse checkpoint files in such case.
+        // todo: Support enabling 'cp file reuse' with FsMergingCheckpointStorageAccess
+        if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+                || checkpointStorageAccess instanceof FsMergingCheckpointStorageAccess) {
+            return false;
+        }
+
+        FsCheckpointStorageAccess fsCheckpointStorageAccess =
+                (FsCheckpointStorageAccess) checkpointStorageAccess;
+        FileSystem checkpointFS = fsCheckpointStorageAccess.getFileSystem();
+        FileSystem dbRemoteFS;
+        try {
+            dbRemoteFS = dbRemotePath.getFileSystem();
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to get FileSystem from dbRemotePath: " + dbRemotePath, e);
+        }
+
+        if (!checkpointFS.getUri().equals(dbRemoteFS.getUri())) {

Review Comment:
   Done
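
For readers following the guards in the hunk above, here is a minimal, Flink-free sketch of the same early-return pattern. The classes `StorageAccess`, `FsStorageAccess`, and `FsMergingStorageAccess` are hypothetical stand-ins, not the real Flink types, and the sketch assumes the merging variant is a subclass of the plain file-system variant (which is why the second `instanceof` clause is needed). It covers only the checks visible in the hunk; whatever the method does after the URI comparison is not shown here and is not reproduced.

```java
import java.net.URI;

public class CheckpointReuseGuardSketch {
    // Hypothetical stand-ins for the checkpoint storage types; not Flink classes.
    static class StorageAccess {}

    static class FsStorageAccess extends StorageAccess {
        final URI fsUri;

        FsStorageAccess(URI fsUri) {
            this.fsUri = fsUri;
        }
    }

    // Assumed to extend the plain file-system variant.
    static class FsMergingStorageAccess extends FsStorageAccess {
        FsMergingStorageAccess(URI fsUri) {
            super(fsUri);
        }
    }

    /** Returns true only if none of the early-return guards would fire. */
    static boolean passesGuards(StorageAccess access, URI dbFsUri) {
        if (dbFsUri == null) {
            return false;
        }
        // 'null instanceof X' is false, so a null access is rejected here too.
        // The second clause excludes the subclass, which the first clause accepts.
        if (!(access instanceof FsStorageAccess)
                || access instanceof FsMergingStorageAccess) {
            return false;
        }
        FsStorageAccess fsAccess = (FsStorageAccess) access;
        // Files can only be reused when both sides live on the same file system.
        return fsAccess.fsUri.equals(dbFsUri);
    }

    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://nn:8020/");
        URI s3 = URI.create("s3://bucket/");
        System.out.println(passesGuards(new FsStorageAccess(hdfs), hdfs));
        System.out.println(passesGuards(new FsMergingStorageAccess(hdfs), hdfs));
        System.out.println(passesGuards(new FsStorageAccess(hdfs), s3));
        System.out.println(passesGuards(null, hdfs));
    }
}
```

Only the first call passes every guard: the merging subclass, the mismatched file-system URI, and the null access are each rejected by one of the checks.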


