ableegoldman commented on a change in pull request #9640: URL: https://github.com/apache/kafka/pull/9640#discussion_r560631718
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -100,40 +94,41 @@ boolean reachedCapacity() {
     }
     public Set<TaskId> activeTasks() {
-        return unmodifiableSet(activeTasks);
+        return unmodifiableSet(assignedActiveTasks.taskIds());
     }
     public int activeTaskCount() {
-        return activeTasks.size();
+        return assignedActiveTasks.taskIds().size();
     }
     double activeTaskLoad() {
         return ((double) activeTaskCount()) / capacity;
     }
     public void assignActiveTasks(final Collection<TaskId> tasks) {
-        activeTasks.addAll(tasks);
+        assignedActiveTasks.taskIds().addAll(tasks);
     }
     public void assignActiveToConsumer(final TaskId task, final String consumer) {
-        consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
+        assignedActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);

Review comment:
Thanks for the update. I think we're closer but still not quite there yet, since we've kind of just wrapped the existing data structures up into the new `ClientStateTask` class but still access those maps/sets like we did before. So there's not much added safety/consistency, which was the original intention of this ticket IIUC -- ie to make sure that `activeTasks` and `consumerToAssignedActiveTaskIds` hold the same set of tasks overall.

Granted, this might turn out to be pretty difficult to do without a large refactoring of the assignment code. Part of the problem is just that we first assign tasks at the client level, at which point we don't know anything about the consumers for each task. Then later we assign tasks to consumers from inside the `StreamsPartitionAssignor` class, at which point we don't really care so much about the client-level tasks.

But I think we can still make some solid improvements here.
For example what if we forget about refactoring any of the data structures for now, and just focus on adding some sanity checks -- eg inside `assignActiveToConsumer` we make sure that the `task` actually is assigned to this client to begin with, ie that it exists in the `activeTasks` set. WDYT, does that make sense?
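
To make the `assignActiveToConsumer` sanity check concrete, here is a rough sketch of what it could look like. This is not the actual `ClientState` code: `ClientStateSketch` and `activeTasksForConsumer` are hypothetical names, task IDs are plain `String`s rather than `TaskId`, and throwing `IllegalStateException` is just one assumed way to surface the inconsistency.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for ClientState (not the real class).
// Task IDs are plain Strings here instead of TaskId to keep the sketch self-contained.
class ClientStateSketch {
    private final Set<String> activeTasks = new HashSet<>();
    private final Map<String, Set<String>> consumerToAssignedActiveTaskIds = new HashMap<>();

    // Client-level assignment: happens first, before any consumer is known.
    void assignActiveTasks(final Collection<String> tasks) {
        activeTasks.addAll(tasks);
    }

    // Consumer-level assignment: happens later, from the partition assignor.
    void assignActiveToConsumer(final String task, final String consumer) {
        // Sanity check: the task must already be an active task of this client.
        // The exception type is an assumption made for this sketch.
        if (!activeTasks.contains(task)) {
            throw new IllegalStateException(
                "Tried to assign task " + task + " to consumer " + consumer
                    + ", but it is not an active task of this client");
        }
        consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    // Hypothetical read-only accessor, added only so the sketch can be exercised.
    Set<String> activeTasksForConsumer(final String consumer) {
        return Collections.unmodifiableSet(
            consumerToAssignedActiveTaskIds.getOrDefault(consumer, Collections.emptySet()));
    }
}
```

With this in place, a task that was never assigned at the client level is rejected before it reaches the per-consumer map, so the map can only ever hold a subset of `activeTasks`. It does not guarantee the reverse (every active task eventually gets a consumer), which would need a separate check once the consumer-level assignment is finished.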