ableegoldman commented on a change in pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#discussion_r556248367



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -54,15 +58,17 @@
     private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds = new TreeMap<>();
 
     // the following four maps are used only for logging purposes;

Review comment:
       There's no need for this comment anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -54,15 +58,17 @@
     private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds = new TreeMap<>();
 
     // the following four maps are used only for logging purposes;
-    // TODO KAFKA-10283: we could consider merging them with other book-keeping maps at client-levels
-    //                   so that they would not be inconsistent
-    private final Map<String, Set<TaskId>> consumerToPreviousActiveTaskIds = new TreeMap<>();
-    private final Map<String, Set<TaskId>> consumerToAssignedActiveTaskIds = new TreeMap<>();
-    private final Map<String, Set<TaskId>> consumerToAssignedStandbyTaskIds = new TreeMap<>();
-    private final Map<String, Set<TaskId>> consumerToRevokingActiveTaskIds = new TreeMap<>();
-
+    private final Map<String, Map<TaskId, Set<ConsumerState>>> consumerToTaskIdsConsumerStates = new TreeMap<>();

Review comment:
       This map is a concise and efficient way to keep track of the tasks, but
I'm worried that we're sacrificing some readability. I don't think we
necessarily want to stuff everything into a single data structure; the goal is
just to reduce the duplicated effort of tracking tasks at both the client and
the consumer level. If you take a look at the `activeTasks` family of sets
above, it's basically just a superset of all the tasks in the
`consumerToAssignedActive` map.
   
   But `consumerToAssignedActive` and `consumerToAssignedStandby`, for example,
track two completely orthogonal things, so it seems reasonable to keep them in
two different data structures. WDYT?
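   
   To make the alternative concrete, here is a minimal sketch of keeping one
map per kind of assignment and deriving the client-level view from it. It is
not code from the PR: `String` stands in for `TaskId`, the class and method
names are hypothetical, and it assumes the client-level set is exactly the
union of the per-consumer sets.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not code from the PR: String stands in for TaskId,
// and the class and method names are made up for illustration.
final class PerConsumerTasksSketch {
    // Active and standby assignments are orthogonal, so each gets its own map.
    private final Map<String, Set<String>> consumerToAssignedActive = new TreeMap<>();
    private final Map<String, Set<String>> consumerToAssignedStandby = new TreeMap<>();

    public void assignActive(final String consumer, final String taskId) {
        consumerToAssignedActive.computeIfAbsent(consumer, k -> new TreeSet<>()).add(taskId);
    }

    public void assignStandby(final String consumer, final String taskId) {
        consumerToAssignedStandby.computeIfAbsent(consumer, k -> new TreeSet<>()).add(taskId);
    }

    // Assumption: the client-level set is the union of the per-consumer sets,
    // so it is derived here instead of being tracked in a second structure.
    public Set<String> activeTasks() {
        return union(consumerToAssignedActive);
    }

    public Set<String> standbyTasks() {
        return union(consumerToAssignedStandby);
    }

    private static Set<String> union(final Map<String, Set<String>> byConsumer) {
        final Set<String> all = new TreeSet<>();
        byConsumer.values().forEach(all::addAll);
        return Collections.unmodifiableSet(all);
    }
}
```

   With this shape, the per-consumer maps remain the single source of truth
for each task type, and the client-level sets cannot drift out of sync with
them.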

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -996,18 +996,19 @@ private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition
                 log.debug("Requested client {} to schedule a followup rebalance", clientId);
             }
 
-            log.info("Client {} per-consumer assignment:\n" +
-                "\tprev owned active {}\n" +
-                "\tprev owned standby {}\n" +
-                "\tassigned active {}\n" +
-                "\trevoking active {}" +
-                "\tassigned standby {}\n",
-                clientId,
-                clientMetadata.state.prevOwnedActiveTasksByConsumer(),
-                clientMetadata.state.prevOwnedStandbyByConsumer(),
-                clientMetadata.state.assignedActiveTasksByConsumer(),
-                clientMetadata.state.revokingActiveTasksByConsumer(),
-                clientMetadata.state.assignedStandbyTasksByConsumer());
+            log.info("Client {} per-consumer assignment\n {}",
+                     clientId,
+                     clientMetadata.state.assignedTasksConsumerStateByConsumer()
+                                         .entrySet()
+                                         .stream()
+                                         .map(e -> "\tconsumer : " + e.getKey() + '\n' +
+                                                   e.getValue().entrySet()
+                                                    .stream()
+                                                    .map(taskIdEntry -> "\ttaskId : " + taskIdEntry.getKey()
+                                                                        + " states : " + taskIdEntry.getValue())
+                                                    .collect(Collectors.joining("\n")))
+                                         .collect(Collectors.joining("\n"))
+            );

Review comment:
       This will print everything out with one task per line, right? That might
be a problem (or at least an annoyance) in apps with hundreds of tasks or more
in total. Also, speaking from experience debugging an application, it's more
useful to present the information grouped by type ("here are all the assigned
active tasks, here are the revoking active tasks, etc.") so you can see all the
tasks of each type together. So we may want to keep the original format, or at
least something similar.
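   
   As a rough illustration of the grouped layout, the sketch below prints one
line per task type, with each line showing that type's tasks per consumer. It
is not code from the PR: `String` stands in for `TaskId`, and the class name,
the `format` method, and the shape of the input map are all hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch, not code from the PR: String stands in for TaskId,
// and the input is assumed to map a task-type label to that type's
// per-consumer task sets.
final class GroupedAssignmentLogSketch {
    static String format(final String clientId,
                         final Map<String, Map<String, Set<String>>> tasksByType) {
        // One line per task type, so the output grows with the number of
        // types rather than with the number of tasks.
        final String groups = tasksByType.entrySet().stream()
            .map(e -> "\t" + e.getKey() + " " + e.getValue())
            .collect(Collectors.joining("\n"));
        return "Client " + clientId + " per-consumer assignment:\n" + groups;
    }
}
```

   The line count here stays fixed at the number of task types, regardless of
how many tasks the client owns.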



