It is possible that you are hitting KAFKA-1193, but I'm not sure. Do you see the following log line when you observe data loss -
"No broker in ISR is alive for ... There's potential data loss." Thanks, Neha On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain <od...@3cinteractive.com>wrote: > I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems > like it could be the cause of this. Does that sound right? Is there a patch > we can test? Any date/time when this is expected to be fixed? > > From: New User <od...@3cinteractive.com<mailto:od...@3cinteractive.com>> > Date: Wednesday, March 26, 2014 at 11:59 AM > To: "users@kafka.apache.org<mailto:users@kafka.apache.org>" < > users@kafka.apache.org<mailto:users@kafka.apache.org>> > Subject: data loss on replicated topic > > My company currently testing Kafka for throughput and fault tolerance. > We've set up a cluster of 5 Kafka brokers and are publishing to a topic > with replication factor 3 and 100 partitions. We are publishing with > request.required.acks == -1 (e.g. All ISR replicas must ACK before the > message is considered sent). If a publication fails, we retry it > indefinitely until it succeeds. We ran a test over a weekend in which we > published messages as fast as we could (from a single publisher). Each > message has a unique ID so we can ensure that all messages are saved by > Kafka at least once at the end of the test. We have a simple script, run > via cron, that kills one broker (chosen at random) once every other hour > (killed via "kill -9"). The broker is then revived 16 minutes after it was > killed. At the end of the weekend we ran a script to pull all data from all > partitions and then verify that all messages were persisted by Kafka. For > the most part, the results are very good. We can sustain about 3k > message/second with almost no data loss. > > Of the roughly 460 million records we produced over 48 hours we lost only > 7 records. But, I don't think we should have lost any record. All of the > lost records were produced at almost exactly the time one of the brokers > was killed (down to the second which is the granularity of our logs). Note > that we're producing around 3k messages/second and we killed brokers many > times over the 48 hour period. Only twice did we see data loss: once we > lost 4 records and once we lost 3. 
> I have checked the Kafka logs and there are some expected error messages
> from the surviving brokers that look like:
>
> [2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId:
> ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [load_test,20] -> PartitionFetchInfo(521319,1048576),
> [load_test,74] -> PartitionFetchInfo(559017,1048576),
> [load_test,14] -> PartitionFetchInfo(420539,1048576),
> [load_test,0] -> PartitionFetchInfo(776869,1048576),
> [load_test,34] -> PartitionFetchInfo(446435,1048576),
> [load_test,94] -> PartitionFetchInfo(849943,1048576),
> [load_test,40] -> PartitionFetchInfo(241876,1048576),
> [load_test,80] -> PartitionFetchInfo(508778,1048576),
> [load_test,60] -> PartitionFetchInfo(81314,1048576),
> [load_test,54] -> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> I have verified that all the partitions mentioned in these messages (e.g.
> the above mentions partitions 0, 34, 94, etc.) had the newly killed node
> as the leader. I believe that means that the other 4 brokers were alive
> and running without issues. There are no other log messages that indicate
> any other broker communication issues.
>
> As I understand it, this scenario shouldn't cause any data loss since at
> least 4/5 of the brokers were alive and healthy at all times. Is there any
> way to explain the data loss? Perhaps a known bug in 0.8.1?
>
> Thanks,
> Oliver
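
For readers trying to reproduce this setup, the producer configuration described
above maps roughly onto the 0.8 Java producer as sketched below. The broker list,
topic name, and the LoadTestProducer class are illustrative, not from the thread;
the poster's "retry indefinitely" behavior is approximated here with a very large
message.send.max.retries value, since the actual retry loop isn't shown.

    import java.util.Properties;
    import java.util.UUID;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class LoadTestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Illustrative broker list; the original cluster had 5 brokers.
            props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // -1: the leader waits for all in-sync replicas to acknowledge the write.
            props.put("request.required.acks", "-1");
            // Approximates "retry indefinitely": a very high retry count with short backoff.
            props.put("message.send.max.retries", String.valueOf(Integer.MAX_VALUE));
            props.put("retry.backoff.ms", "100");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));

            // Each message carries a unique ID so the end-of-test check can
            // detect any record that was acked but never persisted.
            String messageId = UUID.randomUUID().toString();
            producer.send(new KeyedMessage<String, String>("load_test", messageId,
                    messageId + ",payload"));

            producer.close();
        }
    }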
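
The end-of-test verification (unique IDs produced vs. IDs read back from all
partitions) reduces to a set difference once both sides are dumped to disk. The
sketch below assumes hypothetical dump files produced_ids.txt and consumed_ids.txt
with one message ID per line; deduplicating via a set also absorbs the "at least
once" duplicates that retries can introduce.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.HashSet;
    import java.util.Set;

    public class VerifyMessageIds {
        public static void main(String[] args) throws IOException {
            // Hypothetical dump files: one message ID per line.
            Set<String> produced = new HashSet<String>(
                    Files.readAllLines(Paths.get("produced_ids.txt"), StandardCharsets.UTF_8));
            Set<String> consumed = new HashSet<String>(
                    Files.readAllLines(Paths.get("consumed_ids.txt"), StandardCharsets.UTF_8));

            // Any ID that was produced (and acked) but never read back indicates data loss.
            Set<String> missing = new HashSet<String>(produced);
            missing.removeAll(consumed);

            System.out.println("produced: " + produced.size()
                    + ", consumed (deduplicated): " + consumed.size()
                    + ", missing: " + missing.size());
            for (String id : missing) {
                System.out.println("lost: " + id);
            }
        }
    }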