Myasuka commented on a change in pull request #8693: URL: https://github.com/apache/flink/pull/8693#discussion_r424875892
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java ########## @@ -58,4 +83,212 @@ public void testNotifyCheckpointComplete() throws Exception { assertEquals(newCheckpointId, stateManager.getNotifiedCompletedCheckpointId()); } } + + @Test + public void testNotifyCheckpointAbortedManyTimes() throws Exception { + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .build(); + + final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment); + + long notifyAbortedTimes = SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS + 42; + for (int i = 1; i < notifyAbortedTimes; i++) { + subtaskCheckpointCoordinator.notifyCheckpointAborted(i, operatorChain, () -> true); + assertEquals(Math.min(SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS, i), subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + } + } + + @Test + public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception { + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .build(); + + CheckpointOperator checkpointOperator = new CheckpointOperator(); + + final OperatorChain<String, AbstractStreamOperator<String>> operatorChain = operatorChain(checkpointOperator); + + long checkpointId = 42L; + // notify checkpoint aborted before execution. + subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true); + assertEquals(1, subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + + subtaskCheckpointCoordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointMetrics(), + operatorChain, + () -> true); + assertFalse(checkpointOperator.isCheckpointed()); + assertEquals(0, subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + assertEquals(0, subtaskCheckpointCoordinator.getCloseableRegistry().getAsyncCheckpointRunnables().size()); Review comment: Got your point, already removed this two methods and verify the logic using other ways. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java ########## @@ -58,4 +83,212 @@ public void testNotifyCheckpointComplete() throws Exception { assertEquals(newCheckpointId, stateManager.getNotifiedCompletedCheckpointId()); } } + + @Test + public void testNotifyCheckpointAbortedManyTimes() throws Exception { + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .build(); + + final OperatorChain<?, ?> operatorChain = getOperatorChain(mockEnvironment); + + long notifyAbortedTimes = SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS + 42; + for (int i = 1; i < notifyAbortedTimes; i++) { + subtaskCheckpointCoordinator.notifyCheckpointAborted(i, operatorChain, () -> true); + assertEquals(Math.min(SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS, i), subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + } + } + + @Test + public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception { + MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder() + .setEnvironment(mockEnvironment) + .build(); + + CheckpointOperator checkpointOperator = new CheckpointOperator(); + + final OperatorChain<String, AbstractStreamOperator<String>> operatorChain = operatorChain(checkpointOperator); + + long checkpointId = 42L; + // notify checkpoint aborted before execution. + subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true); + assertEquals(1, subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + + subtaskCheckpointCoordinator.checkpointState( + new CheckpointMetaData(checkpointId, System.currentTimeMillis()), + CheckpointOptions.forCheckpointWithDefaultLocation(), + new CheckpointMetrics(), + operatorChain, + () -> true); + assertFalse(checkpointOperator.isCheckpointed()); + assertEquals(0, subtaskCheckpointCoordinator.getAbortedCheckpointSize()); + assertEquals(0, subtaskCheckpointCoordinator.getCloseableRegistry().getAsyncCheckpointRunnables().size()); Review comment: Got your point, will removed this two methods and verify the logic using other ways. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org