GitHub user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184093390 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java --- @@ -103,17 +104,17 @@ public void add(T value) throws IOException { private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> { - private final FoldingStateDescriptor<T, ACC> stateDescriptor; + private final HeapFoldingState<?, ?, T, ACC> stateRef; private final FoldFunction<T, ACC> foldFunction; - FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) { - this.stateDescriptor = Preconditions.checkNotNull(stateDesc); - this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction()); + FoldTransformation(FoldFunction<T, ACC> foldFunction, HeapFoldingState<?, ?, T, ACC> stateRef) { + this.stateRef = Preconditions.checkNotNull(stateRef); --- End diff -- Maybe the more honest and simpler way is to make this a non-static inner class instead of passing a reference to `HeapFoldingState.this` in the constructor. Essentially, it does the same thing.
---