carp84 commented on a change in pull request #15205: URL: https://github.com/apache/flink/pull/15205#discussion_r596552594
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java ########## @@ -41,9 +41,18 @@ /** The offset to the contiguous key groups. */ private final int keyGroupOffset; + public int getKeyGroupOffset() { + return keyGroupOffset; + } + /** Snapshots of state partitioned by key-group. */ @Nonnull private final List<CopyOnWriteStateMapSnapshot<K, N, S>> stateMapSnapshots; + @Nonnull + public List<CopyOnWriteStateMapSnapshot<K, N, S>> getStateMapSnapshots() { Review comment: Ditto ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java ########## @@ -41,9 +41,18 @@ /** The offset to the contiguous key groups. */ private final int keyGroupOffset; + public int getKeyGroupOffset() { Review comment: Is this a method we need to introduce in this PR? Or could it be paired with the changes that include its invocation in a later PR? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java ########## @@ -285,6 +285,11 @@ public S get(K key, N namespace) { } e.stateVersion = stateMapVersion; e.state = getStateSerializer().copy(e.state); + } else if (e.stateVersion < stateMapVersion) { + // the entry is not used by any active snapshot; + // if it WAS used then it should NOT be included into the next one if it takes + // versioning into account + e.stateVersion = stateMapVersion; } Review comment: I believe it is worth adding a test to cover this case. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org