rkhachatryan commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r784298983



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -113,6 +124,19 @@ public long getStateSize() {
                     + nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum();
         }
 
+        @Override
+        public long getIncrementalStateSize() {
+            long incrementalStateSize =
+                    incrementalMaterializeSize == undefinedIncrementalMaterializeSize
+                            ? materialized.stream()
+                                    .mapToLong(StateObject::getIncrementalStateSize)
+                                    .sum()
+                            : incrementalMaterializeSize;

Review comment:
   Whether the materialized part should be included depends on how we define "incremental state size":
   1. if it's everything that was uploaded for **this** checkpoint, then it should be included
   2. if it's the difference from the previous checkpoint, then it should **not** be included
   
   Right?
   
   It seems problematic to determine what exactly was uploaded for **this** checkpoint: multiple checkpoints will likely include the same materialized state, and would therefore report the same incremental state multiple times.
   Besides that, the 2nd option seems more intuitive to me personally.
   
   WDYT?
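
To make the double-counting concern concrete, here is a minimal sketch of the two definitions. Everything in it is hypothetical: `IncrementalSizeSketch`, `CheckpointSizes` and the method names are illustrative stand-ins rather than Flink classes, and it assumes the non-materialized size of each checkpoint is exactly the changelog written since the previous checkpoint.

```java
import java.util.List;

/** Hypothetical illustration only; none of these types are Flink classes. */
final class IncrementalSizeSketch {

    /** Sizes reported by one checkpoint, in bytes. */
    static final class CheckpointSizes {
        final long materialized;    // materialized state referenced by this checkpoint
        final long nonMaterialized; // changelog written since the previous checkpoint (assumed)

        CheckpointSizes(long materialized, long nonMaterialized) {
            this.materialized = materialized;
            this.nonMaterialized = nonMaterialized;
        }
    }

    /** Definition 1, applied naively: include the materialized part in every checkpoint. */
    static long withMaterialized(CheckpointSizes c) {
        return c.materialized + c.nonMaterialized;
    }

    /** Definition 2: only the difference from the previous checkpoint. */
    static long withoutMaterialized(CheckpointSizes c) {
        return c.nonMaterialized;
    }

    static long totalWithMaterialized(List<CheckpointSizes> checkpoints) {
        return checkpoints.stream().mapToLong(IncrementalSizeSketch::withMaterialized).sum();
    }

    static long totalWithoutMaterialized(List<CheckpointSizes> checkpoints) {
        return checkpoints.stream().mapToLong(IncrementalSizeSketch::withoutMaterialized).sum();
    }

    public static void main(String[] args) {
        // Three checkpoints that all reference the same 100-byte materialization.
        List<CheckpointSizes> checkpoints = List.of(
                new CheckpointSizes(100, 10),
                new CheckpointSizes(100, 20),
                new CheckpointSizes(100, 30));

        System.out.println(totalWithMaterialized(checkpoints));    // 360
        System.out.println(totalWithoutMaterialized(checkpoints)); // 60
    }
}
```

In this sketch the same 100 bytes of materialized state are reported three times under the first definition, although they would have been uploaded only once; the second definition reports only the 60 bytes of new changelog.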




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

