1996fanrui commented on code in PR #24246: URL: https://github.com/apache/flink/pull/24246#discussion_r1475680928
########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ########## @@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E // wait until the operator handles some data StateSourceBase.workStartedLatch.await(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); Review Comment: IIUC, we can call the newest `waitForNewCheckpoint` here. From the semantic, the old `waitForOneMoreCheckpoint` wait for one and more checkpoints. It check the checkpoint each 100ms by default. It means if the job generates 10 checkpoints within 100ms, `waitForOneMoreCheckpoint` will wait 10 checkpoints. So I don't think `waitForNewCheckpoint` breaks the semantic `waitForOneMoreCheckpoint`. And it's more clearer than before. Also, if we introduced the `waitForNewCheckpoint`, we might don't need `waitForOneMoreCheckpoint` that checking the checkpoint count. It's needed if we have a requirement that must wait for a number of new checkpoints. At least I didn't see the strong requirement for now. WDYT? ########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ########## @@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce // clear the CollectionSink set for the restarted job CollectionSink.clearElementsSet(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); Review Comment: Do you mean `createJobGraphWithKeyedState` and `createJobGraphWithKeyedAndNonPartitionedOperatorState` have redundant code ? Or `testCheckpointRescalingWithKeyedAndNonPartitionedState` and `testCheckpointRescalingKeyedState`? I checked them, they have a lot of differences in details. Such as: - Source is different - The parallelism and MaxParallelism is fixed parallelism for `NonPartitionedOperator` I will check could they extract some common code later. If yes, I can submit a hotfix PR and cc you. ########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ########## @@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState( // wait until the operator handles some data StateSourceBase.workStartedLatch.await(); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); Review Comment: Same with this comment: https://github.com/apache/flink/pull/24246/files#r1474917357 ########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java: ########## @@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false); - waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster()); + waitForNewCheckpoint(jobID, cluster.getMiniCluster()); Review Comment: Added, please help double check, thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org