lucasbru commented on code in PR #18652: URL: https://github.com/apache/kafka/pull/18652#discussion_r1934195622
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java: ########## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.streams.assignor; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class StickyTaskAssignor implements TaskAssignor { + + public static final String STICKY_ASSIGNOR_NAME = "sticky"; + private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class); + + // helper data structures: + private TaskPairs taskPairs; + Map<TaskId, Member> activeTaskToPrevMember; + Map<TaskId, Set<Member>> standbyTaskToPrevMember; + Map<String, ProcessState> processIdToState; + + int allTasks; + int totalCapacity; + int tasksPerMember; + + @Override + public String name() { + return STICKY_ASSIGNOR_NAME; + } + + @Override + public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber 
topologyDescriber) throws TaskAssignorException { + + initialize(groupSpec, topologyDescriber); + + //active + Set<TaskId> activeTasks = taskIds(groupSpec, topologyDescriber, true); + assignActive(activeTasks); + + //standby + final int numStandbyReplicas = + groupSpec.assignmentConfigs().isEmpty() ? 0 + : Integer.parseInt(groupSpec.assignmentConfigs().get("numStandbyReplicas")); + if (numStandbyReplicas > 0) { + Set<TaskId> statefulTasks = taskIds(groupSpec, topologyDescriber, false); + assignStandby(statefulTasks, numStandbyReplicas); + } + + return buildGroupAssignment(groupSpec.members().keySet()); + } + + private Set<TaskId> taskIds(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber, final boolean isActive) { + Set<TaskId> ret = new HashSet<>(); + for (String subtopology : groupSpec.subtopologies()) { + if (isActive || topologyDescriber.isStateful(subtopology)) { + int numberOfPartitions = topologyDescriber.numTasks(subtopology); + for (int i = 0; i < numberOfPartitions; i++) { + ret.add(new TaskId(subtopology, i)); + } + } + } + return ret; + } + + private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) { + + allTasks = 0; + for (String subtopology : groupSpec.subtopologies()) { + int numberOfPartitions = topologyDescriber.numTasks(subtopology); + allTasks += numberOfPartitions; + } + totalCapacity = groupSpec.members().size(); + tasksPerMember = computeTasksPerMember(allTasks, totalCapacity); + + taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2); + + processIdToState = new HashMap<>(); + activeTaskToPrevMember = new HashMap<>(); + standbyTaskToPrevMember = new HashMap<>(); + for (Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) { + final String memberId = memberEntry.getKey(); + final String processId = memberEntry.getValue().processId(); + final Member member = new Member(processId, memberId); + final AssignmentMemberSpec memberSpec = memberEntry.getValue(); 
+ + processIdToState.putIfAbsent(processId, new ProcessState(processId)); + processIdToState.get(processId).addMember(memberId); + + // prev active tasks + for (Map.Entry<String, Set<Integer>> entry : memberSpec.activeTasks().entrySet()) { + Set<Integer> partitionNoSet = entry.getValue(); + for (int partitionNo : partitionNoSet) { + activeTaskToPrevMember.put(new TaskId(entry.getKey(), partitionNo), member); + } + } + + // prev standby tasks + for (Map.Entry<String, Set<Integer>> entry : memberSpec.standbyTasks().entrySet()) { + Set<Integer> partitionNoSet = entry.getValue(); + for (int partitionNo : partitionNoSet) { + TaskId taskId = new TaskId(entry.getKey(), partitionNo); + standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>()); + standbyTaskToPrevMember.get(taskId).add(member); + } + } + } + } + + private GroupAssignment buildGroupAssignment(final Set<String> members) { + final Map<String, MemberAssignment> memberAssignments = new HashMap<>(); + + final Map<String, Set<TaskId>> activeTasksAssignments = processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().assignedActiveTasksByMember().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> { + set1.addAll(set2); + return set1; + })); + + final Map<String, Set<TaskId>> standbyTasksAssignments = processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().assignedStandbyTasksByMember().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> { + set1.addAll(set2); + return set1; + })); + + for (String memberId : members) { + Map<String, Set<Integer>> activeTasks = new HashMap<>(); + if (activeTasksAssignments.containsKey(memberId)) { + activeTasks = toCompactedTaskIds(activeTasksAssignments.get(memberId)); + } + Map<String, Set<Integer>> standByTasks = new HashMap<>(); + + if (standbyTasksAssignments.containsKey(memberId)) { + standByTasks = 
toCompactedTaskIds(standbyTasksAssignments.get(memberId)); + } + memberAssignments.put(memberId, new MemberAssignment(activeTasks, standByTasks, new HashMap<>())); + } + + return new GroupAssignment(memberAssignments); + } + + private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds) { + Map<String, Set<Integer>> ret = new HashMap<>(); + for (TaskId taskId : taskIds) { + ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>()); + ret.get(taskId.subtopologyId()).add(taskId.partition()); + } + return ret; + } + + private void assignActive(final Set<TaskId> activeTasks) { + + // 1. re-assigning existing active tasks to clients that previously had the same active tasks + for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Member prevMember = activeTaskToPrevMember.get(task); + if (prevMember != null && hasUnfulfilledQuota(prevMember)) { + processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); + updateHelpers(prevMember, task, true); + it.remove(); + } + } + + // 2. re-assigning tasks to clients that previously have seen the same task (as standby task) + for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Set<Member> prevMembers = standbyTaskToPrevMember.get(task); + final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true); + if (prevMember != null && hasUnfulfilledQuota(prevMember)) { + processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); + updateHelpers(prevMember, task, true); + it.remove(); + } + } + + // 3. 
assign any remaining unassigned tasks + for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Set<Member> allMembers = processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream() + .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet()); + final Member member = findMemberWithLeastLoad(allMembers, task, false); + if (member == null) { + log.error("Unable to assign active task {} to any member.", task); + throw new TaskAssignorException("No member available to assign active task {}." + task); Review Comment: Yeah, I think the current StickyTaskAssignor does not take this case into account, because it's a member computing the assignment, so by this we know there must be at least one member. For a member to consider its own nonexistence would be quite philosophical :P So it seems to me, we either need to make assignors work for empty groups (which is probably just logging something and returning an empty assignment), or we document in the interface that assignors will throw `TaskAssignorException` for empty groups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org