cadonna commented on code in PR #12730:
URL: https://github.com/apache/kafka/pull/12730#discussion_r992179987


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,6 +420,27 @@ private void handleTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition
         }
     }
 
+    private void updateInputPartitionsOfStandbyTaskIfTheyChanged(final Task task,
+                                                                 final Set<TopicPartition> inputPartitions) {
+        /*
+        We should only update input partitions of a standby task if the input partitions really changed. Updating the
+        input partitions of tasks also updates the mapping from source nodes to input topics in the processor topology
+        within the task. The mapping is updated with the topics from the topology metadata. The topology metadata does
+        not prefix intermediate internal topics with the application ID. Thus, if a standby task has input partitions
+        from an intermediate internal topic the update of the mapping in the processor topology leads to an invalid
+        topology exception during recycling of a standby task to an active task when the input queues are created. This
+        is because the input topics in the processor topology and the input partitions of the task do not match because
+        the former miss the application ID prefix.
+        For standby task that have only input partitions from intermediate internal topics this check avoids the invalid
+        topology exception. Unfortunately, a subtopology might have input partitions subscribed to with a regex
+        additionally intermediate internal topics which might still lead to an invalid topology exception during recycling
+        irrespectively of this check here. Thus, there is still a bug to fix here.

Review Comment:
   I opened the following ticket: https://issues.apache.org/jira/browse/KAFKA-14288
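
   For illustration only, here is a minimal sketch of the mismatch described in the code comment above. It is not the Kafka Streams implementation: topics are modelled as plain strings, and the class `InputPartitionCheckSketch` and its methods `prefixed`, `needsUpdate`, and `topicsMatch` are hypothetical names introduced for this example. The only assumption taken from Kafka Streams is that internal topics are named with the application ID as a prefix (`<application.id>-<name>`).

```java
import java.util.Set;

// Simplified, hypothetical model of the situation described in the code comment.
// Nothing here is part of the Kafka Streams API.
public class InputPartitionCheckSketch {

    // Builds the real name of an internal topic: "<application.id>-<name>".
    static String prefixed(final String applicationId, final String topic) {
        return applicationId + "-" + topic;
    }

    // The idea behind the check: only update the task when the assigned
    // input partitions differ from the ones the task already has.
    static boolean needsUpdate(final Set<String> current, final Set<String> assigned) {
        return !current.equals(assigned);
    }

    // Stand-in for the consistency check that fails during recycling: every
    // topic of the task's input partitions must be known to the topology.
    static boolean topicsMatch(final Set<String> topologyTopics, final Set<String> partitionTopics) {
        return topologyTopics.containsAll(partitionTopics);
    }
}
```

   In this model, a topology that knows only the unprefixed name `counts-repartition` does not match a task whose input partitions come from `my-app-counts-repartition`, which corresponds to the invalid topology case. Skipping the update when `needsUpdate` returns `false` avoids overwriting the mapping with unprefixed names in the first place.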



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
