Can you share a code snippet for BufferedConsumerClientAdapterImpl? I took a look at the logs. There were no log entries around 19:46:5x in either the server log or the controller log.
Thanks

On Tue, Nov 7, 2017 at 8:35 AM, Mana M <manan....@gmail.com> wrote:

> Ted, did you get chance to look at the issue? I am also planning to update
> to latest version to find out if we still see the same issue.
>
> On Fri, Nov 3, 2017 at 8:40 PM, Mana M <manan....@gmail.com> wrote:
>
> > Below are the logs:
> >
> > Consumer 1 logs, where issue can be seen: https://pastebin.com/PuQhud91
> > Consumer 2 logs: https://pastebin.com/yfJDSGPA
> >
> > server.log: https://pastebin.com/QKpk0zLn
> > controller.log: https://pastebin.com/9T0niwEw
> > state-change.log: https://pastebin.com/nrftHPC9
> >
> > On Fri, Nov 3, 2017 at 1:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> >> Can you pastebin relevant logs from client and broker ?
> >>
> >> Thanks
> >>
> >> On Fri, Nov 3, 2017 at 1:37 PM, Manan G <manan....@gmail.com> wrote:
> >>
> >> > Hello,
> >> >
> >> > I am using 0.11.0.0 version of Kafka broker and Java client library.
> >> > My consumer code tracks offsets for each assigned partition and at
> >> > some time interval manually commits offsets by specifying
> >> > partition->offset map.
> >> >
> >> > What I noticed is, after the rebalance, even if consumer loses some
> >> > partitions that were assigned to it previously, offset commit for
> >> > those lost partitions still succeeds by that same consumer! Shouldn't
> >> > offset commit fail in this scenario since consumer is trying to
> >> > commit offsets for partitions that are not assigned to it?
> >> >
> >> > For clarity below are the logs I see with comments:
> >> >
> >> > // This is when consumer starts for "test" topic and it picks up 3 partitions
> >> > log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]
> >> >
> >> > // Now consumer processes 3 records from partition 0 and 7 records from partition 2 - confirmed with log statements
> >> > log>> ...
> >> >
> >> > // Rebalance happens - right now, my code does not commit any pending offsets here and just prints the log statement
> >> > log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]
> >> >
> >> > // After re-balance, consumer loses partition 0 and 1. Again, my code does not do anything on this callback and just prints the log statement
> >> > onPartitionsAssigned: partitions=[test-2]
> >> >
> >> > // Since the code did not commit offsets during revoke call, after rebalance, poll() returns all records for assigned partitions since last offset commit.
> >> > // ... So we re-process 7 records from partition 2. This was confirmed with log statements.
> >> > log>> ...
> >> >
> >> > // Offset commit gets triggered after some time and due to the bug in the code, it tries to commit offsets for both partition 0 and 2.
> >> > // There is no failure however! I can see on Kafka broker side that offset for partition 0 is updated to 3.
> >> > // I made sure that another consumer that is actually assigned partition 0 after re-balance has not committed offset yet.
> >> > commitOffsets: {0=3, 2=7}
> >> >
> >> > Thanks,
> >> > M
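For reference, a minimal sketch of one client-side way to avoid the bug described in the original message, where the periodic commit still included a partition lost in the rebalance. OffsetTracker and its methods are hypothetical names, not part of the Kafka client, and partitions are plain integers so the example runs without the Kafka library. In a real consumer, retainAssigned would presumably be called from ConsumerRebalanceListener.onPartitionsAssigned, and the map from toCommit would be converted to TopicPartition/OffsetAndMetadata entries before passing it to commitSync.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Kafka class): tracks the next offset to commit
// per partition and forgets partitions that are no longer assigned.
class OffsetTracker {
    // Partition number -> offset of the next record to read.
    private final Map<Integer, Long> pending = new TreeMap<>();

    // Call after a record has been processed; the value to commit is the
    // offset of the record that follows it.
    void recordProcessed(int partition, long offset) {
        pending.put(partition, offset + 1);
    }

    // Call from the rebalance callback with the newly assigned partitions;
    // entries for partitions lost in the rebalance are dropped.
    void retainAssigned(Collection<Integer> assigned) {
        pending.keySet().retainAll(assigned);
    }

    // Snapshot of what the periodic commit should send.
    Map<Integer, Long> toCommit() {
        return new TreeMap<>(pending);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        // 3 records from partition 0 and 7 records from partition 2.
        for (long o = 0; o < 3; o++) tracker.recordProcessed(0, o);
        for (long o = 0; o < 7; o++) tracker.recordProcessed(2, o);
        System.out.println(tracker.toCommit()); // prints {0=3, 2=7}

        // After the rebalance only partition 2 remains assigned.
        tracker.retainAssigned(Arrays.asList(2));
        System.out.println(tracker.toCommit()); // prints {2=7}
    }
}
```

With this pruning in place, the later commit in the scenario above would carry only partition 2. Committing pending offsets inside the revoke callback, before the partitions are given up, is a separate choice that would also avoid re-processing the 7 records.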