zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1213882810
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -395,18 +394,16 @@ public void release() {
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {

-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+        @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;

-        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+        protected PreviousSnapshot(
+                @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
             this.confirmedSstFiles = confirmedSstFiles;
         }

         protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
             if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                // we introduce a placeholder state handle, that is replaced with the
-                // original from the shared state registry (created from a previous checkpoint)
-                return Optional.of(
-                        new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
Thanks for your patience @rkhachatryan.

> But I still recommend limiting its usage to ByteStreamStateHandle, and don't use it to calculate checkpointed size.

Before this PR, RocksDBSnapshotStrategyBase used PlaceholderStreamStateHandle regardless of whether the original handle was a ByteStreamStateHandle or a FileStateHandle. I suggest using PlaceholderStreamStateHandle only when the original handle is a ByteStreamStateHandle. This PR already stops using PlaceholderStreamStateHandle to calculate the checkpointed size, and I want to keep that.

> That might not be necessary, if the registration key is the same key as in IncrementalRemoteKeyedStateHandle.sharedState, as per https://github.com/apache/flink/pull/22669#discussion_r1210450731.

Do you mean changing IncrementalRemoteKeyedStateHandle like this?

```
/** Shared state in the incremental checkpoint. */
private final Map<SharedStateRegistryKey, HandleWithLocalPath> sharedState;

/** Private state in the incremental checkpoint. */
private final List<HandleWithLocalPath> privateState;

private static final class HandleWithLocalPath {
    StreamStateHandle handle;
    String localPath;
}
```
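
For illustration, the suggestion to use the placeholder only when the original handle is a ByteStreamStateHandle might look roughly like the sketch below. It is not the code in this PR: every type in it is a simplified, hypothetical stand-in for the Flink class of the same name, `String` replaces `StateHandleID`, and the class name `PlaceholderSketch` is made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch only: the nested types are simplified stand-ins, not Flink's real classes. */
class PlaceholderSketch {

    /** Stand-in for Flink's StreamStateHandle. */
    interface StreamStateHandle {
        long getStateSize();
    }

    /** Stand-in for a handle that carries its bytes inline. */
    static final class ByteStreamStateHandle implements StreamStateHandle {
        private final byte[] data;

        ByteStreamStateHandle(byte[] data) {
            this.data = data;
        }

        @Override
        public long getStateSize() {
            return data.length;
        }
    }

    /** Stand-in for a handle that only points at a file. */
    static final class FileStateHandle implements StreamStateHandle {
        private final String path;
        private final long size;

        FileStateHandle(String path, long size) {
            this.path = path;
            this.size = size;
        }

        @Override
        public long getStateSize() {
            return size;
        }
    }

    /** Stand-in placeholder that remembers only the size of the original handle. */
    static final class PlaceholderStreamStateHandle implements StreamStateHandle {
        private final long size;

        PlaceholderStreamStateHandle(long size) {
            this.size = size;
        }

        @Override
        public long getStateSize() {
            return size;
        }
    }

    /** Previous snapshot; the map may be null, as in the diff above. */
    static final class PreviousSnapshot {
        private final Map<String, StreamStateHandle> confirmedSstFiles;

        PreviousSnapshot(Map<String, StreamStateHandle> confirmedSstFiles) {
            this.confirmedSstFiles = confirmedSstFiles;
        }

        Optional<StreamStateHandle> getUploaded(String stateHandleId) {
            if (confirmedSstFiles == null) {
                return Optional.empty();
            }
            StreamStateHandle original = confirmedSstFiles.get(stateHandleId);
            if (original == null) {
                return Optional.empty();
            }
            if (original instanceof ByteStreamStateHandle) {
                // Inline bytes: send a placeholder instead of the bytes again.
                return Optional.of(new PlaceholderStreamStateHandle(original.getStateSize()));
            }
            // File-based (or any other) handle: reuse the original handle as-is.
            return Optional.of(original);
        }
    }

    public static void main(String[] args) {
        Map<String, StreamStateHandle> confirmed = new HashMap<>();
        confirmed.put("000001.sst", new ByteStreamStateHandle(new byte[] {1, 2, 3}));
        confirmed.put("000002.sst", new FileStateHandle("/checkpoints/000002.sst", 42L));
        PreviousSnapshot previous = new PreviousSnapshot(confirmed);
        for (String id : new String[] {"000001.sst", "000002.sst", "000003.sst"}) {
            System.out.println(
                    id + " -> "
                            + previous.getUploaded(id)
                                    .map(h -> h.getClass().getSimpleName())
                                    .orElse("not uploaded"));
        }
    }
}
```

In this sketch the checkpointed size would still have to be computed from the original handles, since the placeholder stands in only for data that has already been uploaded.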