GitHub user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202516863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java --- @@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) } } + public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader( + @Nonnull ElementReaderFunction<T> readerFunction, + @Nonnull KeyGroupElementsConsumer<T> elementConsumer) { + return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer); + } + + /** + * General algorithm to read key-grouped state that was written from a {@link PartitioningResult} + * @param <T> --- End diff -- The description for the type parameter `T` is missing from the `@param <T>` Javadoc tag.
---