Hi Dong,

Could you elaborate a bit more on how the controller could cause leaders to
switch between "all" and "quorum"?


Guozhang


On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Guozhang,
>
> Got it. Thanks for the detailed explanation. I guess my point is that we
> can probably achieve the best of both worlds, i.e. maintain the existing
> behavior of ack="all" while improving the tail latency.
>
> Thanks,
> Dong
>
>
>
> On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Dong,
>>
>> Yes, in terms of fault tolerance "quorum" does not do better than "all":
>> as I said, with {min.isr} set to X+1, Kafka is able to tolerate only X
>> failures. So if A and B are partitioned off at the same time, there are two
>> concurrent failures and we do not guarantee all acked messages will be
>> retained.
>>
>> The goal of my approach is to maintain the behavior of acks="all", which
>> happens to do better than what Kafka actually guarantees: when both A and
>> B are partitioned off, produced records will not be acked, since "all"
>> requires acks from all replicas (not only ISR members; my previous email
>> used an incorrect term). This does better than merely tolerating X
>> failures, and I was proposing to keep it so that we would not introduce
>> any regression "surprises" for users who are already using "all". In other
>> words, "quorum" trades a bit of the failure tolerance that is strictly
>> defined by min.isr for better tail latency.
>>
>>
>> Guozhang
>>
>>
>> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hey Guozhang,
>>>
>>> According to the new proposal, with 3 replicas, min.isr=2 and
>>> acks="quorum", it seems that acknowledged messages can still be truncated
>>> in the network partition scenario you mentioned, right? So I guess the
>>> goal is for some users to achieve better tail latency at the cost of
>>> potential message loss?
>>>
>>> If this is the case, then I think it may be better to adopt an approach
>>> where the controller dynamically turns this optimization on/off. This
>>> provides users with peace of mind (i.e. no message loss) while still
>>> reducing tail latency. What do you think?
>>>
>>> Thanks,
>>> Dong
>>>
>>>
>>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Litao,
>>>>
>>>> Just double checking on the leader election details, do you have time
>>>> to complete the proposal on that part?
>>>>
>>>> Also, Jun mentioned one caveat related to KIP-250 on the KIP-232
>>>> discussion thread that Dong is working on; I figured it is worth pointing
>>>> it out here with a tentative solution:
>>>>
>>>>
>>>> ```
>>>> Currently, if the producer uses acks=-1, a write will only succeed if
>>>> the write is received by all in-sync replicas (i.e., committed). This
>>>> is true even when min.isr is set since we first wait for a message to
>>>> be committed and then check the min.isr requirement. KIP-250 may
>>>> change that, but we can discuss the implication there.
>>>> ```
>>>>
>>>> The caveat is that, if we change the acking semantics in KIP-250 so that
>>>> only {min.isr} replicas are required to acknowledge a produce, the
>>>> following scenario becomes possible: imagine you have {A, B, C} replicas
>>>> of a partition with A as the leader, all in the ISR list, and min.isr is 2.
>>>>
>>>> 1. Say there is a network partition and both A and B are fenced off. C
>>>> is elected as the new leader and shrinks its ISR list to only {C}; from
>>>> A's point of view, it does not know it has become a "ghost" and is no
>>>> longer the leader; all it does is shrink its ISR list to {A, B}.
>>>>
>>>> 2. At this time, any new writes with ack=-1 to C will not be acked,
>>>> since from C's point of view there is only one replica in the ISR. This
>>>> is correct.
>>>>
>>>> 3. However, any writes that are sent to A can still be acked (NOTE this
>>>> is totally possible, since producers only refresh metadata periodically;
>>>> additionally, if they happen to ask A or B they will get stale metadata
>>>> saying A is still the leader): since A thinks the ISR list is {A, B}, as
>>>> long as B has replicated the message, A will ack the produce.
>>>>
>>>>     This is not correct behavior: when the network heals, A will
>>>> realize it is not the leader and will truncate its log. As a result the
>>>> acked records are lost, violating Kafka's guarantees. KIP-232 would not
>>>> help prevent this scenario.
>>>>
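>>>> To make the hole concrete, here is a minimal sketch (illustrative Java,
>>>> not actual broker code; all names are made up) of a min.isr-only ack
>>>> rule evaluated from A's stale point of view:
>>>>
>>>> ```java
>>>> import java.util.Set;
>>>>
>>>> class MinIsrAckCheck {
>>>>     // KIP-250-style rule: ack once {min.isr} members of the broker's
>>>>     // *local* ISR view have the record. Nothing here verifies that the
>>>>     // broker is still the real leader, which is exactly the hole above.
>>>>     static boolean canAck(Set<String> acked, Set<String> localIsr, int minIsr) {
>>>>         return acked.stream().filter(localIsr::contains).count() >= minIsr;
>>>>     }
>>>>
>>>>     public static void main(String[] args) {
>>>>         // Ghost leader A still believes ISR = {A, B}; A and B both have
>>>>         // the record, while the true leader C is unreachable.
>>>>         System.out.println(canAck(Set.of("A", "B"), Set.of("A", "B"), 2));
>>>>         // Prints "true": the write is acked, then truncated after healing.
>>>>     }
>>>> }
>>>> ```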
>>>>
>>>> Although one can argue that, with 3 replicas and min.isr set to 2,
>>>> Kafka only guarantees tolerating one failure, while the above scenario
>>>> actually involves two concurrent failures (both A and B are considered
>>>> wedged), this is still a regression from the current version.
>>>>
>>>> So to resolve this issue, I'd propose we change the semantics in
>>>> the following way (this is only slightly different from your proposal):
>>>>
>>>>
>>>> 1. Add one more value to client-side acks config:
>>>>
>>>>    0: no acks needed at all.
>>>>    1: ack from the leader.
>>>>    all: acks from ALL the ISR replicas, AND the current number of ISR
>>>> replicas has to be no smaller than {min.isr} (i.e. not changing this
>>>> semantic).
>>>>    quorum: this is the new value; it requires acks from a number of
>>>> ISR replicas that is no smaller than a majority of the replicas AND no
>>>> smaller than {min.isr}. (See the sketch below.)
>>>>
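>>>> A compact way to state the four modes (an illustrative sketch, not the
>>>> actual broker implementation; isrAcks is the number of ISR members that
>>>> have the record, and all names here are made up):
>>>>
>>>> ```java
>>>> // Returns whether a produce request can be acked under each mode.
>>>> static boolean satisfied(String acks, int isrAcks, int isrSize,
>>>>                          int replicationFactor, int minIsr) {
>>>>     int majority = replicationFactor / 2 + 1;
>>>>     switch (acks) {
>>>>         case "0":   return true;                 // no acks needed at all
>>>>         case "1":   return isrAcks >= 1;         // leader only
>>>>         case "all": // every current ISR member, and ISR >= {min.isr}
>>>>             return isrAcks >= isrSize && isrSize >= minIsr;
>>>>         case "quorum": // >= majority of replicas AND >= {min.isr}
>>>>             return isrAcks >= majority && isrAcks >= minIsr;
>>>>         default: throw new IllegalArgumentException(acks);
>>>>     }
>>>> }
>>>> ```
>>>>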
>>>> 2. Clarify in the docs that if a user wants to tolerate X failures, she
>>>> needs to set client acks=all or acks=quorum (better tail latency than
>>>> "all") with the broker {min.isr} set to X+1; however, "all" is not
>>>> necessarily stronger than "quorum":
>>>>
>>>> For example, with 3 replicas and {min.isr} set to 2, here is the list of
>>>> scenarios:
>>>>
>>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>>>> b. ISR list has 2: "all" and "quorum" wait for both of them.
>>>> c. ISR list has 1: "all" and "quorum" would not ack.
>>>>
>>>> If {min.isr} is set to 1, interestingly, the list of scenarios would be:
>>>>
>>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>>>> b. ISR list has 2: "all" and "quorum" wait for both of them.
>>>> c. ISR list has 1: "all" waits for the leader alone, while "quorum"
>>>> would not ack (because it requires the number of acks to be >= {min.isr}
>>>> AND >= {majority of num.replicas}, i.e. 2 here, so it's actually stronger
>>>> than "all"; see the check below).
>>>>
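>>>> Scenario (c) with {min.isr}=1 can be checked against the sketch above
>>>> (3 replicas, so the majority is 2):
>>>>
>>>> ```java
>>>> satisfied("all",    1, 1, 3, 1); // true:  the leader alone satisfies "all"
>>>> satisfied("quorum", 1, 1, 3, 1); // false: a majority of 3 is 2, so no ack
>>>> ```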
>>>>
>>>> WDYT?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hey Litao,
>>>>>
>>>>> Not sure there will be an easy way to select the broker with the highest
>>>>> LEO without losing acknowledged messages. In case it is useful, here is
>>>>> another idea. Maybe we can have a mechanism to switch between the min.isr
>>>>> and the full ISR set for determining when to acknowledge a message. The
>>>>> controller can probably use an RPC to request the current leader to use
>>>>> the ISR set before it sends the LeaderAndIsrRequest for the leadership
>>>>> change, roughly as sketched below.
>>>>>
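>>>>> A very loose sketch of that controller-side sequence (every name below
>>>>> is hypothetical; none of these RPCs exist in Kafka today):
>>>>>
>>>>> ```java
>>>>> // Hypothetical controller hook run before a leadership change.
>>>>> enum AckMode { MIN_ISR, FULL_ISR }
>>>>>
>>>>> class LeadershipChangePrep {
>>>>>     void prepareLeadershipChange(int leaderId, String topicPartition) {
>>>>>         // 1. Ask the current leader to fall back to full-ISR acking, so
>>>>>         //    nothing gets acked that the next leader might be missing.
>>>>>         sendAckModeRequest(leaderId, topicPartition, AckMode.FULL_ISR);
>>>>>         // 2. Only then proceed with the usual LeaderAndIsrRequest.
>>>>>         sendLeaderAndIsr(topicPartition);
>>>>>     }
>>>>>     // Stubs standing in for controller-to-broker RPCs.
>>>>>     void sendAckModeRequest(int broker, String tp, AckMode mode) { }
>>>>>     void sendLeaderAndIsr(String tp) { }
>>>>> }
>>>>> ```
>>>>>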
>>>>> Regards,
>>>>> Dong
>>>>>
>>>>>
>>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng
>>>>> <litao.d...@airbnb.com.invalid>
>>>>> wrote:
>>>>>
>>>>> > Thanks Jun for the detailed feedback.
>>>>> >
>>>>> > Yes, for #1, I mean the live replicas from the ISR.
>>>>> >
>>>>> > Actually, I believe for all of the 4 new leader election strategies
>>>>> > (offline, reassign, preferred replica and controlled shutdown), we
>>>>> need to
>>>>> > make corresponding changes. Will document the details in the KIP.
>>>>> >
>>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
>>>>> >
>>>>> > > Hi, Litao,
>>>>> > >
>>>>> > > Thanks for the KIP. Good proposal. A few comments below.
>>>>> > >
>>>>> > > 1. The KIP says "select the live replica with the largest LEO". I
>>>>> > > guess what you meant is selecting the live replica in the ISR with
>>>>> > > the largest LEO?
>>>>> > >
>>>>> > > 2. I agree that we can probably just reuse the current min.isr
>>>>> > > configuration, but with slightly different semantics. Currently, if
>>>>> > > min.isr is set, a user expects the record to be in at least min.isr
>>>>> > > replicas on a successful ack. This KIP guarantees this too. Most
>>>>> > > people are probably surprised that currently the ack is only sent
>>>>> > > back after all replicas in the ISR receive the record. This KIP will
>>>>> > > change the ack to only wait on min.isr replicas, which matches the
>>>>> > > user's expectation and gives better latency. Currently, we guarantee
>>>>> > > no data loss if there are fewer than replication-factor failures.
>>>>> > > The KIP changes that to fewer than min.isr failures. The latter
>>>>> > > probably matches the user expectation.
>>>>> > >
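>>>>> > > For reference, the client side of what is being discussed is just the
>>>>> > > standard producer config (a minimal sketch; the broker address is a
>>>>> > > placeholder, and min.insync.replicas=2 is assumed to be set as a
>>>>> > > topic-level config):
>>>>> > >
>>>>> > > ```java
>>>>> > > import java.util.Properties;
>>>>> > > import org.apache.kafka.clients.producer.KafkaProducer;
>>>>> > >
>>>>> > > Properties props = new Properties();
>>>>> > > props.put("bootstrap.servers", "broker1:9092"); // placeholder host
>>>>> > > // Today acks=all waits for every ISR member; under this KIP it would
>>>>> > > // wait for only min.insync.replicas of them.
>>>>> > > props.put("acks", "all");
>>>>> > > props.put("key.serializer",
>>>>> > >     "org.apache.kafka.common.serialization.StringSerializer");
>>>>> > > props.put("value.serializer",
>>>>> > >     "org.apache.kafka.common.serialization.StringSerializer");
>>>>> > > KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>>>> > > ```
>>>>> > >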
>>>>> > > 3. I agree that the new leader election process is a bit more
>>>>> > complicated.
>>>>> > > The controller now needs to contact all replicas in ISR to
>>>>> determine who
>>>>> > > has the longest log. However, this happens infrequently. So, it's
>>>>> > probably
>>>>> > > worth doing for the better latency in #2.
>>>>> > >
>>>>> > > 4. We have to think through the preferred leader election process.
>>>>> > > Currently, the first assigned replica is preferred for load
>>>>> > > balancing. There is a process to automatically move the leader to
>>>>> > > the preferred replica when it's in sync. The issue is that the
>>>>> > > preferred replica may not be the replica with the longest log.
>>>>> > > Naively switching to the preferred replica may cause data loss when
>>>>> > > there are actually fewer failures than the configured min.isr. One
>>>>> > > way to address this issue is to do the following steps during
>>>>> > > preferred leader election: (a) the controller sends an RPC request
>>>>> > > to the current leader; (b) the current leader stops taking new
>>>>> > > writes (sending a new error code to the clients) and returns its
>>>>> > > LEO (call it L) to the controller; (c) the controller issues an RPC
>>>>> > > request to the preferred replica and waits for its LEO to reach L;
>>>>> > > (d) the controller changes the leader to the preferred replica.
>>>>> > >
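>>>>> > > A rough sketch of steps (a)-(d) as controller-side code (every name
>>>>> > > below is illustrative; none of these RPCs exist today):
>>>>> > >
>>>>> > > ```java
>>>>> > > // Hypothetical controller logic for a safe preferred leader election.
>>>>> > > void preferredLeaderElection(String tp, int leader, int preferred) {
>>>>> > >     fenceWrites(leader, tp);                // (a)+(b): leader rejects
>>>>> > >     long l = fetchLeo(leader, tp);          //   new writes, returns L
>>>>> > >     waitUntilLeoReaches(preferred, tp, l);  // (c): catch up to L
>>>>> > >     changeLeader(tp, preferred);            // (d): safe to switch now
>>>>> > > }
>>>>> > > // Stubs standing in for the new RPCs described above.
>>>>> > > void fenceWrites(int broker, String tp) { }
>>>>> > > long fetchLeo(int broker, String tp) { return 0L; }
>>>>> > > void waitUntilLeoReaches(int broker, String tp, long leo) { }
>>>>> > > void changeLeader(String tp, int broker) { }
>>>>> > > ```
>>>>> > >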
>>>>> > > Jun
>>>>> > >
>>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng
>>>>> > <litao.d...@airbnb.com.invalid
>>>>> > > >
>>>>> > > wrote:
>>>>> > >
>>>>> > > > Sorry folks, just realized I didn't use the correct thread
>>>>> format for
>>>>> > the
>>>>> > > > discussion. I started this new one and copied all of the
>>>>> responses from
>>>>> > > the
>>>>> > > > old one.
>>>>> > > >
>>>>> > > > @Dong
>>>>> > > > It makes sense to just use the min.insync.replicas instead of
>>>>> > > introducing a
>>>>> > > > new config, and we must make this change together with the
>>>>> LEO-based
>>>>> > new
>>>>> > > > leader election.
>>>>> > > >
>>>>> > > > @Xi
>>>>> > > > I thought about embedding the LEO information in the
>>>>> > > > ControllerContext, but didn't find a way. Using an RPC will make
>>>>> > > > the leader election period longer, but this should happen only in
>>>>> > > > very rare cases (broker failure, controlled shutdown, preferred
>>>>> > > > leader election and partition reassignment).
>>>>> > > >
>>>>> > > > @Jeff
>>>>> > > > The current leader election is to pick the first replica from AR
>>>>> > > > which exists in both the live broker and ISR sets. I agree with
>>>>> > > > you that changing the current/default behavior would cause a lot
>>>>> > > > of confusion, and that's the reason the title is "Add Support
>>>>> > > > ...". In this case, we wouldn't break any current promises, and we
>>>>> > > > provide a separate option for our users.
>>>>> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous
>>>>> > > > Replication" in the MySQL world, and yes it is something between
>>>>> acks=1
>>>>> > > and
>>>>> > > > acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227
>>>>> are
>>>>> > > > two orthogonal improvements. KIP-227 is to improve the
>>>>> replication
>>>>> > > protocol
>>>>> > > > (like the introduction of parallel replication in MySQL), and
>>>>> KIP-250
>>>>> > is
>>>>> > > an
>>>>> > > > enhancement for the replication architecture (sync, semi-sync,
>>>>> and
>>>>> > > async).
>>>>> > > >
>>>>> > > >
>>>>> > > > Dong Lin
>>>>> > > >
>>>>> > > > > Thanks for the KIP. I have one quick comment before you
>>>>> provide more
>>>>> > > > detail
>>>>> > > > > on how to select the leader with the largest LEO.
>>>>> > > > > Do you think it would make sense to change the default
>>>>> behavior of
>>>>> > > > acks=-1,
>>>>> > > > > such that broker will acknowledge the message once the message
>>>>> has
>>>>> > been
>>>>> > > > > replicated to min.insync.replicas brokers? This would allow us
>>>>> to
>>>>> > keep
>>>>> > > > the
>>>>> > > > > same durability guarantee and improve produce request latency
>>>>> > > > > without having a new config.
>>>>> > > >
>>>>> > > >
>>>>> > > > Hu Xi
>>>>> > > >
>>>>> > > > > Currently, holding the assigned replicas (AR) for all partitions,
>>>>> > > > > the controller is able to elect new leaders by selecting the
>>>>> > > > > first replica of AR which occurs in both the live replica set and
>>>>> > > > > the ISR. If switching to the LEO-based strategy, the controller
>>>>> > > > > context might need to be enriched or augmented to store those
>>>>> > > > > values. If retrieving those LEOs in real time, several rounds of
>>>>> > > > > RPCs are unavoidable, which seems to violate the original
>>>>> > > > > intention of this KIP.
>>>>> > > >
>>>>> > > >
>>>>> > > > Jeff Widman
>>>>> > > >
>>>>> > > > > I agree with Dong, we should see if it's possible to change the
>>>>> > default
>>>>> > > > > behavior so that as soon as min.insync.replicas brokers respond,
>>>>> > > > > then the broker acknowledges the message back to the client
>>>>> > > > > without waiting for additional brokers who are in the in-sync
>>>>> > > > > replica list to respond. (I actually thought it already worked
>>>>> > > > > this way.)
>>>>> > > > > As you implied in the KIP though, changing this default
>>>>> introduces a
>>>>> > > > weird
>>>>> > > > > state where an in-sync follower broker is not guaranteed to
>>>>> have a
>>>>> > > > > message...
>>>>> > > > > So at a minimum, the leadership failover algorithm would need
>>>>> to be
>>>>> > > sure
>>>>> > > > to
>>>>> > > > > pick the most up-to-date follower... I thought it already did
>>>>> this?
>>>>> > > > > But if multiple brokers fail in quick succession, then a
>>>>> broker that
>>>>> > > was
>>>>> > > > in
>>>>> > > > > the ISR could become a leader without ever receiving the
>>>>> message...
>>>>> > > > > violating the current promises of unclean.leader.election.enable=False...
>>>>> > > > > so changing the default might not be a tenable solution.
>>>>> > > > > What also jumped out at me in the KIP was the goal of reducing
>>>>> p999
>>>>> > > when
>>>>> > > > > setting replica lag time at 10 seconds(!!)... I understand the
>>>>> desire
>>>>> > > to
>>>>> > > > > minimize frequent ISR shrink/expansion, as I face this same
>>>>> issue at
>>>>> > my
>>>>> > > > day
>>>>> > > > > job. But what you're essentially trying to do here is create an
>>>>> > > > additional
>>>>> > > > > replication state that is in-between acks=1 and acks = ISR to
>>>>> paper
>>>>> > > over
>>>>> > > > a
>>>>> > > > > root problem of ISR shrink/expansion...
>>>>> > > > > I'm just wary of shipping more features (and more operational
>>>>> > > confusion)
>>>>> > > > if
>>>>> > > > > it's only addressing the symptom rather than the root cause.
>>>>> For
>>>>> > > example,
>>>>> > > > > my day job's problem is we run a very high number of
>>>>> low-traffic
>>>>> > > > > partitions-per-broker, so the fetch requests hit many
>>>>> partitions
>>>>> > before
>>>>> > > > > they fill. Solving that requires changing our architecture +
>>>>> making
>>>>> > the
>>>>> > > > > replication protocol more efficient (KIP-227).
>>>>> > > >
>>>>> > > >
>>>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <litao.d...@airbnb.com>
>>>>> > > > wrote:
>>>>> > > >
>>>>> > > > > Hey folks. I would like to add a feature to support the
>>>>> quorum-based
>>>>> > > > > acknowledgment for the producer request. We have been running a
>>>>> > > modified
>>>>> > > > > version of Kafka on our testing cluster for weeks, the
>>>>> improvement of
>>>>> > > > P999
>>>>> > > > > is significant with very stable latency. Additionally, I have a
>>>>> > > proposal
>>>>> > > > to
>>>>> > > > > achieve a similar data durability as with the
>>>>> insync.replicas-based
>>>>> > > > > acknowledgment through LEO-based leader election.
>>>>> > > > >
>>>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge
>>>>> > > > >
>>>>> > > >
>>>>> > >
>>>>> >
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


-- 
-- Guozhang
