Hi Sophie,

This sounds fantastic. I've made a note on KAFKA-12487 about being sure to implement Consumer::onPartitionsLost to avoid unnecessary task failures on consumer protocol downgrade, but besides that, I don't think things could get any smoother for Connect users or developers. The automatic protocol upgrade/downgrade behavior appears safe, intuitive, and pain-free.
Really excited for this development and hoping we can see it come to fruition in time for the 3.0 release!

Cheers,

Chris

On Fri, Apr 9, 2021 at 2:43 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

> 1) Yes, all of the above will be part of KAFKA-12477 (not KIP-726)
>
> 2) No, KAFKA-12638 would be nice to have but I don't think it's appropriate to remove the default implementation of #onPartitionsLost in 3.0 since we never gave any indication yet that we intend to remove it
>
> 3) Yes, this would be similar to when a Consumer drops out of the group. It's always been possible for a member to miss a rebalance and have its partition be reassigned to another member, during which time both members would claim to own said partition. But this is safe because the member who dropped out is blocked from committing offsets on that partition.
>
> On Fri, Apr 9, 2021 at 2:46 AM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Sophie,
> > That sounds great to take care of each case I can think of.
> > Questions:
> > 1. Do you mean the short-Circuit will also be implemented in KAFKA-12477?
> > 2. I don't think KAFKA-12638 is the blocker of this KIP-726, Am I right?
> > 3. So, does that mean we still have possibility to have multiple consumer owned the same topic partition? And in this situation, we avoid them doing committing, and waiting for next rebalance (should be soon). Is my understanding correct?
> >
> > Thank you very much for finding this great solution.
> >
> > Luke
> >
> > On Fri, Apr 9, 2021 at 11:37 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
> >
> > > Alright, here's the detailed proposal for KAFKA-12477. This assumes we will change the default assignor to ["cooperative-sticky", "range"] in KIP-726.
> > > It also acknowledges that users may attempt any kind of upgrade without reading the docs, and so we need to put in safeguards against data corruption rather than assume everyone will follow the safe upgrade path.
> > >
> > > With this proposal,
> > > 1) New applications on 3.0 will enable cooperative rebalancing by default
> > > 2) Existing applications which don’t set an assignor can safely upgrade to 3.0 using a single rolling bounce with no extra steps, and will automatically transition to cooperative rebalancing
> > > 3) Existing applications which do set an assignor that uses EAGER can likewise upgrade their applications to COOPERATIVE with a single rolling bounce
> > > 4) Once on 3.0, applications can safely go back and forth between EAGER and COOPERATIVE
> > > 5) Applications can safely downgrade away from 3.0
> > >
> > > The high-level idea for dynamic protocol upgrades is that the group will leverage the assignor selected by the group coordinator to determine when it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect the group in case of rare events or user misconfiguration. The group coordinator selects the most preferred assignor that’s supported by all members of the group, so we know that all members will support COOPERATIVE once we receive the “cooperative-sticky” assignor after a rebalance. At this point, each member can upgrade their own protocol to COOPERATIVE. However, there may be situations in which an EAGER member may join the group even after upgrading to COOPERATIVE. For example, during a rolling upgrade if the last remaining member on the old bytecode misses a rebalance, the other members will be allowed to upgrade to COOPERATIVE.
> > > If the old member rejoins and is chosen to be the group leader before it’s upgraded to 3.0, it won’t be aware that the other members of the group have not yet revoked their partitions when computing the assignment.
> > >
> > > Short Circuit:
> > > The risk of mixing the cooperative and eager rebalancing protocols is that a partition may be assigned to one member while it has yet to be revoked from its previous owner. The danger is that the new owner may begin processing and committing offsets for this partition while the previous owner is also committing offsets in its #onPartitionsRevoked callback, which is invoked at the end of the rebalance in the cooperative protocol. This can result in these consumers overwriting each other’s offsets and getting a corrupted view of the partition. Note that it’s not possible to commit during a rebalance, so we can protect against offset corruption by blocking further commits after we detect that the group leader may not understand COOPERATIVE, but before we invoke #onPartitionsRevoked. This is the “short-circuit” -- if we detect that the group is in an unsafe state, we invoke #onPartitionsLost instead of #onPartitionsRevoked and explicitly prevent offsets from being committed on those revoked partitions.
> > >
> > > Consumer procedure:
> > > Upon startup, the consumer will initially select the highest commonly-supported protocol across its configured assignors. With ["cooperative-sticky", "range"], the initial protocol will be EAGER when the member first joins the group. Following a rebalance, each member will check the selected assignor. If the chosen assignor supports COOPERATIVE, the member can upgrade their used protocol to COOPERATIVE and no further action is required.
> > > If the member is already on COOPERATIVE but the selected assignor does NOT support it, then we need to trigger the short-circuit. In this case we will invoke #onPartitionsLost instead of #onPartitionsRevoked, and set a flag to block any attempts at committing those partitions which have been revoked. If a commit is attempted, as may be the case if the user does not implement #onPartitionsLost (see KAFKA-12638), we will throw a CommitFailedException which will be bubbled up through poll() after completing the rebalance. The member will then downgrade its protocol to EAGER for the next rebalance.
> > >
> > > Let me know what you think,
> > > Sophie
> > >
> > > On Fri, Apr 2, 2021 at 7:08 PM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Hi Sophie,
> > > > Making the default to "cooperative-sticky, range" is a smart idea, to ensure we can at least fall back to rangeAssignor if consumers are not following our recommended upgrade path. I updated the KIP accordingly.
> > > >
> > > > Hi Chris,
> > > > No problem, I updated the KIP to include the change in Connect.
> > > >
> > > > Thank you very much.
> > > >
> > > > Luke
> > > >
> > > > On Thu, Apr 1, 2021 at 3:24 AM Chris Egerton <chr...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > @Sophie - I like the sound of the dual-protocol default. The smooth upgrade path it permits sounds fantastic!
> > > > >
> > > > > @Luke - Do you think we can also include Connect in this KIP? Right now we don't set any custom partition assignment strategies for the consumer groups we bring up for sink tasks, and if we continue to just use the default, the assignment strategy for those consumer groups would change on Connect clusters once people upgrade to 3.0.
> > > > > I think this is fine (assuming we can take care of https://issues.apache.org/jira/browse/KAFKA-12487 before then, which I'm fairly optimistic about), but it might be worth a sentence or two in the KIP explaining that the change in default will intentionally propagate to Connect. And, if we think Connect should be left out of this change and stay on the range assignor instead, we should probably call that fact out in the KIP as well and state that Connect will now override the default partition assignment strategy to be the range assignor (assuming the user hasn't specified a value for consumer.partition.assignment.strategy in their worker config or for consumer.override.partition.assignment.strategy in their connector config).
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Wed, Mar 31, 2021 at 12:18 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Ok I'm still fleshing out all the details of KAFKA-12477 but I think we can simplify some things a bit, and avoid any kind of "fail-fast" which will require user intervention. In fact I think we can avoid requiring the user to make any changes at all for KIP-726, so we don't have to worry about whether they actually read our documentation:
> > > > > >
> > > > > > Instead of making ["cooperative-sticky"] the default, we change the default to ["cooperative-sticky", "range"]. Since "range" is the old default, this is equivalent to the first rolling bounce of the safe upgrade path in KIP-429.
> > > > > >
> > > > > > Of course this also means that under the current protocol selection mechanism we won't actually upgrade to cooperative rebalancing with the default assignor. But that's where KAFKA-12477 will come in.
> > > > > >
> > > > > > @Guozhang Wang <guozh...@confluent.io> I'll get back to you with a concrete proposal and answer your questions, I just want to point out that it's possible to side-step the risk of users shooting themselves in the foot (well, at least in this one specific case, obviously they always find a way)
> > > > > >
> > > > > > On Tue, Mar 30, 2021 at 10:37 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Sophie,
> > > > > > >
> > > > > > > My question is more related to KAFKA-12477, but since your latest replies are on this thread I figured we can follow-up on the same venue. Just so I understand your latest comments above about the approach:
> > > > > > >
> > > > > > > * I think, we would need to persist this decision so that the group would never go back to the eager protocol, this bit would be written to the internal topic's assignment message. Is that correct?
> > > > > > > * Maybe you can describe the steps, after the group has decided to move forward with cooperative protocols, when:
> > > > > > > 1) a new member joined the group with the old version, and hence only recognized eager protocol and executing the eager protocol with its first rebalance, what would happen.
> > > > > > > 2) in addition to 1), the new member joined the group with the old version and only recognized the old subscription format, and was selected as the leader, what would happen.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Mon, Mar 29, 2021 at 10:30 PM Luke Chen <show...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Sophie & Ismael,
> > > > > > > > Thank you for your feedback.
> > > > > > > > No problem, let's pause this KIP and wait for this improvement: KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
> > > > > > > >
> > > > > > > > Stay tuned :)
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Tue, Mar 30, 2021 at 3:14 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > >
> > > > > > > > > Hi Sophie,
> > > > > > > > >
> > > > > > > > > I didn't analyze the KIP in detail, but the two suggestions you mentioned sound like great improvements.
> > > > > > > > >
> > > > > > > > > A bit more context: breaking changes for a widely used product like Kafka are costly and hence why we try as hard as we can to avoid them. When it comes to the brokers, they are often managed by a central group (or they're in the Cloud), so they're a bit easier to manage. Even so, it's still possible to upgrade from 0.8.x directly to 2.7 since all protocol versions are still supported.
> > > > > > > > > When it comes to the basic clients (producer, consumer, admin client), they're often embedded in applications so we have to be even more conservative.
> > > > > > > > >
> > > > > > > > > Ismael
> > > > > > > > >
> > > > > > > > > On Mon, Mar 29, 2021 at 10:50 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Ismael,
> > > > > > > > > >
> > > > > > > > > > It seems like given 3.0 is a breaking release, we have to rely on users being aware of this and responsible enough to read the upgrade guide. Otherwise we could never ever make any breaking changes beyond just removing deprecated APIs or other compilation-breaking errors that would be immediately visible, no?
> > > > > > > > > >
> > > > > > > > > > That said, obviously it's better to have a circuit-breaker that will fail fast in case of a user misconfiguration rather than silently corrupting the consumer group state -- eg for two consumers to overlap in their ownership of the same partition(s). We could definitely implement this, and now that I think about it this might solve a related problem in KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
> > > > > > > > > > We just add a new field to the Assignment in which the group leader indicates whether it's on a recent enough version to understand cooperative rebalancing. If an upgraded member joins the group, it'll only be allowed to start following the new rebalancing protocol after receiving the go-ahead from the group leader.
> > > > > > > > > >
> > > > > > > > > > If we do go ahead and add this new field in the Assignment then I'm pretty confident we can reduce the number of required rolling bounces to just one with KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>. In that case we should be in much better shape to feel good about changing the default to the CooperativeStickyAssignor. How does that sound?
> > > > > > > > > >
> > > > > > > > > > To be clear, I'm not proposing we do this as part of KIP-726. Here's my take:
> > > > > > > > > >
> > > > > > > > > > Let's pause this KIP while I work on making these two improvements in KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>. Once I can confirm the short-circuit and single rolling bounce will be available for 3.0, I'll report back on this thread. Then we can move forward with this KIP again.
> > > > > > > > > >
> > > > > > > > > > Thoughts?
> > > > > > > > > > Sophie
> > > > > > > > > >
> > > > > > > > > > On Mon, Mar 29, 2021 at 12:01 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Ismael,
> > > > > > > > > > > Thanks for your good question. Answer them below:
> > > > > > > > > > > *1. Are we saying that every consumer upgraded would have to follow the complex path described in the KIP? *
> > > > > > > > > > > --> We suggest that every consumer did these 2 steps of rolling upgrade. And after KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477> is completed, it can be reduced to 1 rolling upgrade.
> > > > > > > > > > >
> > > > > > > > > > > *2. what happens if they don't read the instructions and upgrade as they have in the past?*
> > > > > > > > > > > --> The reason we want 2 steps of rolling upgrade is that we want to avoid the situation where leader is on old byte-code and only recognize "eager", but due to compatibility would still be able to deserialize the new protocol data from newer versioned members, and hence just go ahead and do the assignment while new versioned members did not revoke their partitions before joining the group.
> > > > > > > > > > >
> > > > > > > > > > > But I'd say, the new default assignor "CooperativeStickyAssignor" was already introduced in V2.4.0, and it should be long enough for user to upgrade to the new byte-code to recognize the "cooperative" protocol.
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Thank you.
> > > > > > > > > > > Luke
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 29, 2021 at 12:14 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP. Are we saying that every consumer upgraded would have to follow the complex path described in the KIP? Also, what happens if they don't read the instructions and upgrade as they have in the past?
> > > > > > > > > > > >
> > > > > > > > > > > > Ismael
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Mar 26, 2021, 1:53 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > <Update the subject>
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to discuss the following proposal to make the CooperativeStickyAssignor as the default assignor.
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-726%3A+Make+the+CooperativeStickyAssignor+as+the+default+assignor
> > > > > > > > > > > > >
> > > > > > > > > > > > > Any comments are welcomed.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > Luke
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
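
For anyone following the KAFKA-12477 proposal quoted above, the "Consumer procedure" it describes can be sketched roughly as below. This is only a minimal, self-contained model under stated assumptions, not the actual consumer code: the class ProtocolUpgradeSketch, its Assignor stand-in, and the methods onRebalance and commit are hypothetical names made up for illustration, partitions are plain integers, and IllegalStateException stands in for the CommitFailedException the proposal mentions. It also does not model when blocked partitions become committable again.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the per-member protocol handling described in the proposal.
final class ProtocolUpgradeSketch {
    // Declared in ascending order of preference; EnumSet iterates in this order.
    enum Protocol { EAGER, COOPERATIVE }
    enum Callback { ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    // Stand-in for an assignor: a name plus the rebalance protocols it supports.
    static final class Assignor {
        final String name;
        final Set<Protocol> supported;
        Assignor(String name, Set<Protocol> supported) {
            this.name = name;
            this.supported = supported;
        }
    }

    static final Assignor COOPERATIVE_STICKY =
        new Assignor("cooperative-sticky", EnumSet.of(Protocol.EAGER, Protocol.COOPERATIVE));
    static final Assignor RANGE = new Assignor("range", EnumSet.of(Protocol.EAGER));

    Protocol current = Protocol.EAGER;
    final Set<Integer> blockedPartitions = new HashSet<>();

    ProtocolUpgradeSketch(List<Assignor> configured) {
        // Start on the highest protocol that every configured assignor supports.
        Set<Protocol> common = EnumSet.allOf(Protocol.class);
        for (Assignor a : configured) {
            common.retainAll(a.supported);
        }
        for (Protocol p : common) {
            current = p; // the last element is the highest commonly supported protocol
        }
    }

    // Called after a rebalance with the assignor the group coordinator selected and the
    // partitions this member is giving up; returns the callback to invoke for them.
    Callback onRebalance(Assignor selected, Set<Integer> revoked) {
        if (selected.supported.contains(Protocol.COOPERATIVE)) {
            current = Protocol.COOPERATIVE; // all members support it, so upgrading is safe
            return Callback.ON_PARTITIONS_REVOKED;
        }
        if (current == Protocol.COOPERATIVE) {
            // Short-circuit: the group fell back to an EAGER-only assignor.
            blockedPartitions.addAll(revoked); // refuse commits for these partitions
            current = Protocol.EAGER;          // downgrade for the next rebalance
            return Callback.ON_PARTITIONS_LOST;
        }
        return Callback.ON_PARTITIONS_REVOKED; // plain EAGER rebalance
    }

    void commit(int partition) {
        if (blockedPartitions.contains(partition)) {
            throw new IllegalStateException("commit blocked for revoked partition " + partition);
        }
    }

    public static void main(String[] args) {
        ProtocolUpgradeSketch member =
            new ProtocolUpgradeSketch(Arrays.asList(COOPERATIVE_STICKY, RANGE));
        System.out.println("initial: " + member.current);
        System.out.println(member.onRebalance(COOPERATIVE_STICKY, new HashSet<>())
            + " -> " + member.current);
        System.out.println(member.onRebalance(RANGE, new HashSet<>(Arrays.asList(0, 1)))
            + " -> " + member.current);
    }
}
```

The main method walks one member through the three states in the proposal: joining on EAGER with the dual-assignor default, upgrading once "cooperative-sticky" is selected, and short-circuiting when an old leader forces the group back to "range".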