Hi All,

We are facing a weird problem where a Kafka broker fails to start due to an unhandled exception while 'recovering' a log segment. I have been able to isolate the problem to a single record; the details are below.
During a Kafka restart, if the index files are corrupted or missing, the broker tries to 'recover' a LogSegment and rebuild the indexes (LogSegment.recover()). In the main while loop there, which iterates over the entries in the log - while(iter.hasNext) { val entry = iter.next() ... } - I get an entry whose complete underlying byte buffer is:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 72]

A toString() on this entry yields:

*MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]), 4449011)*

It appears that this record is corrupt, and deserializing/decompressing it causes exceptions which are not handled. Specifically, in the 0.10.0 version this call fails with a NoSuchElementException:

ByteBufferMessageSet.deepIterator(entry).next().offset

Note: this message was written to disk by a *kafka 0.10.0 broker running snappy jar version 1.1.1.7* (which is known to have some read time bugs). The log file itself is 512 MB, and this message appears at around 4 MB into the file.

We have upgraded snappy, but should this condition be handled correctly? What is the correct behavior here? Should the exception be caught and the log file truncated? At the moment this crashes Kafka completely, with no recovery path other than manually deleting the bad data file and then starting Kafka again.

--
cheers,
gaurav

A test case to repro the crash (the def is meant to be dropped into an existing JUnit test class; the imports below are what it needs to compile):

import java.nio.ByteBuffer

import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset}
import org.junit.Test

@Test
def testCorruptLog() {
  // Raw bytes of the corrupt entry as read back from the log segment.
  val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65, 80, 80, 89,
    0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16,
    74, 20, 49, 48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102,
    10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108, 46, 105, 110, 118, 101,
    110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
    -115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116, 111,
    114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101,
    114, 46, 72, 111, 115, 116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53,
    52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 72)

  // Wrap the bytes in a Message at the observed offset and deep-iterate it,
  // which reproduces the unhandled NoSuchElementException.
  val msg = new Message(ByteBuffer.wrap(buf), None, None)
  val entry = new MessageAndOffset(msg, 4449011L)
  val deepIterator: Iterator[MessageAndOffset] = ByteBufferMessageSet.deepIterator(entry)
  deepIterator.next().offset
}
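For what it's worth, below is a rough sketch of the kind of guard I had in mind for the recovery path. This is only an illustration, not the actual LogSegment.recover() code: the safeFirstDeepOffset helper is made up, and only ByteBufferMessageSet.deepIterator and MessageAndOffset come from the 0.10.0 code base. The idea is that a wrapper message whose payload cannot be decompressed would be reported to the caller (which could then truncate the segment at that entry's position, as recover() already does for CRC failures) instead of crashing the broker.

import kafka.message.{ByteBufferMessageSet, MessageAndOffset}

import scala.util.control.NonFatal

// Hypothetical helper: try to read the first deep offset of a (possibly
// compressed) wrapper entry; return None instead of throwing if the payload
// cannot be decompressed/deserialized.
def safeFirstDeepOffset(entry: MessageAndOffset): Option[Long] =
  try {
    Some(ByteBufferMessageSet.deepIterator(entry).next().offset)
  } catch {
    case NonFatal(e) =>
      // Corrupt compressed payloads (like the record above) end up here,
      // letting the recovery loop decide to truncate rather than crash.
      None
  }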