rkhachatryan commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r788694656



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+        long incrementalMaterializeSize = 0L;
         if (delta != null && delta.getStateSize() > 0) {
             prevDeltaCopy.add(delta);
+            incrementalMaterializeSize += delta.getIncrementalStateSize();
         }
 
         if (prevDeltaCopy.isEmpty()
                 && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
         } else {
+            List<KeyedStateHandle> materializedSnapshot =
+                    changelogStateBackendStateCopy.getMaterializedSnapshot();
+            for (KeyedStateHandle keyedStateHandle : materializedSnapshot) {
+                if (!lastCompletedHandles.contains(keyedStateHandle)) {
+                    incrementalMaterializeSize += keyedStateHandle.getStateSize();

Review comment:
   I agree with points 2, 3, and 4, but still have concerns about 1 and, more importantly, 5.
   
   > 1. even chk-10 failed finally, we would still get a large number of 
incremental state size
   
   But the materialization state size will be lost in that case, right?
   To fix this, we would have to track **sent** materialization sizes (keyed by materialization ID and checkpoint ID), mark them as **reported** once the checkpoint confirmation arrives, and then clean them up once the next materialization completes.
   To me, this complexity seems unjustified.
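
   To make the bookkeeping concrete, here is a rough sketch of what such tracking might look like. `MaterializationSizeTracker` and all of its methods are hypothetical names used only for illustration; nothing like this exists in the PR or in Flink, and it assumes materialization IDs increase monotonically.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a Flink class): remembers which materialization sizes were reported. */
class MaterializationSizeTracker {
    // materialization ID -> IDs of checkpoints whose reported size included it ("sent")
    private final Map<Long, Set<Long>> sentByMaterialization = new HashMap<>();
    // materialization IDs whose size was part of a confirmed checkpoint ("reported")
    private final Set<Long> reported = new HashSet<>();

    /** Returns the materialization size to add to this checkpoint's incremental size. */
    long onSnapshot(long materializationId, long checkpointId, long materializedSize) {
        if (reported.contains(materializationId)) {
            return 0L; // already counted by a confirmed checkpoint
        }
        sentByMaterialization
                .computeIfAbsent(materializationId, id -> new HashSet<>())
                .add(checkpointId);
        return materializedSize;
    }

    /** Marks every materialization sent with this checkpoint as reported. */
    void onCheckpointConfirmed(long checkpointId) {
        sentByMaterialization.forEach(
                (materializationId, checkpointIds) -> {
                    if (checkpointIds.contains(checkpointId)) {
                        reported.add(materializationId);
                    }
                });
    }

    /** Drops bookkeeping for materializations older than the new one (assumes increasing IDs). */
    void onMaterializationCompleted(long newMaterializationId) {
        sentByMaterialization.keySet().removeIf(id -> id < newMaterializationId);
        reported.removeIf(id -> id < newMaterializationId);
    }
}
```

   Even this minimal version needs three callbacks and two collections kept in sync with the checkpoint lifecycle, which is the complexity in question.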
   
   > 5. We can add documentation in changelog part then.
   
   I think most users want to know two things about checkpoint size:
   1. The total size on DFS (i.e. including materialized state).
   2. If a checkpoint was slow, how much data was uploaded in the async phase, i.e. **without** materialization. Including the materialization size would actually do harm here, because it complicates reasoning about the async phase duration.
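
   As a sketch of that split, the two numbers could be derived separately as below. `CheckpointSizeReport` and its fields are hypothetical and not part of Flink; the code also assumes that only the new changelog delta is uploaded during the async phase.

```java
import java.util.List;

/** Hypothetical value class (not a Flink API) holding the two sizes discussed above. */
final class CheckpointSizeReport {
    final long totalSizeOnDfs;       // materialized state + all changelog handles
    final long asyncPhaseUploadSize; // only what this checkpoint uploaded itself

    private CheckpointSizeReport(long totalSizeOnDfs, long asyncPhaseUploadSize) {
        this.totalSizeOnDfs = totalSizeOnDfs;
        this.asyncPhaseUploadSize = asyncPhaseUploadSize;
    }

    static CheckpointSizeReport of(
            List<Long> materializedSizes,
            List<Long> previousChangelogSizes,
            long newChangelogDeltaSize) {
        long total = newChangelogDeltaSize;
        for (long size : materializedSizes) {
            total += size;
        }
        for (long size : previousChangelogSizes) {
            total += size;
        }
        // Assumption: materialization is uploaded outside the checkpoint's async phase,
        // so it is deliberately excluded from the second number.
        return new CheckpointSizeReport(total, newChangelogDeltaSize);
    }
}
```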



