Neha Narkhede created KAFKA-669:
-----------------------------------

             Summary: Kafka server sends producer response with an error code 
for successfully completed produce requests leading to spurious 
FailedToSendMessageException on the producer
                 Key: KAFKA-669
                 URL: https://issues.apache.org/jira/browse/KAFKA-669
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8
            Reporter: Neha Narkhede
            Priority: Blocker
             Fix For: 0.8


I set up a test cluster of 5 brokers and configured the migration tool to send 
data for ~1000 partitions to this cluster. I see many 
FailedToSendMessageExceptions on the producer. After tracing some of these 
failed requests, I found that the broker successfully completed the requests, 
yet sent a ProducerResponseStatus(-1, -1) to the producer. As a result, the 
producer marks the messages for those partitions as failed, retries, and 
eventually gives up. This looks like a bug on the broker. 
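
To make the impact concrete, here is a minimal sketch of the retry behaviour described above. It is a simplified model, not the actual DefaultEventHandler code: PartitionStatus, failed_partitions, send_with_retries and buggy_broker are hypothetical names, and it assumes that an error code of 0 means success and that any other code marks the partition as failed.

```python
from dataclasses import dataclass

NO_ERROR = 0  # assumption: 0 means success, anything else is a failure


@dataclass(frozen=True)
class PartitionStatus:
    """Hypothetical stand-in for the (error, offset) pair in a produce response."""
    error: int
    offset: int


def failed_partitions(statuses):
    """Return the topic-partitions whose status carries a non-zero error code."""
    return [tp for tp, status in statuses.items() if status.error != NO_ERROR]


def send_with_retries(send, partitions, max_tries=3):
    """Resend the outstanding partitions until none fail or tries run out.

    `send` is a hypothetical callable that takes a list of topic-partitions
    and returns {topic_partition: PartitionStatus}. The return value is the
    list of partitions still marked failed after the last attempt.
    """
    outstanding = list(partitions)
    for _ in range(max_tries):
        if not outstanding:
            break
        outstanding = failed_partitions(send(outstanding))
    return outstanding


# Model of the reported behaviour: the broker appends the data on every
# request but still answers with (-1, -1).
appended = []


def buggy_broker(partitions):
    appended.extend(partitions)
    return {tp: PartitionStatus(error=-1, offset=-1) for tp in partitions}


still_failed = send_with_retries(buggy_broker, [("CommGenericMessageSendEvent", 1)])
print(still_failed)   # the partition is reported as failed after 3 tries
print(len(appended))  # yet the modelled broker accepted the data each time
```

In this model the producer gives up after three tries even though every attempt was accepted. If the real broker behaves the same way, each retry would also write the same messages again.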

The producer's log says -

[2012-12-11 23:00:22,179] DEBUG Producer sending messages with correlation id 
60626 for topics Map([CommGenericMessageSendEvent,1] -> 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 2, attributes = 0, crc = 
2412809952, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=159 
cap=159]),0), )) to broker 3 
on 172.20.72.46:9092 (kafka.producer.async.DefaultEventHandler)
[2012-12-11 23:00:22,179] INFO Connected to 172.20.72.46:9092 for producing 
(kafka.producer.SyncProducer)
[2012-12-11 23:00:22,185] DEBUG Producer sent messages for topics 
Map([CommGenericMessageSendEvent,1] -> 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 2, attributes = 0, crc = 
2412809952, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=159 
cap=159]),0), )) to broker 3 on 
(kafka.producer.async.DefaultEventHandler)
[2012-12-11 23:00:22,185] DEBUG Produce request with correlation id 60626 
failed due to response 
ProducerResponse(0,60626,Map([CommGenericMessageSendEvent,1] -> 
ProducerResponseStatus(-1,-1))). List of failed topic partitions is 
[CommGenericMessageSendEvent,1] (kafka.producer.async.DefaultEventHandler)
[2012-12-11 23:00:22,285] INFO Fetching metadata with correlation id 60627 for 
1 topic(s) Set(CommGenericMessageSendEvent) (kafka.client.ClientUtils$)
[2012-12-11 23:00:22,312] ERROR Failed to send the following requests with 
correlation id 60627: 
ArrayBuffer(KeyedMessage(CommGenericMessageSendEvent,null,Message(magic = 2, 
attributes = 0, crc = 2412809952, key = null, payload = 
java.nio.HeapByteBuffer[pos=0 lim=159 cap=159]))) 
(kafka.producer.async.DefaultEventHandler)
[2012-12-11 23:00:22,314] ERROR Error in handling batch of 200 events 
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)


Tracing produce request with correlation id 60626 on broker 3 -


[2012-12-11 23:00:22,180] TRACE Received request with correlation id 60626 from 
client : ProducerRequest(0,60626,,0,3000,Map([CommGenericMessageSendEvent,1] -> 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 2, attributes = 0, crc = 
2412809952, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=159 
cap=159]),0), ))) (kafka.network.RequestChannel$)
[2012-12-11 23:00:22,180] TRACE Handling ProducerRequest with correlation id 
60626 from client : 
ProducerRequest(0,60626,,0,3000,Map([CommGenericMessageSendEvent,1] -> 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 2, attributes = 0, crc = 
2412809952, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=159 
cap=159]),0), ))) (kafka.request.logger)
[2012-12-11 23:00:22,184] TRACE Completed request with correlation id 60626 and 
client : ProducerRequest(0,60626,,0,3000,Map([CommGenericMessageSendEvent,1] -> 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 2, attributes = 0, crc = 
2412809952, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=159 
cap=159]),0), ))), totalTime:4, queueTime:0, localTime:4, remoteTime:0, 
sendTime:0 (kafka.network.RequestChannel$)
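
The manual tracing above can be approximated with a small script that pulls every line mentioning a given correlation id out of the producer and broker logs and merges them by timestamp. This is only a sketch: the trace function and the regular expressions are my own, it assumes each log entry sits on a single line in the "[timestamp] LEVEL message" layout shown above, and the sample lines are the entries from this report with the payload cut off (marked "...").

```python
import re

# Assumed layout: "[2012-12-11 23:00:22,180] TRACE message text"
LINE_RE = re.compile(r"^\[(?P<ts>[^\]]+)\]\s+(?P<level>[A-Z]+)\s+(?P<msg>.*)$")
CORR_RE = re.compile(r"correlation id (\d+)")


def trace(lines_by_source, correlation_id):
    """Collect (timestamp, source, level, message) for one correlation id.

    `lines_by_source` maps a label such as "producer" or "broker" to an
    iterable of log lines. Timestamps in this format sort correctly as
    plain strings; the sort is stable, so entries with equal timestamps
    keep their input order.
    """
    events = []
    for source, lines in lines_by_source.items():
        for line in lines:
            entry = LINE_RE.match(line)
            if entry is None:
                continue  # continuation line or stack-trace frame
            corr = CORR_RE.search(entry.group("msg"))
            if corr and int(corr.group(1)) == correlation_id:
                events.append(
                    (entry.group("ts"), source, entry.group("level"), entry.group("msg"))
                )
    return sorted(events, key=lambda event: event[0])


# Abbreviated copies of the log entries quoted in this report.
logs = {
    "producer": [
        "[2012-12-11 23:00:22,179] DEBUG Producer sending messages with correlation id 60626 for topics ...",
        "[2012-12-11 23:00:22,185] DEBUG Produce request with correlation id 60626 failed due to response ...",
        "[2012-12-11 23:00:22,285] INFO Fetching metadata with correlation id 60627 for 1 topic(s) ...",
    ],
    "broker": [
        "[2012-12-11 23:00:22,180] TRACE Received request with correlation id 60626 from client ...",
        "[2012-12-11 23:00:22,180] TRACE Handling ProducerRequest with correlation id 60626 from client ...",
        "[2012-12-11 23:00:22,184] TRACE Completed request with correlation id 60626 and client ...",
    ],
}

timeline = trace(logs, 60626)
for ts, source, level, msg in timeline:
    print(ts, source, level, msg)
```

The merged view places the broker's "Completed request" entry just before the producer's "failed due to response" entry, which is the mismatch this issue reports. The ordering is only meaningful if the clocks on the two hosts are reasonably in sync.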


