Ted, did you get a chance to look at the issue? I am also planning to update to the latest version to find out whether we still see the same issue.

On Fri, Nov 3, 2017 at 8:40 PM, Mana M <manan....@gmail.com> wrote:

> Below are the logs:
>
> Consumer 1 logs, where issue can be seen: https://pastebin.com/PuQhud91
> Consumer 2 logs: https://pastebin.com/yfJDSGPA
>
> server.log: https://pastebin.com/QKpk0zLn
> controller.log: https://pastebin.com/9T0niwEw
> state-change.log: https://pastebin.com/nrftHPC9
>
>
> On Fri, Nov 3, 2017 at 1:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you pastebin relevant logs from client and broker ?
>>
>> Thanks
>>
>> On Fri, Nov 3, 2017 at 1:37 PM, Manan G <manan....@gmail.com> wrote:
>>
>> > Hello,
>> >
>> > I am using 0.11.0.0 version of Kafka broker and Java client library. My
>> > consumer code tracks offsets for each assigned partition and at some
>> > time interval manually commits offsets by specifying partition->offset
>> > map.
>> >
>> > What I noticed is, after the rebalance, even if consumer loses some
>> > partitions that were assigned to it previously, offset commit for those
>> > lost partitions still succeeds by that same consumer! Shouldn't offset
>> > commit fail in this scenario since consumer is trying to commit offsets
>> > for partitions that are not assigned to it?
>> >
>> > For clarity below are the logs I see with comments:
>> >
>> > // This is when consumer starts for "test" topic and it picks up 3
>> > partitions
>> > log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]
>> >
>> > // Now consumer processes 3 records from partition 0 and 7 records from
>> > partition 2 - confirmed with log statements
>> > log>> ...
>> >
>> > // Rebalance happens - right now, my code does not commit any pending
>> > offsets here and just prints the log statement
>> > log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]
>> >
>> > // After re-balance, consumer loses partition 0 and 1. Again, my code
>> > does not do anything on this callback and just prints the log statement
>> > onPartitionsAssigned: partitions=[test-2]
>> >
>> > // Since the code did not commit offsets during revoke call, after
>> > rebalance, poll() returns all records for assigned partitions since last
>> > offset commit.
>> > // ... So we re-process 7 records from partition 2. This was confirmed
>> > with log statements.
>> > log>> ...
>> >
>> > // Offset commit gets triggered after some time and due to the bug in
>> > the code, it tries to commit offsets for both partition 0 and 2.
>> > // There is no failure however! I can see on Kafka broker side that
>> > offset for partition 0 is updated to 3.
>> > // I made sure that another consumer that is actually assigned
>> > partition 0 after re-balance has not committed offset yet.
>> > commitOffsets: {0=3, 2=7}
>> >
>> >
>> > Thanks,
>> > M
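
For reference, the client-side bug described in the quoted message (offsets for partition 0 still being committed after that partition was lost) could be avoided by pruning the tracked offsets whenever the assignment changes. The following is only a minimal sketch in plain Java, with no Kafka dependency so that it runs on its own. `AssignedOffsetTracker` and its methods are hypothetical names, not part of the Kafka client. It assumes that the real consumer code would call `onAssigned` from `ConsumerRebalanceListener.onPartitionsAssigned` and would convert the resulting map into the `Map<TopicPartition, OffsetAndMetadata>` passed to `commitSync`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical helper (not part of the Kafka client): tracks the next offset
 * to commit per partition and forgets partitions lost in a rebalance.
 */
final class AssignedOffsetTracker {
    private final Set<Integer> assigned = new HashSet<>();
    private final Map<Integer, Long> pending = new HashMap<>();

    /** Assumed to be called from the rebalance listener's assignment callback. */
    void onAssigned(Collection<Integer> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        // Drop offsets tracked for partitions this consumer no longer owns.
        pending.keySet().retainAll(assigned);
    }

    /** Records the next offset to commit; ignored for unassigned partitions. */
    void record(int partition, long nextOffset) {
        if (assigned.contains(partition)) {
            pending.put(partition, nextOffset);
        }
    }

    /** Returns a sorted copy containing only currently assigned partitions. */
    Map<Integer, Long> offsetsToCommit() {
        return new TreeMap<>(pending);
    }

    public static void main(String[] args) {
        AssignedOffsetTracker tracker = new AssignedOffsetTracker();
        tracker.onAssigned(Arrays.asList(0, 1, 2)); // initial assignment
        tracker.record(0, 3L);                      // 3 records from partition 0
        tracker.record(2, 7L);                      // 7 records from partition 2
        tracker.onAssigned(Arrays.asList(2));       // rebalance: only partition 2 kept
        System.out.println(tracker.offsetsToCommit()); // prints {2=7}
    }
}
```

With the sequence from the quoted logs, the map handed to the commit would contain only partition 2, so the `{0=3, 2=7}` commit would not be attempted. This says nothing about whether the broker should reject such a commit; it only removes the stale entry on the client side.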