cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2024268591


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -616,7 +598,7 @@ public void flushCache() {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state stores: {}", stores);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (changelogReader != null) {
             changelogReader.unregister(getAllChangelogTopicPartitions());
         }

Review Comment:
   Removing the flag is equivalent to `stateUpdaterEnabled` always being 
`true`, so the original `if`-condition is always `false` and the code guarded 
by it should never be executed.
   
   IMO, the changelog reader can be removed from the `ProcessorStateManager` 
since registering changelog topics is done in the state updater. Only the old 
code path that did not use the state updater needed the changelog reader here.
   
   If the `ProcessorStateManager` does not need the changelog reader, the 
active task creator and the standby task creator do not need the changelog 
reader either.
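   
   To make the difference concrete, here is a minimal, self-contained sketch 
that compares the removed guard with the one currently in the PR. 
`GuardSketch`, `oldGuard`, `newGuard`, and the plain `Object` standing in for 
the changelog reader are hypothetical names for illustration only, not code 
from `ProcessorStateManager`.

```java
public class GuardSketch {
    // Guard from the removed line: only true on the old code path without the state updater.
    static boolean oldGuard(final boolean stateUpdaterEnabled, final Object changelogReader) {
        return !stateUpdaterEnabled && changelogReader != null;
    }

    // Guard as written in the PR: true whenever a changelog reader is present.
    static boolean newGuard(final Object changelogReader) {
        return changelogReader != null;
    }

    public static void main(final String[] args) {
        // Hypothetical stand-in for a non-null changelog reader.
        final Object reader = new Object();

        // With the state updater always enabled, the old guard never fires...
        System.out.println("old guard: " + oldGuard(true, reader));
        // ...while the new guard fires, so unregister() would now be called.
        System.out.println("new guard: " + newGuard(reader));
    }
}
```

   Under these assumptions the two guards disagree for a non-null reader, 
which is why dropping the whole `if` block (and the reader) matches the old 
behavior more closely than dropping only the flag.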



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
                                       final Runnable shutdownErrorHook,
                                       final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
 
-        final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
-
         final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
         final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
-        final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId;
+        final String restorationThreadId = stateUpdaterId;

Review Comment:
   You could directly use `stateUpdaterId` instead of `restorationThreadId` 
since the distinction between state updater ID and thread ID for restoration 
does not hold anymore.
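   
   A rough, self-contained sketch of what that could look like is below. The 
two constant values and the `stateUpdaterId` helper are assumptions made for 
illustration; only the `replace` call mirrors the diff above.

```java
public class StateUpdaterIdSketch {
    // Assumed values for illustration; the real constants live in StreamThread.
    static final String THREAD_ID_SUBSTRING = "-StreamThread-";
    static final String STATE_UPDATER_ID_SUBSTRING = "-StateUpdater-";

    // Derives the state updater ID the same way the diff does, without the
    // intermediate restorationThreadId alias.
    static String stateUpdaterId(final String clientId, final int threadIdx) {
        final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
        return threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
    }

    public static void main(final String[] args) {
        // "my-app" and 1 are hypothetical inputs.
        System.out.println(stateUpdaterId("my-app", 1));
    }
}
```

   Every place that currently takes `restorationThreadId` would then take 
`stateUpdaterId` instead.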



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -407,30 +406,6 @@ public long position(final TopicPartition partition) {
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
-    public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType type) {
-        setupStateManagerMock(type);
-        setupStoreMetadata();
-        setupStore();
-        shouldPollWithRightTimeout(true, type);
-    }
-
-    @ParameterizedTest
-    @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
-    public void shouldPollWithRightTimeoutWithoutStateUpdater(final Task.TaskType type) {
-        setupStateManagerMock(type);
-        setupStoreMetadata();
-        setupStore();
-        shouldPollWithRightTimeout(false, type);
-    }
-
-    private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled, final Task.TaskType type) {
-        final Properties properties = new Properties();
-        properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
-        shouldPollWithRightTimeout(properties, type);
-    }
-
     @ParameterizedTest
     @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
     public void shouldPollWithRightTimeoutWithStateUpdaterDefault(final Task.TaskType type) {

Review Comment:
   Could you please inline the `shouldPollWithRightTimeout()` helper into this 
test and rename the test to `shouldPollWithRightTimeout()`?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1716,15 +1699,14 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
         runOnce(processingThreadsEnabled);
 
         // the third actually polls, processes the record, and throws the corruption exception
-        if (stateUpdaterEnabled) {
-            TestUtils.waitForCondition(
+        TestUtils.waitForCondition(
                 () -> thread.taskManager().checkStateUpdater(
                     mockTime.milliseconds(),
                     topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1))
                 ),
                 10 * 1000,
                 "State updater never returned tasks.");

Review Comment:
   Please fix the indentation here. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -664,7 +646,7 @@ else if (exception instanceof StreamsException)
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (changelogReader != null) {
             final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
             changelogReader.unregister(allChangelogs);
         }

Review Comment:
   See my comment above on `close()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1736,15 +1718,13 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
 
         // Now, we can handle the corruption
         thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
-        if (stateUpdaterEnabled) {
-            TestUtils.waitForCondition(
+        TestUtils.waitForCondition(
                 () -> thread.taskManager().checkStateUpdater(
                     mockTime.milliseconds(),
                     topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1))
                 ),
                 10 * 1000,
                 "State updater never returned tasks.");
-        }

Review Comment:
   See my comment above about indentation.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2491,26 +2467,17 @@ public Set<TopicPartition> partitions() {
             "K2".getBytes(),
             "V2".getBytes()));
 
-        if (stateUpdaterEnabled) {
-            TestUtils.waitForCondition(
+        TestUtils.waitForCondition(
                 () -> mockRestoreConsumer.assignment().size() == 0,
                 "Never get the assignment");

Review Comment:
   Please fix the indentation here as well. Could you also fix the typo in the 
message? It should be `Never got the assignment`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1222,15 +1142,15 @@ private long pollPhase() {
         final ConsumerRecords<byte[], byte[]> records;
         log.debug("Invoking poll on main Consumer");
 
-        if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) {
+        if (state == State.PARTITIONS_ASSIGNED) {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
             records = pollRequests(Duration.ZERO);

Review Comment:
   You can get rid of this branch. Polling with duration zero during 
`PARTITIONS_ASSIGNED` only applies to the old code path. With the state 
updater, polling should use the configured poll time, as stated on line 1153.
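   
   As a simplified sketch of the reasoning, the snippet below isolates only 
the choice of poll duration and ignores everything else `pollPhase()` does. 
The `State` enum, the method names, and the 100 ms poll time are hypothetical 
stand-ins, not the actual `StreamThread` code.

```java
import java.time.Duration;

public class PollDurationSketch {
    // Hypothetical stand-in for the thread state; only the value named in the diff matters.
    enum State { PARTITIONS_ASSIGNED, RUNNING }

    // Before the PR: a zero-duration poll was used only without the state updater.
    static Duration before(final State state, final boolean stateUpdaterEnabled, final Duration pollTime) {
        if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) {
            return Duration.ZERO;
        }
        return pollTime;
    }

    // Suggested shape: with the state updater always on, the state no longer
    // changes the duration, so the zero-duration branch disappears.
    static Duration after(final State state, final Duration pollTime) {
        return pollTime;
    }

    public static void main(final String[] args) {
        // Assumed poll time for illustration.
        final Duration pollTime = Duration.ofMillis(100);
        final Duration oldWithUpdater = before(State.PARTITIONS_ASSIGNED, true, pollTime);
        final Duration suggested = after(State.PARTITIONS_ASSIGNED, pollTime);
        System.out.println("same duration: " + oldWithUpdater.equals(suggested));
    }
}
```

   Keeping the branch with only `state == State.PARTITIONS_ASSIGNED` would 
instead reintroduce the zero-duration poll for the state updater path.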



