Myasuka commented on a change in pull request #8693:
URL: https://github.com/apache/flink/pull/8693#discussion_r424875892



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
##########
@@ -58,4 +83,212 @@ public void testNotifyCheckpointComplete() throws Exception 
{
                        assertEquals(newCheckpointId, 
stateManager.getNotifiedCompletedCheckpointId());
                }
        }
+
+       @Test
+       public void testNotifyCheckpointAbortedManyTimes() throws Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .build();
+
+               final OperatorChain<?, ?> operatorChain = 
getOperatorChain(mockEnvironment);
+
+               long notifyAbortedTimes = 
SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS + 42;
+               for (int i = 1; i < notifyAbortedTimes; i++) {
+                       subtaskCheckpointCoordinator.notifyCheckpointAborted(i, 
operatorChain, () -> true);
+                       
assertEquals(Math.min(SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS,
 i), subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+               }
+       }
+
+       @Test
+       public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws 
Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .build();
+
+               CheckpointOperator checkpointOperator = new 
CheckpointOperator();
+
+               final OperatorChain<String, AbstractStreamOperator<String>> 
operatorChain = operatorChain(checkpointOperator);
+
+               long checkpointId = 42L;
+               // notify checkpoint aborted before execution.
+               
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, () -> true);
+               assertEquals(1, 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+
+               subtaskCheckpointCoordinator.checkpointState(
+                       new CheckpointMetaData(checkpointId, 
System.currentTimeMillis()),
+                       CheckpointOptions.forCheckpointWithDefaultLocation(),
+                       new CheckpointMetrics(),
+                       operatorChain,
+                       () -> true);
+               assertFalse(checkpointOperator.isCheckpointed());
+               assertEquals(0, 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+               assertEquals(0, 
subtaskCheckpointCoordinator.getCloseableRegistry().getAsyncCheckpointRunnables().size());

Review comment:
       Got your point. I have already removed these two methods and now verify 
the logic in other ways.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
##########
@@ -58,4 +83,212 @@ public void testNotifyCheckpointComplete() throws Exception 
{
                        assertEquals(newCheckpointId, 
stateManager.getNotifiedCompletedCheckpointId());
                }
        }
+
+       @Test
+       public void testNotifyCheckpointAbortedManyTimes() throws Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .build();
+
+               final OperatorChain<?, ?> operatorChain = 
getOperatorChain(mockEnvironment);
+
+               long notifyAbortedTimes = 
SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS + 42;
+               for (int i = 1; i < notifyAbortedTimes; i++) {
+                       subtaskCheckpointCoordinator.notifyCheckpointAborted(i, 
operatorChain, () -> true);
+                       
assertEquals(Math.min(SubtaskCheckpointCoordinatorImpl.DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS,
 i), subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+               }
+       }
+
+       @Test
+       public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws 
Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .build();
+
+               CheckpointOperator checkpointOperator = new 
CheckpointOperator();
+
+               final OperatorChain<String, AbstractStreamOperator<String>> 
operatorChain = operatorChain(checkpointOperator);
+
+               long checkpointId = 42L;
+               // notify checkpoint aborted before execution.
+               
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, () -> true);
+               assertEquals(1, 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+
+               subtaskCheckpointCoordinator.checkpointState(
+                       new CheckpointMetaData(checkpointId, 
System.currentTimeMillis()),
+                       CheckpointOptions.forCheckpointWithDefaultLocation(),
+                       new CheckpointMetrics(),
+                       operatorChain,
+                       () -> true);
+               assertFalse(checkpointOperator.isCheckpointed());
+               assertEquals(0, 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+               assertEquals(0, 
subtaskCheckpointCoordinator.getCloseableRegistry().getAsyncCheckpointRunnables().size());

Review comment:
       Got your point. I will remove these two methods and verify the logic in 
other ways.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to