Kafka version: 0.10.0

Exception Trace
--------------------
java.util.NoSuchElementException
    at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
    at kafka.log.LogSegment.recover(LogSegment.scala:189)
    at kafka.log.Log.recoverLog(Log.scala:268)
    at kafka.log.Log.loadSegments(Log.scala:243)
    at kafka.log.Log.<init>(Log.scala:101)
    at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Test Code (the same exception trace is seen in broker logs on prod machines as well, with exactly the same log files as in this mini test)
---------
val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
val config = LogConfig(logProps)
val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
var log = new Log(cp, config, 0, time.scheduler, time)

On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

> Can you paste the entire exception stacktrace please?
>
> -Jaikiran
>
> On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:
>
>> Hi there, just wanted to bump up the thread one more time to check if
>> someone can point us in the right direction... This one was quite a
>> serious failure that took down many of our Kafka brokers.
>>
>> On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> We are facing a weird problem where the Kafka broker fails to start due
>>> to an unhandled exception while 'recovering' a log segment. I have been
>>> able to isolate the problem to a single record; details below.
>>>
>>> During Kafka restart, if the index files are corrupted or don't exist,
>>> the broker tries to 'recover' a LogSegment and rebuild the indexes in
>>> LogSegment.recover().
>>> In the main while loop there, which iterates over the entries in the log
>>> (while(iter.hasNext) { val entry = iter.next ... }), I get an entry whose
>>> complete underlying byte buffer is as follows:
>>>
>>> [82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
>>> 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
>>> 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
>>> 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
>>> 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
>>> 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
>>> 101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
>>> 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
>>> 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
>>> 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
>>> 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
>>> 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
>>> 48, 72]
>>>
>>> A toString() on this entry yields:
>>>
>>> *MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251, key
>>> = null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]),4449011)*
>>>
>>> It appears that this record is corrupt, and deserializing/decompressing it
>>> causes exceptions which are unhandled. Specifically, in version 0.10.0
>>> this call fails with NoSuchElementException:
>>>
>>> ByteBufferMessageSet.deepIterator(entry).next().offset
>>>
>>> Note: This message was written to disk using a *Kafka 0.10.0 broker running
>>> snappy jar version 1.1.1.7* (which is known to have some read-time bugs).
>>>
>>> The log file itself is 512MB large and this message appears at around 4MB
>>> into the file.
>>>
>>> We have upgraded snappy; but should this condition be handled correctly?
>>> What is the correct behavior here? Should the exception be handled and the
>>> log file be truncated?
>>> At the moment this causes Kafka to completely crash, with no recovery
>>> path except deleting the bad data file manually and then starting Kafka.
>>>
>>> --
>>> cheers,
>>> gaurav
>>>
>>> A test case to repro the crash:
>>>
>>> @Test
>>> def testCorruptLog() {
>>>   val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
>>>     -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79,
>>>     -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49,
>>>     48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48,
>>>     0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109,
>>>     111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46,
>>>     97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
>>>     -115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97,
>>>     109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99,
>>>     101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115,
>>>     116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50,
>>>     53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52,
>>>     52, 54, 48, 56, 48, 72)
>>>
>>>   val msg = new Message(ByteBuffer.wrap(buf), None, None)
>>>   val entry = new MessageAndOffset(msg, 4449011L)
>>>   val deepIterator: Iterator[MessageAndOffset] =
>>>     ByteBufferMessageSet.deepIterator(entry)
>>>   deepIterator.next().offset
>>> }
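
To make the question above concrete, here is a minimal sketch of the kind of defensive handling being asked about, written against the same 0.10.0 classes the test case uses. It is not the actual broker fix; the object name CorruptEntryCheck and the helper lastOffsetIfReadable are made up for illustration. The idea is to attempt the deep iteration that currently throws, and report the entry as unreadable so the caller could truncate the segment there instead of crashing.

import kafka.message.{ByteBufferMessageSet, MessageAndOffset}

// Hypothetical helper, not part of Kafka: wraps the deep iteration that
// currently dies with an unhandled NoSuchElementException during recovery.
object CorruptEntryCheck {

  // Returns Some(last offset) if the (possibly compressed) wrapper entry can
  // be deep-iterated, or None if decompression/iteration fails, in which case
  // the caller could stop recovery and truncate at this entry's position.
  def lastOffsetIfReadable(entry: MessageAndOffset): Option[Long] = {
    try {
      val deep = ByteBufferMessageSet.deepIterator(entry)
      var last = entry.offset
      while (deep.hasNext)
        last = deep.next().offset
      Some(last)
    } catch {
      case _: java.util.NoSuchElementException => None // the failure in the trace above
      case _: java.io.IOException => None              // e.g. snappy stream failures
    }
  }
}

Applied inside the LogSegment.recover() loop, a None result for the entry at offset 4449011 would let the broker log the corruption and truncate the segment at that point, rather than crashing with the unhandled exception shown at the top of the thread.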