[ 
https://issues.apache.org/jira/browse/KAFKA-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274470#comment-15274470
 ] 

Jiangjie Qin commented on KAFKA-3664:
-------------------------------------

[~hachikuji] I am not sure. If we still return the partitions of the 
unsubscribed topic when user call {{assignment()}}, do we also return the 
records of those partitions? It seems weird either way. 

The current behavior gives a good user experience to let the unsubscribe take 
effect immediately. What surprising is the offsets are not committed during the 
following rebalance. Maybe we can just fix that. I am thinking to add a new 
state in subscription state such as {{PendingUnsubscribe}}. And the behavior 
after unsubscribe would be:
1. assignment() does not include the partitions of the unsubscribed topics.
2. subscription() does not include the unsubscribed topics.
3. poll() does not return messages from the unsubscribed topics.
4. In the rebalance listener, the partitions in {{PendingUnsubscribe}} state 
will be passed in as revoked partitions.
5. When auto commit is enabled, during rebalance, we commit the offsets for 
partitions that is in the assignment and the partitions that is pending 
unsubscribe.

Arguably after subscription changes, there is still an inconsistency between 
assignment() and the partitions revoked passed to the rebalance listener, but 
it only affect the users who have customized rebalance listener. Chances are 
those users do need to know the partitions that are pending unsubscribe to 
checkpoint some states.

> When subscription set changes on new consumer, the partitions may be removed 
> without offset being committed.
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3664
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3664
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Vahid Hashemian
>
> When users are using group management, if they call consumer.subscribe() to 
> change the subscription, the removed subscriptions will be immediately 
> removed and their offset will not be commit. Also the revoked partitions 
> passed to the ConsumerRebalanceListener.onPartitionsRevoked() will not 
> include those partitions. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to