cadonna commented on code in PR #19275: URL: https://github.com/apache/kafka/pull/19275#discussion_r2024268591
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ########## @@ -616,7 +598,7 @@ public void flushCache() { public void close() throws ProcessorStateException { log.debug("Closing its state manager and all the registered state stores: {}", stores); - if (!stateUpdaterEnabled && changelogReader != null) { + if (changelogReader != null) { changelogReader.unregister(getAllChangelogTopicPartitions()); } Review Comment: Since `stateUpdaterEnabled` is effectively always `true` once we remove the flag, the original `if`-condition (`!stateUpdaterEnabled && changelogReader != null`) would always be `false` and the code guarded by the `if`-condition should never be executed. IMO, the changelog reader can be removed from the `ProcessorStateManager` since registering changelog topics is done in the state updater. Only the old code path that did not use the state updater needed the changelog reader here. If the `ProcessorStateManager` does not need the changelog reader, the active task creator and the standby task creator also do not need the changelog reader. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Runnable shutdownErrorHook, final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) { - final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); - final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx; final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING); - final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId; + final String restorationThreadId = stateUpdaterId; Review Comment: You could directly use `stateUpdaterId` instead of `restorationThreadId` since the distinction between state updater ID and thread ID for restoration does not hold anymore. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ########## @@ -407,30 +406,6 @@ public long position(final TopicPartition partition) { } } - @ParameterizedTest - @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"}) - public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType type) { - setupStateManagerMock(type); - setupStoreMetadata(); - setupStore(); - shouldPollWithRightTimeout(true, type); - } - - @ParameterizedTest - @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"}) - public void shouldPollWithRightTimeoutWithoutStateUpdater(final Task.TaskType type) { - setupStateManagerMock(type); - setupStoreMetadata(); - setupStore(); - shouldPollWithRightTimeout(false, type); - } - - private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled, final Task.TaskType type) { - final Properties properties = new Properties(); - properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); - shouldPollWithRightTimeout(properties, type); - } - @ParameterizedTest @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"}) public void shouldPollWithRightTimeoutWithStateUpdaterDefault(final Task.TaskType type) { Review Comment: Could you please inline `shouldPollWithRightTimeout()` into this test and rename this test to `shouldPollWithRightTimeout()`? 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1716,15 +1699,14 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE runOnce(processingThreadsEnabled); // the third actually polls, processes the record, and throws the corruption exception - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( + TestUtils.waitForCondition( () -> thread.taskManager().checkStateUpdater( mockTime.milliseconds(), topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) ), 10 * 1000, "State updater never returned tasks."); Review Comment: Please fix the indentation here. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java: ########## @@ -664,7 +646,7 @@ else if (exception instanceof StreamsException) void recycle() { log.debug("Recycling state for {} task {}.", taskType, taskId); - if (!stateUpdaterEnabled && changelogReader != null) { + if (changelogReader != null) { final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions(); changelogReader.unregister(allChangelogs); } Review Comment: See my above comment. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1736,15 +1718,13 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE // Now, we can handle the corruption thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks()); - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( + TestUtils.waitForCondition( () -> thread.taskManager().checkStateUpdater( mockTime.milliseconds(), topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) ), 10 * 1000, "State updater never returned tasks."); - } Review Comment: See my comment above about indentation. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2491,26 +2467,17 @@ public Set<TopicPartition> partitions() { "K2".getBytes(), "V2".getBytes())); - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( + TestUtils.waitForCondition( () -> mockRestoreConsumer.assignment().size() == 0, "Never get the assignment"); Review Comment: Please fix the indentation, and could you also fix the typo? It should be `Never got the assignment`. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1222,15 +1142,15 @@ private long pollPhase() { final ConsumerRecords<byte[], byte[]> records; log.debug("Invoking poll on main Consumer"); - if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) { + if (state == State.PARTITIONS_ASSIGNED) { // try to fetch some records with zero poll millis // to unblock the restoration as soon as possible records = pollRequests(Duration.ZERO); Review Comment: You can get rid of this branch. Polling with duration zero during `PARTITIONS_ASSIGNED` only applies to the old code path. With the state updater, polling should use the configured poll time, as stated on line 1153. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org