Hello Litao,

Just double-checking on the leader election details: do you have time to
complete the proposal for that part?

Also, Jun mentioned one caveat related to KIP-250 on the KIP-232 discussion
thread that Dong is working on. I figured it is worth pointing out here
along with a tentative solution:


```
Currently, if the producer uses acks=-1, a write will only succeed if the
write is received by all in-sync replicas (i.e., committed). This is true
even when min.isr is set since we first wait for a message to be committed
and then check the min.isr requirement. KIP-250 may change that, but we can
discuss the implication there.
```

The caveat is that, if we change the acking semantics in KIP-250 so that
only {min.isr} replicas are required to acknowledge a produce, the
following scenario becomes possible: imagine you have replicas {A, B, C} of
a partition with A as the leader, all in the ISR list, and min.isr is 2.

1. Say there is a network partition and both A and B are fenced off. C is
elected as the new leader and shrinks its ISR list to {C}; from A's point
of view, it does not know it has become a "ghost" and is no longer the
leader, so all it does is shrink its ISR list to {A, B}.

2. At this time, any new writes with acks=-1 to C will not be acked, since
from C's point of view there is only one replica in the ISR. This is
correct.

3. However, any writes that are sent to A (NOTE this is entirely possible,
since producers only refresh metadata periodically; additionally, if they
happen to ask A or B, they will get the stale metadata saying A is still
the leader) can still be acked: A thinks the ISR list is {A, B}, so as long
as B has replicated the message, A can ack the produce.

    This is not correct behavior: when the network heals, A will realize
it is no longer the leader and will truncate its log. As a result, the
acked records are lost, violating Kafka's guarantees. And KIP-232 would not
help prevent this scenario.
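
To make step 3 concrete, here is a minimal sketch (hypothetical names, not
actual broker code) of the check the fenced leader A would effectively run
against its stale ISR view under the proposed min.isr-based acking:

```java
import java.util.Set;

public class GhostLeaderAckSketch {
    public static void main(String[] args) {
        // A's stale view after the network partition: it still believes ISR = {A, B}.
        Set<String> isrSeenByA = Set.of("A", "B");
        int minIsr = 2;

        // B keeps fetching from A, so from A's view 2 ISR replicas hold the record.
        int isrReplicasWithRecord = 2;

        // Under min.isr-based acking, A only needs {min.isr} acks from its ISR,
        // so it acknowledges the produce even though the real leader C never saw it.
        boolean ackedByA = isrReplicasWithRecord >= minIsr
                && isrSeenByA.size() >= minIsr;
        System.out.println("ghost leader A acks the produce: " + ackedByA); // true

        // When the network heals, A truncates its log to match C's, and the
        // acked record is lost.
    }
}
```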


Although one can argue that, with 3 replicas and min.isr set to 2, Kafka
only guarantees tolerating one failure, while the above scenario actually
involves two concurrent failures (both A and B are considered wedged), this
is still a regression from the current behavior.

So to resolve this issue, I'd propose we change the semantics in the
following way (this is only slightly different from your proposal):


1. Add one more value to the client-side acks config:

   0: no acks needed at all.
   1: ack from the leader.
   all: ack from ALL the ISR replicas, AND the current number of ISR
replicas has to be no smaller than {min.isr} (i.e. this semantic is
unchanged).
   quorum: this is the new value; it requires acks from a number of ISR
replicas that is no smaller than a majority of the replicas AND no smaller
than {min.isr}.

2. Clarify in the docs that if a user wants to tolerate X failures, she
needs to set client acks=all or acks=quorum (better tail latency than
"all") with broker {min.isr} set to X+1; however, "all" is not necessarily
stronger than "quorum":

For example, with 3 replicas and {min.isr} set to 2, here is a list of
scenarios:

a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
c. ISR list has 1: neither "all" nor "quorum" would ack.

If {min.isr} is set to 1, interestingly, here would be the list of
scenarios:

a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
c. ISR list has 1: "all" waits only for the leader to return, while
"quorum" would not ack (because it requires the number of acks to be >=
{min.isr} AND >= {majority of num.replicas}, and the majority cannot be met
with a single ISR replica, so it's actually stronger than "all").
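
To make the comparison concrete, here is a rough sketch (a hypothetical
helper, not actual broker code) of the ack decision for the two modes; it
reproduces the scenarios listed above:

```java
public class AckModeSketch {
    // Hypothetical check of whether a produce can be acknowledged, given how
    // many ISR replicas currently hold the record. "all" keeps today's
    // semantics; "quorum" is the proposed new mode.
    static boolean canAck(String acks, int isrWithRecord, int isrSize,
                          int minIsr, int replicationFactor) {
        switch (acks) {
            case "all":
                // every current ISR replica has the record, and ISR size >= min.isr
                return isrWithRecord == isrSize && isrSize >= minIsr;
            case "quorum":
                // acks from at least max(majority of replicas, min.isr) ISR replicas
                int majority = replicationFactor / 2 + 1;
                return isrWithRecord >= Math.max(majority, minIsr);
            default:
                throw new IllegalArgumentException("unexpected acks mode: " + acks);
        }
    }

    public static void main(String[] args) {
        // 3 replicas, min.isr = 2, ISR size = 1: neither mode acks.
        System.out.println(canAck("all", 1, 1, 2, 3));    // false
        System.out.println(canAck("quorum", 1, 1, 2, 3)); // false

        // 3 replicas, min.isr = 1, ISR size = 1: "all" acks on the leader alone,
        // while "quorum" still needs a majority (2) and does not ack.
        System.out.println(canAck("all", 1, 1, 1, 3));    // true
        System.out.println(canAck("quorum", 1, 1, 1, 3)); // false
    }
}
```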


WDYT?


Guozhang


On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Litao,
>
> Not sure there will be an easy way to select the broker with the highest LEO
> without losing acknowledged messages. In case it is useful, here is another
> idea. Maybe we can have a mechanism to switch between the min.isr and
> isr set for determining when to acknowledge a message. Controller can
> probably use RPC to request the current leader to use isr set before it
> sends LeaderAndIsrRequest for leadership change.
>
> Regards,
> Dong
>
>
> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng <litao.d...@airbnb.com.invalid
> >
> wrote:
>
> > Thanks Jun for the detailed feedback.
> >
> > Yes, for #1, I mean the live replicas from the ISR.
> >
> > Actually, I believe for all of the 4 new leader election strategies
> > (offline, reassign, preferred replica and controlled shutdown), we need
> to
> > make corresponding changes. Will document the details in the KIP.
> >
> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Litao,
> > >
> > > Thanks for the KIP. Good proposal. A few comments below.
> > >
> > > 1. The KIP says "select the live replica with the largest LEO".  I
> guess
> > > what you meant is selecting the live replicas in ISR with the largest
> > LEO?
> > >
> > > 2. I agree that we can probably just reuse the current min.isr
> > > configuration, but with a slightly different semantics. Currently, if
> > > min.isr is set, a user expects the record to be in at least min.isr
> > > replicas on successful ack. This KIP guarantees this too. Most people
> are
> > > probably surprised that currently the ack is only sent back after all
> > > replicas in ISR receive the record. This KIP will change the ack to
> only
> > > wait on min.isr replicas, which matches the user's expectation and
> gives
> > > better latency. Currently, we guarantee no data loss if there are fewer
> > > than replication factor failures. The KIP changes that to fewer than
> > > min.isr failures. The latter probably matches the user expectation.
> > >
> > > 3. I agree that the new leader election process is a bit more
> > complicated.
> > > The controller now needs to contact all replicas in ISR to determine
> who
> > > has the longest log. However, this happens infrequently. So, it's
> > probably
> > > worth doing for the better latency in #2.
> > >
> > > 4. We have to think through the preferred leader election process.
> > > Currently, the first assigned replica is preferred for load balancing.
> > > There is a process to automatically move the leader to the preferred
> > > replica when it's in sync. The issue is that the preferred replica may not
> > > be the replica with the longest log. Naively switching to the preferred
> > > replica may cause data loss when there are actually fewer failures than
> > > configured min.isr. One way to address this issue is to do the
> following
> > > steps during preferred leader election: (a) controller sends an RPC
> > request
> > > to the current leader; (b) the current leader stops taking new writes
> > > (sending a new error code to the clients) and returns its LEO (call it
> L)
> > > to the controller; (c) the controller issues an RPC request to the
> > > preferred replica and waits its LEO to reach L; (d) the controller
> > changes
> > > the leader to the preferred replica.
> > >
> > > Jun
> > >
> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng
> > <litao.d...@airbnb.com.invalid
> > > >
> > > wrote:
> > >
> > > > Sorry folks, just realized I didn't use the correct thread format for
> > the
> > > > discussion. I started this new one and copied all of the responses
> from
> > > the
> > > > old one.
> > > >
> > > > @Dong
> > > > It makes sense to just use the min.insync.replicas instead of
> > > introducing a
> > > > new config, and we must make this change together with the LEO-based
> > new
> > > > leader election.
> > > >
> > > > @Xi
> > > > I thought about embedding the LEO information to the
> ControllerContext,
> > > > didn't find a way. Using RPC will make the leader election period
> > longer
> > > > and this should happen in very rare cases (broker failure, controlled
> > > > shutdown, preferred leader election and partition reassignment).
> > > >
> > > > @Jeff
> > > > The current leader election is to pick the first replica from AR
> which
> > > > exists both in the live brokers and ISR sets. I agree with you about
> > > > changing the current/default behavior will cause many confusions, and
> > > > that's the reason the title is "Add Support ...". In this case, we
> > > wouldn't
> > > > break any current promises and provide a separate option for our
> user.
> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous
> > > > Replication" in the MySQL world, and yes it is something between
> acks=1
> > > and
> > > > acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227 are
> > > > two orthogonal improvements. KIP-227 is to improve the replication
> > > protocol
> > > > (like the introduction of parallel replication in MySQL), and KIP-250
> > is
> > > an
> > > > enhancement for the replication architecture (sync, semi-sync, and
> > > async).
> > > >
> > > >
> > > > Dong Lin
> > > >
> > > > > Thanks for the KIP. I have one quick comment before you provide
> more
> > > > detail
> > > > > on how to select the leader with the largest LEO.
> > > > > Do you think it would make sense to change the default behavior of
> > > > acks=-1,
> > > > > such that broker will acknowledge the message once the message has
> > been
> > > > > replicated to min.insync.replicas brokers? This would allow us to
> > keep
> > > > the
> > > > > same durability guarantee, improve produce request latency without
> > > > having a
> > > > > new config.
> > > >
> > > >
> > > > Hu Xi
> > > >
> > > > > Currently,  with holding the assigned replicas(AR) for all
> > partitions,
> > > > > controller is now able to elect new leaders by selecting the first
> > > > replica
> > > > > of AR which occurs in both live replica set and ISR. If switching
> to
> > > the
> > > > > LEO-based strategy, controller context might need to be enriched or
> > > > > augmented to store those values.  If retrieving those LEOs
> real-time,
> > > > > several rounds of RPCs are unavoidable which seems to violate the
> > > > original
> > > > > intention of this KIP.
> > > >
> > > >
> > > > Jeff Widman
> > > >
> > > > > I agree with Dong, we should see if it's possible to change the
> > default
> > > > > behavior so that as soon as min.insync.replicas brokers respond
then
> > > the
> > > > > broker acknowledges the message back to the client without waiting
> > for
> > > > > additional brokers who are in the in-sync replica list to respond.
> (I
> > > > > actually thought it already worked this way).
> > > > > As you implied in the KIP though, changing this default introduces
> a
> > > > weird
> > > > > state where an in-sync follower broker is not guaranteed to have a
> > > > > message...
> > > > > So at a minimum, the leadership failover algorithm would need to be
> > > sure
> > > > to
> > > > > pick the most up-to-date follower... I thought it already did this?
> > > > > But if multiple brokers fail in quick succession, then a broker
> that
> > > was
> > > > in
> > > > > the ISR could become a leader without ever receiving the message...
> > > > > violating the current promises of unclean.leader.election.
> > > > enable=False...
> > > > > so changing the default might not be a tenable solution.
> > > > > What also jumped out at me in the KIP was the goal of reducing p999
> > > when
> > > > > setting replica lag time at 10 seconds(!!)... I understand the
> desire
> > > to
> > > > > minimize frequent ISR shrink/expansion, as I face this same issue
> at
> > my
> > > > day
> > > > > job. But what you're essentially trying to do here is create an
> > > > additional
> > > > > replication state that is in-between acks=1 and acks = ISR to paper
> > > over
> > > > a
> > > > > root problem of ISR shrink/expansion...
> > > > > I'm just wary of shipping more features (and more operational
> > > confusion)
> > > > if
> > > > > it's only addressing the symptom rather than the root cause. For
> > > example,
> > > > > my day job's problem is we run a very high number of low-traffic
> > > > > partitions-per-broker, so the fetch requests hit many partitions
> > before
> > > > > they fill. Solving that requires changing our architecture + making
> > the
> > > > > replication protocol more efficient (KIP-227).
> > > >
> > > >
> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <litao.d...@airbnb.com>
> > > > wrote:
> > > >
> > > > > Hey folks. I would like to add a feature to support the
> quorum-based
> > > > > acknowledgment for the producer request. We have been running a
> > > modified
> > > > > version of Kafka on our testing cluster for weeks, the improvement
> of
> > > > P999
> > > > > is significant with very stable latency. Additionally, I have a
> > > proposal
> > > > to
> > > > > achieve a similar data durability as with the insync.replicas-based
> > > > > acknowledgment through LEO-based leader election.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 250+Add+Support+for+Quorum-based+Producer+Acknowledge
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang
