Nope. I see lots of the "Error in fetch" messages, but that’s all.

It does sound roughly like KAFKA-1193 but I don’t know how to verify if
this is the same issue or not. Any ideas?


On 3/27/14, 8:24 PM, "Jun Rao" <jun...@gmail.com> wrote:

>We don't expect to lose data in that case. So, this sounds like a bug. Do
>you see any other error/warn in broker log around the time the data is
>lost?
>
>Thanks,
>
>Jun
>
>
>On Thu, Mar 27, 2014 at 10:52 AM, Oliver Dain
><od...@3cinteractive.com> wrote:
>
>> Hi Neha,
>>
>> Thanks for the reply. I do not see the "No broker in ISR" message. If my
>> original diagnosis was correct (that there were at least 2 replicas alive
>> for the topic at all times) then I believe this is expected, right? I
>> gather this makes it more likely that we've hit KAFKA-1193?? If so, is
>> there any workaround and/or an ETA for a fix?
>>
>> Thanks,
>> Oliver
>>
>>
>>
>>
>> On 3/27/14, 5:18 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:
>>
>> >It is possible that you are hitting KAFKA-1193, but I'm not sure. Do you
>> >see the following log line when you observe data loss -
>> >
>> >"No broker in ISR is alive for ... There's potential data loss."
>> >
>> >Thanks,
>> >Neha
>> >
>> >
>> >On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain
>> ><od...@3cinteractive.com> wrote:
>> >
>> >> I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems
>> >> like it could be the cause of this. Does that sound right? Is there a
>> >> patch we can test? Any date/time when this is expected to be fixed?
>> >>
>> >> From: New User <od...@3cinteractive.com>
>> >> Date: Wednesday, March 26, 2014 at 11:59 AM
>> >> To: "users@kafka.apache.org" <users@kafka.apache.org>
>> >> Subject: data loss on replicated topic
>> >>
>> >> My company is currently testing Kafka for throughput and fault tolerance.
>> >> We've set up a cluster of 5 Kafka brokers and are publishing to a topic
>> >> with replication factor 3 and 100 partitions. We are publishing with
>> >> request.required.acks == -1 (i.e. all ISR replicas must ACK before the
>> >> message is considered sent). If a publication fails, we retry it
>> >> indefinitely until it succeeds. We ran a test over a weekend in which we
>> >> published messages as fast as we could (from a single publisher). Each
>> >> message has a unique ID so we can ensure that all messages are saved by
>> >> Kafka at least once at the end of the test. We have a simple script, run
>> >> via cron, that kills one broker (chosen at random) once every other hour
>> >> (killed via "kill -9"). The broker is then revived 16 minutes after it
>> >> was killed. At the end of the weekend we ran a script to pull all data
>> >> from all partitions and then verify that all messages were persisted by
>> >> Kafka. For the most part, the results are very good. We can sustain about
>> >> 3k messages/second with almost no data loss.
>> >>
>> >> Of the roughly 460 million records we produced over 48 hours we lost only
>> >> 7 records. But I don't think we should have lost any records. All of the
>> >> lost records were produced at almost exactly the time one of the brokers
>> >> was killed (down to the second, which is the granularity of our logs).
>> >> Note that we're producing around 3k messages/second and we killed brokers
>> >> many times over the 48 hour period. Only twice did we see data loss: once
>> >> we lost 4 records and once we lost 3. I have checked the Kafka logs and
>> >> there are some expected error messages from the surviving brokers that
>> >> look like:
>> >>
>> >>
>> >> [2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in fetch
>> >> Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId:
>> >> ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
>> >> RequestInfo: [load_test,20] ->
>> >> PartitionFetchInfo(521319,1048576),[load_test,74] ->
>> >> PartitionFetchInfo(559017,1048576),[load_test,14] ->
>> >> PartitionFetchInfo(420539,1048576),[load_test,0] ->
>> >> PartitionFetchInfo(776869,1048576),[load_test,34] ->
>> >> PartitionFetchInfo(446435,1048576),[load_test,94] ->
>> >> PartitionFetchInfo(849943,1048576),[load_test,40] ->
>> >> PartitionFetchInfo(241876,1048576),[load_test,80] ->
>> >> PartitionFetchInfo(508778,1048576),[load_test,60] ->
>> >> PartitionFetchInfo(81314,1048576),[load_test,54] ->
>> >> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)
>> >>
>> >> java.net.ConnectException: Connection refused
>> >>         at sun.nio.ch.Net.connect0(Native Method)
>> >>         at sun.nio.ch.Net.connect(Net.java:465)
>> >>         at sun.nio.ch.Net.connect(Net.java:457)
>> >>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>> >>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>> >>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>> >>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>> >>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>> >>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>> >>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> >>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>> >>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>> >>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >>
>> >> I have verified that all the partitions mentioned in these messages (e.g.
>> >> the above mentions partitions 0, 34, 94, etc.) had the newly killed node
>> >> as the leader. I believe that means that the other 4 brokers were alive
>> >> and running without issues. There are no other log messages that indicate
>> >> any other broker communication issues.
>> >>
>> >> As I understand it, this scenario shouldn't cause any data loss since at
>> >> least 4/5 of the brokers were alive and healthy at all times. Is there
>> >> any way to explain the data loss? Perhaps a known bug in 0.8.1?
>> >>
>> >> Thanks,
>> >> Oliver
>> >>
>> >>
>>
>>
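
For anyone wanting to repeat the end-of-test check described in the original message (a unique ID per message, then confirm every ID was read back at least once), here is a minimal sketch. It is not the script used in the test above; the function name and the sample IDs are hypothetical, and it assumes the produced and consumed IDs have already been collected into plain Python collections.

```python
def find_missing_ids(produced_ids, consumed_ids):
    """Return the IDs that were produced but never read back, sorted.

    Duplicates in consumed_ids are ignored, since the check is
    "saved at least once", not "saved exactly once".
    """
    return sorted(set(produced_ids) - set(consumed_ids))


if __name__ == "__main__":
    # Hypothetical sample data: ID 5 was produced but never read back,
    # and ID 2 was read back twice (acceptable for at-least-once).
    produced = range(1, 11)
    consumed = [1, 2, 2, 3, 4, 6, 7, 8, 9, 10]
    print(find_missing_ids(produced, consumed))
```

With hundreds of millions of IDs the two sets may not fit comfortably in memory, so a real check would probably need to sort and compare on disk or shard by ID range instead.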
