Myasuka commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r788743464



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+        long incrementalMaterializeSize = 0L;
         if (delta != null && delta.getStateSize() > 0) {
             prevDeltaCopy.add(delta);
+            incrementalMaterializeSize += delta.getIncrementalStateSize();
         }
 
         if (prevDeltaCopy.isEmpty()
                 && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
         } else {
+            List<KeyedStateHandle> materializedSnapshot =
+                    changelogStateBackendStateCopy.getMaterializedSnapshot();
+            for (KeyedStateHandle keyedStateHandle : materializedSnapshot) {
+                if (!lastCompletedHandles.contains(keyedStateHandle)) {
+                    incrementalMaterializeSize += keyedStateHandle.getStateSize();
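A minimal, self-contained sketch of the accounting the diff above appears to perform: the incremental size of the changelog delta, plus the full size of every materialized handle that is not in the baseline set (`lastCompletedHandles` in the PR). The `Handle` record, the `incrementalSize` method and its parameters are hypothetical stand-ins, not Flink's `KeyedStateHandle` API; it needs Java 16+ for records.

```java
import java.util.List;
import java.util.Set;

public class IncrementalSizeSketch {

    // Hypothetical stand-in for a KeyedStateHandle: only an id and a state size are modelled.
    record Handle(String id, long stateSize) {}

    // Hypothetical helper: delta's incremental size plus the size of each
    // materialized handle that the baseline checkpoint did not already contain.
    static long incrementalSize(long deltaIncrementalSize,
                                List<Handle> materialized,
                                Set<Handle> baselineHandles) {
        long size = deltaIncrementalSize;
        for (Handle h : materialized) {
            if (!baselineHandles.contains(h)) {
                size += h.stateSize();
            }
        }
        return size;
    }

    public static void main(String[] args) {
        Handle sst1 = new Handle("sst-1", 100);
        Handle sst2 = new Handle("sst-2", 50);
        // sst-1 is already in the baseline; sst-2 was newly materialized.
        long size = incrementalSize(10, List.of(sst1, sst2), Set.of(sst1));
        System.out.println(size); // prints 60
    }
}
```

Because newly materialized handles are counted at their full size, a checkpoint taken right after materialization reports a jump in incremental size even if its own async phase was short.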

Review comment:
   If we do not include the materialization part, we will not be able to tell from the web UI when materialization completed on each task. We can explain in the changelog state-backend documentation why a checkpoint may report a large incremental checkpoint size despite a short async duration.
   
   I do think we need to think carefully about how we define the incremental state size for the changelog state-backend.
   
   If the materialization begins after `chk-9` and completes before `chk-11`, will `chk-11` include that part?
   Moreover, if we define the incremental checkpoint size as the increment compared with the last completed checkpoint, what about the case where `chk-11` fails globally but `chk-12` eventually succeeds? Will `chk-12` include that materialized part as well?
   
   Maybe we can change the definition of incremental checkpoint size to "the incremental checkpoint size compared with the last checkpoint (not the last completed one)". Then only `chk-11` would include that materialized part, whether or not `chk-11` eventually succeeds.
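To make the two definitions concrete, here is a small simulation of the scenario in this comment, under stated assumptions: only the materialized part is modelled (the changelog delta is ignored), a checkpoint's materialized state is a plain set of hypothetical `Handle` records, and the "last completed" baseline is assumed to advance only when a checkpoint completes globally, as the name `lastCompletedHandles` suggests. `Checkpoint`, `simulate` and `scenario` are illustrative names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class BaselineComparisonSketch {

    // Hypothetical stand-in for a materialized state handle.
    record Handle(String id, long stateSize) {}

    // One checkpoint: its id, the materialized handles it references, and whether it completes globally.
    record Checkpoint(int id, Set<Handle> materialized, boolean completes) {}

    // Total size of the materialized handles that are not in the baseline.
    static long newMaterializedSize(Set<Handle> materialized, Set<Handle> baseline) {
        long size = 0;
        for (Handle h : materialized) {
            if (!baseline.contains(h)) {
                size += h.stateSize();
            }
        }
        return size;
    }

    // relativeToLastCompleted = true: baseline advances only on completed checkpoints.
    // relativeToLastCompleted = false: baseline advances on every triggered checkpoint.
    static List<Long> simulate(List<Checkpoint> checkpoints, Set<Handle> initial,
                               boolean relativeToLastCompleted) {
        List<Long> sizes = new ArrayList<>();
        Set<Handle> baseline = initial;
        for (Checkpoint c : checkpoints) {
            sizes.add(newMaterializedSize(c.materialized(), baseline));
            if (!relativeToLastCompleted || c.completes()) {
                baseline = c.materialized();
            }
        }
        return sizes;
    }

    static List<Checkpoint> scenario() {
        Handle oldSst = new Handle("old", 100);
        Handle newSst = new Handle("new", 80);
        return List.of(
                new Checkpoint(9, Set.of(oldSst), true),
                new Checkpoint(10, Set.of(oldSst), true),  // materialization still running
                new Checkpoint(11, Set.of(newSst), false), // materialization done; chk-11 fails globally
                new Checkpoint(12, Set.of(newSst), true));
    }

    public static void main(String[] args) {
        Set<Handle> initial = Set.of(new Handle("old", 100));
        System.out.println("last completed: " + simulate(scenario(), initial, true));
        System.out.println("last triggered: " + simulate(scenario(), initial, false));
    }
}
```

This prints `last completed: [0, 0, 80, 80]` and `last triggered: [0, 0, 80, 0]`: with the last-completed baseline the materialized part is reported again by `chk-12` after `chk-11` fails, while with the last-triggered baseline only `chk-11` reports it.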
   
   With this definition, if we can avoid re-uploading SST files in the future, the same semantics of incremental checkpoint size will still apply.
   



