[ https://issues.apache.org/jira/browse/FLINK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633189#comment-15633189 ]
ASF GitHub Bot commented on FLINK-4960:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2747#discussion_r86372831

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---
@@ -309,6 +310,85 @@ public void initializeState(OperatorStateHandles operatorStateHandles) throws Ex
         initializeCalled = true;
     }
 
+    /**
+     * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
+     * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
+     * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
+     * can change arbitrarily (i.e. be able to scale both up and down).
+     * <p/>
+     * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+     * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
+     * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
+     *
+     * <p/>
+     * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
+     * operator (i.e. chain length of one).
+     *
+     * <p/>
+     * For an example of how to use it, have a look at
+     * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
+     *
+     * @param handles the different states to be merged.
+     * @return the resulting state, or {@code null} if no partial states are specified.
+     */
+    public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
+
+        if (handles.length < 1) {
+            return null;
+        } else if (handles.length == 1) {
+            return handles[0];
+        }
+
+        List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
+        List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
+
+        List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
+        List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
+
+        for (OperatorStateHandles handle: handles) {
+
+            // each of the collections is expected to have
+            // one member as they run with parallelism of 1
+
+            Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
+            Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
+            Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
+            Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
+
+
+            if ((managedOperatorState != null && managedOperatorState.size() > 1) ||
--- End diff --

Is this restriction necessary? I think it would also work if there are several entries in the list and we just add them all together.


> Allow the AbstractStreamOperatorTestHarness to test scaling down
> ----------------------------------------------------------------
>
>                 Key: FLINK-4960
>                 URL: https://issues.apache.org/jira/browse/FLINK-4960
>             Project: Flink
>          Issue Type: New Feature
>          Components: Tests
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently the AbstractStreamOperatorTestHarness allows for testing an
> operator when scaling up, through snapshot and restore. This is not enough,
> as many interesting corner cases arise when scaling down or during
> arbitrary combinations of scaling up and down.
> This issue aims to add this functionality so that an operator can snapshot
> its state, restore with a different parallelism, and later scale down or
> further up.
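The review question suggests dropping the one-entry-per-collection check and simply concatenating every entry of every partial state. A minimal sketch of that merge step follows. It assumes a hypothetical, simplified PartialState class in which plain strings stand in for Flink's OperatorStateHandle and KeyGroupsStateHandle. It is not Flink's API and not the code in the pull request, only an illustration of the "just add them all together" idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical stand-in for OperatorStateHandles: one subtask snapshot holding
 * four collections of opaque state handles (modelled here as strings).
 */
final class PartialState {
    final List<String> managedOperatorState;
    final List<String> rawOperatorState;
    final List<String> managedKeyedState;
    final List<String> rawKeyedState;

    PartialState(List<String> managedOperatorState, List<String> rawOperatorState,
                 List<String> managedKeyedState, List<String> rawKeyedState) {
        this.managedOperatorState = managedOperatorState;
        this.rawOperatorState = rawOperatorState;
        this.managedKeyedState = managedKeyedState;
        this.rawKeyedState = rawKeyedState;
    }
}

class RepackageSketch {

    // Appends all entries of source to target; a null collection is treated as empty.
    private static void addAllIfPresent(List<String> target, Collection<String> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    /**
     * Merges partial states by concatenation. Unlike the diff above, it does not
     * reject collections with more than one entry.
     */
    static PartialState repackage(PartialState... handles) {
        if (handles.length < 1) {
            return null;
        } else if (handles.length == 1) {
            return handles[0];
        }

        List<String> managedOperator = new ArrayList<>();
        List<String> rawOperator = new ArrayList<>();
        List<String> managedKeyed = new ArrayList<>();
        List<String> rawKeyed = new ArrayList<>();

        for (PartialState handle : handles) {
            addAllIfPresent(managedOperator, handle.managedOperatorState);
            addAllIfPresent(rawOperator, handle.rawOperatorState);
            addAllIfPresent(managedKeyed, handle.managedKeyedState);
            addAllIfPresent(rawKeyed, handle.rawKeyedState);
        }
        return new PartialState(managedOperator, rawOperator, managedKeyed, rawKeyed);
    }

    public static void main(String[] args) {
        // Subtask 1 deliberately carries two managed operator state entries.
        PartialState subtask0 = new PartialState(
                Arrays.asList("op-0"), null, Arrays.asList("kg-0-63"), Collections.emptyList());
        PartialState subtask1 = new PartialState(
                Arrays.asList("op-1a", "op-1b"), null, Arrays.asList("kg-64-127"), null);

        PartialState merged = repackage(subtask0, subtask1);
        System.out.println(merged.managedOperatorState); // prints [op-0, op-1a, op-1b]
        System.out.println(merged.managedKeyedState);    // prints [kg-0-63, kg-64-127]
    }
}
```

Treating a null collection as empty mirrors the null checks in the original loop, so a subtask that has no state of a given kind simply contributes nothing to the merged result.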
-- This message was sent by Atlassian JIRA (v6.3.4#6332)