dawidwys commented on a change in pull request #14782:
URL: https://github.com/apache/flink/pull/14782#discussion_r566229454



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedStateMapSnapshot.java
##########
@@ -48,6 +53,35 @@ public NestedStateMapSnapshot(NestedStateMap<K, N, S> owningStateMap) {
         super(owningStateMap);
     }
 
+    @Override
+    public Iterator<StateEntry<K, N, S>> getIterator(
+            @Nonnull TypeSerializer<K> keySerializer,
+            @Nonnull TypeSerializer<N> namespaceSerializer,
+            @Nonnull TypeSerializer<S> stateSerializer,
+            @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) {
+        if (stateSnapshotTransformer == null) {
+            return owningStateMap.iterator();
+        } else {
+            return StreamSupport.stream(
+                            Spliterators.spliteratorUnknownSize(owningStateMap.iterator(), 0),
+                            false)
+                    .map(entry -> transformEntry(stateSnapshotTransformer, entry))
+                    .filter(Objects::nonNull)
+                    .iterator();
+        }
+    }
+
+    private StateEntry<K, N, S> transformEntry(
+            StateSnapshotTransformer<S> stateSnapshotTransformer, StateEntry<K, N, S> entry) {
+        S transformedState = stateSnapshotTransformer.filterOrTransform(entry.getState());
+        if (transformedState != null) {
+            return new StateEntry.SimpleStateEntry<>(

Review comment:
   Do you think it would work if I just added a default method to the `StateEntry` interface?
   
   ```
   public interface StateEntry<K, N, S> {
   
       /** Returns the key of this entry. */
       K getKey();
   
       /** Returns the namespace of this entry. */
       N getNamespace();
   
       /** Returns the state of this entry. */
       S getState();
   
       default StateEntry<K, N, S> filterOrTransform(StateSnapshotTransformer<S> transformer) {
           S newState = transformer.filterOrTransform(getState());
           if (newState != null) {
               return new SimpleStateEntry<>(getKey(), getNamespace(), newState);
           } else {
               return null;
           }
       }
   }
   ```
   
   The `StateMapEntry` holds additional metadata, such as `entryVersion` and `stateVersion`, which we shouldn't meddle with in `transformEntry`. Returning a fresh `SimpleStateEntry` leaves that metadata untouched.
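
To make the proposal easier to judge, here is a minimal, self-contained sketch of how the default method could behave. It does not use the real Flink classes: `StateSnapshotTransformer` is reduced to its single `filterOrTransform` method, `SimpleStateEntry` is modeled as a top-level class instead of the nested `StateEntry.SimpleStateEntry`, and `SnapshotSketch.transformAll` is a hypothetical helper that stands in for the snapshot iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified stand-in for Flink's StateSnapshotTransformer (assumption: one method only). */
interface StateSnapshotTransformer<S> {
    /** Returns the transformed state, or null if the entry should be dropped. */
    S filterOrTransform(S state);
}

/** Simplified stand-in for the StateEntry interface with the proposed default method. */
interface StateEntry<K, N, S> {
    K getKey();

    N getNamespace();

    S getState();

    /** Returns a new plain entry with the transformed state, or null if it was filtered out. */
    default StateEntry<K, N, S> filterOrTransform(StateSnapshotTransformer<S> transformer) {
        S newState = transformer.filterOrTransform(getState());
        return newState != null
                ? new SimpleStateEntry<>(getKey(), getNamespace(), newState)
                : null;
    }
}

/** Plain immutable entry; carries no version metadata (top-level here for brevity). */
final class SimpleStateEntry<K, N, S> implements StateEntry<K, N, S> {
    private final K key;
    private final N namespace;
    private final S state;

    SimpleStateEntry(K key, N namespace, S state) {
        this.key = key;
        this.namespace = namespace;
        this.state = state;
    }

    @Override
    public K getKey() {
        return key;
    }

    @Override
    public N getNamespace() {
        return namespace;
    }

    @Override
    public S getState() {
        return state;
    }
}

/** Hypothetical helper: applies the transformer to every entry and skips filtered ones. */
final class SnapshotSketch {
    static <K, N, S> List<StateEntry<K, N, S>> transformAll(
            Iterator<StateEntry<K, N, S>> entries, StateSnapshotTransformer<S> transformer) {
        List<StateEntry<K, N, S>> result = new ArrayList<>();
        while (entries.hasNext()) {
            StateEntry<K, N, S> transformed = entries.next().filterOrTransform(transformer);
            if (transformed != null) {
                result.add(transformed);
            }
        }
        return result;
    }
}
```

With this shape, the `.map(...)` step in `getIterator` could call `entry.filterOrTransform(stateSnapshotTransformer)` directly, and the private `transformEntry` helper would no longer be needed. Whether that fits the real `StateMapEntry` hierarchy is something the PR would still have to confirm.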



