My company is currently testing Kafka for throughput and fault tolerance. We've 
set up a cluster of 5 Kafka brokers and are publishing to a topic with a 
replication factor of 3 and 100 partitions. We are publishing with 
request.required.acks = -1 (i.e. all in-sync replicas must ACK before a message 
is considered sent). If a publish fails, we retry it indefinitely until it 
succeeds. We ran a test over a weekend in which we published messages as fast 
as we could from a single publisher. Each message carries a unique ID so we can 
verify at the end of the test that every message was saved by Kafka at least 
once. A simple script, run via cron, kills one broker (chosen at random) once 
every other hour via "kill -9"; the broker is then revived 16 minutes after it 
was killed. At the end of the weekend we ran a script to pull all data from all 
partitions and verify that every message was persisted by Kafka. For the most 
part the results are very good: we can sustain about 3k messages/second with 
almost no data loss.
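
For reference, here is a simplified sketch (not our actual test harness) of the 
kind of producer loop we're running against the old 0.8 Java producer API; the 
broker list, backoff, and class name are placeholders:

    import java.util.Properties;
    import java.util.UUID;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class LoadTestProducer {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            // placeholder broker list for the 5-node cluster
            props.put("metadata.broker.list",
                      "broker1:9092,broker2:9092,broker3:9092,broker4:9092,broker5:9092");
            props.put("request.required.acks", "-1");  // leader waits for all in-sync replicas
            props.put("producer.type", "sync");        // synchronous sends so failures surface as exceptions
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));

            while (true) {
                // unique ID per message so the end-of-test verification can detect loss
                String id = UUID.randomUUID().toString();
                KeyedMessage<String, String> msg =
                        new KeyedMessage<String, String>("load_test", id, id);

                // retry indefinitely until the send succeeds
                while (true) {
                    try {
                        producer.send(msg);
                        break;
                    } catch (Exception e) {
                        Thread.sleep(500);  // back off briefly before retrying
                    }
                }
            }
        }
    }

The actual harness also records each ID so the end-of-weekend script can check 
it against what it pulls back from the 100 partitions.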

Of the roughly 460 million records we produced over 48 hours, we lost only 7. 
But I don't think we should have lost any. All of the lost records were 
produced at almost exactly the time one of the brokers was killed (down to the 
second, which is the granularity of our logs). Note that we were producing 
around 3k messages/second and killed brokers many times over the 48-hour 
period, yet we saw data loss only twice: once we lost 4 records and once we 
lost 3. I have checked the Kafka logs, and the surviving brokers show some 
expected error messages that look like:


[2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId: 
ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [load_test,20] -> 
PartitionFetchInfo(521319,1048576),[load_test,74] -> 
PartitionFetchInfo(559017,1048576),[load_test,14] -> 
PartitionFetchInfo(420539,1048576),[load_test,0] -> 
PartitionFetchInfo(776869,1048576),[load_test,34] -> 
PartitionFetchInfo(446435,1048576),[load_test,94] -> 
PartitionFetchInfo(849943,1048576),[load_test,40] -> 
PartitionFetchInfo(241876,1048576),[load_test,80] -> 
PartitionFetchInfo(508778,1048576),[load_test,60] -> 
PartitionFetchInfo(81314,1048576),[load_test,54] -> 
PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)

java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

I have verified that all of the partitions mentioned in these messages (the 
above mentions partitions 0, 34, 94, etc.) had the newly killed node as their 
leader. I believe that means the other 4 brokers were alive and running without 
issues; there are no other log messages indicating broker communication 
problems.

As I understand it, this scenario shouldn't cause any data loss, since at least 
4 of the 5 brokers were alive and healthy at all times. Is there any way to 
explain the data loss? Perhaps a known bug in 0.8.1?

Thanks,
Oliver
