Hi Guozhang,

Are you proposing changing the semantics of acks=all to acknowledge a message
only after all replicas (not all ISRs, which is what Kafka currently does)
have committed the message? This is equivalent to setting min.isr=number of
replicas, which makes acks=all much stricter than what Kafka has right now.
I think this may also surprise users, as the producer will not be able to
produce a message to Kafka when even one of the followers is down.
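
To make the comparison concrete, here is a minimal sketch of the two ack
predicates (hypothetical helper names, not actual Kafka code), showing why
the proposed rule reduces to min.isr = number of replicas:

```
class AckPredicates {
    // Current acks=all: every ISR member must have the record, and the
    // ISR must be at least min.isr large.
    static boolean ackAllCurrent(java.util.Set<Integer> isr,
                                 java.util.Set<Integer> acked, int minIsr) {
        return acked.containsAll(isr) && isr.size() >= minIsr;
    }

    // Proposed "wait for all replicas": with min.isr = replication factor,
    // the ISR must be the full replica set, so this is the same condition.
    static boolean ackAllReplicas(java.util.Set<Integer> replicas,
                                  java.util.Set<Integer> acked) {
        return acked.containsAll(replicas);
    }
}
```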

On Sat, 3 Feb 2018 at 15:26 Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Dong,
>
> Could you elaborate a bit more on how the controller could make leaders
> switch between all and quorum?
>
>
> Guozhang
>
>
> On Fri, Feb 2, 2018 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Guozhang,
> >
> > Got it. Thanks for the detailed explanation. I guess my point is that we
> > can probably achieve the best of both worlds, i.e. maintain the existing
> > behavior of ack="all" while improving the tail latency.
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Fri, Feb 2, 2018 at 8:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hi Dong,
> >>
> >> Yes, in terms of fault tolerance "quorum" does not do better than "all":
> >> as I said, with {min.isr} set to X+1, Kafka is able to tolerate only X
> >> failures. So if A and B are partitioned off at the same time, there are
> >> two concurrent failures and we do not guarantee that all acked messages
> >> will be retained.
> >>
> >> The goal of my approach is to maintain the behavior of acks="all", which
> >> happens to do better than what Kafka actually guarantees: when both A and
> >> B are partitioned off, produced records will not be acked, since "all"
> >> requires all replicas (not only ISRs; my previous email used an incorrect
> >> term). This does better than tolerating X failures, and I was proposing
> >> to keep it so that we would not introduce any regression "surprises" to
> >> users who are already using "all". In other words, "quorum" trades a bit
> >> of the failure tolerance that is strictly defined on min.isr for better
> >> tail latency.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >>> Hey Guozhang,
> >>>
> >>> According to the new proposal, with 3 replicas, min.isr=2 and
> >>> acks="quorum", it seems that acknowledged messages can still be
> >>> truncated in the network partition scenario you mentioned, right? So I
> >>> guess the goal is for some users to achieve better tail latency at the
> >>> cost of potential message loss?
> >>>
> >>> If this is the case, then I think it may be better to adopt an approach
> >>> where the controller dynamically turns this optimization on and off.
> >>> This provides users with peace of mind (i.e. no message loss) while
> >>> still reducing tail latency. What do you think?
> >>>
> >>> Thanks,
> >>> Dong
> >>>
> >>>
> >>> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello Litao,
> >>>>
> >>>> Just double-checking on the leader election details: do you have time
> >>>> to complete the proposal on that part?
> >>>>
> >>>> Also, Jun mentioned a caveat related to KIP-250 on the KIP-232
> >>>> discussion thread that Dong is working on; I figured it is worth
> >>>> pointing out here with a tentative solution:
> >>>>
> >>>>
> >>>> ```
> >>>> Currently, if the producer uses acks=-1, a write will only succeed if
> >>>> the write is received by all in-sync replicas (i.e., committed). This
> >>>> is true even when min.isr is set since we first wait for a message to
> >>>> be committed and then check the min.isr requirement. KIP-250 may
> >>>> change that, but we can discuss the implication there.
> >>>> ```
> >>>>
> >>>> The caveat is that, if we change the acking semantics in KIP-250 so
> >>>> that we only require {min.isr} replicas to acknowledge a produce, the
> >>>> following scenario becomes possible: imagine you have {A, B, C}
> >>>> replicas of a partition with A as the leader, all in the isr list, and
> >>>> min.isr is 2.
> >>>>
> >>>> 1. Say there is a network partition and both A and B are fenced off. C
> >>>> is elected as the new leader and shrinks its isr list to just {C};
> >>>> from A's point of view, it does not know it has become a "ghost" and is
> >>>> no longer the leader; all it does is shrink its isr list to {A, B}.
> >>>>
> >>>> 2. At this time, any new writes with acks=-1 to C will not be acked,
> >>>> since from C's point of view there is only one replica in the isr.
> >>>> This is correct.
> >>>>
> >>>> 3. However, any writes that are sent to A can still be acked (NOTE
> >>>> this is entirely possible, since producers only refresh metadata
> >>>> periodically; additionally, if they happen to ask A or B, they will get
> >>>> stale metadata saying A is still the leader): since A thinks the isr
> >>>> list is {A, B}, as long as B has replicated the message, A will ack the
> >>>> produce.
> >>>>
> >>>>     This is not correct behavior: when the network heals, A will
> >>>> realize it is not the leader and will truncate its log. As a result,
> >>>> the acked records are lost, violating Kafka's guarantees. KIP-232 would
> >>>> not help prevent this scenario.
> >>>>
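> >>>> To see why the "ghost" leader acks in step 3, the relaxed check could
> >>>> be sketched as below (assumed names; this is the min.isr-only rule
> >>>> under discussion, not Kafka's actual code):
> >>>>
> >>>> ```
> >>>> // From A's stale point of view: isr = {A, B} and B has the record,
> >>>> // so two replicas hold the message and min.isr = 2 is satisfied.
> >>>> static boolean ackWithMinIsrOnly(int replicasWithRecord, int minIsr) {
> >>>>     return replicasWithRecord >= minIsr;  // 2 >= 2 -> A acks
> >>>> }
> >>>> // Meanwhile the real leader C has isr = {C}; once the partition
> >>>> // heals, A truncates its log and the acked record is lost.
> >>>> ```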
> >>>>
> >>>> Although one can argue that, with 3 replicas and min.isr set to 2,
> >>>> Kafka only guarantees to tolerate one failure, while the above scenario
> >>>> actually involves two concurrent failures (both A and B are considered
> >>>> wedged), this is still a regression from the current version.
> >>>>
> >>>> So to resolve this issue, I'd propose we change the semantics in the
> >>>> following way (this is only slightly different from your proposal):
> >>>>
> >>>>
> >>>> 1. Add one more value to client-side acks config:
> >>>>
> >>>>    0: no acks needed at all.
> >>>>    1: ack from the leader.
> >>>>    all: acks from ALL the ISR replicas, AND the current number of isr
> >>>> replicas has to be no smaller than {min.isr} (i.e. not changing this
> >>>> semantic).
> >>>>    quorum: this is the new value; it requires acks from a number of
> >>>> ISR replicas that is no smaller than a majority of the replicas AND no
> >>>> smaller than {min.isr}.
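> >>>>
> >>>> As a rough sketch of the decision logic (assumed method and parameter
> >>>> names, not the actual broker code), the required number of acks could
> >>>> be computed like this:
> >>>>
> >>>> ```
> >>>> // Returns the number of replica acks needed before responding to the
> >>>> // producer, or -1 if the produce cannot be acked in the current state.
> >>>> static int requiredAcks(String acks, int numReplicas, int isrSize,
> >>>>                         int minIsr) {
> >>>>     switch (acks) {
> >>>>         case "0": return 0;  // fire and forget
> >>>>         case "1": return 1;  // leader only
> >>>>         case "all":
> >>>>             // every ISR member, and the ISR must be large enough
> >>>>             return isrSize >= minIsr ? isrSize : -1;
> >>>>         case "quorum":
> >>>>             int majority = numReplicas / 2 + 1;
> >>>>             int needed = Math.max(majority, minIsr);
> >>>>             // no ack if the ISR is smaller than what we must wait for
> >>>>             return isrSize >= needed ? needed : -1;
> >>>>         default: throw new IllegalArgumentException(acks);
> >>>>     }
> >>>> }
> >>>> ```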
> >>>>
> >>>> 2. Clarify in the docs that if a user wants to tolerate X failures,
> >>>> she needs to set client acks=all or acks=quorum (better tail latency
> >>>> than "all") with broker {min.isr} set to X+1; however, "all" is not
> >>>> necessarily stronger than "quorum":
> >>>>
> >>>> For example, with 3 replicas and {min.isr} set to 2, here is a list of
> >>>> scenarios:
> >>>>
> >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> >>>> b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
> >>>> c. ISR list has 1: neither "all" nor "quorum" would ack.
> >>>>
> >>>> If {min.isr} is set to 1, interestingly, here is the list of
> >>>> scenarios:
> >>>>
> >>>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
> >>>> b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
> >>>> c. ISR list has 1: "all" waits only for the leader, while "quorum"
> >>>> would not ack (because it requires the number of acks to be no smaller
> >>>> than {min.isr} AND no smaller than a majority of num.replicas, so it's
> >>>> actually stronger than "all" here).
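> >>>>
> >>>> Running the requiredAcks() sketch above against these cases gives the
> >>>> same outcomes:
> >>>>
> >>>> ```
> >>>> // 3 replicas, min.isr = 2
> >>>> requiredAcks("all",    3, 3, 2);  // 3   (case a)
> >>>> requiredAcks("quorum", 3, 3, 2);  // 2   (case a)
> >>>> requiredAcks("all",    3, 1, 2);  // -1  (case c: no ack)
> >>>> // 3 replicas, min.isr = 1
> >>>> requiredAcks("all",    3, 1, 1);  // 1   (leader alone)
> >>>> requiredAcks("quorum", 3, 1, 1);  // -1  (majority not met)
> >>>> ```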
> >>>>
> >>>>
> >>>> WDYT?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>>>
> >>>>> Hey Litao,
> >>>>>
> >>>>> Not sure there will be an easy way to select the broker with the
> >>>>> highest LEO without losing acknowledged messages. In case it is
> >>>>> useful, here is another idea. Maybe we can have a mechanism to switch
> >>>>> between the min.isr count and the full isr set for determining when to
> >>>>> acknowledge a message. The controller can probably use an RPC to
> >>>>> request that the current leader use the full isr set before it sends
> >>>>> the LeaderAndIsrRequest for a leadership change.
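> >>>>>
> >>>>> As a minimal sketch of that ordering (the RPC and method names below
> >>>>> are hypothetical, just to illustrate the idea):
> >>>>>
> >>>>> ```
> >>>>> // 1. Ask the current leader to fall back to full-ISR acks, so no
> >>>>> //    message is acked on min.isr alone during the transition.
> >>>>> leaderRpc.setAckMode(partition, AckMode.FULL_ISR);  // assumed RPC
> >>>>> // 2. Only then proceed with the leadership change.
> >>>>> controller.sendLeaderAndIsrRequest(partition, newLeader);
> >>>>> ```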
> >>>>>
> >>>>> Regards,
> >>>>> Dong
> >>>>>
> >>>>>
> >>>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng
> >>>>> <litao.d...@airbnb.com.invalid>
> >>>>> wrote:
> >>>>>
> >>>>> > Thanks Jun for the detailed feedback.
> >>>>> >
> >>>>> > Yes, for #1, I mean the live replicas from the ISR.
> >>>>> >
> >>>>> > Actually, I believe for all of the 4 new leader election strategies
> >>>>> > (offline, reassign, preferred replica and controlled shutdown), we
> >>>>> > need to make corresponding changes. Will document the details in
> >>>>> > the KIP.
> >>>>> >
> >>>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
> >>>>> >
> >>>>> > > Hi, Litao,
> >>>>> > >
> >>>>> > > Thanks for the KIP. Good proposal. A few comments below.
> >>>>> > >
> >>>>> > > 1. The KIP says "select the live replica with the largest LEO".
> >>>>> > > I guess what you meant is selecting the live replica in the ISR
> >>>>> > > with the largest LEO?
> >>>>> > >
> >>>>> > > 2. I agree that we can probably just reuse the current min.isr
> >>>>> > > configuration, but with slightly different semantics. Currently,
> >>>>> > > if min.isr is set, a user expects the record to be in at least
> >>>>> > > min.isr replicas on a successful ack. This KIP guarantees this
> >>>>> > > too. Most people are probably surprised that currently the ack is
> >>>>> > > only sent back after all replicas in the ISR receive the record.
> >>>>> > > This KIP will change the ack to only wait on min.isr replicas,
> >>>>> > > which matches the user's expectation and gives better latency.
> >>>>> > > Currently, we guarantee no data loss if there are fewer than
> >>>>> > > replication-factor failures. The KIP changes that to fewer than
> >>>>> > > min.isr failures. The latter probably matches user expectations.
> >>>>> > >
> >>>>> > > 3. I agree that the new leader election process is a bit more
> >>>>> > > complicated. The controller now needs to contact all replicas in
> >>>>> > > the ISR to determine who has the longest log. However, this
> >>>>> > > happens infrequently. So, it's probably worth doing for the better
> >>>>> > > latency in #2.
> >>>>> > >
> >>>>> > > 4. We have to think through the preferred leader election
> >>>>> > > process. Currently, the first assigned replica is preferred for
> >>>>> > > load balancing. There is a process to automatically move the
> >>>>> > > leader to the preferred replica when it's in sync. The issue is
> >>>>> > > that the preferred replica may not be the replica with the longest
> >>>>> > > log. Naively switching to the preferred replica may cause data
> >>>>> > > loss even when there are actually fewer failures than the
> >>>>> > > configured min.isr. One way to address this issue is to do the
> >>>>> > > following steps during preferred leader election: (a) the
> >>>>> > > controller sends an RPC request to the current leader; (b) the
> >>>>> > > current leader stops taking new writes (sending a new error code
> >>>>> > > to the clients) and returns its LEO (call it L) to the controller;
> >>>>> > > (c) the controller issues an RPC request to the preferred replica
> >>>>> > > and waits for its LEO to reach L; (d) the controller changes the
> >>>>> > > leader to the preferred replica.
> >>>>> > >
> >>>>> > > Jun
> >>>>> > >
> >>>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng
> >>>>> > > <litao.d...@airbnb.com.invalid> wrote:
> >>>>> > >
> >>>>> > > > Sorry folks, I just realized I didn't use the correct thread
> >>>>> > > > format for the discussion. I started this new one and copied all
> >>>>> > > > of the responses from the old one.
> >>>>> > > >
> >>>>> > > > @Dong
> >>>>> > > > It makes sense to just use min.insync.replicas instead of
> >>>>> > > > introducing a new config, and we must make this change together
> >>>>> > > > with the LEO-based new leader election.
> >>>>> > > >
> >>>>> > > > @Xi
> >>>>> > > > I thought about embedding the LEO information into the
> >>>>> > > > ControllerContext, but didn't find a way. Using RPC will make
> >>>>> > > > the leader election period longer, but this should happen only
> >>>>> > > > in very rare cases (broker failure, controlled shutdown,
> >>>>> > > > preferred leader election and partition reassignment).
> >>>>> > > >
> >>>>> > > > @Jeff
> >>>>> > > > The current leader election picks the first replica from the AR
> >>>>> > > > that exists in both the live broker and ISR sets. I agree with
> >>>>> > > > you that changing the current/default behavior would cause a lot
> >>>>> > > > of confusion, and that's the reason the title is "Add Support
> >>>>> > > > ...". In this case, we wouldn't break any current promises, and
> >>>>> > > > we would provide a separate option for our users.
> >>>>> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous
> >>>>> > > > Replication" of the MySQL world, and yes, it is something
> >>>>> > > > between acks=1 and acks=insync.replicas. Additionally, I feel
> >>>>> > > > KIP-250 and KIP-227 are two orthogonal improvements. KIP-227
> >>>>> > > > improves the replication protocol (like the introduction of
> >>>>> > > > parallel replication in MySQL), and KIP-250 is an enhancement
> >>>>> > > > for the replication architecture (sync, semi-sync, and async).
> >>>>> > > >
> >>>>> > > >
> >>>>> > > > Dong Lin
> >>>>> > > >
> >>>>> > > > > Thanks for the KIP. I have one quick comment before you
> >>>>> > > > > provide more detail on how to select the leader with the
> >>>>> > > > > largest LEO.
> >>>>> > > > > Do you think it would make sense to change the default
> >>>>> > > > > behavior of acks=-1, such that the broker will acknowledge the
> >>>>> > > > > message once the message has been replicated to
> >>>>> > > > > min.insync.replicas brokers? This would allow us to keep the
> >>>>> > > > > same durability guarantee and improve produce request latency
> >>>>> > > > > without adding a new config.
> >>>>> > > >
> >>>>> > > >
> >>>>> > > > Hu Xi
> >>>>> > > >
> >>>>> > > > > Currently, by holding the assigned replicas (AR) for all
> >>>>> > > > > partitions, the controller is able to elect new leaders by
> >>>>> > > > > selecting the first replica of the AR that occurs in both the
> >>>>> > > > > live replica set and the ISR. If we switch to the LEO-based
> >>>>> > > > > strategy, the controller context might need to be enriched or
> >>>>> > > > > augmented to store those values. If those LEOs are retrieved
> >>>>> > > > > in real time, several rounds of RPCs are unavoidable, which
> >>>>> > > > > seems to violate the original intention of this KIP.
> >>>>> > > >
> >>>>> > > >
> >>>>> > > > Jeff Widman
> >>>>> > > >
> >>>>> > > > > I agree with Dong, we should see if it's possible to change
> >>>>> > > > > the default behavior so that as soon as min.insync.replicas
> >>>>> > > > > brokers respond, the broker acknowledges the message back to
> >>>>> > > > > the client without waiting for additional brokers in the
> >>>>> > > > > in-sync replica list to respond. (I actually thought it
> >>>>> > > > > already worked this way.)
> >>>>> > > > > As you implied in the KIP though, changing this default
> >>>>> > > > > introduces a weird state where an in-sync follower broker is
> >>>>> > > > > not guaranteed to have a message...
> >>>>> > > > > So at a minimum, the leadership failover algorithm would need
> >>>>> > > > > to be sure to pick the most up-to-date follower... I thought
> >>>>> > > > > it already did this?
> >>>>> > > > > But if multiple brokers fail in quick succession, then a
> >>>>> > > > > broker that was in the ISR could become a leader without ever
> >>>>> > > > > receiving the message... violating the current promises of
> >>>>> > > > > unclean.leader.election.enable=False... so changing the
> >>>>> > > > > default might not be a tenable solution.
> >>>>> > > > > What also jumped out at me in the KIP was the goal of
> >>>>> > > > > reducing p999 when setting replica lag time at 10
> >>>>> > > > > seconds(!!)... I understand the desire to minimize frequent
> >>>>> > > > > ISR shrink/expansion, as I face this same issue at my day
> >>>>> > > > > job. But what you're essentially trying to do here is create
> >>>>> > > > > an additional replication state that is in between acks=1 and
> >>>>> > > > > acks=ISR to paper over a root problem of ISR
> >>>>> > > > > shrink/expansion...
> >>>>> > > > > I'm just wary of shipping more features (and more operational
> >>>>> > > > > confusion) if it's only addressing the symptom rather than
> >>>>> > > > > the root cause. For example, my day job's problem is that we
> >>>>> > > > > run a very high number of low-traffic partitions per broker,
> >>>>> > > > > so the fetch requests hit many partitions before they fill.
> >>>>> > > > > Solving that requires changing our architecture and making
> >>>>> > > > > the replication protocol more efficient (KIP-227).
> >>>>> > > >
> >>>>> > > >
> >>>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng
> >>>>> > > > <litao.d...@airbnb.com> wrote:
> >>>>> > > >
> >>>>> > > > > Hey folks. I would like to add a feature to support
> >>>>> > > > > quorum-based acknowledgment for the producer request. We have
> >>>>> > > > > been running a modified version of Kafka on our testing
> >>>>> > > > > cluster for weeks; the improvement in P999 is significant,
> >>>>> > > > > with very stable latency. Additionally, I have a proposal to
> >>>>> > > > > achieve similar data durability to the insync.replicas-based
> >>>>> > > > > acknowledgment through LEO-based leader election.
> >>>>> > > > >
> >>>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>> > > > > 250+Add+Support+for+Quorum-based+Producer+Acknowledge
> >>>>> > > > >
> >>>>> > > >
> >>>>> > >
> >>>>> >
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
>
> --
> -- Guozhang
>
