Hi Matthias,

Just wondering: once this KIP goes through, could I restart my older KIP to update SubscriptionInfo?
Thanks,
Richard

On Wed, Mar 21, 2018 at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Thanks for following up, James.
>
> > Is this the procedure that happens during every rebalance? The reason I ask is that this step:
> >>>> As long as the leader (before or after upgrade) receives at least one old version X Subscription it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
>
> Yes, that would be the consequence.
>
> > This implies that the leader receives all Subscriptions before sending back any responses. Is that what actually happens? Is it possible that it would receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then later receive a Subscription X? What happens in that case? Would that Subscription X then trigger another rebalance, and the whole thing starts again?
>
> That sounds correct. A 'delayed' Subscription could always happen -- even before KIP-268 -- and would trigger a new rebalance. In this regard, the behavior does not change. The difference is that we would automatically downgrade the Assignment from Y to X again -- but the application would not fail (as it would before the KIP).
>
> Do you see an issue with this behavior? The idea of the design is to make Kafka Streams robust against those scenarios. Thus, if 4 apps are upgraded but no. 5 is not yet and no. 5 is late, Kafka Streams would first upgrade from X to Y and then downgrade from Y to X in the second rebalance when no. 5 joins the group. If no. 5 gets upgraded, a third rebalance would upgrade to Y again.
>
> Thus, as long as not all instances are on the newest version, upgrades/downgrades of the exchanged rebalance metadata could happen multiple times. However, this should not be an issue from my understanding.
>
> Let us know what you think about it.
>
> -Matthias
>
> On 3/20/18 11:10 PM, James Cheng wrote:
> > Sorry, I see that the VOTE started already, but I have a late question on this KIP.
> >
> > In the "version probing" protocol:
> >> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
> >> On startup/rolling-bounce, an instance does not know what version the leader understands and (optimistically) sends a Subscription with the latest version Y.
> >> The (old, i.e., not yet upgraded) leader sends an empty Assignment back to the corresponding instance that sent the newer Subscription it does not understand. The Assignment metadata only encodes both version numbers (used-version == supported-version) as the leader's supported-version X.
> >> For all other instances the leader sends a regular Assignment in version X back.
> >> If an upgraded follower sends a new version Y Subscription and receives a version X Assignment with "supported-version = X", it can downgrade to X (in-memory flag) and re-send a new Subscription with old version X to retry joining the group. To force an immediate second rebalance, the follower does an "unsubscribe()/subscribe()/poll()" sequence.
> >> As long as the leader (before or after upgrade) receives at least one old version X Subscription it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
> >> If an upgraded instance receives an Assignment it always checks the leader's supported-version and updates its downgraded "used-version" if possible.
> >
> > Is this the procedure that happens during every rebalance? The reason I ask is that this step:
> >>> As long as the leader (before or after upgrade) receives at least one old version X Subscription it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
> >
> > This implies that the leader receives all Subscriptions before sending back any responses. Is that what actually happens? Is it possible that it would receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then later receive a Subscription X? What happens in that case? Would that Subscription X then trigger another rebalance, and the whole thing starts again?
> >
> > Thanks,
> > -James
> >
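For readers following the quoted protocol, a minimal Java sketch of the follower-side downgrade step may help; class, field, and method names are hypothetical and do not reflect the actual Streams internals:

    // Illustrative sketch only -- not the actual StreamsPartitionAssignor code.
    // It mirrors the follower-side step above: if the leader's Assignment reports
    // a lower supported version than the Subscription we sent, downgrade the
    // in-memory "used version" and trigger an immediate second rebalance.
    public class VersionProbingSketch {

        static final int LATEST_VERSION = 3;       // the version Y sent optimistically
        private int usedVersion = LATEST_VERSION;  // in-memory flag, reset on restart

        void onAssignment(int leaderSupportedVersion) {
            if (leaderSupportedVersion < usedVersion) {
                // Leader cannot decode our newer Subscription yet: downgrade and
                // re-join with the older metadata version (the KIP's
                // unsubscribe()/subscribe()/poll() sequence).
                usedVersion = leaderSupportedVersion;
                triggerSecondRebalance();
            } else if (leaderSupportedVersion > usedVersion) {
                // Leader already understands a newer version: upgrade again on
                // the next rebalance.
                usedVersion = leaderSupportedVersion;
            }
            // equal versions: nothing to do, process the Assignment normally
        }

        void triggerSecondRebalance() {
            // placeholder for the unsubscribe()/subscribe()/poll() sequence
        }
    }

The key point is that the downgrade is only an in-memory flag, so a later rebalance can move back to the newer version once the leader advertises it.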
> >> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>
> >> Guozhang,
> >>
> >> thanks for your comments.
> >>
> >> 2: I think my main concern is that 1.2 would be a "special" release that everybody needs to use to upgrade. As an alternative, we could say that we add the config in 1.2 and keep it for 2 additional releases (1.3 and 1.4) but remove it in 1.5. This gives users more flexibility and does not force users to upgrade to a specific version, but also allows us to not carry the tech debt forever. WDYT about this? If users upgrade on a regular basis, this approach would avoid a forced upgrade with high probability, as they will upgrade to either 1.2/1.3/1.4 anyway at some point. Thus, only if users don't upgrade for a very long time are they forced to do 2 upgrades with an intermediate version.
> >>
> >> 4. Updated the KIP to remove the ".x" suffix.
> >>
> >> 5. Updated the KIP accordingly.
> >>
> >> -Matthias
> >>
> >> On 3/19/18 10:33 AM, Guozhang Wang wrote:
> >>> Yup :)
> >>>
> >>> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>
> >>>> bq. some snippet like ProduceRequest / ProduceRequest
> >>>>
> >>>> Did you mean ProduceRequest / Response?
> >>>>
> >>>> Cheers
> >>>>
> >>>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> About 2: yeah I guess this is a subjective preference. My main concern about keeping the config / handling code beyond the 1.2 release is that it will become a non-cleanable tech debt forever, as fewer and fewer users would need to upgrade from 0.10.x and 1.1.x, and eventually we will need to maintain this for nearly no one. On the other hand, I agree that this tech debt is not too large. So if more people feel this is a good tradeoff to pay for not forcing users on older versions to upgrade twice, I'm happy to change my opinion.
> >>>>>
> >>>>> A few more minor comments:
> >>>>>
> >>>>> 4. For the values of "upgrade.from", could we simplify to only major.minor? I.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility behavior in bug-fix releases we would never need to specify a bug-fix version to distinguish them.
> >>>>>
> >>>>> 5. Could you also present the encoding format of the subscription / assignment metadata bytes in version 2, and in future versions (i.e. which first bytes would be kept moving forward), for readers to better understand the proposal? Some snippet like ProduceRequest / ProduceRequest in https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging would be very helpful.
> >>>>>
> >>>>> Guozhang
> >>>>>
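On point 5, the property in question is that the leading metadata bytes stay decodable across versions. A small sketch of how a reader could rely on that; the concrete field layout here is an assumption for illustration, not the KIP's normative wire format:

    import java.nio.ByteBuffer;

    // Sketch of the "keep the leading bytes stable" idea; the exact fields after
    // the two leading ints are illustrative placeholders only.
    public class MetadataVersionSketch {

        public static void main(String[] args) {
            // Writer side: the used version always comes first; from version 3 on,
            // the latest supported version follows as the second field, and the
            // version-specific payload comes after that.
            ByteBuffer buf = ByteBuffer.allocate(64);
            buf.putInt(3);  // used version
            buf.putInt(3);  // latest supported version (new field from v3 on)
            // ... version-specific fields (client UUID, prev/standby tasks, endpoint, ...)
            buf.flip();

            // Reader side: because the first int never moves, any reader can tell
            // which version it received before deciding how to proceed.
            int usedVersion = buf.getInt();
            int supportedVersion = usedVersion >= 3 ? buf.getInt() : usedVersion;
            System.out.println("used=" + usedVersion + ", supported=" + supportedVersion);
        }
    }

From version 3 onwards a receiver can at least tell which version it got and reply with its own supported version instead of failing.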
> >>>>> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> Thanks for your comments.
> >>>>>>
> >>>>>> 1. Because the old leader cannot decode the new Subscription it can only send an empty assignment back. The idea to send empty assignments to all members is interesting. I will try this out in a PR to see how it behaves.
> >>>>>>
> >>>>>> 2. I don't see an issue with keeping the config `upgrade.from` for future releases. Personally, I would prefer to not force users to do two upgrades if they want to go from a pre-1.2 to a post-1.2 version. Is there a technical argument why you want to get rid of the config? What disadvantages do you see in keeping `upgrade.from` beyond the 1.2 release?
> >>>>>>
> >>>>>> Keeping the config is just a few lines of code in `StreamsConfig` as well as a single `if` statement in `StreamsPartitionAssignor` to force a downgrade (cf https://github.com/apache/kafka/pull/4636/files#diff-392371c29384e33bb09ed342e7696c68R201).
> >>>>>>
> >>>>>> 3. I updated the KIP accordingly.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
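For illustration, the kind of `upgrade.from`-driven downgrade check referred to above could look roughly like this; names and the exact version mapping are assumptions, not the code from the linked PR:

    // Rough sketch of an upgrade.from check; illustrative only.
    public class UpgradeFromSketch {

        static final int LATEST_SUPPORTED_VERSION = 3;

        // Pick the Subscription metadata version to send, honoring upgrade.from.
        static int subscriptionVersion(String upgradeFrom) {
            if (upgradeFrom == null) {
                return LATEST_SUPPORTED_VERSION;  // normal case: newest version
            }
            // During the first rolling bounce, upgrade.from names the old release,
            // forcing the old metadata version so that a not-yet-bounced leader
            // can still decode the Subscription.
            if (upgradeFrom.equals("0.10.0")) {
                return 1;  // 0.10.0 used metadata version 1
            }
            return 2;      // 0.10.1 through 1.1 used metadata version 2
        }

        public static void main(String[] args) {
            System.out.println(subscriptionVersion(null));      // 3
            System.out.println(subscriptionVersion("0.10.0"));  // 1
            System.out.println(subscriptionVersion("1.1"));     // 2
        }
    }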
> >>>>>> On 3/15/18 3:19 PM, Guozhang Wang wrote:
> >>>>>>> Hello Matthias, thanks for the KIP. Here are some comments:
> >>>>>>>
> >>>>>>> 1. "For all other instances the leader sends a regular Assignment in version X back." Does that mean the leader will exclude any member of the group whose protocol version it does not understand? For example, if we have A, B, C with A the leader, and B bounced with the newer version: in the first rebalance, A will only consider {A, C} for assignment while sending an empty assignment to B. And then later when B downgrades, will it re-assign the tasks to it again? I feel this unnecessarily increases the number of rebalances and the total latency. Could the leader just send an empty assignment to everyone? Since upon receiving the empty assignment each thread will not create / restore any tasks, will not clean up its local state (so that the prevCachedTasks are not lost in future rebalances), and re-joins immediately, the total time of rolling upgrades would be reduced if users choose to bounce an instance once it is in RUNNING state.
> >>>>>>>
> >>>>>>> 2. If we want to allow upgrading from 1.1- versions to any of the future versions beyond 1.2, then we'd always need to keep the special handling logic for this two-rolling-bounce mechanism plus a config that we would never be able to deprecate; on the other hand, if the version-probing procedure is fast, I think the extra operational cost of upgrading from 1.1- to 1.2 and then from 1.2 to a future version, compared to upgrading from 1.1- to a future version directly, could be small. So depending on the experimental results of the upgrade latency, I'd suggest considering the trade-off against the extra code/config that needs to be maintained for the special handling.
> >>>>>>>
> >>>>>>> 3. Testing plan: could you elaborate a bit more on the actual upgrade paths we should test? For example, I'm thinking of the following:
> >>>>>>>
> >>>>>>> a. 0.10.0 -> 1.2
> >>>>>>> b. 1.1 -> 1.2
> >>>>>>> c. 1.2 -> 1.3 (simulated v4)
> >>>>>>> d. 0.10.0 -> 1.3 (simulated v4)
> >>>>>>> e. 1.1 -> 1.3 (simulated v4)
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I want to propose KIP-268 to allow rebalance metadata version upgrades in Kafka Streams:
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>>>>>>>
> >>>>>>>> Looking forward to your feedback.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang