wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1501173995
##########
streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java:
##########
@@ -120,23 +119,11 @@ public void before(final TestInfo testInfo) throws Exception {
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
- final KeyValueStoreBuilder<String, Long> storeBuilder = new KeyValueStoreBuilder<>(
- Stores.persistentKeyValueStore(globalStore),
- Serdes.String(),
- Serdes.Long(),
- mockTime);
-
- builder.addGlobalStore(
- storeBuilder,
- globalStoreTopic,
- Consumed.with(Serdes.String(), Serdes.Long()),
- new MockApiProcessorSupplier<>()
- );
+ builder.globalTable(globalStoreTopic, stringLongConsumed, Materialized.as(globalStore));
Review Comment:
I changed this because the reprocessing now happens as normal processing
instead of as a restore. The test description says:

> This test asserts that when Kafka Streams is closing and shutsdown a
> StreamThread the closing of the GlobalStreamThread happens after all the
> StreamThreads are completely stopped. The test validates the Processor still
> has access to the GlobalStateStore while closing. Otherwise if the
> GlobalStreamThread were to close underneath the StreamThread an exception would
> be thrown as the GlobalStreamThread closes all global stores on closing.

The test still covers what it is intended to cover. The only difference is
that reprocessing doesn't keep the store open the way the restore operation
does.
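
For anyone following along, here is a toy model of the ordering the quoted
description is about. It is only a sketch and does not use Kafka Streams:
`ToyGlobalStore`, `ToyProcessor`, and `shutDown` are hypothetical names, and it
assumes the processor reads the global store from its `close()` method, as the
description suggests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for a global state store; not a Kafka Streams class. */
    static final class ToyGlobalStore {
        private final Map<String, Long> data = new HashMap<>();
        private boolean open = true;

        void put(final String key, final long value) {
            data.put(key, value);
        }

        Long get(final String key) {
            // Reads fail once the owning (global) thread has closed the store.
            if (!open) {
                throw new IllegalStateException("store is closed");
            }
            return data.get(key);
        }

        void close() {
            open = false;
        }
    }

    /** Hypothetical stand-in for a processor that reads the store while closing. */
    static final class ToyProcessor {
        private final ToyGlobalStore store;
        final List<Long> valuesSeenOnClose = new ArrayList<>();

        ToyProcessor(final ToyGlobalStore store) {
            this.store = store;
        }

        void close() {
            valuesSeenOnClose.add(store.get("A"));
        }
    }

    /**
     * Simulates a shutdown in one of the two possible orders.
     * Returns true if the processor could still read the store while closing.
     */
    public static boolean shutDown(final boolean streamThreadFirst) {
        final ToyGlobalStore store = new ToyGlobalStore();
        store.put("A", 1L);
        final ToyProcessor processor = new ToyProcessor(store);
        try {
            if (streamThreadFirst) {
                processor.close(); // stream thread stops first
                store.close();     // global thread closes its stores afterwards
            } else {
                store.close();     // global thread closes underneath the stream thread
                processor.close(); // the read in close() now throws
            }
            return true;
        } catch (final IllegalStateException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println("stream thread first: " + shutDown(true));
        System.out.println("global thread first: " + shutDown(false));
    }
}
```

Only the first ordering lets the processor finish its read, which is the
ordering the test is meant to guard.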