cadonna commented on code in PR #12730:
URL: https://github.com/apache/kafka/pull/12730#discussion_r992179987
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -421,6 +420,27 @@ private void handleTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition
         }
     }

+    private void updateInputPartitionsOfStandbyTaskIfTheyChanged(final Task task,
+                                                                 final Set<TopicPartition> inputPartitions) {
+        /*
+        We should only update input partitions of a standby task if the input partitions really changed. Updating the
+        input partitions of tasks also updates the mapping from source nodes to input topics in the processor topology
+        within the task. The mapping is updated with the topics from the topology metadata. The topology metadata does
+        not prefix intermediate internal topics with the application ID. Thus, if a standby task has input partitions
+        from an intermediate internal topic the update of the mapping in the processor topology leads to an invalid
+        topology exception during recycling of a standby task to an active task when the input queues are created. This
+        is because the input topics in the processor topology and the input partitions of the task do not match because
+        the former miss the application ID prefix.
+        For standby task that have only input partitions from intermediate internal topics this check avoids the invalid
+        topology exception. Unfortunately, a subtopology might have input partitions subscribed to with a regex
+        additionally intermediate internal topics which might still lead to an invalid topology exception during recycling
+        irrespectively of this check here. Thus, there is still a bug to fix here.

Review Comment:
   I opened the following ticket: https://issues.apache.org/jira/browse/KAFKA-14288

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
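For readers following the thread, the "only update if the input partitions really changed" guard described in the quoted code comment can be sketched roughly as below. This is a minimal, self-contained illustration and not the code from the PR: `StandbyInputPartitionCheck`, the nested `TopicPartition` record, `StandbyTask`, and `updateInputPartitionsIfChanged` are hypothetical stand-ins, and the topic name `my-app-repartition` is made up. The real method operates on Kafka Streams' own `Task` and `TopicPartition` types.

```java
import java.util.Set;

public class StandbyInputPartitionCheck {

    // Hypothetical stand-in for Kafka's TopicPartition (assumption, not the real class).
    record TopicPartition(String topic, int partition) { }

    // Hypothetical stand-in for a standby task; it only tracks its input partitions.
    static final class StandbyTask {
        private Set<TopicPartition> inputPartitions;
        private int updateCount = 0;

        StandbyTask(final Set<TopicPartition> inputPartitions) {
            this.inputPartitions = Set.copyOf(inputPartitions);
        }

        Set<TopicPartition> inputPartitions() {
            return inputPartitions;
        }

        void updateInputPartitions(final Set<TopicPartition> newInputPartitions) {
            // Per the quoted comment, the real update also rewrites the mapping from
            // source nodes to input topics; here we only count how often it runs.
            this.inputPartitions = Set.copyOf(newInputPartitions);
            updateCount++;
        }

        int updateCount() {
            return updateCount;
        }
    }

    // Skips the update when the assigned partitions equal the current ones.
    // Returns true only if an update was actually performed.
    static boolean updateInputPartitionsIfChanged(final StandbyTask task,
                                                  final Set<TopicPartition> inputPartitions) {
        if (task.inputPartitions().equals(inputPartitions)) {
            return false;
        }
        task.updateInputPartitions(inputPartitions);
        return true;
    }

    public static void main(final String[] args) {
        final Set<TopicPartition> assigned = Set.of(new TopicPartition("my-app-repartition", 0));
        final StandbyTask task = new StandbyTask(assigned);

        // Same partitions: the guard skips the update.
        System.out.println(updateInputPartitionsIfChanged(task, assigned));
        // Different partitions: the update is applied.
        System.out.println(updateInputPartitionsIfChanged(
            task, Set.of(new TopicPartition("my-app-repartition", 1))));
    }
}
```

The sketch covers only the equality guard. It does not model the regex-subscription case that the comment says can still trigger the invalid topology exception (tracked in KAFKA-14288).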