G G created KAFKA-9266:
--------------------------

             Summary: KafkaConsumer manual assignment does not reset group 
assignment
                 Key: KAFKA-9266
                 URL: https://issues.apache.org/jira/browse/KAFKA-9266
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.3.0
            Reporter: G G


When using the manual assignment API, SubscriptionState still remembers group 
subscriptions in its groupSubscription member of topics to which it is no 
longer subscribed.

See the following code which shows the unexpected behavior:
{code:java}
    TopicPartition tp1 = new TopicPartition("a", 0);
    TopicPartition tp2 = new TopicPartition("b", 0);
    LogContext logContext = new LogContext();
    SubscriptionState state = new SubscriptionState(logContext, 
OffsetResetStrategy.NONE);
    state.assignFromUser(ImmutableSet.of(tp1, tp2));
    state.unsubscribe();
    state.assignFromUser(ImmutableSet.of(tp1));
    assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds
    
    state.assignFromUser(ImmutableSet.of(tp1, tp2));
    state.assignFromUser(ImmutableSet.of(tp1));
    assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails: 
Expected [a] but was [a, b]
{code}

The problem seems to be that within SubscriptionState.changeSubscription() the 
groupSubscription only grows and is never trimmed if the assignment is manual:
{code}
    private boolean changeSubscription(Set<String> topicsToSubscribe) {
        ...
        groupSubscription = new HashSet<>(groupSubscription);
        groupSubscription.addAll(topicsToSubscribe);
        ....
    }
{code}

This behavior in turn leads to METADATA requests by the client with partitions 
which are actually no longer assigned:
{code}
KafkaConsumer consumer;
consumer.assign(ImmutableList.of(topicPartition1, topicPartition2));
consumer.poll(); // This will cause a MetadataRequest to be sent to the broker 
with topic1 and topic2
consumer.assign(ImmutableList.of(topicPartition1));
consumer.poll(); // This will AGAIN cause a MetadataRequest for topic1 and 
topic2 instead of only topic1
{code}
And this in turn causes the deletion of the topicPartion2 to fail. The 
workaround is to do a consumer.unassign(); before the second consumer.assign();




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to