Hello Matthias, thanks for your replies.
1) About the config names: actually I was trying to not expose implementation details :) My main concern was that in your proposal the values need to cover the span of all the versions that are actually using the same metadata version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am upgrading from any version within this range, I need to remember to use the value "0.10.1.x-1.1.x" rather than just specifying my old version. In my suggestion I was trying to argue for the benefit of letting users specify the actual Kafka version they are upgrading from, rather than a range of versions. I was not suggesting to use "v1, v2, v3" etc. as the values, but still to use Kafka versions, like the broker's `inter.broker.protocol.version` config. But if you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you meant that users can just specify "0.10.1" or "0.10.2" or "0.11.0" or "1.1", which are all recognizable config values, then I think we are actually on the same page.

2) About the "multi-assignment" idea: yes, it would increase the network footprint, but not the message size, IF I'm not misunderstanding your idea of registering multiple assignments. More details:

In the JoinGroupRequest, in the protocols field we can encode multiple protocols, each with its own metadata. The coordinator will pick the common one that everyone supports (if there is no common one, it will send an error back; if there are multiple ones, it will pick the one with the most votes, i.e. the one which was earlier in the encoded list). Since our current Streams rebalance protocol is still based on the consumer coordinator, our protocol_type would be "consumer", but under that protocol type we can have multiple protocols like "streams", "streams_v2", "streams_v3" etc. The downside is that we need to implement a different assignor class for each version and register all of them in the consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG.
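
A minimal Java sketch of the selection rule described above, to make it concrete. It is a simplified model, not the broker's code: the class and method names are hypothetical, it assumes each member votes for the first commonly supported protocol in its own list, and the tie-breaking rule is an arbitrary choice of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified model of the selection described above; NOT the coordinator's actual code.
final class ProtocolSelectionSketch {

    // Each inner list holds one member's protocol names, most preferred first.
    static Optional<String> select(List<List<String>> memberProtocols) {
        if (memberProtocols.isEmpty()) {
            return Optional.empty();
        }
        // Candidates: protocols that every member supports.
        Set<String> common = new LinkedHashSet<>(memberProtocols.get(0));
        for (List<String> protocols : memberProtocols) {
            common.retainAll(protocols);
        }
        if (common.isEmpty()) {
            return Optional.empty(); // the coordinator would send an error back
        }
        // Each member votes for the first common protocol in its own list.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> protocols : memberProtocols) {
            for (String protocol : protocols) {
                if (common.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // Most votes wins; on a tie the sketch keeps the protocol voted for first.
        String best = null;
        int bestVotes = -1;
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (entry.getValue() > bestVotes) {
                best = entry.getKey();
                bestVotes = entry.getValue();
            }
        }
        return Optional.of(best);
    }

    public static void main(String[] args) {
        List<String> upgraded = List.of("streams_v3", "streams_v2", "streams");
        List<String> old = List.of("streams_v2", "streams");
        // One upgraded member and two old ones: "streams_v3" is not common to all.
        System.out.println(select(List.of(upgraded, old, old))); // prints Optional[streams_v2]
    }
}
```

During a rolling bounce the upgraded members list the newest protocol first, so the group moves to it only once no member is left that lacks it.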
In the future, if we refactor our implementation to have our own client coordinator layer like Connect did, we can simplify this part of the implementation. But even for now, the above approach is still doable.

On the broker side, the group coordinator will only persist a group with the selected protocol and its subscription metadata, e.g. if the coordinator decides to pick "streams_v2", it will only send that protocol's metadata from everyone to the leader to assign, AND when completing the rebalance it will also only write the group metadata with that protocol and the assignment. In a word, although the network traffic may be increased a bit, it should not be a deal-breaker in our trade-off. One corner case we need to consider is how to stop registering very old assignors so that the network traffic does not keep increasing indefinitely, e.g. if you are doing a rolling bounce from v2 to v3, then you would not need to register the v1 assignor anymore, but that would unfortunately still require some configs.

3) About the "version probing" idea: I think that's a promising approach as well, but if we are going to do the multi-assignment, its value seems subsumed? But I'm thinking maybe it can be added on top of multi-assignment, to save us from still requiring the config to avoid registering the metadata for all versions. More details:

In the JoinGroupRequest, we still register all the assignors, but for all old assignors we do not encode any metadata, i.e. the encoded data would be:

"streams_vN" : "encoded metadata"
"streams_vN-1" : empty
"streams_vN-2" : empty
..
"streams_0" : empty

So the coordinator can still safely choose the latest common version; and then when the leader receives the subscriptions (note it should always recognize that version), let's say it is streams_vN-2, if one of the subscriptions is empty bytes, it will send an empty assignment with that version number encoded in the metadata.
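
The step just described could be sketched roughly as follows in Java. This is only an illustration under assumptions: the class and method names are hypothetical, the "streams_v" + number naming is a simplification of the names used above, and the actual Streams subscription encoding is not modeled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; names are hypothetical and this is not Streams code.
final class EmptyMetadataProbeSketch {

    // What one member registers: every protocol name is listed, but only the
    // version given by encodedVersion carries real subscription metadata.
    static Map<String, byte[]> protocols(int latestVersion, int encodedVersion, byte[] metadata) {
        Map<String, byte[]> result = new LinkedHashMap<>();
        for (int v = latestVersion; v >= 0; v--) {
            result.put("streams_v" + v, v == encodedVersion ? metadata : new byte[0]);
        }
        return result;
    }

    // Leader side: given the subscriptions for the protocol the coordinator
    // selected, report whether any of them is empty. If so, the leader cannot
    // compute a real assignment and would hand out empty assignments that
    // carry the selected version number instead.
    static boolean mustSendEmptyAssignment(List<byte[]> subscriptionsForSelectedProtocol) {
        for (byte[] subscription : subscriptionsForSelectedProtocol) {
            if (subscription.length == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        byte[] metadata = {1, 2, 3};
        // An upgraded member encodes metadata only for its latest version (3 here).
        Map<String, byte[]> upgraded = protocols(3, 3, metadata);
        // Suppose the coordinator picked "streams_v2" as the latest common version:
        // the upgraded member's bytes for that protocol are empty.
        byte[] fromUpgraded = upgraded.get("streams_v2");
        byte[] fromOldMember = {9};
        System.out.println(mustSendEmptyAssignment(List.of(fromUpgraded, fromOldMember))); // prints true
    }
}
```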
So in the second, auto-triggered rebalance all members would send the metadata with that version:

"streams_vN" : empty
"streams_vN-1" : empty
"streams_vN-2" : "encoded metadata"
..
"streams_0" : empty

By doing this we would not require any configs from users.

4) About the "in_place" upgrade on RocksDB: I'm not clear about the details, so probably we'd need to fill that out before making a call. For example, you mentioned "If we detect this situation, the Streams application closes corresponding active tasks as well as "hot standby" tasks, and re-creates the new active tasks using the new store." How could we guarantee that the gap between these two stores will keep decreasing rather than increasing, so that we'll eventually reach the flip point? And also, the longer it takes to reach the flip point, the longer we are doubling the storage space, etc.

Guozhang

On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> @John, Guozhang,
>
> thanks a lot for your comments. Very long reply...
>
> About upgrading the rebalance metadata:
>
> Another possibility to do this would be to register multiple assignment strategies for the 1.2 applications. In this case, new instances would be configured to support both, and the broker would pick the version that all instances understand. The disadvantage would be that we send much more data (ie, two subscriptions) in each rebalance as long as no second rebalance is done disabling the old protocol. Thus, this approach would allow us to avoid a second rebalance, trading it off against an increased rebalance network footprint (I also assume that this would increase the message size that is written into the __consumer_offsets topic?). Overall, I am not sure if this would be a good tradeoff, but it could avoid a second rebalance (I have some more thoughts about stores below that are relevant for a single-rebalance upgrade).
>
> For future upgrades we might be able to fix this though.
> I was thinking about the following:
>
> In the current implementation, the leader fails if it gets a subscription it does not understand (ie, newer version). We could change this behavior and let the leader send an empty assignment plus error code (including supported version) back to the instance sending the "bad" subscription. This would allow the following logic for an application instance:
>
> - on startup, always send the latest subscription format
> - if the leader understands it, we get an assignment back and start processing
> - if the leader does not understand it, we get an empty assignment and the supported version back
> - the application calls unsubscribe()/subscribe()/poll() again and sends a subscription using the leader's supported version
>
> This protocol would allow a single rolling bounce, and implements a "version probing" step that might result in two executed rebalances. The advantage would be that the user does not need to set any configs or do multiple rolling bounces, as Streams takes care of this automatically.
>
> One disadvantage would be that two rebalances happen, and that for an error case during rebalance, we lose the information about the supported leader version and the "probing step" would happen a second time.
>
> If the leader is eventually updated, it will include its own supported version in all assignments, to allow a "downgraded" application to upgrade its version later. Also, if an application fails, the first probing would always be successful and only a single rebalance happens. If we use this protocol, I think we don't need any configuration parameter for future upgrades.
>
> About "upgrade.from" vs "internal.protocol.version":
>
> Users would set "upgrade.from" to the release version the current/old application is using. I think this is simpler, as users know this version.
> If we use "internal.protocol.version" instead, we expose implementation details and users need to know the protocol version (ie, they need to map from the release version to the protocol version; ie, "I am running 0.11.0, which runs with metadata protocol version 2").
>
> Also the KIP states that for the second rolling bounce, the "upgrade.mode" config should be set back to `null` -- and thus, "upgrade.from" would not have any effect and is ignored (I will update the KIP to point out this dependency).
>
> About your second point: I'll update the KIP accordingly to describe future updates as well. Both will be different.
>
> One more point about upgrading the store format. I was thinking about avoiding the second rolling bounce altogether in the future: (1) the goal is to achieve an upgrade with zero downtime (2) this requires preparing the stores as "hot standbys" before we do the switch and delete the old stores. (3) the current proposal does the switch "globally" -- this is simpler and, due to the required second rebalance, no disadvantage. However, a globally consistent switch-over might actually not be required. For the "in_place" upgrade, following the protocol from above, we could decouple the store switch, and each instance could switch its store independently from all other instances. After the rolling bounce, it seems to be ok to switch from the old store to the new store "under the hood" whenever the new store is ready (this could even be done before we switch to the new metadata version). Each time we update the "hot standby" we check if it reached the "endOffset" (or maybe X%, which could either be hardcoded or configurable). If we detect this situation, the Streams application closes corresponding active tasks as well as "hot standby" tasks, and re-creates the new active tasks using the new store. (I need to go through the details once again, but it seems to be feasible.)
>
> Combining this strategy with the "multiple assignment" idea might even enable us to do a single rolling bounce upgrade from 1.1 -> 1.2. Applications would just use the old store as long as the new store is not ready, even if the new metadata version is used already.
>
> For future upgrades, a single rebalance would be sufficient, too, even if the stores are upgraded. We would not need any config parameters, as the "probe" step allows us to detect the supported rebalance metadata version (and we would also not need multiple "assignment strategies", as our own protocol encodes everything we need).
>
> Let me know what you think.
>
> -Matthias
>
> On 3/9/18 10:33 PM, Guozhang Wang wrote:
> > @John:
> >
> > For the protocol version upgrade, it is only for the encoded metadata bytes protocol, which are just bytes-in bytes-out from the Consumer's pov, so I think this change should be in the Streams layer as well.
> >
> > @Matthias:
> >
> > for 2), I agree that adding a "newest supported version" besides the "currently used version for encoding" is a good idea to allow either case; the key is that in Streams we would likely end up with a mapping from the protocol version to the other persistent data format versions such as RocksDB, changelog. So with such a map we can actually achieve both scenarios, i.e. 1) one rolling bounce if the upgraded protocol version's corresponding data format does not change, e.g. 0.10.0 -> 0.10.1: leaders can choose to use the newer version in the first rolling bounce directly and we can document to users that they would not need to set "upgrade.mode", and 2) two rolling bounces if the upgraded protocol version does indicate that the data format changes, e.g. 1.1 -> 1.2, and then we can document that "upgrade.mode" needs to be set in the first rolling bounce and reset in the second.
> >
> > Besides that, some additional comments:
> >
> > 1) I still think "upgrade.from" is less intuitive for users to set than "internal.protocol.version", where for the latter users only need to set a single version, while Streams will map that version to the Streams assignor's behavior as well as the data format. But maybe I did not get your idea about how the "upgrade.from" config will be set, because in your Compatibility section, how the upgrade.from config will be set for these two rolling bounces is not very clear: for example, should the user reset it to null in the second rolling bounce?
> >
> > 2) In the upgrade path description, rather than talking about specific version 0.10.0 -> version 0.10.1 etc, can we just categorize all the possible scenarios, even for future upgrade versions, and what the standard operations should be? The categories we can summarize would be (assuming the user upgrades from version X to version Y, where X and Y are Kafka versions, with the corresponding supported protocol versions x and y):
> >
> > a. x == y, i.e. the metadata protocol does not change, and hence no persistent data formats have changed.
> >
> > b. x != y, but all persistent data formats remain the same.
> >
> > c. x != y, AND some persistent data format, like the RocksDB format or changelog format, has been changed.
> >
> > d. special case: we may need some special handling logic when "current version" or "newest supported version" are not available in the protocol, i.e. for X as old as 0.10.0 and before 1.2.
> >
> > Under the above scenarios, how many rolling bounces do users need to execute? How should they set the configs in each rolling bounce? And how will the Streams library execute in these cases?
> >
> > Guozhang
> >
> > On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Ted,
> >>
> >> I still consider changing the KIP to include it right away -- if not, I'll create a JIRA. Need to think it through in more detail first.
> >>
> >> (Same for other open questions like interface names -- I collect feedback and update the KIP after we reach consensus :))
> >>
> >> -Matthias
> >>
> >> On 3/9/18 3:35 PM, Ted Yu wrote:
> >>> Thanks for the details, Matthias.
> >>>
> >>> bq. change the metadata protocol only in a future release, encoding both used
> >>> and supported version might be an advantage
> >>>
> >>> Looks like encoding both versions wouldn't be implemented in this KIP.
> >>>
> >>> Please consider logging a JIRA with the encoding proposal.
> >>>
> >>> Cheers
> >>>
> >>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>> @Bill: I think a filter predicate should be part of user code. And even if we want to add something like this, I would prefer to do it in a separate KIP.
> >>>>
> >>>> @James: I would love to avoid a second rolling bounce. But from my understanding it would not be possible.
> >>>>
> >>>> The purpose of the second rolling bounce is indeed to switch from version 2 to 3. It also has a second purpose, to switch from the old store to the new store (this happens after the last instance bounces a second time).
> >>>>
> >>>> The problem with one round of rolling bounces is that it's unclear when to switch from version 2 to version 3. The StreamsPartitionsAssignor is stateless by design, and thus, the information which version it should use must be passed in from externally -- and we want to use the StreamsConfig to pass in this information.
> >>>>
> >>>> During upgrade, all new instances have no information about the progress of the upgrade (ie, how many other instances got upgraded already).
> >>>> Therefore, it's not safe for them to send a version 3 subscription. The leader also has this limited view on the world and can only send version 2 assignments back.
> >>>>
> >>>> Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.
> >>>>
> >>>> We did consider changing the metadata to make later upgrades (ie, from 1.2 to 1.x) simpler though (for the case we change the metadata or storage format again -- as long as we don't change it, a single rolling bounce is sufficient), by encoding "used version" and "supported version". This would allow the leader to switch to the new version earlier and without a second rebalance: the leader would receive "used version == old" and "supported version = old/new" -- as long as at least one instance sends a "supported version = old", the leader sends an old version assignment back. However, encoding both versions would allow the leader to send a new version assignment back right after the first round of rebalance finished (all instances send "supported version = new"). However, there are still two issues with this:
> >>>>
> >>>> 1) if we switch to the new format right after the last instance bounced, the new stores might not be ready to be used -- this could lead to "downtime" as the store must be restored before processing can resume.
> >>>>
> >>>> 2) Assume an instance fails and is restarted again. At this point, the instance will still have "upgrade mode" enabled and thus sends the old protocol data. However, it would be desirable to never fall back to the old protocol after the switch to the new protocol.
> >>>>
> >>>> The second issue is minor and I guess if users set up the instance properly it could be avoided. However, the first issue would prevent "zero downtime" upgrades.
> >>>> Having said this, if we consider that we might change the metadata protocol only in a future release, encoding both used and supported version might be an advantage in the future, and we could consider adding this information in the 1.2 release to prepare for this.
> >>>>
> >>>> Btw: monitoring the log is also only required to give the instances enough time to prepare the stores in the new format. If you would do the second rolling bounce before this, it would still work -- however, you might see app "downtime" as the new store must be fully restored before processing can resume.
> >>>>
> >>>> Does this make sense?
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/9/18 11:36 AM, James Cheng wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?
> >>>>>
> >>>>> For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce and the 2nd rolling bounce is to decide whether to send Subscription Version 2 or Subscription Version 3. (Actually, there is another difference mentioned, in that the KIP says that the 2nd rolling bounce should happen after all new state stores are created by the background thread. However, within the 2nd rolling bounce, we say that there is still a background thread, so it seems like there is no actual requirement to wait for the new state stores to be created.)
> >>>>>
> >>>>> The 2nd rolling bounce already knows how to deal with mixed-mode (having both Version 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce if we added logic (somehow/somewhere) such that:
> >>>>> * Instances send Subscription Version 2 until all instances are running the new code.
> >>>>> * Once all the instances are running the new code, then one at a time, the instances start sending Subscription V3. Leader still hands out Assignment Version 2, until all new state stores are ready.
> >>>>> * Once all instances report that new stores are ready, Leader sends out Assignment Version 3.
> >>>>> * Once an instance receives an Assignment Version 3, it can delete the old state store.
> >>>>>
> >>>>> Doing it that way seems like it would reduce a lot of operator/deployment overhead. No need to do 2 rolling restarts. No need to monitor logs for state store rebuild. You just deploy it, and the instances update themselves.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> The thing that made me think of this is that the "2 rolling bounces" is similar to what Kafka brokers have to do for changes in inter.broker.protocol.version and log.message.format.version. And in the broker case, it seems like it would be possible (with some work of course) to modify Kafka to allow us to do similar auto-detection of broker capabilities and automatically do a switchover from old/new versions.
> >>>>>
> >>>>> -James
> >>>>>
> >>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> >>>>>>
> >>>>>> Matthias,
> >>>>>>
> >>>>>> Thanks for the KIP, it's a +1 from me.
> >>>>>>
> >>>>>> I do have one question regarding the retrieval methods on the new interfaces.
> >>>>>>
> >>>>>> Would you want to consider adding one method with a Predicate that would allow for filtering records by the timestamp stored with the record? Or is this better left for users to implement themselves once the data has been retrieved?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Bill
> >>>>>>
> >>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Matthias:
> >>>>>>> For my point #1, I don't have a preference as to which separator is chosen. Given the background you mentioned, the current choice is good.
> >>>>>>>
> >>>>>>> For #2, I think my proposal is better since it is closer to English grammar.
> >>>>>>>
> >>>>>>> Would be good to listen to what other people think.
> >>>>>>>
> >>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the comments!
> >>>>>>>>
> >>>>>>>> @Guozhang:
> >>>>>>>>
> >>>>>>>> So far, there is one PR for the rebalance metadata upgrade fix (addressing the mentioned https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first impression of how the metadata upgrade works, including a system test: https://github.com/apache/kafka/pull/4636
> >>>>>>>>
> >>>>>>>> I can share other PRs as soon as they are ready. I agree that the KIP is complex and I am ok with putting out more code to give better discussion context.
> >>>>>>>>
> >>>>>>>> @Ted:
> >>>>>>>>
> >>>>>>>> I picked `_` instead of `-` to align with the `processing.guarantee` parameter that accepts `at_least_once` and `exactly_once` as values. Personally, I don't care about underscore vs dash but I prefer consistency. If you feel strongly about it, we can also change it to `-`.
> >>>>>>>>
> >>>>>>>> About the interface name: I am fine either way -- I stripped the `With` to keep the name a little shorter. Would be good to get feedback from others and pick the name the majority prefers.
> >>>>>>>>
> >>>>>>>> @John:
> >>>>>>>>
> >>>>>>>> We can certainly change it. I agree that it would not make a difference.
> >>>>>>>> I'll dig into the code to see if either of the two versions might introduce undesired complexity and update the KIP if I don't hit an issue with putting the `-v2` on the store directory instead of `rocksdb-v2`.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
> >>>>>>>>> Hey Matthias,
> >>>>>>>>>
> >>>>>>>>> The KIP looks good to me. I had several questions queued up, but they were all in the "rejected alternatives" section... oh, well.
> >>>>>>>>>
> >>>>>>>>> One very minor thought re changing the state directory from "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2" marker on the storeName part of the path (i.e., "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same benefits without altering the high-level directory structure.
> >>>>>>>>>
> >>>>>>>>> It may not matter, but I could imagine people running scripts to monitor rocksdb disk usage for each task, or other such use cases.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Matthias:
> >>>>>>>>>> Nicely written KIP.
> >>>>>>>>>>
> >>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may sometimes be mistyped (as '-'). I think using '-' is more friendly to users.
> >>>>>>>>>>
> >>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
> >>>>>>>>>>
> >>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp a better name for the class?
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello Matthias, thanks for the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> I've read through the upgrade path section and it looks good to me. If you already have a WIP PR for it, could you also share it here so that people can take a look?
> >>>>>>>>>>>
> >>>>>>>>>>> I'm +1 on the KIP itself. But with large KIPs like this there are always some devils hidden in the details, so I think it is better to have the implementation in parallel along with the design discussion :)
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I want to propose KIP-258 for the Streams API to allow storing timestamps in RocksDB. This feature is the basis to resolve multiple tickets (issues and feature requests).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to your comments about this!
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> -- Guozhang

--
-- Guozhang