jnh5y commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r894008227
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1686,8 +1685,74 @@ public void shouldUpdateStandbyTask() throws Exception {
final String storeName2 = "table-two";
final String changelogName1 = APPLICATION_ID + "-" + storeName1 +
"-changelog";
final String changelogName2 = APPLICATION_ID + "-" + storeName2 +
"-changelog";
+ final StreamThread thread = createStreamThread(CLIENT_ID, config,
false);
+ final MockConsumer<byte[], byte[]> restoreConsumer =
clientSupplier.restoreConsumer;
+
+ setupThread(storeName1, storeName2, changelogName1, changelogName2,
thread, restoreConsumer, false);
+
+ thread.runOnce();
+
+ final StandbyTask standbyTask1 = standbyTask(thread.taskManager(),
t1p1);
+ final StandbyTask standbyTask2 = standbyTask(thread.taskManager(),
t2p1);
+ assertEquals(task1, standbyTask1.id());
+ assertEquals(task3, standbyTask2.id());
+
+ final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object,
Long>) standbyTask1.getStore(storeName1);
+ final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object,
Long>) standbyTask2.getStore(storeName2);
+
+ assertEquals(0L, store1.approximateNumEntries());
+ assertEquals(0L, store2.approximateNumEntries());
+
+ addStandbyRecordsToRestoreConsumer(restoreConsumer);
+
+ thread.runOnce();
+
+ assertEquals(10L, store1.approximateNumEntries());
+ assertEquals(4L, store2.approximateNumEntries());
+
+ thread.taskManager().shutdown(true);
+ }
+
+ private void addActiveRecordsToRestoreConsumer(final MockConsumer<byte[],
byte[]> restoreConsumer) {
+ for (long i = 0L; i < 10L; i++) {
+ restoreConsumer.addRecord(new ConsumerRecord<>(
+ "stream-thread-test-count-one-changelog",
+ 2,
+ i,
+ ("K" + i).getBytes(),
+ ("V" + i).getBytes()));
+ }
+ }
+
+ private void addStandbyRecordsToRestoreConsumer(final MockConsumer<byte[],
byte[]> restoreConsumer) {
+ // let the store1 be restored from 0 to 10; store2 be restored from 5
(checkpointed) to 10
+ for (long i = 0L; i < 10L; i++) {
+ restoreConsumer.addRecord(new ConsumerRecord<>(
+ "stream-thread-test-count-one-changelog",
Review Comment:
Extracted a constant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]