Hello.
I'm using an in-memory Kafka cluster, version 0.9.0.1 (Scala 2.11 build), consisting of 2 in-memory KafkaServers backed by an in-memory ZooKeeper, together with a Java-based consumer, for integration tests. One of the test scenarios checks that the application still receives messages after it has been restarted. It relies on the restarted consumer group receiving the messages from the topic, i.e.:
- start Kafka cluster in the same JVM as my tests (2 brokers)
- create a topic (2 partitions, 2 replicas)
- create producer
- create consumer group (2 consumers in the same group)
- send message 1 and verify that it is received by the group
- stop the consumer group
- send message 2
- start the consumers again
- verify that message 2 is received

Unfortunately, this test scenario fails intermittently.

To verify that the consumer group has really started, I wait until the GroupManager on any of the brokers ( consumerCoordinator().groupManager().currentGroups() ) contains my consumer group in the Stable state, and only then do I send a message. The problem is that this check does not always work after the group has been stopped: the group can remain in the PreparingRebalance state for a long time. I also tried waiting until the group is completely removed from the GroupManager, but since group rebalancing is an eventual process, that fails intermittently as well. Rebalancing occurs once per 1000 requests, and I see no way to influence it.
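
For reference, the wait described above can be sketched as a generic polling helper. This is only a minimal sketch, not the actual test code: the class name AwaitCondition, the method name and the timeout parameters are assumptions made for illustration, and the condition passed in would be whatever check inspects the broker's group state.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of Kafka): polls a condition until it
// holds or a timeout elapses.
final class AwaitCondition {
    private AwaitCondition() {}

    /**
     * Returns true if the condition became true before the timeout,
     * false if the timeout elapsed or the thread was interrupted.
     */
    static boolean await(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test it would be called as something like AwaitCondition.await(() -> isGroupStable("my-group"), 30000, 100), where isGroupStable is a hypothetical wrapper around the currentGroups() lookup. The helper only bounds the wait; it does not fix the case where the group stays in PreparingRebalance.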

So my question is: is it possible to check that a consumer group is stable after it has been restarted?

Thank you in advance.

--

Best regards,
Igor Velichko
