Hi Jay,

Thanks for explaining the lag detection mechanism. I think my real concern comes from the description of request.required.acks=-1 in Kafka's documentation: "-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains."

Since it states that acks=-1 provides the best durability, I had thought it was equivalent to acks=3 for a topic with 3 replicas. My understanding is that acks=3 provides the best durability for such a topic, better than acks=2 and acks=1. But because followers may fall out of sync, acks=-1 can actually provide only the same level of durability as acks=1. It seems to me there is an inconsistency between the behavior of acks=-1 and its description, so one of them may need to be modified.
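For concreteness, the setting in question is the one in the (0.8.x Scala) sync producer configuration below; this is only a minimal sketch, with placeholder broker addresses, topic and payload:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092") // placeholder hosts
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("producer.type", "sync")
    // acks=-1 waits only for the replicas currently in the ISR, not for all
    // assigned replicas; if the followers have dropped out of the ISR this
    // effectively degrades to acks=1.
    props.put("request.required.acks", "-1")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("test-topic", "some 2000-byte payload")) // placeholder topic/message
    producer.close()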
Regards,
Jiang

From: users@kafka.apache.org
At: Jul 11 2014 18:27:46
To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
Cc: wangg...@gmail.com
Subject: Re: request.required.acks=-1 under high data volume

I think the root problem is that replicas are falling behind and hence are
effectively "failed" under normal load, and also that you have unclean leader
election enabled, which "solves" this catastrophic failure by electing new
leaders without complete data. Starting in 0.8.2 you will be able to
selectively disable unclean leader election.

The root problem for the spuriously failing replicas is the configuration
replica.lag.max.messages. This configuration defaults to 4000. But throughput
can be really high, like a million messages per second. At a million messages
per second, 4k messages of lag is only 4 ms behind, which can happen for all
kinds of reasons (e.g. just normal Linux I/O latency jitter). Jiang, I suspect
you can resolve your issue by just making this higher.

However, raising this setting is not a panacea. The higher you raise it, the
longer it will take to detect a partition that is actually falling behind. We
have been discussing this setting, and if you think about it, the setting is
actually somewhat impossible to set right in a cluster which has both
low-volume and high-volume topics/partitions. For the low-volume topic it will
take a very long time to detect a lagging replica, and for the high-volume
topic it will have false positives. One approach to making this easier would
be to have the configuration be something like replica.lag.max.ms and
translate it into a number of messages dynamically based on the throughput of
the partition.

-Jay

On Fri, Jul 11, 2014 at 2:55 PM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731 LEX -)
<jwu...@bloomberg.net> wrote:
> Hi Guozhang,
>
> KAFKA-1537 is created. https://issues.apache.org/jira/i#browse/KAFKA-1537
>
> I'll try to see if I'm able to submit a patch for this, but cannot commit to
> a date, so please feel free to assign it to others.
>
> Regards,
> Jiang
>
> ----- Original Message -----
> From: wangg...@gmail.com
> To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -), users@kafka.apache.org
> At: Jul 11 2014 16:42:55
>
> Hello Jiang,
>
> That is a valid point. The reason we designed acks=-1 to mean "receive acks
> from replicas in the ISR" is basically trading consistency for availability.
> I think that instead of changing its meaning, we could add another ack
> value, -2 for instance, to specify "receive acks from all replicas" in favor
> of consistency.
>
> Since you already did this much investigation, would you like to file a JIRA
> and submit a patch for this?
>
> Guozhang
>
>
> On Fri, Jul 11, 2014 at 11:49 AM, Jiang Wu (Pricehistory) (BLOOMBERG/ 731
> LEX -) <jwu...@bloomberg.net> wrote:
>
>> Hi,
>> I'm doing stress and failover tests on a 3-node 0.8.1.1 Kafka cluster and
>> have the following observations.
>>
>> A topic is created with 1 partition and 3 replicas. request.required.acks
>> is set to -1 for a sync producer. When the publishing speed is high (3M
>> messages, each 2000 bytes, published in lists of size 2000), the two
>> followers will fall out of sync. Only the leader remains in the ISR. But
>> the producer can keep sending. If the leader is killed with CTRL-C, one
>> follower will become the leader, but message loss will happen because of
>> the unclean leader election.
>>
>> In the same test, request.required.acks=3 gives the desired result.
>> Followers will fall out of sync, but the producer will be blocked until
>> all followers are back in the ISR. No data loss is observed in this case.
>>
>> From the code, this turns out to be how it's designed:
>>
>> if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
>>     (requiredAcks > 0 && numAcks >= requiredAcks)) {
>>   /*
>>    * requiredAcks < 0 means acknowledge after all replicas in ISR
>>    * are fully caught up to the (local) leader's offset
>>    * corresponding to this produce request.
>>    */
>>   (true, ErrorMapping.NoError)
>> }
>>
>> I'm wondering if it's more reasonable to let request.required.acks=-1 mean
>> "receive acks from all replicas" instead of "receive acks from replicas in
>> the ISR"? As in the above test, followers will fall out of sync under high
>> publishing volume; that makes request.required.acks=-1 equivalent to
>> request.required.acks=1. Since the Kafka documentation states that
>> request.required.acks=-1 provides the best durability, one would expect it
>> to be equivalent to request.required.acks=number_of_replicas.
>>
>> Regards,
>> Jiang
>
>
> --
> -- Guozhang
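As a rough sketch only (not actual Kafka code), the acknowledgement check quoted above could distinguish the current acks=-1 semantics from the proposed "receive acks from all replicas" behaviour (Guozhang's hypothetical -2 setting) along these lines; inSyncReplicaCount and assignedReplicaCount are illustrative stand-ins for the leader-side replica state:

    // Rough sketch only, not actual Kafka code: contrasts the current meaning of
    // requiredAcks = -1 (all replicas currently in the ISR) with a hypothetical
    // -2 value that would wait for every assigned replica.
    def enoughAcks(requiredAcks: Int,
                   numAcks: Int,
                   inSyncReplicaCount: Int,
                   assignedReplicaCount: Int): Boolean =
      requiredAcks match {
        case -1         => numAcks >= inSyncReplicaCount   // ISR may have shrunk to just the leader
        case -2         => numAcks >= assignedReplicaCount // hypothetical: full replication factor
        case n if n > 0 => numAcks >= n                    // explicit count, e.g. acks=3 in the test above
        case _          => false                           // acks=0 is handled before this check
      }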
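Similarly, Jay's replica.lag.max.ms suggestion earlier in the thread could, in rough outline, translate a time budget into a per-partition message allowance; the names here are illustrative and not an actual Kafka configuration:

    // Illustrative only: translate a time-based lag budget into a message count
    // using the partition's recent throughput, so a high-volume partition gets a
    // proportionally larger allowance than a low-volume one while both detect a
    // genuinely stuck replica within roughly the same wall-clock time.
    def maxLagMessages(maxLagMs: Long, observedMessagesPerSec: Double): Long =
      math.max(1L, (observedMessagesPerSec * maxLagMs / 1000.0).round)

    // e.g. at 1,000,000 msg/s a 10,000 ms budget allows ~10,000,000 messages of
    // lag, while at 100 msg/s the same budget allows only ~1,000.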