Hi there, just wanted to bump this thread one more time to check if
someone can point us in the right direction. This was quite a serious
failure that took down many of our Kafka brokers.

On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> Hi All,
>
> We are facing a weird problem where a Kafka broker fails to start due to an
> unhandled exception while 'recovering' a log segment. I have been able to
> isolate the problem to a single record; details below:
>
> During a Kafka restart, if the index files are corrupted or missing, the
> broker tries to 'recover' each LogSegment and rebuild its indexes in
> LogSegment.recover(). In the main while loop there, which iterates over the
> entries in the log (while(iter.hasNext) { val entry = iter.next() ... }), I
> get an entry whose complete underlying byte buffer is as follows:
>
> [82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
> 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
> 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
> 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
> 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
> 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
> 101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
> 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
> 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
> 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
> 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
> 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
> 48, 72]
>
> A toString() on this entry yields:
>
> *MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251, key
> = null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]),4449011)*
>
> It appears that this record is corrupt, and deserializing/decompressing it
> causes exceptions which go unhandled. Specifically, in version 0.10.0 the
> following call fails with NoSuchElementException:
>
> ByteBufferMessageSet.deepIterator(entry).next().offset
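>
> To make the failure mode concrete, here is a rough sketch of the guard we
> would have expected around that call (a hypothetical helper, not actual
> Kafka code; it only uses deepIterator exactly as invoked above):
>
> import kafka.message.{ByteBufferMessageSet, MessageAndOffset}
> import scala.util.control.NonFatal
>
> // Hypothetical helper: return the first deep offset of a wrapper entry,
> // or None when the compressed payload cannot be decoded.
> def firstDeepOffset(entry: MessageAndOffset): Option[Long] =
>   try Some(ByteBufferMessageSet.deepIterator(entry).next().offset)
>   catch { case NonFatal(_) => None }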
>
> Note: this message was written to disk by a *kafka 0.10.0 broker running
> snappy jar version 1.1.1.7* (a snappy version known to have some read-time
> bugs). The log file itself is 512 MB, and this message appears around the
> 4 MB mark in the file.
>
> We have since upgraded snappy, but should this condition be handled
> gracefully? What is the correct behavior here? Should the exception be
> caught and the log file truncated? At the moment this crashes Kafka
> completely, with no recovery path other than manually deleting the bad log
> file and restarting the broker.
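>
> For discussion, the handling we would expect looks roughly like the sketch
> below (hypothetical code, not the actual recover() loop): scan entries until
> the first one that fails to decode, and report the number of valid bytes so
> the caller can truncate the segment there instead of crashing. ensureValid()
> and MessageSet.entrySize() are the existing helpers on the 0.10 message
> classes, as we understand them.
>
> import kafka.message.{ByteBufferMessageSet, MessageAndOffset, MessageSet}
> import scala.util.control.NonFatal
>
> // Hypothetical recovery loop: returns the count of leading valid bytes;
> // a decode failure marks the truncation point rather than propagating.
> def validBytesBeforeCorruption(iter: Iterator[MessageAndOffset]): Int = {
>   var validBytes = 0
>   while (iter.hasNext) {
>     val entry = iter.next()
>     try {
>       entry.message.ensureValid() // CRC check; throws on mismatch
>       ByteBufferMessageSet.deepIterator(entry).next() // force a decode
>     } catch {
>       case NonFatal(_) => return validBytes // truncate here
>     }
>     validBytes += MessageSet.entrySize(entry.message)
>   }
>   validBytes
> }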
>
> --
>
> cheers,
>
> gaurav
>
>
> A test case to repro the crash (imports added so the snippet compiles on
> its own inside the kafka.message test tree):
>
> import java.nio.ByteBuffer
> import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset}
> import org.junit.Test
>
> @Test
> def testCorruptLog() {
>   // The corrupt entry exactly as read back from the log segment (same
>   // bytes as the dump above).
>   val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
>     -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79,
>     -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49,
>     48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48,
>     0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109,
>     111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46,
>     97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
>     -115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97,
>     109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99,
>     101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115,
>     116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50,
>     53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52,
>     52, 54, 48, 56, 48, 72)
>
>   val msg = new Message(ByteBuffer.wrap(buf), None, None)
>   val entry = new MessageAndOffset(msg, 4449011L)
>
>   // Deep-iterating the compressed payload throws NoSuchElementException
>   // instead of a handled corruption error.
>   val deepIterator: Iterator[MessageAndOffset] =
>     ByteBufferMessageSet.deepIterator(entry)
>   deepIterator.next().offset
> }
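>
> As written, the test simply dies with the unhandled NoSuchElementException;
> if it is more convenient to encode the expected failure, JUnit 4 (which the
> Kafka tests already use) can assert it directly:
>
> @Test(expected = classOf[NoSuchElementException])
> def testCorruptLog() {
>   // same body as above
> }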
>
