Hi there, just wanted to bump this thread one more time to check if someone can point us in the right direction. This was quite a serious failure that took down many of our Kafka brokers. The original report is quoted below; after the repro test case I've also added a sketch of the kind of handling we'd expect.
On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:

> Hi All,
>
> We are facing a weird problem where a Kafka broker fails to start due to
> an unhandled exception while 'recovering' a log segment. I have been able
> to isolate the problem to a single record; details below.
>
> During a Kafka restart, if the index files are corrupted or don't exist,
> the broker tries to 'recover' the LogSegment and rebuild the indexes in
> LogSegment.recover(). In the main while loop there, which iterates over
> the entries in the log (while(iter.hasNext) { val entry = iter.next() ... }),
> I get an entry whose complete underlying byte buffer is:
>
> [82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
> 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
> 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
> 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
> 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
> 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
> 101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
> 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
> 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
> 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
> 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
> 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
> 48, 72]
>
> A toString() on this entry yields:
>
> MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251,
> key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]), 4449011)
>
> It appears that this record is corrupt, and deserializing/decompressing
> it throws exceptions that go unhandled. Specifically, on 0.10.0 this call
> fails with a NoSuchElementException:
>
> ByteBufferMessageSet.deepIterator(entry).next().offset
>
> Note: this message was written to disk by a Kafka 0.10.0 broker running
> snappy jar version 1.1.1.7 (which is known to have some read-time bugs).
> The log file itself is 512 MB, and this message appears at around 4 MB
> into the file.
>
> We have since upgraded snappy, but should this condition be handled
> gracefully? What is the correct behavior here? Should the exception be
> caught and the log file truncated? At the moment this crashes Kafka
> completely, with no recovery path except deleting the bad data file
> manually and then restarting the broker.
> --
> cheers,
> gaurav
>
> A test case to repro the crash:
>
>     @Test
>     def testCorruptLog() {
>       val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0,
>         0, -59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0,
>         0, 0, -79, -58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15,
>         -16, 74, 20, 49, 48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53,
>         52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10, 39, 99, 111, 109, 46,
>         118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108, 46, 105, 110,
>         118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114, 101,
>         102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
>         26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115,
>         116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110,
>         116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115,
>         116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54,
>         49, 50, 53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50,
>         48, 54, 53, 52, 52, 54, 48, 56, 48, 72)
>       val msg = new Message(ByteBuffer.wrap(buf), None, None)
>       val entry = new MessageAndOffset(msg, 4449011L)
>       val deepIterator: Iterator[MessageAndOffset] =
>         ByteBufferMessageSet.deepIterator(entry)
>       deepIterator.next().offset
>     }
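For what it's worth, the record header in the dump above decodes cleanly for message format v0: the first four bytes are the CRC (0x521EA5DB = 1377740251, matching the toString), then magic = 0, attributes = 2 (snappy), key length = -1 (null key), and value length = 197, with the value opening with the snappy-java stream header (-126, 'S', 'N', 'A', 'P', 'P', 'Y'). So the damage is inside the compressed payload, exactly where the deep iterator dies.

The handling we'd expect is roughly: guard the deep iteration inside the recover() loop and treat a failure there as "corruption starts here", truncating the segment at the last valid offset instead of letting the exception escape and kill the broker. The snippet below is only a sketch against the 0.10.0 classes, not actual broker code: RecoveryGuard and lastOffsetIfValid are hypothetical names, the catch is deliberately broad on the assumption that a corrupt snappy stream can fail in more ways than the NoSuchElementException we observed, and it assumes it compiles alongside the kafka.message package, like the repro test above.

    import kafka.message.{ByteBufferMessageSet, MessageAndOffset}

    object RecoveryGuard {
      // Hypothetical helper: Some(lastOffset) when the compressed entry can
      // be fully deep-iterated, None when it cannot -- the signal recover()
      // could use to truncate the segment instead of crashing the broker.
      def lastOffsetIfValid(entry: MessageAndOffset): Option[Long] =
        try {
          var last = entry.offset
          val deep = ByteBufferMessageSet.deepIterator(entry)
          while (deep.hasNext)
            last = deep.next().offset
          Some(last)
        } catch {
          // On 0.10.0 the corrupt record above surfaces as a
          // NoSuchElementException from the deep iterator; other corrupt
          // buffers could plausibly fail differently, hence the broad catch.
          case _: Exception => None
        }
    }

Against the corrupt entry from the test case, RecoveryGuard.lastOffsetIfValid(entry) returns None rather than throwing, which would give the recovery path a clean signal to truncate the log at the last count of valid bytes and rebuild the indexes from the valid prefix of the segment.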