Hi Dong,

Could you elaborate a bit more on how the controller could get leaders to switch between "all" and "quorum"?

Guozhang

On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:
> Hey Guozhang,
>
> Got it. Thanks for the detailed explanation. I guess my point is that we
> can probably achieve the best of both worlds, i.e. maintain the existing
> behavior of ack="all" while improving the tail latency.
>
> Thanks,
> Dong
>
> On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Dong,
>>
>> Yes, in terms of fault tolerance "quorum" does not do better than "all".
>> As I said, with {min.isr} set to X+1 Kafka is able to tolerate X failures
>> only. So if A and B are partitioned off at the same time, then there are
>> two concurrent failures and we do not guarantee all acked messages will be
>> retained.
>>
>> The goal of my approach is to maintain the behavior of ack="all", which
>> happens to do better than what Kafka actually guarantees: when both A and
>> B are partitioned off, produced records will not be acked since "all"
>> requires acks from all replicas (not only ISRs; my previous email used an
>> incorrect term). This is doing better than tolerating X failures, which
>> I was proposing to keep, so that we would not introduce any regression
>> "surprises" to users who are already using "all". In other words, "quorum"
>> is trading a bit of failure tolerance that is strictly defined on min.isr
>> for better tail latency.
>>
>> Guozhang
>>
>> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hey Guozhang,
>>>
>>> According to the new proposal, with 3 replicas, min.isr=2 and
>>> acks="quorum", it seems that acknowledged messages can still be truncated
>>> in the network partition scenario you mentioned, right? So I guess the goal
>>> is for some users to achieve better tail latency at the cost of potential
>>> message loss?
>>>
>>> If this is the case, then I think it may be better to adopt an approach
>>> where the controller dynamically turns this optimization on/off.
>>> This provides users with peace of mind (i.e. no message loss) while still
>>> reducing tail latency. What do you think?
>>>
>>> Thanks,
>>> Dong
>>>
>>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hello Litao,
>>>>
>>>> Just double checking on the leader election details, do you have time
>>>> to complete the proposal on that part?
>>>>
>>>> Also Jun mentioned one caveat related to KIP-250 on the KIP-232
>>>> discussion thread that Dong is working on, I figured it is worth pointing
>>>> out here with a tentative solution:
>>>>
>>>> ```
>>>> Currently, if the producer uses acks=-1, a write will only succeed if
>>>> the write is received by all in-sync replicas (i.e., committed). This
>>>> is true even when min.isr is set since we first wait for a message to
>>>> be committed and then check the min.isr requirement. KIP-250 may
>>>> change that, but we can discuss the implication there.
>>>> ```
>>>>
>>>> The caveat is that, if we change the acking semantics in KIP-250 so that
>>>> we only require {min.isr} replicas to acknowledge a produce, then the
>>>> above scenario becomes a problem: imagine you have {A, B, C} replicas of a
>>>> partition with A as the leader, all in the isr list, and min.isr is 2.
>>>>
>>>> 1. Say there is a network partition and both A and B are fenced off. C
>>>> is elected as the new leader, and it shrinks its isr list to only {C}.
>>>> From A's point of view, it does not know it has become the "ghost" and is
>>>> no longer the leader; all it does is shrink the isr list to {A, B}.
>>>>
>>>> 2. At this time, any new writes with ack=-1 to C will not be acked,
>>>> since from C's pov there is only one replica. This is correct.
>>>>
>>>> 3.
>>>> However, writes may still be sent to A (NOTE this is totally
>>>> possible, since producers only refresh metadata periodically;
>>>> additionally, if they happen to ask A or B they will get the stale
>>>> metadata that A is still the leader). Since A thinks the isr list is
>>>> {A, B}, as long as B has replicated the message, A can ack the produce.
>>>>
>>>> This is not correct behavior, since when the network heals, A would
>>>> realize it is not the leader and will truncate its log. As a
>>>> result the acked records are lost, violating Kafka's guarantees. And
>>>> KIP-232 would not help prevent this scenario.
>>>>
>>>> Although one can argue that, with 3 replicas and min.isr set to 2,
>>>> Kafka is guaranteeing to tolerate only one failure, while the above
>>>> scenario is actually two concurrent failures (both A and B are considered
>>>> wedged), this is still a regression from the current version.
>>>>
>>>> So to resolve this issue, I'd propose we change the semantics in
>>>> the following way (this is only slightly different from your proposal):
>>>>
>>>> 1. Add one more value to the client-side acks config:
>>>>
>>>> 0: no acks needed at all.
>>>> 1: ack from the leader.
>>>> all: ack from ALL the ISR replicas AND the current number of isr
>>>> replicas has to be no smaller than {min.isr} (i.e. not changing this
>>>> semantic).
>>>> quorum: this is the new value; it requires acks from a number of
>>>> ISR replicas no smaller than a majority of the replicas AND no smaller
>>>> than {min.isr}.
>>>>
>>>> 2. Clarify in the docs that if a user wants to tolerate X failures, she
>>>> needs to set client acks=all or acks=quorum (better tail latency than
>>>> "all") with broker {min.isr} set to X+1; however, "all" is not necessarily
>>>> stronger than "quorum":
>>>>
>>>> For example, with 3 replicas and {min.isr} set to 2, here is a list of
>>>> scenarios:
>>>>
>>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>>>> b. ISR list has 2: "all" and "quorum" both wait for both of them.
>>>> c. ISR list has 1: "all" and "quorum" would not ack.
>>>>
>>>> If {min.isr} is set to 1, interestingly, here would be the list of
>>>> scenarios:
>>>>
>>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>>>> b. ISR list has 2: "all" and "quorum" both wait for both of them.
>>>> c. ISR list has 1: "all" waits for the leader to return, while "quorum"
>>>> would not ack (because it requires that number >= {min.isr} AND >=
>>>> {majority of num.replicas}, so it's actually stronger than "all").
>>>>
>>>> WDYT?
>>>>
>>>> Guozhang
>>>>
>>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hey Litao,
>>>>>
>>>>> Not sure there will be an easy way to select the broker with highest LEO
>>>>> without losing acknowledged messages. In case it is useful, here is
>>>>> another idea. Maybe we can have a mechanism to switch between the min.isr
>>>>> and isr set for determining when to acknowledge a message. Controller can
>>>>> probably use RPC to request the current leader to use isr set before it
>>>>> sends LeaderAndIsrRequest for leadership change.
>>>>>
>>>>> Regards,
>>>>> Dong
>>>>>
>>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng <litao.d...@airbnb.com.invalid> wrote:
>>>>>
>>>>> > Thanks Jun for the detailed feedback.
>>>>> >
>>>>> > Yes, for #1, I mean the live replicas from the ISR.
>>>>> >
>>>>> > Actually, I believe for all of the 4 new leader election strategies
>>>>> > (offline, reassign, preferred replica and controlled shutdown), we need
>>>>> > to make corresponding changes. Will document the details in the KIP.
>>>>> >
>>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
>>>>> >
>>>>> > > Hi, Litao,
>>>>> > >
>>>>> > > Thanks for the KIP. Good proposal. A few comments below.
>>>>> > >
>>>>> > > 1. The KIP says "select the live replica with the largest LEO".
>>>>> > > I guess what you meant is selecting the live replicas in ISR with the
>>>>> > > largest LEO?
>>>>> > >
>>>>> > > 2. I agree that we can probably just reuse the current min.isr
>>>>> > > configuration, but with slightly different semantics. Currently, if
>>>>> > > min.isr is set, a user expects the record to be in at least min.isr
>>>>> > > replicas on successful ack. This KIP guarantees this too. Most people are
>>>>> > > probably surprised that currently the ack is only sent back after all
>>>>> > > replicas in ISR receive the record. This KIP will change the ack to only
>>>>> > > wait on min.isr replicas, which matches the user's expectation and gives
>>>>> > > better latency. Currently, we guarantee no data loss if there are fewer
>>>>> > > than replication factor failures. The KIP changes that to fewer than
>>>>> > > min.isr failures. The latter probably matches the user expectation.
>>>>> > >
>>>>> > > 3. I agree that the new leader election process is a bit more complicated.
>>>>> > > The controller now needs to contact all replicas in ISR to determine who
>>>>> > > has the longest log. However, this happens infrequently. So, it's probably
>>>>> > > worth doing for the better latency in #2.
>>>>> > >
>>>>> > > 4. We have to think through the preferred leader election process.
>>>>> > > Currently, the first assigned replica is preferred for load balancing.
>>>>> > > There is a process to automatically move the leader to the preferred
>>>>> > > replica when it's in sync. The issue is that the preferred replica may not
>>>>> > > be the replica with the longest log. Naively switching to the preferred
>>>>> > > replica may cause data loss when there are actually fewer failures than
>>>>> > > configured min.isr.
>>>>> > > One way to address this issue is to do the following
>>>>> > > steps during preferred leader election: (a) the controller sends an RPC
>>>>> > > request to the current leader; (b) the current leader stops taking new
>>>>> > > writes (sending a new error code to the clients) and returns its LEO
>>>>> > > (call it L) to the controller; (c) the controller issues an RPC request
>>>>> > > to the preferred replica and waits for its LEO to reach L; (d) the
>>>>> > > controller changes the leader to the preferred replica.
>>>>> > >
>>>>> > > Jun
>>>>> > >
>>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng <litao.d...@airbnb.com.invalid> wrote:
>>>>> > >
>>>>> > > > Sorry folks, just realized I didn't use the correct thread format for
>>>>> > > > the discussion. I started this new one and copied all of the responses
>>>>> > > > from the old one.
>>>>> > > >
>>>>> > > > @Dong
>>>>> > > > It makes sense to just use the min.insync.replicas instead of
>>>>> > > > introducing a new config, and we must make this change together with
>>>>> > > > the LEO-based new leader election.
>>>>> > > >
>>>>> > > > @Xi
>>>>> > > > I thought about embedding the LEO information in the ControllerContext,
>>>>> > > > but didn't find a way. Using RPC will make the leader election period
>>>>> > > > longer, and this should happen in very rare cases (broker failure,
>>>>> > > > controlled shutdown, preferred leader election and partition
>>>>> > > > reassignment).
>>>>> > > >
>>>>> > > > @Jeff
>>>>> > > > The current leader election is to pick the first replica from AR which
>>>>> > > > exists both in the live brokers and ISR sets. I agree with you that
>>>>> > > > changing the current/default behavior will cause much confusion, and
>>>>> > > > that's the reason the title is "Add Support ...".
>>>>> > > > In this case, we wouldn't
>>>>> > > > break any current promises and provide a separate option for our user.
>>>>> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous
>>>>> > > > Replication" in the MySQL world, and yes it is something between acks=1
>>>>> > > > and acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227 are
>>>>> > > > two orthogonal improvements. KIP-227 is to improve the replication
>>>>> > > > protocol (like the introduction of parallel replication in MySQL), and
>>>>> > > > KIP-250 is an enhancement for the replication architecture (sync,
>>>>> > > > semi-sync, and async).
>>>>> > > >
>>>>> > > > Dong Lin
>>>>> > > >
>>>>> > > > > Thanks for the KIP. I have one quick comment before you provide more
>>>>> > > > > detail on how to select the leader with the largest LEO.
>>>>> > > > > Do you think it would make sense to change the default behavior of
>>>>> > > > > acks=-1, such that broker will acknowledge the message once the
>>>>> > > > > message has been replicated to min.insync.replicas brokers? This
>>>>> > > > > would allow us to keep the same durability guarantee, improve produce
>>>>> > > > > request latency without having a new config.
>>>>> > > >
>>>>> > > > Hu Xi
>>>>> > > >
>>>>> > > > > Currently, with holding the assigned replicas (AR) for all partitions,
>>>>> > > > > controller is now able to elect new leaders by selecting the first
>>>>> > > > > replica of AR which occurs in both live replica set and ISR. If
>>>>> > > > > switching to the LEO-based strategy, controller context might need to
>>>>> > > > > be enriched or augmented to store those values.
>>>>> > > > > If retrieving those LEOs in real time,
>>>>> > > > > several rounds of RPCs are unavoidable, which seems to violate the
>>>>> > > > > original intention of this KIP.
>>>>> > > >
>>>>> > > > Jeff Widman
>>>>> > > >
>>>>> > > > > I agree with Dong, we should see if it's possible to change the
>>>>> > > > > default behavior so that as soon as min.insync.replicas brokers
>>>>> > > > > respond, the broker acknowledges the message back to the client
>>>>> > > > > without waiting for additional brokers who are in the in-sync replica
>>>>> > > > > list to respond. (I actually thought it already worked this way).
>>>>> > > > > As you implied in the KIP though, changing this default introduces a
>>>>> > > > > weird state where an in-sync follower broker is not guaranteed to
>>>>> > > > > have a message...
>>>>> > > > > So at a minimum, the leadership failover algorithm would need to be
>>>>> > > > > sure to pick the most up-to-date follower... I thought it already did
>>>>> > > > > this?
>>>>> > > > > But if multiple brokers fail in quick succession, then a broker that
>>>>> > > > > was in the ISR could become a leader without ever receiving the
>>>>> > > > > message... violating the current promises of
>>>>> > > > > unclean.leader.election.enable=False...
>>>>> > > > > so changing the default might not be a tenable solution.
>>>>> > > > > What also jumped out at me in the KIP was the goal of reducing p999
>>>>> > > > > when setting replica lag time at 10 seconds(!!)... I understand the
>>>>> > > > > desire to minimize frequent ISR shrink/expansion, as I face this same
>>>>> > > > > issue at my day job.
>>>>> > > > > But what you're essentially trying to do here is create an
>>>>> > > > > additional replication state that is in-between acks=1 and acks = ISR
>>>>> > > > > to paper over a root problem of ISR shrink/expansion...
>>>>> > > > > I'm just wary of shipping more features (and more operational
>>>>> > > > > confusion) if it's only addressing the symptom rather than the root
>>>>> > > > > cause. For example, my day job's problem is we run a very high number
>>>>> > > > > of low-traffic partitions-per-broker, so the fetch requests hit many
>>>>> > > > > partitions before they fill. Solving that requires changing our
>>>>> > > > > architecture + making the replication protocol more efficient
>>>>> > > > > (KIP-227).
>>>>> > > >
>>>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <litao.d...@airbnb.com> wrote:
>>>>> > > >
>>>>> > > > > Hey folks. I would like to add a feature to support the quorum-based
>>>>> > > > > acknowledgment for the producer request. We have been running a
>>>>> > > > > modified version of Kafka on our testing cluster for weeks, the
>>>>> > > > > improvement of P999 is significant with very stable latency.
>>>>> > > > > Additionally, I have a proposal to achieve a similar data durability
>>>>> > > > > as with the insync.replicas-based acknowledgment through LEO-based
>>>>> > > > > leader election.
>>>>> > > > >
>>>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge
>>>>
>>>> --
>>>> -- Guozhang
>>
>> --
>> -- Guozhang

--
-- Guozhang
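
For readers following the acks discussion quoted above, the rules in the Feb 2 proposal (0, 1, "all", and the new "quorum") can be sketched as a small function. This is only an illustration of the semantics as described in the thread, not Kafka code: the names `majority` and `required_acks` are made up for this sketch, and it assumes the leader is always counted as a member of the ISR.

```python
from typing import Optional


def majority(num_replicas: int) -> int:
    """Smallest count that is more than half of num_replicas (hypothetical helper)."""
    return num_replicas // 2 + 1


def required_acks(acks: str, isr_size: int, num_replicas: int,
                  min_isr: int) -> Optional[int]:
    """Number of ISR replicas (leader included) that must hold a record before
    the leader may ack it, or None if the record must not be acked at all.

    Follows the proposal in the thread; this is not an actual Kafka API.
    """
    if acks == "0":
        return 0  # no acks needed at all
    if acks == "1":
        return 1  # ack from the leader only
    if acks == "all":
        # Every ISR replica must ack, and the ISR must not be below min.isr.
        return isr_size if isr_size >= min_isr else None
    if acks == "quorum":
        # Enough ISR replicas to cover both a majority of replicas and min.isr.
        needed = max(majority(num_replicas), min_isr)
        return needed if isr_size >= needed else None
    raise ValueError(f"unknown acks value: {acks!r}")
```

With 3 replicas and min.isr=1, an ISR of size 1 gives `required_acks("all", 1, 3, 1) == 1` but `required_acks("quorum", 1, 3, 1) is None`, which is the case where "quorum" ends up stricter than "all".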