Kiran created KAFKA-3587:
----------------------------

             Summary: LogCleaner fails due to incorrect offset map computation 
on a replica
                 Key: KAFKA-3587
                 URL: https://issues.apache.org/jira/browse/KAFKA-3587
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.9.0.1
         Environment: Linux
            Reporter: Kiran


Log Cleaner fails to compact a segment even when the number of messages in it 
is smaller than the capacity of the offset map.


In version 0.9.0.1 (LogCleaner.scala -> buildOffsetMap()), LogCleaner computes 
the segment size by subtracting the segment's base offset from its next offset 
("segmentSize = segment.nextOffset() - segment.baseOffset").  This works fine 
until you create another replica. When you create a replica, its segment can 
contain data that has already been compacted on other brokers, so its offsets 
have gaps. Depending on the type of data, the offset difference can be larger 
than the capacity of the offset map (maxDesiredMapSize), and that causes 
LogCleaner to fail on that segment.
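
To make the mismatch concrete, here is a minimal Python sketch (not Kafka 
code). It models a segment as the list of offsets that survive compaction and 
assumes the segment's base offset equals its first offset. The function names 
are hypothetical.

```python
# Illustrative sketch only: a segment is modelled as the sorted list of
# offsets that survive compaction. These helpers are hypothetical, not Kafka APIs.

def offset_span(offsets):
    """Size as derived from offsets: nextOffset - baseOffset."""
    base_offset = offsets[0]       # assumption: base offset == first offset
    next_offset = offsets[-1] + 1  # offset the next appended message would get
    return next_offset - base_offset


def message_count(offsets):
    """Number of messages actually stored in the segment."""
    return len(offsets)


# Compaction keeps the original offsets of surviving messages, so gaps remain.
compacted = [0, 7869818]
print(offset_span(compacted))    # 7869819
print(message_count(compacted))  # 2
```

On a segment that has never been compacted, both functions return the same 
value, which is why the problem only shows up on already-compacted data.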


Scenario:
- Kafka 0.9.0.1
- Cluster has two brokers.
- Server.properties:
log.cleaner.enable=true
# 10 MB
log.cleaner.dedupe.buffer.size=10485760
log.roll.ms=300000
delete.topic.enable=true
log.cleanup.policy=compact


Steps to reproduce:
1. Create a topic with replication-factor of 1.

./kafka-topics.sh --zookeeper=localhost:2181 --create \
  --topic test.log.compact.1M --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact --config segment.ms=300000

2. Use kafka-console-producer.sh to produce a single message with the following 
key and value:
LC1,{"test": "xyz"}

3. Use kafka-console-producer.sh to produce a large number of messages with 
the following key and value:
LC2,{"test": "abc"}

4. Let the log cleaner run. Make sure the log is compacted.  Verify with:
./kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files 00000000000000000000.log --print-data-log

Dumping 00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": "xyz"}
offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 payload: {"test": "abc"}

5. Increase the replication factor to 2, following these steps: 
http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

6. Notice that log cleaner fails to compact the newly created replica with the 
following error.
[2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: 7206179 messages in segment test.log.compact.1M-0/00000000000000000000.log but offset map can fit only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
        at scala.Predef$.require(Predef.scala:219)
        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
        at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
        at kafka.log.Cleaner.clean(LogCleaner.scala:322)
        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)

7. Examine the entries in the replica segment:

./kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files 00000000000000000000.log --print-data-log
There are only 218418 messages in that segment.

However, Log Cleaner seems to think that there are 7206179 messages in that 
segment (as per the error above).
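
The limit of 393215 in the error is consistent with the 10 MB dedupe buffer 
configured above. The Python sketch below reproduces that figure under 
assumptions that are not stated in this report: 24 bytes per map entry (a 
16-byte MD5 hash of the key plus an 8-byte offset), a load factor of 0.9, and 
the buffer divided evenly across cleaner threads. The function and parameter 
names are hypothetical.

```python
# Assumptions (not stated in the report): 24 bytes per offset map entry
# (16-byte MD5 key hash + 8-byte offset), a load factor of 0.9, and the
# dedupe buffer split evenly across cleaner threads.

def max_desired_map_size(dedupe_buffer_bytes, cleaner_threads=1,
                         bytes_per_entry=24, load_factor=0.9):
    """Approximate number of messages the offset map is allowed to hold."""
    slots = (dedupe_buffer_bytes // cleaner_threads) // bytes_per_entry
    return int(slots * load_factor)


# log.cleaner.dedupe.buffer.size=10485760 with a single cleaner thread
print(max_desired_map_size(10485760))  # 393215
```

Under the same assumptions, the offset-based size of 7206179 would need a 
dedupe buffer of roughly 190 MB per cleaner thread, even though the segment 
holds only 218418 messages.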


The error stems from this line in LogCleaner.scala:
"""val segmentSize = segment.nextOffset() - segment.baseOffset"""

In the replica's log segment file (00000000000000000000.log), the ending offset 
is 7206178 and the beginning offset is 0.  That makes Log Cleaner think that 
there are 7206179 messages in that segment, although there are only 218418.

IMO,  to address this kind of scenario, LogCleaner.scala should check for the 
number of messages in the segment, instead of subtracting beginning offset from 
the ending offset.
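
A rough Python sketch of that idea, using the figures from this report. It is 
not a patch for LogCleaner.scala. The helper names are hypothetical, and 
counting "offset:" lines in DumpLogSegments output is only one assumed way of 
obtaining a message count outside the broker.

```python
# Hypothetical helpers for illustration; these are not Kafka APIs.

def count_messages(dump_lines):
    """Count entries in DumpLogSegments output, assuming one 'offset:' line per message."""
    return sum(1 for line in dump_lines if line.startswith("offset: "))


def fits_in_offset_map(message_count, max_desired_map_size):
    """Count-based form of the size check suggested above."""
    return message_count <= max_desired_map_size


# Figures taken from the report.
limit = 393215                  # "offset map can fit only 393215"
offset_based = 7206178 + 1 - 0  # nextOffset - baseOffset = 7206179
actual = 218418                 # messages actually present in the replica segment

print(fits_in_offset_map(offset_based, limit))  # False: offset-based size rejects the segment
print(fits_in_offset_map(actual, limit))        # True: count-based size accepts it

# Abbreviated dump lines in the shape shown in step 4.
sample_dump = [
    "Starting offset: 0",
    "offset: 0 position: 0 isvalid: true",
    "offset: 7869818 position: 48 isvalid: true",
]
print(count_messages(sample_dump))  # 2
```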


