I have published variation of this question before regarding 'old' publisher. I 
figured out how to use 'new' redesigned publisher and I have similar question. 
Basically, I want

#1) Detect when broker is down when using asynchronous publisher. If broker is 
down, I want to generate alert and stop publication.

#2) Most of all, I want to avoid message gaps when broker recovers - in other 
words (I do not want to loose messages when broker is down and silently resume 
publication when broker is up again).

Apparently, this is not possible with old publisher when asynchronous mode mode 
is used.

My test program is below. I do not see the way to detect when broker is down. I 
see errors published in the log, but my publisher keeps publishing without 
reporting any errors via the API. Once broker connection is up again, it will 
report exceptions for the messages that it failed to publish in the Callback. 
However, I suspect it would also publish new messages at the point when 
exception is reported, creating a 'gap'.

I there a way to address these concerns?

I would hate to use synch publisher - my publishing rate drops dramatically. 
From 300,000 m/s to 7,000 m/s.

package com.kcg.kafka.test;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {
    public static void main(String[] args) {
        System.setProperty("log4j.debug", "true");
        long events = Long.parseLong(args[0]);

        Properties props = new Properties();

        props.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String,String> producer = new 

        long start = System.currentTimeMillis();

        long nEvents = 0;
        for (; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            final String key = Long.toString(nEvents);
            String msg = runtime + ": " + key + new String(new byte[300]);
            ProducerRecord<String,String> producerRecord = new 
ProducerRecord<String,String>("test", key, msg);

            try {
                System.out.println("Sending " + key);

                // 'send' will not throw an exception when broker is down.
                        new Callback() {
                            public void onCompletion(RecordMetadata metadata, 
Exception e) {
                                if(e != null)


                                   // Stop publisher. But is there guarantee 
that no more messages are published after this one?

                                    System.out.println("The offset of the 
record we just sent is: " + metadata.offset() + " " + key);
                });//.get(20, TimeUnit.SECONDS);
            catch (Throwable e) {

            if (nEvents % 10000 == 0) {
                System.out.println("" + key);

            try {
            catch (InterruptedException e) {

        long duration = (System.currentTimeMillis() - start);
        System.out.println("Published " + nEvents + " messages in " + duration 
+ "ms. " + (int)((double)nEvents/((double)duration / 1000.0)) + " m/s.");


Sending 0
The offset of the record we just sent is: 12 0
2015-11-10 10:23:25 WARN  Selector:276 - Error in I/O with 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)


Sending 10

