After some more thought, I want to follow John's suggestion and split the rebalance metadata upgrade from the store upgrade.
I extracted the metadata upgrade into its own KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
I'll update this KIP accordingly shortly.

I also want to consider making the store format upgrade more flexible/generic. At the moment, the KIP is tailored too much to the DSL IMHO and does not account for PAPI users, whom we should not force to upgrade their stores. I need to figure out the details and follow up later. Please give feedback on the new KIP-268 in the corresponding discussion thread.

@James: unfortunately, for upgrading to 1.2 I couldn't figure out a way to do a single rolling bounce upgrade. But KIP-268 proposes a fix for future upgrades. Please share your thoughts.

Thanks for all your feedback!

-Matthias

On 3/12/18 11:56 PM, Matthias J. Sax wrote: > @John: yes, we would throw if configs are missing (it's an > implementation details IMHO and thus I did not include it in the KIP) > > @Guozhang: > > 1) I understand know what you mean. We can certainly, allow all values > "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for `upgrade.from` > parameter. I had a similar though once but decided to collapse them into > one -- will update the KIP accordingly. > > 2) The idea to avoid any config would be, to always send both request. > If we add a config to eventually disable the old request, we don't gain > anything with this approach. The question is really, if we are willing > to pay this overhead from 1.2 on -- note, it would be limited to 2 > versions and not grow further in future releases. More details in (3) > > 3) Yes, this approach subsumes (2) for later releases and allows us to > stay with 2 "assignment strategies" we need to register, as the new > assignment strategy will allow to "upgrade itself" via "version > probing". Thus, (2) would only be a workaround to avoid a config if > people upgrade from pre-1.2 releases. > > Thus, I don't think we need to register new "assignment strategies" and > send empty subscriptions for older version. > > 4) I agree that this is a tricky thing to get right with a single > rebalance. I share the concern that an application might never catch up > and thus the hot standby will never be ready. > > Maybe it's better to go with 2 rebalances for store upgrades. If we do > this, we also don't need to go with (2) and can get (3) in place for > future upgrades. I also think that changes to the metadata are more > likely and thus allowing for single rolling bounce for this case is more > important anyway. If we assume that store upgrade a rare, it might be ok > to sacrifice two rolling bounced for this case. It was just an idea I > wanted to share (even if I see the issues). > > > -Matthias > > > > On 3/12/18 11:45 AM, Guozhang Wang wrote: >> Hello Matthias, thanks for your replies. >> >> >> 1) About the config names: actually I was trying to not expose >> implementation details :) My main concern was that in your proposal the >> values need to cover the span of all the versions that are actually using >> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am upgrading >> from any versions within this range I need to remember to use the value >> "0.10.1.x-1.1.x" than just specifying my old version. In my suggestion I >> was trying to argue the benefit of just letting users to specify the actual >> Kafka version she's trying to upgrade from, than specifying a range of >> versions. 
I was not suggesting to use "v1, v2, v3" etc as the values, but >> still using Kafka versions like broker's `internal.version` config. But if >> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you meant to >> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or "1.1" which >> are all recognizable config values then I think we are actually on the same >> page. >> >> 2) About the "multi-assignment" idea: yes it would increase the network >> footprint, but not the message size, IF I'm not mis-understanding your idea >> of registering multiple assignment. More details: >> >> In the JoinGroupRequest, in the protocols field we can encode multiple >> protocols each with their different metadata. The coordinator will pick the >> common one that everyone supports (if there are no common one, it will send >> an error back; if there are multiple ones, it will pick the one with most >> votes, i.e. the one which was earlier in the encoded list). Since our >> current Streams rebalance protocol is still based on the consumer >> coordinator, it means our protocol_type would be "consumer", but instead >> the protocol type we can have multiple protocols like "streams", >> "streams_v2", "streams_v3" etc. The downside is that we need to implement a >> different assignor class for each version and register all of them in >> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future if we >> re-factor our implementation to have our own client coordinator layer like >> Connect did, we can simplify this part of the implementation. But even for >> now with the above approach this is still doable. >> >> On the broker side, the group coordinator will only persist a group with >> the selected protocol and its subscription metadata, e.g. if coordinator >> decides to pick "streams_v2" it will only sends that protocol's metadata >> from everyone to the leader to assign, AND when completing the rebalance it >> will also only write the group metadata with that protocol and the >> assignment only. In a word, although the network traffic maybe increased a >> bit, it would not be a bummer in our trade-off. One corner situation we >> need to consider is how to stop registering very old assignors to avoid the >> network traffic from increasing indefinitely, e.g. if you are rolling >> bounce from v2 to v3, then you'd not need to register v1 assignor anymore, >> but that would unfortunately still require some configs. >> >> 3) About the "version probing" idea, I think that's a promising approach >> as well, but if we are going to do the multi-assignment its value seems >> subsumed? But I'm thinking maybe it can be added on top of multi-assignment >> to save us from still requiring the config to avoid registering all the >> metadata for all version. More details: >> >> In the JoinGroupRequest, we still register all the assignor but for all old >> assignors we do not encode any metadata, i.e. the encoded data would be: >> >> "streams_vN" : "encoded metadata" >> "streams_vN-1":empty >> "streams_vN-2":empty >> .. >> "streams_0":empty >> >> So the coordinator can still safely choose the latest common version; and >> then when leaders receive the subscription (note it should always recognize >> that version), let's say it is streams_vN-2, if one of the subscriptions >> are empty bytes, it will send the empty assignment with that version number >> encoded in the metadata. 
So in the second auto-triggered all members would >> send the metadata with that version: >> >> "streams_vN" : empty >> "streams_vN-1" : empty >> "streams_vN-2" : "encoded metadata" >> .. >> "streams_0":empty >> >> >> By doing this we would not require any configs for users. >> >> >> 4) About the "in_place" upgrade on rocksDB, I'm not clear about the details >> so probably we'd need to fill that out before making a call. For example, >> you mentioned "If we detect this situation, the Streams application closes >> corresponding active tasks as well as "hot standby" tasks, and re-creates >> the new active tasks using the new store." How could we guarantee that the >> gap between these two stores will keep decreasing than increasing so we'll >> eventually achieve the flip point? And also the longer we are before the >> flip point, the larger we are doubling the storage space, etc. >> >> >> >> Guozhang >> >> >> >> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <matth...@confluent.io> >> wrote: >> >>> @John, Guozhang, >>> >>> thanks a lot for your comments. Very long reply... >>> >>> >>> About upgrading the rebalance metadata: >>> >>> Another possibility to do this, would be to register multiple assignment >>> strategies for the 1.2 applications. For this case, new instances would >>> be configured to support both and the broker would pick the version that >>> all instances understand. The disadvantage would be, that we send much >>> more data (ie, two subscriptions) in each rebalance as long as no second >>> rebalance is done disabling the old protocol. Thus, using this approach >>> would allow to avoid a second rebalance trading-off an increased >>> rebalance network footprint (I also assume that this would increase the >>> message size that is written into __consumer_offsets topic?). Overall, I >>> am not sure if this would be a good tradeoff, but it could avoid a >>> second rebalance (I have some more thoughts about stores below that are >>> relevant for single rebalance upgrade). >>> >>> For future upgrades we might be able to fix this though. I was thinking >>> about the following: >>> >>> In the current implementation, the leader fails if it gets a >>> subscription it does not understand (ie, newer version). We could change >>> this behavior and let the leader send an empty assignment plus error >>> code (including supported version) back to the instance sending the >>> "bad" subscription. This would allow the following logic for an >>> application instance: >>> >>> - on startup, always send the latest subscription format >>> - if leader understands it, we get an assignment back an start processing >>> - if leader does not understand it, we get an empty assignment and >>> supported version back >>> - the application unsubscribe()/subscribe()/poll() again and sends a >>> subscription using the leader's supported version >>> >>> This protocol would allow to do a single rolling bounce, and implements >>> a "version probing" step, that might result in two executed rebalances. >>> The advantage would be, that the user does not need to set any configs >>> or do multiple rolling bounces, as Streams takes care of this >>> automatically. >>> >>> One disadvantage would be, that two rebalances happen and that for an >>> error case during rebalance, we loose the information about the >>> supported leader version and the "probing step" would happen a second time. 
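To make the "version probing" idea described above a bit more concrete, here is a minimal sketch of the client-side loop. Every class and method name below is a hypothetical placeholder for illustration only; this is not the actual StreamsPartitionAssignor code or API.

    // Illustrative sketch only -- all types and methods are hypothetical placeholders.
    final class VersionProbingSketch {

        static final int LATEST_SUBSCRIPTION_VERSION = 3;

        // Result of one rebalance round: either a real assignment, or an "empty"
        // assignment that only carries the leader's highest supported version.
        record ProbeResult(boolean empty, int leaderSupportedVersion) {}

        // Placeholder for: encode the subscription with the given version, then
        // unsubscribe()/subscribe()/poll() to re-join the group and fetch the result.
        static ProbeResult rejoinWithVersion(final int version) {
            return new ProbeResult(false, version);
        }

        static void run() {
            int version = LATEST_SUBSCRIPTION_VERSION;   // always start optimistic
            while (true) {
                final ProbeResult result = rejoinWithVersion(version);
                if (!result.empty()) {
                    break;   // leader understood our subscription; start processing
                }
                // Leader replied with an empty assignment plus its supported version:
                // downgrade the subscription and trigger one more rebalance.
                version = result.leaderSupportedVersion();
            }
        }
    }

The appeal is that no user config is needed: an instance always starts with the newest format and only downgrades if the leader tells it to, at the cost of a possible extra rebalance.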
>>> >>> If the leader is eventually updated, it will include it's own supported >>> version in all assignments, to allow a "down graded" application to >>> upgrade its version later. Also, if a application fails, the first >>> probing would always be successful and only a single rebalance happens. >>> If we use this protocol, I think we don't need any configuration >>> parameter for future upgrades. >>> >>> >>> About "upgrade.from" vs "internal.protocol.version": >>> >>> Users would set "upgrade.from" to the release version the current/old >>> application is using. I think this is simpler, as users know this >>> version. If we use "internal.protocol.version" instead, we expose >>> implementation details and users need to know the protocol version (ie, >>> they need to map from the release version to the protocol version; ie, >>> "I am run 0.11.0 that runs with metadata protocol version 2"). >>> >>> Also the KIP states that for the second rolling bounce, the >>> "upgrade.mode" config should be set back to `null` -- and thus, >>> "upgrade.from" would not have any effect and is ignored (I will update >>> the KIP to point out this dependency). >>> >>> >>> >>> About your second point: I'll update the KIP accordingly to describe >>> future updates as well. Both will be different. >>> >>> >>> >>> One more point about upgrading the store format. I was thinking about >>> avoiding the second rolling bounce all together in the future: (1) the >>> goal is to achieve an upgrade with zero downtime (2) this required to >>> prepare the stores as "hot standbys" before we do the switch and delete >>> the old stores. (3) the current proposal does the switch "globally" -- >>> this is simpler and due to the required second rebalance no disadvantage. >>> However, a global consistent switch over might actually not be required. >>> For "in_place" upgrade, following the protocol from above, we could >>> decouple the store switch and each instance could switch its store >>> independently from all other instances. After the rolling bounce, it >>> seems to be ok to switch from the old store to the new store "under the >>> hood" whenever the new store is ready (this could even be done, before >>> we switch to the new metadata version). Each time we update the "hot >>> standby" we check if it reached the "endOffset" (or maybe X% that could >>> either be hardcoded or configurable). If we detect this situation, the >>> Streams application closes corresponding active tasks as well as "hot >>> standby" tasks, and re-creates the new active tasks using the new store. >>> (I need to go through the details once again, but it seems to be >>> feasible.). >>> >>> Combining this strategy with the "multiple assignment" idea, might even >>> enable us to do an single rolling bounce upgrade from 1.1 -> 1.2. >>> Applications would just use the old store, as long as the new store is >>> not ready, even if the new metadata version is used already. >>> >>> For future upgrades, a single rebalance would be sufficient, too, even >>> if the stores are upgraded. We would not need any config parameters as >>> the "probe" step allows us to detect the supported rebalance metadata >>> version (and we would also not need multiple "assigmnent strategies" as >>> out own protocol encoded everything we need). >>> >>> >>> Let me know what you think. 
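For reference, a rough sketch of how the two rolling bounces of an "in_place" upgrade would be driven by configuration under the current KIP draft. "upgrade.mode" and "upgrade.from" are the proposed (not yet final) config names, and the values shown are just examples:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    // Sketch only; "upgrade.mode" / "upgrade.from" are proposed config names from the
    // KIP draft and may still change.
    public class UpgradeConfigSketch {

        // First rolling bounce: run the new 1.2 jars, but keep using the old rebalance
        // metadata and the old store format while the new stores are prepared.
        public static Properties firstBounceConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");   // example id
            props.put("upgrade.mode", "in_place");
            props.put("upgrade.from", "0.11.0");   // release the old instances are running
            return props;
        }

        // Second rolling bounce: the same config minus the two upgrade settings,
        // i.e. both are back at their default of null and "upgrade.from" is ignored.
        public static Properties secondBounceConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            return props;
        }
    }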
>>> >>> >>> -Matthias >>> >>> >>> >>> On 3/9/18 10:33 PM, Guozhang Wang wrote: >>>> @John: >>>> >>>> For the protocol version upgrade, it is only for the encoded metadata >>> bytes >>>> protocol, which are just bytes-in bytes-out from Consumer's pov, so I >>> think >>>> this change should be in the Streams layer as well. >>>> >>>> @Matthias: >>>> >>>> for 2), I agree that adding a "newest supported version" besides the >>>> "currently used version for encoding" is a good idea to allow either >>> case; >>>> the key is that in Streams we would likely end up with a mapping from the >>>> protocol version to the other persistent data format versions such as >>>> rocksDB, changelog. So with such a map we can actually achieve both >>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol version's >>>> corresponding data format does not change, e.g. 0.10.0 -> 0.10.1 leaders >>>> can choose to use the newer version in the first rolling bounce directly >>>> and we can document to users that they would not need to set >>>> "upgrade.mode", and 2) two rolling bounce if the upgraded protocol >>> version >>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and then we can >>>> document that "upgrade.mode" needs to be set in the first rolling bounce >>>> and reset in the second. >>>> >>>> >>>> Besides that, some additional comments: >>>> >>>> 1) I still think "upgrade.from" is less intuitive for users to set than >>>> "internal.protocol.version" where for the latter users only need to set a >>>> single version, while the Streams will map that version to the Streams >>>> assignor's behavior as well as the data format. But maybe I did not get >>>> your idea about how the "upgrade.from" config will be set, because in >>>> your Compatibility section how the upgrade.from config will be set for >>>> these two rolling bounces are not very clear: for example, should user >>>> reset it to null in the second rolling bounce? >>>> >>>> 2) In the upgrade path description, rather than talking about specific >>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize all the >>>> possible scenarios, even for future upgrade versions, what should be the >>>> standard operations? The categorized we can summarize to would be >>> (assuming >>>> user upgrade from version X to version Y, where X and Y are Kafka >>> versions, >>>> with the corresponding supported protocol version x and y): >>>> >>>> >>>> a. x == y, i.e. metadata protocol does not change, and hence no >>> persistent >>>> data formats have changed. >>>> >>>> b. x != y, but all persistent data format remains the same. >>>> >>>> b. x !=y, AND some persistene data format like RocksDB format, changelog >>>> format, has been changed. >>>> >>>> c. special case: we may need some special handling logic when "current >>>> version" or "newest supported version" are not available in the protocol, >>>> i.e. for X as old as 0.10.0 and before 1.2. >>>> >>>> >>>> under the above scenarios, how many rolling bounces users need to >>> execute? >>>> how they should set the configs in each rolling bounce? and how Streams >>>> library will execute in these cases? >>>> >>>> >>>> >>>> Guozhang >>>> >>>> >>>> >>>> >>>> >>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io> >>>> wrote: >>>> >>>>> Ted, >>>>> >>>>> I still consider changing the KIP to include it right away -- if not, >>>>> I'll create a JIRA. Need to think it through in more detail first. 
>>>>> >>>>> (Same for other open questions like interface names -- I collect >>>>> feedback and update the KIP after we reach consensus :)) >>>>> >>>>> -Matthias >>>>> >>>>> On 3/9/18 3:35 PM, Ted Yu wrote: >>>>>> Thanks for the details, Matthias. >>>>>> >>>>>> bq. change the metadata protocol only if a future release, encoding >>> both >>>>> used >>>>>> and supported version might be an advantage >>>>>> >>>>>> Looks like encoding both versions wouldn't be implemented in this KIP. >>>>>> >>>>>> Please consider logging a JIRA with the encoding proposal. >>>>>> >>>>>> Cheers >>>>>> >>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io >>>> >>>>>> wrote: >>>>>> >>>>>>> @Bill: I think a filter predicate should be part of user code. And >>> even >>>>>>> if we want to add something like this, I would prefer to do it in a >>>>>>> separate KIP. >>>>>>> >>>>>>> >>>>>>> @James: I would love to avoid a second rolling bounce. But from my >>>>>>> understanding it would not be possible. >>>>>>> >>>>>>> The purpose of the second rolling bounce is indeed to switch from >>>>>>> version 2 to 3. It also has a second purpose, to switch from the old >>>>>>> store to the new store (this happens after the last instance bounces a >>>>>>> second time). >>>>>>> >>>>>>> The problem with one round of rolling bounces is, that it's unclear >>> when >>>>>>> to which from version 2 to version 3. The StreamsPartitionsAssignor is >>>>>>> stateless by design, and thus, the information which version it should >>>>>>> use must be passed in from externally -- and we want to use the >>>>>>> StreamsConfig to pass in this information. >>>>>>> >>>>>>> During upgrade, all new instanced have no information about the >>> progress >>>>>>> of the upgrade (ie, how many other instanced got upgrades already). >>>>>>> Therefore, it's not safe for them to send a version 3 subscription. >>> The >>>>>>> leader also has this limited view on the world and can only send >>> version >>>>>>> 2 assignments back. >>>>>>> >>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade. >>>>>>> >>>>>>> We did consider to change the metadata to make later upgrades (ie, >>> from >>>>>>> 1.2 to 1.x) simpler though (for the case we change the metadata or >>>>>>> storage format again -- as long as we don't change it, a single >>> rolling >>>>>>> bounce is sufficient), by encoding "used version" and "supported >>>>>>> version". This would allow the leader to switch to the new version >>>>>>> earlier and without a second rebalance: leader would receive "used >>>>>>> version == old" and "supported version = old/new" -- as long as at >>> least >>>>>>> one instance sends a "supported version = old" leader sends old >>> version >>>>>>> assignment back. However, encoding both version would allow that the >>>>>>> leader can send a new version assignment back, right after the first >>>>>>> round or rebalance finished (all instances send "supported version = >>>>>>> new"). However, there are still two issues with this: >>>>>>> >>>>>>> 1) if we switch to the new format right after the last instance >>> bounced, >>>>>>> the new stores might not be ready to be used -- this could lead to >>>>>>> "downtime" as store must be restored before processing can resume. >>>>>>> >>>>>>> 2) Assume an instance fails and is restarted again. At this point, the >>>>>>> instance will still have "upgrade mode" enabled and thus sends the old >>>>>>> protocol data. 
However, it would be desirable to never fall back to >>> the >>>>>>> old protocol after the switch to the new protocol. >>>>>>> >>>>>>> The second issue is minor and I guess if users set-up the instance >>>>>>> properly it could be avoided. However, the first issue would prevent >>>>>>> "zero downtime" upgrades. Having said this, if we consider that we >>> might >>>>>>> change the metadata protocol only if a future release, encoding both >>>>>>> used and supported version might be an advantage in the future and we >>>>>>> could consider to add this information in 1.2 release to prepare for >>>>> this. >>>>>>> >>>>>>> Btw: monitoring the log, is also only required to give the instances >>>>>>> enough time to prepare the stores in new format. If you would do the >>>>>>> second rolling bounce before this, it would still work -- however, you >>>>>>> might see app "downtime" as the new store must be fully restored >>> before >>>>>>> processing can resume. >>>>>>> >>>>>>> >>>>>>> Does this make sense? >>>>>>> >>>>>>> >>>>>>> -Matthias >>>>>>> >>>>>>> >>>>>>> >>>>>>> On 3/9/18 11:36 AM, James Cheng wrote: >>>>>>>> Matthias, >>>>>>>> >>>>>>>> For all the upgrade paths, is it possible to get rid of the 2nd >>> rolling >>>>>>> bounce? >>>>>>>> >>>>>>>> For the in-place upgrade, it seems like primary difference between >>> the >>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide whether to >>>>> send >>>>>>> Subscription Version 2 or Subscription Version 3. (Actually, there is >>>>>>> another difference mentioned in that the KIP says that the 2nd rolling >>>>>>> bounce should happen after all new state stores are created by the >>>>>>> background thread. However, within the 2nd rolling bounce, we say that >>>>>>> there is still a background thread, so it seems like is no actual >>>>>>> requirement to wait for the new state stores to be created.) >>>>>>>> >>>>>>>> The 2nd rolling bounce already knows how to deal with mixed-mode >>>>> (having >>>>>>> both Version 2 and Version 3 in the same consumer group). It seems >>> like >>>>> we >>>>>>> could get rid of the 2nd bounce if we added logic (somehow/somewhere) >>>>> such >>>>>>> that: >>>>>>>> * Instances send Subscription Version 2 until all instances are >>> running >>>>>>> the new code. >>>>>>>> * Once all the instances are running the new code, then one at a >>> time, >>>>>>> the instances start sending Subscription V3. Leader still hands out >>>>>>> Assignment Version 2, until all new state stores are ready. >>>>>>>> * Once all instances report that new stores are ready, Leader sends >>> out >>>>>>> Assignment Version 3. >>>>>>>> * Once an instance receives an Assignment Version 3, it can delete >>> the >>>>>>> old state store. >>>>>>>> >>>>>>>> Doing it that way seems like it would reduce a lot of >>>>>>> operator/deployment overhead. No need to do 2 rolling restarts. No >>> need >>>>> to >>>>>>> monitor logs for state store rebuild. You just deploy it, and the >>>>> instances >>>>>>> update themselves. >>>>>>>> >>>>>>>> What do you think? >>>>>>>> >>>>>>>> The thing that made me think of this is that the "2 rolling bounces" >>> is >>>>>>> similar to what Kafka brokers have to do changes in >>>>>>> inter.broker.protocol.version and log.message.format.version. And in >>> the >>>>>>> broker case, it seems like it would be possible (with some work of >>>>> course) >>>>>>> to modify kafka to allow us to do similar auto-detection of broker >>>>>>> capabilities and automatically do a switchover from old/new versions. 
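As an illustration of the leader-side half of this single-bounce idea, the decision rule could look roughly like the sketch below; the MemberInfo type and all other names are hypothetical, this is not existing Streams code:

    import java.util.Collection;

    // Hypothetical sketch of the leader's decision rule in the single-bounce idea above.
    final class SingleBounceLeaderSketch {

        // What the leader would need to learn from each member's subscription.
        interface MemberInfo {
            int latestSupportedVersion();   // highest subscription version the member can send
            boolean newStoresReady();       // has the background thread finished the new stores?
        }

        static int chooseAssignmentVersion(final Collection<MemberInfo> members) {
            final boolean everyoneOnNewCode =
                members.stream().allMatch(m -> m.latestSupportedVersion() >= 3);
            final boolean allNewStoresReady =
                members.stream().allMatch(MemberInfo::newStoresReady);
            // Keep handing out version 2 assignments until both conditions hold,
            // then switch the whole group to version 3 in one go.
            return (everyoneOnNewCode && allNewStoresReady) ? 3 : 2;
        }
    }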
>>>>>>>> >>>>>>>> -James >>>>>>>> >>>>>>>> >>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote: >>>>>>>>> >>>>>>>>> Matthias, >>>>>>>>> >>>>>>>>> Thanks for the KIP, it's a +1 from me. >>>>>>>>> >>>>>>>>> I do have one question regarding the retrieval methods on the new >>>>>>>>> interfaces. >>>>>>>>> >>>>>>>>> Would want to consider adding one method with a Predicate that would >>>>>>> allow >>>>>>>>> for filtering records by the timestamp stored with the record? Or >>> is >>>>>>> this >>>>>>>>> better left for users to implement themselves once the data has been >>>>>>>>> retrieved? >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Bill >>>>>>>>> >>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Matthias: >>>>>>>>>> For my point #1, I don't have preference as to which separator is >>>>>>> chosen. >>>>>>>>>> Given the background you mentioned, current choice is good. >>>>>>>>>> >>>>>>>>>> For #2, I think my proposal is better since it is closer to English >>>>>>>>>> grammar. >>>>>>>>>> >>>>>>>>>> Would be good to listen to what other people think. >>>>>>>>>> >>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax < >>>>> matth...@confluent.io >>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks for the comments! >>>>>>>>>>> >>>>>>>>>>> @Guozhang: >>>>>>>>>>> >>>>>>>>>>> So far, there is one PR for the rebalance metadata upgrade fix >>>>>>>>>>> (addressing the mentioned >>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It give a first >>>>>>>>>>> impression how the metadata upgrade works including a system test: >>>>>>>>>>> https://github.com/apache/kafka/pull/4636 >>>>>>>>>>> >>>>>>>>>>> I can share other PRs as soon as they are ready. I agree that the >>>>> KIP >>>>>>> is >>>>>>>>>>> complex am I ok with putting out more code to give better >>> discussion >>>>>>>>>>> context. >>>>>>>>>>> >>>>>>>>>>> @Ted: >>>>>>>>>>> >>>>>>>>>>> I picked `_` instead of `-` to align with the >>> `processing.guarantee` >>>>>>>>>>> parameter that accepts `at_least_one` and `exactly_once` as >>> values. >>>>>>>>>>> Personally, I don't care about underscore vs dash but I prefer >>>>>>>>>>> consistency. If you feel strong about it, we can also change it to >>>>>>> `-`. >>>>>>>>>>> >>>>>>>>>>> About the interface name: I am fine either way -- I stripped the >>>>>>> `With` >>>>>>>>>>> to keep the name a little shorter. Would be good to get feedback >>>>> from >>>>>>>>>>> others and pick the name the majority prefers. >>>>>>>>>>> >>>>>>>>>>> @John: >>>>>>>>>>> >>>>>>>>>>> We can certainly change it. I agree that it would not make a >>>>>>> difference. >>>>>>>>>>> I'll dig into the code to see if any of the two version might >>>>>>> introduce >>>>>>>>>>> undesired complexity and update the KIP if I don't hit an issue >>> with >>>>>>>>>>> putting the `-v2` to the store directory instead of `rocksdb-v2` >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -Matthias >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote: >>>>>>>>>>>> Hey Matthias, >>>>>>>>>>>> >>>>>>>>>>>> The KIP looks good to me. I had several questions queued up, but >>>>> they >>>>>>>>>>> were >>>>>>>>>>>> all in the "rejected alternatives" section... oh, well. 
>>>>>>>>>>>> >>>>>>>>>>>> One very minor thought re changing the state directory from >>>>>>>>>>> "/<state.dir>/< >>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/< >>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if you put the >>>>>>> "v2" >>>>>>>>>>>> marker on the storeName part of the path (i.e., "/<state.dir>/< >>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), then you get >>>>> the >>>>>>>>>> same >>>>>>>>>>>> benefits without altering the high-level directory structure. >>>>>>>>>>>> >>>>>>>>>>>> It may not matter, but I could imagine people running scripts to >>>>>>>>>> monitor >>>>>>>>>>>> rocksdb disk usage for each task, or other such use cases. >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> -John >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> >>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Matthias: >>>>>>>>>>>>> Nicely written KIP. >>>>>>>>>>>>> >>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may sometimes >>> be >>>>>>> miss >>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to user. >>>>>>>>>>>>> >>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> { >>>>>>>>>>>>> >>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for the >>> class ? >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang < >>> wangg...@gmail.com >>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hello Matthias, thanks for the KIP. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I've read through the upgrade patch section and it looks good >>> to >>>>>>> me, >>>>>>>>>> if >>>>>>>>>>>>> you >>>>>>>>>>>>>> already have a WIP PR for it could you also share it here so >>> that >>>>>>>>>>> people >>>>>>>>>>>>>> can take a look? >>>>>>>>>>>>>> >>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this there are >>>>> always >>>>>>>>>>> some >>>>>>>>>>>>>> devil hidden in the details, so I think it is better to have >>> the >>>>>>>>>>>>>> implementation in parallel along with the design discussion :) >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax < >>>>>>>>>> matth...@confluent.io >>>>>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to allow storing >>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to resolve >>>>>>> multiple >>>>>>>>>>>>>>> tickets (issues and feature requests). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Looking forward to your comments about this! >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>>>>>>>>>> -- Guozhang >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>> >>>> >>> >>> >> >> >
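On the interface naming question discussed above, the read-only store interface would have roughly the following shape; the get() method and the ValueAndTimestamp helper shown here are illustrative assumptions, not settled API:

    // Rough sketch of the interface shape under discussion; names and methods are not final.
    public interface ReadOnlyKeyValueTimestampStore<K, V> {

        // The current value for the key together with the record timestamp stored
        // alongside it, or null if the key is not present.
        ValueAndTimestamp<V> get(K key);
    }

    // Assumed helper type pairing a value with its timestamp (not part of the current API).
    interface ValueAndTimestamp<V> {
        V value();
        long timestamp();
    }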