Nope. I see lots of "Error in fetch" messages, but that's all.
It does sound roughly like KAFKA-1193, but I don't know how to verify whether this is the same issue. Any ideas?

On 3/27/14, 8:24 PM, "Jun Rao" <jun...@gmail.com> wrote:

>We don't expect to lose data in that case, so this sounds like a bug. Do
>you see any other errors/warnings in the broker log around the time the
>data is lost?
>
>Thanks,
>
>Jun
>
>
>On Thu, Mar 27, 2014 at 10:52 AM, Oliver Dain
><od...@3cinteractive.com> wrote:
>
>> Hi Neha,
>>
>> Thanks for the reply. I do not see the "No broker in ISR" message. If my
>> original diagnosis was correct (that there were at least 2 replicas
>> alive for the topic at all times), then I believe this is expected,
>> right? I gather this makes it more likely that we've hit KAFKA-1193? If
>> so, is there any workaround and/or an ETA for a fix?
>>
>> Thanks,
>> Oliver
>>
>>
>> On 3/27/14, 5:18 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:
>>
>> >It is possible that you are hitting KAFKA-1193, but I'm not sure. Do
>> >you see the following log line when you observe data loss:
>> >
>> >"No broker in ISR is alive for ... There's potential data loss."
>> >
>> >Thanks,
>> >Neha
>> >
>> >
>> >On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain
>> ><od...@3cinteractive.com> wrote:
>> >
>> >> I just saw https://issues.apache.org/jira/browse/KAFKA-1193, which
>> >> seems like it could be the cause of this. Does that sound right? Is
>> >> there a patch we can test? Any date/time when this is expected to be
>> >> fixed?
>> >>
>> >> From: New User <od...@3cinteractive.com>
>> >> Date: Wednesday, March 26, 2014 at 11:59 AM
>> >> To: "users@kafka.apache.org" <users@kafka.apache.org>
>> >> Subject: data loss on replicated topic
>> >>
>> >> My company is currently testing Kafka for throughput and fault
>> >> tolerance. We've set up a cluster of 5 Kafka brokers and are
>> >> publishing to a topic with replication factor 3 and 100 partitions.
>> >> We publish with request.required.acks == -1 (i.e., all ISR replicas
>> >> must ACK before a message is considered sent). If a publication
>> >> fails, we retry it indefinitely until it succeeds. We ran a test over
>> >> a weekend in which we published messages as fast as we could (from a
>> >> single publisher). Each message has a unique ID so that, at the end
>> >> of the test, we can verify that every message was saved by Kafka at
>> >> least once. We have a simple script, run via cron, that kills one
>> >> broker (chosen at random) once every other hour (killed via
>> >> "kill -9"); the broker is then revived 16 minutes after it was
>> >> killed. At the end of the weekend we ran a script to pull all data
>> >> from all partitions and verify that every message was persisted by
>> >> Kafka. For the most part, the results are very good: we can sustain
>> >> about 3k messages/second with almost no data loss.
>> >>
>> >> Of the roughly 460 million records we produced over 48 hours, we lost
>> >> only 7. But I don't think we should have lost any. All of the lost
>> >> records were produced at almost exactly the time one of the brokers
>> >> was killed (down to the second, which is the granularity of our
>> >> logs). Note that we were producing around 3k messages/second and
>> >> killed brokers many times over the 48-hour period. Only twice did we
>> >> see data loss: once we lost 4 records and once we lost 3.
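>> >>
>> >> Roughly, the producer side looks like the sketch below (a simplified
>> >> illustration of the setup described above, not our exact test code;
>> >> the broker addresses, topic loop, and the use of a huge retry cap to
>> >> approximate "retry indefinitely" are assumptions, written against the
>> >> 0.8 Java producer API):
>> >>
>> >>   import java.util.Properties;
>> >>   import kafka.javaapi.producer.Producer;
>> >>   import kafka.producer.KeyedMessage;
>> >>   import kafka.producer.ProducerConfig;
>> >>
>> >>   public class LoadTestProducer {
>> >>       public static void main(String[] args) {
>> >>           Properties props = new Properties();
>> >>           // Illustrative broker list; the real cluster has 5 brokers.
>> >>           props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
>> >>           props.put("serializer.class", "kafka.serializer.StringEncoder");
>> >>           // -1: every in-sync replica must ACK before a send is considered successful.
>> >>           props.put("request.required.acks", "-1");
>> >>           // The old producer has no true "retry forever" setting, so a very
>> >>           // large cap approximates our indefinite-retry behavior.
>> >>           props.put("message.send.max.retries", String.valueOf(Integer.MAX_VALUE));
>> >>           props.put("retry.backoff.ms", "500");
>> >>
>> >>           Producer<String, String> producer =
>> >>                   new Producer<String, String>(new ProducerConfig(props));
>> >>           // Each payload is a unique ID so the verifier can detect loss later.
>> >>           for (long id = 0; id < 1000000; id++) {
>> >>               producer.send(new KeyedMessage<String, String>("load_test", Long.toString(id)));
>> >>           }
>> >>           producer.close();
>> >>       }
>> >>   }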
>> >>
>> >> I have checked the Kafka logs and there are some expected error
>> >> messages from the surviving brokers that look like:
>> >>
>> >> [2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in fetch
>> >> Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId:
>> >> ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
>> >> RequestInfo: [load_test,20] -> PartitionFetchInfo(521319,1048576),
>> >> [load_test,74] -> PartitionFetchInfo(559017,1048576),
>> >> [load_test,14] -> PartitionFetchInfo(420539,1048576),
>> >> [load_test,0] -> PartitionFetchInfo(776869,1048576),
>> >> [load_test,34] -> PartitionFetchInfo(446435,1048576),
>> >> [load_test,94] -> PartitionFetchInfo(849943,1048576),
>> >> [load_test,40] -> PartitionFetchInfo(241876,1048576),
>> >> [load_test,80] -> PartitionFetchInfo(508778,1048576),
>> >> [load_test,60] -> PartitionFetchInfo(81314,1048576),
>> >> [load_test,54] -> PartitionFetchInfo(165798,1048576)
>> >> (kafka.server.ReplicaFetcherThread)
>> >> java.net.ConnectException: Connection refused
>> >>     at sun.nio.ch.Net.connect0(Native Method)
>> >>     at sun.nio.ch.Net.connect(Net.java:465)
>> >>     at sun.nio.ch.Net.connect(Net.java:457)
>> >>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>> >>     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>> >>     at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>> >>     at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>> >>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>> >>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>> >>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>> >>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >>
>> >> I have verified that all the partitions mentioned in these messages
>> >> (e.g., the above mentions partitions 0, 34, 94, etc.) had the newly
>> >> killed node as the leader. I believe that means the other 4 brokers
>> >> were alive and running without issues. There are no other log
>> >> messages that indicate any other broker communication issues.
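>> >>
>> >> For completeness, the per-partition verification pass mentioned above
>> >> is, in outline, something like the following (heavily simplified; the
>> >> PartitionVerifier name, hard-coded port, and caller-supplied
>> >> lastOffset are hypothetical, and leader lookup and error handling are
>> >> omitted; it uses the same 0.8 SimpleConsumer API that appears in the
>> >> stack trace above):
>> >>
>> >>   import java.nio.ByteBuffer;
>> >>   import java.util.HashSet;
>> >>   import java.util.Set;
>> >>   import kafka.api.FetchRequest;
>> >>   import kafka.api.FetchRequestBuilder;
>> >>   import kafka.javaapi.FetchResponse;
>> >>   import kafka.javaapi.consumer.SimpleConsumer;
>> >>   import kafka.message.MessageAndOffset;
>> >>
>> >>   // Collects the unique message IDs stored in one partition so they can
>> >>   // be compared against the set of IDs the producer sent.
>> >>   public class PartitionVerifier {
>> >>       public static Set<String> readIds(String leaderHost, int partition,
>> >>                                         long lastOffset) throws Exception {
>> >>           SimpleConsumer consumer =
>> >>                   new SimpleConsumer(leaderHost, 9092, 100000, 64 * 1024, "verifier");
>> >>           Set<String> ids = new HashSet<String>();
>> >>           long offset = 0;  // assumes the log still begins at offset 0
>> >>           while (offset < lastOffset) {
>> >>               FetchRequest req = new FetchRequestBuilder()
>> >>                       .clientId("verifier")
>> >>                       .addFetch("load_test", partition, offset, 1048576)
>> >>                       .build();
>> >>               FetchResponse resp = consumer.fetch(req);
>> >>               boolean sawMessage = false;
>> >>               for (MessageAndOffset mo : resp.messageSet("load_test", partition)) {
>> >>                   ByteBuffer payload = mo.message().payload();
>> >>                   byte[] bytes = new byte[payload.limit()];
>> >>                   payload.get(bytes);
>> >>                   ids.add(new String(bytes, "UTF-8"));
>> >>                   offset = mo.nextOffset();
>> >>                   sawMessage = true;
>> >>               }
>> >>               if (!sawMessage) break;  // nothing more to read
>> >>           }
>> >>           consumer.close();
>> >>           return ids;
>> >>       }
>> >>   }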
>> >>
>> >> As I understand it, this scenario shouldn't cause any data loss,
>> >> since at least 4 of the 5 brokers were alive and healthy at all
>> >> times. Is there any way to explain the data loss? Perhaps a known bug
>> >> in 0.8.1?
>> >>
>> >> Thanks,
>> >> Oliver