Ted, did you get a chance to look at the issue? I am also planning to update
to the latest version to find out whether we still see the same issue.
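
For reference, the client-side bug described in the quoted message below
(committing offsets for partitions the consumer no longer owns) could be
guarded against by dropping tracked offsets on revoke and filtering the commit
map against the current assignment. A minimal, stdlib-only Java sketch of that
idea follows. `OffsetTracker` and its method names are hypothetical, partitions
are plain integers rather than `TopicPartition`, and the committed value is
assumed to be the last processed offset plus one.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the Kafka client library: tracks the next
// offset to commit per partition and forgets partitions that were revoked.
final class OffsetTracker {
    private final Map<Integer, Long> pending = new HashMap<>();

    // Record that the record at `offset` was processed. The position to
    // commit is the offset of the next record to read, hence offset + 1.
    void recordProcessed(int partition, long offset) {
        pending.merge(partition, offset + 1, Math::max);
    }

    // Intended to be called from the revoke callback: drop all state for
    // partitions that are being taken away from this consumer.
    void onRevoked(Collection<Integer> revoked) {
        pending.keySet().removeAll(revoked);
    }

    // Return only the offsets for partitions that are currently assigned,
    // so a stale entry can never end up in the commit map.
    Map<Integer, Long> offsetsToCommit(Set<Integer> assigned) {
        Map<Integer, Long> result = new HashMap<>(pending);
        result.keySet().retainAll(assigned);
        return result;
    }
}
```

With the numbers from the quoted logs (assuming offsets 0..2 were processed on
partition 0 and 0..6 on partition 2, and only partition 2 is assigned
afterwards), `offsetsToCommit` yields {2=7} rather than {0=3, 2=7}. In a real
consumer, `onRevoked` would presumably be called from
`ConsumerRebalanceListener.onPartitionsRevoked`, and the filtered map converted
to `TopicPartition`/`OffsetAndMetadata` entries before calling `commitSync`.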

On Fri, Nov 3, 2017 at 8:40 PM, Mana M <manan....@gmail.com> wrote:

> Below are the logs:
>
> Consumer 1 logs, where the issue can be seen: https://pastebin.com/PuQhud91
> Consumer 2 logs: https://pastebin.com/yfJDSGPA
>
> server.log: https://pastebin.com/QKpk0zLn
> controller.log: https://pastebin.com/9T0niwEw
> state-change.log: https://pastebin.com/nrftHPC9
>
>
> On Fri, Nov 3, 2017 at 1:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you pastebin the relevant logs from the client and broker?
>>
>> Thanks
>>
>> On Fri, Nov 3, 2017 at 1:37 PM, Manan G <manan....@gmail.com> wrote:
>>
>> > Hello,
>> >
>> > I am using version 0.11.0.0 of the Kafka broker and Java client library.
>> > My consumer code tracks offsets for each assigned partition and
>> > periodically commits them manually by specifying a partition->offset map.
>> >
>> > What I noticed is that after a rebalance, even if the consumer loses some
>> > partitions that were previously assigned to it, an offset commit for those
>> > lost partitions by that same consumer still succeeds! Shouldn't the offset
>> > commit fail in this scenario, since the consumer is trying to commit
>> > offsets for partitions that are not assigned to it?
>> >
>> > For clarity, below are the logs I see, with comments:
>> >
>> > // This is when the consumer starts for the "test" topic and picks up
>> > // 3 partitions
>> > log>> onPartitionsAssigned: partitions=[test-1, test-2, test-0]
>> >
>> > // Now the consumer processes 3 records from partition 0 and 7 records
>> > // from partition 2 - confirmed with log statements
>> > log>> ...
>> >
>> > // A rebalance happens. Right now, my code does not commit any pending
>> > // offsets here and just prints the log statement.
>> > log>> onPartitionRevoked: partitions=[test-1, test-2, test-0]
>> >
>> > // After the rebalance, the consumer loses partitions 0 and 1. Again, my
>> > // code does not do anything in this callback and just prints the log
>> > // statement.
>> > onPartitionsAssigned: partitions=[test-2]
>> >
>> > // Since the code did not commit offsets during the revoke call, after
>> > // the rebalance poll() returns all records for the assigned partitions
>> > // since the last offset commit, so we re-process the 7 records from
>> > // partition 2. This was confirmed with log statements.
>> > log>> ...
>> >
>> > // An offset commit gets triggered after some time and, due to a bug in
>> > // my code, it tries to commit offsets for both partitions 0 and 2.
>> > // There is no failure, however! I can see on the Kafka broker side that
>> > // the offset for partition 0 is updated to 3.
>> > // I made sure that the other consumer, which is actually assigned
>> > // partition 0 after the rebalance, has not committed an offset yet.
>> > commitOffsets: {0=3, 2=7}
>> >
>> >
>> > Thanks,
>> > M
>> >
>>
>
>
