guozhangwang commented on a change in pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#discussion_r616343033



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -100,40 +94,41 @@ boolean reachedCapacity() {
     }
 
     public Set<TaskId> activeTasks() {
-        return unmodifiableSet(activeTasks);
+        return unmodifiableSet(assignedActiveTasks.taskIds());
     }
 
     public int activeTaskCount() {
-        return activeTasks.size();
+        return assignedActiveTasks.taskIds().size();
     }
 
     double activeTaskLoad() {
         return ((double) activeTaskCount()) / capacity;
     }
 
     public void assignActiveTasks(final Collection<TaskId> tasks) {
-        activeTasks.addAll(tasks);
+        assignedActiveTasks.taskIds().addAll(tasks);
     }
 
     public void assignActiveToConsumer(final TaskId task, final String consumer) {
-        consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
+        assignedActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);

Review comment:
       @highluck I think the main point here is that inside
`assignActiveToConsumer` we should first check that `task` is already in the
`ClientStateTask.taskIds` set, i.e. that the task has already been assigned to
the client, before we assign it to one of that client's consumers. This guards
against bugs that produce an inconsistent assignment: for example, assigning a
task to instance `A` while also assigning it to consumer `2` of instance `B`.
The current data structure still does not guarantee that this can never
happen, but at least we can add such a sanity check for now.
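A minimal sketch of the suggested sanity check follows. It assumes a simplified stand-in for `ClientStateTask` that holds a client's task ids plus the per-consumer breakdown of those same ids; task ids are plain strings here instead of Kafka's `TaskId`, and throwing `IllegalStateException` is an assumed choice, not necessarily what the PR does. `ClientStateSketch` and `activeTasksOf` are hypothetical names used only for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the PR's ClientStateTask: the task ids assigned to
// one client, plus which of this client's consumers owns each of those tasks.
final class ClientStateTask {
    private final Set<String> taskIds = new HashSet<>();
    private final Map<String, Set<String>> consumerToTaskIds = new HashMap<>();

    Set<String> taskIds() {
        return taskIds;
    }

    Map<String, Set<String>> consumerToTaskIds() {
        return consumerToTaskIds;
    }
}

// Hypothetical, trimmed-down ClientState showing only the active-task methods.
final class ClientStateSketch {
    private final ClientStateTask assignedActiveTasks = new ClientStateTask();

    void assignActiveTasks(final Collection<String> tasks) {
        assignedActiveTasks.taskIds().addAll(tasks);
    }

    void assignActiveToConsumer(final String task, final String consumer) {
        // Sanity check: a task may only go to one of this client's consumers
        // if the task was already assigned to the client itself.
        if (!assignedActiveTasks.taskIds().contains(task)) {
            throw new IllegalStateException("Task " + task
                + " is not assigned to this client, so it cannot be assigned to consumer " + consumer);
        }
        assignedActiveTasks.consumerToTaskIds()
            .computeIfAbsent(consumer, k -> new HashSet<>())
            .add(task);
    }

    // Returns the active tasks owned by the given consumer (empty if none).
    Set<String> activeTasksOf(final String consumer) {
        return assignedActiveTasks.consumerToTaskIds()
            .getOrDefault(consumer, Collections.emptySet());
    }
}
```

Note that this check is local to one client: it catches a task reaching a consumer of a client that does not own it, but it does not by itself stop the same task from being assigned to two different clients, which, as noted above, the data structure does not yet rule out.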




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org