chia7712 commented on code in PR #17187: URL: https://github.com/apache/kafka/pull/17187#discussion_r1760120025
########## streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java: ########## @@ -579,12 +580,15 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception validateReceivedMessages(sampleData, outputTopic); // Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state. - kafkaStreams.close(Duration.ofMillis(5000L)); + kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT)); Review Comment: This test already has a timeout, so maybe we can remove the timeout from close? ########## streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java: ########## @@ -579,12 +580,15 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception validateReceivedMessages(sampleData, outputTopic); // Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state. - kafkaStreams.close(Duration.ofMillis(5000L)); + kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT)); IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations); final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY); kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration); + // Must make ensure all the restoring tasks are in active state before starting the new instance. + waitForActiveRestoringTask(kafkaStreams, 5, IntegrationTestUtils.DEFAULT_TIMEOUT); Review Comment: Could you include "suspend" in the comments? For example: `all restoring task must be in active state. 
otherwise, the first kafka streams won't encounter "suspend" after starting another instance` ########## streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java: ########## @@ -634,6 +634,19 @@ public static void waitForStandbyCompletion(final KafkaStreams streams, } } + /** + * Wait until enough restoring tasks have been started + */ + public static void waitForActiveRestoringTask(final KafkaStreams streams, + final int expectedTasks, + final long timeoutMilliseconds) throws Exception { + TestUtils.waitForCondition(() -> { Review Comment: Could you please use a lambda function? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org