I would suggest using the new Java producer client in 0.8.2-beta: http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html It handles the case you brought up (among lots of other goodies).

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Thu, Dec 18, 2014 at 4:27 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
>
> Hello,
>
> I am looking at 0.8.1.1, the kafka.producer.async.DefaultEventHandler
> file. Below is the dispatchSerializedData function. Looks like we are
> catching exception outside the loop and purely logs an error message.
> We then return failedProduceRequests.
>
> In case one broker is having problem, messages that will be sent to
> brokers after the problematic broker will NOT be included in the
> failedTopicAndPartitions and will be ignored quietly. Is this correct?
> Shall we change the code to catch exception for sending message to
> each broker?
>
> Thanks
>
> private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
>   val partitionedDataOpt = partitionAndCollate(messages)
>   partitionedDataOpt match {
>     case Some(partitionedData) =>
>       val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
>       try {
> *       for ((brokerid, messagesPerBrokerMap) <- partitionedData) { *
>           if (logger.isTraceEnabled)
>             messagesPerBrokerMap.foreach(partitionAndEvent =>
>               trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
>           val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
>
>           val failedTopicPartitions = send(brokerid, messageSetPerBroker)
>           failedTopicPartitions.foreach(topicPartition => {
>             messagesPerBrokerMap.get(topicPartition) match {
>               case Some(data) => failedProduceRequests.appendAll(data)
>               case None => // nothing
>             }
>           })
>         }
> *     } catch {
>         case t: Throwable => error("Failed to send messages", t)
>       } *
>       failedProduceRequests
>     case None => // all produce requests failed
>       messages
>   }
> }
>
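
To make the concern in the quoted message concrete, here is a minimal, self-contained Java sketch. It is not Kafka code: DispatchSketch, Result, send, sample and the three-broker data are all hypothetical stand-ins, and it assumes that send can throw for one broker, as the question supposes. It only compares a single try around the whole loop with a try per broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the dispatch loop; none of this is Kafka API.
class DispatchSketch {

    // Tracks what was delivered and what was reported back as failed.
    static final class Result {
        final List<String> delivered = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
    }

    // Hypothetical send: throws for the broker that is down, otherwise delivers.
    static void send(int brokerId, List<String> messages, int downBroker, Result r) {
        if (brokerId == downBroker) {
            throw new IllegalStateException("broker " + brokerId + " unreachable");
        }
        r.delivered.addAll(messages);
    }

    // Same shape as the quoted code: one try wraps the entire loop.
    static Result dispatchOuterTry(Map<Integer, List<String>> perBroker, int downBroker) {
        Result r = new Result();
        try {
            for (Map.Entry<Integer, List<String>> e : perBroker.entrySet()) {
                send(e.getKey(), e.getValue(), downBroker, r);
            }
        } catch (RuntimeException ex) {
            // Only logged: the loop has already ended, so later brokers are never tried.
            System.err.println("Failed to send messages: " + ex.getMessage());
        }
        return r;
    }

    // Proposed shape: catch per broker and report that broker's messages as failed.
    static Result dispatchPerBrokerTry(Map<Integer, List<String>> perBroker, int downBroker) {
        Result r = new Result();
        for (Map.Entry<Integer, List<String>> e : perBroker.entrySet()) {
            try {
                send(e.getKey(), e.getValue(), downBroker, r);
            } catch (RuntimeException ex) {
                System.err.println("Failed to send to broker " + e.getKey() + ": " + ex.getMessage());
                r.failed.addAll(e.getValue());
            }
        }
        return r;
    }

    // Sample data: brokers 1, 2, 3 in insertion order, one message each.
    static Map<Integer, List<String>> sample() {
        Map<Integer, List<String>> m = new LinkedHashMap<>();
        m.put(1, Arrays.asList("a"));
        m.put(2, Arrays.asList("b"));
        m.put(3, Arrays.asList("c"));
        return m;
    }

    public static void main(String[] args) {
        Result outer = dispatchOuterTry(sample(), 2);
        System.out.println("outer try:      delivered=" + outer.delivered + " failed=" + outer.failed);
        Result inner = dispatchPerBrokerTry(sample(), 2);
        System.out.println("per-broker try: delivered=" + inner.delivered + " failed=" + inner.failed);
    }
}
```

With broker 2 down, the outer-try variant ends with delivered=[a] and failed=[], so "b" and "c" are neither delivered nor reported. The per-broker variant ends with delivered=[a, c] and failed=[b], which gives the caller something to retry.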