Hey James and Matthias, It seems clear (to me) that there's no way to avoid a double bounce for this release.
But I do think we should figure out whether there's a feature we can put in right now to allow future releases to be single-bounce. I'm just thinking that this double bounce thing is the kind of operational paper cut that become super annoying the second or third time you have to do it. Preliminarily, I think we should consider these two problems as orthogonal: (1) rolling upgrade to protocol versions (2) upgrading state store (rocks) format For (1), I think something like this would probably be a change to the Consumer API, not Streams, which would be a separate KIP. I think it's worth thinking about doing something along these lines in 1.2 to facilitate smooth future releases. But all these concepts are extremely muddy for me, and I'm aware that the Consumer API is extremely detailed and sensitive, so I'm planning to do a lot more reading before considering whether I want to file a KIP. For (2), it's worth noting that (1) already forces us to do a double-bounce, so refining the state-store upgrade right now doesn't buy us any operational simplicity. However, let's assume we separately decide to make some changes for (1) in 1.2 such that future releases could be single-bounce. Is there some groundwork that we need to lay in 1.2 to position us for online state store upgrades? I think actually we do not. I think I can show that there is some, say 1.3 implementation that can upgrade state stores online independent of any code in 1.2. For example, a 1.3 instance can start up, detect its 1.2-formatted store, include this information in its subscribe(). If it gets the subscription, it can create a 1.3 store and begin restoring it, monitoring the progress until it's ready to swap, and then swap. For this whole process to work, we don't depend on any features to already be in place in 1.2. Considering both of those points, I think Matthias's existing proposal is the best we can do within the scope of this KIP. -John P.S., it did just occur to me that tackling (2) right now buys us one small win: operators won't have to monitor the logs for store readiness before kicking off the second bounce. Is that alone enough of a win to justify tackling this right now? Not sure. On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote: > @Bill: I think a filter predicate should be part of user code. And even > if we want to add something like this, I would prefer to do it in a > separate KIP. > > > @James: I would love to avoid a second rolling bounce. But from my > understanding it would not be possible. > > The purpose of the second rolling bounce is indeed to switch from > version 2 to 3. It also has a second purpose, to switch from the old > store to the new store (this happens after the last instance bounces a > second time). > > The problem with one round of rolling bounces is, that it's unclear when > to which from version 2 to version 3. The StreamsPartitionsAssignor is > stateless by design, and thus, the information which version it should > use must be passed in from externally -- and we want to use the > StreamsConfig to pass in this information. > > During upgrade, all new instanced have no information about the progress > of the upgrade (ie, how many other instanced got upgrades already). > Therefore, it's not safe for them to send a version 3 subscription. The > leader also has this limited view on the world and can only send version > 2 assignments back. > > Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade. > > We did consider to change the metadata to make later upgrades (ie, from > 1.2 to 1.x) simpler though (for the case we change the metadata or > storage format again -- as long as we don't change it, a single rolling > bounce is sufficient), by encoding "used version" and "supported > version". This would allow the leader to switch to the new version > earlier and without a second rebalance: leader would receive "used > version == old" and "supported version = old/new" -- as long as at least > one instance sends a "supported version = old" leader sends old version > assignment back. However, encoding both version would allow that the > leader can send a new version assignment back, right after the first > round or rebalance finished (all instances send "supported version = > new"). However, there are still two issues with this: > > 1) if we switch to the new format right after the last instance bounced, > the new stores might not be ready to be used -- this could lead to > "downtime" as store must be restored before processing can resume. > > 2) Assume an instance fails and is restarted again. At this point, the > instance will still have "upgrade mode" enabled and thus sends the old > protocol data. However, it would be desirable to never fall back to the > old protocol after the switch to the new protocol. > > The second issue is minor and I guess if users set-up the instance > properly it could be avoided. However, the first issue would prevent > "zero downtime" upgrades. Having said this, if we consider that we might > change the metadata protocol only if a future release, encoding both > used and supported version might be an advantage in the future and we > could consider to add this information in 1.2 release to prepare for this. > > Btw: monitoring the log, is also only required to give the instances > enough time to prepare the stores in new format. If you would do the > second rolling bounce before this, it would still work -- however, you > might see app "downtime" as the new store must be fully restored before > processing can resume. > > > Does this make sense? > > > -Matthias > > > > On 3/9/18 11:36 AM, James Cheng wrote: > > Matthias, > > > > For all the upgrade paths, is it possible to get rid of the 2nd rolling > bounce? > > > > For the in-place upgrade, it seems like primary difference between the > 1st rolling bounce and the 2nd rolling bounce is to decide whether to send > Subscription Version 2 or Subscription Version 3. (Actually, there is > another difference mentioned in that the KIP says that the 2nd rolling > bounce should happen after all new state stores are created by the > background thread. However, within the 2nd rolling bounce, we say that > there is still a background thread, so it seems like is no actual > requirement to wait for the new state stores to be created.) > > > > The 2nd rolling bounce already knows how to deal with mixed-mode (having > both Version 2 and Version 3 in the same consumer group). It seems like we > could get rid of the 2nd bounce if we added logic (somehow/somewhere) such > that: > > * Instances send Subscription Version 2 until all instances are running > the new code. > > * Once all the instances are running the new code, then one at a time, > the instances start sending Subscription V3. Leader still hands out > Assignment Version 2, until all new state stores are ready. > > * Once all instances report that new stores are ready, Leader sends out > Assignment Version 3. > > * Once an instance receives an Assignment Version 3, it can delete the > old state store. > > > > Doing it that way seems like it would reduce a lot of > operator/deployment overhead. No need to do 2 rolling restarts. No need to > monitor logs for state store rebuild. You just deploy it, and the instances > update themselves. > > > > What do you think? > > > > The thing that made me think of this is that the "2 rolling bounces" is > similar to what Kafka brokers have to do changes in > inter.broker.protocol.version and log.message.format.version. And in the > broker case, it seems like it would be possible (with some work of course) > to modify kafka to allow us to do similar auto-detection of broker > capabilities and automatically do a switchover from old/new versions. > > > > -James > > > > > >> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote: > >> > >> Matthias, > >> > >> Thanks for the KIP, it's a +1 from me. > >> > >> I do have one question regarding the retrieval methods on the new > >> interfaces. > >> > >> Would want to consider adding one method with a Predicate that would > allow > >> for filtering records by the timestamp stored with the record? Or is > this > >> better left for users to implement themselves once the data has been > >> retrieved? > >> > >> Thanks, > >> Bill > >> > >> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >> > >>> Matthias: > >>> For my point #1, I don't have preference as to which separator is > chosen. > >>> Given the background you mentioned, current choice is good. > >>> > >>> For #2, I think my proposal is better since it is closer to English > >>> grammar. > >>> > >>> Would be good to listen to what other people think. > >>> > >>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io > > > >>> wrote: > >>> > >>>> Thanks for the comments! > >>>> > >>>> @Guozhang: > >>>> > >>>> So far, there is one PR for the rebalance metadata upgrade fix > >>>> (addressing the mentioned > >>>> https://issues.apache.org/jira/browse/KAFKA-6054) It give a first > >>>> impression how the metadata upgrade works including a system test: > >>>> https://github.com/apache/kafka/pull/4636 > >>>> > >>>> I can share other PRs as soon as they are ready. I agree that the KIP > is > >>>> complex am I ok with putting out more code to give better discussion > >>>> context. > >>>> > >>>> @Ted: > >>>> > >>>> I picked `_` instead of `-` to align with the `processing.guarantee` > >>>> parameter that accepts `at_least_one` and `exactly_once` as values. > >>>> Personally, I don't care about underscore vs dash but I prefer > >>>> consistency. If you feel strong about it, we can also change it to > `-`. > >>>> > >>>> About the interface name: I am fine either way -- I stripped the > `With` > >>>> to keep the name a little shorter. Would be good to get feedback from > >>>> others and pick the name the majority prefers. > >>>> > >>>> @John: > >>>> > >>>> We can certainly change it. I agree that it would not make a > difference. > >>>> I'll dig into the code to see if any of the two version might > introduce > >>>> undesired complexity and update the KIP if I don't hit an issue with > >>>> putting the `-v2` to the store directory instead of `rocksdb-v2` > >>>> > >>>> > >>>> -Matthias > >>>> > >>>> > >>>> On 3/8/18 2:44 PM, John Roesler wrote: > >>>>> Hey Matthias, > >>>>> > >>>>> The KIP looks good to me. I had several questions queued up, but they > >>>> were > >>>>> all in the "rejected alternatives" section... oh, well. > >>>>> > >>>>> One very minor thought re changing the state directory from > >>>> "/<state.dir>/< > >>>>> application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/< > >>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if you put the > "v2" > >>>>> marker on the storeName part of the path (i.e., "/<state.dir>/< > >>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the > >>> same > >>>>> benefits without altering the high-level directory structure. > >>>>> > >>>>> It may not matter, but I could imagine people running scripts to > >>> monitor > >>>>> rocksdb disk usage for each task, or other such use cases. > >>>>> > >>>>> Thanks, > >>>>> -John > >>>>> > >>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >>>>> > >>>>>> Matthias: > >>>>>> Nicely written KIP. > >>>>>> > >>>>>> "in_place" : can this be "in-place" ? Underscore may sometimes be > miss > >>>>>> typed (as '-'). I think using '-' is more friendly to user. > >>>>>> > >>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> { > >>>>>> > >>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for the class ? > >>>>>> > >>>>>> Thanks > >>>>>> > >>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com> > >>>> wrote: > >>>>>> > >>>>>>> Hello Matthias, thanks for the KIP. > >>>>>>> > >>>>>>> I've read through the upgrade patch section and it looks good to > me, > >>> if > >>>>>> you > >>>>>>> already have a WIP PR for it could you also share it here so that > >>>> people > >>>>>>> can take a look? > >>>>>>> > >>>>>>> I'm +1 on the KIP itself. But large KIPs like this there are always > >>>> some > >>>>>>> devil hidden in the details, so I think it is better to have the > >>>>>>> implementation in parallel along with the design discussion :) > >>>>>>> > >>>>>>> > >>>>>>> Guozhang > >>>>>>> > >>>>>>> > >>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax < > >>> matth...@confluent.io > >>>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi, > >>>>>>>> > >>>>>>>> I want to propose KIP-258 for the Streams API to allow storing > >>>>>>>> timestamps in RocksDB. This feature is the basis to resolve > multiple > >>>>>>>> tickets (issues and feature requests). > >>>>>>>> > >>>>>>>> Looking forward to your comments about this! > >>>>>>>> > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB > >>>>>>>> > >>>>>>>> > >>>>>>>> -Matthias > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> -- Guozhang > >>>>>>> > >>>>>> > >>>>> > >>>> > >>>> > >>> > > > >