[ https://issues.apache.org/jira/browse/KAFKA-2674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14965913#comment-14965913 ]
Guozhang Wang commented on KAFKA-2674:
--------------------------------------

Today we already call ```commitOffsetSync``` in the coordinator when the consumer closes, if auto commit is turned on. In addition, we also call ```commitOffsetSync``` before we call ```onPartitionsRevoked``` during a rebalance if auto commit is turned on. So I think the question is really whether we should also call ```onPartitionsRevoked``` upon closing, after we call ```commitOffsetSync```. I prefer adding the ```onPartitionsRevoked``` call, since it may be used for more than manual offset management.

BTW there is a discrepancy between the old and new consumer in MirrorMaker: with the old consumer we use a rebalance listener that returns the global assignment in ```onPartitionsAssigned```, whereas in the new consumer it only returns its own assignment. We need to think about how this can be resolved.

> ConsumerRebalanceListener.onPartitionsRevoked() is not called on consumer close
> -------------------------------------------------------------------------------
>
> Key: KAFKA-2674
> URL: https://issues.apache.org/jira/browse/KAFKA-2674
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.0
> Reporter: Michal Turek
> Assignee: Neha Narkhede
>
> Hi, I'm investigating and testing the behavior of the new consumer from the planned release 0.9 and found an inconsistency in the calling of rebalance callbacks.
> I noticed that ConsumerRebalanceListener.onPartitionsRevoked() is NOT called during consumer close and application shutdown. Its Javadoc contract says:
> - "This method will be called before a rebalance operation starts and after the consumer stops fetching data."
> - "It is recommended that offsets should be committed in this callback to either Kafka or a custom offset store to prevent duplicate data."
> I believe calling consumer.close() is the start of a rebalance operation, and even the local consumer that is actually closing should be notified so that it can run any rebalance logic, including committing offsets (e.g. if auto-commit is disabled).
> Below are commented logs of the current and expected behavior.
> {noformat}
> // Application start
> 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser [TestConsumer-worker-0]: Kafka version : 0.9.0.0-SNAPSHOT (AppInfoParser.java:82)
> 2015-10-20 15:14:02.208 INFO o.a.k.common.utils.AppInfoParser [TestConsumer-worker-0]: Kafka commitId : 241b9ab58dcbde0c (AppInfoParser.java:83)
> // Consumer started (the first one in group), rebalance callbacks are called including empty onPartitionsRevoked()
> 2015-10-20 15:14:02.333 INFO c.a.e.kafka.newapi.TestConsumer [TestConsumer-worker-0]: Rebalance callback, revoked: [] (TestConsumer.java:95)
> 2015-10-20 15:14:02.343 INFO c.a.e.kafka.newapi.TestConsumer [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-1, testA-0, testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] (TestConsumer.java:100)
> // Another consumer joined the group, rebalancing
> 2015-10-20 15:14:17.345 INFO o.a.k.c.c.internals.Coordinator [TestConsumer-worker-0]: Attempt to heart beat failed since the group is rebalancing, try to re-join group. (Coordinator.java:714)
> 2015-10-20 15:14:17.346 INFO c.a.e.kafka.newapi.TestConsumer [TestConsumer-worker-0]: Rebalance callback, revoked: [testB-1, testA-0, testB-0, testB-3, testA-2, testB-2, testA-1, testA-4, testB-4, testA-3] (TestConsumer.java:95)
> 2015-10-20 15:14:17.349 INFO c.a.e.kafka.newapi.TestConsumer [TestConsumer-worker-0]: Rebalance callback, assigned: [testB-3, testA-4, testB-4, testA-3] (TestConsumer.java:100)
> // Consumer started closing, there SHOULD be onPartitionsRevoked() callback to commit offsets like during standard rebalance, but it is missing
> 2015-10-20 15:14:39.280 INFO c.a.e.kafka.newapi.TestConsumer [main]: Closing instance (TestConsumer.java:42)
> 2015-10-20 15:14:40.264 INFO c.a.e.kafka.newapi.TestConsumer [TestConsumer-worker-0]: Worker thread stopped (TestConsumer.java:89)
> {noformat}
> The workaround is to call onPartitionsRevoked() explicitly and manually just before calling consumer.close(), but that seems dirty and error-prone to me. It can easily be forgotten by someone without such experience.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
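
For reference, the workaround described in the issue (invoking the revoke callback by hand just before close) can be sketched as a small wrapper. This is only an illustration under stated assumptions, not the Kafka client's API: `RevokeListener`, `SimpleConsumer`, and `closeWithRevoke` are hypothetical stand-ins so that the example compiles without kafka-clients, and the partition names are made up. With the real client, the same shape would presumably wrap a `KafkaConsumer` and a `ConsumerRebalanceListener`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class CloseWithRevoke {
    /** Hypothetical stand-in for the revoke half of ConsumerRebalanceListener. */
    interface RevokeListener {
        void onPartitionsRevoked(Collection<String> partitions);
    }

    /** Hypothetical stand-in for the small part of a consumer this sketch needs. */
    interface SimpleConsumer {
        Collection<String> assignment();
        void close();
    }

    /**
     * Invokes the revoke callback with the current assignment, then closes.
     * The finally block ensures close() still runs if the callback throws.
     */
    static void closeWithRevoke(SimpleConsumer consumer, RevokeListener listener) {
        try {
            // Copy the assignment so the callback sees a stable snapshot.
            listener.onPartitionsRevoked(new ArrayList<>(consumer.assignment()));
        } finally {
            consumer.close();
        }
    }

    /** Runs the wrapper against an in-memory fake and returns the observed call order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        SimpleConsumer fake = new SimpleConsumer() {
            public Collection<String> assignment() {
                return Arrays.asList("testA-3", "testB-4"); // illustrative names only
            }
            public void close() {
                events.add("close");
            }
        };
        closeWithRevoke(fake, partitions -> events.add("revoked " + partitions));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // the revoke event is recorded before "close"
    }
}
```

Centralizing the call in one helper addresses the reporter's concern that the manual step is easy to forget, but it does not replace a fix in the client itself.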