[ https://issues.apache.org/jira/browse/KAFKA-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274470#comment-15274470 ]
Jiangjie Qin commented on KAFKA-3664: ------------------------------------- [~hachikuji] I am not sure. If we still return the partitions of the unsubscribed topic when user call {{assignment()}}, do we also return the records of those partitions? It seems weird either way. The current behavior gives a good user experience to let the unsubscribe take effect immediately. What surprising is the offsets are not committed during the following rebalance. Maybe we can just fix that. I am thinking to add a new state in subscription state such as {{PendingUnsubscribe}}. And the behavior after unsubscribe would be: 1. assignment() does not include the partitions of the unsubscribed topics. 2. subscription() does not include the unsubscribed topics. 3. poll() does not return messages from the unsubscribed topics. 4. In the rebalance listener, the partitions in {{PendingUnsubscribe}} state will be passed in as revoked partitions. 5. When auto commit is enabled, during rebalance, we commit the offsets for partitions that is in the assignment and the partitions that is pending unsubscribe. Arguably after subscription changes, there is still an inconsistency between assignment() and the partitions revoked passed to the rebalance listener, but it only affect the users who have customized rebalance listener. Chances are those users do need to know the partitions that are pending unsubscribe to checkpoint some states. > When subscription set changes on new consumer, the partitions may be removed > without offset being committed. > ------------------------------------------------------------------------------------------------------------ > > Key: KAFKA-3664 > URL: https://issues.apache.org/jira/browse/KAFKA-3664 > Project: Kafka > Issue Type: Bug > Reporter: Jiangjie Qin > Assignee: Vahid Hashemian > > When users are using group management, if they call consumer.subscribe() to > change the subscription, the removed subscriptions will be immediately > removed and their offset will not be commit. Also the revoked partitions > passed to the ConsumerRebalanceListener.onPartitionsRevoked() will not > include those partitions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)