[
https://issues.apache.org/jira/browse/KAFKA-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274470#comment-15274470
]
Jiangjie Qin commented on KAFKA-3664:
-------------------------------------
[~hachikuji] I am not sure. If we still return the partitions of the
unsubscribed topic when the user calls {{assignment()}}, do we also return the
records of those partitions? It seems weird either way.
The current behavior gives a good user experience by letting the unsubscribe
take effect immediately. What is surprising is that the offsets are not
committed during the following rebalance. Maybe we can just fix that. I am
thinking of adding a new state to the subscription state, such as
{{PendingUnsubscribe}}. The behavior after unsubscribe would then be:
1. assignment() does not include the partitions of the unsubscribed topics.
2. subscription() does not include the unsubscribed topics.
3. poll() does not return messages from the unsubscribed topics.
4. In the rebalance listener, the partitions in {{PendingUnsubscribe}} state
will be passed in as revoked partitions.
5. When auto commit is enabled, during rebalance, we commit the offsets for
both the partitions that are in the assignment and the partitions that are
pending unsubscribe.
Arguably, after the subscription changes, there is still an inconsistency
between assignment() and the revoked partitions passed to the rebalance
listener, but it only affects users who have a customized rebalance listener.
Chances are those users do need to know which partitions are pending
unsubscribe in order to checkpoint some state.
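
As a rough illustration of the five rules above, the proposed state could be
modeled as follows. This is only a sketch and not Kafka's actual
SubscriptionState: the class name PendingUnsubscribeSketch, its methods, and
the "topic-partition" string encoding are all hypothetical, and auto commit is
assumed to be enabled.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of the proposal; not the real consumer internals. */
class PendingUnsubscribeSketch {
    enum State { ASSIGNED, PENDING_UNSUBSCRIBE }

    private final Set<String> subscription = new TreeSet<>();
    // Partition ("topic-partition") -> state, and partition -> consumed position.
    private final Map<String, State> states = new TreeMap<>();
    private final Map<String, Long> positions = new TreeMap<>();
    // Offsets committed so far (stands in for the group coordinator).
    final Map<String, Long> committed = new TreeMap<>();

    private static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }

    /** Simulates the result of a completed rebalance for one partition. */
    void assign(String tp, long position) {
        states.put(tp, State.ASSIGNED);
        positions.put(tp, position);
    }

    /** Partitions of dropped topics are kept, but marked as pending. */
    void subscribe(Set<String> topics) {
        subscription.clear();
        subscription.addAll(topics);
        states.replaceAll((tp, st) ->
            topics.contains(topicOf(tp)) ? st : State.PENDING_UNSUBSCRIBE);
    }

    /** Rule 2: unsubscribed topics disappear immediately. */
    Set<String> subscription() {
        return new TreeSet<>(subscription);
    }

    /** Rule 1: pending partitions are hidden from the assignment. */
    Set<String> assignment() {
        Set<String> out = new TreeSet<>();
        states.forEach((tp, st) -> {
            if (st == State.ASSIGNED) out.add(tp);
        });
        return out;
    }

    /** Rule 3: poll() would only fetch from partitions in ASSIGNED state. */
    boolean isFetchable(String tp) {
        return states.get(tp) == State.ASSIGNED;
    }

    /**
     * Rules 4 and 5: on rebalance, commit offsets for assigned and pending
     * partitions, and report both sets as revoked to the listener.
     */
    Set<String> rebalance() {
        committed.putAll(positions);
        Set<String> revoked = new TreeSet<>(states.keySet());
        states.clear();
        positions.clear();
        return revoked;
    }
}
```

With partitions a-0 and b-0 assigned, calling subscribe() with only topic a
leaves assignment() as [a-0], while the next rebalance() still commits the
offset for b-0 and reports both partitions as revoked. That difference is the
inconsistency mentioned above.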
> When subscription set changes on new consumer, the partitions may be removed
> without offset being committed.
> ------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-3664
> URL: https://issues.apache.org/jira/browse/KAFKA-3664
> Project: Kafka
> Issue Type: Bug
> Reporter: Jiangjie Qin
> Assignee: Vahid Hashemian
>
> When users are using group management, if they call consumer.subscribe() to
> change the subscription, the removed subscriptions will be immediately
> removed and their offsets will not be committed. Also the revoked partitions
> passed to the ConsumerRebalanceListener.onPartitionsRevoked() will not
> include those partitions.