Apologies in advance if this is a dumb or common question, but as usual I haven't been able to find the answer anywhere.

How can we set up some kind of handler to catch async errors? Take something as simple as this:

    final Properties props = new Properties();
    props.put("metadata.broker.list", "something:12345");
    props.put("producer.type", "async");
    final ProducerConfig config = new ProducerConfig(props);
    final Producer<String, String> producer = new Producer<String, String>(config);
    while (true) {
        final KeyedMessage<String, String> data = new KeyedMessage<>("test", "foo", line);
        final List<KeyedMessage<String, String>> l = Arrays.asList(data);
        producer.send(JavaConversions.asBuffer(l).toList());
    }

In sync mode, producer.send does throw an exception if something goes wrong. In async mode, all I can see is something like this in the logs:

    java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:127)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)

Granted, in this case one can assume that the host is correct and merely down, so the code should keep running until the server comes back. My actual problem, though, was a missing log4j.properties file ;) which left me watching this code produce messages without anything reaching the server. For example, after setting the right server and then enabling snappy compression, I was impressed that the msg/sec rate went up to amazing values, until I noticed that no messages were going out at all.

Now that I have had the sense to add a log4j.properties file, I can see the real reason:

    java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream
        at kafka.message.ByteBufferMessageSet$.kafka$message$ByteBufferMessageSet$$create(ByteBufferMessageSet.scala:41)
        at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:102)
        at kafka.producer.async.DefaultEventHandler$$anonfun$7.apply(DefaultEventHandler.scala:301)

So, coming back to the core question: is there a way to find out programmatically, via some callback, what is going on when such errors occur? My understanding so far is that there were some callbacks in 0.7 that are gone in 0.8, and this change is confusing me when I read the documentation and search on Google.

Thanks in advance.
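
To make the question concrete, here is a minimal sketch of the kind of hook meant above. It is not a Kafka API: CallbackSender, BlockingSend and ErrorCallback are hypothetical names invented for illustration, and the sketch assumes Java 8+ and a send call that throws on failure (as producer.send does in sync mode). It simply runs the blocking send on a background thread and hands any failure to a callback.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper (not part of Kafka): async sending with an error callback. */
final class CallbackSender<M> implements AutoCloseable {

    /** Stand-in for any blocking send that throws on failure. */
    interface BlockingSend<M> {
        void send(M message) throws Exception;
    }

    /** Hypothetical error hook, invoked on the background thread. */
    interface ErrorCallback<M> {
        void onError(M message, Throwable cause);
    }

    // A single worker thread keeps messages in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final BlockingSend<M> sender;
    private final ErrorCallback<M> onError;

    CallbackSender(BlockingSend<M> sender, ErrorCallback<M> onError) {
        this.sender = sender;
        this.onError = onError;
    }

    /** Queues the message and returns immediately; failures go to the callback. */
    void sendAsync(final M message) {
        worker.execute(() -> {
            try {
                sender.send(message);
            } catch (Throwable t) {
                // Throwable, not Exception: NoClassDefFoundError is an Error.
                onError.onError(message, t);
            }
        });
    }

    /** Stops accepting messages and waits up to 10 seconds for queued sends. */
    @Override
    public void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With Kafka, the BlockingSend would presumably wrap producer.send(...) on a producer configured with producer.type=sync, which gives up the built-in async batching in exchange for visibility into failures. Whether that trade-off is acceptable depends on the throughput needed.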