[ https://issues.apache.org/jira/browse/FLINK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633189#comment-15633189 ]

ASF GitHub Bot commented on FLINK-4960:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2747#discussion_r86372831
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---
    @@ -309,6 +310,85 @@ public void initializeState(OperatorStateHandles operatorStateHandles) throws Ex
                initializeCalled = true;
        }
     
    +   /**
    +    * Takes the different {@link OperatorStateHandles} created by calling {@link #snapshot(long, long)}
    +    * on different instances of {@link AbstractStreamOperatorTestHarness} (each one representing one subtask)
    +    * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
    +    * can change arbitrarily (i.e. be able to scale both up and down).
    +    * <p/>
    +    * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
    +    * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
    +    * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
    +    *
    +    * <p/>
    +    * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
    +    * operator (i.e. chain length of one).
    +    *
    +    * <p/>
    +    * For an example of how to use it, have a look at
    +    * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
    +    *
    +    * @param handles the different states to be merged.
    +    * @return the resulting state, or {@code null} if no partial states are specified.
    +    */
    +   public static OperatorStateHandles repackageState(OperatorStateHandles... handles) throws Exception {
    +
    +           if (handles.length < 1) {
    +                   return null;
    +           } else if (handles.length == 1) {
    +                   return handles[0];
    +           }
    +
    +           List<OperatorStateHandle> mergedManagedOperatorState = new ArrayList<>(handles.length);
    +           List<OperatorStateHandle> mergedRawOperatorState = new ArrayList<>(handles.length);
    +
    +           List<KeyGroupsStateHandle> mergedManagedKeyedState = new ArrayList<>(handles.length);
    +           List<KeyGroupsStateHandle> mergedRawKeyedState = new ArrayList<>(handles.length);
    +
    +           for (OperatorStateHandles handle: handles) {
    +
    +                   // each of the collections is expected to have
    +                   // one member, as they run with a parallelism of 1
    +
    +                   Collection<OperatorStateHandle> managedOperatorState = handle.getManagedOperatorState();
    +                   Collection<OperatorStateHandle> rawOperatorState = handle.getRawOperatorState();
    +                   Collection<KeyGroupsStateHandle> managedKeyedState = handle.getManagedKeyedState();
    +                   Collection<KeyGroupsStateHandle> rawKeyedState = handle.getRawKeyedState();
    +
    +
    +                   if ((managedOperatorState != null && managedOperatorState.size() > 1) ||
    --- End diff --
    
    Is this restriction necessary? I think it would also work if there are several entries in the list and we just add them all together.
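
A minimal sketch of the reviewer's suggestion, assuming simplified stand-in types: `Handles` is a hypothetical class whose four lists mirror the four collections of `OperatorStateHandles`, and plain strings stand in for the individual state handles. This is not the code that was merged into Flink. Rather than rejecting a subtask whose collection has more than one entry, `repackage` appends every entry from every subtask and skips `null` collections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; not Flink's OperatorStateHandles or the merged harness code.
class RepackageSketch {

    // Stand-in for one subtask's snapshot: the four lists mirror managed/raw
    // operator state and managed/raw keyed state. Strings stand in for handles.
    static final class Handles {
        final List<String> managedOperator;
        final List<String> rawOperator;
        final List<String> managedKeyed;
        final List<String> rawKeyed;

        Handles(List<String> managedOperator, List<String> rawOperator,
                List<String> managedKeyed, List<String> rawKeyed) {
            this.managedOperator = managedOperator;
            this.rawOperator = rawOperator;
            this.managedKeyed = managedKeyed;
            this.rawKeyed = rawKeyed;
        }
    }

    // Appends all entries of a possibly-null collection, with no size limit.
    private static void addAllIfPresent(List<String> target, Collection<String> source) {
        if (source != null) {
            target.addAll(source);
        }
    }

    // Merges the snapshots of several subtasks by concatenating every entry.
    static Handles repackage(Handles... handles) {
        if (handles.length < 1) {
            return null;
        } else if (handles.length == 1) {
            return handles[0];
        }
        List<String> managedOperator = new ArrayList<>();
        List<String> rawOperator = new ArrayList<>();
        List<String> managedKeyed = new ArrayList<>();
        List<String> rawKeyed = new ArrayList<>();
        for (Handles handle : handles) {
            addAllIfPresent(managedOperator, handle.managedOperator);
            addAllIfPresent(rawOperator, handle.rawOperator);
            addAllIfPresent(managedKeyed, handle.managedKeyed);
            addAllIfPresent(rawKeyed, handle.rawKeyed);
        }
        return new Handles(managedOperator, rawOperator, managedKeyed, rawKeyed);
    }

    public static void main(String[] args) {
        // Subtask 0 has two managed operator state entries; the size() > 1 check
        // quoted in the diff would reject it, plain concatenation accepts it.
        Handles subtask0 = new Handles(Arrays.asList("op-0a", "op-0b"), null,
                Arrays.asList("kg-0"), Collections.<String>emptyList());
        Handles subtask1 = new Handles(Arrays.asList("op-1"), null,
                Arrays.asList("kg-1"), Collections.<String>emptyList());
        Handles merged = repackage(subtask0, subtask1);
        System.out.println(merged.managedOperator); // [op-0a, op-0b, op-1]
        System.out.println(merged.managedKeyed);    // [kg-0, kg-1]
    }
}
```

Because the merged lists simply concatenate the inputs, the number of entries a single subtask contributes no longer matters.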


> Allow the AbstractStreamOperatorTestHarness to test scaling down
> ----------------------------------------------------------------
>
>                 Key: FLINK-4960
>                 URL: https://issues.apache.org/jira/browse/FLINK-4960
>             Project: Flink
>          Issue Type: New Feature
>          Components: Tests
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently the AbstractStreamOperatorTestHarness allows for testing an 
> operator when scaling up, through snapshot and restore. This is not enough, 
> as many interesting corner cases arise when scaling down or during 
> arbitrary combinations of scaling up and down.
> This issue aims to add this functionality, so that an operator can snapshot 
> its state, restore with a different parallelism, and later scale down or 
> further up.
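
Why scaling down needs merged state can be seen from how keyed state is split: each subtask owns a contiguous range of key groups, and when the parallelism shrinks, one new subtask may need key groups that several old subtasks held. The sketch below assumes an even, contiguous split of `maxParallelism` key groups; `KeyGroupRangeSketch` and its methods are hypothetical illustrations, not Flink's own assignment code.

```java
// Hypothetical sketch of an even, contiguous key-group split; not Flink's code.
final class KeyGroupRangeSketch {

    // Inclusive start of the key-group range owned by subtask `index`.
    static int start(int maxParallelism, int parallelism, int index) {
        return (index * maxParallelism + parallelism - 1) / parallelism;
    }

    // Inclusive end of the key-group range owned by subtask `index`.
    static int end(int maxParallelism, int parallelism, int index) {
        return ((index + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 10;
        // Print the ranges before (3 subtasks) and after (2 subtasks) scaling down.
        for (int parallelism : new int[] {3, 2}) {
            StringBuilder line = new StringBuilder("parallelism " + parallelism + ":");
            for (int i = 0; i < parallelism; i++) {
                line.append(" [").append(start(maxParallelism, parallelism, i))
                    .append(", ").append(end(maxParallelism, parallelism, i)).append("]");
            }
            System.out.println(line);
        }
    }
}
```

With `maxParallelism = 10` it prints `parallelism 3: [0, 3] [4, 6] [7, 9]` and `parallelism 2: [0, 4] [5, 9]`, so after scaling from 3 to 2 subtasks, new subtask 0 needs key groups from old subtasks 0 and 1. That is the situation in which the partial states have to be repackaged before restoring.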



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
