janchilling commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2021650402


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
         streamsMetrics.metricsRegistry().addReporter(reporter);
 
         final StreamThread streamThread = new StreamThread(
-            time,
-            config,
-            adminClient,
-            mainConsumer,
-            restoreConsumer,
-            changelogReader,
-            originalReset,
-            taskManager,
-            stateUpdater,
-            streamsMetrics,
-            topologyMetadata,
-            processId,
-            threadId,
-            logContext,
-            referenceContainer.assignmentErrorCode,
-            referenceContainer.nextScheduledRebalanceMs,
-            referenceContainer.nonFatalExceptionsToHandle,
-            shutdownErrorHook,
-            streamsUncaughtExceptionHandler,
-            cache::resize
+                time,
+                config,
+                adminClient,
+                mainConsumer,
+                restoreConsumer,
+                changelogReader,
+                originalReset,
+                taskManager,
+                stateUpdater,
+                streamsMetrics,
+                topologyMetadata,
+                processId,
+                threadId,
+                logContext,
+                referenceContainer.assignmentErrorCode,
+                referenceContainer.nextScheduledRebalanceMs,
+                referenceContainer.nonFatalExceptionsToHandle,
+                shutdownErrorHook,
+                streamsUncaughtExceptionHandler,
+                cache::resize
         );
 
         return streamThread.updateThreadMetadata(adminClientId(clientId));
     }
 
     private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled,
-                                                                       final boolean stateUpdaterEnabled,
                                                                        final TopologyMetadata topologyMetadata,
                                                                        final Time time,
                                                                        final String threadId,
                                                                        final Tasks tasks) {
         if (processingThreadsEnabled) {
-            if (!stateUpdaterEnabled) {
-                throw new IllegalStateException("Processing threads require the state updater to be enabled");
-            }
 
             final DefaultTaskManager defaultTaskManager = new DefaultTaskManager(
-                time,
-                threadId,
-                tasks,
-                new DefaultTaskExecutorCreator(),
-                topologyMetadata.taskExecutionMetadata(),
-                1
+                    time,
+                    threadId,
+                    tasks,
+                    new DefaultTaskExecutorCreator(),
+                    topologyMetadata.taskExecutionMetadata(),
+                    1
             );
             defaultTaskManager.startTaskExecutors();
             return defaultTaskManager;
         }
         return null;
     }
 
-    private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled,
-                                                                final StreamsMetricsImpl streamsMetrics,
+    private static StateUpdater maybeCreateAndStartStateUpdater(final StreamsMetricsImpl streamsMetrics,

Review Comment:
   Updated the method name!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to