[ https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luca Bruno updated KAFKA-3686:
------------------------------

    Description: 

*Setup*

I have a cluster of 3 Kafka servers, a topic with 12 partitions and replication factor 2, and a ZooKeeper cluster of 3 nodes.

Producer config:

{code}
props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
props.put("acks", "1");
props.put("batch.size", 16384);
props.put("retries", 3);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
{code}

Producer code:

{code}
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
    // Send synchronously: block on each record's future before sending the next one.
    Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
    f.get();
}
{code}

*Problem*

Cut the network between the producer (p1) and one of the Kafka servers (say k1).

The cluster itself stays healthy, so (as I understand it) the bootstrap metadata tells the producer that there are 3 Kafka servers and which broker is the leader of each partition of the topic. The producer therefore spreads messages across the leaders of all partitions, i.e. across all 3 brokers.

If the leader for a message happens to be k1, the producer raises the following exception after request.timeout.ms:

{code}
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
	at Test.main(Test.java:25)
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
{code}

In theory, the application should handle the failure. In practice, messages are lost even though the other 2 leaders are still available for writing. I tried with acks set to both 1 and -1.

*What I expected*

Since the client library automatically decides which partition a message goes to (via hashing or round robin), it should not matter much which partition a message ends up on. I expect the client library to handle the failure by sending the message to a partition whose leader is still reachable.

Neither kafka-clients nor rdkafka handles this failure. Since those are, as far as I know, the main client libraries used with Kafka, I consider this a serious fault-tolerance problem.
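As an illustration of what the application-level handling currently has to look like, here is a rough, untested sketch that retries a send on explicitly chosen partitions until one of the leaders accepts it. The helper name sendWithFallback is made up, and the partition count is just the 12-partition topic from the setup above. Every failed attempt still blocks for request.timeout.ms, so this only avoids losing the message, it does not hide the outage.

{code}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;

public class SendWithFallback {
    // Try each partition in turn; give up only when every leader has timed out.
    static RecordMetadata sendWithFallback(Producer<String, String> producer,
                                           String topic, String value,
                                           int numPartitions) throws Exception {
        ExecutionException last = null;
        for (int p = 0; p < numPartitions; p++) {
            Future<RecordMetadata> f = producer.send(
                    new ProducerRecord<String, String>(topic, p, null, value));
            try {
                return f.get(); // blocks until the record is acked or the batch expires
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TimeoutException)) {
                    throw e; // only fall back on expired batches
                }
                last = e; // leader of partition p did not answer, try the next one
            }
        }
        throw last; // every partition's leader timed out
    }
}
{code}

Called as {{sendWithFallback(producer, "topic", Integer.toString(i), 12)}} in place of the plain send above.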
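And this is roughly the routing I expect the library itself to do, sketched as a pluggable partitioner (the partitioner.class setting exists since 0.9.0.0; the class name AvailableLeaderPartitioner is made up). It round-robins over the partitions that currently have a leader in the cluster metadata. Caveat: in the scenario above the cluster is healthy, so k1 is still listed as leader and metadata alone is not enough; the client would also need to notice which brokers it cannot reach, which is exactly the part I think belongs inside the library.

{code}
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class AvailableLeaderPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Partitions whose leader is known according to the latest metadata.
        List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
        if (partitions.isEmpty()) {
            // Fall back to all partitions if the metadata lists no leaders at all.
            partitions = cluster.partitionsForTopic(topic);
        }
        int i = Math.abs(counter.getAndIncrement() % partitions.size());
        return partitions.get(i).partition();
    }

    @Override
    public void close() {}
}
{code}

It would be enabled with {{props.put("partitioner.class", AvailableLeaderPartitioner.class.getName());}}.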
> Kafka producer is not fault tolerant
> ------------------------------------
>
>                 Key: KAFKA-3686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3686
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>            Reporter: Luca Bruno
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)