Whoops, small correction: I meant to say ConsumerRebalanceListener::onPartitionsLost, not Consumer::onPartitionsLost.
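In the Java client, `ConsumerRebalanceListener#onPartitionsLost` has a default implementation that simply calls `onPartitionsRevoked`. As a result, a listener that commits offsets on revocation will also try to commit when partitions are lost.

The sketch below is a hypothetical Python model of that interaction under the proposed short-circuit. It is not Kafka client code:
- `ModelConsumer`, `CommitFailedError` and the listener classes are invented names.
- The commit is rejected immediately, rather than surfacing through `poll()` after the rebalance as the proposal describes.

```python
class CommitFailedError(Exception):
    """Hypothetical stand-in for Kafka's CommitFailedException."""


class ModelConsumer:
    """Hypothetical consumer: records committed offsets and refuses commits
    for partitions fenced off by the short-circuit."""

    def __init__(self):
        self.committed = {}
        self.blocked = set()

    def commit(self, offsets):
        fenced = self.blocked.intersection(offsets)
        if fenced:
            raise CommitFailedError(f"partitions lost: {sorted(fenced)}")
        self.committed.update(offsets)


class RebalanceListener:
    """Mirrors the Java interface: the lost callback defaults to revoked."""

    def on_partitions_revoked(self, partitions):
        pass

    def on_partitions_lost(self, partitions):
        self.on_partitions_revoked(partitions)


class CommitOnRevoke(RebalanceListener):
    """Commits pending offsets when partitions are revoked."""

    def __init__(self, consumer, pending):
        self.consumer = consumer
        self.pending = pending

    def on_partitions_revoked(self, partitions):
        self.consumer.commit({p: self.pending[p] for p in partitions})


class LossAwareCommitOnRevoke(CommitOnRevoke):
    """Overrides the lost callback: another member may already own these
    partitions, so discard local progress instead of committing it."""

    def on_partitions_lost(self, partitions):
        for p in partitions:
            self.pending.pop(p, None)
```

Suppose the consumer's `blocked` set contains `"t-0"`:
- `CommitOnRevoke.on_partitions_lost(["t-0"])` falls through to the commit and raises `CommitFailedError`.
- The loss-aware subclass returns quietly without committing.

That failure is the kind of unnecessary task failure the KAFKA-12487 note is meant to avoid for Connect.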
On Mon, Apr 12, 2021 at 8:17 AM Chris Egerton <chr...@confluent.io> wrote:

Hi Sophie,

This sounds fantastic. I've made a note on KAFKA-12487 about being sure to implement Consumer::onPartitionsLost to avoid unnecessary task failures on consumer protocol downgrade, but besides that, I don't think things could get any smoother for Connect users or developers. The automatic protocol upgrade/downgrade behavior appears safe, intuitive, and pain-free.

Really excited for this development and hoping we can see it come to fruition in time for the 3.0 release!

Cheers,

Chris

On Fri, Apr 9, 2021 at 2:43 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

1) Yes, all of the above will be part of KAFKA-12477 (not KIP-726).

2) No, KAFKA-12638 would be nice to have, but I don't think it's appropriate to remove the default implementation of #onPartitionsLost in 3.0, since we haven't yet given any indication that we intend to remove it.

3) Yes, this would be similar to when a Consumer drops out of the group. It's always been possible for a member to miss a rebalance and have its partition reassigned to another member, during which time both members would claim to own said partition. But this is safe because the member who dropped out is blocked from committing offsets on that partition.

On Fri, Apr 9, 2021 at 2:46 AM Luke Chen <show...@gmail.com> wrote:

Hi Sophie,
That sounds great; it takes care of every case I can think of.
Questions:
1. Do you mean the short-circuit will also be implemented in KAFKA-12477?
2. I don't think KAFKA-12638 is a blocker for this KIP-726, am I right?
3. So does that mean it's still possible for multiple consumers to own the same topic partition? And in this situation, we prevent them from committing and wait for the next rebalance (which should come soon). Is my understanding correct?
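Sophie's answer 3 above rests on generation fencing. A consumer's offset commit carries the generation it joined with, and the coordinator rejects commits from an older generation; the Java client surfaces this as a `CommitFailedException`.

Below is a heavily simplified, hypothetical Python model of that check. All names are invented, and the real coordinator also validates member IDs and other state.

```python
class IllegalGenerationError(Exception):
    """Hypothetical stand-in for the error a stale member's commit receives."""


class ModelCoordinator:
    """Hypothetical group coordinator: every completed rebalance bumps the
    generation, and a commit is accepted only from the current generation."""

    def __init__(self):
        self.generation = 0
        self.offsets = {}

    def rebalance(self):
        # Members that complete this rebalance receive the new generation.
        self.generation += 1
        return self.generation

    def commit(self, generation, partition, offset):
        if generation != self.generation:
            raise IllegalGenerationError(
                f"generation {generation} is stale (current {self.generation})"
            )
        self.offsets[partition] = offset
```

For example, member A joins in generation 1 and then stalls, and a rebalance to generation 2 hands `t-0` to member B. Both briefly believe they own `t-0`, but only B's commit is accepted. A stays fenced until it rejoins and picks up the current generation.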
Thank you very much for finding this great solution.

Luke

On Fri, Apr 9, 2021 at 11:37 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

Alright, here's the detailed proposal for KAFKA-12477. This assumes we will change the default assignor to ["cooperative-sticky", "range"] in KIP-726. It also acknowledges that users may attempt any kind of upgrade without reading the docs, so we need to put in safeguards against data corruption rather than assume everyone will follow the safe upgrade path.

With this proposal,
1) New applications on 3.0 will enable cooperative rebalancing by default.
2) Existing applications that don't set an assignor can safely upgrade to 3.0 using a single rolling bounce with no extra steps, and will automatically transition to cooperative rebalancing.
3) Existing applications that do set an assignor that uses EAGER can likewise upgrade their applications to COOPERATIVE with a single rolling bounce.
4) Once on 3.0, applications can safely go back and forth between EAGER and COOPERATIVE.
5) Applications can safely downgrade away from 3.0.

The high-level idea for dynamic protocol upgrades is that the group will leverage the assignor selected by the group coordinator to determine when it's safe to upgrade to COOPERATIVE. It will also trigger a fail-safe to protect the group in case of rare events or user misconfiguration. The group coordinator selects the most preferred assignor that's supported by all members of the group, so we know that all members will support COOPERATIVE once we receive the "cooperative-sticky" assignor after a rebalance. At this point, each member can upgrade its own protocol to COOPERATIVE.
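The two selection steps this relies on can be sketched as follows. The sketch rests on these assumptions:
- Each consumer starts on the highest protocol that all of its configured assignors support.
- The coordinator keeps only assignors every member lists, lets each member vote for its most preferred remaining one, and picks the most-voted. This assumes the broker implements "most preferred assignor supported by all members" as such a vote.
- Ties go to the first assignor voted for; this tie-breaking is also an assumption.
- The supported-protocol table reflects the built-in Java assignors as understood here (`RangeAssignor` is EAGER-only, `CooperativeStickyAssignor` supports both).

All function names are hypothetical.

```python
from collections import Counter

# Assumed protocol support for the built-in assignors (short names).
SUPPORTED = {
    "range": {"EAGER"},
    "cooperative-sticky": {"EAGER", "COOPERATIVE"},
}
# Later entries are newer protocols.
PROTOCOL_ORDER = ["EAGER", "COOPERATIVE"]


def initial_protocol(assignors):
    """Highest protocol that every configured assignor supports."""
    common = set.intersection(*(SUPPORTED[a] for a in assignors))
    return max(common, key=PROTOCOL_ORDER.index)


def select_assignor(member_prefs):
    """Assignor the coordinator picks, given each member's preference list."""
    # Only assignors listed by every member are eligible.
    candidates = set.intersection(*(set(prefs) for prefs in member_prefs))
    if not candidates:
        raise ValueError("no assignor is supported by all members")
    # Each member votes for its most preferred eligible assignor.
    votes = Counter(next(a for a in prefs if a in candidates) for prefs in member_prefs)
    # Most votes wins; ties go to the first assignor voted for (assumption).
    return votes.most_common(1)[0][0]


def may_upgrade_to_cooperative(selected):
    """Safe to switch once the selected assignor supports COOPERATIVE."""
    return "COOPERATIVE" in SUPPORTED[selected]
```

With the default ["cooperative-sticky", "range"], `initial_protocol` yields EAGER. While any member still lists only "range", the coordinator must pick "range". Seeing "cooperative-sticky" selected therefore implies every member understands COOPERATIVE.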
However, there may be situations in which an EAGER member joins the group even after the group has upgraded to COOPERATIVE. For example, during a rolling upgrade, if the last remaining member on the old bytecode misses a rebalance, the other members will be allowed to upgrade to COOPERATIVE. Suppose the old member then rejoins and is chosen as group leader before it's upgraded to 3.0. When computing the assignment, it won't be aware that the other members of the group have not yet revoked their partitions.

Short Circuit:
The risk of mixing the cooperative and eager rebalancing protocols is that a partition may be assigned to one member while it has yet to be revoked from its previous owner. The danger is that the new owner may begin processing and committing offsets for this partition while the previous owner is also committing offsets in its #onPartitionsRevoked callback, which is invoked at the end of the rebalance in the cooperative protocol. This can result in these consumers overwriting each other's offsets and getting a corrupted view of the partition.

Note that it's not possible to commit during a rebalance. So we can protect against offset corruption by blocking further commits after we detect that the group leader may not understand COOPERATIVE, but before we invoke #onPartitionsRevoked. This is the "short-circuit": if we detect that the group is in an unsafe state, we invoke #onPartitionsLost instead of #onPartitionsRevoked and explicitly prevent offsets from being committed on those revoked partitions.

Consumer procedure:
Upon startup, the consumer will initially select the highest commonly supported protocol across its configured assignors.
With ["cooperative-sticky", "range"], the initial protocol will be EAGER when the member first joins the group. Following a rebalance, each member will check the selected assignor:
- If the chosen assignor supports COOPERATIVE, the member can upgrade its protocol to COOPERATIVE and no further action is required.
- If the member is already on COOPERATIVE but the selected assignor does NOT support it, then we need to trigger the short-circuit. In this case we will invoke #onPartitionsLost instead of #onPartitionsRevoked, and set a flag to block any attempts at committing the partitions that have been revoked.

A commit may still be attempted, for example if the user does not implement #onPartitionsLost (see KAFKA-12638). In that case we will throw a CommitFailedException, which will be bubbled up through poll() after completing the rebalance. The member will then downgrade its protocol to EAGER for the next rebalance.

Let me know what you think,
Sophie

On Fri, Apr 2, 2021 at 7:08 PM Luke Chen <show...@gmail.com> wrote:

Hi Sophie,
Making the default "cooperative-sticky, range" is a smart idea, to ensure we can at least fall back to the RangeAssignor if consumers are not following our recommended upgrade path. I updated the KIP accordingly.

Hi Chris,
No problem, I updated the KIP to include the change in Connect.

Thank you very much.

Luke

On Thu, Apr 1, 2021 at 3:24 AM Chris Egerton <chr...@confluent.io.invalid> wrote:

Hi all,

@Sophie - I like the sound of the dual-protocol default. The smooth upgrade path it permits sounds fantastic!
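The per-rebalance consumer procedure from Sophie's April 9 proposal can be modeled as a small state machine. This is a hypothetical Python sketch with these simplifications:
- `ModelMember`, `COOPERATIVE_ASSIGNORS` and `CommitFailedError` are invented names.
- The EAGER path is simplified; a real EAGER member revokes everything before rejoining.
- The commit check fires immediately instead of after the rebalance completes.

```python
class CommitFailedError(Exception):
    """Hypothetical stand-in for Kafka's CommitFailedException."""


# Assumption: "cooperative-sticky" is the only COOPERATIVE-capable assignor.
COOPERATIVE_ASSIGNORS = {"cooperative-sticky"}


class ModelMember:
    """Hypothetical group member tracking its protocol and fenced partitions."""

    def __init__(self):
        # With ["cooperative-sticky", "range"] configured, start on EAGER.
        self.protocol = "EAGER"
        self.owned = set()
        self.commit_blocked = set()
        self.events = []  # (callback name, partitions) in invocation order

    def on_rebalance_complete(self, selected_assignor, assigned):
        revoked = self.owned - set(assigned)
        if selected_assignor in COOPERATIVE_ASSIGNORS:
            # The coordinator only picks an assignor every member supports,
            # so all members understand COOPERATIVE: upgrade or stay upgraded.
            self.protocol = "COOPERATIVE"
            if revoked:
                self.events.append(("revoked", sorted(revoked)))
        elif self.protocol == "COOPERATIVE":
            # Short-circuit: the leader may not understand COOPERATIVE, so
            # another member may already own `revoked`. Report them as lost,
            # fence their commits, and downgrade for the next rebalance.
            if revoked:
                self.events.append(("lost", sorted(revoked)))
            self.commit_blocked |= revoked
            self.protocol = "EAGER"
        elif revoked:
            # Ordinary EAGER rebalance (simplified: a real EAGER member
            # revokes before rejoining rather than after the assignment).
            self.events.append(("revoked", sorted(revoked)))
        self.owned = set(assigned)

    def commit(self, partitions):
        fenced = self.commit_blocked.intersection(partitions)
        if fenced:
            raise CommitFailedError(f"cannot commit lost partitions {sorted(fenced)}")
```

Consider a member that was upgraded by a "cooperative-sticky" rebalance and then sees "range" selected. It reports the revoked partitions as lost, refuses commits on them, and drops back to EAGER. Partitions it still owns remain committable.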
@Luke - Do you think we can also include Connect in this KIP? Right now we don't set any custom partition assignment strategies for the consumer groups we bring up for sink tasks. If we continue to just use the default, the assignment strategy for those consumer groups would change on Connect clusters once people upgrade to 3.0.

I think this is fine, assuming we can take care of https://issues.apache.org/jira/browse/KAFKA-12487 before then, which I'm fairly optimistic about. But it might be worth a sentence or two in the KIP explaining that the change in default will intentionally propagate to Connect.

And, if we think Connect should be left out of this change and stay on the range assignor instead, we should probably call that fact out in the KIP as well. The KIP would then state that Connect will now override the default partition assignment strategy to be the range assignor. That assumes the user hasn't specified a value for consumer.partition.assignment.strategy in their worker config or for consumer.override.partition.assignment.strategy in their connector config.

Cheers,

Chris

On Wed, Mar 31, 2021 at 12:18 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

Ok, I'm still fleshing out all the details of KAFKA-12477, but I think we can simplify some things a bit, and avoid any kind of "fail-fast" which will require user intervention.
In fact, I think we can avoid requiring the user to make any changes at all for KIP-726, so we don't have to worry about whether they actually read our documentation:

Instead of making ["cooperative-sticky"] the default, we change the default to ["cooperative-sticky", "range"]. Since "range" is the old default, this is equivalent to the first rolling bounce of the safe upgrade path in KIP-429.

Of course, this also means that under the current protocol selection mechanism we won't actually upgrade to cooperative rebalancing with the default assignor. But that's where KAFKA-12477 will come in.

@Guozhang Wang <guozh...@confluent.io> I'll get back to you with a concrete proposal and answer your questions. I just want to point out that it's possible to side-step the risk of users shooting themselves in the foot (well, at least in this one specific case; obviously they always find a way).

On Tue, Mar 30, 2021 at 10:37 AM Guozhang Wang <wangg...@gmail.com> wrote:

Hi Sophie,

My question is more related to KAFKA-12477, but since your latest replies are on this thread I figured we can follow up in the same venue. Just so I understand your latest comments above about the approach:

* I think we would need to persist this decision so that the group would never go back to the eager protocol; this bit would be written to the internal topic's assignment message. Is that correct?
* Maybe you can describe the steps after the group has decided to move forward with cooperative protocols, when:
1) a new member joins the group with the old version, and hence only recognizes the eager protocol and executes it in its first rebalance: what would happen?
2) in addition to 1), the new member joins the group with the old version, only recognizes the old subscription format, and is selected as the leader: what would happen?

Guozhang

On Mon, Mar 29, 2021 at 10:30 PM Luke Chen <show...@gmail.com> wrote:

Hi Sophie & Ismael,
Thank you for your feedback.
No problem, let's pause this KIP and wait for this improvement: KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.

Stay tuned :)

Thank you.
Luke

On Tue, Mar 30, 2021 at 3:14 AM Ismael Juma <ism...@juma.me.uk> wrote:

Hi Sophie,

I didn't analyze the KIP in detail, but the two suggestions you mentioned sound like great improvements.

A bit more context: breaking changes for a widely used product like Kafka are costly, which is why we try as hard as we can to avoid them.
When it comes to the brokers, they are often managed by a central group (or they're in the Cloud), so they're a bit easier to manage. Even so, it's still possible to upgrade from 0.8.x directly to 2.7, since all protocol versions are still supported. When it comes to the basic clients (producer, consumer, admin client), they're often embedded in applications, so we have to be even more conservative.

Ismael

On Mon, Mar 29, 2021 at 10:50 AM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

Ismael,

It seems like, given 3.0 is a breaking release, we have to rely on users being aware of this and responsible enough to read the upgrade guide. Otherwise we could never make any breaking changes beyond just removing deprecated APIs or other compilation-breaking errors that would be immediately visible, no?

That said, it's obviously better to have a circuit-breaker that will fail fast in case of a user misconfiguration, rather than silently corrupting the consumer group state, e.g. by letting two consumers overlap in their ownership of the same partition(s).
We could definitely implement this, and now that I think about it, this might solve a related problem in KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>. We just add a new field to the Assignment in which the group leader indicates whether it's on a recent enough version to understand cooperative rebalancing. If an upgraded member joins the group, it'll only be allowed to start following the new rebalancing protocol after receiving the go-ahead from the group leader.

If we do go ahead and add this new field in the Assignment, then I'm pretty confident we can reduce the number of required rolling bounces to just one with KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>. In that case we should be in much better shape to feel good about changing the default to the CooperativeStickyAssignor. How does that sound?

To be clear, I'm not proposing we do this as part of KIP-726. Here's my take:

Let's pause this KIP while I work on making these two improvements in KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477>.
Once I can confirm the short-circuit and single rolling bounce will be available for 3.0, I'll report back on this thread. Then we can move forward with this KIP again.

Thoughts?
Sophie

On Mon, Mar 29, 2021 at 12:01 AM Luke Chen <show...@gmail.com> wrote:

Hi Ismael,
Thanks for your good questions. Answers below:

*1. Are we saying that every consumer upgraded would have to follow the complex path described in the KIP?*
--> We suggest that every consumer go through these 2 steps of rolling upgrade. After KAFKA-12477 <https://issues.apache.org/jira/browse/KAFKA-12477> is completed, this can be reduced to 1 rolling upgrade.

*2. What happens if they don't read the instructions and upgrade as they have in the past?*
--> The 2-step rolling upgrade avoids a specific failure:
- The leader is on the old bytecode and only recognizes "eager".
- Due to compatibility, it can still deserialize the new protocol data from newer-versioned members.
- It therefore goes ahead and does the assignment, even though the newer-versioned members did not revoke their partitions before joining the group.

But I'd say the new default assignor, "CooperativeStickyAssignor", was already introduced in v2.4.0. That should be long enough for users to upgrade to bytecode that recognizes the "cooperative" protocol.

What do you think?

Thank you.
Luke

On Mon, Mar 29, 2021 at 12:14 PM Ismael Juma <ism...@juma.me.uk> wrote:

Thanks for the KIP. Are we saying that every consumer upgraded would have to follow the complex path described in the KIP?
Also, what happens if they don't read the instructions and upgrade as they have in the past?

Ismael

On Fri, Mar 26, 2021, 1:53 AM Luke Chen <show...@gmail.com> wrote:

Hi everyone,
<Update the subject>

I'd like to discuss the following proposal to make the CooperativeStickyAssignor the default assignor.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-726%3A+Make+the+CooperativeStickyAssignor+as+the+default+assignor

Any comments are welcome.

Thank you.
Luke
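Connect sink-task consumers inherit the consumer default unless configured otherwise. The precedence Chris describes in his April 1 message can therefore be sketched as below. This is a hypothetical Python helper, not Connect code, and it makes these assumptions:
- A connector-level `consumer.override.` setting beats a worker-level `consumer.` setting, which in turn beats the default.
- It ignores the worker's client config override policy, which in a real cluster must permit the connector-level override.
- Assignors are written by their short names, as in this thread; the real config takes class names.

```python
KEY = "partition.assignment.strategy"
# Default proposed in this thread for KIP-726 (assignor short names).
PROPOSED_DEFAULT = ("cooperative-sticky", "range")


def effective_strategy(worker_cfg, connector_cfg, connect_default=PROPOSED_DEFAULT):
    """Return the assignor list a sink task's consumer would use.

    connect_default models what Connect falls back to: the consumer default,
    or ("range",) if Connect were to pin the old default instead.
    """
    override = connector_cfg.get("consumer.override." + KEY)
    if override is not None:  # connector-level override wins
        return list(override)
    worker = worker_cfg.get("consumer." + KEY)
    if worker is not None:  # then the worker-level consumer setting
        return list(worker)
    return list(connect_default)  # otherwise whatever Connect falls back to
```

Passing `connect_default=("range",)` models the alternative Chris raises, where Connect pins the range assignor. Either way, explicit worker or connector settings still take precedence.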