[ https://issues.apache.org/jira/browse/KAFKA-861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13629410#comment-13629410 ]
Sriram Subramanian commented on KAFKA-861:
------------------------------------------

This happens when an existing follower becomes the new leader and the current leader starts following the new leader. The existing follower closes the fetcher thread and transitions to become a leader. The current leader truncates its log to its high watermark and starts following the new leader. The message set that is received by the old follower during this transition contains only zero bytes. When we try to iterate through this message set, we fail and throw the above exception.

What causes these zero bytes to be present in the message set? It looks like when the old leader truncated its log, it was also trying to send bytes to the follower. These bytes were outside the truncated region. Somehow, the bytes after the high watermark all became zeros.

It turns out that in JDK 1.6 there is a bug in truncateTo that truncates the file but does not update the position of the file. This is fixed in Kafka by explicitly setting the position after the truncate call. However, the simple program below verifies that reading the file channel past the truncated region (without setting the position) is totally fine by itself and does not return any bytes:

// imports needed to run the snippets below
import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.util.Random

// 1. create a channel for a file
val path = "/home/myid/outfile1"
val fileAccess = new RandomAccessFile(path, "rw")
val fc = fileAccess.getChannel

// 2. create a random buffer
val b = ByteBuffer.allocate(100)
new Random().nextBytes(b.array())
// write the buffer to the channel
fc.write(b)
var pos = fc.position()    // position is 100
var size = fc.size()       // size is 100

// 3. truncate the channel
fc.truncate(50)
size = fc.size()           // size is 50
pos = fc.position()        // position is 100 (not updated by truncate)

// 4. transfer the truncated portion to another channel
val path1 = "/home/myid/outfile2"
val f2 = new RandomAccessFile(path1, "rw")
val fc1 = f2.getChannel
val transferred = fc.transferTo(50, 50, fc1)   // transferred is 0

Further, if we add step 3" below after step 3 above, it can be seen that step 4 does transfer a non-zero number of bytes, and those bytes are all zeros:

// 3". write more bytes without resetting the position
b.rewind()
fc.write(b)
pos = fc.position()        // position is 200
size = fc.size()           // size is 200

The code above shows that appending to a file without setting the position after truncate does expose the zero bytes to the reader. But in Kafka, truncate, set-position and append are all synchronized, which means we should not hit the issue above. This could mean there is a race condition in FileChannelImpl that could somehow cause this. The code snippet below from the transferTo method of FileChannelImpl might explain what we see:

long sz = size();          // checks size; size() is synchronized with other FileChannelImpl methods
if (position > sz)
    return 0;              // this is what is returned in step 4 above in the first case: the size is smaller
                           // than the position requested. However, truncate can happen after this line.
int icount = (int)Math.min(count, Integer.MAX_VALUE);
if ((sz - position) < icount)
    icount = (int)(sz - position);

long n;

// Attempt a direct transfer, if the kernel supports it
if ((n = transferToDirectly(position, icount, target)) >= 0)
    return n;              // the size check above could have passed, but at this point the size is smaller
                           // than the requested position. transferToDirectly calls transferTo0, which could
                           // just read the zero bytes written by truncate.

A few open questions:
1. Does truncate zero out the bytes synchronously or lazily? If it is lazy, we could also get junk bytes instead of zeros.
2. How do we fix this in Kafka?
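As a thought experiment on open question 1 and the suspected race, a standalone probe along the following lines could be run on the affected JDK. This sketch is not from the original report; the TruncateTransferRace object name, the /tmp file paths and the timings are illustrative assumptions. A writer thread repeatedly truncates to offset 50, resets the position (mirroring Kafka's workaround) and re-appends random bytes, while a reader thread concurrently calls transferTo past the truncation point and reports if it ever receives nothing but zero bytes.

import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.util.Random
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical probe, not part of the original report or the Kafka code base.
object TruncateTransferRace {
  def main(args: Array[String]): Unit = {
    val src  = new RandomAccessFile("/tmp/race-src",  "rw").getChannel
    val sink = new RandomAccessFile("/tmp/race-sink", "rw").getChannel
    val data = ByteBuffer.allocate(100)
    new Random().nextBytes(data.array())

    // seed the source file so the first truncate has something to cut
    src.write(data)

    val stop = new AtomicBoolean(false)

    // Writer: truncate to 50, reset the position (mirroring Kafka's workaround for the
    // JDK 1.6 truncate behavior) and append the same 100 random bytes again.
    val writer = new Thread(new Runnable {
      def run(): Unit = {
        while (!stop.get) {
          src.truncate(50)
          src.position(50)
          data.rewind()
          src.write(data)
        }
      }
    })

    // Reader: concurrently transfer the region starting at offset 50 into the sink and
    // report if every transferred byte is zero, i.e. neither the old nor the new data.
    val reader = new Thread(new Runnable {
      def run(): Unit = {
        while (!stop.get) {
          sink.truncate(0)
          sink.position(0)
          val transferred = src.transferTo(50, 50, sink)
          if (transferred == 50) {
            val buf = ByteBuffer.allocate(50)
            sink.read(buf, 0)
            if (buf.array().forall(_ == 0)) {
              println("observed 50 all-zero bytes past the truncation point")
              stop.set(true)
            }
          }
        }
      }
    })

    writer.start()
    reader.start()
    Thread.sleep(10000)   // probe for ten seconds, then give up
    stop.set(true)
    writer.join()
    reader.join()
    src.close()
    sink.close()
  }
}

If the probe ever fires, it would support the theory that the size check in transferTo can race with a concurrent truncate and surface zeroed (or junk) bytes to the fetcher.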
One possible fix is to ensure that the MessageSet iterator throws an invalid message exception when it encounters a message with a 0 byte size or a CRC that does not match the message. The follower can then try to re-fetch the offset for that topic partition or just fail (at least we would know the cause). A rough sketch of such a check is included after the quoted issue below.

> IndexOutOfBoundsException while fetching data from leader
> ---------------------------------------------------------
>
>                 Key: KAFKA-861
>                 URL: https://issues.apache.org/jira/browse/KAFKA-861
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Sriram Subramanian
>            Assignee: Sriram Subramanian
>             Fix For: 0.8
>
>
> 2013-04-09 16:36:50,051] ERROR [ReplicaFetcherThread-0-261], Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for topic firehoseUpdates partititon 14 offset 53531364
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:136)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> Caused by: java.lang.IndexOutOfBoundsException
>         at java.nio.Buffer.checkIndex(Buffer.java:512)
>         at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:121)
>         at kafka.message.Message.compressionCodec(Message.scala:202)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:174)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:197)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>         at scala.collection.IterableLike$class.isEmpty(IterableLike.scala:92)
>         at kafka.message.MessageSet.isEmpty(MessageSet.scala:67)
>         at scala.collection.TraversableLike$class.lastOption(TraversableLike.scala:512)
>         at kafka.message.MessageSet.lastOption(MessageSet.scala:67)
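For reference, here is a rough, standalone sketch of the kind of check proposed above. It is not the actual Kafka patch; it assumes the 0.8 message set layout (an 8-byte offset and a 4-byte size per entry, followed by a message whose first 4 bytes are the CRC of the remaining payload), and the MessageSetValidator and InvalidMessageException names defined here are illustrative, not Kafka's classes.

import java.nio.ByteBuffer
import java.util.zip.CRC32

// Illustrative exception type for this sketch (Kafka has its own InvalidMessageException).
class InvalidMessageException(msg: String) extends RuntimeException(msg)

object MessageSetValidator {
  val LogOverhead = 12   // 8-byte offset + 4-byte message size
  val CrcLength   = 4

  // Walk the buffer entry by entry and fail fast on an invalid size or a CRC mismatch,
  // instead of letting the iterator blow up later with IndexOutOfBoundsException.
  def validate(buffer: ByteBuffer): Unit = {
    val buf = buffer.duplicate()
    while (buf.remaining() >= LogOverhead) {
      val offset = buf.getLong()
      val size   = buf.getInt()
      if (size < CrcLength)
        throw new InvalidMessageException("message at offset " + offset + " has invalid size " + size)
      if (buf.remaining() < size)
        return   // trailing partial message at the end of the fetch; the fetcher can ignore it
      val expectedCrc = buf.getInt() & 0xffffffffL
      val payload = new Array[Byte](size - CrcLength)
      buf.get(payload)
      val crc = new CRC32()
      crc.update(payload)
      if (crc.getValue != expectedCrc)
        throw new InvalidMessageException("CRC mismatch for message at offset " + offset)
    }
  }
}

The replica fetcher could run such a check over the fetched partition data before appending it to the log, and treat the exception as a signal to re-fetch from the same offset or to halt with a clear cause.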