tim-patterson commented on a change in pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#discussion_r810985840
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1028,116 +1045,113 @@ private boolean addClientAssignments(final
Set<TaskId> statefulTasks,
}
/**
- * Generate an assignment that tries to preserve thread-level stickiness
of stateful tasks without violating
- * balance. The stateful and total task load are both balanced across
threads. Tasks without previous owners
- * will be interleaved by group id to spread subtopologies across threads
and further balance the workload.
+ * Generate an assignment that tries to preserve thread-level stickiness
for stateful tasks without violating
+ * balance. The tasks are balanced across threads. Stateful tasks without
previous owners will be interleaved by
+ * group id to spread subtopologies across threads and further balance the
workload.
+ * threadLoad tracks the task load of each thread across multiple calls, so
that active and standby tasks
+ * are evenly distributed; it is updated with the tasks assigned by this call
*/
- static Map<String, List<TaskId>> assignTasksToThreads(final
Collection<TaskId> statefulTasksToAssign,
- final
Collection<TaskId> statelessTasksToAssign,
+ static Map<String, List<TaskId>> assignTasksToThreads(final
Collection<TaskId> tasksToAssign,
+ final boolean
isStateful,
final
SortedSet<String> consumers,
- final ClientState
state) {
+ final ClientState
state,
+ final Map<String,
Integer> threadLoad) {
final Map<String, List<TaskId>> assignment = new HashMap<>();
for (final String consumer : consumers) {
assignment.put(consumer, new ArrayList<>());
}
- final List<TaskId> unassignedStatelessTasks = new
ArrayList<>(statelessTasksToAssign);
- Collections.sort(unassignedStatelessTasks);
-
- final Iterator<TaskId> unassignedStatelessTasksIter =
unassignedStatelessTasks.iterator();
+ int totalTasks = tasksToAssign.size();
+ for (final Integer threadTaskCount : threadLoad.values()) {
+ totalTasks += threadTaskCount;
+ }
- final int minStatefulTasksPerThread = (int) Math.floor(((double)
statefulTasksToAssign.size()) / consumers.size());
- final PriorityQueue<TaskId> unassignedStatefulTasks = new
PriorityQueue<>(statefulTasksToAssign);
+ final int minTasksPerThread = (int) Math.floor(((double) totalTasks) /
consumers.size());
+ final PriorityQueue<TaskId> unassignedTasks = new
PriorityQueue<>(tasksToAssign);
final Queue<String> consumersToFill = new LinkedList<>();
// keep track of tasks that we have to skip during the first pass in
case we can reassign them later
// using tree-map to make sure the iteration ordering over keys are
preserved
final Map<TaskId, String> unassignedTaskToPreviousOwner = new
TreeMap<>();
- if (!unassignedStatefulTasks.isEmpty()) {
- // First assign stateful tasks to previous owner, up to the min
expected tasks/thread
+ if (!unassignedTasks.isEmpty()) {
+ // If these tasks are stateful, first assign them to their previous owner,
up to the min expected tasks/thread
for (final String consumer : consumers) {
final List<TaskId> threadAssignment = assignment.get(consumer);
-
- for (final TaskId task : state.prevTasksByLag(consumer)) {
- if (unassignedStatefulTasks.contains(task)) {
- if (threadAssignment.size() <
minStatefulTasksPerThread) {
- threadAssignment.add(task);
- unassignedStatefulTasks.remove(task);
- } else {
- unassignedTaskToPreviousOwner.put(task, consumer);
+ // The number of tasks we have to assign here to hit
minTasksPerThread
+ final int tasksTargetCount = minTasksPerThread -
threadLoad.getOrDefault(consumer, 0);
+
+ if (isStateful) {
+ for (final TaskId task : state.prevTasksByLag(consumer)) {
+ if (unassignedTasks.contains(task)) {
+ if (threadAssignment.size() < tasksTargetCount) {
+ threadAssignment.add(task);
+ unassignedTasks.remove(task);
+ } else {
+ unassignedTaskToPreviousOwner.put(task,
consumer);
+ }
}
}
}
- if (threadAssignment.size() < minStatefulTasksPerThread) {
+ if (threadAssignment.size() < tasksTargetCount) {
consumersToFill.offer(consumer);
}
}
// Next interleave remaining unassigned tasks amongst unfilled
consumers
while (!consumersToFill.isEmpty()) {
- final TaskId task = unassignedStatefulTasks.poll();
+ final TaskId task = unassignedTasks.poll();
if (task != null) {
final String consumer = consumersToFill.poll();
final List<TaskId> threadAssignment =
assignment.get(consumer);
threadAssignment.add(task);
- if (threadAssignment.size() < minStatefulTasksPerThread) {
+ final int threadTaskCount = threadAssignment.size() +
threadLoad.getOrDefault(consumer, 0);
+ if (threadTaskCount < minTasksPerThread) {
consumersToFill.offer(consumer);
}
} else {
throw new TaskAssignmentException("Ran out of unassigned
stateful tasks but some members were not at capacity");
}
}
- // At this point all consumers are at the min capacity, so there
may be up to N - 1 unassigned
- // stateful tasks still remaining that should now be distributed
over the consumers
- if (!unassignedStatefulTasks.isEmpty()) {
- consumersToFill.addAll(consumers);
+ // At this point all consumers are at the min or min + 1 capacity.
+ // The min + 1 case can occur for standbys when there are fewer
standbys than consumers and, after assigning
+ // the active tasks, some consumers already have min + 1 tasks
assigned.
+ // The tasks still remaining should now be distributed over the
consumers that are still at min capacity
+ if (!unassignedTasks.isEmpty()) {
+ for (final String consumer : consumers) {
+ final int taskCount = assignment.get(consumer).size() +
threadLoad.getOrDefault(consumer, 0);
+ if (taskCount == minTasksPerThread) {
+ consumersToFill.add(consumer);
+ }
+ }
// Go over the tasks we skipped earlier and assign them to
their previous owner when possible
for (final Map.Entry<TaskId, String> taskEntry :
unassignedTaskToPreviousOwner.entrySet()) {
final TaskId task = taskEntry.getKey();
final String consumer = taskEntry.getValue();
- if (consumersToFill.contains(consumer) &&
unassignedStatefulTasks.contains(task)) {
+ if (consumersToFill.contains(consumer) &&
unassignedTasks.contains(task)) {
assignment.get(consumer).add(task);
- unassignedStatefulTasks.remove(task);
+ unassignedTasks.remove(task);
// Remove this consumer since we know it is now at
minCapacity + 1
consumersToFill.remove(consumer);
}
}
// Now just distribute the remaining unassigned stateful tasks
over the consumers still at min capacity
- for (final TaskId task : unassignedStatefulTasks) {
+ for (final TaskId task : unassignedTasks) {
final String consumer = consumersToFill.poll();
final List<TaskId> threadAssignment =
assignment.get(consumer);
threadAssignment.add(task);
}
-
-
- // There must be at least one consumer still at min capacity
while all the others are at min
- // capacity + 1, so start distributing stateless tasks to get
all consumers back to the same count
- while (unassignedStatelessTasksIter.hasNext()) {
- final String consumer = consumersToFill.poll();
- if (consumer != null) {
- final TaskId task =
unassignedStatelessTasksIter.next();
- unassignedStatelessTasksIter.remove();
- assignment.get(consumer).add(task);
- } else {
- break;
- }
- }
}
}
-
- // Now just distribute tasks while circling through all the consumers
- consumersToFill.addAll(consumers);
-
- while (unassignedStatelessTasksIter.hasNext()) {
- final TaskId task = unassignedStatelessTasksIter.next();
- final String consumer = consumersToFill.poll();
- assignment.get(consumer).add(task);
- consumersToFill.offer(consumer);
+ // Update threadLoad
+ for (final Map.Entry<String, List<TaskId>> taskEntry :
assignment.entrySet()) {
Review comment:
My only worry is that getting a proper balance would then rely on the
caller always assigning stateless tasks last, which might not be obvious to
callers.
Happy to wrap it in an if statement if you think it's worth it, though.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -997,6 +1002,152 @@ public void testAssignWithStandbyReplicas() {
assertEquals(standbyPartitionsByHost, info20.standbyPartitionByHost());
}
+ @Test
+ public void testAssignWithStandbyReplicasBalanceSparse() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false),
"processor");
+
+ final List<String> topics = asList("topic1");
+
+ createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+ adminClient =
createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+ singletonList(APPLICATION_ID + "-store1-changelog"),
+ singletonList(3))
+ );
+
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
1));
+
+ final List<String> client1Consumers = asList("consumer10",
"consumer11", "consumer12", "consumer13");
+ final List<String> client2Consumers = asList("consumer20",
"consumer21", "consumer22");
+
+ for (final String consumerId : client1Consumers) {
+ subscriptions.put(consumerId,
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+ }
+ for (final String consumerId : client2Consumers) {
+ subscriptions.put(consumerId,
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+ }
+
+ final Map<String, Assignment> assignments =
+ partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
+
+ // Consumers
+ final AssignmentInfo info10 =
AssignmentInfo.decode(assignments.get("consumer10").userData());
+ final AssignmentInfo info11 =
AssignmentInfo.decode(assignments.get("consumer11").userData());
+ final AssignmentInfo info12 =
AssignmentInfo.decode(assignments.get("consumer12").userData());
+ final AssignmentInfo info13 =
AssignmentInfo.decode(assignments.get("consumer13").userData());
+ final AssignmentInfo info20 =
AssignmentInfo.decode(assignments.get("consumer20").userData());
+ final AssignmentInfo info21 =
AssignmentInfo.decode(assignments.get("consumer21").userData());
+ final AssignmentInfo info22 =
AssignmentInfo.decode(assignments.get("consumer22").userData());
+
+ // Check each consumer has no more than 1 task
+ assertTrue(info10.activeTasks().size() + info10.standbyTasks().size()
<= 1);
+ assertTrue(info11.activeTasks().size() + info11.standbyTasks().size()
<= 1);
+ assertTrue(info12.activeTasks().size() + info12.standbyTasks().size()
<= 1);
+ assertTrue(info13.activeTasks().size() + info13.standbyTasks().size()
<= 1);
+ assertTrue(info20.activeTasks().size() + info20.standbyTasks().size()
<= 1);
+ assertTrue(info21.activeTasks().size() + info21.standbyTasks().size()
<= 1);
+ assertTrue(info22.activeTasks().size() + info22.standbyTasks().size()
<= 1);
+ }
+
+ @Test
+ public void testAssignWithStandbyReplicasBalanceDense() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false),
"processor");
+
+ final List<String> topics = asList("topic1");
+
+ createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+ adminClient =
createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+ singletonList(APPLICATION_ID + "-store1-changelog"),
+ singletonList(3))
+ );
+
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
1));
+
+ subscriptions.put("consumer10",
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+ subscriptions.put("consumer20",
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+
+ final Map<String, Assignment> assignments =
+ partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
+
+ // Consumers
+ final AssignmentInfo info10 =
AssignmentInfo.decode(assignments.get("consumer10").userData());
+ final AssignmentInfo info20 =
AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+ // Check each consumer has 3 tasks
+ assertEquals(3, info10.activeTasks().size() +
info10.standbyTasks().size());
+ assertEquals(3, info20.activeTasks().size() +
info20.standbyTasks().size());
+ // Check that not all the actives are on one node
+ assertTrue(info10.activeTasks().size() < 3);
+ assertTrue(info20.activeTasks().size() < 3);
+ }
+
+ @Test
+ public void testAssignWithStandbyReplicasBalanceWithStatelessTasks() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ builder.addProcessor("processor_with_state", new
MockApiProcessorSupplier<>(), "source1");
+ builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source1");
+ builder.addStateStore(new MockKeyValueStoreBuilder("store1", false),
"processor_with_state");
+
+ final List<String> topics = asList("topic1");
+
+ createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+ adminClient =
createMockAdminClientForAssignor(getTopicPartitionOffsetsMap(
+ singletonList(APPLICATION_ID + "-store1-changelog"),
+ singletonList(3))
+ );
+
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
1));
+
+ subscriptions.put("consumer10",
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+ subscriptions.put("consumer11",
+ new Subscription(
+ topics,
+ getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+ subscriptions.put("consumer20",
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+ subscriptions.put("consumer21",
+ new Subscription(
+ topics,
+ getInfo(UUID_2, EMPTY_TASKS, EMPTY_TASKS,
USER_END_POINT).encode()));
+
+ final Map<String, Assignment> assignments =
+ partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
Review comment:
Good catch
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]