ableegoldman commented on a change in pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#discussion_r560631718



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -100,40 +94,41 @@ boolean reachedCapacity() {
     }
 
     public Set<TaskId> activeTasks() {
-        return unmodifiableSet(activeTasks);
+        return unmodifiableSet(assignedActiveTasks.taskIds());
     }
 
     public int activeTaskCount() {
-        return activeTasks.size();
+        return assignedActiveTasks.taskIds().size();
     }
 
     double activeTaskLoad() {
         return ((double) activeTaskCount()) / capacity;
     }
 
     public void assignActiveTasks(final Collection<TaskId> tasks) {
-        activeTasks.addAll(tasks);
+        assignedActiveTasks.taskIds().addAll(tasks);
     }
 
     public void assignActiveToConsumer(final TaskId task, final String consumer) {
-        consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
+        assignedActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);

Review comment:
       Thanks for the update. I think we're closer, but still not quite there
yet, since we've kind of just wrapped the existing data structures up into the
new `ClientStateTask` class while still accessing those maps/sets the same way
as before. So there's not much added safety/consistency, which was the original
intention of this ticket IIUC, i.e. to make sure that `activeTasks` and
`consumerToAssignedActiveTaskIds` hold the same set of tasks overall.
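   To make that invariant concrete, here's a minimal sketch of one way to phrase
"hold the same set of tasks overall" as a check: the union of the per-consumer
task sets must equal the client's active task set. `ActiveTaskInvariant` and
`consumersMatchClient` are hypothetical names (not existing Kafka Streams code),
and task ids are modeled as plain `String`s instead of `TaskId` just to keep it
self-contained.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams. Task ids are Strings here
// (an assumption for the sketch) rather than Kafka's TaskId.
final class ActiveTaskInvariant {

    // Returns true iff the tasks spread across all consumers are exactly the
    // client's active tasks: nothing missing and nothing extra.
    static boolean consumersMatchClient(final Set<String> activeTasks,
                                        final Map<String, Set<String>> consumerToAssignedActiveTaskIds) {
        final Set<String> union = new HashSet<>();
        for (final Set<String> tasks : consumerToAssignedActiveTaskIds.values()) {
            union.addAll(tasks);
        }
        return union.equals(activeTasks);
    }

    public static void main(final String[] args) {
        final Set<String> activeTasks = new HashSet<>(Set.of("0_0", "0_1"));
        final Map<String, Set<String>> byConsumer = new HashMap<>();

        byConsumer.put("consumer-1", new HashSet<>(Set.of("0_0")));
        System.out.println(consumersMatchClient(activeTasks, byConsumer)); // false: 0_1 has no consumer

        byConsumer.put("consumer-2", new HashSet<>(Set.of("0_1")));
        System.out.println(consumersMatchClient(activeTasks, byConsumer)); // true
    }
}
```

Note this only checks coverage; it doesn't verify that each task goes to at most
one consumer, which would need a separate disjointness check.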
   
   Granted, this might turn out to be pretty difficult to do without a large
refactoring of the assignment code. Part of the problem is just that we first
assign tasks at the client level, at which point we don't know anything about
the consumers for each task. Then later we assign tasks to consumers from
inside the `StreamsPartitionAssignor` class, at which point we don't really
care so much about the client-level tasks.
   
   But I think we can still make some solid improvements here. For example, what
if we forget about refactoring any of the data structures for now, and just
focus on adding some sanity checks? E.g. inside `assignActiveToConsumer` we
make sure that the `task` actually is assigned to this client to begin with,
i.e. that it exists in the `activeTasks` set. WDYT, does that make sense?
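   Roughly, the check I have in mind would look like this. It's a simplified,
hypothetical stand-in for `ClientState` (`ClientStateSketch` and
`activeTasksForConsumer` are made-up names, task ids are `String`s instead of
`TaskId`, and only the two relevant fields are modeled), and throwing
`IllegalStateException` is just one reasonable way to fail loudly:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for ClientState, for illustration only.
final class ClientStateSketch {
    private final Set<String> activeTasks = new HashSet<>();
    private final Map<String, Set<String>> consumerToAssignedActiveTaskIds = new HashMap<>();

    // Client-level assignment: happens first, with no knowledge of consumers.
    void assignActiveTasks(final Collection<String> tasks) {
        activeTasks.addAll(tasks);
    }

    // Consumer-level assignment: only accepts tasks already active on this client.
    void assignActiveToConsumer(final String task, final String consumer) {
        if (!activeTasks.contains(task)) {
            throw new IllegalStateException("Task " + task
                + " is not an active task of this client, cannot assign it to consumer " + consumer);
        }
        consumerToAssignedActiveTaskIds.computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    // Read-only view of the active tasks handed to one consumer.
    Set<String> activeTasksForConsumer(final String consumer) {
        return Collections.unmodifiableSet(
            consumerToAssignedActiveTaskIds.getOrDefault(consumer, Collections.emptySet()));
    }

    public static void main(final String[] args) {
        final ClientStateSketch state = new ClientStateSketch();
        state.assignActiveTasks(Set.of("0_0"));
        state.assignActiveToConsumer("0_0", "consumer-1"); // accepted: 0_0 is active on this client
        try {
            state.assignActiveToConsumer("0_1", "consumer-1"); // rejected: 0_1 was never assigned here
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Given the ordering described above (client-level assignment first, consumer-level
assignment later), this check shouldn't fire in normal operation; it just catches
a task being handed to a consumer without first being assigned to its client.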





