AlexYinHan commented on code in PR #25924: URL: https://github.com/apache/flink/pull/25924#discussion_r1920209858
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java: ########## @@ -167,6 +213,44 @@ public ForStResourceContainer( this.metricGroup = metricGroup; } + private static boolean isDbPathUnderCheckpointPath( + @Nullable CheckpointStorageAccess checkpointStorageAccess, + @Nullable Path dbRemotePath) { + if (dbRemotePath == null) { + return false; + } + + // For checkpoint other than FsCheckpointStorageAccess, we treat it as 'DB path not under + // checkpoint path', since we cannot reuse checkpoint files in such case. + if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess) + || checkpointStorageAccess instanceof FsMergingCheckpointStorageAccess) { Review Comment: Done ########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java: ########## @@ -167,6 +213,45 @@ public ForStResourceContainer( this.metricGroup = metricGroup; } + private static boolean isDbPathUnderCheckpointPath( + @Nullable CheckpointStorageAccess checkpointStorageAccess, + @Nullable Path dbRemotePath) { + if (dbRemotePath == null) { + return false; + } + + // For checkpoint other than FsCheckpointStorageAccess, we treat it as 'DB path not under + // checkpoint path', since we cannot reuse checkpoint files in such case. 
+ // todo: Support enabling 'cp file reuse' with FsMergingCheckpointStorageAccess + if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess) + || checkpointStorageAccess instanceof FsMergingCheckpointStorageAccess) { + return false; + } + + FsCheckpointStorageAccess fsCheckpointStorageAccess = + (FsCheckpointStorageAccess) checkpointStorageAccess; + FileSystem checkpointFS = fsCheckpointStorageAccess.getFileSystem(); + FileSystem dbRemoteFS; + try { + dbRemoteFS = dbRemotePath.getFileSystem(); + } catch (IOException e) { + throw new RuntimeException( + "Failed to get FileSystem from dbRemotePath: " + dbRemotePath, e); + } + + if (!checkpointFS.getUri().equals(dbRemoteFS.getUri())) { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org