[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126152#comment-16126152 ]
ASF GitHub Bot commented on FLINK-7213: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4353#discussion_r133022095 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java --- @@ -553,31 +551,29 @@ public void testTriggerAndConfirmSimpleCheckpoint() { assertFalse(checkpoint.isDiscarded()); assertFalse(checkpoint.isFullyAcknowledged()); - OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId()); - OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId()); - - Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates(); - - operatorStates.put(opID1, new SpyInjectingOperatorState( - opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism())); - operatorStates.put(opID2, new SpyInjectingOperatorState( - opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism())); - // check that the vertices received the trigger checkpoint message { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class)); } + OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId()); + OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId()); + TaskStateSnapshot taskOperatorSubtaskStates1 = mock(TaskStateSnapshot.class); --- End diff -- Why not create a proper `TaskStateSnapshot` with one entry, rather than mocking? > Introduce state management by OperatorID in TaskManager > ------------------------------------------------------- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.4.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)