Jason Gustafson created KAFKA-5316:
--------------------------------------

             Summary: Log cleaning can increase message size and cause cleaner to crash with buffer overflow
                 Key: KAFKA-5316
                 URL: https://issues.apache.org/jira/browse/KAFKA-5316
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson


We have observed in practice that a compressed record set can grow after 
cleaning. Since the cleaner's input and output buffers are allocated with the 
same size, this can lead to overflow of the write buffer:

{code}
[2017-05-23 15:05:15,480] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
java.nio.BufferOverflowException
        at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
        at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:104)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:163)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:114)
        at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
        at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
        at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
        at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
        at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.log.Cleaner.clean(LogCleaner.scala:362)
        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-05-23 15:05:15,481] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
{code}
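For intuition, here is a minimal, self-contained Java sketch of the failure 
mode (not Kafka's actual cleaner code; the buffer size and the amount of growth 
are made up). It models the cleaner reading a record set into a fixed-size 
input buffer and writing the filtered, recompressed output into an equally 
sized output buffer, so any growth during recompression overflows the 
destination just as in the stack trace above:

{code:java}
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Hypothetical, simplified model of the cleaner's buffer handling; not Kafka code.
public class CleanerOverflowDemo {
    public static void main(String[] args) {
        int bufferSize = 64; // stand-in for the cleaner's I/O buffer size

        // Input and output buffers are allocated with the same capacity.
        ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
        ByteBuffer writeBuffer = ByteBuffer.allocate(bufferSize);

        // Pretend we read a record set that fills the input buffer exactly.
        readBuffer.put(new byte[bufferSize]);
        readBuffer.flip();

        // Pretend recompressing the retained records grew the payload,
        // e.g. because dropping records hurt the compression ratio.
        byte[] recompressed = new byte[bufferSize + 8];

        try {
            // Mirrors LogEntry.writeTo -> HeapByteBuffer.put in the trace.
            writeBuffer.put(recompressed);
        } catch (BufferOverflowException e) {
            System.out.println("overflow: output grew past the write buffer capacity");
        }
    }
}
{code}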

It is also possible for a recompressed message set to grow beyond the max 
message size. Given the fetch semantics introduced in KIP-74, under which a 
fetch always returns at least one message even if it exceeds the fetch size 
limit, the suggestion for this case is to allow the recompressed message set 
to exceed the max message size. This should be rare in practice and won't 
prevent consumers from making progress.

To handle the issue, one option is to allocate a temporary buffer when 
filtering in {{MemoryRecords.filterTo}} and return it in the result. As an 
optimization, we could fall back to this only when a single recompressed 
message set is larger than the entire write buffer; see the sketch below.
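
A rough sketch of that option follows. The names and result type here are 
illustrative only (the real {{filterTo}} signature differs): when a single 
recompressed entry cannot fit in the caller's write buffer, allocate a 
temporary buffer sized to fit and hand it back in the result.

{code:java}
import java.nio.ByteBuffer;

// Hypothetical sketch of the proposed fallback; names and structure are
// illustrative, not Kafka's actual API.
final class FilterResult {
    final ByteBuffer output; // either the caller's buffer or a larger temporary one
    FilterResult(ByteBuffer output) { this.output = output; }
}

final class FilterSketch {
    static FilterResult filterTo(ByteBuffer destination, byte[] recompressedEntry) {
        if (recompressedEntry.length <= destination.remaining()) {
            // Common case: the recompressed entry fits in the write buffer.
            destination.put(recompressedEntry);
            return new FilterResult(destination);
        }
        // Fallback: a single recompressed message set is larger than the
        // entire write buffer, so allocate a temporary buffer that fits
        // and return it to the caller instead.
        ByteBuffer temp = ByteBuffer.allocate(recompressedEntry.length);
        temp.put(recompressedEntry);
        return new FilterResult(temp);
    }
}
{code}

The caller would then flush whichever buffer comes back in the result, so the 
oversized allocation is confined to the rare case rather than growing the 
cleaner's buffers permanently.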



