guozhangwang commented on a change in pull request #9640: URL: https://github.com/apache/kafka/pull/9640#discussion_r616343033
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -100,40 +94,41 @@ boolean reachedCapacity() {
     }
 
     public Set<TaskId> activeTasks() {
-        return unmodifiableSet(activeTasks);
+        return unmodifiableSet(assignedActiveTasks.taskIds());
     }
 
     public int activeTaskCount() {
-        return activeTasks.size();
+        return assignedActiveTasks.taskIds().size();
     }
 
     double activeTaskLoad() {
         return ((double) activeTaskCount()) / capacity;
     }
 
     public void assignActiveTasks(final Collection<TaskId> tasks) {
-        activeTasks.addAll(tasks);
+        assignedActiveTasks.taskIds().addAll(tasks);
     }
 
     public void assignActiveToConsumer(final TaskId task, final String consumer) {
-        consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
+        assignedActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);

Review comment:
   @highluck I think the main point here is that, inside `assignActiveToConsumer`, we should first make sure that `task` is already in the `ClientStateTask.taskIds` set, i.e. that the task has already been assigned to the client, before we assign it to one of the client's consumers. This guards against bugs that cause an inconsistent assignment: for example, assigning a task to instance `A` while at the same time assigning it to consumer `2` of instance `B`. The current data structure still does not guarantee that this can never happen, but we can at least add such a sanity check for now.
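
   A minimal sketch of what such a sanity check might look like, not the actual patch. `ClientStateSketch`, its fields, and `activeTasksForConsumer` are hypothetical stand-ins for `ClientState` and `ClientStateTask`, and task IDs are plain `String`s rather than `TaskId` so that the example compiles on its own. Throwing `IllegalStateException` is also an assumption; the real code may prefer a different exception type.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for ClientState (not the Kafka class).
class ClientStateSketch {
    // Tasks assigned to this client (plays the role of ClientStateTask.taskIds).
    private final Set<String> taskIds = new HashSet<>();
    // Tasks assigned to each consumer of this client.
    private final Map<String, Set<String>> consumerToTaskIds = new HashMap<>();

    void assignActiveTasks(final Collection<String> tasks) {
        taskIds.addAll(tasks);
    }

    void assignActiveToConsumer(final String task, final String consumer) {
        // Sanity check: the task must already belong to this client
        // before it can be handed to one of the client's consumers.
        if (!taskIds.contains(task)) {
            throw new IllegalStateException(
                "Task " + task + " is not assigned to this client; cannot assign it to consumer " + consumer);
        }
        consumerToTaskIds.computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    // Read-only view of the tasks assigned to one consumer (empty if none).
    Set<String> activeTasksForConsumer(final String consumer) {
        return Collections.unmodifiableSet(
            consumerToTaskIds.getOrDefault(consumer, Collections.emptySet()));
    }
}
```

   Note that this check is local to one client: it rejects a consumer-level assignment for a task the client does not own, but it cannot detect the same task being assigned to two different clients.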