ableegoldman commented on a change in pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#discussion_r556248367
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -54,15 +58,17 @@
     private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds = new TreeMap<>();
     // the following four maps are used only for logging purposes;

Review comment:
No need for this comment anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -54,15 +58,17 @@
     private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds = new TreeMap<>();
     // the following four maps are used only for logging purposes;
-    // TODO KAFKA-10283: we could consider merging them with other book-keeping maps at client-levels
-    // so that they would not be inconsistent
-    private final Map<String, Set<TaskId>> consumerToPreviousActiveTaskIds = new TreeMap<>();
-    private final Map<String, Set<TaskId>> consumerToAssignedActiveTaskIds = new TreeMap<>();
-    private final Map<String, Set<TaskId>> consumerToAssignedStandbyTaskIds = new TreeMap<>();
-    private final Map<String, Set<TaskId>> consumerToRevokingActiveTaskIds = new TreeMap<>();
-
+    private final Map<String, Map<TaskId, Set<ConsumerState>>> consumerToTaskIdsConsumerStates = new TreeMap<>();

Review comment:
This map is great as a concise and efficient way to keep track of the tasks, but I'm worried that we're sacrificing some readability. I'm not sure we want to stuff everything into a single data structure; the goal is just to reduce the duplicated effort of tracking tasks at both the client and the consumer level. If you look at the `activeTasks` family of sets above, that's basically just a superset of all tasks in the `consumerToAssignedActive` map. But `consumerToAssignedActive` and `consumerToAssignedStandby`, for example, track two completely orthogonal things, so it seems reasonable for them to live in two different data structures. WDYT?
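A minimal sketch of the split described above, assuming plain String task ids (such as "0_1") instead of Kafka's `TaskId` and a hypothetical `ConsumerTaskTracker` class that is not part of `ClientState`: orthogonal per-consumer maps (assigned active vs. assigned standby) stay separate, and the client-level set is computed from the consumer-level map instead of being tracked a second time.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not Kafka's ClientState: task ids are plain Strings such as "0_1".
final class ConsumerTaskTracker {
    // Orthogonal concerns live in separate per-consumer maps.
    private final Map<String, Set<String>> consumerToAssignedActive = new TreeMap<>();
    private final Map<String, Set<String>> consumerToAssignedStandby = new TreeMap<>();

    void assignActive(final String consumer, final String taskId) {
        consumerToAssignedActive.computeIfAbsent(consumer, c -> new TreeSet<>()).add(taskId);
    }

    void assignStandby(final String consumer, final String taskId) {
        consumerToAssignedStandby.computeIfAbsent(consumer, c -> new TreeSet<>()).add(taskId);
    }

    // Client-level view computed as the union of the per-consumer sets, so there is
    // no second client-level set that has to be kept in sync by hand.
    Set<String> activeTasks() {
        return union(consumerToAssignedActive);
    }

    Set<String> standbyTasks() {
        return union(consumerToAssignedStandby);
    }

    // Read-only view of the per-consumer map, e.g. for logging.
    Map<String, Set<String>> assignedActiveTasksByConsumer() {
        return Collections.unmodifiableMap(consumerToAssignedActive);
    }

    private static Set<String> union(final Map<String, Set<String>> byConsumer) {
        final Set<String> all = new TreeSet<>();
        byConsumer.values().forEach(all::addAll);
        return Collections.unmodifiableSet(all);
    }

    public static void main(final String[] args) {
        final ConsumerTaskTracker tracker = new ConsumerTaskTracker();
        tracker.assignActive("consumer-1", "0_0");
        tracker.assignActive("consumer-2", "0_1");
        tracker.assignStandby("consumer-1", "0_1");
        System.out.println(tracker.activeTasks());  // prints [0_0, 0_1]
        System.out.println(tracker.standbyTasks()); // prints [0_1]
    }
}
```

Deriving the client-level view on demand trades a little computation for a single source of truth; caching the union would reintroduce the consistency problem the original TODO (KAFKA-10283) mentions.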
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -996,18 +996,19 @@ private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition
                 log.debug("Requested client {} to schedule a followup rebalance", clientId);
             }

-            log.info("Client {} per-consumer assignment:\n" +
-                "\tprev owned active {}\n" +
-                "\tprev owned standby {}\n" +
-                "\tassigned active {}\n" +
-                "\trevoking active {}" +
-                "\tassigned standby {}\n",
-                clientId,
-                clientMetadata.state.prevOwnedActiveTasksByConsumer(),
-                clientMetadata.state.prevOwnedStandbyByConsumer(),
-                clientMetadata.state.assignedActiveTasksByConsumer(),
-                clientMetadata.state.revokingActiveTasksByConsumer(),
-                clientMetadata.state.assignedStandbyTasksByConsumer());
+            log.info("Client {} per-consumer assignment\n {}",
+                clientId,
+                clientMetadata.state.assignedTasksConsumerStateByConsumer()
+                    .entrySet()
+                    .stream()
+                    .map(e -> "\tconsumer : " + e.getKey() + '\n' +
+                        e.getValue().entrySet()
+                            .stream()
+                            .map(taskIdEntry -> "\ttaskId : " + taskIdEntry.getKey() +
+                                " states : " + taskIdEntry.getValue())
+                            .collect(Collectors.joining("\n")))
+                    .collect(Collectors.joining("\n"))
+            );

Review comment:
This will print everything out with one task per line, right? That might be a problem (or at least an annoyance) in apps with hundreds of tasks or more in total. Also, speaking from experience debugging applications, it's more useful to present the information grouped by type: "here are all the assigned active tasks, here are the revoking active tasks, etc.", so you can see all the tasks of each type together. So we may want to keep the original format, or at least something similar.
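A minimal sketch of the layout the comment asks for, assuming a hypothetical `AssignmentLogFormatter` helper and plain String task ids (neither is Kafka code): one line per task category, each line showing that category's whole consumer-to-tasks map, as the original `log.info` call did. The category labels are copied from the original message.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper; task ids are plain Strings and labels mirror the original log message.
final class AssignmentLogFormatter {
    private AssignmentLogFormatter() { }

    // One line per category (in insertion order); each line prints the full
    // consumer -> tasks map, so all tasks of one type appear together and the
    // number of lines does not grow with the number of tasks.
    static String format(final String clientId,
                         final Map<String, Map<String, Set<String>>> tasksByCategory) {
        final StringBuilder sb = new StringBuilder("Client ")
            .append(clientId)
            .append(" per-consumer assignment:");
        for (final Map.Entry<String, Map<String, Set<String>>> entry : tasksByCategory.entrySet()) {
            sb.append("\n\t").append(entry.getKey()).append(' ').append(entry.getValue());
        }
        return sb.toString();
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> active = new TreeMap<>();
        active.put("consumer-1", new TreeSet<>(Arrays.asList("0_0", "1_0")));
        active.put("consumer-2", new TreeSet<>(Arrays.asList("0_1")));
        final Map<String, Set<String>> standby = new TreeMap<>();
        standby.put("consumer-1", new TreeSet<>(Arrays.asList("0_1")));

        final Map<String, Map<String, Set<String>>> categories = new LinkedHashMap<>();
        categories.put("assigned active", active);
        categories.put("assigned standby", standby);
        // Prints:
        // Client client-A per-consumer assignment:
        //     assigned active {consumer-1=[0_0, 1_0], consumer-2=[0_1]}
        //     assigned standby {consumer-1=[0_1]}
        // (the indented lines start with a tab character)
        System.out.println(format("client-A", categories));
    }
}
```

With the five categories of the original message, the entry stays at six lines no matter how many tasks the client owns. Inside the assignor, the same shape can come directly from SLF4J `{}` placeholders, as the removed `log.info` call did, without building the string by hand.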