I posted a variation of this question before regarding the 'old' publisher. I have since figured out how to use the 'new' redesigned publisher, and I have a similar question. Basically, I want to:

#1) Detect when the broker is down when using the asynchronous publisher. If the broker is down, I want to generate an alert and stop publication.

#2) Most of all, avoid message gaps when the broker recovers. In other words, I do not want to lose messages while the broker is down and then silently resume publication when the broker is up again.

Apparently, this is not possible with the old publisher when asynchronous mode is used.

My test program is below. I do not see a way to detect when the broker is down. I see errors in the log, but my publisher keeps publishing without reporting any errors via the API. Once the broker connection is up again, it reports exceptions in the Callback for the messages that it failed to publish. However, I suspect it would also publish new messages at the point when the exception is reported, creating a 'gap'.

Is there a way to address these concerns? I would hate to use the synchronous publisher: my publishing rate drops dramatically, from 300,000 m/s to 7,000 m/s.

package com.kcg.kafka.test;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {

    public static void main(String[] args) {

        System.setProperty("log4j.debug", "true");

        long events = Long.parseLong(args[0]);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cno-d-igoberman2:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);

        long start = System.currentTimeMillis();

        long nEvents = 0;

        for (; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            final String key = Long.toString(nEvents);
            // Pad the message with 300 zero bytes to get a realistic payload size.
            String msg = runtime + ": " + key + new String(new byte[300]);

            ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("test", key, msg);

            try {
                System.out.println("Sending " + key);

                // 'send' will not throw an exception when broker is down.
                producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                            // Stop publisher. But is there guarantee that no more messages are published after this one?
                        } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset() + " " + key);
                        }
                    }
                });//.get(20, TimeUnit.SECONDS);
            } catch (Throwable e) {
                e.printStackTrace();
            }

            if (nEvents % 10000 == 0) {
                System.out.println("" + key);
            }

            // Throttle to one message per second so the broker can be stopped mid-run.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }

        long duration = (System.currentTimeMillis() - start);
        System.out.println("Published " + nEvents + " messages in " + duration + "ms. " + (int)((double)nEvents/((double)duration / 1000.0)) + " m/s.");

        producer.close();
    }
}

log4j: Trying to find [log4j.xml] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@6da21389 class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Using URL [file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties
log4j: Parsing for [root] with value=[INFO, stdout].
log4j: Level token is [INFO].
log4j: Category root set to INFO
log4j: Parsing appender named "stdout".
log4j: Parsing layout options for "stdout".
log4j: Setting property [conversionPattern] to [%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n].
log4j: End of parsing for "stdout".
log4j: Setting property [target] to [System.out].
log4j: Parsed "stdout" options.
log4j: Finished configuring.
2015-11-10 10:23:16 INFO  ProducerConfig:113 - ProducerConfig values:
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        block.on.buffer.full = true
        retry.backoff.ms = 100
        buffer.memory = 33554432
        batch.size = 16384
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        receive.buffer.bytes = 32768
        timeout.ms = 30000
        max.in.flight.requests.per.connection = 5
        bootstrap.servers = [cno-d-igoberman2:9092]
        metric.reporters = []
        client.id =
        compression.type = none
        retries = 0
        max.request.size = 1048576
        send.buffer.bytes = 131072
        acks = 1
        reconnect.backoff.ms = 10
        linger.ms = 0
        metrics.num.samples = 2
        metadata.fetch.timeout.ms = 60000

Sending 0
0
The offset of the record we just sent is: 12 0
Sending 1
The offset of the record we just sent is: 13 1
Sending 2
The offset of the record we just sent is: 14 2
Sending 3
The offset of the record we just sent is: 15 3
Sending 4
The offset of the record we just sent is: 16 4
Sending 5
The offset of the record we just sent is: 17 5
Sending 6
The offset of the record we just sent is: 18 6
Sending 7
The offset of the record we just sent is: 19 7
Sending 8
The offset of the record we just sent is: 20 8
Sending 9
The offset of the record we just sent is: 21 9
2015-11-10 10:23:25 WARN  Selector:276 - Error in I/O with cno-d-igoberman2/10.83.55.13
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
        at java.lang.Thread.run(Thread.java:745)
....
Sending 10
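
To make the intent of the "Stop publisher" comment in the callback concrete, here is a minimal sketch of the stop-on-first-error logic I have in mind. AsyncSender is a hypothetical stand-in that I made up for illustration; it is not the Kafka API, and the fake sender in main completes its callback synchronously. With the real asynchronous publisher the callback arrives later, so records handed over between the failure and the callback may already be queued, which is exactly the gap I am asking about. This sketch does not answer that question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

public class StopOnErrorSketch {

    /** Hypothetical stand-in for an asynchronous send with a completion callback (not the Kafka API). */
    public interface AsyncSender {
        void send(String key, BiConsumer<String, Exception> onCompletion);
    }

    /**
     * Hands keys 0..events-1 to the sender and stops as soon as a callback
     * has reported a failure. Returns the keys that were handed to the sender.
     */
    public static List<String> publish(AsyncSender sender, long events) {
        // Set by the callback, read by the publishing loop.
        final AtomicBoolean failed = new AtomicBoolean(false);
        List<String> attempted = new ArrayList<>();

        for (long n = 0; n < events && !failed.get(); n++) {
            String key = Long.toString(n);
            attempted.add(key);
            sender.send(key, (k, e) -> {
                if (e != null) {
                    // This is where an alert would be generated.
                    failed.set(true);
                }
            });
        }
        return attempted;
    }

    public static void main(String[] args) {
        // Fake sender: completes synchronously and fails for every key >= 3.
        AsyncSender fake = (key, cb) ->
                cb.accept(key, Long.parseLong(key) >= 3 ? new RuntimeException("broker down") : null);

        System.out.println("Handed to sender: " + publish(fake, 10));
    }
}
```

With a sender that completes synchronously, the loop stops right after the first failed key. Whether the real publisher gives me an equivalent guarantee is what I would like to find out.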