We do not seek in the onPartitionsAssigned callback. In our test setup (we are evaluating Kafka for a new project) we put a constant load on one of the topics, with a consumer group pulling messages from the topic's different partitions.

At a certain point in time, poll() simply stops returning messages. When this happens, we just seek() to the current offset, and messages come in again. It is a bit annoying, since we also end up seeking when there genuinely are no messages, but at least it prevents the client from stalling.
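In rough code, the workaround looks like this (a simplified sketch of the idea, not our actual client code; the topic name, config values and poll timeout are made up):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekWorkaround {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "load-test");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("load-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (records.count() == 0) {
                    // Workaround: re-seek every assigned partition to its
                    // current position. This resets the client's internal
                    // fetch state, and partitions that went silent start
                    // delivering again.
                    for (TopicPartition tp : consumer.assignment()) {
                        consumer.seek(tp, consumer.position(tp));
                    }
                } else {
                    // process records as usual ...
                }
            }
        }
    }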
Bruno

> On 25 Jan 2016, at 16:07, Han JU <ju.han.fe...@gmail.com> wrote:
>
> Hi Bruno,
>
> Can you tell me a little bit more about that? A seek() in
> `onPartitionsAssigned`?
>
> Thanks.
>
> 2016-01-25 10:51 GMT+01:00 Han JU <ju.han.fe...@gmail.com>:
>
>> Ok I'll create a JIRA issue on this.
>>
>> Thanks!
>>
>> 2016-01-23 21:47 GMT+01:00 Bruno Rassaerts <bruno.rassae...@novazone.be>:
>>
>>> +1 here
>>>
>>> As a workaround we seek to the current offset, which resets the
>>> client's internal state, and everything continues.
>>>
>>> Regards,
>>> Bruno Rassaerts | Freelance Java Developer
>>>
>>> Novazone, Edingsesteenweg 302, B-1755 Gooik, Belgium
>>> T: +32(0)54/26.02.03 - M: +32(0)477/39.01.15
>>> bruno.rassae...@novazone.be - www.novazone.be
>>>
>>>> On 23 Jan 2016, at 17:52, Ismael Juma <ism...@juma.me.uk> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Can you please file an issue in JIRA so that we make sure this is
>>>> investigated?
>>>>
>>>> Ismael
>>>>
>>>>> On Fri, Jan 22, 2016 at 3:13 PM, Han JU <ju.han.fe...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm prototyping with the new consumer API of Kafka 0.9 and I'm
>>>>> particularly interested in the `ConsumerRebalanceListener`.
>>>>>
>>>>> My test setup is as follows:
>>>>> - 5M messages pre-loaded in a single-node Kafka 0.9
>>>>> - 12 partitions, auto offset commit set to false
>>>>> - in `onPartitionsRevoked`, commit offsets and flush the local state
>>>>> (a sketch of this pattern appears at the end of this thread)
>>>>>
>>>>> The test run is as follows:
>>>>> - launch one process with 2 consumers and let it consume for a while
>>>>> - launch another process with 2 consumers; this triggers a rebalancing,
>>>>> then let the 2 processes run until all messages are consumed
>>>>>
>>>>> The code is here: https://gist.github.com/darkjh/fe1e5a5387bf13b4d4dd
>>>>>
>>>>> So at first, the 2 consumers of the first process each got 6
>>>>> partitions, and after the rebalancing each consumer got 3 partitions.
>>>>> This is confirmed by logging inside the `onPartitionsAssigned`
>>>>> callback.
>>>>>
>>>>> But after the rebalancing, one of the 2 consumers of the first process
>>>>> stops receiving messages, even though it has partitions assigned to it:
>>>>>
>>>>> balance-1 pulled 7237 msgs ...
>>>>> balance-0 pulled 7263 msgs ...
>>>>> 2016-01-22 15:50:37,533 [INFO] [pool-1-thread-2]
>>>>> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since
>>>>> the group is rebalancing, try to re-join group.
>>>>> balance-1 flush @ 536637
>>>>> balance-1 committed offset for List(balance-11, balance-10, balance-9,
>>>>> balance-8, balance-7, balance-6)
>>>>> 2016-01-22 15:50:37,575 [INFO] [pool-1-thread-1]
>>>>> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since
>>>>> the group is rebalancing, try to re-join group.
>>>>> balance-0 flush @ 543845
>>>>> balance-0 committed offset for List(balance-5, balance-4, balance-3,
>>>>> balance-2, balance-1, balance-0)
>>>>> balance-0 got assigned List(balance-5, balance-4, balance-3)
>>>>> balance-1 got assigned List(balance-11, balance-10, balance-9)
>>>>> balance-1 pulled 3625 msgs ...
>>>>> balance-0 pulled 3621 msgs ...
>>>>> balance-0 pulled 3631 msgs ...
>>>>> balance-0 pulled 3631 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3643 msgs ...
>>>>> balance-0 pulled 3643 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3622 msgs ...
>>>>> balance-0 pulled 3632 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3637 msgs ...
>>>>> balance-0 pulled 3641 msgs ...
>>>>> balance-0 pulled 3640 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3632 msgs ...
>>>>> balance-0 pulled 3630 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> ......
>>>>>
>>>>> `balance-0` and `balance-1` are the names of the consumer threads. So
>>>>> after the rebalancing, thread `balance-1` continues to poll but no
>>>>> messages arrive, even though it has 3 partitions assigned to it.
>>>>>
>>>>> Finally, the other 3 consumers pull all of their partitions' messages,
>>>>> and the situation looks like this:
>>>>>
>>>>> GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
>>>>> balance-test, balance, 9, 417467, 417467, 0, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 10, 417467, 417467, 0, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 11, 417467, 417467, 0, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 6, 180269, 417467, 237198, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 7, 180036, 417468, 237432, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 8, 180197, 417467, 237270, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 3, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 4, 417468, 417468, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 5, 417468, 417468, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 0, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 1, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 2, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>>
>>>>> So as you can see, partitions [6, 7, 8] still have messages, but the
>>>>> consumer can't pull them after the rebalancing.
>>>>>
>>>>> I've tried the 0.9.0.0 release, trunk and the 0.9.0 branch, for both
>>>>> the server/broker and the client.
>>>>>
>>>>> I hope the code is clear enough to illustrate/reproduce the problem.
>>>>> It's quite a surprise for me because this is the main feature of the
>>>>> new consumer API, but it does not seem to work properly.
>>>>> Feel free to ask me for any details.
>>>>> --
>>>>> *JU Han*
>>>>>
>>>>> Software Engineer @ Teads.tv
>>>>>
>>>>> +33 0619608888
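For reference, here is a minimal sketch of the commit-and-flush rebalance listener described in the original post above. The names are hypothetical (`flushLocalState()` stands in for whatever the application does with its local state); the actual test code is in the linked gist.

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitOnRevokeListener implements ConsumerRebalanceListener {
        private final KafkaConsumer<String, String> consumer;
        // Next offset to read, per partition, updated as records are processed.
        private final Map<TopicPartition, Long> nextOffsets = new HashMap<>();

        public CommitOnRevokeListener(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        // Called by the poll loop after processing each record.
        public void track(TopicPartition tp, long recordOffset) {
            nextOffsets.put(tp, recordOffset + 1); // commit the *next* offset
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            flushLocalState(); // placeholder for application-specific flushing
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            for (TopicPartition tp : partitions) {
                Long next = nextOffsets.get(tp);
                if (next != null) {
                    commits.put(tp, new OffsetAndMetadata(next));
                }
            }
            consumer.commitSync(commits); // manual commit: auto-commit is off
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("got assigned " + partitions);
        }

        private void flushLocalState() { /* flush application state here */ }
    }

The listener would be registered when subscribing, e.g.
consumer.subscribe(Arrays.asList("balance"), listener).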