Kafka version: 0.10.0

Exception Trace
--------------------
java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
at kafka.log.LogSegment.recover(LogSegment.scala:189)
at kafka.log.Log.recoverLog(Log.scala:268)
at kafka.log.Log.loadSegments(Log.scala:243)
at kafka.log.Log.<init>(Log.scala:101)
at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Test Code (the same exception trace is seen in the broker logs as well, on prod
machines with exactly the same log files as used in this mini test)
---------

val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
val config = LogConfig(logProps)
val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
var log = new Log(cp, config, 0, time.scheduler, time)
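
To make this repro deterministic, one option (just a sketch, not what we run on
the prod brokers) is to delete the offset indexes before the new Log(...) call
above, since, as described in the original mail below, recovery kicks in when
the index files are corrupt or missing:

// force an index rebuild so that opening the Log runs LogSegment.recover()
cp.listFiles.filter(_.getName.endsWith(".index")).foreach(_.delete())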


On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

> Can you paste the entire exception stacktrace please?
>
> -Jaikiran
>
> On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:
>
>> Hi there, just wanted to bump up the thread one more time to check if
>> someone can point us in the right direction... This one was quite a serious
>> failure that took down many of our Kafka brokers.
>>
>> On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> We are facing a weird problem where the Kafka broker fails to start due to an
>>> unhandled exception while 'recovering' a log segment. I have been able to
>>> isolate the problem to a single record and am providing the details below:
>>>
>>> During Kafka restart, if the index files are corrupted or don't exist, the
>>> Kafka broker tries to 'recover' a LogSegment and rebuild the indexes via
>>> LogSegment.recover().
>>> In the main while loop there, which iterates over the entries in the log:
>>> while(iter.hasNext) { val entry = iter.next....}, I get an entry whose
>>> complete underlying byte buffer is as follows:
>>>
>>> [82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
>>> 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
>>> 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
>>> 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102,
>>> 10,
>>> 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101,
>>> 108,
>>> 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46,
>>> 114,
>>> 101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
>>> 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
>>> 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
>>> 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11,
>>> 8,
>>> 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
>>> 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48,
>>> 56,
>>> 48, 72]
>>>
>>> A toString() on this entry yields:
>>>
>>> *MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251,
>>> key
>>> = null, payload = java.nio.HeapByteBuffer[pos=0 lim=197
>>> cap=197]),4449011)*
>>>
>>> It appears that this record is corrupt, and deserializing/decompressing it
>>> causes exceptions which are unhandled. Specifically, in version 0.10.0 this
>>> call fails with a NoSuchElementException:
>>>
>>> ByteBufferMessageSet.deepIterator(entry).next().offset
>>>
>>> Note: This message was written to disk using a *kafka 0.10.0 broker running
>>> snappy jar version 1.1.1.7* (which is known to have some read-time bugs).
>>>
>>> The log file itself is 512 MB and this message appears around 4 MB into the
>>> file.
>>>
>>> We have since upgraded snappy, but shouldn't this condition still be handled
>>> gracefully? What is the correct behavior here? Should the exception be caught
>>> and the log file truncated? At the moment this causes Kafka to crash
>>> completely, with no recovery path except deleting the bad data file manually
>>> and then restarting Kafka.
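>>>
>>> The kind of handling we have in mind is sketched below. This is only a sketch
>>> around the shallow-iteration loop quoted earlier (iter is that same iterator),
>>> not the actual LogSegment.recover() code: treat a failure while deep-iterating
>>> a compressed wrapper message the same way other corruption found during
>>> recovery is treated, i.e. stop at the last fully valid byte and truncate the
>>> rest, instead of letting the exception escape and kill the broker.
>>>
>>> var validBytes = 0
>>> try {
>>>   while (iter.hasNext) {
>>>     val entry = iter.next()
>>>     entry.message.ensureValid()                     // CRC check
>>>     // a corrupt compressed wrapper blows up here with NoSuchElementException
>>>     ByteBufferMessageSet.deepIterator(entry).next().offset
>>>     validBytes += MessageSet.entrySize(entry.message)
>>>   }
>>> } catch {
>>>   case e: Exception =>
>>>     // fall through and let the caller truncate the segment back to validBytes,
>>>     // rather than crashing the whole broker during startup
>>>     ()
>>> }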
>>>
>>> --
>>>
>>> cheers,
>>>
>>> gaurav
>>>
>>>
>>> A test case to repro the crash
>>>
>>> @Test
>>>
>>> def testCorruptLog() {
>>>
>>>   val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
>>> -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0,
>>> -79,
>>> -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49,
>>> 48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
>>> 48,
>>> 0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109,
>>> 111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46,
>>> 97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
>>> -115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97,
>>> 109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72,
>>> 99,
>>> 101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111,
>>> 115,
>>> 116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49,
>>> 50,
>>> 53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53,
>>> 52,
>>> 52, 54, 48, 56, 48, 72);
>>>
>>>    val msg = new Message(ByteBuffer.wrap(buf), None, None)
>>>
>>>    val entry = new MessageAndOffset(msg, 4449011L)
>>>
>>>    val deepIterator: Iterator[MessageAndOffset] =
>>> ByteBufferMessageSet.deepIterator(entry)
>>>
>>>    deepIterator.next().offset
>>>
>>> }
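>>>
>>> If it is useful as a regression test, the same repro can also be pinned as an
>>> expected failure by declaring the exception on the JUnit annotation (the test
>>> name here is just illustrative):
>>>
>>> @Test(expected = classOf[NoSuchElementException])
>>> def testDeepIterationOfCorruptWrapper() {
>>>   // same body as testCorruptLog() above
>>> }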
>>>
>>>
>
