As part of testing the at-least-once guarantees of Kafka v0.9, we
disconnected the producer's network and found that the configured retries
(retries=10000000) are not happening. We get a

WARN  kafka-producer-network-thread | producer-1
[.kafka.clients.producer.internals.Sender]  - Got error produce response
with correlation id 6474 on topic-partition test-topic-3-100-38, retrying
(9999999 attempts left). Error: NETWORK_EXCEPTION

And

org.apache.kafka.common.errors.TimeoutException: Batch Expired

We tried debugging by putting a breakpoint in the RecordAccumulator and
RecordBatch classes, conditioned on batch.attempts > 1, but it never
triggers: attempts never goes beyond 1, the point at which the batch is
re-enqueued, even though canRetry() always returns true in
clients.producer.internals.Sender.completeBatch(RecordBatch, Errors, long,
long, long). Is there a better way to debug this?

The producer ends up dropping messages when there is a network issue; we
verified this by checking the message counts on the topic.

Also, the only hook in an async send is the completion callback, where
the RecordMetadata is empty, as expected, because there was no server
communication. But how do we get hold of the record itself once all the
retries have been exhausted, so that nothing is lost?

Producer config:

reconnect.backoff.ms = 100
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
request.timeout.ms = 5000
acks = 1
batch.size = 16384
receive.buffer.bytes = 32768
retries = 10000000 <<<<<<<<<<<<<<<<
max.request.size = 1048576
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
linger.ms = 10
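
To put the retry-related settings above in perspective, here is a minimal
sketch that collects them in a java.util.Properties object and computes the
nominal backoff time they imply if every retry were actually used. The class
name RetryBudget and its methods are hypothetical. The sketch only does
arithmetic on the configured values and says nothing about what the Sender
really does with them.

```java
import java.util.Properties;

// Hypothetical helper class; it only does arithmetic on the configured values.
class RetryBudget {
    // Builds a Properties object with the retry-related settings listed above.
    static Properties retrySettings() {
        Properties props = new Properties();
        props.setProperty("retries", "10000000");
        props.setProperty("retry.backoff.ms", "100");
        props.setProperty("request.timeout.ms", "5000");
        props.setProperty("max.in.flight.requests.per.connection", "5");
        props.setProperty("acks", "1");
        return props;
    }

    // Nominal time spent backing off if every configured retry were used.
    static long nominalBackoffMillis(Properties props) {
        long retries = Long.parseLong(props.getProperty("retries"));
        long backoffMs = Long.parseLong(props.getProperty("retry.backoff.ms"));
        return retries * backoffMs;
    }

    public static void main(String[] args) {
        Properties props = retrySettings();
        System.out.println("nominal backoff budget (ms): "
                + nominalBackoffMillis(props));
        System.out.println("request.timeout.ms: "
                + props.getProperty("request.timeout.ms"));
    }
}
```

With these values the nominal budget is 10000000 * 100 ms = 1000000000 ms,
roughly 11.5 days, which is far longer than the network outage we tested.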

    /*
     * Produce a record without waiting for server. This includes a callback
     * that will print an error if something goes wrong
     */
    public static void produceAsync(Producer<String, String> producer,
            String topic, String key, String value) {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topic, value);
        producer.send(record, new DemoProducerCallback());
    }

    public static class DemoProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Error producing to topic " +
                        ((recordMetadata != null) ? recordMetadata.topic() : ""));
                e.printStackTrace();
            }
        }
    }
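
One direction we are considering for the "how do we get the record itself"
question is to bind each callback to the record it was created for, so that
a failed send can hand the record back to the application. The sketch below
shows only that pattern, using plain JDK types so it stands alone:
FailedRecordTracker, handlerFor and pollFailed are hypothetical names and
not part of the Kafka client. The handler takes the same (metadata,
exception) pair as onCompletion in the code above.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of the Kafka client:
// remembers the records whose send completed with an exception.
class FailedRecordTracker<R> {
    // Thread-safe queue, since producer callbacks run on the producer's
    // I/O thread rather than on the thread that called send().
    private final Queue<R> failed = new ConcurrentLinkedQueue<>();

    // Returns a handler bound to one specific record. It has the same
    // (metadata, exception) shape as onCompletion in the code above.
    <M> BiConsumer<M, Exception> handlerFor(R record) {
        return (metadata, exception) -> {
            if (exception != null) {
                // Keep the record so it can be resent or logged later.
                failed.add(record);
            }
        };
    }

    // Removes and returns the next failed record, or null if there is none.
    R pollFailed() {
        return failed.poll();
    }

    // Number of failed records currently held.
    int failedCount() {
        return failed.size();
    }
}
```

In our code this would mean giving DemoProducerCallback a constructor that
takes the ProducerRecord and keeps it in a field, so that onCompletion can
pass it to such a tracker whenever e != null.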
