[
https://issues.apache.org/jira/browse/KAFKA-9266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang resolved KAFKA-9266.
----------------------------------
Fix Version/s: 2.3.1
Resolution: Fixed
> KafkaConsumer manual assignment does not reset group assignment
> ---------------------------------------------------------------
>
> Key: KAFKA-9266
> URL: https://issues.apache.org/jira/browse/KAFKA-9266
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0
> Reporter: G G
> Priority: Major
> Fix For: 2.3.1
>
>
> When using the manual assignment API, SubscriptionState still remembers group
> subscriptions in its groupSubscription member of topics to which it is no
> longer subscribed.
> See the following code which shows the unexpected behavior:
> {code:java}
> TopicPartition tp1 = new TopicPartition("a", 0);
> TopicPartition tp2 = new TopicPartition("b", 0);
> LogContext logContext = new LogContext();
> SubscriptionState state = new SubscriptionState(logContext,
> OffsetResetStrategy.NONE);
> state.assignFromUser(ImmutableSet.of(tp1, tp2));
> state.unsubscribe();
> state.assignFromUser(ImmutableSet.of(tp1));
> assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds
>
> state.assignFromUser(ImmutableSet.of(tp1, tp2));
> state.assignFromUser(ImmutableSet.of(tp1));
> assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails:
> Expected [a] but was [a, b]
> {code}
> The problem seems to be that within SubscriptionState.changeSubscription()
> the groupSubscription only grows and is never trimmed if the assignment is
> manual:
> {code}
> private boolean changeSubscription(Set<String> topicsToSubscribe) {
> ...
> groupSubscription = new HashSet<>(groupSubscription);
> groupSubscription.addAll(topicsToSubscribe);
> ....
> }
> {code}
> This behavior in turn leads to METADATA requests by the client with
> partitions which are actually no longer assigned:
> {code}
> KafkaConsumer consumer;
> consumer.assign(ImmutableList.of(topicPartition1, topicPartition2));
> consumer.poll(); // This will cause a MetadataRequest to be sent to the
> broker with topic1 and topic2
> consumer.assign(ImmutableList.of(topicPartition1));
> consumer.poll(); // This will AGAIN cause a MetadataRequest for topic1 and
> topic2 instead of only topic1
> {code}
> And this in turn causes the deletion of the topicPartion2 to fail. The
> workaround is to do a consumer.unassign(); before the second
> consumer.assign();
--
This message was sent by Atlassian Jira
(v8.3.4#803005)