lucasbru commented on code in PR #18652:
URL: https://github.com/apache/kafka/pull/18652#discussion_r1930379159


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.kafka.common.utils.Utils.union;
+
+public class ProcessState {

Review Comment:
   nit: could use a javadoc comment for the purpose.
   
   Also - Is this supposed to be used by multiple assignors? Then we probably 
need a unit test. If this is not supposed to be used by multiple assignors, how 
about making this a static inner class of the sticky task assignor to make it 
clear this is not a public interface?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableSet;
+import static org.apache.kafka.common.utils.Utils.union;
+
+public class ProcessState {
+    private final String processId;
+    // number of members
+    private int capacity;
+    private double load;
+    private final Map<String, Integer> memberToTaskCounts;
+    private final Map<String, Set<TaskId>> assignedActiveTasks;
+    private final Map<String, Set<TaskId>> assignedStandbyTasks;
+
+
+    ProcessState(final String processId) {
+        this.processId = processId;
+        this.capacity = 0;
+        this.load = Double.MAX_VALUE;
+        this.assignedActiveTasks = new HashMap<>();
+        this.assignedStandbyTasks = new HashMap<>();
+        this.memberToTaskCounts = new HashMap<>();
+    }
+
+
+    public String processId() {
+        return processId;
+    }
+
+    public int capacity() {
+        return capacity;
+    }
+
+    public int totalTaskCount() {
+        return assignedStandbyTasks().size() + assignedActiveTasks().size();
+    }
+
+    public double load() {
+        return load;
+    }
+
+    public Map<String, Integer> memberToTaskCounts() {
+        return memberToTaskCounts;
+    }
+
+    public Set<TaskId> assignedActiveTasks() {
+        return assignedActiveTasks.values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+    }
+
+    public Map<String, Set<TaskId>> assignedActiveTasksByMember() {
+        return assignedActiveTasks;
+    }
+
+    public Set<TaskId> assignedStandbyTasks() {
+        return assignedStandbyTasks.values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+    }
+
+    public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
+        return assignedStandbyTasks;
+    }
+
+    public void addTask(final String memberId, final TaskId taskId, final 
boolean isActive) {
+        if (isActive) {
+            assignedActiveTasks.putIfAbsent(memberId, new HashSet<>());
+            assignedActiveTasks.get(memberId).add(taskId);
+        } else {
+            assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>());
+            assignedStandbyTasks.get(memberId).add(taskId);
+        }
+        memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1);
+        computeLoad();
+    }
+
+    private void incrementCapacity() {
+        capacity++;
+        computeLoad();
+    }
+    public void computeLoad() {

Review Comment:
   nit: new line missing



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+        initialize(groupSpec, topologyDescriber);
+        //active
+        Set<TaskId> activeTasks = toTaskIds(groupSpec, topologyDescriber, 
true);
+        assignActive(activeTasks);
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("numStandbyReplicas"));
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = toTaskIds(groupSpec, 
topologyDescriber, false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private Set<TaskId> toTaskIds(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber, final boolean isActive) {
+        Set<TaskId> ret = new HashSet<>();
+        for (String subtopology : groupSpec.subtopologies()) {
+            if (isActive || topologyDescriber.isStateful(subtopology)) {
+                int numberOfPartitions = 
topologyDescriber.numTasks(subtopology);
+                for (int i = 0; i < numberOfPartitions; i++) {
+                    ret.add(new TaskId(subtopology, i));
+                }
+            }
+        }
+        return ret;
+    }
+
+    private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
+        allTasks = 0;
+        for (String subtopology : groupSpec.subtopologies()) {
+            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            allTasks += numberOfPartitions;
+        }
+        totalCapacity = groupSpec.members().size();
+        tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+        taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2);
+        processIdToState = new HashMap<>();
+        activeTaskToPrevMember = new HashMap<>();
+        standbyTaskToPrevMember = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final String processId = memberEntry.getValue().processId();
+            final Member member = new Member(processId, memberId);
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+            processIdToState.putIfAbsent(processId, new 
ProcessState(processId));
+            processIdToState.get(processId).addMember(memberId);
+            // prev active tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {

Review Comment:
   From the point of view of the assignment algorithm, the assignment is computed given 
the input specification of the group. If somebody dropped out of the group, 
that member will not appear in the specification of the group. If somebody is 
dropping out of the group while the assignment is being computed (whether this 
is possible depends on the definition of "dropping out of the group", I guess), 
this doesn't really matter for the assignor. The assignor will ignore any 
concurrent changes to the group -- if there are relevant changes to the group, 
the group epoch will be bumped again and the assignor will be called again. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";

Review Comment:
   Could it be private? We expose it via `name`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java:
##########
@@ -28,6 +29,11 @@ public interface GroupSpec {
      */
     Map<String, AssignmentMemberSpec> members();
 
+    /**
+     * @return The list of subtopologies.
+     */
+    List<String> subtopologies();

Review Comment:
   This method has been moved to `TopologyDescriber` interface, please don't 
bring it back in `GroupSpec`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;

Review Comment:
   Note that there is only one instance of the assignor for all groups. So we 
have to be super careful here to always reset this local state. Also, we will 
retain references to various objects in this assignor after returning from 
`assign`. Could we get by without fields in this class? Or can we at least make 
sure to reset everything after `assign`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+
+        initialize(groupSpec, topologyDescriber);
+
+        //active
+        Set<TaskId> activeTasks = taskIds(groupSpec, topologyDescriber, true);
+        assignActive(activeTasks);
+
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("numStandbyReplicas"));

Review Comment:
   Why is the config called `numStandbyReplicas`? In the KIP the name is 
`num.standby.replicas`, so just wondering where you got this from.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+
+        initialize(groupSpec, topologyDescriber);
+
+        //active
+        Set<TaskId> activeTasks = taskIds(groupSpec, topologyDescriber, true);
+        assignActive(activeTasks);
+
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("numStandbyReplicas"));
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = taskIds(groupSpec, topologyDescriber, 
false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private Set<TaskId> taskIds(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber, final boolean isActive) {
+        Set<TaskId> ret = new HashSet<>();
+        for (String subtopology : groupSpec.subtopologies()) {
+            if (isActive || topologyDescriber.isStateful(subtopology)) {
+                int numberOfPartitions = 
topologyDescriber.numTasks(subtopology);
+                for (int i = 0; i < numberOfPartitions; i++) {
+                    ret.add(new TaskId(subtopology, i));
+                }
+            }
+        }
+        return ret;
+    }
+
+    private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
+
+        allTasks = 0;
+        for (String subtopology : groupSpec.subtopologies()) {
+            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            allTasks += numberOfPartitions;
+        }
+        totalCapacity = groupSpec.members().size();
+        tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+
+        taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2);
+
+        processIdToState = new HashMap<>();
+        activeTaskToPrevMember = new HashMap<>();
+        standbyTaskToPrevMember = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final String processId = memberEntry.getValue().processId();
+            final Member member = new Member(processId, memberId);
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+
+            processIdToState.putIfAbsent(processId, new 
ProcessState(processId));
+            processIdToState.get(processId).addMember(memberId);
+
+            // prev active tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    activeTaskToPrevMember.put(new TaskId(entry.getKey(), 
partitionNo), member);
+                }
+            }
+
+            // prev standby tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.standbyTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    TaskId taskId = new TaskId(entry.getKey(), partitionNo);
+                    standbyTaskToPrevMember.putIfAbsent(taskId, new 
HashSet<>());
+                    standbyTaskToPrevMember.get(taskId).add(member);
+                }
+            }
+        }
+    }
+
+    private GroupAssignment buildGroupAssignment(final Set<String> members) {
+        final Map<String, MemberAssignment> memberAssignments = new 
HashMap<>();
+
+        final Map<String, Set<TaskId>> activeTasksAssignments = 
processIdToState.entrySet().stream()
+            .flatMap(entry -> 
entry.getValue().assignedActiveTasksByMember().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(set1, set2) -> {
+                set1.addAll(set2);
+                return set1;
+            }));
+
+        final Map<String, Set<TaskId>> standbyTasksAssignments = 
processIdToState.entrySet().stream()
+            .flatMap(entry -> 
entry.getValue().assignedStandbyTasksByMember().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(set1, set2) -> {
+                set1.addAll(set2);
+                return set1;
+            }));
+
+        for (String memberId : members) {
+            Map<String, Set<Integer>> activeTasks = new HashMap<>();
+            if (activeTasksAssignments.containsKey(memberId)) {
+                activeTasks = 
toCompactedTaskIds(activeTasksAssignments.get(memberId));
+            }
+            Map<String, Set<Integer>> standByTasks = new HashMap<>();
+
+            if (standbyTasksAssignments.containsKey(memberId)) {
+                standByTasks = 
toCompactedTaskIds(standbyTasksAssignments.get(memberId));
+            }
+            memberAssignments.put(memberId, new MemberAssignment(activeTasks, 
standByTasks, new HashMap<>()));
+        }
+
+        return new GroupAssignment(memberAssignments);
+    }
+
+    private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> 
taskIds) {
+        Map<String, Set<Integer>> ret = new HashMap<>();
+        for (TaskId taskId : taskIds) {
+            ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>());
+            ret.get(taskId.subtopologyId()).add(taskId.partition());
+        }
+        return ret;
+    }
+
+    private void assignActive(final Set<TaskId> activeTasks) {
+
+        // 1. re-assigning existing active tasks to clients that previously 
had the same active tasks
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Member prevMember = activeTaskToPrevMember.get(task);
+            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+                
processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, 
true);
+                updateHelpers(prevMember, task, true);
+                it.remove();
+            }
+        }
+
+        // 2. re-assigning tasks to clients that previously have seen the same 
task (as standby task)
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Set<Member> prevMembers = standbyTaskToPrevMember.get(task);
+            final Member prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
+            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+                
processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, 
true);
+                updateHelpers(prevMember, task, true);
+                it.remove();
+            }
+        }
+
+        // 3. assign any remaining unassigned tasks
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Set<Member> allMembers = 
processIdToState.entrySet().stream().flatMap(entry -> 
entry.getValue().memberToTaskCounts().keySet().stream()
+                .map(memberId -> new Member(entry.getKey(), 
memberId))).collect(Collectors.toSet());
+            final Member member = findMemberWithLeastLoad(allMembers, task, 
false);
+            if (member == null) {
+                log.error("Unable to assign active task {} to any member.", 
task);
+                throw new TaskAssignorException("No member available to assign 
active task {}." + task);

Review Comment:
   In which cases can this happen, are we sure we want to crash in all of those?
   
   For example, I guess this can happen if we have no members in the group. If 
we are going to crash in the case where no members are in the group, it would 
be good to document this as a contract in the task assignor interface.
   
   Are there cases where there are members in the group and I cannot find an 
assignment?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+
+        initialize(groupSpec, topologyDescriber);
+
+        //active
+        Set<TaskId> activeTasks = taskIds(groupSpec, topologyDescriber, true);
+        assignActive(activeTasks);
+
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("numStandbyReplicas"));
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = taskIds(groupSpec, topologyDescriber, 
false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private Set<TaskId> taskIds(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber, final boolean isActive) {
+        Set<TaskId> ret = new HashSet<>();
+        for (String subtopology : groupSpec.subtopologies()) {
+            if (isActive || topologyDescriber.isStateful(subtopology)) {
+                int numberOfPartitions = 
topologyDescriber.numTasks(subtopology);
+                for (int i = 0; i < numberOfPartitions; i++) {
+                    ret.add(new TaskId(subtopology, i));
+                }
+            }
+        }
+        return ret;
+    }
+
+    private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
+
+        allTasks = 0;
+        for (String subtopology : groupSpec.subtopologies()) {
+            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            allTasks += numberOfPartitions;
+        }
+        totalCapacity = groupSpec.members().size();
+        tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+
+        taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2);
+
+        processIdToState = new HashMap<>();
+        activeTaskToPrevMember = new HashMap<>();
+        standbyTaskToPrevMember = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final String processId = memberEntry.getValue().processId();
+            final Member member = new Member(processId, memberId);
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+
+            processIdToState.putIfAbsent(processId, new 
ProcessState(processId));
+            processIdToState.get(processId).addMember(memberId);
+
+            // prev active tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    activeTaskToPrevMember.put(new TaskId(entry.getKey(), 
partitionNo), member);
+                }
+            }
+
+            // prev standby tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.standbyTasks().entrySet()) {
+                Set<Integer> partitionNoSet = entry.getValue();
+                for (int partitionNo : partitionNoSet) {
+                    TaskId taskId = new TaskId(entry.getKey(), partitionNo);
+                    standbyTaskToPrevMember.putIfAbsent(taskId, new 
HashSet<>());
+                    standbyTaskToPrevMember.get(taskId).add(member);
+                }
+            }
+        }
+    }
+
+    private GroupAssignment buildGroupAssignment(final Set<String> members) {
+        final Map<String, MemberAssignment> memberAssignments = new 
HashMap<>();
+
+        final Map<String, Set<TaskId>> activeTasksAssignments = 
processIdToState.entrySet().stream()
+            .flatMap(entry -> 
entry.getValue().assignedActiveTasksByMember().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(set1, set2) -> {
+                set1.addAll(set2);
+                return set1;
+            }));
+
+        final Map<String, Set<TaskId>> standbyTasksAssignments = 
processIdToState.entrySet().stream()
+            .flatMap(entry -> 
entry.getValue().assignedStandbyTasksByMember().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(set1, set2) -> {
+                set1.addAll(set2);
+                return set1;
+            }));
+
+        for (String memberId : members) {
+            Map<String, Set<Integer>> activeTasks = new HashMap<>();
+            if (activeTasksAssignments.containsKey(memberId)) {
+                activeTasks = 
toCompactedTaskIds(activeTasksAssignments.get(memberId));
+            }
+            Map<String, Set<Integer>> standByTasks = new HashMap<>();
+
+            if (standbyTasksAssignments.containsKey(memberId)) {
+                standByTasks = 
toCompactedTaskIds(standbyTasksAssignments.get(memberId));
+            }
+            memberAssignments.put(memberId, new MemberAssignment(activeTasks, 
standByTasks, new HashMap<>()));
+        }
+
+        return new GroupAssignment(memberAssignments);
+    }
+
+    private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> 
taskIds) {
+        Map<String, Set<Integer>> ret = new HashMap<>();
+        for (TaskId taskId : taskIds) {
+            ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>());
+            ret.get(taskId.subtopologyId()).add(taskId.partition());
+        }
+        return ret;
+    }
+
+    private void assignActive(final Set<TaskId> activeTasks) {
+
+        // 1. re-assigning existing active tasks to clients that previously 
had the same active tasks
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Member prevMember = activeTaskToPrevMember.get(task);
+            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+                
processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, 
true);
+                updateHelpers(prevMember, task, true);
+                it.remove();
+            }
+        }
+
+        // 2. re-assigning tasks to clients that previously have seen the same 
task (as standby task)
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Set<Member> prevMembers = standbyTaskToPrevMember.get(task);
+            final Member prevMember = findMemberWithLeastLoad(prevMembers, 
task, true);
+            if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
+                
processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, 
true);
+                updateHelpers(prevMember, task, true);
+                it.remove();
+            }
+        }
+
+        // 3. assign any remaining unassigned tasks
+        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+            final TaskId task = it.next();
+            final Set<Member> allMembers = 
processIdToState.entrySet().stream().flatMap(entry -> 
entry.getValue().memberToTaskCounts().keySet().stream()
+                .map(memberId -> new Member(entry.getKey(), 
memberId))).collect(Collectors.toSet());
+            final Member member = findMemberWithLeastLoad(allMembers, task, 
false);
+            if (member == null) {
+                log.error("Unable to assign active task {} to any member.", 
task);
+                throw new TaskAssignorException("No member available to assign 
active task {}." + task);
+            }
+            processIdToState.get(member.processId).addTask(member.memberId, 
task, true);
+            it.remove();
+            updateHelpers(member, task, true);
+
+        }
+    }
+
+    private void maybeUpdateTasksPerMember(final int activeTasksNo) {
+        if (activeTasksNo == tasksPerMember) {
+            totalCapacity--;
+            allTasks -= activeTasksNo;
+            tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+        }
+    }
+
+    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId 
taskId, final boolean returnSameMember) {
+        if (members == null || members.isEmpty()) {
+            return null;
+        }
+        Set<Member> rightPairs = members.stream()
+            .filter(member  -> taskPairs.hasNewPair(taskId, 
processIdToState.get(member.processId).assignedTasks()))
+            .collect(Collectors.toSet());
+        if (rightPairs.isEmpty()) {
+            rightPairs = members;
+        }
+        Optional<ProcessState> processWithLeastLoad = rightPairs.stream()
+            .map(member  -> processIdToState.get(member.processId))
+            .min(Comparator.comparingDouble(ProcessState::load));
+
+        // processWithLeastLoad must be present at this point, but we do a 
double check
+        if (processWithLeastLoad.isEmpty()) {
+            return null;
+        }
+        // if the same exact former member is needed
+        if (returnSameMember) {
+            return standbyTaskToPrevMember.get(taskId).stream()
+                .filter(standby -> 
standby.processId.equals(processWithLeastLoad.get().processId()))
+                .findFirst()
+                .orElseGet(() -> 
memberWithLeastLoad(processWithLeastLoad.get()));
+        }
+        return memberWithLeastLoad(processWithLeastLoad.get());
+    }
+
+    private Member memberWithLeastLoad(final ProcessState 
processWithLeastLoad) {
+        Optional<String> memberWithLeastLoad = 
processWithLeastLoad.memberToTaskCounts().entrySet().stream()
+            .min(Map.Entry.comparingByValue())
+            .map(Map.Entry::getKey);
+        return memberWithLeastLoad.map(memberId -> new 
Member(processWithLeastLoad.processId(), memberId)).orElse(null);
+    }
+
+    private boolean hasUnfulfilledQuota(final Member member) {
+        return 
processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId)
 < tasksPerMember;
+    }
+
+    private void assignStandby(final Set<TaskId> standbyTasks, final int 
numStandbyReplicas) {
+        for (TaskId task : standbyTasks) {
+            for (int i = 0; i < numStandbyReplicas; i++) {
+
+                final Set<String> availableProcesses = 
processIdToState.values().stream()
+                    .filter(process -> !process.hasTask(task))
+                    .map(ProcessState::processId)
+                    .collect(Collectors.toSet());
+
+                final String errorMessage = "Unable to assign " + 
(numStandbyReplicas - i) +

Review Comment:
   It would probably be good to move the error message construction to a 
private method, so that we do not have to do this string concatenation in every 
iteration of the loop even if no error occurs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to