Whoops, small correction--meant to say
ConsumerRebalanceListener::onPartitionsLost, not Consumer::onPartitionsLost

On Mon, Apr 12, 2021 at 8:17 AM Chris Egerton <chr...@confluent.io> wrote:

> Hi Sophie,
>
> This sounds fantastic. I've made a note on KAFKA-12487 about being sure to
> implement Consumer::onPartitionsLost to avoid unnecessary task failures on
> consumer protocol downgrade, but besides that, I don't think things could
> get any smoother for Connect users or developers. The automatic protocol
> upgrade/downgrade behavior appears safe, intuitive, and pain-free.
>
> Really excited for this development and hoping we can see it come to
> fruition in time for the 3.0 release!
>
> Cheers,
>
> Chris
>
> On Fri, Apr 9, 2021 at 2:43 PM Sophie Blee-Goldman
> <sop...@confluent.io.invalid> wrote:
>
>> 1) Yes, all of the above will be part of KAFKA-12477 (not KIP-726)
>>
>> 2) No, KAFKA-12638 would be nice to have but I don't think it's appropriate
>> to remove the default implementation of #onPartitionsLost in 3.0 since we
>> never gave any indication yet that we intend to remove it
>>
>> 3) Yes, this would be similar to when a Consumer drops out of the group.
>> It's always been possible for a member to miss a rebalance and have its
>> partition be reassigned to another member, during which time both members
>> would claim to own said partition. But this is safe because the member who
>> dropped out is blocked from committing offsets on that partition.
>>
>> On Fri, Apr 9, 2021 at 2:46 AM Luke Chen <show...@gmail.com> wrote:
>>
>> > Hi Sophie,
>> > That sounds great, and it takes care of every case I can think of.
>> > Questions:
>> > 1. Do you mean the short-circuit will also be implemented in KAFKA-12477?
>> > 2. I don't think KAFKA-12638 is a blocker for KIP-726. Am I right?
>> > 3. So, does that mean it is still possible for multiple consumers to own
>> > the same topic partition? And in this situation, we prevent them from
>> > committing and wait for the next rebalance (which should be soon). Is my
>> > understanding correct?
>> >
>> > Thank you very much for finding this great solution.
>> >
>> > Luke
>> >
>> > On Fri, Apr 9, 2021 at 11:37 AM Sophie Blee-Goldman
>> > <sop...@confluent.io.invalid> wrote:
>> >
>> > > Alright, here's the detailed proposal for KAFKA-12477. This assumes we
>> > > will change the default assignor to ["cooperative-sticky", "range"] in
>> > > KIP-726. It also acknowledges that users may attempt any kind of upgrade
>> > > without reading the docs, and so we need to put in safeguards against data
>> > > corruption rather than assume everyone will follow the safe upgrade path.
>> > >
>> > > With this proposal,
>> > > 1) New applications on 3.0 will enable cooperative rebalancing by default
>> > > 2) Existing applications which don’t set an assignor can safely upgrade to
>> > > 3.0 using a single rolling bounce with no extra steps, and will
>> > > automatically transition to cooperative rebalancing
>> > > 3) Existing applications which do set an assignor that uses EAGER can
>> > > likewise upgrade their applications to COOPERATIVE with a single rolling
>> > > bounce
>> > > 4) Once on 3.0, applications can safely go back and forth between EAGER
>> > > and COOPERATIVE
>> > > 5) Applications can safely downgrade away from 3.0
>> > >
>> > > The high-level idea for dynamic protocol upgrades is that the group will
>> > > leverage the assignor selected by the group coordinator to determine when
>> > > it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect
>> > > the group in case of rare events or user misconfiguration. The group
>> > > coordinator selects the most preferred assignor that’s supported by all
>> > > members of the group, so we know that all members will support
>> > > COOPERATIVE once we receive the “cooperative-sticky” assignor after a
>> > > rebalance. At this point, each member can upgrade their own protocol to
>> > > COOPERATIVE. However, there may be situations in which an EAGER member
>> > > joins the group even after the others have upgraded to COOPERATIVE. For
>> > > example, during a rolling upgrade if the last remaining member on the old
>> > > bytecode misses a rebalance, the other members will be allowed to upgrade
>> > > to COOPERATIVE. If the old member rejoins and is chosen to be the group
>> > > leader before it’s upgraded to 3.0, it won’t be aware that the other
>> > > members of the group have not yet revoked their partitions when computing
>> > > the assignment.
>> > >
>> > > Short Circuit:
>> > > The risk of mixing the cooperative and eager rebalancing protocols is that
>> > > a partition may be assigned to one member while it has yet to be revoked
>> > > from its previous owner. The danger is that the new owner may begin
>> > > processing and committing offsets for this partition while the previous
>> > > owner is also committing offsets in its #onPartitionsRevoked callback,
>> > > which is invoked at the end of the rebalance in the cooperative protocol.
>> > > This can result in these consumers overwriting each other’s offsets and
>> > > getting a corrupted view of the partition. Note that it’s not possible to
>> > > commit during a rebalance, so we can protect against offset corruption by
>> > > blocking further commits after we detect that the group leader may not
>> > > understand COOPERATIVE, but before we invoke #onPartitionsRevoked. This is
>> > > the “short-circuit”: if we detect that the group is in an unsafe state, we
>> > > invoke #onPartitionsLost instead of #onPartitionsRevoked and explicitly
>> > > prevent offsets from being committed on those revoked partitions.
>> > >
>> > > Consumer procedure:
>> > > Upon startup, the consumer will initially select the highest
>> > > commonly-supported protocol across its configured assignors. With
>> > > ["cooperative-sticky", "range"], the initial protocol will be EAGER when
>> > > the member first joins the group. Following a rebalance, each member will
>> > > check the selected assignor. If the chosen assignor supports COOPERATIVE,
>> > > the member can upgrade its protocol to COOPERATIVE and no further action
>> > > is required. If the member is already on COOPERATIVE but the selected
>> > > assignor does NOT support it, then we need to trigger the short-circuit.
>> > > In this case we will invoke #onPartitionsLost instead of
>> > > #onPartitionsRevoked, and set a flag to block any attempts at committing
>> > > those partitions which have been revoked. If a commit is attempted, as may
>> > > be the case if the user does not implement #onPartitionsLost (see
>> > > KAFKA-12638), we will throw a CommitFailedException which will be bubbled
>> > > up through poll() after completing the rebalance. The member will then
>> > > downgrade its protocol to EAGER for the next rebalance.
>> > >
>> > > Let me know what you think,
>> > > Sophie
>> > >
>> > > On Fri, Apr 2, 2021 at 7:08 PM Luke Chen <show...@gmail.com> wrote:
>> > >
>> > > > Hi Sophie,
>> > > > Making the default ["cooperative-sticky", "range"] is a smart idea: it
>> > > > ensures we can at least fall back to the RangeAssignor if consumers do
>> > > > not follow our recommended upgrade path. I updated the KIP accordingly.
>> > > >
>> > > > Hi Chris,
>> > > > No problem, I updated the KIP to include the change in Connect.
>> > > >
>> > > > Thank you very much.
>> > > >
>> > > > Luke
>> > > >
>> > > > On Thu, Apr 1, 2021 at 3:24 AM Chris Egerton
>> > > > <chr...@confluent.io.invalid> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > @Sophie - I like the sound of the dual-protocol default. The smooth
>> > > > > upgrade path it permits sounds fantastic!
>> > > > >
>> > > > > @Luke - Do you think we can also include Connect in this KIP? Right now
>> > > > > we don't set any custom partition assignment strategies for the consumer
>> > > > > groups we bring up for sink tasks, and if we continue to just use the
>> > > > > default, the assignment strategy for those consumer groups would change
>> > > > > on Connect clusters once people upgrade to 3.0. I think this is fine
>> > > > > (assuming we can take care of
>> > > > > https://issues.apache.org/jira/browse/KAFKA-12487
>> > > > > before then, which I'm fairly optimistic about), but it might be worth a
>> > > > > sentence or two in the KIP explaining that the change in default will
>> > > > > intentionally propagate to Connect. And, if we think Connect should be
>> > > > > left out of this change and stay on the range assignor instead, we should
>> > > > > probably call that fact out in the KIP as well and state that Connect
>> > > > > will now override the default partition assignment strategy to be the
>> > > > > range assignor (assuming the user hasn't specified a value for
>> > > > > consumer.partition.assignment.strategy in their worker config or for
>> > > > > consumer.override.partition.assignment.strategy in their connector
>> > > > > config).
>> > > > >
>> > > > > Cheers,
>> > > > >
>> > > > > Chris
>> > > > >
>> > > > > On Wed, Mar 31, 2021 at 12:18 AM Sophie Blee-Goldman
>> > > > > <sop...@confluent.io.invalid> wrote:
>> > > > >
>> > > > > > Ok I'm still fleshing out all the details of KAFKA-12477 but I think we
>> > > > > > can simplify some things a bit, and avoid any kind of "fail-fast" which
>> > > > > > will require user intervention. In fact I think we can avoid requiring
>> > > > > > the user to make any changes at all for KIP-726, so we don't have to
>> > > > > > worry about whether they actually read our documentation:
>> > > > > >
>> > > > > > Instead of making ["cooperative-sticky"] the default, we change the
>> > > > > > default to ["cooperative-sticky", "range"]. Since "range" is the old
>> > > > > > default, this is equivalent to the first rolling bounce of the safe
>> > > > > > upgrade path in KIP-429.
>> > > > > >
>> > > > > > Of course this also means that under the current protocol selection
>> > > > > > mechanism we won't actually upgrade to cooperative rebalancing with the
>> > > > > > default assignor. But that's where KAFKA-12477 will come in.
>> > > > > >
>> > > > > > @Guozhang Wang <guozh...@confluent.io>  I'll get back to you with a
>> > > > > > concrete proposal and answer your questions, I just want to point out
>> > > > > > that it's possible to side-step the risk of users shooting themselves in
>> > > > > > the foot (well, at least in this one specific case, obviously they
>> > > > > > always find a way)
>> > > > > >
>> > > > > > On Tue, Mar 30, 2021 at 10:37 AM Guozhang Wang <wangg...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi Sophie,
>> > > > > > >
>> > > > > > > My question is more related to KAFKA-12477, but since your latest
>> > > > > > > replies are on this thread I figured we can follow up in the same
>> > > > > > > venue. Just so I understand your latest comments above about the
>> > > > > > > approach:
>> > > > > > >
>> > > > > > > * I think we would need to persist this decision so that the group
>> > > > > > > would never go back to the eager protocol; this bit would be written
>> > > > > > > to the internal topic's assignment message. Is that correct?
>> > > > > > > * Maybe you can describe the steps, after the group has decided to
>> > > > > > > move forward with cooperative protocols, when:
>> > > > > > > 1) a new member joins the group with the old version, and hence only
>> > > > > > > recognizes the eager protocol and executes the eager protocol in its
>> > > > > > > first rebalance: what would happen?
>> > > > > > > 2) in addition to 1), the new member joins the group with the old
>> > > > > > > version, only recognizes the old subscription format, and is selected
>> > > > > > > as the leader: what would happen?
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Mon, Mar 29, 2021 at 10:30 PM Luke Chen <show...@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Sophie & Ismael,
>> > > > > > > > Thank you for your feedback.
>> > > > > > > > No problem, let's pause this KIP and wait for this improvement:
>> > > > > > > > KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
>> > > > > > > >
>> > > > > > > > Stay tuned :)
>> > > > > > > >
>> > > > > > > > Thank you.
>> > > > > > > > Luke
>> > > > > > > >
>> > > > > > > > On Tue, Mar 30, 2021 at 3:14 AM Ismael Juma <ism...@juma.me.uk>
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi Sophie,
>> > > > > > > > >
>> > > > > > > > > I didn't analyze the KIP in detail, but the two suggestions you
>> > > > > > > > > mentioned sound like great improvements.
>> > > > > > > > >
>> > > > > > > > > A bit more context: breaking changes for a widely used product like
>> > > > > > > > > Kafka are costly, which is why we try as hard as we can to avoid
>> > > > > > > > > them. When it comes to the brokers, they are often managed by a
>> > > > > > > > > central group (or they're in the Cloud), so they're a bit easier to
>> > > > > > > > > manage. Even so, it's still possible to upgrade from 0.8.x directly
>> > > > > > > > > to 2.7 since all protocol versions are still supported. When it
>> > > > > > > > > comes to the basic clients (producer, consumer, admin client),
>> > > > > > > > > they're often embedded in applications so we have to be even more
>> > > > > > > > > conservative.
>> > > > > > > > >
>> > > > > > > > > Ismael
>> > > > > > > > >
>> > > > > > > > > On Mon, Mar 29, 2021 at 10:50 AM Sophie Blee-Goldman
>> > > > > > > > > <sop...@confluent.io.invalid> wrote:
>> > > > > > > > >
>> > > > > > > > > > Ismael,
>> > > > > > > > > >
>> > > > > > > > > > It seems like given 3.0 is a breaking release, we have to rely on
>> > > > > > > > > > users being aware of this and responsible enough to read the
>> > > > > > > > > > upgrade guide. Otherwise we could never ever make any breaking
>> > > > > > > > > > changes beyond just removing deprecated APIs or other
>> > > > > > > > > > compilation-breaking errors that would be immediately visible, no?
>> > > > > > > > > >
>> > > > > > > > > > That said, obviously it's better to have a circuit-breaker that
>> > > > > > > > > > will fail fast in case of a user misconfiguration rather than
>> > > > > > > > > > silently corrupting the consumer group state -- eg for two
>> > > > > > > > > > consumers to overlap in their ownership of the same partition(s).
>> > > > > > > > > > We could definitely implement this, and now that I think about it
>> > > > > > > > > > this might solve a related problem in KAFKA-12477
>> > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. We just add a
>> > > > > > > > > > new field to the Assignment in which the group leader indicates
>> > > > > > > > > > whether it's on a recent enough version to understand cooperative
>> > > > > > > > > > rebalancing. If an upgraded member joins the group, it'll only be
>> > > > > > > > > > allowed to start following the new rebalancing protocol after
>> > > > > > > > > > receiving the go-ahead from the group leader.
>> > > > > > > > > >
>> > > > > > > > > > If we do go ahead and add this new field in the Assignment then
>> > > > > > > > > > I'm pretty confident we can reduce the number of required rolling
>> > > > > > > > > > bounces to just one with KAFKA-12477
>> > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>. In that case
>> > > > > > > > > > we should be in much better shape to feel good about changing the
>> > > > > > > > > > default to the CooperativeStickyAssignor. How does that sound?
>> > > > > > > > > >
>> > > > > > > > > > To be clear, I'm not proposing we do this as part of KIP-726.
>> > > > > > > > > > Here's my take:
>> > > > > > > > > >
>> > > > > > > > > > Let's pause this KIP while I work on making these two improvements
>> > > > > > > > > > in KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
>> > > > > > > > > > Once I can confirm the short-circuit and single rolling bounce
>> > > > > > > > > > will be available for 3.0, I'll report back on this thread. Then
>> > > > > > > > > > we can move forward with this KIP again.
>> > > > > > > > > >
>> > > > > > > > > > Thoughts?
>> > > > > > > > > > Sophie
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Mar 29, 2021 at 12:01 AM Luke Chen <show...@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Ismael,
>> > > > > > > > > > > Thanks for your good questions. Answers below:
>> > > > > > > > > > > *1. Are we saying that every consumer upgraded would have to
>> > > > > > > > > > > follow the complex path described in the KIP? *
>> > > > > > > > > > > --> We suggest that every consumer go through these 2 steps of
>> > > > > > > > > > > rolling upgrade. And after KAFKA-12477
>> > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12477>
>> > > > > > > > > > > is completed, it can be reduced to 1 rolling upgrade.
>> > > > > > > > > > >
>> > > > > > > > > > > *2. what happens if they don't read the instructions and upgrade
>> > > > > > > > > > > as they have in the past?*
>> > > > > > > > > > > --> The reason we want 2 steps of rolling upgrade is that we want
>> > > > > > > > > > > to avoid the situation where the leader is on old byte-code and
>> > > > > > > > > > > only recognizes "eager", but due to compatibility is still able
>> > > > > > > > > > > to deserialize the new protocol data from newer-versioned
>> > > > > > > > > > > members, and hence just goes ahead and does the assignment while
>> > > > > > > > > > > the newer-versioned members did not revoke their partitions
>> > > > > > > > > > > before joining the group.
>> > > > > > > > > > >
>> > > > > > > > > > > But I'd say, the new default assignor "CooperativeStickyAssignor"
>> > > > > > > > > > > was already introduced in V2.4.0, and that should have been long
>> > > > > > > > > > > enough for users to upgrade to the new byte-code that recognizes
>> > > > > > > > > > > the "cooperative" protocol.
>> > > > > > > > > > > What do you think?
>> > > > > > > > > > >
>> > > > > > > > > > > Thank you.
>> > > > > > > > > > > Luke
>> > > > > > > > > > >
>> > > > > > > > > > > On Mon, Mar 29, 2021 at 12:14 PM Ismael Juma <ism...@juma.me.uk>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Thanks for the KIP. Are we saying that every consumer upgraded
>> > > > > > > > > > > > would have to follow the complex path described in the KIP?
>> > > > > > > > > > > > Also, what happens if they don't read the instructions and
>> > > > > > > > > > > > upgrade as they have in the past?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Ismael
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Fri, Mar 26, 2021, 1:53 AM Luke Chen <show...@gmail.com>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > > > <Update the subject>
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I'd like to discuss the following proposal to make the
>> > > > > > > > > > > > > CooperativeStickyAssignor the default assignor.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-726%3A+Make+the+CooperativeStickyAssignor+as+the+default+assignor
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Any comments are welcome.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thank you.
>> > > > > > > > > > > > > Luke
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

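For readers following the thread, the "Consumer procedure" and "Short Circuit" paragraphs in the quoted KAFKA-12477 proposal can be sketched as a small toy model. This is only an illustration of the described decision logic, not Kafka client code: the names `Member`, `SUPPORTED`, `on_rebalance_complete` and `CommitFailedError` are made up for this sketch, the per-assignor protocol table is an assumption based on the thread, and callback timing is not modelled (the real client would surface the exception through poll()).

```python
from enum import Enum


class Protocol(Enum):
    EAGER = 0
    COOPERATIVE = 1


# Assumed for this sketch: protocols supported by each assignor name.
SUPPORTED = {
    "cooperative-sticky": {Protocol.EAGER, Protocol.COOPERATIVE},
    "range": {Protocol.EAGER},
}


class CommitFailedError(Exception):
    """Stand-in for CommitFailedException in this toy model."""


class Member:
    """Hypothetical model of one group member's protocol state."""

    def __init__(self, assignors):
        # Start on the highest protocol supported by ALL configured assignors.
        common = set.intersection(*(SUPPORTED[a] for a in assignors))
        self.protocol = max(common, key=lambda p: p.value)
        self.blocked = set()   # partitions on which commits are refused
        self.callbacks = []    # record of listener callbacks "invoked"

    def on_rebalance_complete(self, selected_assignor, revoked):
        revoked = set(revoked)
        if Protocol.COOPERATIVE in SUPPORTED[selected_assignor]:
            # Every member supports COOPERATIVE, so upgrading is safe.
            self.protocol = Protocol.COOPERATIVE
            if revoked:
                self.callbacks.append(("onPartitionsRevoked", sorted(revoked)))
        elif self.protocol is Protocol.COOPERATIVE:
            # Short-circuit: block commits first, then report the partitions
            # as lost rather than revoked, and downgrade for the next round.
            self.blocked |= revoked
            self.callbacks.append(("onPartitionsLost", sorted(revoked)))
            self.protocol = Protocol.EAGER
        elif revoked:
            # Already EAGER and staying EAGER: ordinary revocation.
            self.callbacks.append(("onPartitionsRevoked", sorted(revoked)))

    def commit(self, partition):
        if partition in self.blocked:
            raise CommitFailedError(f"commit blocked for {partition}")
        return True


member = Member(["cooperative-sticky", "range"])
member.on_rebalance_complete("cooperative-sticky", revoked=[])
member.on_rebalance_complete("range", revoked=["topic-0"])
print(member.protocol, member.callbacks[-1])
```

In this model a member configured with both assignors starts on EAGER, upgrades once "cooperative-sticky" is selected, and falls back to EAGER via the short-circuit if "range" is selected later. Whether and when the commit block is lifted is not covered in the thread, so the sketch leaves it in place.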