aliehsaeedii commented on code in PR #18652:
URL: https://github.com/apache/kafka/pull/18652#discussion_r1930362996


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class StickyTaskAssignor implements TaskAssignor {
+
+    public static final String STICKY_ASSIGNOR_NAME = "sticky";
+    private static final Logger log = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+    // helper data structures:
+    private TaskPairs taskPairs;
+    Map<TaskId, Member> activeTaskToPrevMember;
+    Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+    Map<String, ProcessState> processIdToState;
+    int allTasks;
+    int totalCapacity;
+    int tasksPerMember;
+
+    @Override
+    public String name() {
+        return STICKY_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber) throws TaskAssignorException {
+        initialize(groupSpec, topologyDescriber);
+        //active
+        Set<TaskId> activeTasks = toTaskIds(groupSpec, topologyDescriber, 
true);
+        assignActive(activeTasks);
+        //standby
+        final int numStandbyReplicas =
+            groupSpec.assignmentConfigs().isEmpty() ? 0
+                : 
Integer.parseInt(groupSpec.assignmentConfigs().get("numStandbyReplicas"));
+        if (numStandbyReplicas > 0) {
+            Set<TaskId> statefulTasks = toTaskIds(groupSpec, 
topologyDescriber, false);
+            assignStandby(statefulTasks, numStandbyReplicas);
+        }
+        return buildGroupAssignment(groupSpec.members().keySet());
+    }
+
+    private Set<TaskId> toTaskIds(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber, final boolean isActive) {
+        Set<TaskId> ret = new HashSet<>();
+        for (String subtopology : groupSpec.subtopologies()) {
+            if (isActive || topologyDescriber.isStateful(subtopology)) {
+                int numberOfPartitions = 
topologyDescriber.numTasks(subtopology);
+                for (int i = 0; i < numberOfPartitions; i++) {
+                    ret.add(new TaskId(subtopology, i));
+                }
+            }
+        }
+        return ret;
+    }
+
+    private void initialize(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) {
+        allTasks = 0;
+        for (String subtopology : groupSpec.subtopologies()) {
+            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            allTasks += numberOfPartitions;
+        }
+        totalCapacity = groupSpec.members().size();
+        tasksPerMember = computeTasksPerMember(allTasks, totalCapacity);
+        taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2);
+        processIdToState = new HashMap<>();
+        activeTaskToPrevMember = new HashMap<>();
+        standbyTaskToPrevMember = new HashMap<>();
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : 
groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final String processId = memberEntry.getValue().processId();
+            final Member member = new Member(processId, memberId);
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+            processIdToState.putIfAbsent(processId, new 
ProcessState(processId));
+            processIdToState.get(processId).addMember(memberId);
+            // prev active tasks
+            for (Map.Entry<String, Set<Integer>> entry : 
memberSpec.activeTasks().entrySet()) {

Review Comment:
   What do you mean by "handling", @bbejeck? We need the following things for 
the assignment computation:
   1. all tasks: we retrieve them from `GroupSpec` and `TopologyDescriber`
   2. previous tasks (so that members can be re-assigned the tasks they already 
had, which is more efficient, especially for stateful tasks): we retrieve them 
from the existing members. If a member has left the group, we cannot re-assign 
tasks to it and have to re-assign its tasks to other available members.
   3. currently available members: these are also retrieved from `GroupSpec` 
([here](https://github.com/aliehsaeedii/kafka/blob/3fdad5196a5137c1ed6cd8c1bd26454a569838fb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java#L100-L110))
   
   Is there any point I'm missing? If so, I've missed something very important. 
Which integration test currently covers this? 
   I assume I've implemented the current behaviour, but thanks for 
double-checking. I think we are lacking some integration tests here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to