My company is currently testing Kafka for throughput and fault tolerance. We've set up a cluster of 5 Kafka brokers and are publishing to a topic with a replication factor of 3 and 100 partitions. We publish with request.required.acks=-1 (i.e. all ISR replicas must ACK before a message is considered sent). If a publish fails, we retry it indefinitely until it succeeds.

We ran a test over a weekend in which we published messages as fast as we could from a single publisher. Each message carries a unique ID so that, at the end of the test, we can verify that every message was saved by Kafka at least once. A simple script, run via cron, kills one broker (chosen at random) every other hour via "kill -9"; the broker is then revived 16 minutes after it was killed. At the end of the weekend we ran a script to pull all data from all partitions and verify that every message was persisted by Kafka.

For the most part, the results are very good. We can sustain about 3k messages/second with almost no data loss.
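For reference, the producer side is roughly the sketch below. It uses the old 0.8 Java producer API (kafka.javaapi.producer.Producer) with request.required.acks=-1 and an application-level loop that retries a failed send until it succeeds; the broker list, topic name, and message contents here are placeholders rather than our exact code.

import java.util.Properties;
import java.util.UUID;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LoadTestProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Placeholder broker list -- ours points at the 5-node cluster.
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Synchronous sends; wait for all in-sync replicas to ACK each message.
        props.put("producer.type", "sync");
        props.put("request.required.acks", "-1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        while (true) {
            // Unique ID per message so we can check for loss after the test.
            String id = UUID.randomUUID().toString();
            KeyedMessage<String, String> msg =
                    new KeyedMessage<String, String>("load_test", id, id);

            // Retry indefinitely until the send succeeds.
            while (true) {
                try {
                    producer.send(msg);
                    break;
                } catch (Exception e) {
                    Thread.sleep(100); // back off briefly, then retry
                }
            }
        }
    }
}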
Of the roughly 460 million records we produced over 48 hours, we lost only 7. But I don't think we should have lost any records at all. All of the lost records were produced at almost exactly the time one of the brokers was killed (down to the second, which is the granularity of our logs). Note that we were producing around 3k messages/second and killed brokers many times over the 48-hour period, yet we saw data loss only twice: once we lost 4 records and once we lost 3.

I have checked the Kafka logs and there are some expected error messages from the surviving brokers that look like this:

[2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId: ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [load_test,20] -> PartitionFetchInfo(521319,1048576),[load_test,74] -> PartitionFetchInfo(559017,1048576),[load_test,14] -> PartitionFetchInfo(420539,1048576),[load_test,0] -> PartitionFetchInfo(776869,1048576),[load_test,34] -> PartitionFetchInfo(446435,1048576),[load_test,94] -> PartitionFetchInfo(849943,1048576),[load_test,40] -> PartitionFetchInfo(241876,1048576),[load_test,80] -> PartitionFetchInfo(508778,1048576),[load_test,60] -> PartitionFetchInfo(81314,1048576),[load_test,54] -> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

I have verified that all of the partitions mentioned in these messages (e.g. the above mentions partitions 0, 34, 94, etc.) had the newly killed node as their leader. I believe that means the other 4 brokers were alive and running without issues; there are no other log messages indicating any other broker communication problems. As I understand it, this scenario shouldn't cause any data loss, since at least 4 of the 5 brokers were alive and healthy at all times.

Is there any way to explain the data loss? Perhaps a known bug in 0.8.1?

Thanks,
Oliver