I would suggest using the new Java client producer in 0.8.2-beta:
http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
It handles the case you brought up (among lots of other goodies).
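
To make the failure mode in the question below concrete, here is a minimal
sketch of per-broker error isolation in plain Java. It is not Kafka code:
PerBrokerDispatch, BrokerSender and dispatch are hypothetical names made up
for illustration, and messages are plain strings. The only point is that the
try/catch sits inside the loop, so one failing broker does not stop the
remaining brokers from being attempted, and its batch is reported as failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerBrokerDispatch {

    /** Hypothetical stand-in for a per-broker send; returns the messages that failed. */
    public interface BrokerSender {
        List<String> send(int brokerId, List<String> messages) throws Exception;
    }

    /** Sends each broker's batch independently and collects every failed message. */
    public static List<String> dispatch(Map<Integer, List<String>> perBroker,
                                        BrokerSender sender) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : perBroker.entrySet()) {
            try {
                // Partial failures reported by the sender itself.
                failed.addAll(sender.send(entry.getKey(), entry.getValue()));
            } catch (Exception e) {
                // Assumption: an exception means the whole batch for this broker
                // failed. Record it and keep going with the next broker.
                failed.addAll(entry.getValue());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> perBroker = new LinkedHashMap<>();
        perBroker.put(1, Arrays.asList("a"));
        perBroker.put(2, Arrays.asList("b", "c"));
        perBroker.put(3, Arrays.asList("d"));

        // Broker 2 is "down"; brokers 1 and 3 succeed with no failed messages.
        List<String> failed = dispatch(perBroker, (brokerId, messages) -> {
            if (brokerId == 2) {
                throw new Exception("broker " + brokerId + " unavailable");
            }
            return new ArrayList<String>();
        });

        System.out.println(failed); // prints [b, c]
    }
}
```

With the catch outside the loop instead, the exception from broker 2 would
end the loop, so broker 3 would never be attempted and "d" would not show up
in the returned list of failures.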

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Thu, Dec 18, 2014 at 4:27 PM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:
>
> Hello,
>
> I am looking at the kafka.producer.async.DefaultEventHandler file in
> 0.8.1.1. Below is the dispatchSerializedData function. It looks like we
> catch the exception outside the loop and only log an error message.
> We then return failedProduceRequests.
>
> If one broker has a problem, messages that would be sent to brokers
> after the problematic broker will NOT be included in the
> failedTopicAndPartitions and will be silently ignored. Is this correct?
> Should we change the code to catch the exception for each broker's
> send instead?
>
> Thanks
>
> private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
>   val partitionedDataOpt = partitionAndCollate(messages)
>   partitionedDataOpt match {
>     case Some(partitionedData) =>
>       val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
>       try {
>         for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
>           if (logger.isTraceEnabled)
>             messagesPerBrokerMap.foreach(partitionAndEvent =>
>               trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
>                 .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
>           val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
>
>           val failedTopicPartitions = send(brokerid, messageSetPerBroker)
>           failedTopicPartitions.foreach(topicPartition => {
>             messagesPerBrokerMap.get(topicPartition) match {
>               case Some(data) => failedProduceRequests.appendAll(data)
>               case None => // nothing
>             }
>           })
>         }
>       } catch {
>         case t: Throwable => error("Failed to send messages", t)
>       }
>       failedProduceRequests
>     case None => // all produce requests failed
>       messages
>   }
> }
>
