Adam, I am still working on it. I was pulled into a lot of other tasks lately, so this was delayed. I also had some discussions with colleagues about simplifying the upgrade path and am prototyping that at the moment. I hope to update the KIP accordingly soon.
-Matthias On 11/10/18 7:41 AM, Adam Bellemare wrote: > Hello Matthias > > I am curious as to the status of this KIP. TTL and expiry of records will > be extremely useful for several of our business use-cases, as well as > another KIP I had been working on. > > Thanks > > > > On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com> > wrote: > >> Hi Matthias, >> >> Good stuff. Could you comment a bit on how future-proof is this change? For >> example, if we want to store both event timestamp "and" processing time in >> RocksDB will we then need another interface (e.g. called >> KeyValueWithTwoTimestampsStore)? >> >> Thanks >> Eno >> >> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io> >> wrote: >> >>> Thanks for your input Guozhang and John. >>> >>> I see your point, that the upgrade API is not simple. If you don't >>> thinks it's valuable to make generic store upgrades possible (atm), we >>> can make the API internal, too. The impact is, that we only support a >>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to >>> WindowedWithTS etc) for which we implement the internal interfaces. >>> >>> We can keep the design generic, so if we decide to make it public, we >>> don't need to re-invent it. This will also have the advantage, that we >>> can add upgrade pattern for other stores later, too. >>> >>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it >>> was the only way I could find to design a generic upgrade interface. If >>> we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` would >>> become an internal interface I guess (don't think we can remove it). >>> >>> I will wait for more feedback about this and if nobody wants to keep it >>> as public API I will update the KIP accordingly. Will add some more >>> clarifications for different upgrade patterns in the mean time and fix >>> the typos/minor issues. >>> >>> About adding a new state UPGRADING: maybe we could do that. However, I >>> find it particularly difficult to make the estimation when we should >>> switch to RUNNING, thus, I am a little hesitant. Using store callbacks >>> or just logging the progress including some indication about the "lag" >>> might actually be sufficient. Not sure what others think? >>> >>> About "value before timestamp": no real reason and I think it does not >>> make any difference. Do you want to change it? >>> >>> About upgrade robustness: yes, we cannot control if an instance fails. >>> That is what I meant by "we need to write test". The upgrade should be >>> able to continuous even is an instance goes down (and we must make sure >>> that we don't end up in an invalid state that forces us to wipe out the >>> whole store). Thus, we need to write system tests that fail instances >>> during upgrade. >>> >>> For `in_place_offline` upgrade: I don't think we need this mode, because >>> people can do this via a single rolling bounce. >>> >>> - prepare code and switch KV-Store to KVwithTs-Store >>> - do a single rolling bounce (don't set any upgrade config) >>> >>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we >>> remove the `StoreUpgradeBuilder`) will detect that there is only an old >>> local KV store w/o TS, will start to restore the new KVwithTs store, >>> wipe out the old store and replace with the new store after restore is >>> finished, and start processing only afterwards. (I guess we need to >>> document this case -- will also add it to the KIP.) 
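[For illustration, a rough sketch of what the "prepare code and switch" step could look like for a PAPI user, assuming the interfaces proposed in the KIP (KeyValueWithTimestampStore, ValueAndTimestamp); the factory method name below is hypothetical and nothing here is a final API:]

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

class TimestampedStoreSketch {

    // Old code: a plain persistent key-value store.
    // StoreBuilder<KeyValueStore<String, Long>> oldCounts =
    //     Stores.keyValueStoreBuilder(
    //         Stores.persistentKeyValueStore("counts"), Serdes.String(), Serdes.Long());

    // New code: the store proposed in the KIP that keeps a timestamp next to each
    // value. `keyValueWithTimestampStoreBuilder` is a hypothetical factory name;
    // the KIP does not fix the exact builder API yet.
    final StoreBuilder<KeyValueWithTimestampStore<String, Long>> counts =
        Stores.keyValueWithTimestampStoreBuilder(
            Stores.persistentKeyValueStore("counts"), Serdes.String(), Serdes.Long());

    // Inside the Processor, reads would return the value together with its timestamp:
    // ValueAndTimestamp<Long> entry = store.get("some-key");
    // long value = entry.value();
    // long ts    = entry.timestamp();
}
```

[With a change like that in place, a single rolling bounce without any upgrade config should be enough, as described above: the store detects the old on-disk format, restores the new format, swaps it in, and processing resumes afterwards.]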
>>> >>> >>> >>> -Matthias >>> >>> >>> >>> On 8/9/18 1:10 PM, John Roesler wrote: >>>> Hi Matthias, >>>> >>>> I think this KIP is looking really good. >>>> >>>> I have a few thoughts to add to the others: >>>> >>>> 1. You mentioned at one point users needing to configure >>>> `upgrade.mode="null"`. I think this was a typo and you meant to say >> they >>>> should remove the config. If they really have to set it to a string >>> "null" >>>> or even set it to a null value but not remove it, it would be >>> unfortunate. >>>> >>>> 2. In response to Bill's comment #1 , you said that "The idea is that >> the >>>> upgrade should be robust and not fail. We need to write according >> tests". >>>> I may have misunderstood the conversation, but I don't think it's >> within >>>> our power to say that an instance won't fail. What if one of my >> computers >>>> catches on fire? What if I'm deployed in the cloud and one instance >>>> disappears and is replaced by a new one? Or what if one instance goes >>> AWOL >>>> for a long time and then suddenly returns? How will the upgrade process >>>> behave in light of such failures? >>>> >>>> 3. your thought about making in-place an offline mode is interesting, >> but >>>> it might be a bummer for on-prem users who wish to upgrade online, but >>>> cannot just add new machines to the pool. It could be a new upgrade >> mode >>>> "offline-in-place", though... >>>> >>>> 4. I was surprised to see that a user would need to modify the topology >>> to >>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's >>>> suggestions would remove this necessity. >>>> >>>> Thanks for taking on this very complex but necessary work. >>>> >>>> -John >>>> >>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com> >>> wrote: >>>> >>>>> Hello Matthias, >>>>> >>>>> Thanks for the updated KIP. Some more comments: >>>>> >>>>> 1. The current set of proposed API is a bit too complicated, which >> makes >>>>> the upgrade flow from user's perspective also a bit complex. I'd like >> to >>>>> check different APIs and discuss about their needs separately: >>>>> >>>>> 1.a. StoreProxy: needed for in-place upgrade only, between the >> first >>>>> and second rolling bounce, where the old-versioned stores can handle >>>>> new-versioned store APIs. I think such upgrade paths (i.e. from one >>> store >>>>> type to another) would not be very common: users may want to upgrade >>> from a >>>>> certain store engine to another, but the interface would likely be >>> staying >>>>> the same. Hence personally I'd suggest we keep it internally and only >>>>> consider exposing it in the future if it does become a common pattern. >>>>> >>>>> 1.b. ConverterStore / RecordConverter: needed for both in-place >> and >>>>> roll-over upgrade, between the first and second rolling bounces, for >> the >>>>> new versioned store to be able to read old-versioned changelog topics. >>>>> Firstly I think we should not expose key in the public APIs but only >> the >>>>> values, since allowing key format changes would break log compaction, >>> and >>>>> hence would not be compatible anyways. As for value format changes, >>>>> personally I think we can also keep its upgrade logic internally as it >>> may >>>>> not worth generalizing to user customizable logic. >>>>> >>>>> 1.c. If you agrees with 2.a/b above, then we can also remove " >>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public >>> APIs. >>>>> >>>>> 1.d. 
Personally I think "ReadOnlyKeyValueWithTimestampStore" is >> not >>>>> needed either given that we are exposing "ValueAndTimestamp" anyways. >>> I.e. >>>>> it is just a syntax sugar and for IQ, users can always just set a " >>>>> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as the >>> new >>>>> interface does not provide any additional functions. >>>>> >>>>> >>>>> 2. Could we further categorize the upgrade flow for different use >> cases, >>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be used >>>>> automatically for non-windowed aggregate; 2) PAPI users who do not >> need >>> to >>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to >>>>> KeyValueWithTimestampStore. Just to give my understanding for 3), the >>>>> upgrade flow for users may be simplified as the following (for both >>>>> in-place and roll-over): >>>>> >>>>> * Update the jar to new version, make code changes from >>> KeyValueStore >>>>> to KeyValueWithTimestampStore, set upgrade config. >>>>> >>>>> * First rolling bounce, and library code can internally use proxy >> / >>>>> converter based on the specified config to handle new APIs with old >>> stores, >>>>> while let new stores read from old changelog data. >>>>> >>>>> * Reset upgrade config. >>>>> >>>>> * Second rolling bounce, and the library code automatically turn >> off >>>>> logic for proxy / converter. >>>>> >>>>> >>>>> 3. Some more detailed proposals are needed for when to recommend users >>> to >>>>> trigger the second rolling bounce. I have one idea to share here: we >>> add a >>>>> new state to KafkaStreams, say UPGRADING, which is set when 1) upgrade >>>>> config is set, and 2) the new stores are still ramping up (for the >>> second >>>>> part, we can start with some internal hard-coded heuristics to decide >>> when >>>>> it is close to be ramped up). If either one of it is not true any >> more, >>> it >>>>> should transit to RUNNING. Users can then watch on this state, and >>> decide >>>>> to only trigger the second rebalance when the state has transited from >>>>> UPGRADING. They can also choose to cut over while the instance is >> still >>>>> UPGRADING, the downside is that after that the application may have >> long >>>>> restoration phase which is, to user's pov, unavailability periods. >>>>> >>>>> >>>>> Below are just some minor things on the wiki: >>>>> >>>>> 4. "proxy story" => "proxy store". >>>>> >>>>> 5. "use the a builder " => "use a builder" >>>>> >>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to the >> value": >>>>> what's the rationale of putting the timestamp before the value, than >>> after >>>>> the value? >>>>> >>>>> >>>>> >>>>> Guozhang >>>>> >>>>> >>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax < >> matth...@confluent.io> >>>>> wrote: >>>>> >>>>>> Thanks for the feedback Bill. I just update the KIP with some of your >>>>>> points. >>>>>> >>>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch >> the >>>>>>>> restore process), I'm wondering if we want to provide a type of >>>>>>>> StateRestoreListener that could signal when the new stores have >>>>> reached >>>>>>>> parity with the existing old stores and that could be the signal to >>>>>> start >>>>>>>> second rolling rebalance? >>>>>> >>>>>> I think we can reuse the existing listeners, thus, I did not include >>>>>> anything in the KIP. About a signal to rebalance: this might be >> tricky. 
>>>>>> If we prepare the store "online", the active task will update the >> state >>>>>> continuously, and thus, state prepare is never finished. It will be >> the >>>>>> users responsibility to do the second rebalance (note, that the >> second >>>>>> rebalance will first finish the last delta of the upgrade to finish >> the >>>>>> upgrade before actual processing resumes). I clarified the KIP with >>> this >>>>>> regard a little bit. >>>>>> >>>>>>>> 1. Out of N instances, one fails midway through the process, would >> we >>>>>> allow >>>>>>>> the other instances to complete or just fail the entire upgrade? >>>>>> >>>>>> The idea is that the upgrade should be robust and not fail. We need >> to >>>>>> write according tests. >>>>>> >>>>>>>> 2. During the second rolling bounce, maybe we could rename the >>> current >>>>>>>> active directories vs. deleting them right away, and when all the >>>>>> prepare >>>>>>>> task directories are successfully migrated then delete the previous >>>>>> active >>>>>>>> ones. >>>>>> >>>>>> Ack. Updated the KIP. >>>>>> >>>>>>>> 3. For the first rolling bounce we pause any processing any new >>>>> records >>>>>> and >>>>>>>> just allow the prepare tasks to restore, then once all prepare >> tasks >>>>>> have >>>>>>>> restored, it's a signal for the second round of rolling bounces and >>>>>> then as >>>>>>>> each task successfully renames its prepare directories and deletes >>> the >>>>>> old >>>>>>>> active task directories, normal processing of records resumes. >>>>>> >>>>>> The basic idea is to do an online upgrade to avoid downtime. We can >>>>>> discuss to offer both options... For the offline upgrade option, we >>>>>> could simplify user interaction and trigger the second rebalance >>>>>> automatically with the requirement that a user needs to update any >>>>> config. >>>>>> >>>>>> If might actually be worth to include this option: we know from >>>>>> experience with state restore, that regular processing slows down the >>>>>> restore. For roll_over upgrade, it would be a different story and >>>>>> upgrade should not be slowed down by regular processing. Thus, we >>> should >>>>>> even make in_place an offline upgrade and force people to use >> roll_over >>>>>> if they need onlint upgrade. Might be a fair tradeoff that may >> simplify >>>>>> the upgrade for the user and for the code complexity. >>>>>> >>>>>> Let's see what other think. >>>>>> >>>>>> >>>>>> -Matthias >>>>>> >>>>>> >>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote: >>>>>>> Hi Matthias, >>>>>>> >>>>>>> Thanks for the update and the working prototype, it helps with >>>>>>> understanding the KIP. >>>>>>> >>>>>>> I took an initial pass over this PR, and overall I find the >> interfaces >>>>>> and >>>>>>> approach to be reasonable. >>>>>>> >>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch >> the >>>>>>> restore process), I'm wondering if we want to provide a type of >>>>>>> StateRestoreListener that could signal when the new stores have >>> reached >>>>>>> parity with the existing old stores and that could be the signal to >>>>> start >>>>>>> second rolling rebalance? >>>>>>> >>>>>>> Although you solicited feedback on the interfaces involved, I wanted >>> to >>>>>> put >>>>>>> down some thoughts that have come to mind reviewing this KIP again >>>>>>> >>>>>>> 1. Out of N instances, one fails midway through the process, would >> we >>>>>> allow >>>>>>> the other instances to complete or just fail the entire upgrade? >>>>>>> 2. 
During the second rolling bounce, maybe we could rename the >> current >>>>>>> active directories vs. deleting them right away, and when all the >>>>>> prepare >>>>>>> task directories are successfully migrated then delete the previous >>>>>> active >>>>>>> ones. >>>>>>> 3. For the first rolling bounce we pause any processing any new >>> records >>>>>> and >>>>>>> just allow the prepare tasks to restore, then once all prepare tasks >>>>> have >>>>>>> restored, it's a signal for the second round of rolling bounces and >>>>> then >>>>>> as >>>>>>> each task successfully renames its prepare directories and deletes >> the >>>>>> old >>>>>>> active task directories, normal processing of records resumes. >>>>>>> >>>>>>> Thanks, >>>>>>> Bill >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax < >>> matth...@confluent.io >>>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> KIP-268 (rebalance meatadata) is finished and included in AK 2.0 >>>>>>>> release. Thus, I want to pick up this KIP again to get the RocksDB >>>>>>>> upgrade done for 2.1. >>>>>>>> >>>>>>>> I updated the KIP accordingly and also have a "prove of concept" PR >>>>>>>> ready (for "in place" upgrade only): >>>>>>>> https://github.com/apache/kafka/pull/5422/ >>>>>>>> >>>>>>>> There a still open questions, but I want to collect early feedback >> on >>>>>>>> the proposed interfaces we need for the store upgrade. Also note, >>> that >>>>>>>> the KIP now also aim to define a generic upgrade path from any >> store >>>>>>>> format A to any other store format B. Adding timestamps is just a >>>>>>>> special case. >>>>>>>> >>>>>>>> I will continue to work on the PR and refine the KIP in the >> meantime, >>>>>> too. >>>>>>>> >>>>>>>> Looking forward to your feedback. >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> >>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote: >>>>>>>>> After some more thoughts, I want to follow John's suggestion and >>>>> split >>>>>>>>> upgrading the rebalance metadata from the store upgrade. >>>>>>>>> >>>>>>>>> I extracted the metadata upgrade into it's own KIP: >>>>>>>>> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268% >>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade >>>>>>>>> >>>>>>>>> I'll update this KIP accordingly shortly. I also want to consider >> to >>>>>>>>> make the store format upgrade more flexible/generic. Atm, the KIP >> is >>>>>> too >>>>>>>>> much tailored to the DSL IMHO and does not encounter PAPI users >> that >>>>> we >>>>>>>>> should not force to upgrade the stores. I need to figure out the >>>>>> details >>>>>>>>> and follow up later. >>>>>>>>> >>>>>>>>> Please give feedback for the new KIP-268 on the corresponding >>>>>> discussion >>>>>>>>> thread. >>>>>>>>> >>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure out >> a >>>>> way >>>>>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a fix >> for >>>>>>>>> future upgrades. Please share your thoughts. >>>>>>>>> >>>>>>>>> Thanks for all your feedback! >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote: >>>>>>>>>> @John: yes, we would throw if configs are missing (it's an >>>>>>>>>> implementation details IMHO and thus I did not include it in the >>>>> KIP) >>>>>>>>>> >>>>>>>>>> @Guozhang: >>>>>>>>>> >>>>>>>>>> 1) I understand know what you mean. We can certainly, allow all >>>>> values >>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for >> `upgrade.from` >>>>>>>>>> parameter. 
I had a similar though once but decided to collapse >> them >>>>>> into >>>>>>>>>> one -- will update the KIP accordingly. >>>>>>>>>> >>>>>>>>>> 2) The idea to avoid any config would be, to always send both >>>>> request. >>>>>>>>>> If we add a config to eventually disable the old request, we >> don't >>>>>> gain >>>>>>>>>> anything with this approach. The question is really, if we are >>>>> willing >>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be limited to >> 2 >>>>>>>>>> versions and not grow further in future releases. More details in >>>>> (3) >>>>>>>>>> >>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and allows >> us >>>>> to >>>>>>>>>> stay with 2 "assignment strategies" we need to register, as the >> new >>>>>>>>>> assignment strategy will allow to "upgrade itself" via "version >>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a config >> if >>>>>>>>>> people upgrade from pre-1.2 releases. >>>>>>>>>> >>>>>>>>>> Thus, I don't think we need to register new "assignment >> strategies" >>>>>> and >>>>>>>>>> send empty subscriptions for older version. >>>>>>>>>> >>>>>>>>>> 4) I agree that this is a tricky thing to get right with a single >>>>>>>>>> rebalance. I share the concern that an application might never >>> catch >>>>>> up >>>>>>>>>> and thus the hot standby will never be ready. >>>>>>>>>> >>>>>>>>>> Maybe it's better to go with 2 rebalances for store upgrades. If >> we >>>>> do >>>>>>>>>> this, we also don't need to go with (2) and can get (3) in place >>> for >>>>>>>>>> future upgrades. I also think that changes to the metadata are >> more >>>>>>>>>> likely and thus allowing for single rolling bounce for this case >> is >>>>>> more >>>>>>>>>> important anyway. If we assume that store upgrade a rare, it >> might >>>>> be >>>>>> ok >>>>>>>>>> to sacrifice two rolling bounced for this case. It was just an >> idea >>>>> I >>>>>>>>>> wanted to share (even if I see the issues). >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -Matthias >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote: >>>>>>>>>>> Hello Matthias, thanks for your replies. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> 1) About the config names: actually I was trying to not expose >>>>>>>>>>> implementation details :) My main concern was that in your >>> proposal >>>>>> the >>>>>>>>>>> values need to cover the span of all the versions that are >>> actually >>>>>>>> using >>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am >>>>>>>> upgrading >>>>>>>>>>> from any versions within this range I need to remember to use >> the >>>>>> value >>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In my >>>>>> suggestion >>>>>>>> I >>>>>>>>>>> was trying to argue the benefit of just letting users to specify >>>>> the >>>>>>>> actual >>>>>>>>>>> Kafka version she's trying to upgrade from, than specifying a >>> range >>>>>> of >>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as the >>>>> values, >>>>>>>> but >>>>>>>>>>> still using Kafka versions like broker's `internal.version` >>> config. >>>>>>>> But if >>>>>>>>>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you >>>>>> meant >>>>>>>> to >>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or >>>>> "1.1" >>>>>>>> which >>>>>>>>>>> are all recognizable config values then I think we are actually >> on >>>>>> the >>>>>>>> same >>>>>>>>>>> page. 
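[For what it is worth, a small sketch of how the two bounces could look from the user's side, assuming the config names discussed in this thread ("upgrade.from", "upgrade.mode") and that plain release versions are accepted as values; none of this is final:]

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class UpgradeConfigSketch {

    // First rolling bounce: deploy the new jar with both upgrade configs set.
    static Properties firstBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put("upgrade.from", "0.11.0");    // the release being upgraded from
        props.put("upgrade.mode", "in_place");  // or "roll_over"
        return props;
    }

    // Second rolling bounce: simply remove the upgrade configs again (they are
    // not set to the literal string "null"); the instances then switch to the
    // new metadata and store format.
    static Properties secondBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        return props;
    }
}
```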
>>>>>>>>>>> >>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would increase the >>>>>> network >>>>>>>>>>> footprint, but not the message size, IF I'm not >> mis-understanding >>>>>> your >>>>>>>> idea >>>>>>>>>>> of registering multiple assignment. More details: >>>>>>>>>>> >>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can encode >>>>>> multiple >>>>>>>>>>> protocols each with their different metadata. The coordinator >> will >>>>>>>> pick the >>>>>>>>>>> common one that everyone supports (if there are no common one, >> it >>>>>> will >>>>>>>> send >>>>>>>>>>> an error back; if there are multiple ones, it will pick the one >>>>> with >>>>>>>> most >>>>>>>>>>> votes, i.e. the one which was earlier in the encoded list). >> Since >>>>> our >>>>>>>>>>> current Streams rebalance protocol is still based on the >> consumer >>>>>>>>>>> coordinator, it means our protocol_type would be "consumer", but >>>>>>>> instead >>>>>>>>>>> the protocol type we can have multiple protocols like "streams", >>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we need to >>>>>>>> implement a >>>>>>>>>>> different assignor class for each version and register all of >> them >>>>> in >>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future >> if >>>>> we >>>>>>>>>>> re-factor our implementation to have our own client coordinator >>>>> layer >>>>>>>> like >>>>>>>>>>> Connect did, we can simplify this part of the implementation. >> But >>>>>> even >>>>>>>> for >>>>>>>>>>> now with the above approach this is still doable. >>>>>>>>>>> >>>>>>>>>>> On the broker side, the group coordinator will only persist a >>> group >>>>>>>> with >>>>>>>>>>> the selected protocol and its subscription metadata, e.g. if >>>>>>>> coordinator >>>>>>>>>>> decides to pick "streams_v2" it will only sends that protocol's >>>>>>>> metadata >>>>>>>>>>> from everyone to the leader to assign, AND when completing the >>>>>>>> rebalance it >>>>>>>>>>> will also only write the group metadata with that protocol and >> the >>>>>>>>>>> assignment only. In a word, although the network traffic maybe >>>>>>>> increased a >>>>>>>>>>> bit, it would not be a bummer in our trade-off. One corner >>>>> situation >>>>>> we >>>>>>>>>>> need to consider is how to stop registering very old assignors >> to >>>>>>>> avoid the >>>>>>>>>>> network traffic from increasing indefinitely, e.g. if you are >>>>> rolling >>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1 >> assignor >>>>>>>> anymore, >>>>>>>>>>> but that would unfortunately still require some configs. >>>>>>>>>>> >>>>>>>>>>> 3) About the "version probing" idea, I think that's a promising >>>>>>>> approach >>>>>>>>>>> as well, but if we are going to do the multi-assignment its >> value >>>>>> seems >>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top of >>>>>>>> multi-assignment >>>>>>>>>>> to save us from still requiring the config to avoid registering >>> all >>>>>> the >>>>>>>>>>> metadata for all version. More details: >>>>>>>>>>> >>>>>>>>>>> In the JoinGroupRequest, we still register all the assignor but >>> for >>>>>>>> all old >>>>>>>>>>> assignors we do not encode any metadata, i.e. the encoded data >>>>> would >>>>>>>> be: >>>>>>>>>>> >>>>>>>>>>> "streams_vN" : "encoded metadata" >>>>>>>>>>> "streams_vN-1":empty >>>>>>>>>>> "streams_vN-2":empty >>>>>>>>>>> .. 
>>>>>>>>>>> "streams_0":empty >>>>>>>>>>> >>>>>>>>>>> So the coordinator can still safely choose the latest common >>>>> version; >>>>>>>> and >>>>>>>>>>> then when leaders receive the subscription (note it should >> always >>>>>>>> recognize >>>>>>>>>>> that version), let's say it is streams_vN-2, if one of the >>>>>>>> subscriptions >>>>>>>>>>> are empty bytes, it will send the empty assignment with that >>>>> version >>>>>>>> number >>>>>>>>>>> encoded in the metadata. So in the second auto-triggered all >>>>> members >>>>>>>> would >>>>>>>>>>> send the metadata with that version: >>>>>>>>>>> >>>>>>>>>>> "streams_vN" : empty >>>>>>>>>>> "streams_vN-1" : empty >>>>>>>>>>> "streams_vN-2" : "encoded metadata" >>>>>>>>>>> .. >>>>>>>>>>> "streams_0":empty >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> By doing this we would not require any configs for users. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about >>> the >>>>>>>> details >>>>>>>>>>> so probably we'd need to fill that out before making a call. For >>>>>>>> example, >>>>>>>>>>> you mentioned "If we detect this situation, the Streams >>> application >>>>>>>> closes >>>>>>>>>>> corresponding active tasks as well as "hot standby" tasks, and >>>>>>>> re-creates >>>>>>>>>>> the new active tasks using the new store." How could we >> guarantee >>>>>> that >>>>>>>> the >>>>>>>>>>> gap between these two stores will keep decreasing than >> increasing >>>>> so >>>>>>>> we'll >>>>>>>>>>> eventually achieve the flip point? And also the longer we are >>>>> before >>>>>>>> the >>>>>>>>>>> flip point, the larger we are doubling the storage space, etc. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Guozhang >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax < >>>>>>>> matth...@confluent.io> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> @John, Guozhang, >>>>>>>>>>>> >>>>>>>>>>>> thanks a lot for your comments. Very long reply... >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> About upgrading the rebalance metadata: >>>>>>>>>>>> >>>>>>>>>>>> Another possibility to do this, would be to register multiple >>>>>>>> assignment >>>>>>>>>>>> strategies for the 1.2 applications. For this case, new >> instances >>>>>>>> would >>>>>>>>>>>> be configured to support both and the broker would pick the >>>>> version >>>>>>>> that >>>>>>>>>>>> all instances understand. The disadvantage would be, that we >> send >>>>>> much >>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as long as >> no >>>>>>>> second >>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, using this >>>>>>>> approach >>>>>>>>>>>> would allow to avoid a second rebalance trading-off an >> increased >>>>>>>>>>>> rebalance network footprint (I also assume that this would >>>>> increase >>>>>>>> the >>>>>>>>>>>> message size that is written into __consumer_offsets topic?). >>>>>>>> Overall, I >>>>>>>>>>>> am not sure if this would be a good tradeoff, but it could >> avoid >>> a >>>>>>>>>>>> second rebalance (I have some more thoughts about stores below >>>>> that >>>>>>>> are >>>>>>>>>>>> relevant for single rebalance upgrade). >>>>>>>>>>>> >>>>>>>>>>>> For future upgrades we might be able to fix this though. I was >>>>>>>> thinking >>>>>>>>>>>> about the following: >>>>>>>>>>>> >>>>>>>>>>>> In the current implementation, the leader fails if it gets a >>>>>>>>>>>> subscription it does not understand (ie, newer version). 
We >> could >>>>>>>> change >>>>>>>>>>>> this behavior and let the leader send an empty assignment plus >>>>> error >>>>>>>>>>>> code (including supported version) back to the instance sending >>>>> the >>>>>>>>>>>> "bad" subscription. This would allow the following logic for an >>>>>>>>>>>> application instance: >>>>>>>>>>>> >>>>>>>>>>>> - on startup, always send the latest subscription format >>>>>>>>>>>> - if leader understands it, we get an assignment back an start >>>>>>>> processing >>>>>>>>>>>> - if leader does not understand it, we get an empty assignment >>>>> and >>>>>>>>>>>> supported version back >>>>>>>>>>>> - the application unsubscribe()/subscribe()/poll() again and >>>>>> sends a >>>>>>>>>>>> subscription using the leader's supported version >>>>>>>>>>>> >>>>>>>>>>>> This protocol would allow to do a single rolling bounce, and >>>>>>>> implements >>>>>>>>>>>> a "version probing" step, that might result in two executed >>>>>>>> rebalances. >>>>>>>>>>>> The advantage would be, that the user does not need to set any >>>>>> configs >>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care of this >>>>>>>>>>>> automatically. >>>>>>>>>>>> >>>>>>>>>>>> One disadvantage would be, that two rebalances happen and that >>> for >>>>>> an >>>>>>>>>>>> error case during rebalance, we loose the information about the >>>>>>>>>>>> supported leader version and the "probing step" would happen a >>>>>> second >>>>>>>> time. >>>>>>>>>>>> >>>>>>>>>>>> If the leader is eventually updated, it will include it's own >>>>>>>> supported >>>>>>>>>>>> version in all assignments, to allow a "down graded" >> application >>>>> to >>>>>>>>>>>> upgrade its version later. Also, if a application fails, the >>> first >>>>>>>>>>>> probing would always be successful and only a single rebalance >>>>>>>> happens. >>>>>>>>>>>> If we use this protocol, I think we don't need any >> configuration >>>>>>>>>>>> parameter for future upgrades. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version": >>>>>>>>>>>> >>>>>>>>>>>> Users would set "upgrade.from" to the release version the >>>>>> current/old >>>>>>>>>>>> application is using. I think this is simpler, as users know >> this >>>>>>>>>>>> version. If we use "internal.protocol.version" instead, we >> expose >>>>>>>>>>>> implementation details and users need to know the protocol >>> version >>>>>>>> (ie, >>>>>>>>>>>> they need to map from the release version to the protocol >>> version; >>>>>> ie, >>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version 2"). >>>>>>>>>>>> >>>>>>>>>>>> Also the KIP states that for the second rolling bounce, the >>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- and thus, >>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored (I will >>>>>> update >>>>>>>>>>>> the KIP to point out this dependency). >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> About your second point: I'll update the KIP accordingly to >>>>> describe >>>>>>>>>>>> future updates as well. Both will be different. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> One more point about upgrading the store format. I was thinking >>>>>> about >>>>>>>>>>>> avoiding the second rolling bounce all together in the future: >>> (1) >>>>>> the >>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this >>> required >>>>>> to >>>>>>>>>>>> prepare the stores as "hot standbys" before we do the switch >> and >>>>>>>> delete >>>>>>>>>>>> the old stores. 
(3) the current proposal does the switch >>>>> "globally" >>>>>> -- >>>>>>>>>>>> this is simpler and due to the required second rebalance no >>>>>>>> disadvantage. >>>>>>>>>>>> However, a global consistent switch over might actually not be >>>>>>>> required. >>>>>>>>>>>> For "in_place" upgrade, following the protocol from above, we >>>>> could >>>>>>>>>>>> decouple the store switch and each instance could switch its >>> store >>>>>>>>>>>> independently from all other instances. After the rolling >> bounce, >>>>> it >>>>>>>>>>>> seems to be ok to switch from the old store to the new store >>>>> "under >>>>>>>> the >>>>>>>>>>>> hood" whenever the new store is ready (this could even be done, >>>>>> before >>>>>>>>>>>> we switch to the new metadata version). Each time we update the >>>>> "hot >>>>>>>>>>>> standby" we check if it reached the "endOffset" (or maybe X% >>> that >>>>>>>> could >>>>>>>>>>>> either be hardcoded or configurable). If we detect this >>> situation, >>>>>> the >>>>>>>>>>>> Streams application closes corresponding active tasks as well >> as >>>>>> "hot >>>>>>>>>>>> standby" tasks, and re-creates the new active tasks using the >> new >>>>>>>> store. >>>>>>>>>>>> (I need to go through the details once again, but it seems to >> be >>>>>>>>>>>> feasible.). >>>>>>>>>>>> >>>>>>>>>>>> Combining this strategy with the "multiple assignment" idea, >>> might >>>>>>>> even >>>>>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1 -> >> 1.2. >>>>>>>>>>>> Applications would just use the old store, as long as the new >>>>> store >>>>>> is >>>>>>>>>>>> not ready, even if the new metadata version is used already. >>>>>>>>>>>> >>>>>>>>>>>> For future upgrades, a single rebalance would be sufficient, >> too, >>>>>> even >>>>>>>>>>>> if the stores are upgraded. We would not need any config >>>>> parameters >>>>>> as >>>>>>>>>>>> the "probe" step allows us to detect the supported rebalance >>>>>> metadata >>>>>>>>>>>> version (and we would also not need multiple "assigmnent >>>>> strategies" >>>>>>>> as >>>>>>>>>>>> out own protocol encoded everything we need). >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Let me know what you think. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote: >>>>>>>>>>>>> @John: >>>>>>>>>>>>> >>>>>>>>>>>>> For the protocol version upgrade, it is only for the encoded >>>>>> metadata >>>>>>>>>>>> bytes >>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from Consumer's >> pov, >>>>>> so I >>>>>>>>>>>> think >>>>>>>>>>>>> this change should be in the Streams layer as well. >>>>>>>>>>>>> >>>>>>>>>>>>> @Matthias: >>>>>>>>>>>>> >>>>>>>>>>>>> for 2), I agree that adding a "newest supported version" >> besides >>>>>> the >>>>>>>>>>>>> "currently used version for encoding" is a good idea to allow >>>>>> either >>>>>>>>>>>> case; >>>>>>>>>>>>> the key is that in Streams we would likely end up with a >> mapping >>>>>>>> from the >>>>>>>>>>>>> protocol version to the other persistent data format versions >>>>> such >>>>>> as >>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually achieve >>>>> both >>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol >>>>>>>> version's >>>>>>>>>>>>> corresponding data format does not change, e.g. 
0.10.0 -> >> 0.10.1 >>>>>>>> leaders >>>>>>>>>>>>> can choose to use the newer version in the first rolling >> bounce >>>>>>>> directly >>>>>>>>>>>>> and we can document to users that they would not need to set >>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded >>>>> protocol >>>>>>>>>>>> version >>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and >> then >>>>> we >>>>>>>> can >>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the first >>> rolling >>>>>>>> bounce >>>>>>>>>>>>> and reset in the second. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Besides that, some additional comments: >>>>>>>>>>>>> >>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for users to >>>>> set >>>>>>>> than >>>>>>>>>>>>> "internal.protocol.version" where for the latter users only >> need >>>>> to >>>>>>>> set a >>>>>>>>>>>>> single version, while the Streams will map that version to the >>>>>>>> Streams >>>>>>>>>>>>> assignor's behavior as well as the data format. But maybe I >> did >>>>> not >>>>>>>> get >>>>>>>>>>>>> your idea about how the "upgrade.from" config will be set, >>>>> because >>>>>>>> in >>>>>>>>>>>>> your Compatibility section how the upgrade.from config will be >>>>> set >>>>>>>> for >>>>>>>>>>>>> these two rolling bounces are not very clear: for example, >>> should >>>>>>>> user >>>>>>>>>>>>> reset it to null in the second rolling bounce? >>>>>>>>>>>>> >>>>>>>>>>>>> 2) In the upgrade path description, rather than talking about >>>>>>>> specific >>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize >> all >>>>>> the >>>>>>>>>>>>> possible scenarios, even for future upgrade versions, what >>> should >>>>>> be >>>>>>>> the >>>>>>>>>>>>> standard operations? The categorized we can summarize to would >>> be >>>>>>>>>>>> (assuming >>>>>>>>>>>>> user upgrade from version X to version Y, where X and Y are >>> Kafka >>>>>>>>>>>> versions, >>>>>>>>>>>>> with the corresponding supported protocol version x and y): >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and hence >> no >>>>>>>>>>>> persistent >>>>>>>>>>>>> data formats have changed. >>>>>>>>>>>>> >>>>>>>>>>>>> b. x != y, but all persistent data format remains the same. >>>>>>>>>>>>> >>>>>>>>>>>>> b. x !=y, AND some persistene data format like RocksDB format, >>>>>>>> changelog >>>>>>>>>>>>> format, has been changed. >>>>>>>>>>>>> >>>>>>>>>>>>> c. special case: we may need some special handling logic when >>>>>>>> "current >>>>>>>>>>>>> version" or "newest supported version" are not available in >> the >>>>>>>> protocol, >>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> under the above scenarios, how many rolling bounces users need >>> to >>>>>>>>>>>> execute? >>>>>>>>>>>>> how they should set the configs in each rolling bounce? and >> how >>>>>>>> Streams >>>>>>>>>>>>> library will execute in these cases? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Guozhang >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax < >>>>>>>> matth...@confluent.io> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Ted, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I still consider changing the KIP to include it right away -- >>> if >>>>>>>> not, >>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more detail >>>>> first. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> (Same for other open questions like interface names -- I >>> collect >>>>>>>>>>>>>> feedback and update the KIP after we reach consensus :)) >>>>>>>>>>>>>> >>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote: >>>>>>>>>>>>>>> Thanks for the details, Matthias. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> bq. change the metadata protocol only if a future release, >>>>>> encoding >>>>>>>>>>>> both >>>>>>>>>>>>>> used >>>>>>>>>>>>>>> and supported version might be an advantage >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be implemented in >>>>> this >>>>>>>> KIP. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding proposal. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax < >>>>>>>> matth...@confluent.io >>>>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of user >>> code. >>>>>> And >>>>>>>>>>>> even >>>>>>>>>>>>>>>> if we want to add something like this, I would prefer to do >>> it >>>>>> in >>>>>>>> a >>>>>>>>>>>>>>>> separate KIP. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce. But >>>>> from >>>>>> my >>>>>>>>>>>>>>>> understanding it would not be possible. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to >> switch >>>>>> from >>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch >> from >>>>> the >>>>>>>> old >>>>>>>>>>>>>>>> store to the new store (this happens after the last >> instance >>>>>>>> bounces a >>>>>>>>>>>>>>>> second time). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The problem with one round of rolling bounces is, that it's >>>>>>>> unclear >>>>>>>>>>>> when >>>>>>>>>>>>>>>> to which from version 2 to version 3. The >>>>>>>> StreamsPartitionsAssignor is >>>>>>>>>>>>>>>> stateless by design, and thus, the information which >> version >>>>> it >>>>>>>> should >>>>>>>>>>>>>>>> use must be passed in from externally -- and we want to use >>>>> the >>>>>>>>>>>>>>>> StreamsConfig to pass in this information. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> During upgrade, all new instanced have no information about >>>>> the >>>>>>>>>>>> progress >>>>>>>>>>>>>>>> of the upgrade (ie, how many other instanced got upgrades >>>>>>>> already). >>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3 >>>>>>>> subscription. >>>>>>>>>>>> The >>>>>>>>>>>>>>>> leader also has this limited view on the world and can only >>>>> send >>>>>>>>>>>> version >>>>>>>>>>>>>>>> 2 assignments back. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify >> the >>>>>>>> upgrade. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> We did consider to change the metadata to make later >> upgrades >>>>>> (ie, >>>>>>>>>>>> from >>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the >>>>> metadata >>>>>> or >>>>>>>>>>>>>>>> storage format again -- as long as we don't change it, a >>>>> single >>>>>>>>>>>> rolling >>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and >>>>> "supported >>>>>>>>>>>>>>>> version". 
This would allow the leader to switch to the new >>>>>> version >>>>>>>>>>>>>>>> earlier and without a second rebalance: leader would >> receive >>>>>> "used >>>>>>>>>>>>>>>> version == old" and "supported version = old/new" -- as >> long >>>>> as >>>>>> at >>>>>>>>>>>> least >>>>>>>>>>>>>>>> one instance sends a "supported version = old" leader sends >>>>> old >>>>>>>>>>>> version >>>>>>>>>>>>>>>> assignment back. However, encoding both version would allow >>>>> that >>>>>>>> the >>>>>>>>>>>>>>>> leader can send a new version assignment back, right after >>> the >>>>>>>> first >>>>>>>>>>>>>>>> round or rebalance finished (all instances send "supported >>>>>>>> version = >>>>>>>>>>>>>>>> new"). However, there are still two issues with this: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1) if we switch to the new format right after the last >>>>> instance >>>>>>>>>>>> bounced, >>>>>>>>>>>>>>>> the new stores might not be ready to be used -- this could >>>>> lead >>>>>> to >>>>>>>>>>>>>>>> "downtime" as store must be restored before processing can >>>>>> resume. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At this >>>>>>>> point, the >>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled and thus >>> sends >>>>>>>> the old >>>>>>>>>>>>>>>> protocol data. However, it would be desirable to never fall >>>>> back >>>>>>>> to >>>>>>>>>>>> the >>>>>>>>>>>>>>>> old protocol after the switch to the new protocol. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The second issue is minor and I guess if users set-up the >>>>>> instance >>>>>>>>>>>>>>>> properly it could be avoided. However, the first issue >> would >>>>>>>> prevent >>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we consider >>>>> that >>>>>> we >>>>>>>>>>>> might >>>>>>>>>>>>>>>> change the metadata protocol only if a future release, >>>>> encoding >>>>>>>> both >>>>>>>>>>>>>>>> used and supported version might be an advantage in the >>> future >>>>>>>> and we >>>>>>>>>>>>>>>> could consider to add this information in 1.2 release to >>>>> prepare >>>>>>>> for >>>>>>>>>>>>>> this. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to give the >>>>>>>> instances >>>>>>>>>>>>>>>> enough time to prepare the stores in new format. If you >> would >>>>> do >>>>>>>> the >>>>>>>>>>>>>>>> second rolling bounce before this, it would still work -- >>>>>>>> however, you >>>>>>>>>>>>>>>> might see app "downtime" as the new store must be fully >>>>> restored >>>>>>>>>>>> before >>>>>>>>>>>>>>>> processing can resume. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Does this make sense? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote: >>>>>>>>>>>>>>>>> Matthias, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid of >> the >>>>> 2nd >>>>>>>>>>>> rolling >>>>>>>>>>>>>>>> bounce? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary difference >>>>>>>> between >>>>>>>>>>>> the >>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide >>>>>>>> whether to >>>>>>>>>>>>>> send >>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3. 
>> (Actually, >>>>>>>> there is >>>>>>>>>>>>>>>> another difference mentioned in that the KIP says that the >>> 2nd >>>>>>>> rolling >>>>>>>>>>>>>>>> bounce should happen after all new state stores are created >>> by >>>>>> the >>>>>>>>>>>>>>>> background thread. However, within the 2nd rolling bounce, >> we >>>>>> say >>>>>>>> that >>>>>>>>>>>>>>>> there is still a background thread, so it seems like is no >>>>>> actual >>>>>>>>>>>>>>>> requirement to wait for the new state stores to be >> created.) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with >>>>>> mixed-mode >>>>>>>>>>>>>> (having >>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer group). >> It >>>>>> seems >>>>>>>>>>>> like >>>>>>>>>>>>>> we >>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic >>>>>>>> (somehow/somewhere) >>>>>>>>>>>>>> such >>>>>>>>>>>>>>>> that: >>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until all >> instances >>>>> are >>>>>>>>>>>> running >>>>>>>>>>>>>>>> the new code. >>>>>>>>>>>>>>>>> * Once all the instances are running the new code, then >> one >>>>> at >>>>>> a >>>>>>>>>>>> time, >>>>>>>>>>>>>>>> the instances start sending Subscription V3. Leader still >>>>> hands >>>>>>>> out >>>>>>>>>>>>>>>> Assignment Version 2, until all new state stores are ready. >>>>>>>>>>>>>>>>> * Once all instances report that new stores are ready, >>> Leader >>>>>>>> sends >>>>>>>>>>>> out >>>>>>>>>>>>>>>> Assignment Version 3. >>>>>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3, it >> can >>>>>>>> delete >>>>>>>>>>>> the >>>>>>>>>>>>>>>> old state store. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of >>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling >>>>> restarts. >>>>>> No >>>>>>>>>>>> need >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> monitor logs for state store rebuild. You just deploy it, >> and >>>>>> the >>>>>>>>>>>>>> instances >>>>>>>>>>>>>>>> update themselves. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> What do you think? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The thing that made me think of this is that the "2 >> rolling >>>>>>>> bounces" >>>>>>>>>>>> is >>>>>>>>>>>>>>>> similar to what Kafka brokers have to do changes in >>>>>>>>>>>>>>>> inter.broker.protocol.version and >> log.message.format.version. >>>>>> And >>>>>>>> in >>>>>>>>>>>> the >>>>>>>>>>>>>>>> broker case, it seems like it would be possible (with some >>>>> work >>>>>> of >>>>>>>>>>>>>> course) >>>>>>>>>>>>>>>> to modify kafka to allow us to do similar auto-detection of >>>>>> broker >>>>>>>>>>>>>>>> capabilities and automatically do a switchover from old/new >>>>>>>> versions. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -James >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck < >>> bbej...@gmail.com >>>>>> >>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Matthias, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval methods on >>>>> the >>>>>>>> new >>>>>>>>>>>>>>>>>> interfaces. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Would want to consider adding one method with a Predicate >>>>> that >>>>>>>> would >>>>>>>>>>>>>>>> allow >>>>>>>>>>>>>>>>>> for filtering records by the timestamp stored with the >>>>> record? 
>>>>>>>> Or >>>>>>>>>>>> is >>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>> better left for users to implement themselves once the >> data >>>>>> has >>>>>>>> been >>>>>>>>>>>>>>>>>> retrieved? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu < >>> yuzhih...@gmail.com >>>>>> >>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Matthias: >>>>>>>>>>>>>>>>>>> For my point #1, I don't have preference as to which >>>>>> separator >>>>>>>> is >>>>>>>>>>>>>>>> chosen. >>>>>>>>>>>>>>>>>>> Given the background you mentioned, current choice is >>> good. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is closer >>> to >>>>>>>> English >>>>>>>>>>>>>>>>>>> grammar. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Would be good to listen to what other people think. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax < >>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for the comments! >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> @Guozhang: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata >>> upgrade >>>>>> fix >>>>>>>>>>>>>>>>>>>> (addressing the mentioned >>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It >>>>> give a >>>>>>>> first >>>>>>>>>>>>>>>>>>>> impression how the metadata upgrade works including a >>>>> system >>>>>>>> test: >>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636 >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I >> agree >>>>>> that >>>>>>>> the >>>>>>>>>>>>>> KIP >>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>> complex am I ok with putting out more code to give >> better >>>>>>>>>>>> discussion >>>>>>>>>>>>>>>>>>>> context. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> @Ted: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the >>>>>>>>>>>> `processing.guarantee` >>>>>>>>>>>>>>>>>>>> parameter that accepts `at_least_one` and >> `exactly_once` >>>>> as >>>>>>>>>>>> values. >>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash but I >>>>>> prefer >>>>>>>>>>>>>>>>>>>> consistency. If you feel strong about it, we can also >>>>> change >>>>>>>> it to >>>>>>>>>>>>>>>> `-`. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I >>>>> stripped >>>>>>>> the >>>>>>>>>>>>>>>> `With` >>>>>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be good to get >>>>>>>> feedback >>>>>>>>>>>>>> from >>>>>>>>>>>>>>>>>>>> others and pick the name the majority prefers. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> @John: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would not >>>>> make a >>>>>>>>>>>>>>>> difference. >>>>>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the two version >>>>>> might >>>>>>>>>>>>>>>> introduce >>>>>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I don't hit >> an >>>>>>>> issue >>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>> putting the `-v2` to the store directory instead of >>>>>>>> `rocksdb-v2` >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote: >>>>>>>>>>>>>>>>>>>>> Hey Matthias, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. 
I had several questions >> queued >>>>>> up, >>>>>>>> but >>>>>>>>>>>>>> they >>>>>>>>>>>>>>>>>>>> were >>>>>>>>>>>>>>>>>>>>> all in the "rejected alternatives" section... oh, >> well. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state directory >>>>> from >>>>>>>>>>>>>>>>>>>> "/<state.dir>/< >>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to >>>>>>>> "/<state.dir>/< >>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if >>> you >>>>>>>> put the >>>>>>>>>>>>>>>> "v2" >>>>>>>>>>>>>>>>>>>>> marker on the storeName part of the path (i.e., >>>>>>>> "/<state.dir>/< >>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), >> then >>>>>> you >>>>>>>> get >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>>> benefits without altering the high-level directory >>>>>> structure. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people running >>>>>>>> scripts to >>>>>>>>>>>>>>>>>>> monitor >>>>>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other such use >>>>> cases. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu < >>>>>> yuzhih...@gmail.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Matthias: >>>>>>>>>>>>>>>>>>>>>> Nicely written KIP. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may >>>>>>>> sometimes >>>>>>>>>>>> be >>>>>>>>>>>>>>>> miss >>>>>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to >>>>>> user. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> >>> { >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for >>>>> the >>>>>>>>>>>> class ? >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang < >>>>>>>>>>>> wangg...@gmail.com >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch section and it >>>>> looks >>>>>>>> good >>>>>>>>>>>> to >>>>>>>>>>>>>>>> me, >>>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>>>>> you >>>>>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you also share it >>>>> here >>>>>>>> so >>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>> people >>>>>>>>>>>>>>>>>>>>>>> can take a look? >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this >>>>> there >>>>>>>> are >>>>>>>>>>>>>> always >>>>>>>>>>>>>>>>>>>> some >>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is better >>> to >>>>>>>> have >>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design >>>>>>>> discussion :) >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax < >>>>>>>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to >>> allow >>>>>>>> storing >>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. 
>>>>>>>>>>>>>>>>>>>>>>>> This feature is the basis to resolve multiple tickets (issues and feature requests).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this!
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>
>>>>> --
>>>>> -- Guozhang
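[As a footnote to the value-format question raised earlier in the thread (the record timestamp stored as an 8-byte long prefix of the serialized value), here is a minimal sketch of that layout; the helper class is made up for illustration and is not part of the proposed public interfaces:]

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the "8-byte timestamp prefix" layout
// discussed above; not part of the proposed public API.
final class ValueWithTimestampLayout {

    // Prepend the record timestamp (long, 8 bytes) to the serialized value.
    static byte[] encode(final long timestamp, final byte[] serializedValue) {
        return ByteBuffer.allocate(Long.BYTES + serializedValue.length)
                .putLong(timestamp)
                .put(serializedValue)
                .array();
    }

    // Read the timestamp back from a stored byte[].
    static long timestamp(final byte[] stored) {
        return ByteBuffer.wrap(stored).getLong();
    }

    // Strip the 8-byte prefix to recover the plain serialized value.
    static byte[] serializedValue(final byte[] stored) {
        final byte[] value = new byte[stored.length - Long.BYTES];
        System.arraycopy(stored, Long.BYTES, value, 0, value.length);
        return value;
    }
}
```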