[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126152#comment-16126152 ]

ASF GitHub Bot commented on FLINK-7213:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r133022095
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---
    @@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() {
                        assertFalse(checkpoint.isDiscarded());
                        assertFalse(checkpoint.isFullyAcknowledged());
     
    -                   OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
    -                   OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
    -
    -                   Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
    -
    -                   operatorStates.put(opID1, new SpyInjectingOperatorState(
    -                           opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
    -                   operatorStates.put(opID2, new SpyInjectingOperatorState(
    -                           opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
    -
                        // check that the vertices received the trigger checkpoint message
                        {
                                verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
                                verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
                        }
     
    +                   OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
    +                   OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
    +                   TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class);
    --- End diff --
    
    Why not create a proper `TaskStateSnapshot` with one entry, rather than mocking?
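
A minimal sketch of the idea behind this suggestion: build a real snapshot object holding a single operator-id-to-state entry and assert on its contents, instead of stubbing a mock. The classes `OpId`, `SubtaskState`, and `Snapshot` below are hypothetical stand-ins written only for this illustration; they are not the Flink classes, and the actual `TaskStateSnapshot` API may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; NOT the real Flink classes.
final class SnapshotSketch {

    /** Stand-in for an operator identifier (assumed to be a value type). */
    static final class OpId {
        final String value;

        OpId(String value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof OpId && ((OpId) o).value.equals(value);
        }

        @Override
        public int hashCode() {
            return value.hashCode();
        }
    }

    /** Stand-in for the state that one subtask reports for one operator. */
    static final class SubtaskState {
        final long stateSize;

        SubtaskState(long stateSize) {
            this.stateSize = stateSize;
        }
    }

    /** Stand-in for a task-level snapshot: an explicit operator id -> state mapping. */
    static final class Snapshot {
        private final Map<OpId, SubtaskState> states = new HashMap<>();

        void put(OpId id, SubtaskState state) {
            states.put(id, state);
        }

        SubtaskState get(OpId id) {
            return states.get(id);
        }

        int size() {
            return states.size();
        }
    }

    /** Builds a real snapshot with exactly one entry, with no mocking involved. */
    static Snapshot singleEntrySnapshot(OpId id, SubtaskState state) {
        Snapshot snapshot = new Snapshot();
        snapshot.put(id, state);
        return snapshot;
    }

    public static void main(String[] args) {
        OpId id = new OpId("vertex-1");
        Snapshot snapshot = singleEntrySnapshot(id, new SubtaskState(42L));
        System.out.println("entries: " + snapshot.size());
        System.out.println("state size: " + snapshot.get(id).stateSize);
    }
}
```

With a real object, the test exercises the same lookup path as production code, and no stubbing has to be kept in sync when the snapshot class changes.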


> Introduce state management by OperatorID in TaskManager
> -------------------------------------------------------
>
>                 Key: FLINK-7213
>                 URL: https://issues.apache.org/jira/browse/FLINK-7213
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> FLINK-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced on the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.


