chenyulin0719 commented on code in PR #17187:
URL: https://github.com/apache/kafka/pull/17187#discussion_r1764864632


##########
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java:
##########
@@ -579,12 +580,15 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception
         validateReceivedMessages(sampleData, outputTopic);
 
         // Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state.
-        kafkaStreams.close(Duration.ofMillis(5000L));
+        kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
 
         final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY);
         kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);
 
+        // Must ensure all the restoring tasks are in active state before starting the new instance.
+        waitForActiveRestoringTask(kafkaStreams, 5, IntegrationTestUtils.DEFAULT_TIMEOUT);

Review Comment:
   Updated. I tried to rephrase it in my own words. Please let me know if it's still not clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
