Kafka does not yet have a callback for the async producer, but one is proposed
for Kafka 0.9. You can find the proposal here:
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI
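
To give a rough idea of what a completion callback lets the caller do, here is
a minimal sketch in plain Java. It is not the proposed Kafka API:
AsyncCallbackSketch, SendCallback, AsyncSender, transmit and sendAll are all
names made up for illustration, and transmit only simulates a send that fails
for some messages. The point is just that each background send reports its
outcome back to the caller instead of only writing to the log.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration only: every name here is hypothetical, none is a Kafka class.
public class AsyncCallbackSketch {

    /** Called once per message; error is null when the send succeeded. */
    public interface SendCallback {
        void onCompletion(String message, Exception error);
    }

    /** Stand-in for the network send: fails for messages starting with "bad". */
    static void transmit(String message) throws IOException {
        if (message.startsWith("bad")) {
            throw new IOException("simulated failure sending " + message);
        }
    }

    /** Sends on a background thread and reports each outcome to the callback. */
    public static final class AsyncSender {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        public void send(String message, SendCallback callback) {
            worker.execute(() -> {
                Exception error = null;
                try {
                    transmit(message);
                } catch (Exception e) {
                    error = e; // hand the failure to the caller instead of only logging it
                }
                callback.onCompletion(message, error);
            });
        }

        /** Stops accepting sends and waits for the queued ones to finish. */
        public void close() {
            worker.shutdown();
            try {
                if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("pending sends did not finish in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sends", e);
            }
        }
    }

    /** Sends every message asynchronously and returns a description of each failure. */
    public static List<String> sendAll(List<String> messages) {
        List<String> failures = Collections.synchronizedList(new ArrayList<String>());
        AsyncSender sender = new AsyncSender();
        for (String m : messages) {
            sender.send(m, (msg, error) -> {
                if (error != null) {
                    failures.add(msg + ": " + error.getMessage());
                }
            });
        }
        sender.close();
        return new ArrayList<>(failures);
    }

    public static void main(String[] args) {
        System.out.println(sendAll(Arrays.asList("ok-1", "bad-2", "ok-3")));
    }
}
```

With something along these lines, the application can count, retry or alert on
failed sends itself rather than relying on log4j output.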

Thanks,
Neha
On Oct 7, 2013 4:52 AM, "Bruno D. Rodrigues" <bruno.rodrig...@litux.org>
wrote:

> Apologies in advance if this is a dumb or common question, but as usual I
> couldn't find the answer to it anywhere.
>
> How can we set up some kind of handler to catch async errors?
>
> Let's say something as simple as this:
>
>         final Properties props = new Properties();
>         props.put("metadata.broker.list", "something:12345");
>         props.put("producer.type", "async");
>         final ProducerConfig config = new ProducerConfig(props);
>         final Producer<String, String> producer =
>                 new Producer<String, String>(config);
>         while (true) {
>             final KeyedMessage<String, String> data =
>                     new KeyedMessage<>("test", "foo", line);
>             final List<KeyedMessage<String, String>> l =
>                     Arrays.asList(data);
>             producer.send(JavaConversions.asBuffer(l).toList());
>         }
>
> With sync, producer.send throws an exception if something goes wrong.
>
> With async, all I see is something like this in the logs:
>
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:127)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>
> In this case one can assume that the host is correct but down, so the code
> should keep running until the server comes back. My actual problem, though,
> was the lack of log4j properties ;) so I could only watch this code produce
> messages without anything reaching the server.
>
> For example, after setting the right server and enabling snappy
> compression, I was impressed that the msg/sec went up to amazing values,
> until I noticed that no messages were going out at all. Now that I have
> added a log4j.properties file, I can see the real reason:
>
>
> java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream
> at kafka.message.ByteBufferMessageSet$.kafka$message$ByteBufferMessageSet$$create(ByteBufferMessageSet.scala:41)
> at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:102)
> at kafka.producer.async.DefaultEventHandler$$anonfun$7.apply(DefaultEventHandler.scala:301)
>
>
> So, coming back to the core question: is there a way to know
> programmatically what is going on when such errors occur, via some callback?
> As far as I understand, there were some callbacks in 0.7 that are gone in
> 0.8, and this change is confusing me when I read the documentation and
> search on Google.
>
> thanks in advance
>
