zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1213882810


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -395,18 +394,16 @@ public void release() {
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+        @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;
 
-        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+        protected PreviousSnapshot(
+                @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
             this.confirmedSstFiles = confirmedSstFiles;
         }
 
         protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
             if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                // we introduce a placeholder state handle, that is replaced with the
-                // original from the shared state registry (created from a previous checkpoint)
-                return Optional.of(
-                        new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Thanks for your patience, @rkhachatryan.
   
   > But I still recommend limiting its usage to ByteStreamStateHandle, and 
don't use it to calculate checkpointed size.
   
   Before this PR, RocksDBSnapshotStrategyBase used PlaceholderStreamStateHandle
   regardless of whether the original handle was a ByteStreamStateHandle or a
   FileStateHandle. I suggest using PlaceholderStreamStateHandle only when the
   original handle is a ByteStreamStateHandle. This PR already avoids using
   PlaceholderStreamStateHandle to calculate the checkpointed size, and I would
   like to keep that.
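   
   As a rough sketch of that suggestion (not the PR's actual code): the types
   below are simplified, hypothetical stand-ins for the Flink classes of the
   same names, a plain `String` replaces `StateHandleID`, and the placeholder
   constructor taking only a size is an assumption made for illustration.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   public class PlaceholderSketch {

       /** Simplified stand-in for Flink's StreamStateHandle. */
       interface StreamStateHandle {
           long getStateSize();
       }

       /** Stand-in: the state bytes are held inline in the handle. */
       static final class ByteStreamStateHandle implements StreamStateHandle {
           private final byte[] data;

           ByteStreamStateHandle(byte[] data) {
               this.data = data;
           }

           @Override
           public long getStateSize() {
               return data.length;
           }
       }

       /** Stand-in: the state lives in a file; only path and size are kept. */
       static final class FileStateHandle implements StreamStateHandle {
           private final String path;
           private final long size;

           FileStateHandle(String path, long size) {
               this.path = path;
               this.size = size;
           }

           String getPath() {
               return path;
           }

           @Override
           public long getStateSize() {
               return size;
           }
       }

       /** Stand-in: refers to an already registered handle, keeping only its size. */
       static final class PlaceholderStreamStateHandle implements StreamStateHandle {
           private final long size;

           PlaceholderStreamStateHandle(long size) {
               this.size = size;
           }

           @Override
           public long getStateSize() {
               return size;
           }
       }

       static final class PreviousSnapshot {
           // May be null, mirroring the @Nullable field in the diff above.
           private final Map<String, StreamStateHandle> confirmedSstFiles;

           PreviousSnapshot(Map<String, StreamStateHandle> confirmedSstFiles) {
               this.confirmedSstFiles = confirmedSstFiles;
           }

           Optional<StreamStateHandle> getUploaded(String stateHandleId) {
               if (confirmedSstFiles == null) {
                   return Optional.empty();
               }
               StreamStateHandle handle = confirmedSstFiles.get(stateHandleId);
               if (handle == null) {
                   return Optional.empty();
               }
               // Only byte handles are swapped for a placeholder;
               // file handles (and anything else) are returned unchanged.
               if (handle instanceof ByteStreamStateHandle) {
                   return Optional.of(new PlaceholderStreamStateHandle(handle.getStateSize()));
               }
               return Optional.of(handle);
           }
       }

       public static void main(String[] args) {
           Map<String, StreamStateHandle> confirmed = new HashMap<>();
           confirmed.put("000001.sst", new ByteStreamStateHandle(new byte[] {1, 2, 3}));
           confirmed.put("000002.sst", new FileStateHandle("/tmp/000002.sst", 1024L));
           PreviousSnapshot previous = new PreviousSnapshot(confirmed);
           for (String id : new String[] {"000001.sst", "000002.sst"}) {
               StreamStateHandle h = previous.getUploaded(id).get();
               System.out.println(id + " -> " + h.getClass().getSimpleName());
           }
       }
   }
   ```
   
   With this shape the caller always receives a handle that reports the
   original state size, and only inline byte handles are swapped for a
   placeholder.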
   
   > That might not be necessary, if the registration key is the same key as in 
IncrementalRemoteKeyedStateHandle.sharedState, as per 
https://github.com/apache/flink/pull/22669#discussion_r1210450731.
   
   Do you mean changing IncrementalRemoteKeyedStateHandle like this?
   ```
       /** Shared state in the incremental checkpoint. */
       private final Map<SharedStateRegistryKey, HandleWithLocalPath> sharedState;
   
       /** Private state in the incremental checkpoint. */
       private final List<HandleWithLocalPath> privateState;
   
       private static final class HandleWithLocalPath {
           StreamStateHandle handle;
           String localPath;
       }
   ```
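   
   If that is the intended direction, a self-contained version of the idea
   might look like the sketch below. It is only an illustration under stated
   assumptions: `StreamStateHandle` and `SimpleHandle` are hypothetical
   stand-ins, a `String` replaces `SharedStateRegistryKey`, and
   `IncrementalHandleSketch` with its `totalStateSize()` helper is invented
   here and is not part of Flink.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Objects;

   public class HandleWithLocalPathSketch {

       /** Simplified stand-in for Flink's StreamStateHandle. */
       interface StreamStateHandle {
           long getStateSize();
       }

       /** Hypothetical handle that only records a size. */
       static final class SimpleHandle implements StreamStateHandle {
           private final long size;

           SimpleHandle(long size) {
               this.size = size;
           }

           @Override
           public long getStateSize() {
               return size;
           }
       }

       /** Pairs a remote handle with the local file path it was created from. */
       static final class HandleWithLocalPath {
           private final StreamStateHandle handle;
           private final String localPath;

           HandleWithLocalPath(StreamStateHandle handle, String localPath) {
               this.handle = Objects.requireNonNull(handle);
               this.localPath = Objects.requireNonNull(localPath);
           }

           StreamStateHandle getHandle() {
               return handle;
           }

           String getLocalPath() {
               return localPath;
           }
       }

       /** Hypothetical holder mirroring the two fields proposed above. */
       static final class IncrementalHandleSketch {
           // Keyed by registration key (a String stands in for SharedStateRegistryKey).
           private final Map<String, HandleWithLocalPath> sharedState;
           private final List<HandleWithLocalPath> privateState;

           IncrementalHandleSketch(
                   Map<String, HandleWithLocalPath> sharedState,
                   List<HandleWithLocalPath> privateState) {
               // Defensive copies keep the holder immutable from the outside.
               this.sharedState = Collections.unmodifiableMap(new LinkedHashMap<>(sharedState));
               this.privateState = Collections.unmodifiableList(new ArrayList<>(privateState));
           }

           Map<String, HandleWithLocalPath> getSharedState() {
               return sharedState;
           }

           /** Sums the sizes of all shared and private handles. */
           long totalStateSize() {
               long total = 0L;
               for (HandleWithLocalPath h : sharedState.values()) {
                   total += h.getHandle().getStateSize();
               }
               for (HandleWithLocalPath h : privateState) {
                   total += h.getHandle().getStateSize();
               }
               return total;
           }
       }
   }
   ```
   
   Keeping the local path next to the handle means the map key can be the
   registration key itself, so no separate lookup from local file name to
   registry key is needed.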


