1996fanrui commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1475680928


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -328,7 +330,7 @@ public void 
testCheckpointRescalingNonPartitionedStateCausesException() throws E
             // wait until the operator handles some data
             StateSourceBase.workStartedLatch.await();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   IIUC, we can call the new `waitForNewCheckpoint` here.
   
   Semantically, the old `waitForOneMoreCheckpoint` waits for one or more 
checkpoints. It polls the checkpoint count every 100ms by default, so if the job 
generates 10 checkpoints within 100ms, `waitForOneMoreCheckpoint` ends up 
waiting for 10 checkpoints.
   
   So I don't think `waitForNewCheckpoint` breaks the semantics of 
`waitForOneMoreCheckpoint`, and it's clearer than before.
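   
   To illustrate the polling behaviour described above, here is a minimal 
sketch. It is not the actual Flink test utility: the class 
`CheckpointPollingSketch`, the method `waitForOneMore`, and both parameters are 
hypothetical names, and the 100ms sleep is replaced by a callback so the 
example runs deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch, not Flink code: a count-based wait that polls a
// completed-checkpoint counter and returns once the count has grown.
class CheckpointPollingSketch {

    /**
     * Polls until the completed-checkpoint count exceeds the count seen at
     * entry, and returns how many new checkpoints had completed by then.
     *
     * @param completedCount    supplies the current number of completed checkpoints
     * @param sleepBetweenPolls stands in for the fixed polling delay (assumed 100ms)
     */
    static long waitForOneMore(LongSupplier completedCount, Runnable sleepBetweenPolls) {
        long initial = completedCount.getAsLong();
        long current = initial;
        while (current <= initial) {
            sleepBetweenPolls.run();
            current = completedCount.getAsLong();
        }
        return current - initial;
    }

    public static void main(String[] args) {
        long[] completed = {3};
        // Simulated job: 10 checkpoints complete during each polling interval.
        long observed = waitForOneMore(() -> completed[0], () -> completed[0] += 10);
        System.out.println("new checkpoints observed: " + observed);
    }
}
```

   With a job that completes 10 checkpoints per polling interval, the wait 
returns after observing 10 new checkpoints rather than exactly one, so "one 
more" already means "at least one new" in practice.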
   
   Also, once we introduce `waitForNewCheckpoint`, we might not need the 
`waitForOneMoreCheckpoint` that checks the checkpoint count. It would only be 
needed if we had a requirement to wait for a specific number of new checkpoints, 
and I don't see a strong requirement for that right now.
   
   WDYT?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void 
testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   Do you mean that `createJobGraphWithKeyedState` and 
`createJobGraphWithKeyedAndNonPartitionedOperatorState` have redundant code? 
Or `testCheckpointRescalingWithKeyedAndNonPartitionedState` and 
`testCheckpointRescalingKeyedState`?
   
   
   I checked them, and they differ in a lot of details, such as:
   
   - The source is different
   - The parallelism and MaxParallelism are fixed for 
`NonPartitionedOperator`
   
   I will check later whether some common code can be extracted. If so, I can 
submit a hotfix PR and cc you.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
         // wait until the operator handles some data
         StateSourceBase.workStartedLatch.await();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   Same as this comment: 
https://github.com/apache/flink/pull/24246/files#r1474917357



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean 
scaleOut) throws Exception
 
             waitForAllTaskRunning(cluster.getMiniCluster(), 
jobGraph.getJobID(), false);
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());

Review Comment:
   Added, please help double check, thanks~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.