Hello Litao,

Just double-checking on the leader election details: do you have time to complete the proposal for that part?
Also, Jun mentioned one caveat related to KIP-250 on the KIP-232 discussion thread that Dong is working on. I figured it is worth pointing out here with a tentative solution:

```
Currently, if the producer uses acks=-1, a write will only succeed if the write is received by all in-sync replicas (i.e., committed). This is true even when min.isr is set since we first wait for a message to be committed and then check the min.isr requirement. KIP-250 may change that, but we can discuss the implication there.
```

The caveat is that, if we change the acking semantics in KIP-250 so that we only require {min.isr} replicas to acknowledge a produce, the following scenario becomes possible. Imagine you have replicas {A, B, C} of a partition with A as the leader, all in the ISR list, and min.isr is 2.

1. Say there is a network partition and both A and B are fenced off. C is elected as the new leader and shrinks its ISR list to {C}; from A's point of view, it does not know it has become a "ghost" and is no longer the leader, so all it does is shrink its ISR list to {A, B}.

2. At this time, any new writes with acks=-1 to C will not be acked, since from C's point of view there is only one replica in the ISR, which is smaller than min.isr. This is correct.

3. However, writes may still be sent to A (NOTE: this is totally possible, since producers only refresh metadata periodically; additionally, if they happen to ask A or B they will get the stale metadata saying A is still the leader). Since A thinks the ISR list is {A, B}, as long as B has replicated the message, A can ack the produce. This is not correct behavior: when the network heals, A will realize it is no longer the leader and will truncate its log, so the acked records are lost, violating Kafka's guarantees. And KIP-232 would not help prevent this scenario.

Although one can argue that, with 3 replicas and min.isr set to 2, Kafka only guarantees tolerating a single failure, while the above scenario actually involves two concurrent failures (both A and B are considered wedged), this is still a regression from the current behavior.

So to resolve this issue, I'd propose we change the semantics in the following way (this is only slightly different from your proposal):

1. Add one more value to the client-side acks config:

   0: no acks needed at all.
   1: ack from the leader.
   all: acks from ALL the ISR replicas, AND the current number of ISR replicas has to be no smaller than {min.isr} (i.e., this semantic does not change).
   quorum: this is the new value; it requires acks from a number of ISR replicas that is no smaller than a majority of the replicas AND no smaller than {min.isr}.

2. Clarify in the docs that if a user wants to tolerate X failures, she needs to set client acks=all or acks=quorum (better tail latency than "all") with broker {min.isr} set to X+1. Note, however, that "all" is not necessarily stronger than "quorum".

For example, with 3 replicas and {min.isr} set to 2, here is the list of scenarios:

a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
b. ISR list has 2: "all" and "quorum" both wait for those 2.
c. ISR list has 1: neither "all" nor "quorum" would ack.

If {min.isr} is set to 1, interestingly, here is the list of scenarios:

a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
b. ISR list has 2: "all" and "quorum" both wait for those 2.
c. ISR list has 1: "all" waits for the leader only, while "quorum" would not ack (because it requires the count to be no smaller than {min.isr} AND no smaller than a majority of num.replicas, so it is actually stronger than "all").

WDYT?
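To make this concrete, here is a rough sketch of the ack decision under each value. All names here (canAck, acksReceivedFromIsr, etc.) are hypothetical and this is not actual broker code, just an illustration of the semantics above:

```
// Illustrative sketch only; names are hypothetical, not Kafka internals.
public final class AckPolicy {

    // Returns true if a produce request can be acknowledged, given how many
    // ISR replicas (including the leader) have the record so far.
    public static boolean canAck(String acks,
                                 int acksReceivedFromIsr,
                                 int isrSize,
                                 int replicationFactor,
                                 int minIsr) {
        switch (acks) {
            case "0":
                return true;                               // no acks needed at all
            case "1":
                return acksReceivedFromIsr >= 1;           // ack from the leader only
            case "all":
                // all current ISR replicas, and the ISR itself must be >= min.isr
                return isrSize >= minIsr && acksReceivedFromIsr >= isrSize;
            case "quorum": {
                // at least a majority of the replicas, and at least min.isr
                int majority = replicationFactor / 2 + 1;
                return acksReceivedFromIsr >= Math.max(majority, minIsr);
            }
            default:
                throw new IllegalArgumentException("Unknown acks value: " + acks);
        }
    }
}
```

With replicationFactor = 3 and minIsr = 2, "all" needs acks from the whole ISR while "quorum" needs only 2, matching scenario (a) above; with minIsr = 1 and an ISR of size 1, "quorum" still requires 2 and therefore does not ack, matching scenario (c).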
Guozhang

On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Litao,
>
> Not sure there will be an easy way to select the broker with the highest LEO without losing acknowledged messages. In case it is useful, here is another idea. Maybe we can have a mechanism to switch between the min.isr and the ISR set for determining when to acknowledge a message. The controller can probably use an RPC to request that the current leader use the ISR set before it sends the LeaderAndIsrRequest for the leadership change.
>
> Regards,
> Dong
>
>
> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng <litao.d...@airbnb.com.invalid> wrote:
>
> > Thanks Jun for the detailed feedback.
> >
> > Yes, for #1, I mean the live replicas from the ISR.
> >
> > Actually, I believe that for all 4 of the new leader election strategies (offline, reassign, preferred replica, and controlled shutdown) we need to make corresponding changes. Will document the details in the KIP.
> >
> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Litao,
> > >
> > > Thanks for the KIP. Good proposal. A few comments below.
> > >
> > > 1. The KIP says "select the live replica with the largest LEO". I guess what you meant is selecting the live replica in the ISR with the largest LEO?
> > >
> > > 2. I agree that we can probably just reuse the current min.isr configuration, but with slightly different semantics. Currently, if min.isr is set, a user expects the record to be in at least min.isr replicas on a successful ack. This KIP guarantees this too. Most people are probably surprised that currently the ack is only sent back after all replicas in the ISR receive the record. This KIP will change the ack to only wait on min.isr replicas, which matches the user's expectation and gives better latency. Currently, we guarantee no data loss if there are fewer than replication-factor failures. The KIP changes that to fewer than min.isr failures. The latter probably matches the user's expectation.
> > >
> > > 3. I agree that the new leader election process is a bit more complicated. The controller now needs to contact all replicas in the ISR to determine who has the longest log. However, this happens infrequently, so it's probably worth doing for the better latency in #2.
> > >
> > > 4. We have to think through the preferred leader election process. Currently, the first assigned replica is preferred for load balancing. There is a process to automatically move the leader to the preferred replica when it's in sync. The issue is that the preferred replica may not be the replica with the longest log. Naively switching to the preferred replica may cause data loss when there are actually fewer failures than the configured min.isr. One way to address this issue is to do the following steps during preferred leader election: (a) the controller sends an RPC request to the current leader; (b) the current leader stops taking new writes (sending a new error code to the clients) and returns its LEO (call it L) to the controller; (c) the controller issues an RPC request to the preferred replica and waits for its LEO to reach L; (d) the controller changes the leader to the preferred replica.
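For illustration, here is a minimal sketch of the LEO-based selection described in #1 and #3 above. The names (electLeader, logEndOffsets, etc.) are hypothetical and not controller code; the LEO map is assumed to have already been fetched from the ISR replicas:

```
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Illustrative sketch only: pick the new leader as the live, in-sync replica
// with the largest log end offset (LEO).
public final class LeoBasedElection {

    public static Optional<Integer> electLeader(List<Integer> assignedReplicas,
                                                Set<Integer> isr,
                                                Set<Integer> liveBrokers,
                                                Map<Integer, Long> logEndOffsets) {
        return assignedReplicas.stream()
                .filter(isr::contains)          // only in-sync replicas
                .filter(liveBrokers::contains)  // only live brokers
                .max((a, b) -> Long.compare(logEndOffsets.getOrDefault(a, -1L),
                                            logEndOffsets.getOrDefault(b, -1L)));
    }
}
```

The only difference from the current strategy (the first assigned replica that is both live and in the ISR) is the max-by-LEO step, which is what requires the controller to learn the LEOs, e.g. via the extra round of RPCs mentioned in #3.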
> > >
> > > Jun
> > >
> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng <litao.d...@airbnb.com.invalid> wrote:
> > >
> > > > Sorry folks, I just realized I didn't use the correct thread format for the discussion. I started this new one and copied all of the responses from the old one.
> > > >
> > > > @Dong
> > > > It makes sense to just use min.insync.replicas instead of introducing a new config, and we must make this change together with the LEO-based new leader election.
> > > >
> > > > @Xi
> > > > I thought about embedding the LEO information into the ControllerContext but didn't find a way. Using RPC will make the leader election period longer, and this should happen only in very rare cases (broker failure, controlled shutdown, preferred leader election, and partition reassignment).
> > > >
> > > > @Jeff
> > > > The current leader election is to pick the first replica from the AR which exists in both the live broker and ISR sets. I agree with you that changing the current/default behavior would cause a lot of confusion, and that's the reason the title is "Add Support ...". In this case, we wouldn't break any current promises and would provide a separate option for our users.
> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous Replication" in the MySQL world, and yes, it is something between acks=1 and acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227 are two orthogonal improvements. KIP-227 is to improve the replication protocol (like the introduction of parallel replication in MySQL), and KIP-250 is an enhancement to the replication architecture (sync, semi-sync, and async).
> > > >
> > > >
> > > > Dong Lin
> > > >
> > > > > Thanks for the KIP. I have one quick comment before you provide more detail on how to select the leader with the largest LEO.
> > > > > Do you think it would make sense to change the default behavior of acks=-1, such that the broker will acknowledge the message once the message has been replicated to min.insync.replicas brokers? This would allow us to keep the same durability guarantee and improve produce request latency without having a new config.
> > > >
> > > >
> > > > Hu Xi
> > > >
> > > > > Currently, holding the assigned replicas (AR) for all partitions, the controller is able to elect new leaders by selecting the first replica of the AR which occurs in both the live replica set and the ISR. If switching to the LEO-based strategy, the controller context might need to be enriched or augmented to store those values. If retrieving those LEOs in real time, several rounds of RPCs are unavoidable, which seems to violate the original intention of this KIP.
> > > >
> > > >
> > > > Jeff Widman
> > > >
> > > > > I agree with Dong, we should see if it's possible to change the default behavior so that as soon as min.insync.replicas brokers respond, the broker acknowledges the message back to the client without waiting for the additional brokers in the in-sync replica list to respond. (I actually thought it already worked this way.)
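For concreteness, a rough sketch of the two ack conditions being compared in the suggestion above (today's acks=-1 behavior versus acknowledging at min.insync.replicas); the names are illustrative only, not broker code:

```
// Illustrative only. acksFromIsr counts ISR replicas, including the leader,
// that currently have the record; isrSize is the current ISR size.
public final class AckComparison {

    // Today's acks=-1 behavior: every ISR replica must have the record,
    // and the ISR itself must be at least min.insync.replicas large.
    public static boolean ackToday(int acksFromIsr, int isrSize, int minIsr) {
        return isrSize >= minIsr && acksFromIsr >= isrSize;
    }

    // The suggested default change: acknowledge as soon as
    // min.insync.replicas replicas have the record.
    public static boolean ackAtMinIsr(int acksFromIsr, int minIsr) {
        return acksFromIsr >= minIsr;
    }
}
```

With 3 replicas, an ISR of {A, B, C}, and min.insync.replicas=2, ackToday waits for all 3 while ackAtMinIsr returns true after 2, which is exactly the latency win and the durability trade-off being discussed in this thread.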
> > > > > As you implied in the KIP though, changing this default introduces a weird state where an in-sync follower broker is not guaranteed to have a message...
> > > > > So at a minimum, the leadership failover algorithm would need to be sure to pick the most up-to-date follower... I thought it already did this?
> > > > > But if multiple brokers fail in quick succession, then a broker that was in the ISR could become the leader without ever receiving the message... violating the current promises of unclean.leader.election.enable=False... so changing the default might not be a tenable solution.
> > > > > What also jumped out at me in the KIP was the goal of reducing p999 while setting the replica lag time at 10 seconds (!!)... I understand the desire to minimize frequent ISR shrink/expansion, as I face this same issue at my day job. But what you're essentially trying to do here is create an additional replication state that is in between acks=1 and acks=ISR to paper over a root problem of ISR shrink/expansion...
> > > > > I'm just wary of shipping more features (and more operational confusion) if it's only addressing the symptom rather than the root cause. For example, my day job's problem is that we run a very high number of low-traffic partitions per broker, so the fetch requests hit many partitions before they fill. Solving that requires changing our architecture + making the replication protocol more efficient (KIP-227).
> > > >
> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <litao.d...@airbnb.com> wrote:
> > > >
> > > > > Hey folks. I would like to add a feature to support quorum-based acknowledgment for the produce request. We have been running a modified version of Kafka on our testing cluster for weeks, and the improvement in P999 is significant, with very stable latency. Additionally, I have a proposal to achieve similar data durability as with the insync.replicas-based acknowledgment through LEO-based leader election.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge

--
-- Guozhang