As part of testing the at-least-once delivery guarantees of Kafka 0.9, we tried disconnecting the producer's network and found that the configured retries (retries = 10000000) do not happen. We get the following warning:
WARN kafka-producer-network-thread | producer-1 [.kafka.clients.producer.internals.Sender] - Got error produce response with correlation id 6474 on topic-partition test-topic-3-100-38, retrying (9999999 attempts left). Error: NETWORK_EXCEPTION

and then:

org.apache.kafka.common.errors.TimeoutException: Batch Expired

We tried debugging by putting breakpoints in the RecordAccumulator and RecordBatch classes that stop when batch.attempts > 1, but they never trigger. The attempt count never goes beyond 1, the point where the batch is re-enqueued, even though canRetry() always returns true. Is there a better way to debug this? The relevant method is:

clients.producer.internals.Sender.completeBatch(RecordBatch, Errors, long, long, long)

The producer ends up skipping messages when there is a network issue. We also verified this by checking the message counts on the topic.

Also, with an asynchronous send the only option is a completion callback. There the RecordMetadata is empty, which is expected because there was no communication with the server. But how do we get the record itself back after all the retries have happened, so that nothing is lost?

Producer configuration:

reconnect.backoff.ms = 100
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
request.timeout.ms = 5000
acks = 1
batch.size = 16384
receive.buffer.bytes = 32768
retries = 10000000    <<<<<<<<<<<<<<<<
max.request.size = 1048576
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
linger.ms = 10

Producer code:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Members of the producer class:

    /*
     * Produce a record without waiting for the server. This includes a callback
     * that will print an error if something goes wrong.
     */
    public static void produceAsync(Producer<String, String> producer, String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
        producer.send(record, new DemoProducerCallback());
    }

    public static class DemoProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // recordMetadata can be null when the send never reached the broker
                System.out.println("Error producing to topic " + ((recordMetadata != null) ? recordMetadata.topic() : ""));
                e.printStackTrace();
            }
        }
    }
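One thing worth checking is how retries interacts with request.timeout.ms. As far as we can tell from the 0.9 client source, a batch that waits in the accumulator longer than request.timeout.ms is failed with "Batch Expired" no matter how many retries remain, so with request.timeout.ms = 5000 an outage of more than a few seconds could drop records even with retries = 10000000. The sketch below builds producer properties with plain string keys in java.util.Properties so it compiles without kafka-clients; the broker address and the chosen values (acks=all, one in-flight request, a 60000 ms request timeout) are our assumptions to experiment with, not a verified fix.

```java
import java.util.Properties;

/**
 * Sketch of producer settings aimed at at-least-once delivery on the 0.9 client.
 * The values are assumptions to experiment with, not a confirmed fix.
 */
public class AtLeastOnceProducerConfig {

    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas instead of only the leader (the post uses acks = 1).
        props.put("acks", "all");
        // Keep the large retry budget from the post.
        props.put("retries", "10000000");
        props.put("retry.backoff.ms", "100");
        // One in-flight request per connection so retried batches cannot be reordered.
        props.put("max.in.flight.requests.per.connection", "1");
        // Assumption: a longer request timeout widens the window before queued
        // batches are expired with "Batch Expired" (the post uses 5000 ms).
        props.put("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting Properties can be passed to new KafkaProducer<String, String>(props). Limiting in-flight requests to 1 costs throughput; it is there only so that retries do not reorder records within a partition.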
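On getting the record back: since a new callback object is created for every send(), it can simply hold a reference to the ProducerRecord it was created for, and then the record is still available in onCompletion even when RecordMetadata carries nothing useful. A minimal sketch, assuming kafka-clients is on the classpath; the class name RetainingCallback and the in-memory FAILED queue are hypothetical, and a real application would persist or re-send those records instead.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Hypothetical callback that remembers the record it belongs to, so a failed
 * send (for example "Batch Expired") can be recovered by the application.
 */
public class RetainingCallback implements Callback {

    // Thread-safe, in-memory stand-in for a durable store or re-send queue.
    public static final Queue<ProducerRecord<String, String>> FAILED =
            new ConcurrentLinkedQueue<>();

    private final ProducerRecord<String, String> record;

    public RetainingCallback(ProducerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // metadata may be null here, but the original record is still at hand.
            System.err.println("Failed to send to " + record.topic()
                    + " key=" + record.key() + ": " + exception);
            FAILED.add(record);
        }
    }
}
```

Usage: producer.send(record, new RetainingCallback(record)). Callbacks generally run on the producer's background I/O thread, so onCompletion should stay fast; draining FAILED and re-sending from another thread avoids stalling other sends.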