Di Shang created KAFKA-4515:
-------------------------------

             Summary: Async producer send not retrying on TimeoutException: Batch Expired
                 Key: KAFKA-4515
                 URL: https://issues.apache.org/jira/browse/KAFKA-4515
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 0.9.0.1
            Reporter: Di Shang


We are testing broker failure resiliency. We have a cluster of 3 brokers and a 
topic with 5 partitions and 2 replicas. We use the code below to continuously 
send messages, then kill one of the brokers to see whether any messages are lost. 

{code:title=MyTest.java|borderStyle=solid}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyTest {

    static volatile KafkaProducer<Void, String> producer;

    public static void send(ProducerRecord<Void, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // handle exception with manual retry
                System.out.println("Error, resending...");
                exception.printStackTrace();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //send(record); // without this retry, msg would be lost
            } else if (metadata != null) {
                System.out.println("Sent " + record);
            } else {
                System.out.println("No exception and no metadata");
            }
        });
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "...");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "100000");
        props.put("acks", "1");
        props.put("request.timeout.ms", "1000");
        producer = new KafkaProducer<>(props);

        long i = 1L;
        while (true) {
            ProducerRecord<Void, String> record =
                new ProducerRecord<>("my-topic", Long.toString(i));

            send(record);

            Thread.sleep(100);
            i++;
        }
    }
}
{code}

What we found is that when we set *request.timeout.ms* to a small value like 
1000, killing a broker produces a few "TimeoutException: Batch Expired" errors 
in the send() callback. If we do not handle them with an explicit retry, as in 
the code above, those messages are lost. 
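The manual retry in the callback above is unbounded. As an aside, here is a 
sketch of the same pattern with a bounded attempt count; all names are ours, 
and the Kafka send is abstracted behind a small interface so the pattern can be 
shown on its own, without a broker:

```java
import java.util.function.Consumer;

// Sketch of a bounded manual-retry callback, the same pattern used in send()
// above. AsyncOp stands in for producer.send(): it runs asynchronously and
// reports either an exception or null (success) to its callback.
public class BoundedRetry {

    interface AsyncOp {
        void run(Consumer<Exception> onDone);
    }

    static void sendWithRetry(AsyncOp op, int attemptsLeft, Consumer<Exception> onGiveUp) {
        op.run(error -> {
            if (error == null) {
                return; // success, nothing more to do
            }
            if (attemptsLeft > 0) {
                sendWithRetry(op, attemptsLeft - 1, onGiveUp); // manual retry
            } else {
                onGiveUp.accept(error); // retries exhausted, surface the error
            }
        });
    }
}
```

With the real producer, AsyncOp.run would wrap producer.send(record, callback), 
optionally with a short sleep before each retry as in the test code above.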

The documentation for *request.timeout.ms* says:
bq. The configuration controls the maximum amount of time the client will wait 
for the response of a request. If the response is not received before the 
timeout elapses the client will resend the request if necessary or fail the 
request if retries are exhausted.

This makes me think that a TimeoutException should be retried implicitly 
according to the *retries* option, but that does not seem to happen. 

Strangely, we also noticed that if *request.timeout.ms* is long enough, such as 
the default 30000, then we lose no messages when killing a broker, even with 
*retries* set to 0. 
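For reference, a minimal sketch of the producer configuration that showed no 
loss in our tests; bootstrap.servers is a placeholder, and the only changes 
from the failing run are *request.timeout.ms* and *retries*:

```java
import java.util.Properties;

public class SafeProducerConfig {

    // Configuration under which we observed no message loss when killing a broker.
    static Properties safeProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "..."); // placeholder, fill in your brokers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1");
        // The default 30000 ms: with this value we saw no loss even with retries=0.
        props.put("request.timeout.ms", "30000");
        props.put("retries", "0");
        return props;
    }
}
```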

So it seems to me that the *retries* option has no effect in the broker-down 
scenario. There appears to be some other internal mechanism for handling broker 
failure and message retry, and that mechanism does not work when a 
TimeoutException occurs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)