Sorry, the test code in the gist does not in fact reproduce exactly the problem we're facing. I'm working on that.
2016-02-02 10:46 GMT+01:00 Han JU <ju.han.fe...@gmail.com>:
> Thanks Guozhang for the reply!
>
> So if it were as you said, and if I understand correctly, the lost messages should be the last ones. But in our use case it is not the last messages that get lost. It also does not explain why the behavior differs depending on the moment of the `kill -9` (before or after a commit). If a consumer app is killed before the first flush/commit, every message is received correctly.
>
> As for the lost messages, our real app code flushes state and commits offsets regularly (say every 15 minutes). In my test, say I have 45 minutes of data, so there are 2 flush/commit points and 3 chunks of flushed data. If a consumer app process is killed with `kill -9` after the first flush/commit point and I let the remaining app run until the end, messages are lost only in the second chunk. Both the first and the third chunks are handled perfectly.
>
> 2016-02-02 0:18 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
>> One thing to add: by doing this you could possibly get duplicates but not data loss, which obeys Kafka's at-least-once semantics.
>>
>> Guozhang
>>
>> On Mon, Feb 1, 2016 at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hi Han,
>> >
>> > I looked at your test code, and the error is actually in this line:
>> > https://gist.github.com/darkjh/437ac72cdd4b1c4ca2e7#file-kafkabug2-scala-L61
>> >
>> > where you call "commitSync" in the finally block, which will commit the messages returned to you by the poll() call.
>> >
>> > More specifically, suppose your poll() call returned a set of messages with offsets 0 to 100. From the consumer's point of view, once they are returned to the user they are considered "consumed", so if you call commitSync after that they will ALL be committed (i.e. the consumer will commit offset 100).
>> > But if you hit an exception or get a close signal while, say, processing the message with offset 50, and then call commitSync in the finally block, you will effectively lose messages 50 to 100.
>> >
>> > Hence, as a user of the consumer, one should only call "commit" when certain that all messages returned from "poll()" have been processed.
>> >
>> > Guozhang
>> >
>> >
>> > On Mon, Feb 1, 2016 at 9:59 AM, Han JU <ju.han.fe...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> One of our uses of Kafka has to tolerate arbitrary consumer crashes without losing or duplicating messages. So in our code we commit offsets manually, after the consumer state has been successfully persisted.
>> >>
>> >> While prototyping with the new consumer API in kafka-0.9, I found that in some cases Kafka failed to deliver part of the messages to the consumers even though the offsets were handled correctly.
>> >>
>> >> I've made sure that this time both broker and client code are at the latest commit of the 0.9.0 branch (d1ff6c7).
>> >>
>> >> Test code snippet is here:
>> >> https://gist.github.com/darkjh/437ac72cdd4b1c4ca2e7
>> >>
>> >> Test setup:
>> >> - 12 partitions
>> >> - 2 consumer app processes with 2 consumer threads each
>> >> - the producer produces exactly 1.2M messages in about 2 minutes (enough time for us to manually kill -9 a consumer)
>> >> - a consumer thread commits offsets every 80k messages received (to simulate our regular offset commits)
>> >> - after all messages are consumed, each consumer thread writes a number to a file indicating how many messages it has received.
>> >>   So all the numbers should sum to exactly 1.2M if everything goes well.
>> >>
>> >> Test run:
>> >> - run the producer
>> >> - run the 2 consumer app processes at the same time
>> >> - wait for the first offset commit (first 80k messages received in each consumer thread)
>> >> - after the first offset commit, kill -9 one of the consumer apps
>> >> - let the other consumer app run until all messages are consumed
>> >> - check the files written by the remaining consumer threads
>> >>
>> >> After that, checking the files, we do not receive 1.2M messages but roughly 1.04M. The consumer lag on this topic is 0.
>> >> If you check the consumer app logs at DEBUG level, you'll find that the offsets are handled correctly. 30s (the default timeout) after the kill -9 of one consumer app, the remaining consumer app is correctly assigned all the partitions, and it starts right from the offsets that the crashed consumer had previously committed. So the message loss is quite mysterious to us.
>> >> Note that the moment of the kill -9 is important. If we kill -9 one consumer app *before* the first offset commit, everything goes well: all messages are received, none are lost. But when it is killed *after* the first offset commit, messages are lost.
>> >>
>> >> Hope the code is clear enough to reproduce the problem. I'm available for any further details needed.
>> >>
>> >> Thanks!
>> >> --
>> >> *JU Han*
>> >>
>> >> Software Engineer @ Teads.tv
>> >>
>> >> +33 0619608888
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
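Guozhang's explanation of how a commit in the finally block loses messages can be checked with a small simulation. This is a sketch under stated assumptions, not the gist's code and not the Kafka client API: `FakeConsumer`, `commit_sync` and `crash_then_restart` are hypothetical names, and the model covers a single partition whose 101 messages (offsets 0 to 100) all come back from one poll. It assumes, as Guozhang describes, that the gist's finally block commits everything poll() returned. In the real Java `KafkaConsumer` (used from Scala in the gist), the no-argument `commitSync()` commits the offsets returned by the last poll(), while `commitSync(Map<TopicPartition, OffsetAndMetadata>)` commits explicit offsets, which should be the last processed offset + 1.

```python
# Hypothetical single-partition model of consumer offsets; NOT the Kafka API.
class FakeConsumer:
    def __init__(self, log_size):
        self.log_size = log_size  # messages at offsets 0 .. log_size - 1
        self.position = 0         # next offset poll() will return
        self.committed = 0        # offset a replacement consumer resumes from

    def poll(self, max_records):
        end = min(self.position + max_records, self.log_size)
        batch = list(range(self.position, end))
        # Returned records count as "consumed": the position moves past them.
        self.position = end
        return batch

    def commit_sync(self, offset=None):
        # No argument: commit the position, i.e. everything poll() returned.
        # With an argument: commit that explicit "next offset to read".
        self.committed = self.position if offset is None else offset


def crash_then_restart(commit_polled, log_size=101, crash_at=50):
    """Crash while processing `crash_at`, restart from the committed offset,
    and return the offsets that were never processed."""
    processed = set()
    consumer = FakeConsumer(log_size)
    next_offset = 0  # last fully processed offset + 1
    try:
        for offset in consumer.poll(max_records=log_size):
            if offset == crash_at:
                raise RuntimeError(f"crash while processing offset {offset}")
            processed.add(offset)
            next_offset = offset + 1
    except RuntimeError:
        pass  # swallowed only so the simulation can continue
    finally:
        if commit_polled:
            consumer.commit_sync()             # commit-in-finally pattern
        else:
            consumer.commit_sync(next_offset)  # commit only processed records

    # A replacement consumer resumes from the committed offset.
    restarted = FakeConsumer(log_size)
    restarted.position = consumer.committed
    for offset in restarted.poll(max_records=log_size):
        processed.add(offset)
    return sorted(set(range(log_size)) - processed)


lost = crash_then_restart(commit_polled=True)
print(lost[0], lost[-1], len(lost))             # 50 100 51
print(crash_then_restart(commit_polled=False))  # []
```

With the commit in the finally block, the crash at offset 50 leaves offsets 50 to 100 unprocessed after the restart. Committing only what was processed loses nothing here, although a crash after processing but before the next commit would replay records, which is the at-least-once duplication Guozhang mentions. This only models the gist bug; as noted at the top of the thread, the gist does not reproduce the loss seen in the real application.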