[ https://issues.apache.org/jira/browse/KAFKA-861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13629410#comment-13629410 ]

Sriram Subramanian commented on KAFKA-861:
------------------------------------------

This happens when an existing follower becomes the new leader and the current 
leader starts following the new leader:

1. The existing follower closes its fetcher thread and transitions to become 
the leader.
2. The current leader truncates its log to its high watermark and starts 
following the new leader.

The message set received by the old follower during this transition contains 
only zero bytes. When we try to iterate through this message set, we fail and 
throw the exception in the issue description.

What causes these zero bytes to be present in the message set? It looks like 
when the old leader truncated its log, it was also trying to send bytes to the 
follower, and those bytes were outside the truncated region. Somehow, the bytes 
after the high watermark all became zeros.

It turns out that in JDK 1.6 there is a bug in truncateTo that truncates the 
file but does not update the position of the file. This is worked around in 
Kafka by explicitly setting the position after the truncate call. However, the 
simple program below verifies that reading the file channel after the truncated 
region (without setting the position) is totally fine and does not return any 
bytes.

1.
    // create a channel for a file
    val path = "/home/myid/outfile1"
    val fileAccess = new RandomAccessFile(path, "rw")
    val fc = fileAccess.getChannel

2. 
    // create random buffer
    val b = ByteBuffer.allocate(100)
    new Random().nextBytes(b.array())
   
    // write the buffer to the channel
    fc.write(b)
    var pos = fc.position() // position is 100
    var size = fc.size() // size is 100

3.
    // truncate the channel
    fc.truncate(50)
    size = fc.size() // size is 50
    pos = fc.position() // position is 100

4. 
    // transfer the truncated portion to a channel
    val path1 = "/home/myid/outfile2"
    val f2 = new RandomAccessFile(path1, "rw")
    val fc1 = f2.getChannel
    val transferred = fc.transferTo(50, 50, fc1) // transferred is 0
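The four steps above can be consolidated into one runnable sketch. This version 
uses Java directly against the same NIO APIs, with temp files standing in for 
the hard-coded paths; note that whether truncate also moves the position back 
to the new size depends on the JDK version (per the bug above), so the sketch 
only checks the transfer result:

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Random;

public class TruncateDemo {
    // Returns the number of bytes transferTo moved out of the truncated region.
    public static long run() throws Exception {
        File src = File.createTempFile("outfile1", null);
        File dst = File.createTempFile("outfile2", null);
        try (RandomAccessFile ra1 = new RandomAccessFile(src, "rw");
             RandomAccessFile ra2 = new RandomAccessFile(dst, "rw")) {
            FileChannel fc = ra1.getChannel();
            // steps 1 and 2: write 100 random bytes; position and size become 100
            ByteBuffer b = ByteBuffer.allocate(100);
            new Random().nextBytes(b.array());
            fc.write(b);
            // step 3: truncate to 50; size drops to 50 (whether the position is
            // also moved back to 50 is JDK-version-dependent)
            fc.truncate(50);
            // step 4: transferring the truncated region [50, 100) moves nothing,
            // because the requested position is at or after the new size
            return fc.transferTo(50, 50, ra2.getChannel());
        } finally {
            src.delete();
            dst.delete();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("transferred = " + run()); // prints "transferred = 0"
    }
}
```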

Further, if we add step 3" below after step 3 above, step 4 does transfer a 
nonzero number of bytes, and the transferred bytes are all zeros.

3"
    // write more bytes
    b.rewind()
    fc.write(b)
    pos = fc.position() // position is 200
    size = fc.size() // size is 200
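On a current JDK, truncate itself moves the position back to the new size, so 
step 3" may not reproduce the gap as written. The zero gap can still be 
demonstrated deterministically by emulating the old behavior, i.e. moving the 
position past the truncation point by hand (a sketch; the explicit position 
call is the emulation, not part of the original repro):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

public class ZeroGapDemo {
    // Returns the bytes found in the region [50, 100) after truncate + append.
    public static byte[] run() throws Exception {
        File f = File.createTempFile("outfile1", null);
        try (RandomAccessFile ra = new RandomAccessFile(f, "rw")) {
            FileChannel fc = ra.getChannel();
            // write 100 non-zero bytes
            ByteBuffer b = ByteBuffer.allocate(100);
            Arrays.fill(b.array(), (byte) 1);
            fc.write(b);
            // truncate to 50, then emulate the JDK 1.6 behavior by leaving
            // the position at 100 instead of at the new size
            fc.truncate(50);
            fc.position(100);
            // step 3": append again; the write lands at offset 100, leaving
            // a hole over [50, 100) that the filesystem fills with zeros
            b.rewind();
            fc.write(b);
            // read back the hole
            ByteBuffer hole = ByteBuffer.allocate(50);
            fc.read(hole, 50);
            return hole.array();
        } finally {
            f.delete();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(run())); // all zeros
    }
}
```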

The code above shows that appending to a file without setting the position 
after a truncate does expose the zero bytes to the reader. But in Kafka, 
truncate/set position and append are all synchronized, which means we should 
not hit the issue above.
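For concreteness, the serialization just described might look like the 
following hypothetical sketch (illustrative names, not Kafka's actual log 
code):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical sketch: truncate + explicit position reset and append are
// serialized under one lock, as the comment above says Kafka does.
public class LogLock {
    private final Object lock = new Object();
    private final FileChannel channel;

    public LogLock(FileChannel channel) { this.channel = channel; }

    public void truncateTo(long size) throws IOException {
        synchronized (lock) {
            channel.truncate(size);
            channel.position(size); // work around the JDK 1.6 truncate bug
        }
    }

    public void append(ByteBuffer buffer) throws IOException {
        synchronized (lock) {
            channel.write(buffer); // always writes at the corrected position
        }
    }

    public static void main(String[] args) throws Exception {
        java.io.File f = java.io.File.createTempFile("log", null);
        try (java.io.RandomAccessFile ra = new java.io.RandomAccessFile(f, "rw")) {
            LogLock log = new LogLock(ra.getChannel());
            ByteBuffer b = ByteBuffer.allocate(100);
            log.append(b);
            log.truncateTo(50);
            b.rewind();
            log.append(b);
            // the second append lands at offset 50: no zero gap
            System.out.println("size = " + ra.getChannel().size()); // size = 150
        }
        f.delete();
    }
}
```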

This could mean there is a race condition in FileChannelImpl. The code snippet 
below, from the transferTo method of FileChannelImpl, might explain what we 
see.

        long sz = size();   // checks size; size() is synchronized with
                            // other FileChannelImpl methods
        if (position > sz)
            return 0;       // this is what is returned in step 4 above in the
                            // first case: the size is smaller than the
                            // requested position. However, a truncate can
                            // happen after this line.
        int icount = (int)Math.min(count, Integer.MAX_VALUE);
        if ((sz - position) < icount)
            icount = (int)(sz - position);

        long n;

        // Attempt a direct transfer, if the kernel supports it
        if ((n = transferToDirectly(position, icount, target)) >= 0)
            return n;       // the size check above may have passed, but by
                            // this point the size can be smaller than the
                            // requested position. transferToDirectly calls
                            // transferTo0, which could then read the zero
                            // bytes written by the truncate.


A few open questions:
1. Does truncate zero out the bytes synchronously or lazily? If it is lazy, we 
could also get junk bytes instead of zeros.
2. How do we fix this in Kafka? One possible fix is to ensure that the 
MessageSet iterator throws an invalid-message error when it encounters a zero 
message size or when the CRC does not match the message. The follower can then 
try to re-fetch the offset for that topic partition, or just fail (at least we 
would know the cause).
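Fix 2 could be sketched roughly as follows. The wire format here (4-byte size, 
4-byte CRC, then payload) is a deliberate simplification for illustration, not 
Kafka's actual message format or iterator API:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical guard: while iterating a fetched buffer, reject a zero or
// truncated message size, or a CRC mismatch, instead of letting a raw
// ByteBuffer read blow up with IndexOutOfBoundsException.
public class MessageSetGuard {
    public static int countValid(ByteBuffer buf) {
        int count = 0;
        while (buf.remaining() >= 4) {
            int size = buf.getInt(); // zero here means we hit zero padding
            if (size < 4 || size > buf.remaining())
                throw new IllegalStateException("invalid message size: " + size);
            long crc = buf.getInt() & 0xffffffffL;
            byte[] payload = new byte[size - 4];
            buf.get(payload);
            CRC32 c = new CRC32();
            c.update(payload);
            if (c.getValue() != crc)
                throw new IllegalStateException("crc mismatch");
            count++;
        }
        return count;
    }

    // helper to build one valid entry, for demonstration
    public static ByteBuffer entry(byte[] payload) {
        CRC32 c = new CRC32();
        c.update(payload);
        ByteBuffer b = ByteBuffer.allocate(8 + payload.length);
        b.putInt(4 + payload.length);
        b.putInt((int) c.getValue());
        b.put(payload);
        b.flip();
        return b;
    }

    public static void main(String[] args) {
        System.out.println(countValid(entry("hello".getBytes()))); // one valid message
        try {
            countValid(ByteBuffer.allocate(16)); // all-zero buffer, as in the bug
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a check like this, the follower sees a clear "invalid message" failure at 
the zero bytes rather than an opaque IndexOutOfBoundsException deep in the 
iterator.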

    

   
                
> IndexOutOfBoundsException while fetching data from leader
> ---------------------------------------------------------
>
>                 Key: KAFKA-861
>                 URL: https://issues.apache.org/jira/browse/KAFKA-861
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Sriram Subramanian
>            Assignee: Sriram Subramanian
>             Fix For: 0.8
>
>
> 2013-04-09 16:36:50,051] ERROR [ReplicaFetcherThread-0-261], Error due to  
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for topic firehoseUpdates 
> partititon 14 offset 53531364
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:136)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
>         at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
>         at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> Caused by: java.lang.IndexOutOfBoundsException
>         at java.nio.Buffer.checkIndex(Buffer.java:512)
>         at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:121)
>         at kafka.message.Message.compressionCodec(Message.scala:202)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:174)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:197)
>         at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145)
>         at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>         at scala.collection.IterableLike$class.isEmpty(IterableLike.scala:92)
>         at kafka.message.MessageSet.isEmpty(MessageSet.scala:67)
>         at 
> scala.collection.TraversableLike$class.lastOption(TraversableLike.scala:512)
>         at kafka.message.MessageSet.lastOption(MessageSet.scala:67)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
