After digging a bit into the source code of the LogCleaner class, I
realized that I probably should not try to fix the crash by increasing
log.cleaner.io.buffer.size: the cleaner automatically grows the buffer
so that it can hold at least one record.
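
Roughly, the grow-to-fit idea looks like this (my own simplified sketch
in script-style Scala, not the actual growBuffers code; the name
grownCapacity and the doubling are just illustrative):

    // My simplified sketch of the "grow until one record fits" idea
    // (illustrative only, not the actual LogCleaner code).
    def grownCapacity(current: Int, maxMessageSize: Int): Int =
      math.max(current * 2, maxMessageSize)

    // e.g. a 512K buffer with message.max.bytes = 10MB grows straight to 10MB
    println(grownCapacity(512 * 1024, 10 * 1024 * 1024))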

Also, for the last error I pasted above, i.e. "largest offset in message
set can not be safely converted to relative offset": this should be due to
the fact that I set an overly large I/O buffer size (256MB), which can hold
so many records that for some of them (largest offset - base offset of the
new cleaned segment) exceeds Integer.MAX_VALUE, so the assertion
<https://github.com/apache/kafka/blob/0.10.2.0/core/src/main/scala/kafka/log/LogSegment.scala#L86>
in LogSegment.append fails.
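
For reference, my paraphrase of that check: offsets within a segment are
stored on disk as a 4-byte delta from the segment's base offset, so the
delta must fit into an Int (paraphrased, not the exact code):

    // Paraphrase of the require() in LogSegment.append (not the exact code):
    // a record's offset is stored relative to the segment's base offset as an Int.
    def canConvertToRelativeOffset(largestOffset: Long, baseOffset: Long): Boolean = {
      val delta = largestOffset - baseOffset
      delta >= 0 && delta <= Int.MaxValue
    }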

From the source code, I see that the workflow of cleaning a segment is
like this (sketched in code right after the list):

- read as many records as possible into the readBuffer
- construct a MemoryRecords object and filter the records using the
OffsetMap
- write the filtered records into the writeBuffer
- repeat, until the whole segment is processed
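
To make the shape of that loop concrete, here is a toy model of it in
script-style Scala, with single bytes standing in for records and a
retain() predicate standing in for the OffsetMap lookup (none of this is
the actual LogCleaner code):

    import java.nio.ByteBuffer

    // Toy model of the cleaner's read -> filter -> write loop (illustrative only).
    def cleanSegment(segment: Array[Byte], bufSize: Int, retain: Byte => Boolean): Array[Byte] = {
      val readBuffer  = ByteBuffer.allocate(bufSize)
      val writeBuffer = ByteBuffer.allocate(bufSize)  // same size as the read buffer
      val retained    = Array.newBuilder[Byte]
      var position    = 0
      while (position < segment.length) {
        readBuffer.clear(); writeBuffer.clear()
        val chunk = math.min(bufSize, segment.length - position)
        readBuffer.put(segment, position, chunk)      // 1. read a chunk of the segment
        readBuffer.flip()
        while (readBuffer.hasRemaining) {             // 2. walk the records just read
          val b = readBuffer.get()
          if (retain(b)) writeBuffer.put(b)           // 3. keep only what the "offset map" retains
        }
        writeBuffer.flip()
        while (writeBuffer.hasRemaining) retained += writeBuffer.get()
        position += chunk                             // 4. repeat until the segment is done
      }
      retained.result()
    }

In this toy version writeBuffer.put can never overflow, because what gets
written is always a subset of what was just read into an equally sized
buffer, which is exactly why the real exception puzzles me.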

So the question is clear: given that the readBuffer and the writeBuffer
have exactly the same size
<https://github.com/apache/kafka/blob/0.10.2.0/core/src/main/scala/kafka/log/LogCleaner.scala#L320-L324>
(half of log.cleaner.io.buffer.size each), why would the cleaner throw a
BufferOverflowException when writing the filtered records into the
writeBuffer? That should never happen, because the size of the filtered
records should be no greater than the size of the readBuffer, and thus no
greater than the size of the writeBuffer.
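
For context, my reading of the allocation that link points at boils down
to something like this (simplified sketch, not the actual code; with
log.cleaner.threads = 1 as in our config, each cleaner thread ends up
with two equally sized buffers):

    import java.nio.ByteBuffer

    // Simplified view (not the actual LogCleaner code): log.cleaner.io.buffer.size
    // is split evenly between the read buffer and the write buffer.
    def allocateCleanerBuffers(logCleanerIoBufferSize: Int): (ByteBuffer, ByteBuffer) = {
      val half = logCleanerIoBufferSize / 2
      (ByteBuffer.allocate(half), ByteBuffer.allocate(half))
    }

    val (readBuffer, writeBuffer) = allocateCleanerBuffers(512 * 1024)  // the default 512K
    assert(readBuffer.capacity == writeBuffer.capacity)                 // 256K each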

> [2017-03-24 10:41:07,372] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
>         at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>         at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:98)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:158)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:111)
>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)


It is also worth mentioning that we are still using
log.message.format.version = 0.9.0.0, because there are still some old
consumers. Could this be related to the problem?


On Sun, Apr 2, 2017 at 11:46 PM, Shuai Lin <linshuai2...@gmail.com> wrote:

> Hi,
>
> Recently we updated from kafka 0.10.0.1 to 0.10.2.1, and found the log
> cleaner thread crashed with this error:
>
>> [2017-03-24 10:41:03,926] INFO [kafka-log-cleaner-thread-0], Starting  (kafka.log.LogCleaner)
>> [2017-03-24 10:41:04,177] INFO Cleaner 0: Beginning cleaning of log app-topic-20170317-20. (kafka.log.LogCleaner)
>> [2017-03-24 10:41:04,177] INFO Cleaner 0: Building offset map for app-topic-20170317-20... (kafka.log.LogCleaner)
>> [2017-03-24 10:41:04,387] INFO Cleaner 0: Building offset map for log app-topic-20170317-20 for 1 segments in offset range [9737795, 9887707). (kafka.log.LogCleaner)
>> [2017-03-24 10:41:07,101] INFO Cleaner 0: Offset map for log app-topic-20170317-20 complete. (kafka.log.LogCleaner)
>> [2017-03-24 10:41:07,106] INFO Cleaner 0: Cleaning log app-topic-20170317-20 (cleaning prior to Fri Mar 24 10:36:06 GMT 2017, discarding tombstones prior to Thu Mar 23 10:18:02 GMT 2017)... (kafka.log.LogCleaner)
>> [2017-03-24 10:41:07,110] INFO Cleaner 0: Cleaning segment 0 in log app-topic-20170317-20 (largest timestamp Fri Mar 24 09:58:25 GMT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
>> [2017-03-24 10:41:07,372] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
>> java.nio.BufferOverflowException
>>         at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>>         at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:98)
>>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:158)
>>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:111)
>>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1(LogCleaner.scala:405)
>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1$adapted(LogCleaner.scala:401)
>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>>         at kafka.log.Cleaner.$anonfun$clean$6(LogCleaner.scala:363)
>>         at kafka.log.Cleaner.$anonfun$clean$6$adapted(LogCleaner.scala:362)
>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>         at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>> [2017-03-24 10:41:07,375] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
>
>
>
> For completeness, here are some other related settings (they are the same
> before & after the upgrade to 0.10.2):
>
> - log.cleaner.enable = 'true'
> - log.cleaner.min.cleanable.ratio = '0.1'
> - log.cleaner.threads = '1'
> - log.cleaner.io.buffer.load.factor = '0.98'
> - log.roll.hours = '24'
> - log.cleaner.dedupe.buffer.size = 2GB
> - log.segment.bytes = 512MB
> - message.max.bytes = 10MB
>
>
> Before the upgrade, we used the default value of
> *log.cleaner.io.buffer.size*, i.e. 512K. Since the above error said
> "buffer overflow", I updated it to 2M, but the log cleaner thread crashed
> immediately (with the same error) when the broker restarted.
>
> Then I tried 10M and then 128M, but with no luck. Finally, when I used
> 256MB, the above error didn't happen anymore.
>
> However, after a few days, the log cleaner thread crashed again, with
> another error:
>
>> [2017-03-27 12:33:51,323] INFO Cleaner 0: Cleaning segment 8590943933 in log __consumer_offsets-49 (largest timestamp Mon Mar 27 12:27:12 GMT 2017) into 6443803053, retaining deletes. (kafka.log.LogCleaner)
>> [2017-03-27 12:33:51,377] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
>> java.lang.IllegalArgumentException: requirement failed: largest offset in message set can not be safely converted to relative offset.
>>         at scala.Predef$.require(Predef.scala:277)
>>         at kafka.log.LogSegment.append(LogSegment.scala:109)
>>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:482)
>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1(LogCleaner.scala:405)
>>         at kafka.log.Cleaner.$anonfun$cleanSegments$1$adapted(LogCleaner.scala:401)
>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>>         at kafka.log.Cleaner.$anonfun$clean$6(LogCleaner.scala:363)
>>         at kafka.log.Cleaner.$anonfun$clean$6$adapted(LogCleaner.scala:362)
>>         at scala.collection.immutable.List.foreach(List.scala:378)
>>         at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>> [2017-03-27 12:33:51,377] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
>
>
> Can someone explain what might have caused the above errors, or suggest a
> possible fix? Thanks!
>
> Regards,
> Shuai
>
