Myasuka commented on a change in pull request #18324: URL: https://github.com/apache/flink/pull/18324#discussion_r789338610
##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+        long incrementalMaterializeSize = 0L;
         if (delta != null && delta.getStateSize() > 0) {
             prevDeltaCopy.add(delta);
+            incrementalMaterializeSize += delta.getIncrementalStateSize();
         }
         if (prevDeltaCopy.isEmpty()
                 && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
         } else {
+            List<KeyedStateHandle> materializedSnapshot =
+                    changelogStateBackendStateCopy.getMaterializedSnapshot();
+            for (KeyedStateHandle keyedStateHandle : materializedSnapshot) {
+                if (!lastCompletedHandles.contains(keyedStateHandle)) {
+                    incrementalMaterializeSize += keyedStateHandle.getStateSize();

Review comment:
> If so, "incremental checkpoint size" should be the size of data uploaded during the async phase of the given checkpoint.

I think "async checkpointed size" might be a more suitable name than "incremental checkpoint size" for your definition.
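
To make the quantity under discussion concrete, here is a minimal, self-contained sketch of the accounting the diff performs: it adds the size of the new changelog delta to the size of every materialized handle that was not in the last completed checkpoint. `Handle`, `newlyUploadedSize`, and `IncrementalSizeSketch` are hypothetical names invented for illustration, not Flink classes or methods, and the sketch ignores everything else the real backend does.

```java
import java.util.List;
import java.util.Set;

public class IncrementalSizeSketch {

    /** Hypothetical stand-in for a state handle: a name plus its size in bytes. */
    static final class Handle {
        final String name;
        final long stateSize;

        Handle(String name, long stateSize) {
            this.name = name;
            this.stateSize = stateSize;
        }
    }

    /**
     * Returns the delta size plus the size of each materialized handle that is
     * absent from the set of handles in the last completed checkpoint.
     */
    static long newlyUploadedSize(
            List<Handle> materialized, Set<Handle> lastCompleted, long deltaSize) {
        long size = deltaSize;
        for (Handle handle : materialized) {
            if (!lastCompleted.contains(handle)) {
                size += handle.stateSize;
            }
        }
        return size;
    }

    public static void main(String[] args) {
        Handle a = new Handle("a", 100L);
        Handle b = new Handle("b", 40L);
        // "a" was already part of the last completed checkpoint,
        // so only "b" (40) and the delta (5) are counted.
        System.out.println(newlyUploadedSize(List.of(a, b), Set.of(a), 5L)); // prints 45
    }
}
```

Under this accounting the result depends on which handles the previous checkpoint already contained, which is why the name chosen for the metric matters.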