[ https://issues.apache.org/jira/browse/KAFKA-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16029331#comment-16029331 ]

ASF GitHub Bot commented on KAFKA-5316:
---------------------------------------

GitHub user ijuma opened a pull request:

    https://github.com/apache/kafka/pull/3166

    KAFKA-5316: Follow-up with ByteBufferOutputStream and other misc improvements

    ByteBufferOutputStream improvements (a sketch follows below):
    * Document pitfalls
    * Improve efficiency when dealing with direct byte buffers
    * Improve handling of buffer expansion
    * Be consistent about using `limit` instead of `capacity`
    
    Other minor changes:
    * Fix log warning to specify correct Kafka version
    * Clean-ups
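
    For illustration, here is a minimal, hypothetical sketch of an auto-expanding
    `ByteBuffer`-backed output stream. It is not the actual
    `org.apache.kafka.common.utils.ByteBufferOutputStream`; the names and growth
    policy are invented for the example. It shows the main pitfall being documented
    (the wrapped buffer may be replaced when it fills up, so callers must re-fetch
    it via `buffer()` after writing) and why `remaining()`/`limit`, not `capacity`,
    is the bound to check.

{code}
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Hypothetical, simplified sketch; not the real ByteBufferOutputStream.
public class GrowableByteBufferOutputStream extends OutputStream {
    private static final float GROWTH_FACTOR = 1.1f;
    private ByteBuffer buffer;

    public GrowableByteBufferOutputStream(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void write(int b) {
        ensureRemaining(1);
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        ensureRemaining(len);
        buffer.put(bytes, off, len);
    }

    // The current buffer; may differ from the one passed to the constructor
    // if an expansion happened, which is the pitfall callers must be aware of.
    public ByteBuffer buffer() {
        return buffer;
    }

    private void ensureRemaining(int required) {
        // remaining() is bounded by limit, not capacity, so a caller-supplied
        // buffer with a smaller limit is respected.
        if (required > buffer.remaining()) {
            int newSize = Math.max(buffer.position() + required,
                    (int) (buffer.capacity() * GROWTH_FACTOR));
            // Preserve direct vs heap allocation when expanding.
            ByteBuffer expanded = buffer.isDirect()
                    ? ByteBuffer.allocateDirect(newSize)
                    : ByteBuffer.allocate(newSize);
            buffer.flip();
            expanded.put(buffer);
            buffer = expanded;
        }
    }
}
{code}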

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijuma/kafka minor-kafka-5316-follow-ups

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3166
    
----
commit 4b7e253d8d83c77600fe513790a6871c90aef5e9
Author: Ismael Juma <ism...@juma.me.uk>
Date:   2017-05-30T12:05:32Z

    A few improvements to ByteBufferOutputStream
    
    1. Document pitfalls
    2. Improve efficiency when dealing with direct byte buffers
    3. Improve handling of buffer expansion
    4. Be consistent about using `limit` instead of `capacity`

commit bc398d86e2f48dab7e5275f51c122dbeb65094c0
Author: Ismael Juma <ism...@juma.me.uk>
Date:   2017-05-30T12:06:09Z

    Fix log warning to specify correct Kafka version

commit f62ad676d5eb827c78af40ade485528474217ea9
Author: Ismael Juma <ism...@juma.me.uk>
Date:   2017-05-30T12:09:54Z

    Remove a couple of cases where a ByteBuffer is passed to `ByteBufferOutputStream`
    
    There are many more, and some do it for good reasons, but these seemed easy to fix.

commit 35d15b6d39e02e1f290f3308237cbade19e39847
Author: Ismael Juma <ism...@juma.me.uk>
Date:   2017-05-30T12:10:47Z

    Remove a couple of redundant fields in `MemoryRecordsBuilder`

commit e661eda83bc609f02f968a2ffb48cd8d66c53a29
Author: Ismael Juma <ism...@juma.me.uk>
Date:   2017-05-30T12:11:03Z

    Minor grammar fix

----


> Log cleaning can increase message size and cause cleaner to crash with buffer 
> overflow
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5316
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5316
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>             Fix For: 0.11.0.0
>
>
> We have observed in practice that it is possible for a compressed record set
> to grow after cleaning. Since the cleaner's input and output buffers are the
> same size, this can lead to overflow of the output buffer:
> {code}
> [2017-05-23 15:05:15,480] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
>         at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>         at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:104)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:163)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:114)
>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>         at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
>         at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>         at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
>         at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-05-23 15:05:15,481] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {code}
> It is also then possible for a compressed message set to grow beyond the max
> message size. Given the changes to fetch semantics in KIP-74, the suggestion
> for this case is to allow the recompressed message set to exceed the max
> message size. This should be rare in practice and won't prevent consumers
> from making progress.
> To handle the overflow issue, one option is to allocate a temporary buffer
> when filtering in {{MemoryRecords.filterTo}} and return it in the result. As
> an optimization, we can resort to this only when there is a single
> recompressed message set that is larger than the entire write buffer.
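
For illustration, a minimal, hypothetical sketch of the option described above
(the names below are invented for the example and are not the actual
{{MemoryRecords.filterTo}} API): filtered batches normally go into the
fixed-size output buffer, but a batch that does not fit is written into a
freshly allocated temporary buffer, which is returned in the result so the
caller can use it instead.

{code}
import java.nio.ByteBuffer;

// Hypothetical sketch only; not the real MemoryRecords.filterTo implementation.
final class FilterToSketch {

    // Result carrying the buffer the filtered records actually ended up in,
    // which may be a temporary buffer rather than the caller-supplied one.
    static final class FilterResult {
        final ByteBuffer output;
        FilterResult(ByteBuffer output) {
            this.output = output;
        }
    }

    static FilterResult appendFiltered(ByteBuffer destination, ByteBuffer filteredBatch) {
        if (filteredBatch.remaining() <= destination.remaining()) {
            // Common case: the filtered batch fits in the fixed output buffer.
            destination.put(filteredBatch);
            return new FilterResult(destination);
        }
        // Rare case (e.g. a single recompressed batch larger than the whole
        // write buffer): fall back to a temporary buffer big enough for what
        // has been written so far plus the oversized batch.
        ByteBuffer temporary = ByteBuffer.allocate(destination.position() + filteredBatch.remaining());
        destination.flip();
        temporary.put(destination);
        temporary.put(filteredBatch);
        return new FilterResult(temporary);
    }
}
{code}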


