Folks. Thanks for all of the good discussions. Here are a few of my thoughts:
1. We won't change any existing semantics. That means, besides acks '-1 (all)', '0' and '1', we will introduce a separate 'quorum' value and document its semantics. 'quorum' is a different view of our replication protocol for the sake of better tail (P999) latency, and I will advise users not to compare 'quorum' with '-1 (all)' or any of the other existing values.

2. In terms of the controller functionality, I admit there are many awesome consensus protocols; however, for this specific KIP I chose to minimize the impact/change on the controller code path:
- We will keep the current controller's overall design and implementation by NOT introducing any consensus protocol.
- We will introduce a topic-level config "enable.quorum.acks" (default false), and only accept acks='quorum' produce requests for topics that have this config enabled. During leader election, the controller will use the new LEO-based election only for topics with "enable.quorum.acks" turned on, so only the topics that need it pay the penalty and other topics are unaffected.

One requirement for this KIP is to stay fully backward compatible in semantics and to keep the added controller complexity (longer new-leader election latency) pay-as-you-go. (I have appended two small illustrative sketches, one of the proposed configs and one of the acking rule, at the very end of this mail, below the quoted thread.)

Thoughts?

On Mon, Feb 12, 2018 at 10:19 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Tao,
>
> No I was not proposing to change the mechanism of acks=all, and only mentioning that today even with acks=all the tolerance of failures is theoretically still bounded by min.isr settings though we do require all replicas in ISR (which may be larger than min.isr) to replicate before responding; this is what Jun mentioned may surprise many users today. I think an additional "acks=quorum" can help resolve this, by requiring the num.acks >= majority (to make sure consistency is guaranteed with at most (X-1) / 2 failures with X number of replicas) AND num.acks >= min.isr (to specify if we want to tolerate more failures than (X-1) / 2).
>
> The question then is whether acks=all is still useful with the introduced "quorum": if it is not, we can just replace the current semantics of "all" and document it. The example that we gave above demonstrates that "acks=all" itself may still be useful even with the introduction of "quorum", since that scenario can be avoided by acks=all, but not acks=quorum, as it requires ALL ISR replicas to replicate even if that number is larger than {min.isr} and also larger than the majority number (and if A is trying to shrink its ISR from {A,B,C} to {A,B} it will fail the ZK write since the epoch has been incremented). Hence my proposal is to add a new config rather than replacing the current semantics of "all".
>
>
> Guozhang
>
>
> On Sat, Feb 3, 2018 at 2:45 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Are you proposing changing the semantics of ack=all to acknowledge a message only after all replicas (not all ISRs, which is what Kafka currently is doing) have committed the message? This is equivalent to setting min.isr=number of replicas, which makes ack=all much stricter than what Kafka has right now. I think this may introduce surprise to users too, as the producer will not succeed in producing a message to Kafka when one of the followers is down.
> >
> > On Sat, 3 Feb 2018 at 15:26 Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Dong,
> > >
> > > Could you elaborate a bit more how the controller could affect leaders to switch between all and quorum?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > Got it. Thanks for the detailed explanation. I guess my point is that we can probably achieve the best of both worlds, i.e. maintain the existing behavior of ack="all" while improving the tail latency.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > >> Hi Dong,
> > > >>
> > > >> Yes, in terms of fault tolerance "quorum" does not do better than "all"; as I said, with {min.isr} set to X+1 Kafka is able to tolerate X failures only. So if A and B are partitioned off at the same time, then there are two concurrent failures and we do not guarantee all acked messages will be retained.
> > > >>
> > > >> The goal of my approach is to maintain the behavior of ack="all", which happens to do better than what Kafka actually guarantees: when both A and B are partitioned off, produced records will not be acked since "all" requires all replicas (not only ISRs, my previous email has an incorrect term). This is doing better than tolerating X failures, which I was proposing to keep, so that we would not introduce any regression "surprises" to users who are already using "all". In other words, "quorum" is trading a bit of failure tolerance that is strictly defined on min.isr for better tail latency.
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >>
> > > >>> Hey Guozhang,
> > > >>>
> > > >>> According to the new proposal, with 3 replicas, min.isr=2 and acks="quorum", it seems that acknowledged messages can still be truncated in the network partition scenario you mentioned, right? So I guess the goal is for some users to achieve better tail latency at the cost of potential message loss?
> > > >>>
> > > >>> If this is the case, then I think it may be better to adopt an approach where the controller dynamically turns this optimization on/off. This provides the user with peace of mind (i.e. no message loss) while still reducing tail latency. What do you think?
> > > >>>
> > > >>> Thanks,
> > > >>> Dong
> > > >>>
> > > >>>
> > > >>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>
> > > >>>> Hello Litao,
> > > >>>>
> > > >>>> Just double checking on the leader election details, do you have time to complete the proposal on that part?
> > > >>>>
> > > >>>> Also Jun mentioned one caveat related to KIP-250 on the KIP-232 discussion thread that Dong is working on, I figured it is worth pointing out here with a tentative solution:
> > > >>>>
> > > >>>>
> > > >>>> ```
> > > >>>> Currently, if the producer uses acks=-1, a write will only succeed if the write is received by all in-sync replicas (i.e., committed). This is true even when min.isr is set since we first wait for a message to be committed and then check the min.isr requirement. KIP-250 may change that, but we can discuss the implication there.
> > > >>>> ```
> > > >>>>
> > > >>>> The caveat is that, if we change the acking semantics in KIP-250 so that we only require {min.isr} replicas to acknowledge a produce, then the above scenario will have a caveat: imagine you have {A, B, C} replicas of a partition with A as the leader, all in the isr list, and min.isr is 2.
> > > >>>>
> > > >>>> 1. Say there is a network partition and both A and B are fenced off. C is elected as the new leader and it shrinks its isr list to only {C}; from A's point of view it does not know it has become the "ghost" and is no longer the leader, all it does is shrink its isr list to {A, B}.
> > > >>>>
> > > >>>> 2. At this time, any new writes with ack=-1 to C will not be acked, since from C's pov there is only one replica. This is correct.
> > > >>>>
> > > >>>> 3. However, any writes that are sent to A (NOTE this is totally possible, since producers only refresh metadata periodically; additionally, if they happen to ask A or B they will get the stale metadata that A is still the leader) can be acked by A: since A thinks the isr list is {A, B}, as long as B has replicated the message, A will ack the produce.
> > > >>>>
> > > >>>> This is not correct behavior, since when the network heals, A would realize it is not the leader and will truncate its log. And hence as a result the acked records are lost, violating Kafka's guarantees. And KIP-232 would not help prevent this scenario.
> > > >>>>
> > > >>>>
> > > >>>> Although one can argue that, with 3 replicas and min.isr set to 2, Kafka is guaranteeing to tolerate only one failure, while the above scenario is actually two concurrent failures (both A and B are considered wedged), this is still a regression from the current version.
> > > >>>>
> > > >>>> So to resolve this issue, I'd propose we change the semantics in the following way (this is only slightly different from your proposal):
> > > >>>>
> > > >>>>
> > > >>>> 1. Add one more value to the client-side acks config:
> > > >>>>
> > > >>>>    0: no acks needed at all.
> > > >>>>    1: ack from the leader.
> > > >>>>    all: ack from ALL the ISR replicas AND the current number of isr replicas has to be no smaller than {min.isr} (i.e. not changing this semantic).
> > > >>>>    quorum: this is the new value; it requires acks from a number of ISR replicas no smaller than the majority of the replicas AND no smaller than {min.isr}.
> > > >>>>
> > > >>>> 2. Clarify in the docs that if a user wants to tolerate X failures, she needs to set client acks=all or acks=quorum (better tail latency than "all") with broker {min.isr} set to X+1; however, "all" is not necessarily stronger than "quorum":
> > > >>>>
> > > >>>> For example, with 3 replicas and {min.isr} set to 2, here is a list of scenarios:
> > > >>>>
> > > >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> > > >>>> b. ISR list has 2: "all" and "quorum" wait for both of them.
> > > >>>> c. ISR list has 1: "all" and "quorum" would not ack.
> > > >>>>
> > > >>>> If {min.isr} is set to 1, interestingly, here would be the list of scenarios:
> > > >>>>
> > > >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> > > >>>> b. ISR list has 2: "all" and "quorum" wait for both of them.
> > > >>>> c. ISR list has 1: "all" waits for the leader to return, while "quorum" would not ack (because it requires that number >= {min.isr} AND >= {majority of num.replicas}, so it's actually stronger than "all").
> > > >>>>
> > > >>>>
> > > >>>> WDYT?
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>>
> > > >>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hey Litao,
> > > >>>>>
> > > >>>>> Not sure there will be an easy way to select the broker with the highest LEO without losing acknowledged messages. In case it is useful, here is another idea. Maybe we can have a mechanism to switch between the min.isr and isr set for determining when to acknowledge a message. The controller can probably use an RPC to request the current leader to use the isr set before it sends LeaderAndIsrRequest for leadership change.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Dong
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng <litao.d...@airbnb.com.invalid> wrote:
> > > >>>>>
> > > >>>>> > Thanks Jun for the detailed feedback.
> > > >>>>> >
> > > >>>>> > Yes, for #1, I mean the live replicas from the ISR.
> > > >>>>> >
> > > >>>>> > Actually, I believe for all of the 4 new leader election strategies (offline, reassign, preferred replica and controlled shutdown), we need to make corresponding changes. Will document the details in the KIP.
> > > >>>>> >
> > > >>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
> > > >>>>> >
> > > >>>>> > > Hi, Litao,
> > > >>>>> > >
> > > >>>>> > > Thanks for the KIP. Good proposal. A few comments below.
> > > >>>>> > >
> > > >>>>> > > 1. The KIP says "select the live replica with the largest LEO". I guess what you meant is selecting the live replicas in ISR with the largest LEO?
> > > >>>>> > >
> > > >>>>> > > 2. I agree that we can probably just reuse the current min.isr configuration, but with slightly different semantics. Currently, if min.isr is set, a user expects the record to be in at least min.isr replicas on successful ack. This KIP guarantees this too. Most people are probably surprised that currently the ack is only sent back after all replicas in ISR receive the record. This KIP will change the ack to only wait on min.isr replicas, which matches the user's expectation and gives better latency. Currently, we guarantee no data loss if there are fewer than replication-factor failures. The KIP changes that to fewer than min.isr failures. The latter probably matches the user expectation.
> > > >>>>> > >
> > > >>>>> > > 3. I agree that the new leader election process is a bit more complicated. The controller now needs to contact all replicas in ISR to determine who has the longest log.
> > > >>>>> > > However, this happens infrequently, so it's probably worth doing for the better latency in #2.
> > > >>>>> > >
> > > >>>>> > > 4. We have to think through the preferred leader election process. Currently, the first assigned replica is preferred for load balancing. There is a process to automatically move the leader to the preferred replica when it's in sync. The issue is that the preferred replica may not be the replica with the longest log. Naively switching to the preferred replica may cause data loss when there are actually fewer failures than the configured min.isr. One way to address this issue is to do the following steps during preferred leader election: (a) the controller sends an RPC request to the current leader; (b) the current leader stops taking new writes (sending a new error code to the clients) and returns its LEO (call it L) to the controller; (c) the controller issues an RPC request to the preferred replica and waits for its LEO to reach L; (d) the controller changes the leader to the preferred replica.
> > > >>>>> > >
> > > >>>>> > > Jun
> > > >>>>> > >
> > > >>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng <litao.d...@airbnb.com.invalid> wrote:
> > > >>>>> > >
> > > >>>>> > > > Sorry folks, just realized I didn't use the correct thread format for the discussion. I started this new one and copied all of the responses from the old one.
> > > >>>>> > > >
> > > >>>>> > > > @Dong
> > > >>>>> > > > It makes sense to just use the min.insync.replicas instead of introducing a new config, and we must make this change together with the LEO-based new leader election.
> > > >>>>> > > >
> > > >>>>> > > > @Xi
> > > >>>>> > > > I thought about embedding the LEO information into the ControllerContext but didn't find a way. Using RPCs will make the leader election period longer, and this should happen in very rare cases (broker failure, controlled shutdown, preferred leader election and partition reassignment).
> > > >>>>> > > >
> > > >>>>> > > > @Jeff
> > > >>>>> > > > The current leader election is to pick the first replica from AR which exists in both the live brokers and ISR sets. I agree with you that changing the current/default behavior will cause a lot of confusion, and that's the reason the title is "Add Support ...". In this case, we wouldn't break any current promises and would provide a separate option for our users.
> > > >>>>> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous Replication" in the MySQL world, and yes it is something between acks=1 and acks=insync.replicas.
> > > >>>>> > > > Additionally, I feel KIP-250 and KIP-227 are two orthogonal improvements. KIP-227 is to improve the replication protocol (like the introduction of parallel replication in MySQL), and KIP-250 is an enhancement to the replication architecture (sync, semi-sync, and async).
> > > >>>>> > > >
> > > >>>>> > > >
> > > >>>>> > > > Dong Lin
> > > >>>>> > > >
> > > >>>>> > > > > Thanks for the KIP. I have one quick comment before you provide more detail on how to select the leader with the largest LEO.
> > > >>>>> > > > > Do you think it would make sense to change the default behavior of acks=-1, such that the broker will acknowledge the message once the message has been replicated to min.insync.replicas brokers? This would allow us to keep the same durability guarantee and improve produce request latency without having a new config.
> > > >>>>> > > >
> > > >>>>> > > >
> > > >>>>> > > > Hu Xi
> > > >>>>> > > >
> > > >>>>> > > > > Currently, with holding the assigned replicas (AR) for all partitions, the controller is now able to elect new leaders by selecting the first replica of AR which occurs in both the live replica set and the ISR. If switching to the LEO-based strategy, the controller context might need to be enriched or augmented to store those values. If retrieving those LEOs in real time, several rounds of RPCs are unavoidable, which seems to violate the original intention of this KIP.
> > > >>>>> > > >
> > > >>>>> > > >
> > > >>>>> > > > Jeff Widman
> > > >>>>> > > >
> > > >>>>> > > > > I agree with Dong, we should see if it's possible to change the default behavior so that as soon as min.insync.replicas brokers respond, the broker acknowledges the message back to the client without waiting for additional brokers who are in the in-sync replica list to respond. (I actually thought it already worked this way.)
> > > >>>>> > > > > As you implied in the KIP though, changing this default introduces a weird state where an in-sync follower broker is not guaranteed to have a message...
> > > >>>>> > > > > So at a minimum, the leadership failover algorithm would need to be sure to pick the most up-to-date follower... I thought it already did this?
> > > >>>>> > > > > But if multiple brokers fail in quick succession, then a broker that was in the ISR could become a leader without ever receiving the message... violating the current promises of unclean.leader.election.enable=False...
> > > >>>>> > > > > so changing the default might not be a tenable solution.
> > > >>>>> > > > > What also jumped out at me in the KIP was the goal of reducing p999 when setting replica lag time at 10 seconds(!!)... I understand the desire to minimize frequent ISR shrink/expansion, as I face this same issue at my day job. But what you're essentially trying to do here is create an additional replication state that is in-between acks=1 and acks=ISR to paper over a root problem of ISR shrink/expansion...
> > > >>>>> > > > > I'm just wary of shipping more features (and more operational confusion) if it's only addressing the symptom rather than the root cause. For example, my day job's problem is we run a very high number of low-traffic partitions-per-broker, so the fetch requests hit many partitions before they fill. Solving that requires changing our architecture + making the replication protocol more efficient (KIP-227).
> > > >>>>> > > >
> > > >>>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <litao.d...@airbnb.com> wrote:
> > > >>>>> > > >
> > > >>>>> > > > > Hey folks. I would like to add a feature to support the quorum-based acknowledgment for the producer request. We have been running a modified version of Kafka on our testing cluster for weeks; the improvement of P999 is significant, with very stable latency. Additionally, I have a proposal to achieve a similar data durability as with the insync.replicas-based acknowledgment through LEO-based leader election.
> > > >>>>> > > > >
> > > >>>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge
> > > >>>>> > > > >
> > > >>>>> > > >
> > > >>>>> > >
> > > >>>>> >
> > > >>>>>
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> --
> -- Guozhang
>
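Appendix (referenced from my notes at the top; these are illustrative sketches only, not code from the KIP):

(1) A minimal sketch of how a topic and a producer might be configured once this proposal lands. Note that "enable.quorum.acks" and acks='quorum' are only the names proposed in this thread; they do not exist in current Kafka, so this snippet would fail config validation against today's clients/brokers. The kafka-configs.sh invocation in the comment is just my guess at how the topic-level switch could be set, and the "payments" topic is only an example name.

```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QuorumAcksSketch {
    public static void main(String[] args) {
        // Topic side (names proposed in this thread, NOT existing Kafka configs):
        //   enable.quorum.acks=true   -> opt this topic into quorum acking and
        //                                LEO-based leader election
        //   min.insync.replicas=2     -> existing config; 'quorum' still needs
        //                                acks >= max(majority, min.insync.replicas)
        // Hypothetically, e.g.:
        //   kafka-configs.sh --zookeeper zk:2181 --alter --entity-type topics \
        //     --entity-name payments --add-config enable.quorum.acks=true

        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 'quorum' is the new acks value proposed here; today only 0, 1 and all/-1
        // are accepted, so a current client would reject this line.
        props.put("acks", "quorum");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("payments", "key", "value"));
        }
    }
}
```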
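(2) A small, self-contained sketch (plain Java, not broker code) of the acking rule Guozhang spelled out above: "all" keeps its current meaning (every ISR replica must replicate, and the ISR must be at least {min.isr}), while the proposed "quorum" needs acks from at least max(majority of replicas, {min.isr}) ISR replicas. Running it reproduces the two scenario lists (min.isr=2 and min.isr=1) from the thread; -1 below means the produce cannot be acked.

```
public final class AckRuleSketch {

    /**
     * How many ISR replicas must have the record before the leader may respond,
     * or -1 if the produce cannot be acked at all. Not Kafka code, just the rule
     * as described in this thread.
     */
    static int requiredAcks(String acks, int replicationFactor, int isrSize, int minIsr) {
        int majority = replicationFactor / 2 + 1;
        switch (acks) {
            case "all":
                // unchanged semantics: every ISR replica, and the ISR must not be below min.isr
                return isrSize >= minIsr ? isrSize : -1;
            case "quorum":
                // proposed semantics: at least a majority of all replicas AND at least min.isr
                int needed = Math.max(majority, minIsr);
                return isrSize >= needed ? needed : -1;
            default:
                throw new IllegalArgumentException("only all/quorum are modeled here");
        }
    }

    public static void main(String[] args) {
        // 3 replicas, min.isr=2: 'quorum' acks after 2 replicas whenever the ISR has 2 or 3 members.
        // 3 replicas, min.isr=1: with ISR={leader} 'all' acks after the leader alone,
        // while 'quorum' refuses because a majority (2) cannot be reached.
        for (int minIsr : new int[] {2, 1}) {
            for (int isr = 3; isr >= 1; isr--) {
                System.out.printf("min.isr=%d ISR size=%d: all -> %d, quorum -> %d%n",
                        minIsr, isr,
                        requiredAcks("all", 3, isr, minIsr),
                        requiredAcks("quorum", 3, isr, minIsr));
            }
        }
    }
}
```

The max(majority, min.isr) form is also why, with min.isr=1 and only the leader left in the ISR, "quorum" ends up stricter than "all", as noted in the scenario list above.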