davidradl commented on code in PR #26040: URL: https://github.com/apache/flink/pull/26040#discussion_r1924951833
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java: ########## @@ -92,97 +89,85 @@ public void transferFromCheckpoint( StreamStateHandle sourceHandle, Path targetPath, CloseableRegistry closeableRegistry) throws IOException { LOG.trace("Copy file from checkpoint: {}, {}, {}", sourceHandle, targetPath, dbFileSystem); - copyFileFromCheckpoint(dbFileSystem, sourceHandle, targetPath, closeableRegistry); + copyFileFromCheckpoint(sourceHandle, targetPath, closeableRegistry); } @Override public String toString() { return "CopyDataTransferStrategy{" + ", dbFileSystem=" + dbFileSystem + '}'; } - private static HandleAndLocalPath copyFileToCheckpoint( - FileSystem dbFileSystem, - Path filePath, + private HandleAndLocalPath copyFileToCheckpoint( + Path dbFilePath, long maxTransferBytes, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException { - StreamStateHandle handleByDuplicating = - duplicateFileToCheckpoint( - dbFileSystem, filePath, checkpointStreamFactory, stateScope); - if (handleByDuplicating != null) { - LOG.trace("Duplicate file to checkpoint: {} {}", filePath, handleByDuplicating); - return HandleAndLocalPath.of(handleByDuplicating, filePath.getName()); + + // Get State handle for the DB file + StreamStateHandle sourceStateHandle; + if (dbFileSystem instanceof ForStFlinkFileSystem) { + // Obtain the state handle stored in MappingEntry + // or Construct a FileStateHandle base on the source file + MappingEntry mappingEntry = + ((ForStFlinkFileSystem) dbFileSystem).getMappingEntry(dbFilePath); + Preconditions.checkNotNull(mappingEntry, "dbFile not found: " + dbFilePath); + sourceStateHandle = mappingEntry.getSource().toStateHandle(); + if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) { + // The file is already owned by JM, simply return the 
state handle + return HandleAndLocalPath.of(sourceStateHandle, dbFilePath.getName()); + } + } else { + // Construct a FileStateHandle base on the DB file + FileSystem sourceFileSystem = dbFilePath.getFileSystem(); + long fileLength = sourceFileSystem.getFileStatus(dbFilePath).getLen(); + sourceStateHandle = new FileStateHandle(dbFilePath, fileLength); + } + + // Try path-copying first. If failed, fallback to bytes-copying + StreamStateHandle targetStateHandle = + tryPathCopyingToCheckpoint(sourceStateHandle, checkpointStreamFactory, stateScope); Review Comment: tryPathCopyingToCheckpoint can signal failure in two ways: by throwing an IOException or by returning null. I think it would be better to report every failure by returning null. Otherwise, when an IOException is thrown, we never fall back to bytesCopyingToCheckpoint. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org