1996fanrui commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1479164422
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
                 });
     }
 
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-            throws Exception {
+    /**
+     * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+     * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {

Review Comment:
   Thanks for your code; this refactor makes sense to me. I have updated it.

##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   This seems out of scope for this JIRA. Would you mind if we do this refactor in a separate hotfix PR? Your code is ready, so after this PR you can submit an official PR and I can help review it. WDYT?

##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,13 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
 
             waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            // We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
+            // This test will fail if the job recovers from a checkpoint triggered before
+            // `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
+            // `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
+            // `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
+            // the checkpoint to be the count and sum of all data.

Review Comment:
   Makes sense. I have updated it.
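The Javadoc's reasoning (wait for two or more completed checkpoints so that the latest one was triggered after the call) can be illustrated with a small Java sketch. It is hypothetical and is not Flink's `CommonTestUtils` code: it uses no Flink API, models the completed-checkpoint count as a plain `LongSupplier`, and assumes at most one checkpoint is in flight at a time. The class name `NewCheckpointWaiter`, the timeout parameter, and the 10 ms polling interval are illustrative choices.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of the "wait for a new checkpoint" idea. This is not Flink's
 * CommonTestUtils implementation and does not use any Flink API.
 */
final class NewCheckpointWaiter {

    private NewCheckpointWaiter() {}

    /**
     * Blocks until the number of completed checkpoints has grown by at least two since the call,
     * and returns the observed count.
     *
     * <p>Assumption: at most one checkpoint is in flight at a time. A checkpoint that was already
     * running when this method was called may complete as the first new one, but the second new
     * one can only be triggered after the first completes, i.e. after this call.
     */
    static long waitForNewCheckpoint(LongSupplier completedCheckpoints, Duration timeout)
            throws InterruptedException, TimeoutException {
        final long target = completedCheckpoints.getAsLong() + 2;
        final long deadline = System.nanoTime() + timeout.toNanos();
        long current;
        while ((current = completedCheckpoints.getAsLong()) < target) {
            // Compare via subtraction so the check stays correct if nanoTime overflows.
            if (System.nanoTime() - deadline > 0) {
                throw new TimeoutException(
                        "Expected " + target + " completed checkpoints but saw " + current);
            }
            Thread.sleep(10); // illustrative polling interval
        }
        return current;
    }

    public static void main(String[] args) throws Exception {
        // Simulated completed-checkpoint counter, advanced by a background "coordinator" thread.
        AtomicLong completed = new AtomicLong(5);
        Thread coordinator = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(50);
                    completed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        coordinator.setDaemon(true);
        coordinator.start();

        long seen = waitForNewCheckpoint(completed::get, Duration.ofSeconds(5));
        System.out.println("Observed " + seen + " completed checkpoints");
        coordinator.interrupt();
    }
}
```

Waiting for just one more completed checkpoint is not sufficient under this model: a checkpoint already running at call time could satisfy the wait even though it was triggered before the call, which is the recovery scenario the `AutoRescalingITCase` comment describes.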