Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107410978

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
         }

         /**
    +     * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
    +     * key group index for the given subtask {@link KeyGroupRange}.
    +     * <p>
    +     * <p>This is publicly visible to be used in tests.
    +     */
    +    public static List<KeyedStateHandle> getKeyedStateHandles(
    +        Collection<? extends KeyedStateHandle> keyedStateHandles,
    +        KeyGroupRange subtaskKeyGroupRange) {
    +
    +        List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
    +
    +        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +            KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
    --- End diff --

    I wonder if we could somehow introduce a `KeyedStateHandle::intersect(KeyGroupRange)` that again returns a `KeyedStateHandle` with a `KeyGroupRange` that is the intersection of the original range and the argument. Basically a higher-level version of what the KeyGroupsStateHandle can do, where the concrete implementations (like `KeyGroupsStateHandle`) know how to virtually split themselves up into a sub-range. This would also transfer less data in the RPC (fewer offsets) and save the post-filtering in the backend.

    Otherwise, we could have a boolean method that just checks for an intersection: there is no need to create `KeyGroupRange` objects here anymore, because we do not actually use them.
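    To make the `intersect` suggestion concrete, here is a minimal, self-contained sketch. It does not use Flink's classes: `Range`, `Handle`, and `OffsetsHandle` are hypothetical stand-ins for `KeyGroupRange`, `KeyedStateHandle`, and `KeyGroupsStateHandle`, and returning `null` for an empty overlap is an assumption of this sketch, not something the PR prescribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class IntersectSketch {

    /** Hypothetical stand-in for KeyGroupRange: an inclusive [start, end] interval. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean isEmpty() {
            return start > end;
        }

        /** Overlap of the two intervals; the result is empty if they are disjoint. */
        Range intersect(Range other) {
            return new Range(Math.max(start, other.start), Math.min(end, other.end));
        }
    }

    /** Hypothetical shape of the proposed handle-level API. */
    interface Handle {
        Range getRange();

        /** Returns a handle restricted to the overlap, or null if there is none (sketch convention). */
        Handle intersect(Range range);
    }

    /** Hypothetical stand-in for a handle that stores one stream offset per key group. */
    static final class OffsetsHandle implements Handle {
        private final Range range;
        private final long[] offsets;

        OffsetsHandle(Range range, long[] offsets) {
            if (range.isEmpty() || offsets.length != range.end - range.start + 1) {
                throw new IllegalArgumentException("need exactly one offset per key group");
            }
            this.range = range;
            this.offsets = offsets;
        }

        @Override
        public Range getRange() {
            return range;
        }

        long[] getOffsets() {
            return offsets;
        }

        @Override
        public Handle intersect(Range other) {
            Range overlap = range.intersect(other);
            if (overlap.isEmpty()) {
                return null;
            }
            // Keep only the offsets of the key groups inside the overlap,
            // so fewer offsets have to be shipped to the subtask.
            int from = overlap.start - range.start;
            int to = overlap.end - range.start + 1;
            return new OffsetsHandle(overlap, Arrays.copyOfRange(offsets, from, to));
        }
    }

    /** The loop from the diff, rewritten against the hypothetical intersect(). */
    static List<Handle> getKeyedStateHandles(
            Collection<? extends Handle> handles, Range subtaskRange) {
        List<Handle> result = new ArrayList<>();
        for (Handle handle : handles) {
            Handle part = handle.intersect(subtaskRange);
            if (part != null) {
                result.add(part);
            }
        }
        return result;
    }
}
```

    With this shape the caller never builds a range object just to test for emptiness, and each concrete handle decides how to narrow itself. The boolean-check alternative would instead keep the handle whole and only replace the `intersect` call with an overlap test.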