cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2016429245


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
                                       final Runnable shutdownErrorHook,
                                       final BiConsumer<Throwable, Boolean> 
streamsUncaughtExceptionHandler) {
 
-        final boolean stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());

Review Comment:
   Could you please also remove the internal config `__state.updater.enabled__` 
and all corresponding code from `StreamsConfig`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -237,13 +237,13 @@ State setState(final State newState) {
 
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                 log.debug("Ignoring request to transit from PENDING_SHUTDOWN 
to {}: " +
-                              "only DEAD state is a valid next state", 
newState);
+                        "only DEAD state is a valid next state", newState);
                 // when the state is already in PENDING_SHUTDOWN, all other 
transitions will be
                 // refused but we do not throw exception here
                 return null;
             } else if (state == State.DEAD) {
                 log.debug("Ignoring request to transit from DEAD to {}: " +
-                              "no valid next state after DEAD", newState);
+                        "no valid next state after DEAD", newState);

Review Comment:
   Could you please revert this change? It only alters the indentation of the continuation lines.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
-            time,
-            config,
-            restorationLogContext,
-            adminClient,
-            restoreConsumer,
-            userStateRestoreListener,
-            userStandbyUpdateListener
+                time,
+                config,
+                restorationLogContext,
+                adminClient,
+                restoreConsumer,
+                userStateRestoreListener,
+                userStandbyUpdateListener
         );
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
         final boolean proceessingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            cache,
-            time,
-            clientSupplier,
-            threadId,
-            threadIdx,
-            processId,
-            log,
-            stateUpdaterEnabled,
-            proceessingThreadsEnabled
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                cache,
+                time,
+                clientSupplier,
+                threadId,
+                threadIdx,
+                processId,
+                log,
+                proceessingThreadsEnabled
         );
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            threadId,
-            log,
-            stateUpdaterEnabled);
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                threadId,
+                log);

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
-            time,
-            config,
-            restorationLogContext,
-            adminClient,
-            restoreConsumer,
-            userStateRestoreListener,
-            userStandbyUpdateListener
+                time,
+                config,
+                restorationLogContext,
+                adminClient,
+                restoreConsumer,
+                userStateRestoreListener,
+                userStandbyUpdateListener
         );
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
         final boolean proceessingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            cache,
-            time,
-            clientSupplier,
-            threadId,
-            threadIdx,
-            processId,
-            log,
-            stateUpdaterEnabled,
-            proceessingThreadsEnabled
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                cache,
+                time,
+                clientSupplier,
+                threadId,
+                threadIdx,
+                processId,
+                log,
+                proceessingThreadsEnabled
         );
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            threadId,
-            log,
-            stateUpdaterEnabled);
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                threadId,
+                log);
 
         final Tasks tasks = new Tasks(new LogContext(logPrefix));
         final boolean processingThreadsEnabled =
-            InternalConfig.processingThreadsEnabled(config.originals());
+                InternalConfig.processingThreadsEnabled(config.originals());

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
         ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
 
         ThreadMetrics.addThreadStartTimeMetric(
-            threadId,
-            streamsMetrics,
-            time.milliseconds()
+                threadId,
+                streamsMetrics,
+                time.milliseconds()
         );
         ThreadMetrics.addThreadStateTelemetryMetric(
-            processId.toString(),
-            threadId,
-            streamsMetrics,
-            (metricConfig, now) -> this.state().ordinal());
+                processId.toString(),
+                threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state().ordinal());
         ThreadMetrics.addThreadStateMetric(
-            threadId,
-            streamsMetrics,
-            (metricConfig, now) -> this.state());
+                threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state());
         ThreadMetrics.addThreadBlockedTimeMetric(
-            threadId,
-            new StreamThreadTotalBlockedTime(
-                mainConsumer,
-                restoreConsumer,
-                taskManager::totalProducerBlockedTime
-            ),
-            streamsMetrics
+                threadId,
+                new StreamThreadTotalBlockedTime(
+                        mainConsumer,
+                        restoreConsumer,
+                        taskManager::totalProducerBlockedTime
+                ),
+                streamsMetrics

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -743,7 +728,7 @@ boolean runLoop() {
                 }
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() 
+ " are corrupted. " +
-                         "Will close the task as dirty and re-create and 
bootstrap from scratch.", e);
+                        "Will close the task as dirty and re-create and 
bootstrap from scratch.", e);

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() {
                     }
                 } else {
                     producerInstanceIdFuture.completeExceptionally(
-                        new TimeoutException("Could not retrieve thread 
producer client instance id.")
+                            new TimeoutException("Could not retrieve thread 
producer client instance id.")

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() {
                     }
                 } else {
                     producerInstanceIdFuture.completeExceptionally(
-                        new TimeoutException("Could not retrieve thread 
producer client instance id.")
+                            new TimeoutException("Could not retrieve thread 
producer client instance id.")
                     );
                 }
             }
 
             if (mainConsumerInstanceIdFuture.isDone()
-                && (!stateUpdaterEnabled && 
restoreConsumerInstanceIdFuture.isDone())
-                && producerInstanceIdFuture.isDone()) {
+                    && restoreConsumerInstanceIdFuture.isDone()
+                    && producerInstanceIdFuture.isDone()) {

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
-            time,
-            config,
-            restorationLogContext,
-            adminClient,
-            restoreConsumer,
-            userStateRestoreListener,
-            userStandbyUpdateListener
+                time,
+                config,
+                restorationLogContext,
+                adminClient,
+                restoreConsumer,
+                userStateRestoreListener,
+                userStandbyUpdateListener
         );
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
         final boolean proceessingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            cache,
-            time,
-            clientSupplier,
-            threadId,
-            threadIdx,
-            processId,
-            log,
-            stateUpdaterEnabled,
-            proceessingThreadsEnabled
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                cache,
+                time,
+                clientSupplier,
+                threadId,
+                threadIdx,
+                processId,
+                log,
+                proceessingThreadsEnabled

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -237,13 +237,13 @@ State setState(final State newState) {
 
             if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) {
                 log.debug("Ignoring request to transit from PENDING_SHUTDOWN 
to {}: " +
-                              "only DEAD state is a valid next state", 
newState);
+                        "only DEAD state is a valid next state", newState);

Review Comment:
   Could you please revert this change? It only alters the indentation of the continuation line.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
-            time,
-            config,
-            restorationLogContext,
-            adminClient,
-            restoreConsumer,
-            userStateRestoreListener,
-            userStandbyUpdateListener
+                time,
+                config,
+                restorationLogContext,
+                adminClient,
+                restoreConsumer,
+                userStateRestoreListener,
+                userStandbyUpdateListener

Review Comment:
   Could you please revert this change? It only adds indentation to the argument list.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
-            time,
-            config,
-            restorationLogContext,
-            adminClient,
-            restoreConsumer,
-            userStateRestoreListener,
-            userStandbyUpdateListener
+                time,
+                config,
+                restorationLogContext,
+                adminClient,
+                restoreConsumer,
+                userStateRestoreListener,
+                userStandbyUpdateListener
         );
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
         final boolean proceessingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            cache,
-            time,
-            clientSupplier,
-            threadId,
-            threadIdx,
-            processId,
-            log,
-            stateUpdaterEnabled,
-            proceessingThreadsEnabled
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                cache,
+                time,
+                clientSupplier,
+                threadId,
+                threadIdx,
+                processId,
+                log,
+                proceessingThreadsEnabled
         );
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            threadId,
-            log,
-            stateUpdaterEnabled);
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                threadId,
+                log);
 
         final Tasks tasks = new Tasks(new LogContext(logPrefix));
         final boolean processingThreadsEnabled =
-            InternalConfig.processingThreadsEnabled(config.originals());
+                InternalConfig.processingThreadsEnabled(config.originals());
 
         final DefaultTaskManager schedulingTaskManager =
-            maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+                maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
topologyMetadata, time, threadId, tasks);
         final StateUpdater stateUpdater =
-            maybeCreateAndStartStateUpdater(
-                stateUpdaterEnabled,
-                streamsMetrics,
-                config,
-                restoreConsumer,
-                changelogReader,
-                topologyMetadata,
-                time,
-                clientId,
-                threadIdx
-            );
+                maybeCreateAndStartStateUpdater(
+                        streamsMetrics,
+                        config,
+                        restoreConsumer,
+                        changelogReader,
+                        topologyMetadata,
+                        time,
+                        clientId,
+                        threadIdx
+                );

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         streamsMetrics.metricsRegistry().addReporter(reporter);
 
         final StreamThread streamThread = new StreamThread(
-            time,
-            config,
-            adminClient,
-            mainConsumer,
-            restoreConsumer,
-            changelogReader,
-            originalReset,
-            taskManager,
-            stateUpdater,
-            streamsMetrics,
-            topologyMetadata,
-            processId,
-            threadId,
-            logContext,
-            referenceContainer.assignmentErrorCode,
-            referenceContainer.nextScheduledRebalanceMs,
-            referenceContainer.nonFatalExceptionsToHandle,
-            shutdownErrorHook,
-            streamsUncaughtExceptionHandler,
-            cache::resize
+                time,
+                config,
+                adminClient,
+                mainConsumer,
+                restoreConsumer,
+                changelogReader,
+                originalReset,
+                taskManager,
+                stateUpdater,
+                streamsMetrics,
+                topologyMetadata,
+                processId,
+                threadId,
+                logContext,
+                referenceContainer.assignmentErrorCode,
+                referenceContainer.nextScheduledRebalanceMs,
+                referenceContainer.nonFatalExceptionsToHandle,
+                shutdownErrorHook,
+                streamsUncaughtExceptionHandler,
+                cache::resize
         );
 
         return streamThread.updateThreadMetadata(adminClientId(clientId));
     }
 
     private static DefaultTaskManager maybeCreateSchedulingTaskManager(final 
boolean processingThreadsEnabled,
-                                                                       final 
boolean stateUpdaterEnabled,
                                                                        final 
TopologyMetadata topologyMetadata,
                                                                        final 
Time time,
                                                                        final 
String threadId,
                                                                        final 
Tasks tasks) {
         if (processingThreadsEnabled) {
-            if (!stateUpdaterEnabled) {
-                throw new IllegalStateException("Processing threads require 
the state updater to be enabled");
-            }
 
             final DefaultTaskManager defaultTaskManager = new 
DefaultTaskManager(
-                time,
-                threadId,
-                tasks,
-                new DefaultTaskExecutorCreator(),
-                topologyMetadata.taskExecutionMetadata(),
-                1
+                    time,
+                    threadId,
+                    tasks,
+                    new DefaultTaskExecutorCreator(),
+                    topologyMetadata.taskExecutionMetadata(),
+                    1

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         streamsMetrics.metricsRegistry().addReporter(reporter);
 
         final StreamThread streamThread = new StreamThread(
-            time,
-            config,
-            adminClient,
-            mainConsumer,
-            restoreConsumer,
-            changelogReader,
-            originalReset,
-            taskManager,
-            stateUpdater,
-            streamsMetrics,
-            topologyMetadata,
-            processId,
-            threadId,
-            logContext,
-            referenceContainer.assignmentErrorCode,
-            referenceContainer.nextScheduledRebalanceMs,
-            referenceContainer.nonFatalExceptionsToHandle,
-            shutdownErrorHook,
-            streamsUncaughtExceptionHandler,
-            cache::resize
+                time,
+                config,
+                adminClient,
+                mainConsumer,
+                restoreConsumer,
+                changelogReader,
+                originalReset,
+                taskManager,
+                stateUpdater,
+                streamsMetrics,
+                topologyMetadata,
+                processId,
+                threadId,
+                logContext,
+                referenceContainer.assignmentErrorCode,
+                referenceContainer.nextScheduledRebalanceMs,
+                referenceContainer.nonFatalExceptionsToHandle,
+                shutdownErrorHook,
+                streamsUncaughtExceptionHandler,
+                cache::resize
         );
 
         return streamThread.updateThreadMetadata(adminClientId(clientId));
     }
 
     private static DefaultTaskManager maybeCreateSchedulingTaskManager(final 
boolean processingThreadsEnabled,
-                                                                       final 
boolean stateUpdaterEnabled,
                                                                        final 
TopologyMetadata topologyMetadata,
                                                                        final 
Time time,
                                                                        final 
String threadId,
                                                                        final 
Tasks tasks) {
         if (processingThreadsEnabled) {
-            if (!stateUpdaterEnabled) {
-                throw new IllegalStateException("Processing threads require 
the state updater to be enabled");
-            }
 
             final DefaultTaskManager defaultTaskManager = new 
DefaultTaskManager(
-                time,
-                threadId,
-                tasks,
-                new DefaultTaskExecutorCreator(),
-                topologyMetadata.taskExecutionMetadata(),
-                1
+                    time,
+                    threadId,
+                    tasks,
+                    new DefaultTaskExecutorCreator(),
+                    topologyMetadata.taskExecutionMetadata(),
+                    1
             );
             defaultTaskManager.startTaskExecutors();
             return defaultTaskManager;
         }
         return null;
     }
 
-    private static StateUpdater maybeCreateAndStartStateUpdater(final boolean 
stateUpdaterEnabled,
-                                                                final 
StreamsMetricsImpl streamsMetrics,
+    private static StateUpdater maybeCreateAndStartStateUpdater(final 
StreamsMetricsImpl streamsMetrics,

Review Comment:
   Since this method now always creates and starts a state updater, could you 
please rename it to `createAndStartStateUpdater()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
-            time,
-            config,
-            restorationLogContext,
-            adminClient,
-            restoreConsumer,
-            userStateRestoreListener,
-            userStandbyUpdateListener
+                time,
+                config,
+                restorationLogContext,
+                adminClient,
+                restoreConsumer,
+                userStateRestoreListener,
+                userStandbyUpdateListener
         );
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
         final boolean proceessingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            cache,
-            time,
-            clientSupplier,
-            threadId,
-            threadIdx,
-            processId,
-            log,
-            stateUpdaterEnabled,
-            proceessingThreadsEnabled
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                cache,
+                time,
+                clientSupplier,
+                threadId,
+                threadIdx,
+                processId,
+                log,
+                proceessingThreadsEnabled
         );
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            threadId,
-            log,
-            stateUpdaterEnabled);
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                threadId,
+                log);
 
         final Tasks tasks = new Tasks(new LogContext(logPrefix));
         final boolean processingThreadsEnabled =
-            InternalConfig.processingThreadsEnabled(config.originals());
+                InternalConfig.processingThreadsEnabled(config.originals());
 
         final DefaultTaskManager schedulingTaskManager =
-            maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+                maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
topologyMetadata, time, threadId, tasks);

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
-            time,
-            config,
-            restorationLogContext,
-            adminClient,
-            restoreConsumer,
-            userStateRestoreListener,
-            userStandbyUpdateListener
+                time,
+                config,
+                restorationLogContext,
+                adminClient,
+                restoreConsumer,
+                userStateRestoreListener,
+                userStandbyUpdateListener
         );
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
         final boolean proceessingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            cache,
-            time,
-            clientSupplier,
-            threadId,
-            threadIdx,
-            processId,
-            log,
-            stateUpdaterEnabled,
-            proceessingThreadsEnabled
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                cache,
+                time,
+                clientSupplier,
+                threadId,
+                threadIdx,
+                processId,
+                log,
+                proceessingThreadsEnabled
         );
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
-            topologyMetadata,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            changelogReader,
-            threadId,
-            log,
-            stateUpdaterEnabled);
+                topologyMetadata,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                changelogReader,
+                threadId,
+                log);
 
         final Tasks tasks = new Tasks(new LogContext(logPrefix));
         final boolean processingThreadsEnabled =
-            InternalConfig.processingThreadsEnabled(config.originals());
+                InternalConfig.processingThreadsEnabled(config.originals());
 
         final DefaultTaskManager schedulingTaskManager =
-            maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+                maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
topologyMetadata, time, threadId, tasks);
         final StateUpdater stateUpdater =
-            maybeCreateAndStartStateUpdater(
-                stateUpdaterEnabled,
-                streamsMetrics,
-                config,
-                restoreConsumer,
-                changelogReader,
-                topologyMetadata,
-                time,
-                clientId,
-                threadIdx
-            );
+                maybeCreateAndStartStateUpdater(
+                        streamsMetrics,
+                        config,
+                        restoreConsumer,
+                        changelogReader,
+                        topologyMetadata,
+                        time,
+                        clientId,
+                        threadIdx
+                );
 
         final TaskManager taskManager = new TaskManager(
-            time,
-            changelogReader,
-            new ProcessId(processId),
-            logPrefix,
-            activeTaskCreator,
-            standbyTaskCreator,
-            tasks,
-            topologyMetadata,
-            adminClient,
-            stateDirectory,
-            stateUpdater,
-            schedulingTaskManager
+                time,
+                changelogReader,
+                new ProcessId(processId),
+                logPrefix,
+                activeTaskCreator,
+                standbyTaskCreator,
+                tasks,
+                topologyMetadata,
+                adminClient,
+                stateDirectory,
+                stateUpdater,
+                schedulingTaskManager

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
         ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
 
         ThreadMetrics.addThreadStartTimeMetric(
-            threadId,
-            streamsMetrics,
-            time.milliseconds()
+                threadId,
+                streamsMetrics,
+                time.milliseconds()
         );
         ThreadMetrics.addThreadStateTelemetryMetric(
-            processId.toString(),
-            threadId,
-            streamsMetrics,
-            (metricConfig, now) -> this.state().ordinal());
+                processId.toString(),
+                threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state().ordinal());

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
         ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
 
         ThreadMetrics.addThreadStartTimeMetric(
-            threadId,
-            streamsMetrics,
-            time.milliseconds()
+                threadId,
+                streamsMetrics,
+                time.milliseconds()
         );
         ThreadMetrics.addThreadStateTelemetryMetric(
-            processId.toString(),
-            threadId,
-            streamsMetrics,
-            (metricConfig, now) -> this.state().ordinal());
+                processId.toString(),
+                threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state().ordinal());
         ThreadMetrics.addThreadStateMetric(
-            threadId,
-            streamsMetrics,
-            (metricConfig, now) -> this.state());
+                threadId,
+                streamsMetrics,
+                (metricConfig, now) -> this.state());

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -760,12 +745,12 @@ boolean runLoop() {
             } catch (final UnsupportedVersionException e) {
                 final String errorMessage = e.getMessage();
                 if (errorMessage != null &&
-                    errorMessage.startsWith("Broker unexpectedly doesn't 
support requireStable flag on version ")) {
+                        errorMessage.startsWith("Broker unexpectedly doesn't 
support requireStable flag on version ")) {

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
         streamsMetrics.metricsRegistry().addReporter(reporter);
 
         final StreamThread streamThread = new StreamThread(
-            time,
-            config,
-            adminClient,
-            mainConsumer,
-            restoreConsumer,
-            changelogReader,
-            originalReset,
-            taskManager,
-            stateUpdater,
-            streamsMetrics,
-            topologyMetadata,
-            processId,
-            threadId,
-            logContext,
-            referenceContainer.assignmentErrorCode,
-            referenceContainer.nextScheduledRebalanceMs,
-            referenceContainer.nonFatalExceptionsToHandle,
-            shutdownErrorHook,
-            streamsUncaughtExceptionHandler,
-            cache::resize
+                time,
+                config,
+                adminClient,
+                mainConsumer,
+                restoreConsumer,
+                changelogReader,
+                originalReset,
+                taskManager,
+                stateUpdater,
+                streamsMetrics,
+                topologyMetadata,
+                processId,
+                threadId,
+                logContext,
+                referenceContainer.assignmentErrorCode,
+                referenceContainer.nextScheduledRebalanceMs,
+                referenceContainer.nonFatalExceptionsToHandle,
+                shutdownErrorHook,
+                streamsUncaughtExceptionHandler,
+                cache::resize

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -615,27 +601,27 @@ public StreamThread(final Time time,
         ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
 
         ThreadMetrics.addThreadStartTimeMetric(
-            threadId,
-            streamsMetrics,
-            time.milliseconds()
+                threadId,
+                streamsMetrics,
+                time.milliseconds()

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() {
                     }
                 } else {
                     mainConsumerInstanceIdFuture.completeExceptionally(
-                        new TimeoutException("Could not retrieve main consumer 
client instance id.")
+                            new TimeoutException("Could not retrieve main 
consumer client instance id.")
                     );
                 }
             }
 
 
-            if (!stateUpdaterEnabled && 
!restoreConsumerInstanceIdFuture.isDone()) {
+            if (!restoreConsumerInstanceIdFuture.isDone()) {

Review Comment:
   Shouldn't you delete the whole `if`-clause?
   Since the state updater will always be enabled after this PR, this block 
should always be skipped.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -657,13 +643,12 @@ public StreamThread(final Time time,
         this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         final int dummyThreadIdx = 1;
         this.maxPollTimeMs = new 
InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", 
"dummyClientId", dummyThreadIdx))
-            .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+                .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -831,7 +816,7 @@ void maybeGetClientInstanceIds() {
                 if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
                     try {
                         producerInstanceIdFuture.complete(
-                            
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
+                                
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -760,12 +745,12 @@ boolean runLoop() {
             } catch (final UnsupportedVersionException e) {
                 final String errorMessage = e.getMessage();
                 if (errorMessage != null &&
-                    errorMessage.startsWith("Broker unexpectedly doesn't 
support requireStable flag on version ")) {
+                        errorMessage.startsWith("Broker unexpectedly doesn't 
support requireStable flag on version ")) {
 
                     log.error("Shutting down because the Kafka cluster seems 
to be on a too old version. " +
-                              "Setting {}=\"{}\" requires broker version 2.5 
or higher.",
-                          StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
-                          StreamsConfig.EXACTLY_ONCE_V2);
+                                    "Setting {}=\"{}\" requires broker version 
2.5 or higher.",
+                            StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+                            StreamsConfig.EXACTLY_ONCE_V2);

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -822,7 +807,7 @@ void maybeGetClientInstanceIds() {
                     }
                 } else {
                     restoreConsumerInstanceIdFuture.completeExceptionally(
-                        new TimeoutException("Could not retrieve restore 
consumer client instance id.")
+                            new TimeoutException("Could not retrieve restore 
consumer client instance id.")

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() {
                     }
                 } else {
                     mainConsumerInstanceIdFuture.completeExceptionally(
-                        new TimeoutException("Could not retrieve main consumer 
client instance id.")
+                            new TimeoutException("Could not retrieve main 
consumer client instance id.")

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -918,8 +903,8 @@ public void sendShutdownRequest(final AssignorError 
assignorError) {
 
     private void handleTaskMigrated(final TaskMigratedException e) {
         log.warn("Detected that the thread is being fenced. " +
-                     "This implies that this thread missed a rebalance and 
dropped out of the consumer group. " +
-                     "Will close out all assigned tasks and rejoin the 
consumer group.", e);
+                "This implies that this thread missed a rebalance and dropped 
out of the consumer group. " +
+                "Will close out all assigned tasks and rejoin the consumer 
group.", e);

Review Comment:
   Could you please revert the additional indentation?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() {
                     }
                 } else {
                     producerInstanceIdFuture.completeExceptionally(
-                        new TimeoutException("Could not retrieve thread 
producer client instance id.")
+                            new TimeoutException("Could not retrieve thread 
producer client instance id.")
                     );
                 }
             }
 
             if (mainConsumerInstanceIdFuture.isDone()
-                && (!stateUpdaterEnabled && 
restoreConsumerInstanceIdFuture.isDone())
-                && producerInstanceIdFuture.isDone()) {
+                    && restoreConsumerInstanceIdFuture.isDone()

Review Comment:
   I believe you can also remove this line. With the state updater, fetching 
this instance ID is done within the state updater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to