guozhangwang commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r927038139

##########
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##########
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
         return Arrays.stream(Thread.currentThread().getStackTrace())
                 .anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName()));
     }
+
+    public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   nit: other unit test classes duplicate this logic, e.g. 
`DefaultStateUpdaterTest` above and `StreamTaskTest#createStatelessTask`. 
Could we consolidate them all in this class?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -136,10 +139,14 @@ private void createActiveTasks(final Map<TaskId, Set<TopicPartition>> activeTask
 
         if (!activeTasksToCreate.isEmpty()) {
             for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
-                activeTasksPerId.put(activeTask.id(), activeTask);
-                pendingActiveTasks.remove(activeTask.id());
-                for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
-                    activeTasksPerPartition.put(topicPartition, activeTask);
+                if (stateUpdater != null) {

Review Comment:
   This is a meta thought: I think we should consider extracting the creation 
of tasks out of `Tasks` and into the `TaskManager`, and hence also not include 
`StateUpdater` in `Tasks`. Maybe we can do that later in a follow-up 
refactoring if you agree.
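
   A rough sketch of the shape this refactoring could take. Every type here 
(`Task`, `TaskCreator`, `StateUpdater`, `SimpleTask`, `Tasks`, `TaskManager`) 
is a simplified, hypothetical stand-in and not the real Kafka Streams class, 
and ids and partitions are plain strings. It also assumes that a newly created 
task goes to the state updater when one is configured and is registered 
directly otherwise; the diff above does not show that branch in full.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// All types below are simplified, hypothetical stand-ins, NOT the real Kafka Streams classes.
interface Task {
    String id();
    Set<String> inputPartitions();
}

interface TaskCreator {
    Collection<Task> createTasks(Map<String, Set<String>> tasksToCreate);
}

interface StateUpdater {
    void add(Task task);
}

final class SimpleTask implements Task {
    private final String id;
    private final Set<String> inputPartitions;

    SimpleTask(final String id, final Set<String> inputPartitions) {
        this.id = id;
        this.inputPartitions = inputPartitions;
    }

    @Override
    public String id() {
        return id;
    }

    @Override
    public Set<String> inputPartitions() {
        return inputPartitions;
    }
}

// Pure registry: bookkeeping only, no knowledge of task creators or the state updater.
final class Tasks {
    private final Map<String, Task> activeTasksPerId = new HashMap<>();
    private final Map<String, Task> activeTasksPerPartition = new HashMap<>();

    void addActiveTask(final Task task) {
        activeTasksPerId.put(task.id(), task);
        for (final String partition : task.inputPartitions()) {
            activeTasksPerPartition.put(partition, task);
        }
    }

    boolean hasActiveTask(final String id) {
        return activeTasksPerId.containsKey(id);
    }
}

// Owns task creation and decides where each newly created task goes.
final class TaskManager {
    private final Tasks tasks;
    private final TaskCreator taskCreator;
    private final StateUpdater stateUpdater; // may be null when the updater is disabled

    TaskManager(final Tasks tasks, final TaskCreator taskCreator, final StateUpdater stateUpdater) {
        this.tasks = tasks;
        this.taskCreator = taskCreator;
        this.stateUpdater = stateUpdater;
    }

    void createActiveTasks(final Map<String, Set<String>> tasksToCreate) {
        for (final Task task : taskCreator.createTasks(tasksToCreate)) {
            if (stateUpdater != null) {
                stateUpdater.add(task);    // assumed: the updater takes over the new task
            } else {
                tasks.addActiveTask(task); // assumed: register directly without an updater
            }
        }
    }
}

final class TaskCreationSketch {
    public static void main(final String[] args) {
        final TaskCreator creator = toCreate -> {
            final List<Task> created = new ArrayList<>();
            toCreate.forEach((id, partitions) -> created.add(new SimpleTask(id, partitions)));
            return created;
        };
        final Tasks tasks = new Tasks();
        new TaskManager(tasks, creator, null).createActiveTasks(Map.of("0_0", Set.of("topic-0")));
        System.out.println(tasks.hasActiveTask("0_0"));
    }
}
```

   With this split, `Tasks` needs neither the creator nor the `StateUpdater`, 
and the `stateUpdater != null` check lives in one place.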



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.