[ https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luca Bruno updated KAFKA-3686:
------------------------------
    Description: 
*Setup*

I have a cluster of 3 Kafka servers, a topic with 12 partitions and replication factor 2, 
and a ZooKeeper ensemble of 3 nodes.

Producer config:

{code}
 Properties props = new Properties();
 props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
 props.put("acks", "1");
 props.put("batch.size", 16384);
 props.put("retries", 3);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
{code}

Producer code:

{code}
 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 10; i++) {
     // null key: the default partitioner spreads messages across partitions
     Future<RecordMetadata> f = producer.send(
             new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
     f.get();  // block until the broker acknowledges (or the batch expires)
 }
 producer.close();
{code}

*Problem*

Cut the network between the producer (p1) and one of the Kafka servers (say k1).

The cluster itself is healthy, so (as I understand it) the bootstrap metadata tells the 
producer that there are 3 Kafka servers and which broker is the leader of each partition 
of the topic.

The producer therefore sends messages to the partition leaders, which are spread across 
the 3 brokers. If the leader for a given message happens to be k1, the producer raises 
the following exception after request.timeout.ms:

{code}
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at Test.main(Test.java:25)
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
{code}
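
The timeout above is governed by the producer's request.timeout.ms setting (30 seconds by 
default, if I read the docs correctly), which I left untouched. It can be lowered in the 
config above to reproduce the expiry faster, for example:

{code}
 props.put("request.timeout.ms", 5000);  // fail the batch after 5s instead of the default 30s
{code}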

In theory, the application should handle the failure. In practice, messages are lost, 
even though the other 2 leaders remain available for writing.
I tried with acks set to both 1 and -1.

*What I expected*

Given that the client library automatically decides the hashing / round-robin scheme for 
choosing a partition, it should not matter much which partition a message ends up in.
I would expect the client library to handle the failure by sending the message to a 
partition led by a different, reachable broker.

Neither kafka-clients nor rdkafka handles this failure. Given that those are, as far as 
I know, the main client libraries used with Kafka, I consider this a serious 
fault-tolerance problem.
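
For illustration only, here is roughly the kind of workaround the application has to 
implement by hand today (the FallbackSender / sendWithFallback names are made up for this 
sketch, not an existing API): catch the timeout and resend the record to another 
partition, using the partition metadata the producer already caches.

{code}
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;

class FallbackSender {
    // Hypothetical application-level fallback, sketched for this report only.
    // If a send times out, retry on each partition in turn until one whose
    // leader is still reachable accepts the write. Note that every failed
    // attempt blocks for up to request.timeout.ms, so this is slow as well as ugly.
    static void sendWithFallback(Producer<String, String> producer, String topic, String value)
            throws InterruptedException, ExecutionException {
        try {
            producer.send(new ProducerRecord<String, String>(topic, null, value)).get();
            return;
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TimeoutException)) throw e;
        }
        // The producer's cached metadata lists the partitions and their leaders.
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        for (PartitionInfo p : partitions) {
            try {
                producer.send(new ProducerRecord<String, String>(topic, p.partition(), null, value)).get();
                return;  // a reachable leader accepted the message
            } catch (ExecutionException retry) {
                // this partition's leader is probably the unreachable broker; try the next one
            }
        }
        throw new ExecutionException(new TimeoutException("no reachable leader for topic " + topic));
    }
}
{code}

This works, but it duplicates machinery the producer already has (partition metadata and 
retries), which is why I think the client library itself should handle it.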


> Kafka producer is not fault tolerant
> ------------------------------------
>
>                 Key: KAFKA-3686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3686
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>            Reporter: Luca Bruno
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
