We have a Kafka broker that we use for testing and recently upgraded from 
0.9.0.1 to 0.11.0.0; our Java consumer is built with the 0.11.0.0 client. 
The consumers commit offsets manually and have been consuming messages as 
expected since the upgrade. If we restart the consumers, they fetch the 
previously committed offset from the broker and resume processing new 
messages as expected, and Kafka Manager reports the offsets we expect to 
see. However, if we restart the broker, the consumer receives an old offset 
from it and we can end up re-processing several days' worth of messages.

We have identified the __consumer_offsets partition where the offsets are being 
stored, and if we use the console consumer to read from that partition we see 
a new message appear each time our consumer commits its offsets. The commands 
we use are:

echo "exclude.internal.topics=false" > /tmp/consumer.config
/opt/kafka/bin/kafka-console-consumer.sh \
  --consumer.config /tmp/consumer.config \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --bootstrap-server localhost:9092 \
  --topic __consumer_offsets \
  --consumer-property group.id=test-offsets-consumer-group \
  --partition 43

The output shows our consumer group and topic partition for each commit the 
consumer sends, and the reported offset is correct.

[ta-eng-cob1-tstat-events,ta-eng-cob1-ayla,0]::[OffsetMetadata[1833602,NO_METADATA],CommitTime 1507632714328,ExpirationTime 1507719114328]
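As a quick sanity check on these records (plain arithmetic on the values above, not Kafka tooling): the ExpirationTime is exactly 24 hours after the CommitTime, which matches the broker default of offsets.retention.minutes=1440.

```java
public class OffsetRecordCheck {
    public static void main(String[] args) {
        // Values copied from the __consumer_offsets record above
        long commitTime = 1507632714328L;
        long expirationTime = 1507719114328L;

        long retentionMs = expirationTime - commitTime;
        System.out.println(retentionMs + " ms = " + retentionMs / 60000 + " minutes");
        // prints "86400000 ms = 1440 minutes", i.e. the default offsets.retention.minutes
    }
}
```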

We also used the following command to check that these commits trigger a 
new record in the latest __consumer_offsets-43 log segment, and we do see a 
new record appended to the segment every time the consumer commits offsets.

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --print-data-log \
  --files /var/lib/kafka/__consumer_offsets-43/00000000000003780553.log

baseOffset: 4006387 lastOffset: 4006387 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 16 isTransactional: false position: 30359857 CreateTime: 1507632866696 isvalid: true size: 147 magic: 2 compresscodec: NONE crc: 1175188994
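For anyone reading along in these dumps: Kafka names each log segment file after the offset of the first record it contains, so the 20-digit filename prefix tells you where the dumped batch sits relative to the start of the segment.

```java
public class SegmentName {
    public static void main(String[] args) {
        // Segment file passed to the DumpLogSegments command above
        String fileName = "00000000000003780553.log";

        // The zero-padded prefix is the base offset of the segment
        long segmentBaseOffset =
            Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
        System.out.println(segmentBaseOffset); // prints 3780553

        // The dumped batch (baseOffset 4006387) sits this far into the segment, by offset
        long batchBaseOffset = 4006387L;
        System.out.println(batchBaseOffset - segmentBaseOffset); // prints 225834
    }
}
```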

Everything appears to be working as expected until we restart the broker, 
which then returns an old offset to the consumer.

For example, in the consumer debug output we see that the last commit before 
the broker restart is offset 1828033:

2017-10-09 11:55:16,056 DEBUG [pool-7-thread-2] 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Group 
ta-eng-cob1-tstat-events committed offset 1828033 for partition 
ta-eng-cob1-ayla-0

After we restart the broker, we see the consumer receive an old offset of 
1791273 from the broker:

2017-10-09 11:57:22,735 DEBUG [pool-7-thread-2] 
org.apache.kafka.clients.consumer.internals.Fetcher: Resetting offset for 
partition ta-eng-cob1-ayla-0 to the committed offset 1791273
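To put a number on the regression (simple arithmetic on the two logged offsets): the offset handed back after the broker restart is well behind the last committed one.

```java
public class OffsetGap {
    public static void main(String[] args) {
        long lastCommitted = 1828033L; // last commit before the broker restart
        long afterRestart  = 1791273L; // offset returned after the restart

        System.out.println((lastCommitted - afterRestart)
            + " messages would be re-processed");
        // prints "36760 messages would be re-processed"
    }
}
```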

If we just restart the consumer, the fetch returns the correct offset 
information from the broker:

2017-10-09 11:52:25,984 DEBUG [pool-7-thread-2] 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Group 
ta-eng-cob1-tstat-events committed offset 1828015 for partition 
ta-eng-cob1-ayla-0
2017-10-09 11:53:21,658 DEBUG [pool-7-thread-2] 
org.apache.kafka.clients.consumer.internals.Fetcher: Resetting offset for 
partition ta-eng-cob1-ayla-0 to the committed offset 1828015

There don't appear to be any errors in the broker logs to indicate a problem, 
so the question is: what is making the broker return an incorrect offset 
after it is restarted?

Thanks,
Phil Luckhurst
