AlexYinHan commented on code in PR #26040: URL: https://github.com/apache/flink/pull/26040#discussion_r1925152740
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java: ########## @@ -92,97 +89,85 @@ public void transferFromCheckpoint( StreamStateHandle sourceHandle, Path targetPath, CloseableRegistry closeableRegistry) throws IOException { LOG.trace("Copy file from checkpoint: {}, {}, {}", sourceHandle, targetPath, dbFileSystem); - copyFileFromCheckpoint(dbFileSystem, sourceHandle, targetPath, closeableRegistry); + copyFileFromCheckpoint(sourceHandle, targetPath, closeableRegistry); } @Override public String toString() { return "CopyDataTransferStrategy{" + ", dbFileSystem=" + dbFileSystem + '}'; } - private static HandleAndLocalPath copyFileToCheckpoint( - FileSystem dbFileSystem, - Path filePath, + private HandleAndLocalPath copyFileToCheckpoint( + Path dbFilePath, long maxTransferBytes, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException { - StreamStateHandle handleByDuplicating = - duplicateFileToCheckpoint( - dbFileSystem, filePath, checkpointStreamFactory, stateScope); - if (handleByDuplicating != null) { - LOG.trace("Duplicate file to checkpoint: {} {}", filePath, handleByDuplicating); - return HandleAndLocalPath.of(handleByDuplicating, filePath.getName()); + + // Get State handle for the DB file + StreamStateHandle sourceStateHandle; + if (dbFileSystem instanceof ForStFlinkFileSystem) { + // Obtain the state handle stored in MappingEntry + // or Construct a FileStateHandle base on the source file + MappingEntry mappingEntry = + ((ForStFlinkFileSystem) dbFileSystem).getMappingEntry(dbFilePath); + Preconditions.checkNotNull(mappingEntry, "dbFile not found: " + dbFilePath); + sourceStateHandle = mappingEntry.getSource().toStateHandle(); + if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) { + // The file is already owned by JM, simply return the state handle + return HandleAndLocalPath.of(sourceStateHandle, dbFilePath.getName()); + } + } else { + // Construct a FileStateHandle base on the DB file + FileSystem sourceFileSystem = dbFilePath.getFileSystem(); + long fileLength = sourceFileSystem.getFileStatus(dbFilePath).getLen(); + sourceStateHandle = new FileStateHandle(dbFilePath, fileLength); + } + + // Try path-copying first. If failed, fallback to bytes-copying + StreamStateHandle targetStateHandle = + tryPathCopyingToCheckpoint(sourceStateHandle, checkpointStreamFactory, stateScope); + if (targetStateHandle != null) { + LOG.trace("Path-copy file to checkpoint: {} {}", dbFilePath, targetStateHandle); + } else { + targetStateHandle = + bytesCopyingToCheckpoint( + dbFilePath, + maxTransferBytes, + checkpointStreamFactory, + stateScope, + closeableRegistry, + tmpResourcesRegistry); + LOG.trace("Bytes-copy file to checkpoint: {}, {}", dbFilePath, targetStateHandle); } - HandleAndLocalPath handleAndLocalPath = - HandleAndLocalPath.of( - writeFileToCheckpoint( - dbFileSystem, - filePath, - maxTransferBytes, - checkpointStreamFactory, - stateScope, - closeableRegistry, - tmpResourcesRegistry), - filePath.getName()); - LOG.trace("Write file to checkpoint: {}, {}", filePath, handleAndLocalPath.getHandle()); - return handleAndLocalPath; + return HandleAndLocalPath.of(targetStateHandle, dbFilePath.getName()); } /** * Duplicate file to checkpoint storage by calling {@link CheckpointStreamFactory#duplicate} if * possible. Review Comment: Added params and returns in the javadoc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org