[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14195762#comment-14195762 ]
Honghai Chen commented on KAFKA-391:
------------------------------------

This situation happens in the following scenario: one broker is the leader for several partitions, for example 3. When we send one message set that contains messages for all 3 partitions of this broker, response.status.size is 3 and producerRequest.data.size is 1, so it hits this exception.

Any ideas for a fix? Do we need to compare response.status.size with messagesPerTopic.Count instead of producerRequest.data.size?

    private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
      if(brokerId < 0) {
        warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
        messagesPerTopic.keys.toSeq
      } else if(messagesPerTopic.size > 0) {
        val currentCorrelationId = correlationId.getAndIncrement
        val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
          config.requestTimeoutMs, messagesPerTopic)
        var failedTopicPartitions = Seq.empty[TopicAndPartition]
        try {
          val syncProducer = producerPool.getProducer(brokerId)
          debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
            .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
          val response = syncProducer.send(producerRequest)
          debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
            .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
          if(response != null) {
            if (response.status.size != producerRequest.data.size)
              throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))

> Producer request and response classes should use maps
> -----------------------------------------------------
>
>                 Key: KAFKA-391
>                 URL: https://issues.apache.org/jira/browse/KAFKA-391
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>            Assignee: Joel Koshy
>            Priority: Blocker
>              Labels: optimization
>             Fix For: 0.8.0
>
>         Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch,
> KAFKA-391-v3.patch, KAFKA-391-v4.patch
>
>
> Producer response contains two arrays of error codes and offsets - the
> ordering in these arrays corresponds to the flattened ordering of the request
> arrays.
> It would be better to switch to maps in the request and response as this
> would make the code clearer and more efficient (right now, linear scans are
> used in handling producer acks).
> We can probably do the same in the fetch request/response.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
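
For illustration only, here is a minimal Scala sketch of how the size mismatch described in the comment could arise. It assumes (the comment does not say this) that the request data ends up grouped by topic while the response carries one status per topic-partition. The `TopicAndPartition` case class, the topic name "events", and the object name are hypothetical stand-ins, not the real Kafka classes.

```scala
// Hypothetical stand-in for Kafka's TopicAndPartition; not the real class.
case class TopicAndPartition(topic: String, partition: Int)

object SizeMismatchSketch {
  // One entry per topic-partition, keyed like messagesPerTopic in the quoted code.
  val messagesPerTopic: Map[TopicAndPartition, String] = Map(
    TopicAndPartition("events", 0) -> "m0",
    TopicAndPartition("events", 1) -> "m1",
    TopicAndPartition("events", 2) -> "m2"
  )

  // Assumption: the request groups its data by topic name, so three
  // partitions of a single topic collapse into one entry.
  val dataGroupedByTopic: Map[String, Map[TopicAndPartition, String]] =
    messagesPerTopic.groupBy { case (tp, _) => tp.topic }

  // Assumption: the response holds one status (error code) per topic-partition.
  val responseStatus: Map[TopicAndPartition, Short] =
    messagesPerTopic.map { case (tp, _) => tp -> 0.toShort }
}
```

Under these assumptions, comparing the response size against the per-partition map gives equal counts, while comparing it against the topic-grouped data does not. Whether that matches the actual cause of the exception would need to be confirmed against the client in use.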