ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1625023687


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
                                               + "tasks for source topics vs 
changelog topics.");
         }
 
-        final Set<TaskId> logicalTaskIds = 
unmodifiableSet(sourcePartitionsForTask.keySet());
-        final Set<DefaultTaskTopicPartition> allTopicPartitions = new 
HashSet<>();
+        final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new 
HashSet<>();
+        final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+        final Runnable fetchRackInformation = () -> {
+            if (!rackInformationFetched.get()) {
+                RackUtils.annotateTopicPartitionsWithRackInfo(cluster,

Review Comment:
   very small nit (tack onto any followup PR): weird line break, either keep 
everything on one line or move the `cluster` variable to the 2nd line with the 
other params



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
                                               + "tasks for source topics vs 
changelog topics.");
         }
 
-        final Set<TaskId> logicalTaskIds = 
unmodifiableSet(sourcePartitionsForTask.keySet());
-        final Set<DefaultTaskTopicPartition> allTopicPartitions = new 
HashSet<>();
+        final Set<DefaultTaskTopicPartition> topicsRequiringRackInfo = new 
HashSet<>();
+        final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+        final Runnable fetchRackInformation = () -> {
+            if (!rackInformationFetched.get()) {
+                RackUtils.annotateTopicPartitionsWithRackInfo(cluster,
+                    internalTopicManager, topicsRequiringRackInfo);
+                rackInformationFetched.set(true);
+            }
+        };
+
         final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsForTask = 
new HashMap<>();
+        final Set<TaskId> logicalTaskIds = 
unmodifiableSet(sourcePartitionsForTask.keySet());
         logicalTaskIds.forEach(taskId -> {
             final Set<TaskTopicPartition> topicPartitions = new HashSet<>();
 
             for (final TopicPartition topicPartition : 
sourcePartitionsForTask.get(taskId)) {
                 final boolean isSource = true;
                 final boolean isChangelog = 
changelogPartitionsForTask.get(taskId).contains(topicPartition);
                 final DefaultTaskTopicPartition racklessTopicPartition = new 
DefaultTaskTopicPartition(
-                    topicPartition, isSource, isChangelog, null);
-                allTopicPartitions.add(racklessTopicPartition);
+                    topicPartition, isSource, isChangelog, 
fetchRackInformation);
+                topicsRequiringRackInfo.add(racklessTopicPartition);
                 topicPartitions.add(racklessTopicPartition);
             }
 
             for (final TopicPartition topicPartition : 
changelogPartitionsForTask.get(taskId)) {
                 final boolean isSource = 
sourcePartitionsForTask.get(taskId).contains(topicPartition);
                 final boolean isChangelog = true;
                 final DefaultTaskTopicPartition racklessTopicPartition = new 
DefaultTaskTopicPartition(
-                    topicPartition, isSource, isChangelog, null);
-                allTopicPartitions.add(racklessTopicPartition);
+                    topicPartition, isSource, isChangelog, 
fetchRackInformation);
+                if (publicAssignmentConfigs.numStandbyReplicas() > 0) {

Review Comment:
   Note that active tasks will also read from changelog topics (though only 
during the restore phase), so we should be adding changelogs to the 
`topicsRequiringRackInfo` set even if there are no standbys configured
   
   Again you can tack this onto PR #17 or whatever PR is next



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -573,14 +586,24 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
         ));
 
         return new DefaultApplicationState(
-            assignmentConfigs.toPublicAssignmentConfigs(),
+            publicAssignmentConfigs,
             logicalTasks,
             clientMetadataMap
         );
     }
 
-    private static void processStreamsPartitionAssignment(final Map<UUID, 
ClientMetadata> clientMetadataMap,
-                                                          final TaskAssignment 
taskAssignment) {
+    private void processStreamsPartitionAssignment(final 
org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+                                                   final TaskAssignment 
taskAssignment,
+                                                   final AssignmentError 
assignmentError,
+                                                   final Map<UUID, 
ClientMetadata> clientMetadataMap,
+                                                   final GroupSubscription 
groupSubscription) {
+        if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || 
assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+            assignor.onAssignmentComputed(new 
GroupAssignment(Collections.emptyMap()), groupSubscription, assignmentError);
+            log.error("Task assignment returning empty GroupAssignment and 
failing due to error {}", assignmentError);

Review Comment:
   Technically speaking we're not returning any GroupAssignment at all for the 
"task assignment" ie the rebalance, since we throw this exception instead of 
returning anything from #assign. A subtle difference in wording but I think 
it's worth clarifying to avoid confusion here
   
   ```suggestion
               log.error("Rebalance failed due to task assignor returning 
assignment with error {}, assignor callback will receive empty GroupAssignment 
due to this error", assignmentError);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -790,10 +814,17 @@ private UserTaskAssignmentListener 
assignTasksToClients(final Cluster fullMetada
             final org.apache.kafka.streams.processor.assignment.TaskAssignor 
assignor = userTaskAssignor.get();
             final TaskAssignment taskAssignment = 
assignor.assign(applicationState);
             final AssignmentError assignmentError = 
validateTaskAssignment(applicationState, taskAssignment);
-            processStreamsPartitionAssignment(clientMetadataMap, 
taskAssignment);
-            userTaskAssignmentListener = (assignment, subscription) -> 
assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+            processStreamsPartitionAssignment(assignor, taskAssignment, 
assignmentError, clientMetadataMap, groupSubscription);
+            customTaskAssignmentListener = (assignment, subscription) -> {
+                assignor.onAssignmentComputed(assignment, subscription, 
assignmentError);
+                if (assignmentError != AssignmentError.NONE) {
+                    log.error("Task assignment returning empty GroupAssignment 
and failing due to error {}", assignmentError);

Review Comment:
   The "returning empty GroupAssignment" part of the log is just talking about 
what we're passing in to the #onAssignmentComputed callback, which in this case 
is not an empty GroupAssignment.
   
   ```suggestion
                       log.error("Rebalance failed due to task assignor 
returning assignment with error {}", assignmentError);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to