handle() will throw the exception to its caller. In sync mode the client receives the exception, but in async mode a separate thread does the actual sending, so there is no easy way to inform the caller short of blocking the queue. In practice you can configure the number of retries (message.send.max.retries). On each retry sendPartitionPerTopicCache is cleared, so the messages may go to another available broker if you do not specify a partition key. (If you do specify one, the partition key and the partitioner class may route the message to the same partition again even after it has failed.)
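The retry behaviour described above can be sketched roughly as follows. This is a simplified model, not the actual DefaultEventHandler code: RetryingHandler, FailedToSendException, partitionCache, cacheClears and the dispatch function are hypothetical names made up for illustration, and the backoff sleep and metadata refresh are left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's FailedToSendMessageException.
class FailedToSendException(msg: String) extends RuntimeException(msg)

// Simplified model of the retry loop (assumption: dispatch returns the
// messages that could not be sent, like dispatchSerializedData does).
class RetryingHandler(maxRetries: Int, dispatch: Seq[String] => Seq[String]) {
  // Stand-in for sendPartitionPerTopicCache: topic -> cached partition id.
  val partitionCache = mutable.Map[String, Int]()
  // Counts how often the cache was cleared, for illustration only.
  var cacheClears = 0

  def handle(messages: Seq[String]): Unit = {
    var outstanding = messages
    var remaining = maxRetries + 1 // first attempt plus maxRetries retries
    while (remaining > 0 && outstanding.nonEmpty) {
      outstanding = dispatch(outstanding)
      if (outstanding.nonEmpty) {
        // Forget the cached partition so the next attempt may pick
        // a different (available) broker for unkeyed messages.
        partitionCache.clear()
        cacheClears += 1
        remaining -= 1
      }
    }
    // Still failing after all attempts: surface the error to the caller.
    if (outstanding.nonEmpty)
      throw new FailedToSendException(
        s"Failed to send messages after $maxRetries tries.")
  }
}
```

In sync mode the caller of send() would see this exception directly. In async mode it is raised on the sender thread, which is why the caller does not see it.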
How handle() throws the exception:

    if (outstandingProduceRequests.size > 0) {
      producerStats.failedSendRate.mark()
      val correlationIdEnd = correlationId.get()
      error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
          correlationIdStart, correlationIdEnd-1))
      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
    }

On Fri, Dec 19, 2014 at 5:22 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Hello,
>
> I am looking at 0.8.1.1, the kafka.producer.async.DefaultEventHandler
> file. Below is the dispatchSerializedData function. Looks like we are
> catching exception outside the loop and purely logs an error message.
> We then return failedProduceRequests.
>
> In case one broker is having problem, messages that will be sent to
> brokers after the problematic broker will NOT be included in the
> failedTopicAndPartitions and will be ignored quietly. Is this correct?
> Shall we change the code to catch exception for sending message to
> each broker?
>
> Thanks
>
> private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
>   val partitionedDataOpt = partitionAndCollate(messages)
>   partitionedDataOpt match {
>     case Some(partitionedData) =>
>       val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
>       try {
>         for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
>           if (logger.isTraceEnabled)
>             messagesPerBrokerMap.foreach(partitionAndEvent =>
>               trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
>           val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
>
>           val failedTopicPartitions = send(brokerid, messageSetPerBroker)
>           failedTopicPartitions.foreach(topicPartition => {
>             messagesPerBrokerMap.get(topicPartition) match {
>               case Some(data) => failedProduceRequests.appendAll(data)
>               case None => // nothing
>             }
>           })
>         }
>       } catch {
>         case t: Throwable => error("Failed to send messages", t)
>       }
>       failedProduceRequests
>     case None => // all produce requests failed
>       messages
>   }
> }
>

--
have a good day!
chenshang'an