guozhangwang commented on code in PR #12427: URL: https://github.com/apache/kafka/pull/12427#discussion_r927038139
########## streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java: ########## @@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() { return Arrays.stream(Thread.currentThread().getStackTrace()) .anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName())); } + + public static StreamTask createStatefulTask(final TaskId taskId, Review Comment: nit: we have other unit test classes duplicating this logic, e.g. in the `DefaultStateUpdaterTest` above, also in `StreamTaskTest#createStatelessTask`. Could we consolidate them all in this class? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ########## @@ -136,10 +139,14 @@ private void createActiveTasks(final Map<TaskId, Set<TopicPartition>> activeTask if (!activeTasksToCreate.isEmpty()) { for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) { - activeTasksPerId.put(activeTask.id(), activeTask); - pendingActiveTasks.remove(activeTask.id()); - for (final TopicPartition topicPartition : activeTask.inputPartitions()) { - activeTasksPerPartition.put(topicPartition, activeTask); + if (stateUpdater != null) { Review Comment: This is a meta thought: I think we should consider extracting the creation of tasks out of `Tasks` and into the `TaskManager`, and hence also not include `StateUpdater` into `Tasks`. Maybe we can do that later in a follow-up refactoring if you agree. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org