I am trying to detect connection loss in my Kafka 0.9 Consumer and Producer
so that I can check whether any properties need updating (for example, new
passwords or truststore locations) and, if new configs have been provided,
restart the client with them.

I am having trouble finding good ways to detect if I have lost connection.
In the KafkaProducer, I am using the Callback() to look for
TimeoutExceptions (assuming that acks=1 or all).

Here is an approximation of the send code:

producer.send(new ProducerRecord<String, String>(topic, key, message),
              new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                    // A timeout is treated as a possible lost connection.
                    // checkForNewProperties() will probably just throw the
                    // exception in this case.
                    if (e instanceof
                            org.apache.kafka.common.errors.TimeoutException) {
                        checkForNewProperties();
                    }
                }
            }
        });
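
For context, here is a rough sketch of the kind of comparison I have in
mind for checkForNewProperties(). It is only an illustration:
ConfigChangeDetector and changedKeys are hypothetical names that I made up
for this post, not part of the Kafka client API, and the sketch assumes the
current and newly provided configs are both available as
java.util.Properties objects.

```java
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not a Kafka class): reports which config keys
// differ between the running client's properties and a new candidate set.
final class ConfigChangeDetector {
    private ConfigChangeDetector() {}

    // Returns the sorted set of keys whose values differ, including keys
    // that exist on only one side. An empty set means nothing changed.
    static Set<String> changedKeys(Properties current, Properties candidate) {
        Set<String> allKeys = new TreeSet<>(current.stringPropertyNames());
        allKeys.addAll(candidate.stringPropertyNames());

        Set<String> changed = new TreeSet<>();
        for (String key : allKeys) {
            // getProperty returns null for a missing key, so an added or
            // removed key counts as a change.
            if (!Objects.equals(current.getProperty(key),
                                candidate.getProperty(key))) {
                changed.add(key);
            }
        }
        return changed;
    }
}
```

If the returned set is non-empty (for example, it contains
ssl.truststore.location or ssl.truststore.password), the idea would be to
close the existing client and build a new one from the candidate
properties; otherwise the timeout is just rethrown or logged.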


I haven't gotten as far with the consumer. I have seen a KafkaConsumer
info-level log message stating "An exception was thrown" that seemed to be
connection-related, but the client didn't actually appear to throw any
exceptions to my code.

If anyone has any ideas, recommendations, or documentation that I may have
missed, that would be great. I will post the solution that I end up with.
Thanks for the help!

Using Kafka 0.9.0.0 clients, Kafka 0.9.1.0 broker.
