Hi,

I'm prototyping with the new consumer API in Kafka 0.9 and I'm particularly
interested in the `ConsumerRebalanceListener`.

My test setup is as follows:
  - 5M messages pre-loaded into a single-node Kafka 0.9 broker
  - 12 partitions, auto offset commit set to false
  - in `onPartitionsRevoked`, commit offsets and flush the local state

The test run is as follows:
  - launch one process with 2 consumers and let it consume for a while
  - launch another process with 2 consumers, which triggers a rebalance,
    and let both processes run until all messages are consumed

The code is here: https://gist.github.com/darkjh/fe1e5a5387bf13b4d4dd

So at first, the 2 consumers of the first process each got 6 partitions.
After the rebalance, each consumer got 3 partitions. This is confirmed
by logging inside the `onPartitionsAssigned` callback.
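
For reference, the split from 6 to 3 partitions per consumer is what I'd
expect from an even, range-style assignment of 12 partitions. Here is a
rough sketch of that arithmetic in plain Python. It is not Kafka code:
`range_assign` and the member names are made up for illustration, and I'm
assuming contiguous ranges handed out in sorted member order, so only the
sizes of the ranges are meant to match my logs, not which consumer gets
which range.

```python
def range_assign(num_partitions, members):
    """Split partitions 0..num_partitions-1 into contiguous ranges, one per member."""
    members = sorted(members)
    per_member, extra = divmod(num_partitions, len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members each get one additional partition.
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# Hypothetical member names: process 1 and process 2, two consumers each.
before = range_assign(12, ["p1-c0", "p1-c1"])
after = range_assign(12, ["p1-c0", "p1-c1", "p2-c0", "p2-c1"])

print({m: len(parts) for m, parts in before.items()})
print({m: len(parts) for m, parts in after.items()})
```

With 2 members each range has 6 partitions, and with 4 members each has 3,
which matches what the callback logged.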

But after the rebalance, one of the 2 consumers in the first process stops
receiving messages, even though it has partitions assigned to it:

balance-1 pulled 7237 msgs ...
balance-0 pulled 7263 msgs ...
2016-01-22 15:50:37,533 [INFO] [pool-1-thread-2]
o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since the
group is rebalancing, try to re-join group.
balance-1 flush @ 536637
balance-1 committed offset for List(balance-11, balance-10, balance-9,
balance-8, balance-7, balance-6)
2016-01-22 15:50:37,575 [INFO] [pool-1-thread-1]
o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since the
group is rebalancing, try to re-join group.
balance-0 flush @ 543845
balance-0 committed offset for List(balance-5, balance-4, balance-3,
balance-2, balance-1, balance-0)
balance-0 got assigned List(balance-5, balance-4, balance-3)
balance-1 got assigned List(balance-11, balance-10, balance-9)
balance-1 pulled 3625 msgs ...
balance-0 pulled 3621 msgs ...
balance-0 pulled 3631 msgs ...
balance-0 pulled 3631 msgs ...
balance-1 pulled 0 msgs ...
balance-0 pulled 3643 msgs ...
balance-0 pulled 3643 msgs ...
balance-1 pulled 0 msgs ...
balance-0 pulled 3622 msgs ...
balance-0 pulled 3632 msgs ...
balance-1 pulled 0 msgs ...
balance-0 pulled 3637 msgs ...
balance-0 pulled 3641 msgs ...
balance-0 pulled 3640 msgs ...
balance-1 pulled 0 msgs ...
balance-0 pulled 3632 msgs ...
balance-0 pulled 3630 msgs ...
balance-1 pulled 0 msgs ...
......

`balance-0` and `balance-1` are the names of the consumer threads. After
the rebalance, thread `balance-1` continues to poll but no messages
arrive, even though it was assigned 3 partitions.

Eventually the other 3 consumers consume all the messages in their
partitions, and the situation looks like this:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
balance-test, balance, 9, 417467, 417467, 0, consumer-2_/127.0.0.1
balance-test, balance, 10, 417467, 417467, 0, consumer-2_/127.0.0.1
balance-test, balance, 11, 417467, 417467, 0, consumer-2_/127.0.0.1
balance-test, balance, 6, 180269, 417467, 237198, consumer-2_/127.0.0.1
balance-test, balance, 7, 180036, 417468, 237432, consumer-2_/127.0.0.1
balance-test, balance, 8, 180197, 417467, 237270, consumer-2_/127.0.0.1
balance-test, balance, 3, 417467, 417467, 0, consumer-1_/127.0.0.1
balance-test, balance, 4, 417468, 417468, 0, consumer-1_/127.0.0.1
balance-test, balance, 5, 417468, 417468, 0, consumer-1_/127.0.0.1
balance-test, balance, 0, 417467, 417467, 0, consumer-1_/127.0.0.1
balance-test, balance, 1, 417467, 417467, 0, consumer-1_/127.0.0.1
balance-test, balance, 2, 417467, 417467, 0, consumer-1_/127.0.0.1

As you can see, partitions 6, 7 and 8 still have messages, but the consumer
can't pull them after the rebalance.
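
To put a number on it, the LAG column is just LOG END OFFSET minus CURRENT
OFFSET. A quick check in plain Python, with the three rows copied from the
table above (the variable names are only for illustration):

```python
# (partition, current offset, log end offset) for the stuck partitions
rows = [
    (6, 180269, 417467),
    (7, 180036, 417468),
    (8, 180197, 417467),
]

# Lag per partition is the distance between the committed position and the log end.
lags = {partition: end - current for partition, current, end in rows}
total_lag = sum(lags.values())

print(lags)       # {6: 237198, 7: 237432, 8: 237270}
print(total_lag)  # 711900
```

So roughly 712K of the 5M messages are never delivered in this run.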

I've tried the 0.9.0.0 release, trunk and the 0.9.0 branch, for both
server/broker and client.

I hope the code is clear enough to illustrate/reproduce the problem. It's
quite a surprise to me because this is the main feature of the new
consumer API, but it does not seem to work properly.
Feel free to ask me for any details.
-- 
*JU Han*

Software Engineer @ Teads.tv

+33 0619608888
