davidradl commented on code in PR #26040:
URL: https://github.com/apache/flink/pull/26040#discussion_r1924951833


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java:
##########
@@ -92,97 +89,85 @@ public void transferFromCheckpoint(
             StreamStateHandle sourceHandle, Path targetPath, CloseableRegistry closeableRegistry)
             throws IOException {
         LOG.trace("Copy file from checkpoint: {}, {}, {}", sourceHandle, targetPath, dbFileSystem);
-        copyFileFromCheckpoint(dbFileSystem, sourceHandle, targetPath, closeableRegistry);
+        copyFileFromCheckpoint(sourceHandle, targetPath, closeableRegistry);
     }
 
     @Override
     public String toString() {
         return "CopyDataTransferStrategy{" + ", dbFileSystem=" + dbFileSystem + '}';
     }
 
-    private static HandleAndLocalPath copyFileToCheckpoint(
-            FileSystem dbFileSystem,
-            Path filePath,
+    private HandleAndLocalPath copyFileToCheckpoint(
+            Path dbFilePath,
             long maxTransferBytes,
             CheckpointStreamFactory checkpointStreamFactory,
             CheckpointedStateScope stateScope,
             CloseableRegistry closeableRegistry,
             CloseableRegistry tmpResourcesRegistry)
             throws IOException {
-        StreamStateHandle handleByDuplicating =
-                duplicateFileToCheckpoint(
-                        dbFileSystem, filePath, checkpointStreamFactory, stateScope);
-        if (handleByDuplicating != null) {
-            LOG.trace("Duplicate file to checkpoint: {} {}", filePath, handleByDuplicating);
-            return HandleAndLocalPath.of(handleByDuplicating, filePath.getName());
+
+        // Get State handle for the DB file
+        StreamStateHandle sourceStateHandle;
+        if (dbFileSystem instanceof ForStFlinkFileSystem) {
+            // Obtain the state handle stored in MappingEntry
+            // or Construct a FileStateHandle base on the source file
+            MappingEntry mappingEntry =
+                    ((ForStFlinkFileSystem) dbFileSystem).getMappingEntry(dbFilePath);
+            Preconditions.checkNotNull(mappingEntry, "dbFile not found: " + dbFilePath);
+            sourceStateHandle = mappingEntry.getSource().toStateHandle();
+            if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) {
+                // The file is already owned by JM, simply return the state handle
+                return HandleAndLocalPath.of(sourceStateHandle, dbFilePath.getName());
+            }
+        } else {
+            // Construct a FileStateHandle base on the DB file
+            FileSystem sourceFileSystem = dbFilePath.getFileSystem();
+            long fileLength = sourceFileSystem.getFileStatus(dbFilePath).getLen();
+            sourceStateHandle = new FileStateHandle(dbFilePath, fileLength);
+        }
+
+        // Try path-copying first. If failed, fallback to bytes-copying
+        StreamStateHandle targetStateHandle =
+                tryPathCopyingToCheckpoint(sourceStateHandle, checkpointStreamFactory, stateScope);

Review Comment:
   tryPathCopyingToCheckpoint can fail by throwing an IOException as well as by
returning null. I think it would be better for it to always return null on
failure; otherwise an IOException propagates to the caller and we never fall
back to bytesCopyingToCheckpoint.
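
To illustrate the shape I have in mind, here is a rough, self-contained sketch. It is not the PR's code: `FallbackSketch`, `CopyStep`, `tryPathCopy` and `copyToCheckpoint` are hypothetical stand-ins, and plain strings stand in for the state handles. The only point is that the try step turns an IOException into null, so the bytes-copying fallback still runs.

```java
import java.io.IOException;

public class FallbackSketch {
    /** Hypothetical stand-in for a copy step that may fail with an IOException. */
    interface CopyStep {
        String copy(String source) throws IOException;
    }

    /** Runs the path copy and reports any failure as null instead of throwing. */
    static String tryPathCopy(CopyStep pathCopy, String source) {
        try {
            return pathCopy.copy(source);
        } catch (IOException e) {
            // Path copying is best effort here: signal failure with null so the caller falls back.
            return null;
        }
    }

    /** Tries path copying first and falls back to bytes copying when it yields null. */
    static String copyToCheckpoint(CopyStep pathCopy, CopyStep bytesCopy, String source)
            throws IOException {
        String handle = tryPathCopy(pathCopy, source);
        return handle != null ? handle : bytesCopy.copy(source);
    }

    public static void main(String[] args) throws IOException {
        CopyStep failingPathCopy = s -> { throw new IOException("path copy not supported"); };
        CopyStep bytesCopy = s -> "bytes:" + s;
        // The path copy throws, so the result comes from the bytes-copying fallback.
        System.out.println(copyToCheckpoint(failingPathCopy, bytesCopy, "000012.sst"));
    }
}
```

In the real method it would probably be worth logging the swallowed exception before returning null, so that a failed path copy is still visible.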



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
