Hello, I am looking at 0.8.1.1, specifically the kafka.producer.async.DefaultEventHandler file. Below is the dispatchSerializedData function. It looks like we catch exceptions outside the loop over brokers and merely log an error message. We then return failedProduceRequests.
If one broker has a problem and the send throws, the messages destined for the brokers that come after the problematic broker in the loop will NOT be included in failedProduceRequests and will be dropped silently. Is this correct? Should we change the code to catch exceptions when sending to each broker individually? Thanks.

    private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
      val partitionedDataOpt = partitionAndCollate(messages)
      partitionedDataOpt match {
        case Some(partitionedData) =>
          val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
          try {
            for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
              if (logger.isTraceEnabled)
                messagesPerBrokerMap.foreach(partitionAndEvent =>
                  trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
              val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
              failedTopicPartitions.foreach(topicPartition => {
                messagesPerBrokerMap.get(topicPartition) match {
                  case Some(data) => failedProduceRequests.appendAll(data)
                  case None => // nothing
                }
              })
            }
          } catch {
            case t: Throwable => error("Failed to send messages", t)
          }
          failedProduceRequests
        case None => // all produce requests failed
          messages
      }
    }
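
To make the proposal concrete, here is a rough, self-contained sketch of what catching per broker could look like. It is not the Kafka code: the types are hypothetical stand-ins (plain strings for messages and topic-partitions, a Seq of brokers instead of the map returned by partitionAndCollate), `send` is passed in as a function so a failure can be simulated, and catching NonFatal rather than Throwable is my own choice here.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object PerBrokerDispatchSketch {
  // Hypothetical stand-in for the partitioned data:
  // broker id -> (topic-partition label -> messages for that partition).
  type PerBroker = Map[String, Seq[String]]

  // Returns every message that was not sent successfully.
  // `send` returns the topic-partitions that failed, and may also throw.
  def dispatch(data: Seq[(Int, PerBroker)],
               send: (Int, PerBroker) => Seq[String]): Seq[String] = {
    val failed = new ArrayBuffer[String]
    for ((brokerId, perBroker) <- data) {
      try {
        val failedPartitions = send(brokerId, perBroker)
        failedPartitions.foreach { tp =>
          perBroker.get(tp).foreach(msgs => failed ++= msgs)
        }
      } catch {
        case NonFatal(_) =>
          // The try is inside the loop: count everything for this broker
          // as failed, then carry on with the remaining brokers.
          failed ++= perBroker.values.flatten
      }
    }
    failed.toSeq
  }

  // Sample input: three brokers, one message batch each.
  val sample: Seq[(Int, PerBroker)] = Seq(
    1 -> Map("t-0" -> Seq("a")),
    2 -> Map("t-1" -> Seq("b", "c")),
    3 -> Map("t-2" -> Seq("d"))
  )

  // Simulated send: broker 2 throws, broker 3 reports a failed partition.
  def flakySend(brokerId: Int, batch: PerBroker): Seq[String] =
    if (brokerId == 2) throw new RuntimeException("broker 2 unreachable")
    else if (brokerId == 3) Seq("t-2")
    else Seq.empty

  def main(args: Array[String]): Unit =
    println(dispatch(sample, flakySend).mkString(", ")) // prints: b, c, d
}
```

With the try inside the loop, the exception from broker 2 no longer hides broker 3: the messages for broker 2 and the failed partition on broker 3 both end up in the returned list, so the caller could retry them.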