rkhachatryan commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r784298983

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -113,6 +124,19 @@ public long getStateSize() {
                 + nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum();
     }
 
+    @Override
+    public long getIncrementalStateSize() {
+        long incrementalStateSize =
+                incrementalMaterializeSize == undefinedIncrementalMaterializeSize
+                        ? materialized.stream()
+                                .mapToLong(StateObject::getIncrementalStateSize)
+                                .sum()
+                        : incrementalMaterializeSize;

Review comment:
   Depending on how we define "incremental state size", the materialized part should either be included or not:
   1. if it's everything that was uploaded for **this** checkpoint, then it should be included
   2. if it's the difference from the previous checkpoint, it should **not** be included

   Right?

   It seems problematic to find out what exactly was uploaded for **this** checkpoint, because multiple checkpoints will likely include the same materialized state and would therefore report the same incremental state multiple times.

   Besides that, the 2nd option seems more intuitive to me personally.

   WDYT?
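
   To make the two definitions concrete, here is a minimal standalone sketch. It is not Flink code: `IncrementalSizeSketch`, `Checkpoint`, and both method names are hypothetical, a checkpoint is modelled as two plain maps of handle id to size, and "difference from the previous checkpoint" is assumed to mean "handles that the previous checkpoint did not reference". Under those assumptions it shows how the first definition reports the same materialized bytes again for every checkpoint that reuses them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model for illustration only; not Flink's ChangelogStateBackendHandle. */
class IncrementalSizeSketch {

    /** A checkpoint modelled as handle id -> size in bytes (an assumption of this sketch). */
    public static final class Checkpoint {
        public final Map<String, Long> materialized = new LinkedHashMap<>();
        public final Map<String, Long> nonMaterialized = new LinkedHashMap<>();
    }

    private static long sum(Map<String, Long> handles) {
        return handles.values().stream().mapToLong(Long::longValue).sum();
    }

    /** Sums only the handles whose id the previous checkpoint did not reference. */
    private static long sumOfNewHandles(Map<String, Long> previous, Map<String, Long> current) {
        return current.entrySet().stream()
                .filter(e -> !previous.containsKey(e.getKey()))
                .mapToLong(e -> e.getValue())
                .sum();
    }

    /** Definition 1: the materialized part is always counted, even when it is reused. */
    public static long includingMaterialized(Checkpoint current) {
        return sum(current.materialized) + sum(current.nonMaterialized);
    }

    /** Definition 2: only what changed relative to the previous checkpoint is counted. */
    public static long differenceFromPrevious(Checkpoint previous, Checkpoint current) {
        return sumOfNewHandles(previous.materialized, current.materialized)
                + sumOfNewHandles(previous.nonMaterialized, current.nonMaterialized);
    }

    public static void main(String[] args) {
        Checkpoint first = new Checkpoint();
        first.materialized.put("m1", 100L);
        first.nonMaterialized.put("c1", 10L);

        // The second checkpoint reuses the same materialized state and adds one change set.
        Checkpoint second = new Checkpoint();
        second.materialized.put("m1", 100L);
        second.nonMaterialized.put("c1", 10L);
        second.nonMaterialized.put("c2", 5L);

        System.out.println(includingMaterialized(second));         // prints 115
        System.out.println(differenceFromPrevious(first, second)); // prints 5
    }
}
```

   With the first definition, the 100 bytes of `m1` are reported for both checkpoints; with the second, only the 5 bytes of `c2` are attributed to the second checkpoint.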