> On Mar 21, 2018, at 11:45 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Yes, it only affects the metadata. KIP-268 targets metadata upgrade without store upgrade.
>
> We can discuss store upgrade further in KIP-258: I think in general, the upgrade/downgrade behavior might be an issue for upgrading stores. However, this upgrade/downgrade can only happen when upgrading from 1.2 to a future version. Thus, it won't affect an upgrade to 1.2.
>
> For an upgrade to 1.2, we introduce the "upgrade.from" parameter (because we don't have "version probing" for 1.1 yet) and this ensures that upgrading cannot happen "too early", and no downgrade can happen either for this case.
>
> Let me know what you think.
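To make the "upgrade.from" mechanics above concrete, a two-bounce rolling upgrade to 1.2 might be configured roughly as follows. This is a sketch only: the literal key "upgrade.from" and the value "0.10.0" follow the discussion further down in this thread, but the final constant names and accepted values are whatever the KIP ends up defining.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromExample {
    public static Properties firstBounceConfig() {
        // First rolling bounce: deploy the new (1.2) jars, but keep sending
        // old-versioned rebalance metadata because pre-1.2 members may still
        // be in the group.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");         // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // hypothetical
        props.put("upgrade.from", "0.10.0"); // value format per the "major.minor" discussion below
        return props;
    }

    public static Properties secondBounceConfig() {
        // Second rolling bounce: drop "upgrade.from" so all (now upgraded)
        // members switch to the latest metadata version.
        Properties props = firstBounceConfig();
        props.remove("upgrade.from");
        return props;
    }
}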
I think yes, we can discuss upgrade/downgrade issues (to versions after 1.2) in the other KIP (KIP-258). However, this KIP-268 looks fine. It gives us the mechanism to properly detect and automatically upgrade/downgrade the topology, and allows the new/old code to co-exist within a topology, which is something we didn't have before. KIP-268 looks good to me. Thanks for all the answers to my questions.

-James

>
> -Matthias
>
> On 3/21/18 11:16 PM, James Cheng wrote:
>>
>>> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> Thanks for following up, James.
>>>
>>>> Is this the procedure that happens during every rebalance? The reason I ask is that this step:
>>>>> As long as the leader (before or after upgrade) receives at least one old version X Subscription, it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
>>>
>>> Yes, that would be the consequence.
>>>
>>>> This implies that the leader receives all Subscriptions before sending back any responses. Is that what actually happens? Is it possible that it would receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then later receive a Subscription X? What happens in that case? Would that Subscription X then trigger another rebalance, and the whole thing starts again?
>>>
>>> That sounds correct. A 'delayed' Subscription could always happen -- even before KIP-268 -- and would trigger a new rebalance. In this regard, the behavior does not change. The difference is that we would automatically downgrade the Assignment from Y to X again -- but the application would not fail (as it would before the KIP).
>>>
>>> Do you see an issue with this behavior? The idea of the design is to make Kafka Streams robust against those scenarios. Thus, if 4 apps are upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first upgrade from X to Y and downgrade from Y to X in the second rebalance when no.5 joins the group. If no.5 gets upgraded, a third rebalance would upgrade to Y again.
>>
>> Sounds good.
>>
>>> Thus, as long as not all instances are on the newest version, upgrades/downgrades of the exchanged rebalance metadata could happen multiple times. However, this should not be an issue from my understanding.
>>
>> About “this should not be an issue”: this upgrade/downgrade is just about the rebalance metadata, right? Are there other associated things that will also have to upgrade/downgrade in sync with the rebalance metadata? For example, the idea for this KIP originally came up during the discussion about adding timestamps to RocksDB state stores, which required updating the on-disk schema. In the case of an updated RocksDB state store but with downgraded rebalance metadata... that should work, right? Because we still have updated code (which understands the on-disk format), but it simply gets its partition assignments via the downgraded rebalance metadata?
>>
>> Thanks,
>> -James
>>
>> Sent from my iPhone
>>
>>> Let us know what you think about it.
>>>
>>> -Matthias
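The behavior described above -- the leader keeps answering with version X as long as at least one member still subscribes with X, while advertising its own supported version -- amounts to taking the minimum used version across all received Subscriptions. A minimal sketch of that rule, with hypothetical types rather than the actual StreamsPartitionAssignor code:

import java.util.List;

// Hypothetical sketch; not the actual Kafka Streams assignor.
class VersionResolver {
    final int leaderSupportedVersion; // what this (possibly upgraded) leader can handle

    VersionResolver(int leaderSupportedVersion) {
        this.leaderSupportedVersion = leaderSupportedVersion;
    }

    // As long as at least one member still subscribes with an older version X,
    // answer everyone with X. The Assignment additionally carries the leader's
    // supported version, so upgraded members know when a later rebalance can
    // move the group back up to Y.
    int resolveAssignmentVersion(List<Integer> usedVersions) {
        int version = leaderSupportedVersion;
        for (int used : usedVersions) {
            version = Math.min(version, used);
        }
        return version;
    }
}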
>>>> On 3/20/18 11:10 PM, James Cheng wrote:
>>>> Sorry, I see that the VOTE started already, but I have a late question on this KIP.
>>>>
>>>> In the "version probing" protocol:
>>>>> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
>>>>> On startup/rolling-bounce, an instance does not know what version the leader understands and (optimistically) sends a Subscription with the latest version Y.
>>>>> (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the corresponding instance that sent the newer Subscription it does not understand. The Assignment metadata only encodes both version numbers (used-version == supported-version) as leader's supported-version X. For all other instances the leader sends a regular Assignment in version X back.
>>>>> If an upgraded follower sends a new version Y Subscription and receives a version X Assignment with "supported-version = X", it can downgrade to X (in-memory flag) and resends a new Subscription with old version X to retry joining the group. To force an immediate second rebalance, the follower does an "unsubscribe()/subscribe()/poll()" sequence.
>>>>> As long as the leader (before or after upgrade) receives at least one old version X Subscription, it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
>>>>> If an upgraded instance receives an Assignment, it always checks the leader's supported-version and updates its downgraded "used-version" if possible.
>>>>
>>>> Is this the procedure that happens during every rebalance? The reason I ask is that this step:
>>>>>> As long as the leader (before or after upgrade) receives at least one old version X Subscription, it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
>>>>
>>>> This implies that the leader receives all Subscriptions before sending back any responses. Is that what actually happens? Is it possible that it would receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then later receive a Subscription X? What happens in that case? Would that Subscription X then trigger another rebalance, and the whole thing starts again?
>>>>
>>>> Thanks,
>>>> -James
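The "unsubscribe()/subscribe()/poll()" sequence from the protocol above might look roughly like this on an upgraded follower. This is a sketch against the plain consumer API; the field and method names are hypothetical, since the real logic lives inside Kafka Streams:

import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only: in reality this lives inside Kafka Streams; the field and
// method names here are hypothetical.
class FollowerRejoin {
    volatile int usedMetadataVersion; // version encoded into the next Subscription

    void downgradeAndRejoin(KafkaConsumer<byte[], byte[]> consumer,
                            List<String> topics,
                            int leaderSupportedVersion) {
        // Remember the downgraded version (the "in-memory flag" from the protocol)...
        usedMetadataVersion = leaderSupportedVersion;
        // ...and force an immediate second rebalance to retry joining the group.
        consumer.unsubscribe();
        consumer.subscribe(topics);
        consumer.poll(0); // sends the JoinGroup with the downgraded metadata
    }
}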
>>>>> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>> Guozhang,
>>>>>
>>>>> thanks for your comments.
>>>>>
>>>>> 2: I think my main concern is that 1.2 would be a "special" release that everybody needs to use to upgrade. As an alternative, we could say that we add the config in 1.2 and keep it for 2 additional releases (1.3 and 1.4) but remove it in 1.5. This gives users more flexibility and does not force users to upgrade to a specific version, but also allows us to not carry the tech debt forever. WDYT about this? If users upgrade on a regular basis, this approach could avoid a forced upgrade with high probability, as they will upgrade to either 1.2/1.3/1.4 anyway at some point. Thus, only if users don't upgrade for a very long time are they forced to do 2 upgrades with an intermediate version.
>>>>>
>>>>> 4. Updated the KIP to remove the ".x" suffix.
>>>>>
>>>>> 5. Updated the KIP accordingly.
>>>>>
>>>>> -Matthias
>>>>>
>>>>>> On 3/19/18 10:33 AM, Guozhang Wang wrote:
>>>>>> Yup :)
>>>>>>
>>>>>>> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>> bq. some snippet like ProduceRequest / ProduceRequest
>>>>>>>
>>>>>>> Did you mean ProduceRequest / Response ?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Matthias,
>>>>>>>>
>>>>>>>> About 2: yeah I guess this is a subjective preference. My main concern about keeping the config / handling code beyond the 1.2 release is that it will become a non-cleanable tech debt forever, as fewer and fewer users would need to upgrade from 0.10.x and 1.1.x, and eventually we will need to maintain this for nearly no one. On the other hand, I agree that this tech debt is not too large. So if more people feel this is a good tradeoff to pay for not forcing users on older versions to upgrade twice, I'm happy to change my opinion.
>>>>>>>>
>>>>>>>> A few more minor comments:
>>>>>>>>
>>>>>>>> 4. For the values of "upgrade.from", could we simplify to only major.minor, i.e., "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility behavior in bug-fix releases, we would never need to specify a bug-fix version to distinguish.
>>>>>>>>
>>>>>>>> 5. Could you also present the encoding format of the subscription / assignment metadata bytes in version 2, and in future versions (i.e., which first bytes would be kept moving forward), for readers to better understand the proposal? Some snippet like ProduceRequest / ProduceRequest in https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging would be very helpful.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Thanks for your comments.
>>>>>>>>>
>>>>>>>>> 1. Because the old leader cannot decode the new Subscription, it can only send an empty assignment back. The idea to send empty assignments to all members is interesting. I will try this out in a PR to see how it behaves.
>>>>>>>>>
>>>>>>>>> 2. I don't see an issue with keeping the config `upgrade.from` for future releases. Personally, I would prefer to not force users to do two upgrades if they want to go from a pre-1.2 to a post-1.2 version. Is there a technical argument why you want to get rid of the config? What disadvantages do you see in keeping `upgrade.from` beyond the 1.2 release?
>>>>>>>>>
>>>>>>>>> Keeping the config is just a few lines of code in `StreamsConfig`, as well as a single `if` statement in `StreamsPartitionAssignor` to force a downgrade (cf. https://github.com/apache/kafka/pull/4636/files#diff-392371c29384e33bb09ed342e7696c68R201).
>>>>>>>>>
>>>>>>>>> 3. I updated the KIP accordingly.
>>>>>>>>>
>>>>>>>>> -Matthias
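On Guozhang's point 5 above, the metadata bytes under discussion can be sketched as a fixed header that is kept in all future versions, so even an old leader can always decode it. The layout below is assumed for illustration only; the KIP page defines the authoritative format:

import java.nio.ByteBuffer;

// Assumed layout for illustration only; the KIP defines the authoritative
// format. The leading eight bytes carry used-version and supported-version
// and are meant to stay stable across all future versions.
class MetadataHeader {
    static ByteBuffer encode(int usedVersion, int supportedVersion, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(8 + payload.length);
        buf.putInt(usedVersion);      // bytes 0-3: version used for this encoding
        buf.putInt(supportedVersion); // bytes 4-7: latest version the sender supports
        buf.put(payload);             // version-specific fields follow
        buf.flip();
        return buf;
    }
}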
>>>>>>>>>> On 3/15/18 3:19 PM, Guozhang Wang wrote:
>>>>>>>>>> Hello Matthias, thanks for the KIP. Here are some comments:
>>>>>>>>>>
>>>>>>>>>> 1. "For all other instances the leader sends a regular Assignment in version X back." Does that mean the leader will exclude any member of the group whose protocol version it does not understand? For example, if we have A, B, C with A the leader, and B bounced with the newer version. In the first rebalance, A will only consider {A, C} for assignment while sending an empty assignment to B. And then later, when B downgrades, will it re-assign the tasks to it again? I felt this unnecessarily increases the number of rebalances and the total latency. Could the leader just send an empty assignment to everyone? Since, upon receiving the empty assignment, each thread will not create / restore any tasks and will not clean up its local state (so that the prevCachedTasks are not lost in future rebalances) and re-joins immediately, if users choose to bounce an instance once it is in RUNNING state, the total time of rolling upgrades will be reduced.
>>>>>>>>>>
>>>>>>>>>> 2. If we want to allow upgrading from 1.1- versions to any of the future versions beyond 1.2, then we'd always need to keep the special handling logic for this two rolling-bounce mechanism, plus a config that we would never be able to deprecate; on the other hand, if the version probing procedure is fast, I think the extra operational cost of going from one upgrade (1.1- to a future version) to two upgrades (1.1- to 1.2, and then another upgrade from 1.2 to a future version) could be small. So depending on the experimental result of the upgrade latency, I'd suggest considering the trade-off of the extra code/config that needs maintaining for the special handling.
>>>>>>>>>>
>>>>>>>>>> 3. Testing plan: could you elaborate a bit more on the actual upgrade paths we should test? For example, I'm thinking of the following:
>>>>>>>>>>
>>>>>>>>>> a. 0.10.0 -> 1.2
>>>>>>>>>> b. 1.1 -> 1.2
>>>>>>>>>> c. 1.2 -> 1.3 (simulated v4)
>>>>>>>>>> d. 0.10.0 -> 1.3 (simulated v4)
>>>>>>>>>> e. 1.1 -> 1.3 (simulated v4)
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I want to propose KIP-268 to allow rebalance metadata version upgrades in Kafka Streams:
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>>>>>>>>>>
>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
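Guozhang's point 1 above suggests that, on a version mismatch, the leader could send an empty assignment to every member, so local state and cached tasks survive and members re-join immediately. A sketch of that alternative (hypothetical types; this is the idea under discussion, not the KIP's proposed design):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the alternative from point 1, not the KIP's proposed
// design: on a version mismatch the leader hands an empty assignment to every
// member, so receivers create/restore no tasks and keep their local state
// (and thus their prevCachedTasks), then re-join immediately.
class EmptyAssignmentAlternative {
    Map<String, Set<Integer>> assign(Map<String, Integer> usedVersionByMember,
                                     int leaderSupportedVersion) {
        boolean mismatch = usedVersionByMember.values().stream()
                .anyMatch(v -> v > leaderSupportedVersion);
        Map<String, Set<Integer>> assignment = new HashMap<>();
        if (mismatch) {
            // One extra rebalance with empty task sets for everyone; the next
            // rebalance (after all members re-join) does the real assignment.
            for (String member : usedVersionByMember.keySet()) {
                assignment.put(member, Collections.emptySet());
            }
            return assignment;
        }
        // ...otherwise compute a normal task assignment here (omitted).
        return assignment;
    }
}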