Apologies in advance if this is a dumb or common question, but as usual I couldn't find the answer to it anywhere.

How can we set up some kind of handler to catch async producer errors?

Let's say something as simple as this:

        import java.util.Arrays;
        import java.util.List;
        import java.util.Properties;
        import kafka.producer.KeyedMessage;
        import kafka.producer.Producer;
        import kafka.producer.ProducerConfig;
        import scala.collection.JavaConversions;

        final Properties props = new Properties();
        props.put("metadata.broker.list", "something:12345");
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // String payloads
        props.put("producer.type", "async");
        final ProducerConfig config = new ProducerConfig(props);
        final Producer<String, String> producer = new Producer<String, String>(config);
        while (true) {
            final String line = "some payload"; // stands in for whatever I'm really sending
            final KeyedMessage<String, String> data = new KeyedMessage<>("test", "foo", line);
            final List<KeyedMessage<String, String>> l = Arrays.asList(data);
            // using the Scala producer directly, hence the Scala collection conversion
            producer.send(JavaConversions.asBuffer(l).toList());
        }

Using sync, producer.send does throw an exception if something goes wrong.
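
For example, with producer.type set to "sync" (a rough sketch reusing props and the message list l from the snippet above; FailedToSendMessageException is simply what I see thrown once the retries are exhausted):

        props.put("producer.type", "sync");
        final Producer<String, String> syncProducer =
                new Producer<String, String>(new ProducerConfig(props));
        try {
            syncProducer.send(JavaConversions.asBuffer(l).toList());
        } catch (final kafka.common.FailedToSendMessageException e) {
            // unreachable broker, serialization failures, etc. surface here
            e.printStackTrace();
        }

With async that try/catch never fires, since the actual send happens on the background sender thread.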

Using async, I can only see something like this in the logs:

java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:127)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)

Granted, in this case one can assume the host is correct and simply down, so the code should keep running until the server comes back. My real issue was that I was missing the log4j properties ;) and so I watched this code "produce" messages without anything ever reaching the server.

For example, after pointing at the right server and enabling snappy compression, I was impressed that the msg/sec went up to amazing values, until I noticed that no message was going out at all. Now that I've had the good sense to put a log4j.properties file in place, I can see the real reason:


java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream
        at kafka.message.ByteBufferMessageSet$.kafka$message$ByteBufferMessageSet$$create(ByteBufferMessageSet.scala:41)
        at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:102)
        at kafka.producer.async.DefaultEventHandler$$anonfun$7.apply(DefaultEventHandler.scala:301)
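
For reference, this is all I changed to turn compression on; if I'm reading it right, the actual fix is just getting the snappy-java jar (org.xerial.snappy) onto the classpath:

        // "compression.codec" is, as far as I can tell, the knob for the 0.8 producer
        props.put("compression.codec", "snappy");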


So, coming back to the core question: is there a way to programmatically find out that such errors occurred, via some callback? What I understood so far is that there were some callbacks in 0.7 that are gone in 0.8, and that change is causing me confusion while reading the documentation and searching on Google.
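
Ideally I'm after something along these lines; the ErrorCallback interface below is completely made up, just to illustrate the kind of hook I mean:

        // purely hypothetical API, nothing like this seems to exist in 0.8
        producer.send(JavaConversions.asBuffer(l).toList(), new ErrorCallback() {
            @Override
            public void onError(final KeyedMessage<String, String> message, final Throwable t) {
                // re-queue, log, alert, ...
            }
        });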

thanks in advance
