ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1625023687
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
                     + "tasks for source topics vs changelog topics.");
         }
-        final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
-        final Set<DefaultTaskTopicPartition> allTopicPartitions = new HashSet<>();
+        final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new HashSet<>();
+        final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+        final Runnable fetchRackInformation = () -> {
+            if (!rackInformationFetched.get()) {
+                RackUtils.annotateTopicPartitionsWithRackInfo(cluster,

Review Comment:
Very small nit (tack onto any follow-up PR): weird line break here; either keep everything on one line or move the `cluster` variable to the second line with the other params.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
                     + "tasks for source topics vs changelog topics.");
         }
-        final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
-        final Set<DefaultTaskTopicPartition> allTopicPartitions = new HashSet<>();
+        final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new HashSet<>();
+        final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+        final Runnable fetchRackInformation = () -> {
+            if (!rackInformationFetched.get()) {
+                RackUtils.annotateTopicPartitionsWithRackInfo(cluster,
+                    internalTopicManager, topicsRequiringRackInfo);
+                rackInformationFetched.set(true);
+            }
+        };
+        final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsForTask = new HashMap<>();
+        final Set<TaskId> logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
         logicalTaskIds.forEach(taskId -> {
             final Set<TaskTopicPartition> topicPartitions = new HashSet<>();
             for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) {
                 final boolean isSource = true;
                 final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition);
                 final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition(
-                    topicPartition, isSource, isChangelog, null);
-                allTopicPartitions.add(racklessTopicPartition);
+                    topicPartition, isSource, isChangelog, fetchRackInformation);
+                topicsRequiringRackInfo.add(racklessTopicPartition);
                 topicPartitions.add(racklessTopicPartition);
             }
             for (final TopicPartition topicPartition : changelogPartitionsForTask.get(taskId)) {
                 final boolean isSource = sourcePartitionsForTask.get(taskId).contains(topicPartition);
                 final boolean isChangelog = true;
                 final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition(
-                    topicPartition, isSource, isChangelog, null);
-                allTopicPartitions.add(racklessTopicPartition);
+                    topicPartition, isSource, isChangelog, fetchRackInformation);
+                if (publicAssignmentConfigs.numStandbyReplicas() > 0) {

Review Comment:
Note that active tasks will also read from changelog topics (though only during the restore phase), so we should be adding changelogs to the `topicsRequiringRackInfo` set even if there are no standbys configured.

Again, you can tack this onto PR #17 or whatever PR is next.
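For illustration only, here is a minimal, self-contained sketch of the pattern in this hunk with the suggestion above applied: the rack lookup runs at most once behind an `AtomicBoolean` guard, and changelog partitions are registered for that lookup even when no standbys are configured. All names are simplified stand-ins, not the actual Streams internals (`DefaultTaskTopicPartition`, `RackUtils`, `internalTopicManager`, etc.).

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the assignor's lazy rack-info lookup: partitions are plain
// strings and the "lookup" is a println, but the control flow mirrors the diff above.
public class LazyRackLookupSketch {

    public static void main(final String[] args) {
        // Partitions whose rack info should be resolved the first time anyone asks for it.
        final Set<String> topicsRequiringRackInfo = new HashSet<>();

        // Fetch-once guard: the first run() performs the (potentially expensive) lookup,
        // every later run() is a no-op.
        final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
        final Runnable fetchRackInformation = () -> {
            if (rackInformationFetched.compareAndSet(false, true)) {
                System.out.println("Fetching rack info for " + topicsRequiringRackInfo);
            }
        };

        // Source partitions always need rack info for rack-aware assignment.
        topicsRequiringRackInfo.add("input-topic-0");

        // Per the review comment: active tasks also read changelog topics while restoring
        // state, so changelog partitions are registered even with zero standby replicas.
        topicsRequiringRackInfo.add("store-changelog-0");

        // Two callers ask for rack info; only the first invocation does any work.
        fetchRackInformation.run();
        fetchRackInformation.run();
    }
}
```

The `compareAndSet` guard is just a compact way to make the lookup idempotent; the diff above uses a plain `get()`/`set(true)` pair to the same effect.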
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -573,14 +586,24 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
         ));
         return new DefaultApplicationState(
-            assignmentConfigs.toPublicAssignmentConfigs(),
+            publicAssignmentConfigs,
             logicalTasks,
             clientMetadataMap
         );
     }
-    private static void processStreamsPartitionAssignment(final Map<UUID, ClientMetadata> clientMetadataMap,
-                                                          final TaskAssignment taskAssignment) {
+    private void processStreamsPartitionAssignment(final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+                                                   final TaskAssignment taskAssignment,
+                                                   final AssignmentError assignmentError,
+                                                   final Map<UUID, ClientMetadata> clientMetadataMap,
+                                                   final GroupSubscription groupSubscription) {
+        if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+            assignor.onAssignmentComputed(new GroupAssignment(Collections.emptyMap()), groupSubscription, assignmentError);
+            log.error("Task assignment returning empty GroupAssignment and failing due to error {}", assignmentError);

Review Comment:
Technically speaking we're not returning any GroupAssignment at all for the "task assignment", i.e. the rebalance, since we throw this exception instead of returning anything from #assign. A subtle difference in wording, but I think it's worth clarifying to avoid confusion here.

```suggestion
            log.error("Rebalance failed due to task assignor returning assignment with error {}, assignor callback will receive empty GroupAssignment due to this error", assignmentError);
```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -790,10 +814,17 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
             final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get();
             final TaskAssignment taskAssignment = assignor.assign(applicationState);
             final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment);
-            processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
-            userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+            processStreamsPartitionAssignment(assignor, taskAssignment, assignmentError, clientMetadataMap, groupSubscription);
+            customTaskAssignmentListener = (assignment, subscription) -> {
+                assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+                if (assignmentError != AssignmentError.NONE) {
+                    log.error("Task assignment returning empty GroupAssignment and failing due to error {}", assignmentError);

Review Comment:
The "returning empty GroupAssignment" part of the log is just talking about what we're passing to the #onAssignmentComputed callback, which in this case is not an empty GroupAssignment.

```suggestion
                    log.error("Rebalance failed due to task assignor returning assignment with error {}", assignmentError);
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org