Hi Sophie,

This sounds fantastic. I've made a note on KAFKA-12487 about being sure to
implement ConsumerRebalanceListener::onPartitionsLost to avoid unnecessary
task failures on consumer protocol downgrade, but besides that, I don't
think things could get any smoother for Connect users or developers. The
automatic protocol upgrade/downgrade behavior appears safe, intuitive, and
pain-free.
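This is a minimal sketch of why overriding #onPartitionsLost matters. It uses a hypothetical stand-in interface, RebalanceListener, shaped like Kafka's ConsumerRebalanceListener. In that interface the default onPartitionsLost simply delegates to onPartitionsRevoked. A listener that commits offsets in onPartitionsRevoked would therefore also try to commit lost partitions unless it overrides onPartitionsLost. In this sketch, partitions are plain strings and "committing" just records the partition. Neither is the real Connect or consumer API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class LostVsRevokedSketch {

    // Hypothetical stand-in shaped like Kafka's ConsumerRebalanceListener:
    // the default onPartitionsLost simply delegates to onPartitionsRevoked.
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);

        void onPartitionsAssigned(Collection<String> partitions);

        default void onPartitionsLost(Collection<String> partitions) {
            onPartitionsRevoked(partitions);
        }
    }

    // A sink-task-style listener that commits on revocation but treats lost
    // partitions as possibly owned by another member already.
    static class SinkTaskListener implements RebalanceListener {
        final List<String> committed = new ArrayList<>();
        final List<String> closedWithoutCommit = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            committed.addAll(partitions); // stand-in for committing offsets
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            // nothing to do in this sketch
        }

        @Override
        public void onPartitionsLost(Collection<String> partitions) {
            // No commit: these partitions may already belong to another member.
            closedWithoutCommit.addAll(partitions);
        }
    }

    public static void main(String[] args) {
        SinkTaskListener listener = new SinkTaskListener();
        listener.onPartitionsRevoked(List.of("topic-0"));
        listener.onPartitionsLost(List.of("topic-1"));
        System.out.println("committed=" + listener.committed
                + " closedWithoutCommit=" + listener.closedWithoutCommit);
    }
}
```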

Really excited for this development and hoping we can see it come to
fruition in time for the 3.0 release!

Cheers,

Chris

On Fri, Apr 9, 2021 at 2:43 PM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> 1) Yes, all of the above will be part of KAFKA-12477 (not KIP-726)
>
> 2) No, KAFKA-12638 would be nice to have, but I don't think it's
> appropriate to remove the default implementation of #onPartitionsLost in
> 3.0, since we haven't yet given any indication that we intend to remove it
>
> 3) Yes, this would be similar to when a Consumer drops out of the group.
> It's always been possible for a member to miss a rebalance and have its
> partitions reassigned to another member, during which time both members
> would claim to own said partitions. But this is safe because the member
> who dropped out is blocked from committing offsets on those partitions.
>
> On Fri, Apr 9, 2021 at 2:46 AM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Sophie,
> > That sounds like it takes care of every case I can think of.
> > Questions:
> > 1. Do you mean the short-circuit will also be implemented in KAFKA-12477?
> > 2. I don't think KAFKA-12638 is a blocker for KIP-726, am I right?
> > 3. So, does that mean it's still possible for multiple consumers to own
> > the same topic partition? And in this situation, we prevent them from
> > committing and wait for the next rebalance (which should be soon). Is my
> > understanding correct?
> >
> > Thank you very much for finding this great solution.
> >
> > Luke
> >
> > On Fri, Apr 9, 2021 at 11:37 AM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Alright, here's the detailed proposal for KAFKA-12477. This assumes we
> > > will change the default assignor to ["cooperative-sticky", "range"] in
> > > KIP-726. It also acknowledges that users may attempt any kind of upgrade
> > > without reading the docs, and so we need to put in safeguards against
> > > data corruption rather than assume everyone will follow the safe upgrade
> > > path.
> > >
> > > With this proposal,
> > > 1) New applications on 3.0 will enable cooperative rebalancing by default
> > > 2) Existing applications which don’t set an assignor can safely upgrade
> > > to 3.0 using a single rolling bounce with no extra steps, and will
> > > automatically transition to cooperative rebalancing
> > > 3) Existing applications which do set an assignor that uses EAGER can
> > > likewise upgrade their applications to COOPERATIVE with a single rolling
> > > bounce
> > > 4) Once on 3.0, applications can safely go back and forth between EAGER
> > > and COOPERATIVE
> > > 5) Applications can safely downgrade away from 3.0
> > >
> > > The high-level idea for dynamic protocol upgrades is that the group will
> > > leverage the assignor selected by the group coordinator to determine
> > > when it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to
> > > protect the group in case of rare events or user misconfiguration. The
> > > group coordinator selects the most preferred assignor that’s supported by
> > > all members of the group, so we know that all members will support
> > > COOPERATIVE once we receive the “cooperative-sticky” assignor after a
> > > rebalance. At this point, each member can upgrade its own protocol to
> > > COOPERATIVE. However, there may be situations in which an EAGER member
> > > joins the group even after the group has upgraded to COOPERATIVE. For
> > > example, during a rolling upgrade, if the last remaining member on the
> > > old bytecode misses a rebalance, the other members will be allowed to
> > > upgrade to COOPERATIVE. If the old member rejoins and is chosen to be the
> > > group leader before it’s upgraded to 3.0, it won’t be aware, when
> > > computing the assignment, that the other members of the group have not
> > > yet revoked their partitions.
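This sketch shows why receiving "cooperative-sticky" after a rebalance implies that every member supports COOPERATIVE. It assumes the coordinator's selection works as a vote. The candidates are the assignors listed by every member, and each member votes for its first-listed candidate. The selectAssignor helper and the member lists are hypothetical, and ties are broken arbitrarily here.

```java
import java.util.*;

public class AssignorSelectionSketch {

    // Hypothetical model: each member lists its assignor names in preference order.
    static String selectAssignor(List<List<String>> members) {
        // Candidates are the assignors supported by every member.
        Set<String> candidates = new HashSet<>(members.get(0));
        for (List<String> member : members) {
            candidates.retainAll(member);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no commonly supported assignor");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> member : members) {
            for (String assignor : member) {
                if (candidates.contains(assignor)) {
                    votes.merge(assignor, 1, Integer::sum);
                    break;
                }
            }
        }
        // The candidate with the most votes wins (ties broken arbitrarily).
        return Collections.max(votes.entrySet(), Map.Entry.comparingByValue()).getKey();
    }

    public static void main(String[] args) {
        List<String> upgraded = List.of("cooperative-sticky", "range");
        List<String> old = List.of("range");

        // One member still on the old bytecode: only "range" is common.
        System.out.println(selectAssignor(List.of(upgraded, upgraded, old)));
        // Every member upgraded: "cooperative-sticky" is selected.
        System.out.println(selectAssignor(List.of(upgraded, upgraded, upgraded)));
    }
}
```

With one member still on ["range"], the only common candidate is "range". No member therefore sees "cooperative-sticky" until the last old member has been upgraded.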
> > >
> > > Short Circuit:
> > > The risk of mixing the cooperative and eager rebalancing protocols is
> > > that a partition may be assigned to one member while it has yet to be
> > > revoked from its previous owner. The danger is that the new owner may
> > > begin processing and committing offsets for this partition while the
> > > previous owner is also committing offsets in its #onPartitionsRevoked
> > > callback, which is invoked at the end of the rebalance in the
> > > cooperative protocol. This can result in these consumers overwriting
> > > each other’s offsets and getting a corrupted view of the partition. Note
> > > that it’s not possible to commit during a rebalance, so we can protect
> > > against offset corruption by blocking further commits after we detect
> > > that the group leader may not understand COOPERATIVE, but before we
> > > invoke #onPartitionsRevoked. This is the “short-circuit”: if we detect
> > > that the group is in an unsafe state, we invoke #onPartitionsLost
> > > instead of #onPartitionsRevoked and explicitly prevent offsets from
> > > being committed on those revoked partitions.
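This is a minimal sketch of the short-circuit's commit guard. It assumes a hypothetical consumer model in which partitions are strings and a commit is recorded in a map. CommitBlockedException stands in for the CommitFailedException described in the proposal. Unblocking a partition when it is reassigned is an assumption of this sketch, not something the proposal specifies.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ShortCircuitSketch {

    // Hypothetical stand-in for the CommitFailedException named in the proposal.
    static class CommitBlockedException extends RuntimeException {
        CommitBlockedException(String message) {
            super(message);
        }
    }

    private final Set<String> owned = new HashSet<>();
    private final Set<String> blocked = new HashSet<>();
    final Map<String, Long> committed = new HashMap<>();

    void assign(Collection<String> partitions) {
        owned.addAll(partitions);
        // Assumption: a partition handed back to this member may be committed again.
        blocked.removeAll(partitions);
    }

    // The group looks unsafe, so the revoked partitions are treated as lost
    // (this is where #onPartitionsLost would run instead of #onPartitionsRevoked)
    // and any further commit on them is refused.
    void shortCircuit(Collection<String> revoked) {
        owned.removeAll(revoked);
        blocked.addAll(revoked);
    }

    void commit(String partition, long offset) {
        if (blocked.contains(partition)) {
            throw new CommitBlockedException("partition " + partition + " was lost; commit rejected");
        }
        if (!owned.contains(partition)) {
            throw new IllegalStateException("partition " + partition + " is not assigned");
        }
        committed.put(partition, offset); // stand-in for a real offset commit
    }

    public static void main(String[] args) {
        ShortCircuitSketch consumer = new ShortCircuitSketch();
        consumer.assign(List.of("topic-0", "topic-1"));
        consumer.shortCircuit(List.of("topic-1"));
        consumer.commit("topic-0", 42L); // still owned, so allowed
        try {
            consumer.commit("topic-1", 42L);
        } catch (CommitBlockedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```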
> > >
> > > Consumer procedure:
> > > Upon startup, the consumer will initially select the highest
> > > commonly-supported protocol across its configured assignors. With
> > > ["cooperative-sticky", "range"], the initial protocol will be EAGER when
> > > the member first joins the group. Following a rebalance, each member will
> > > check the selected assignor. If the chosen assignor supports
> > > COOPERATIVE, the member can upgrade its protocol to COOPERATIVE and no
> > > further action is required. If the member is already on COOPERATIVE but
> > > the selected assignor does NOT support it, then we need to trigger the
> > > short-circuit. In this case we will invoke #onPartitionsLost instead of
> > > #onPartitionsRevoked, and set a flag to block any attempts at committing
> > > those partitions which have been revoked. If a commit is attempted, as
> > > may be the case if the user does not implement #onPartitionsLost (see
> > > KAFKA-12638), we will throw a CommitFailedException which will be
> > > bubbled up through poll() after completing the rebalance. The member
> > > will then downgrade its protocol to EAGER for the next rebalance.
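The procedure above can be sketched as a small state machine. The supported-protocol table is an assumption consistent with the thread: cooperative-sticky supports both EAGER and COOPERATIVE, and range supports only EAGER. ProtocolUpgradeSketch, onRebalanceCompleted, and the shortCircuited flag are hypothetical names. The flag marks where #onPartitionsLost would be invoked and commits blocked.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProtocolUpgradeSketch {

    enum Protocol { EAGER, COOPERATIVE }

    // Assumed protocol support for the two assignors discussed in the thread.
    static final Map<String, Set<Protocol>> SUPPORTED = Map.of(
            "cooperative-sticky", Set.of(Protocol.EAGER, Protocol.COOPERATIVE),
            "range", Set.of(Protocol.EAGER));

    Protocol current;
    boolean shortCircuited;

    ProtocolUpgradeSketch(List<String> configuredAssignors) {
        // Start on the highest protocol that every configured assignor supports.
        boolean allCooperative = configuredAssignors.stream()
                .allMatch(a -> SUPPORTED.get(a).contains(Protocol.COOPERATIVE));
        current = allCooperative ? Protocol.COOPERATIVE : Protocol.EAGER;
    }

    // Invoked after each rebalance with the assignor the coordinator selected.
    void onRebalanceCompleted(String selectedAssignor) {
        boolean selectedSupportsCooperative =
                SUPPORTED.get(selectedAssignor).contains(Protocol.COOPERATIVE);
        shortCircuited = false;
        if (selectedSupportsCooperative) {
            // Every member supports the selected assignor, so upgrading is safe.
            current = Protocol.COOPERATIVE;
        } else if (current == Protocol.COOPERATIVE) {
            // Unsafe: call #onPartitionsLost instead of #onPartitionsRevoked,
            // block commits on the revoked partitions, then downgrade.
            shortCircuited = true;
            current = Protocol.EAGER;
        }
        // Otherwise: already EAGER and the selected assignor is EAGER-only.
    }

    public static void main(String[] args) {
        ProtocolUpgradeSketch member =
                new ProtocolUpgradeSketch(List.of("cooperative-sticky", "range"));
        System.out.println(member.current);               // EAGER on first join
        member.onRebalanceCompleted("cooperative-sticky");
        System.out.println(member.current);               // COOPERATIVE
        member.onRebalanceCompleted("range");             // e.g. an old member led
        System.out.println(member.current + " shortCircuited=" + member.shortCircuited);
    }
}
```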
> > >
> > > Let me know what you think,
> > > Sophie
> > >
> > > On Fri, Apr 2, 2021 at 7:08 PM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Hi Sophie,
> > > > Making ["cooperative-sticky", "range"] the default is a smart idea, to
> > > > ensure we can at least fall back to the RangeAssignor if consumers are
> > > > not following our recommended upgrade path. I updated the KIP
> > > > accordingly.
> > > >
> > > > Hi Chris,
> > > > No problem, I updated the KIP to include the change in Connect.
> > > >
> > > > Thank you very much.
> > > >
> > > > Luke
> > > >
> > > > On Thu, Apr 1, 2021 at 3:24 AM Chris Egerton <chr...@confluent.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > @Sophie - I like the sound of the dual-protocol default. The smooth
> > > > > upgrade path it permits sounds fantastic!
> > > > >
> > > > > @Luke - Do you think we can also include Connect in this KIP? Right
> > > > > now we don't set any custom partition assignment strategies for the
> > > > > consumer groups we bring up for sink tasks, and if we continue to just
> > > > > use the default, the assignment strategy for those consumer groups
> > > > > would change on Connect clusters once people upgrade to 3.0. I think
> > > > > this is fine (assuming we can take care of
> > > > > https://issues.apache.org/jira/browse/KAFKA-12487 before then, which
> > > > > I'm fairly optimistic about), but it might be worth a sentence or two
> > > > > in the KIP explaining that the change in default will intentionally
> > > > > propagate to Connect. And, if we think Connect should be left out of
> > > > > this change and stay on the range assignor instead, we should
> > > > > probably call that fact out in the KIP as well and state that Connect
> > > > > will now override the default partition assignment strategy to be
> > > > > the range assignor (assuming the user hasn't specified a value for
> > > > > consumer.partition.assignment.strategy in their worker config or for
> > > > > consumer.override.partition.assignment.strategy in their connector
> > > > > config).
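This sketch models the fallback precedence described above. It assumes that a connector-level consumer.override.* value takes priority over the worker-level consumer.* value, and that Connect would otherwise pin the range assignor. The effectiveStrategy helper is hypothetical. Only the two property names are taken from the text above.

```java
import java.util.Map;

public class ConnectAssignorSketch {

    // Hypothetical value Connect would pin for sink-task consumer groups if it
    // stayed on the range assignor instead of inheriting the new default.
    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";

    static String effectiveStrategy(Map<String, String> workerConfig,
                                    Map<String, String> connectorConfig) {
        // Assumed precedence: a per-connector override beats the worker-level value.
        String fromConnector =
                connectorConfig.get("consumer.override.partition.assignment.strategy");
        if (fromConnector != null) {
            return fromConnector;
        }
        String fromWorker = workerConfig.get("consumer.partition.assignment.strategy");
        if (fromWorker != null) {
            return fromWorker;
        }
        return RANGE; // neither is set: Connect's own (hypothetical) override applies
    }

    public static void main(String[] args) {
        String coop = "org.apache.kafka.clients.consumer.CooperativeStickyAssignor";
        System.out.println(effectiveStrategy(Map.of(), Map.of()));
        System.out.println(effectiveStrategy(
                Map.of("consumer.partition.assignment.strategy", coop), Map.of()));
    }
}
```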
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Wed, Mar 31, 2021 at 12:18 AM Sophie Blee-Goldman
> > > > > <sop...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Ok, I'm still fleshing out all the details of KAFKA-12477, but I
> > > > > > think we can simplify some things a bit and avoid any kind of
> > > > > > "fail-fast" which will require user intervention. In fact, I think
> > > > > > we can avoid requiring the user to make any changes at all for
> > > > > > KIP-726, so we don't have to worry about whether they actually read
> > > > > > our documentation:
> > > > > >
> > > > > > Instead of making ["cooperative-sticky"] the default, we change the
> > > > > > default to ["cooperative-sticky", "range"]. Since "range" is the old
> > > > > > default, this is equivalent to the first rolling bounce of the safe
> > > > > > upgrade path in KIP-429.
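For applications that set the assignor list explicitly, the proposed default corresponds to a consumer configuration like the one in this minimal sketch. It uses plain java.util.Properties so it runs without the Kafka client library. partition.assignment.strategy is the consumer config key (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG). The fully qualified class names are those of the built-in CooperativeStickyAssignor and RangeAssignor. The class and method names are illustrative only.

```java
import java.util.Properties;

public class DualAssignorConfigSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        // Cooperative first; range as the fallback that older members understand.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                        + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```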
> > > > > >
> > > > > > Of course, this also means that under the current protocol
> > > > > > selection mechanism we won't actually upgrade to cooperative
> > > > > > rebalancing with the default assignor. But that's where KAFKA-12477
> > > > > > will come in.
> > > > > >
> > > > > > @Guozhang Wang <guozh...@confluent.io> I'll get back to you with a
> > > > > > concrete proposal and answer your questions; I just want to point
> > > > > > out that it's possible to side-step the risk of users shooting
> > > > > > themselves in the foot (well, at least in this one specific case;
> > > > > > obviously they always find a way).
> > > > > >
> > > > > > On Tue, Mar 30, 2021 at 10:37 AM Guozhang Wang <wangg...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Sophie,
> > > > > > >
> > > > > > > My question is more related to KAFKA-12477, but since your latest
> > > > > > > replies are on this thread I figured we can follow up in the same
> > > > > > > venue. Just so I understand your latest comments above about the
> > > > > > > approach:
> > > > > > >
> > > > > > > * I think we would need to persist this decision so that the
> > > > > > > group would never go back to the eager protocol; this bit would be
> > > > > > > written to the internal topic's assignment message. Is that
> > > > > > > correct?
> > > > > > > * Maybe you can describe the steps, after the group has decided
> > > > > > > to move forward with cooperative protocols, when:
> > > > > > > 1) a new member joins the group with the old version, and hence
> > > > > > > only recognizes the eager protocol and executes it in its first
> > > > > > > rebalance: what would happen?
> > > > > > > 2) in addition to 1), the new member joins the group with the old
> > > > > > > version, only recognizes the old subscription format, and is
> > > > > > > selected as the leader: what would happen?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 29, 2021 at 10:30 PM Luke Chen <show...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Sophie & Ismael,
> > > > > > > > Thank you for your feedback.
> > > > > > > > No problem, let's pause this KIP and wait for this improvement:
> > > > > > > > KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
> > > > > > > >
> > > > > > > > Stay tuned :)
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Tue, Mar 30, 2021 at 3:14 AM Ismael Juma <ism...@juma.me.uk>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Sophie,
> > > > > > > > >
> > > > > > > > > I didn't analyze the KIP in detail, but the two suggestions
> > > > > > > > > you mentioned sound like great improvements.
> > > > > > > > >
> > > > > > > > > A bit more context: breaking changes for a widely used
> > > > > > > > > product like Kafka are costly, which is why we try as hard as
> > > > > > > > > we can to avoid them. When it comes to the brokers, they are
> > > > > > > > > often managed by a central group (or they're in the Cloud), so
> > > > > > > > > they're a bit easier to manage. Even so, it's still possible
> > > > > > > > > to upgrade from 0.8.x directly to 2.7 since all protocol
> > > > > > > > > versions are still supported. When it comes to the basic
> > > > > > > > > clients (producer, consumer, admin client), they're often
> > > > > > > > > embedded in applications so we have to be even more
> > > > > > > > > conservative.
> > > > > > > > >
> > > > > > > > > Ismael
> > > > > > > > >
> > > > > > > > > On Mon, Mar 29, 2021 at 10:50 AM Sophie Blee-Goldman
> > > > > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Ismael,
> > > > > > > > > >
> > > > > > > > > > It seems like, given 3.0 is a breaking release, we have to
> > > > > > > > > > rely on users being aware of this and responsible enough to
> > > > > > > > > > read the upgrade guide. Otherwise we could never ever make
> > > > > > > > > > any breaking changes beyond just removing deprecated APIs or
> > > > > > > > > > other compilation-breaking changes that would be immediately
> > > > > > > > > > visible, no?
> > > > > > > > > >
> > > > > > > > > > That said, obviously it's better to have a circuit-breaker
> > > > > > > > > > that will fail fast in case of a user misconfiguration
> > > > > > > > > > rather than silently corrupting the consumer group state,
> > > > > > > > > > e.g. by letting two consumers overlap in their ownership of
> > > > > > > > > > the same partition(s). We could definitely implement this,
> > > > > > > > > > and now that I think about it, this might solve a related
> > > > > > > > > > problem in KAFKA-12477
> > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. We just
> > > > > > > > > > add a new field to the Assignment in which the group leader
> > > > > > > > > > indicates whether it's on a recent enough version to
> > > > > > > > > > understand cooperative rebalancing. If an upgraded member
> > > > > > > > > > joins the group, it'll only be allowed to start following
> > > > > > > > > > the new rebalancing protocol after receiving the go-ahead
> > > > > > > > > > from the group leader.
> > > > > > > > > >
> > > > > > > > > > If we do go ahead and add this new field in the Assignment,
> > > > > > > > > > then I'm pretty confident we can reduce the number of
> > > > > > > > > > required rolling bounces to just one with KAFKA-12477
> > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. In that
> > > > > > > > > > case we should be in much better shape to feel good about
> > > > > > > > > > changing the default to the CooperativeStickyAssignor. How
> > > > > > > > > > does that sound?
> > > > > > > > > >
> > > > > > > > > > To be clear, I'm not proposing we do this as part of
> > > > > > > > > > KIP-726. Here's my take:
> > > > > > > > > >
> > > > > > > > > > Let's pause this KIP while I work on making these two
> > > > > > > > > > improvements in KAFKA-12477
> > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. Once I
> > > > > > > > > > can confirm the short-circuit and single rolling bounce will
> > > > > > > > > > be available for 3.0, I'll report back on this thread. Then
> > > > > > > > > > we can move forward with this KIP again.
> > > > > > > > > >
> > > > > > > > > > Thoughts?
> > > > > > > > > > Sophie
> > > > > > > > > >
> > > > > > > > > > On Mon, Mar 29, 2021 at 12:01 AM Luke Chen <show...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > Thanks for your good questions. Answers below:
> > > > > > > > > > > *1. Are we saying that every consumer upgraded would have
> > > > > > > > > > > to follow the complex path described in the KIP?*
> > > > > > > > > > > --> We suggest that every consumer go through these two
> > > > > > > > > > > rolling upgrades. Once KAFKA-12477
> > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477> is
> > > > > > > > > > > completed, this can be reduced to one rolling upgrade.
> > > > > > > > > > >
> > > > > > > > > > > *2. What happens if they don't read the instructions and
> > > > > > > > > > > upgrade as they have in the past?*
> > > > > > > > > > > --> The reason we want two rolling upgrades is to avoid
> > > > > > > > > > > the situation where the leader is on the old bytecode and
> > > > > > > > > > > only recognizes "eager", but due to compatibility is still
> > > > > > > > > > > able to deserialize the new protocol data from
> > > > > > > > > > > newer-versioned members, and hence just goes ahead and
> > > > > > > > > > > does the assignment while the newer-versioned members have
> > > > > > > > > > > not revoked their partitions before joining the group.
> > > > > > > > > > >
> > > > > > > > > > > But I'd say the new default assignor,
> > > > > > > > > > > "CooperativeStickyAssignor", was already introduced in
> > > > > > > > > > > v2.4.0, which should be long enough for users to upgrade to
> > > > > > > > > > > bytecode that recognizes the "cooperative" protocol.
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 29, 2021 at 12:14 PM Ismael Juma <ism...@juma.me.uk>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP. Are we saying that every consumer
> > > > > > > > > > > > upgraded would have to follow the complex path described
> > > > > > > > > > > > in the KIP? Also, what happens if they don't read the
> > > > > > > > > > > > instructions and upgrade as they have in the past?
> > > > > > > > > > > >
> > > > > > > > > > > > Ismael
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Mar 26, 2021, 1:53 AM Luke Chen <show...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > <Update the subject>
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to discuss the following proposal to make the
> > > > > > > > > > > > > CooperativeStickyAssignor the default assignor.
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-726%3A+Make+the+CooperativeStickyAssignor+as+the+default+assignor
> > > > > > > > > > > > >
> > > > > > > > > > > > > Any comments are welcome.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > Luke
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
