Thanks for the details, Matthias.

bq. change the metadata protocol only in a future release, encoding both used and supported version might be an advantage
Looks like encoding both versions wouldn't be implemented in this KIP. Please consider logging a JIRA with the encoding proposal.

Cheers

On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> @Bill: I think a filter predicate should be part of user code. And even if we want to add something like this, I would prefer to do it in a separate KIP.
>
> @James: I would love to avoid a second rolling bounce, but from my understanding it would not be possible.
>
> The purpose of the second rolling bounce is indeed to switch from version 2 to 3. It also has a second purpose: to switch from the old store to the new store (this happens after the last instance bounces a second time).
>
> The problem with one round of rolling bounces is that it's unclear when to switch from version 2 to version 3. The StreamsPartitionAssignor is stateless by design, and thus the information about which version it should use must be passed in externally -- and we want to use the StreamsConfig to pass in this information.
>
> During the upgrade, the new instances have no information about the progress of the upgrade (ie, how many other instances got upgraded already). Therefore, it's not safe for them to send a version 3 subscription. The leader also has this limited view of the world and can only send version 2 assignments back.
>
> Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.
>
> We did consider changing the metadata to make later upgrades (ie, from 1.2 to 1.x) simpler, though, by encoding "used version" and "supported version" (for the case that we change the metadata or storage format again -- as long as we don't change it, a single rolling bounce is sufficient). This would allow the leader to switch to the new version earlier and without a second rebalance: the leader would receive "used version = old" and "supported version = old/new" -- as long as at least one instance sends "supported version = old", the leader sends an old version assignment back. However, encoding both versions would allow the leader to send a new version assignment back right after the first round of rebalancing finishes (ie, once all instances send "supported version = new"). There are still two issues with this:
>
> 1) If we switch to the new format right after the last instance bounced, the new stores might not be ready to be used -- this could lead to "downtime", as the store must be restored before processing can resume.
>
> 2) Assume an instance fails and is restarted. At this point, the instance will still have "upgrade mode" enabled and thus sends the old protocol data. However, it would be desirable to never fall back to the old protocol after the switch to the new protocol.
>
> The second issue is minor, and I guess it could be avoided if users set up the instance properly. However, the first issue would prevent "zero downtime" upgrades. Having said this, if we consider that we might change the metadata protocol only in a future release, encoding both used and supported version might be an advantage in the future, and we could consider adding this information in the 1.2 release to prepare for this.
>
> Btw: monitoring the log is also only required to give the instances enough time to prepare the stores in the new format. If you do the second rolling bounce before this, it would still work -- however, you might see app "downtime", as the new store must be fully restored before processing can resume.
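>
> To make the "used version" / "supported version" idea a bit more concrete, the leader-side decision could look roughly like this (just a sketch with made-up names, not code from the PR):
>
>     import java.util.Collections;
>     import java.util.List;
>
>     // Sketch only: assumes each subscription would additionally encode the
>     // highest metadata version the sending instance understands.
>     final class VersionNegotiationSketch {
>
>         static int assignmentVersion(List<Integer> supportedVersions, int oldVersion, int newVersion) {
>             // As long as at least one instance cannot decode the new format,
>             // the leader must send an old version assignment back.
>             int minSupported = Collections.min(supportedVersions);
>             return minSupported >= newVersion ? newVersion : oldVersion;
>         }
>
>         public static void main(String[] args) {
>             // During the rolling bounce, one instance still only supports version 2:
>             System.out.println(assignmentVersion(List.of(3, 2, 3), 2, 3)); // -> 2
>             // After all instances run the new code:
>             System.out.println(assignmentVersion(List.of(3, 3, 3), 2, 3)); // -> 3
>         }
>     }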
>
> Does this make sense?
>
> -Matthias
>
> On 3/9/18 11:36 AM, James Cheng wrote:
> > Matthias,
> >
> > For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?
> >
> > For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce and the 2nd rolling bounce is deciding whether to send Subscription Version 2 or Subscription Version 3. (Actually, there is another difference, in that the KIP says the 2nd rolling bounce should happen after all new state stores are created by the background thread. However, within the 2nd rolling bounce there is still a background thread, so it seems like there is no actual requirement to wait for the new state stores to be created.)
> >
> > The 2nd rolling bounce already knows how to deal with mixed mode (having both Version 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce if we added logic (somehow/somewhere) such that:
> > * Instances send Subscription Version 2 until all instances are running the new code.
> > * Once all the instances are running the new code, then one at a time, the instances start sending Subscription V3. The leader still hands out Assignment Version 2 until all new state stores are ready.
> > * Once all instances report that the new stores are ready, the leader sends out Assignment Version 3.
> > * Once an instance receives an Assignment Version 3, it can delete the old state store.
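> >
> > For example, the leader-side part could be something like this (a hypothetical sketch, assuming the subscription were extended to report whether an instance runs the new code and whether its new stores are ready):
> >
> >     import java.util.List;
> >
> >     // Hypothetical per-instance info the subscription would need to carry.
> >     record MemberStatus(boolean runsNewCode, boolean newStoresReady) {}
> >
> >     final class SingleBounceSketch {
> >         // The leader derives the assignment version from what the group
> >         // reports, instead of relying on a 2nd rolling bounce.
> >         static int assignmentVersion(List<MemberStatus> group) {
> >             boolean allNewCode = group.stream().allMatch(MemberStatus::runsNewCode);
> >             boolean allStoresReady = group.stream().allMatch(MemberStatus::newStoresReady);
> >             return (allNewCode && allStoresReady) ? 3 : 2;
> >         }
> >     }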
> >
> > Doing it that way seems like it would reduce a lot of operator/deployment overhead. No need to do 2 rolling restarts. No need to monitor logs for the state store rebuild. You just deploy it, and the instances update themselves.
> >
> > What do you think?
> >
> > The thing that made me think of this is that the "2 rolling bounces" approach is similar to what Kafka brokers have to do for changes to inter.broker.protocol.version and log.message.format.version. And in the broker case, it seems like it would be possible (with some work, of course) to modify Kafka to allow us to do similar auto-detection of broker capabilities and automatically switch over from old to new versions.
> >
> > -James
> >
> >> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> >>
> >> Matthias,
> >>
> >> Thanks for the KIP, it's a +1 from me.
> >>
> >> I do have one question regarding the retrieval methods on the new interfaces.
> >>
> >> Would you want to consider adding one method with a Predicate that would allow for filtering records by the timestamp stored with the record? Or is this better left for users to implement themselves once the data has been retrieved?
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>
> >>> Matthias:
> >>> For my point #1, I don't have a preference as to which separator is chosen. Given the background you mentioned, the current choice is good.
> >>>
> >>> For #2, I think my proposal is better since it is closer to English grammar.
> >>>
> >>> Would be good to listen to what other people think.
> >>>
> >>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>> Thanks for the comments!
> >>>>
> >>>> @Guozhang:
> >>>>
> >>>> So far, there is one PR for the rebalance metadata upgrade fix (addressing the mentioned https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first impression of how the metadata upgrade works, including a system test: https://github.com/apache/kafka/pull/4636
> >>>>
> >>>> I can share other PRs as soon as they are ready. I agree that the KIP is complex, and I am ok with putting out more code to give better discussion context.
> >>>>
> >>>> @Ted:
> >>>>
> >>>> I picked `_` instead of `-` to align with the `processing.guarantee` parameter that accepts `at_least_once` and `exactly_once` as values. Personally, I don't care about underscore vs dash, but I prefer consistency. If you feel strongly about it, we can also change it to `-`.
> >>>>
> >>>> About the interface name: I am fine either way -- I stripped the `With` to keep the name a little shorter. Would be good to get feedback from others and pick the name the majority prefers.
> >>>>
> >>>> @John:
> >>>>
> >>>> We can certainly change it. I agree that it would not make a difference. I'll dig into the code to see if either of the two versions might introduce undesired complexity, and update the KIP if I don't hit an issue with putting the `-v2` on the store name directory instead of `rocksdb-v2`.
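> >>>>
> >>>> Btw, to make the naming discussion a bit more concrete: the read-only interface in question is roughly the following (a simplified sketch; the exact method set is still up for discussion, and `ValueAndTimestamp` is just an assumed helper type here):
> >>>>
> >>>>     // Read-only view on a key-value store that also exposes the
> >>>>     // timestamp stored next to each value.
> >>>>     public interface ReadOnlyKeyValueTimestampStore<K, V> {
> >>>>         ValueAndTimestamp<V> get(K key);
> >>>>     }
> >>>>
> >>>>     // In its own file: a simple pair of value and record timestamp.
> >>>>     public interface ValueAndTimestamp<V> {
> >>>>         V value();
> >>>>         long timestamp();
> >>>>     }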
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/8/18 2:44 PM, John Roesler wrote:
> >>>>> Hey Matthias,
> >>>>>
> >>>>> The KIP looks good to me. I had several questions queued up, but they were all in the "rejected alternatives" section... oh, well.
> >>>>>
> >>>>> One very minor thought re: changing the state directory from "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2" marker on the storeName part of the path (i.e., "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same benefits without altering the high-level directory structure.
> >>>>>
> >>>>> It may not matter, but I could imagine people running scripts to monitor RocksDB disk usage for each task, or other such use cases.
> >>>>>
> >>>>> Thanks,
> >>>>> -John
> >>>>>
> >>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>>>
> >>>>>> Matthias:
> >>>>>> Nicely written KIP.
> >>>>>>
> >>>>>> "in_place": can this be "in-place"? Underscore may sometimes be mistyped (as '-'). I think using '-' is more friendly to users.
> >>>>>>
> >>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
> >>>>>>
> >>>>>> Is ReadOnlyKeyValueStoreWithTimestamp a better name for the class?
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hello Matthias, thanks for the KIP.
> >>>>>>>
> >>>>>>> I've read through the upgrade path section and it looks good to me. If you already have a WIP PR for it, could you also share it here so that people can take a look?
> >>>>>>>
> >>>>>>> I'm +1 on the KIP itself. But with large KIPs like this, there are always some devils hidden in the details, so I think it is better to have the implementation in parallel along with the design discussion :)
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I want to propose KIP-258 for the Streams API to allow storing timestamps in RocksDB. This feature is the basis to resolve multiple tickets (issues and feature requests).
> >>>>>>>>
> >>>>>>>> Looking forward to your comments about this!
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang