[ https://issues.apache.org/jira/browse/KAFKA-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555137#comment-16555137 ]
ASF GitHub Bot commented on KAFKA-7144:
---------------------------------------
guozhangwang closed pull request #5390: KAFKA-7144: Fix task assignment to be even
URL: https://github.com/apache/kafka/pull/5390
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 5b54d08c032..8767d0f6bea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -270,7 +270,7 @@ boolean hasNewPair(final TaskId task1,
if (!active && !pairs.contains(pair(task1, taskId))) {
return true;
}
- if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId != taskId.topicGroupId) {
+ if (!pairs.contains(pair(task1, taskId))) {
return true;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index ed22e3c30de..d431dbeae27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -151,6 +151,25 @@ public void shouldAssignBasedOnCapacity() {
assertThat(clients.get(p2).activeTasks().size(), equalTo(2));
}
+ @Test
+ public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+
+ createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task03,
+ task04, task05, task10);
+
+ createClient(p2, 1);
+
+ final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, task00, task01, task02, task03, task04, task05);
+
+ final Set<TaskId> expectedClientITasks = new HashSet<>(Arrays.asList(task00, task01, task10, task05));
+ final Set<TaskId> expectedClientIITasks = new HashSet<>(Arrays.asList(task02, task03, task04));
+
+ taskAssignor.assign(0);
+
+ assertThat(clients.get(p1).activeTasks(), equalTo(expectedClientITasks));
+ assertThat(clients.get(p2).activeTasks(), equalTo(expectedClientIITasks));
+ }
+
@Test
public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
final int p5 = 5;
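The following is a minimal standalone Java model of the `hasNewPair` check, which shows the one-line change in isolation. It is a sketch under stated assumptions and is not the Kafka Streams code:
- `TaskId`, `Pair` and `HasNewPairSketch` are hypothetical stand-ins.
- The `(task1, taskIds, active)` parameter list and the enclosing loop over `taskIds` are inferred from the diff context.
- Any other checks in the real method are left out.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, standalone model of the hasNewPair check from the diff.
// TaskId and Pair are hypothetical stand-ins, not the Kafka Streams classes.
public class HasNewPairSketch {

    // Kafka Streams prints task ids as "<topicGroupId>_<partition>", e.g. 0_3.
    record TaskId(int topicGroupId, int partition) implements Comparable<TaskId> {
        @Override
        public int compareTo(TaskId other) {
            int byGroup = Integer.compare(topicGroupId, other.topicGroupId);
            return byGroup != 0 ? byGroup : Integer.compare(partition, other.partition);
        }
    }

    // Unordered pair: pair(a, b) and pair(b, a) produce equal keys.
    record Pair(TaskId low, TaskId high) {}

    static Pair pair(TaskId a, TaskId b) {
        return a.compareTo(b) < 0 ? new Pair(a, b) : new Pair(b, a);
    }

    // Pairs of tasks that have already been placed on the same client.
    final Set<Pair> pairs = new HashSet<>();

    // Pre-patch condition: for an active task, an unseen pair only counts
    // if the other task belongs to a different topic group.
    boolean hasNewPairBefore(TaskId task1, Set<TaskId> taskIds, boolean active) {
        for (TaskId taskId : taskIds) {
            if (!active && !pairs.contains(pair(task1, taskId))) {
                return true;
            }
            if (!pairs.contains(pair(task1, taskId))
                    && task1.topicGroupId() != taskId.topicGroupId()) {
                return true;
            }
        }
        return false;
    }

    // Post-patch condition: any unseen pair counts, whatever the topic group.
    // (The !active branch is now subsumed; it is kept to mirror the diff.)
    boolean hasNewPairAfter(TaskId task1, Set<TaskId> taskIds, boolean active) {
        for (TaskId taskId : taskIds) {
            if (!active && !pairs.contains(pair(task1, taskId))) {
                return true;
            }
            if (!pairs.contains(pair(task1, taskId))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        HasNewPairSketch sketch = new HasNewPairSketch();
        TaskId task00 = new TaskId(0, 0);
        TaskId task01 = new TaskId(0, 1);
        Set<TaskId> clientTasks = Set.of(task00);

        // task01 was never paired with task00, and both are in topic group 0.
        System.out.println(sketch.hasNewPairBefore(task01, clientTasks, true)); // false
        System.out.println(sketch.hasNewPairAfter(task01, clientTasks, true));  // true
    }
}
```

Under the old condition, a client holding only tasks from one topic group never offers a "new pair" for an active task of that same group. After the patch, an unseen pair counts regardless of topic group. The added test `shouldAssignTasksEvenlyWithUnequalTopicGroupSizes` covers this case with six tasks in topic group 0, one in topic group 1, and two single-thread clients, and expects a 4/3 split.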
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> Kafka Streams doesn't properly balance partition assignment
> -----------------------------------------------------------
>
> Key: KAFKA-7144
> URL: https://issues.apache.org/jira/browse/KAFKA-7144
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: James Cheng
> Assignee: Bill Bejeck
> Priority: Major
> Fix For: 2.1.0
>
> Attachments: OneThenTwelve.java
>
>
> Kafka Streams doesn't always spread the tasks across all available
> instances/threads.
> I have a topology which consumes a single-partition topic and goes .through()
> a 12-partition topic. That makes 13 partitions.
>
> I then started 2 instances of the application. I would have expected the 13
> partitions to be split across the 2 instances roughly evenly (7 partitions on
> one, 6 partitions on the other).
> Instead, one instance gets 12 partitions, and the other instance gets 1
> partition.
>
> Repro case attached. I ran it a couple of times, and it was fairly repeatable.
> Setup for the repro:
> {code:bash}
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one --partitions 1 --replication-factor 1
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve --partitions 12 --replication-factor 1
> $ echo foo | kafkacat -P -b 127.0.0.1 -t one
> {code}
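The balance the reporter expected is simple arithmetic. Spreading n units of work over k instances as evenly as possible gives each instance either n / k rounded down or n / k rounded up. As a result, no two instances differ by more than one. The helper below is a hypothetical illustration; the class and method names are not part of Kafka Streams. It computes the ideal counts and checks an observed split. Like the report, it treats each input partition as one unit of work.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Kafka Streams: ideal and observed balance.
public class EvenSplit {

    // Spread n units over k instances as evenly as possible:
    // the first (n % k) instances receive one extra unit.
    public static int[] idealCounts(int n, int k) {
        int[] counts = new int[k];
        for (int i = 0; i < k; i++) {
            counts[i] = n / k + (i < n % k ? 1 : 0);
        }
        return counts;
    }

    // A split is balanced when no two instances differ by more than one unit.
    public static boolean isBalanced(int... counts) {
        int min = Arrays.stream(counts).min().orElse(0);
        int max = Arrays.stream(counts).max().orElse(0);
        return max - min <= 1;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(idealCounts(13, 2))); // [7, 6]
        System.out.println(isBalanced(12, 1)); // false
        System.out.println(isBalanced(7, 6));  // true
    }
}
```

For the report's 13 partitions on 2 instances, this gives 7 and 6, and the observed 12/1 split fails the check. For the seven tasks in the `shouldAssignTasksEvenlyWithUnequalTopicGroupSizes` test, it gives 4 and 3.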
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)