Kafka never had a callback for the async producer yet. But this is proposed for Kafka 0.9. You can find the proposal here - https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI
Thanks, Neha On Oct 7, 2013 4:52 AM, "Bruno D. Rodrigues" <bruno.rodrig...@litux.org> wrote: > Apologise in advance if it's a dummy or common question but as usual I > couldn't yet find the answer anywhere to it. > > How can we setup some kind of handler to catch async errors? > > Let's say something as simple as this: > > final Properties props = new Properties(); > props.put("metadata.broker.list", "something:12345"); > props.put("producer.type", "async"); > final ProducerConfig config = new ProducerConfig(props); > final Producer<String, String> producer = new Producer<String, > String>(config); > while (true) { > final KeyedMessage<String, String> data = new KeyedMessage<>("test", "foo", > line); > final List<KeyedMessage<String, String>> l = > Arrays.asList(data); > producer.send(JavaConversions.asBuffer(l).toList()); > } > > Using sync, the producer.send does throw an exception if something goes > wrong. > > Using async, I can only see on the logs something like this: > > java.nio.channels.UnresolvedAddressException > at sun.nio.ch.Net.checkAddress(Net.java:127) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > Albeit in this case one can assume that the host is correct and it's down > so the code should keep running until the server gets back on, my issue was > related to the lack of log4j properties ;) and seeing this code producing > messages without anything reaching the server. > > For example by setting the right server, and then enabling snappy > compression, I was impressed that the msg/sec did go up to amazing values, > until I noticed that no message was going out at all. Now that I had the > intelligence of putting the log4j.properties file, now I can see the real > reason: > > > java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream > at > kafka.message.ByteBufferMessageSet$.kafka$message$ByteBufferMessageSet$$crea > te(ByteBufferMessageSet.scala:41) > at > kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:102) > at > kafka.producer.async.DefaultEventHandler$$anonfun$7.apply(DefaultEventHandle > r.scala:301) > > > So coming back to the core question, is there a way to programatically > know what is going on if such errors occur, via some callback? What I > understood so far was that there were some callbacks on 0.7 that are gone > on 0.8, so this change is causing me confusion reading the documentation > and searching on google. > > thanks in advance >