Thanks Guozhang, Another case I am seeing is that producer.send() seems to block when the brokers are unavailable. This is not the behavior I want (I would rather have it throw an exception immediately so I can queue the messages for replay). I will try to confirm this tomorrow.
-Steve On Sep 9, 2014, at 8:55 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Hi Steve: > > 1. the new producer will be included in the 0.8.2 release for production > usage. > 2. The error you reported has been changed as a WARN in the latest trunk. > We realize this should not really be an error case. In general, any errors > will be propagated to the producer in the following two ways: > > a. producer.send() can throw ApiException and runtime exceptions, > indicating the sending is not triggered in the underlying sending thread. > b. producer.send().get() can throw KafkaException such as leader not find, > indicating the sending response from the broker says it has failed > receiving the message. > > Guozhang > > > On Mon, Sep 8, 2014 at 8:16 AM, Tarzia <star...@signal.co> wrote: > >> Hello, >> >> I am trying to use the new org.apache.kafka.clients.producer.KafkaProducer >> to take advantage of error reporting that is lacking in the current >> "stable" Scala client (import kafka.javaapi.producer.Producer). Two >> questions: >> >> * I know that 0.8.2 is not yet released but is the new Producer >> feature-complete and ready for testing? >> >> * If so, how should I check for errors in KafkaProducer#send()? In my >> tests I brought down the Kafka sever and hoped to detect errors in the >> producer so that I could respond by re-queueing failed requests. However, >> I was not getting any exceptions on KafkaProducer#send(), instead I got an >> exception inside the producer Thread: >> >> WARN org.apache.kafka.common.network.Selector - Error in I/O with >> localhost.localdomain/127.0.0.1 >> java.net.ConnectException: Connection refused >> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) >> at >> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) >> at org.apache.kafka.common.network.Selector.poll(Selector.java:232) >> at >> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178) >> at >> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) >> at >> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) >> at java.lang.Thread.run(Thread.java:744) >> >> Should this be bubbling up the the send() method, or should there be a >> getError() method in the RecordMetadata that is asynchronously returned? >> Basically, I don't understand the error-reporting API. >> >> Thanks, >> Steve Tarzia > > > > > -- > -- Guozhang