Hello Matthias, I am curious as to the status of this KIP. TTL and expiry of records will be extremely useful for several of our business use cases, as well as for another KIP I have been working on.
Thanks On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com> wrote: > Hi Matthias, > > Good stuff. Could you comment a bit on how future-proof is this change? For > example, if we want to store both event timestamp "and" processing time in > RocksDB will we then need another interface (e.g. called > KeyValueWithTwoTimestampsStore)? > > Thanks > Eno > > On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io> > wrote: > > > Thanks for your input Guozhang and John. > > > > I see your point, that the upgrade API is not simple. If you don't > > thinks it's valuable to make generic store upgrades possible (atm), we > > can make the API internal, too. The impact is, that we only support a > > predefined set up upgrades (ie, KV to KVwithTs, Windowed to > > WindowedWithTS etc) for which we implement the internal interfaces. > > > > We can keep the design generic, so if we decide to make it public, we > > don't need to re-invent it. This will also have the advantage, that we > > can add upgrade pattern for other stores later, too. > > > > I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it > > was the only way I could find to design a generic upgrade interface. If > > we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` would > > become an internal interface I guess (don't think we can remove it). > > > > I will wait for more feedback about this and if nobody wants to keep it > > as public API I will update the KIP accordingly. Will add some more > > clarifications for different upgrade patterns in the mean time and fix > > the typos/minor issues. > > > > About adding a new state UPGRADING: maybe we could do that. However, I > > find it particularly difficult to make the estimation when we should > > switch to RUNNING, thus, I am a little hesitant. Using store callbacks > > or just logging the progress including some indication about the "lag" > > might actually be sufficient. Not sure what others think? > > > > About "value before timestamp": no real reason and I think it does not > > make any difference. Do you want to change it? > > > > About upgrade robustness: yes, we cannot control if an instance fails. > > That is what I meant by "we need to write test". The upgrade should be > > able to continuous even is an instance goes down (and we must make sure > > that we don't end up in an invalid state that forces us to wipe out the > > whole store). Thus, we need to write system tests that fail instances > > during upgrade. > > > > For `in_place_offline` upgrade: I don't think we need this mode, because > > people can do this via a single rolling bounce. > > > > - prepare code and switch KV-Store to KVwithTs-Store > > - do a single rolling bounce (don't set any upgrade config) > > > > For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we > > remove the `StoreUpgradeBuilder`) will detect that there is only an old > > local KV store w/o TS, will start to restore the new KVwithTs store, > > wipe out the old store and replace with the new store after restore is > > finished, and start processing only afterwards. (I guess we need to > > document this case -- will also add it to the KIP.) > > > > > > > > -Matthias > > > > > > > > On 8/9/18 1:10 PM, John Roesler wrote: > > > Hi Matthias, > > > > > > I think this KIP is looking really good. > > > > > > I have a few thoughts to add to the others: > > > > > > 1. You mentioned at one point users needing to configure > > > `upgrade.mode="null"`. 
I think this was a typo and you meant to say > they > > > should remove the config. If they really have to set it to a string > > "null" > > > or even set it to a null value but not remove it, it would be > > unfortunate. > > > > > > 2. In response to Bill's comment #1 , you said that "The idea is that > the > > > upgrade should be robust and not fail. We need to write according > tests". > > > I may have misunderstood the conversation, but I don't think it's > within > > > our power to say that an instance won't fail. What if one of my > computers > > > catches on fire? What if I'm deployed in the cloud and one instance > > > disappears and is replaced by a new one? Or what if one instance goes > > AWOL > > > for a long time and then suddenly returns? How will the upgrade process > > > behave in light of such failures? > > > > > > 3. your thought about making in-place an offline mode is interesting, > but > > > it might be a bummer for on-prem users who wish to upgrade online, but > > > cannot just add new machines to the pool. It could be a new upgrade > mode > > > "offline-in-place", though... > > > > > > 4. I was surprised to see that a user would need to modify the topology > > to > > > do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's > > > suggestions would remove this necessity. > > > > > > Thanks for taking on this very complex but necessary work. > > > > > > -John > > > > > > On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > >> Hello Matthias, > > >> > > >> Thanks for the updated KIP. Some more comments: > > >> > > >> 1. The current set of proposed API is a bit too complicated, which > makes > > >> the upgrade flow from user's perspective also a bit complex. I'd like > to > > >> check different APIs and discuss about their needs separately: > > >> > > >> 1.a. StoreProxy: needed for in-place upgrade only, between the > first > > >> and second rolling bounce, where the old-versioned stores can handle > > >> new-versioned store APIs. I think such upgrade paths (i.e. from one > > store > > >> type to another) would not be very common: users may want to upgrade > > from a > > >> certain store engine to another, but the interface would likely be > > staying > > >> the same. Hence personally I'd suggest we keep it internally and only > > >> consider exposing it in the future if it does become a common pattern. > > >> > > >> 1.b. ConverterStore / RecordConverter: needed for both in-place > and > > >> roll-over upgrade, between the first and second rolling bounces, for > the > > >> new versioned store to be able to read old-versioned changelog topics. > > >> Firstly I think we should not expose key in the public APIs but only > the > > >> values, since allowing key format changes would break log compaction, > > and > > >> hence would not be compatible anyways. As for value format changes, > > >> personally I think we can also keep its upgrade logic internally as it > > may > > >> not worth generalizing to user customizable logic. > > >> > > >> 1.c. If you agrees with 2.a/b above, then we can also remove " > > >> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public > > APIs. > > >> > > >> 1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is > not > > >> needed either given that we are exposing "ValueAndTimestamp" anyways. > > I.e. 
> > >> it is just a syntax sugar and for IQ, users can always just set a " > > >> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as the > > new > > >> interface does not provide any additional functions. > > >> > > >> > > >> 2. Could we further categorize the upgrade flow for different use > cases, > > >> e.g. 1) DSL users where KeyValueWithTimestampStore will be used > > >> automatically for non-windowed aggregate; 2) PAPI users who do not > need > > to > > >> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to > > >> KeyValueWithTimestampStore. Just to give my understanding for 3), the > > >> upgrade flow for users may be simplified as the following (for both > > >> in-place and roll-over): > > >> > > >> * Update the jar to new version, make code changes from > > KeyValueStore > > >> to KeyValueWithTimestampStore, set upgrade config. > > >> > > >> * First rolling bounce, and library code can internally use proxy > / > > >> converter based on the specified config to handle new APIs with old > > stores, > > >> while let new stores read from old changelog data. > > >> > > >> * Reset upgrade config. > > >> > > >> * Second rolling bounce, and the library code automatically turn > off > > >> logic for proxy / converter. > > >> > > >> > > >> 3. Some more detailed proposals are needed for when to recommend users > > to > > >> trigger the second rolling bounce. I have one idea to share here: we > > add a > > >> new state to KafkaStreams, say UPGRADING, which is set when 1) upgrade > > >> config is set, and 2) the new stores are still ramping up (for the > > second > > >> part, we can start with some internal hard-coded heuristics to decide > > when > > >> it is close to be ramped up). If either one of it is not true any > more, > > it > > >> should transit to RUNNING. Users can then watch on this state, and > > decide > > >> to only trigger the second rebalance when the state has transited from > > >> UPGRADING. They can also choose to cut over while the instance is > still > > >> UPGRADING, the downside is that after that the application may have > long > > >> restoration phase which is, to user's pov, unavailability periods. > > >> > > >> > > >> Below are just some minor things on the wiki: > > >> > > >> 4. "proxy story" => "proxy store". > > >> > > >> 5. "use the a builder " => "use a builder" > > >> > > >> 6: "we add the record timestamp as a 8-byte (long) prefix to the > value": > > >> what's the rationale of putting the timestamp before the value, than > > after > > >> the value? > > >> > > >> > > >> > > >> Guozhang > > >> > > >> > > >> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax < > matth...@confluent.io> > > >> wrote: > > >> > > >>> Thanks for the feedback Bill. I just update the KIP with some of your > > >>> points. > > >>> > > >>>>> Regarding step 3C of the in-place upgrade (users needing to watch > the > > >>>>> restore process), I'm wondering if we want to provide a type of > > >>>>> StateRestoreListener that could signal when the new stores have > > >> reached > > >>>>> parity with the existing old stores and that could be the signal to > > >>> start > > >>>>> second rolling rebalance? > > >>> > > >>> I think we can reuse the existing listeners, thus, I did not include > > >>> anything in the KIP. About a signal to rebalance: this might be > tricky. > > >>> If we prepare the store "online", the active task will update the > state > > >>> continuously, and thus, state prepare is never finished. 
It will be > the > > >>> users responsibility to do the second rebalance (note, that the > second > > >>> rebalance will first finish the last delta of the upgrade to finish > the > > >>> upgrade before actual processing resumes). I clarified the KIP with > > this > > >>> regard a little bit. > > >>> > > >>>>> 1. Out of N instances, one fails midway through the process, would > we > > >>> allow > > >>>>> the other instances to complete or just fail the entire upgrade? > > >>> > > >>> The idea is that the upgrade should be robust and not fail. We need > to > > >>> write according tests. > > >>> > > >>>>> 2. During the second rolling bounce, maybe we could rename the > > current > > >>>>> active directories vs. deleting them right away, and when all the > > >>> prepare > > >>>>> task directories are successfully migrated then delete the previous > > >>> active > > >>>>> ones. > > >>> > > >>> Ack. Updated the KIP. > > >>> > > >>>>> 3. For the first rolling bounce we pause any processing any new > > >> records > > >>> and > > >>>>> just allow the prepare tasks to restore, then once all prepare > tasks > > >>> have > > >>>>> restored, it's a signal for the second round of rolling bounces and > > >>> then as > > >>>>> each task successfully renames its prepare directories and deletes > > the > > >>> old > > >>>>> active task directories, normal processing of records resumes. > > >>> > > >>> The basic idea is to do an online upgrade to avoid downtime. We can > > >>> discuss to offer both options... For the offline upgrade option, we > > >>> could simplify user interaction and trigger the second rebalance > > >>> automatically with the requirement that a user needs to update any > > >> config. > > >>> > > >>> If might actually be worth to include this option: we know from > > >>> experience with state restore, that regular processing slows down the > > >>> restore. For roll_over upgrade, it would be a different story and > > >>> upgrade should not be slowed down by regular processing. Thus, we > > should > > >>> even make in_place an offline upgrade and force people to use > roll_over > > >>> if they need onlint upgrade. Might be a fair tradeoff that may > simplify > > >>> the upgrade for the user and for the code complexity. > > >>> > > >>> Let's see what other think. > > >>> > > >>> > > >>> -Matthias > > >>> > > >>> > > >>> On 7/27/18 12:53 PM, Bill Bejeck wrote: > > >>>> Hi Matthias, > > >>>> > > >>>> Thanks for the update and the working prototype, it helps with > > >>>> understanding the KIP. > > >>>> > > >>>> I took an initial pass over this PR, and overall I find the > interfaces > > >>> and > > >>>> approach to be reasonable. > > >>>> > > >>>> Regarding step 3C of the in-place upgrade (users needing to watch > the > > >>>> restore process), I'm wondering if we want to provide a type of > > >>>> StateRestoreListener that could signal when the new stores have > > reached > > >>>> parity with the existing old stores and that could be the signal to > > >> start > > >>>> second rolling rebalance? > > >>>> > > >>>> Although you solicited feedback on the interfaces involved, I wanted > > to > > >>> put > > >>>> down some thoughts that have come to mind reviewing this KIP again > > >>>> > > >>>> 1. Out of N instances, one fails midway through the process, would > we > > >>> allow > > >>>> the other instances to complete or just fail the entire upgrade? > > >>>> 2. During the second rolling bounce, maybe we could rename the > current > > >>>> active directories vs. 
deleting them right away, and when all the > > >>> prepare > > >>>> task directories are successfully migrated then delete the previous > > >>> active > > >>>> ones. > > >>>> 3. For the first rolling bounce we pause any processing any new > > records > > >>> and > > >>>> just allow the prepare tasks to restore, then once all prepare tasks > > >> have > > >>>> restored, it's a signal for the second round of rolling bounces and > > >> then > > >>> as > > >>>> each task successfully renames its prepare directories and deletes > the > > >>> old > > >>>> active task directories, normal processing of records resumes. > > >>>> > > >>>> Thanks, > > >>>> Bill > > >>>> > > >>>> > > >>>> > > >>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax < > > matth...@confluent.io > > >>> > > >>>> wrote: > > >>>> > > >>>>> Hi, > > >>>>> > > >>>>> KIP-268 (rebalance meatadata) is finished and included in AK 2.0 > > >>>>> release. Thus, I want to pick up this KIP again to get the RocksDB > > >>>>> upgrade done for 2.1. > > >>>>> > > >>>>> I updated the KIP accordingly and also have a "prove of concept" PR > > >>>>> ready (for "in place" upgrade only): > > >>>>> https://github.com/apache/kafka/pull/5422/ > > >>>>> > > >>>>> There a still open questions, but I want to collect early feedback > on > > >>>>> the proposed interfaces we need for the store upgrade. Also note, > > that > > >>>>> the KIP now also aim to define a generic upgrade path from any > store > > >>>>> format A to any other store format B. Adding timestamps is just a > > >>>>> special case. > > >>>>> > > >>>>> I will continue to work on the PR and refine the KIP in the > meantime, > > >>> too. > > >>>>> > > >>>>> Looking forward to your feedback. > > >>>>> > > >>>>> -Matthias > > >>>>> > > >>>>> > > >>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote: > > >>>>>> After some more thoughts, I want to follow John's suggestion and > > >> split > > >>>>>> upgrading the rebalance metadata from the store upgrade. > > >>>>>> > > >>>>>> I extracted the metadata upgrade into it's own KIP: > > >>>>>> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268% > > >>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade > > >>>>>> > > >>>>>> I'll update this KIP accordingly shortly. I also want to consider > to > > >>>>>> make the store format upgrade more flexible/generic. Atm, the KIP > is > > >>> too > > >>>>>> much tailored to the DSL IMHO and does not encounter PAPI users > that > > >> we > > >>>>>> should not force to upgrade the stores. I need to figure out the > > >>> details > > >>>>>> and follow up later. > > >>>>>> > > >>>>>> Please give feedback for the new KIP-268 on the corresponding > > >>> discussion > > >>>>>> thread. > > >>>>>> > > >>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure out > a > > >> way > > >>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a fix > for > > >>>>>> future upgrades. Please share your thoughts. > > >>>>>> > > >>>>>> Thanks for all your feedback! > > >>>>>> > > >>>>>> -Matthias > > >>>>>> > > >>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote: > > >>>>>>> @John: yes, we would throw if configs are missing (it's an > > >>>>>>> implementation details IMHO and thus I did not include it in the > > >> KIP) > > >>>>>>> > > >>>>>>> @Guozhang: > > >>>>>>> > > >>>>>>> 1) I understand know what you mean. We can certainly, allow all > > >> values > > >>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for > `upgrade.from` > > >>>>>>> parameter. 
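To make the 8-byte timestamp prefix mentioned above concrete, here is a minimal sketch of that byte layout in Java. Only the layout itself (timestamp first, serialized value bytes after it) comes from the KIP discussion; the class and method names are invented for illustration.

    import java.nio.ByteBuffer;

    // Sketch of the discussed value layout: the record timestamp stored as an
    // 8-byte long in front of the serialized value bytes. Illustration only;
    // the actual on-disk format is whatever the KIP finally specifies.
    final class TimestampPrefixedValue {

        static byte[] encode(final long timestamp, final byte[] serializedValue) {
            return ByteBuffer.allocate(8 + serializedValue.length)
                             .putLong(timestamp)
                             .put(serializedValue)
                             .array();
        }

        static long timestamp(final byte[] rawStoreValue) {
            return ByteBuffer.wrap(rawStoreValue).getLong();
        }

        static byte[] value(final byte[] rawStoreValue) {
            final byte[] value = new byte[rawStoreValue.length - 8];
            System.arraycopy(rawStoreValue, 8, value, 0, value.length);
            return value;
        }
    }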
I had a similar though once but decided to collapse > them > > >>> into > > >>>>>>> one -- will update the KIP accordingly. > > >>>>>>> > > >>>>>>> 2) The idea to avoid any config would be, to always send both > > >> request. > > >>>>>>> If we add a config to eventually disable the old request, we > don't > > >>> gain > > >>>>>>> anything with this approach. The question is really, if we are > > >> willing > > >>>>>>> to pay this overhead from 1.2 on -- note, it would be limited to > 2 > > >>>>>>> versions and not grow further in future releases. More details in > > >> (3) > > >>>>>>> > > >>>>>>> 3) Yes, this approach subsumes (2) for later releases and allows > us > > >> to > > >>>>>>> stay with 2 "assignment strategies" we need to register, as the > new > > >>>>>>> assignment strategy will allow to "upgrade itself" via "version > > >>>>>>> probing". Thus, (2) would only be a workaround to avoid a config > if > > >>>>>>> people upgrade from pre-1.2 releases. > > >>>>>>> > > >>>>>>> Thus, I don't think we need to register new "assignment > strategies" > > >>> and > > >>>>>>> send empty subscriptions for older version. > > >>>>>>> > > >>>>>>> 4) I agree that this is a tricky thing to get right with a single > > >>>>>>> rebalance. I share the concern that an application might never > > catch > > >>> up > > >>>>>>> and thus the hot standby will never be ready. > > >>>>>>> > > >>>>>>> Maybe it's better to go with 2 rebalances for store upgrades. If > we > > >> do > > >>>>>>> this, we also don't need to go with (2) and can get (3) in place > > for > > >>>>>>> future upgrades. I also think that changes to the metadata are > more > > >>>>>>> likely and thus allowing for single rolling bounce for this case > is > > >>> more > > >>>>>>> important anyway. If we assume that store upgrade a rare, it > might > > >> be > > >>> ok > > >>>>>>> to sacrifice two rolling bounced for this case. It was just an > idea > > >> I > > >>>>>>> wanted to share (even if I see the issues). > > >>>>>>> > > >>>>>>> > > >>>>>>> -Matthias > > >>>>>>> > > >>>>>>> > > >>>>>>> > > >>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote: > > >>>>>>>> Hello Matthias, thanks for your replies. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> 1) About the config names: actually I was trying to not expose > > >>>>>>>> implementation details :) My main concern was that in your > > proposal > > >>> the > > >>>>>>>> values need to cover the span of all the versions that are > > actually > > >>>>> using > > >>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am > > >>>>> upgrading > > >>>>>>>> from any versions within this range I need to remember to use > the > > >>> value > > >>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In my > > >>> suggestion > > >>>>> I > > >>>>>>>> was trying to argue the benefit of just letting users to specify > > >> the > > >>>>> actual > > >>>>>>>> Kafka version she's trying to upgrade from, than specifying a > > range > > >>> of > > >>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as the > > >> values, > > >>>>> but > > >>>>>>>> still using Kafka versions like broker's `internal.version` > > config. > > >>>>> But if > > >>>>>>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you > > >>> meant > > >>>>> to > > >>>>>>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or > > >> "1.1" > > >>>>> which > > >>>>>>>> are all recognizable config values then I think we are actually > on > > >>> the > > >>>>> same > > >>>>>>>> page. 
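As a rough sketch of how the two rolling bounces discussed above could look from the application side: the `upgrade.from` and `upgrade.mode` keys and the example values below follow the KIP draft under discussion and may change before release.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final class UpgradeConfigSketch {

        // First rolling bounce: run the new jar but keep the old metadata/store
        // format active while the new stores are prepared in the background.
        static Properties firstBounce() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            props.put("upgrade.from", "1.1");       // release the app is upgrading from
            props.put("upgrade.mode", "in_place");  // or "roll_over"
            return props;
        }

        // Second rolling bounce: the upgrade configs are simply removed (not set
        // to the string "null"), which switches to the new protocol and stores.
        static Properties secondBounce() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            return props;
        }
    }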
> > >>>>>>>> > > >>>>>>>> 2) About the "multi-assignment" idea: yes it would increase the > > >>> network > > >>>>>>>> footprint, but not the message size, IF I'm not > mis-understanding > > >>> your > > >>>>> idea > > >>>>>>>> of registering multiple assignment. More details: > > >>>>>>>> > > >>>>>>>> In the JoinGroupRequest, in the protocols field we can encode > > >>> multiple > > >>>>>>>> protocols each with their different metadata. The coordinator > will > > >>>>> pick the > > >>>>>>>> common one that everyone supports (if there are no common one, > it > > >>> will > > >>>>> send > > >>>>>>>> an error back; if there are multiple ones, it will pick the one > > >> with > > >>>>> most > > >>>>>>>> votes, i.e. the one which was earlier in the encoded list). > Since > > >> our > > >>>>>>>> current Streams rebalance protocol is still based on the > consumer > > >>>>>>>> coordinator, it means our protocol_type would be "consumer", but > > >>>>> instead > > >>>>>>>> the protocol type we can have multiple protocols like "streams", > > >>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we need to > > >>>>> implement a > > >>>>>>>> different assignor class for each version and register all of > them > > >> in > > >>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future > if > > >> we > > >>>>>>>> re-factor our implementation to have our own client coordinator > > >> layer > > >>>>> like > > >>>>>>>> Connect did, we can simplify this part of the implementation. > But > > >>> even > > >>>>> for > > >>>>>>>> now with the above approach this is still doable. > > >>>>>>>> > > >>>>>>>> On the broker side, the group coordinator will only persist a > > group > > >>>>> with > > >>>>>>>> the selected protocol and its subscription metadata, e.g. if > > >>>>> coordinator > > >>>>>>>> decides to pick "streams_v2" it will only sends that protocol's > > >>>>> metadata > > >>>>>>>> from everyone to the leader to assign, AND when completing the > > >>>>> rebalance it > > >>>>>>>> will also only write the group metadata with that protocol and > the > > >>>>>>>> assignment only. In a word, although the network traffic maybe > > >>>>> increased a > > >>>>>>>> bit, it would not be a bummer in our trade-off. One corner > > >> situation > > >>> we > > >>>>>>>> need to consider is how to stop registering very old assignors > to > > >>>>> avoid the > > >>>>>>>> network traffic from increasing indefinitely, e.g. if you are > > >> rolling > > >>>>>>>> bounce from v2 to v3, then you'd not need to register v1 > assignor > > >>>>> anymore, > > >>>>>>>> but that would unfortunately still require some configs. > > >>>>>>>> > > >>>>>>>> 3) About the "version probing" idea, I think that's a promising > > >>>>> approach > > >>>>>>>> as well, but if we are going to do the multi-assignment its > value > > >>> seems > > >>>>>>>> subsumed? But I'm thinking maybe it can be added on top of > > >>>>> multi-assignment > > >>>>>>>> to save us from still requiring the config to avoid registering > > all > > >>> the > > >>>>>>>> metadata for all version. More details: > > >>>>>>>> > > >>>>>>>> In the JoinGroupRequest, we still register all the assignor but > > for > > >>>>> all old > > >>>>>>>> assignors we do not encode any metadata, i.e. the encoded data > > >> would > > >>>>> be: > > >>>>>>>> > > >>>>>>>> "streams_vN" : "encoded metadata" > > >>>>>>>> "streams_vN-1":empty > > >>>>>>>> "streams_vN-2":empty > > >>>>>>>> .. 
> > >>>>>>>> "streams_0":empty > > >>>>>>>> > > >>>>>>>> So the coordinator can still safely choose the latest common > > >> version; > > >>>>> and > > >>>>>>>> then when leaders receive the subscription (note it should > always > > >>>>> recognize > > >>>>>>>> that version), let's say it is streams_vN-2, if one of the > > >>>>> subscriptions > > >>>>>>>> are empty bytes, it will send the empty assignment with that > > >> version > > >>>>> number > > >>>>>>>> encoded in the metadata. So in the second auto-triggered all > > >> members > > >>>>> would > > >>>>>>>> send the metadata with that version: > > >>>>>>>> > > >>>>>>>> "streams_vN" : empty > > >>>>>>>> "streams_vN-1" : empty > > >>>>>>>> "streams_vN-2" : "encoded metadata" > > >>>>>>>> .. > > >>>>>>>> "streams_0":empty > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> By doing this we would not require any configs for users. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about > > the > > >>>>> details > > >>>>>>>> so probably we'd need to fill that out before making a call. For > > >>>>> example, > > >>>>>>>> you mentioned "If we detect this situation, the Streams > > application > > >>>>> closes > > >>>>>>>> corresponding active tasks as well as "hot standby" tasks, and > > >>>>> re-creates > > >>>>>>>> the new active tasks using the new store." How could we > guarantee > > >>> that > > >>>>> the > > >>>>>>>> gap between these two stores will keep decreasing than > increasing > > >> so > > >>>>> we'll > > >>>>>>>> eventually achieve the flip point? And also the longer we are > > >> before > > >>>>> the > > >>>>>>>> flip point, the larger we are doubling the storage space, etc. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Guozhang > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax < > > >>>>> matth...@confluent.io> > > >>>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> @John, Guozhang, > > >>>>>>>>> > > >>>>>>>>> thanks a lot for your comments. Very long reply... > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> About upgrading the rebalance metadata: > > >>>>>>>>> > > >>>>>>>>> Another possibility to do this, would be to register multiple > > >>>>> assignment > > >>>>>>>>> strategies for the 1.2 applications. For this case, new > instances > > >>>>> would > > >>>>>>>>> be configured to support both and the broker would pick the > > >> version > > >>>>> that > > >>>>>>>>> all instances understand. The disadvantage would be, that we > send > > >>> much > > >>>>>>>>> more data (ie, two subscriptions) in each rebalance as long as > no > > >>>>> second > > >>>>>>>>> rebalance is done disabling the old protocol. Thus, using this > > >>>>> approach > > >>>>>>>>> would allow to avoid a second rebalance trading-off an > increased > > >>>>>>>>> rebalance network footprint (I also assume that this would > > >> increase > > >>>>> the > > >>>>>>>>> message size that is written into __consumer_offsets topic?). > > >>>>> Overall, I > > >>>>>>>>> am not sure if this would be a good tradeoff, but it could > avoid > > a > > >>>>>>>>> second rebalance (I have some more thoughts about stores below > > >> that > > >>>>> are > > >>>>>>>>> relevant for single rebalance upgrade). > > >>>>>>>>> > > >>>>>>>>> For future upgrades we might be able to fix this though. 
I was > > >>>>> thinking > > >>>>>>>>> about the following: > > >>>>>>>>> > > >>>>>>>>> In the current implementation, the leader fails if it gets a > > >>>>>>>>> subscription it does not understand (ie, newer version). We > could > > >>>>> change > > >>>>>>>>> this behavior and let the leader send an empty assignment plus > > >> error > > >>>>>>>>> code (including supported version) back to the instance sending > > >> the > > >>>>>>>>> "bad" subscription. This would allow the following logic for an > > >>>>>>>>> application instance: > > >>>>>>>>> > > >>>>>>>>> - on startup, always send the latest subscription format > > >>>>>>>>> - if leader understands it, we get an assignment back an start > > >>>>> processing > > >>>>>>>>> - if leader does not understand it, we get an empty assignment > > >> and > > >>>>>>>>> supported version back > > >>>>>>>>> - the application unsubscribe()/subscribe()/poll() again and > > >>> sends a > > >>>>>>>>> subscription using the leader's supported version > > >>>>>>>>> > > >>>>>>>>> This protocol would allow to do a single rolling bounce, and > > >>>>> implements > > >>>>>>>>> a "version probing" step, that might result in two executed > > >>>>> rebalances. > > >>>>>>>>> The advantage would be, that the user does not need to set any > > >>> configs > > >>>>>>>>> or do multiple rolling bounces, as Streams takes care of this > > >>>>>>>>> automatically. > > >>>>>>>>> > > >>>>>>>>> One disadvantage would be, that two rebalances happen and that > > for > > >>> an > > >>>>>>>>> error case during rebalance, we loose the information about the > > >>>>>>>>> supported leader version and the "probing step" would happen a > > >>> second > > >>>>> time. > > >>>>>>>>> > > >>>>>>>>> If the leader is eventually updated, it will include it's own > > >>>>> supported > > >>>>>>>>> version in all assignments, to allow a "down graded" > application > > >> to > > >>>>>>>>> upgrade its version later. Also, if a application fails, the > > first > > >>>>>>>>> probing would always be successful and only a single rebalance > > >>>>> happens. > > >>>>>>>>> If we use this protocol, I think we don't need any > configuration > > >>>>>>>>> parameter for future upgrades. > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> About "upgrade.from" vs "internal.protocol.version": > > >>>>>>>>> > > >>>>>>>>> Users would set "upgrade.from" to the release version the > > >>> current/old > > >>>>>>>>> application is using. I think this is simpler, as users know > this > > >>>>>>>>> version. If we use "internal.protocol.version" instead, we > expose > > >>>>>>>>> implementation details and users need to know the protocol > > version > > >>>>> (ie, > > >>>>>>>>> they need to map from the release version to the protocol > > version; > > >>> ie, > > >>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version 2"). > > >>>>>>>>> > > >>>>>>>>> Also the KIP states that for the second rolling bounce, the > > >>>>>>>>> "upgrade.mode" config should be set back to `null` -- and thus, > > >>>>>>>>> "upgrade.from" would not have any effect and is ignored (I will > > >>> update > > >>>>>>>>> the KIP to point out this dependency). > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> About your second point: I'll update the KIP accordingly to > > >> describe > > >>>>>>>>> future updates as well. Both will be different. > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> One more point about upgrading the store format. 
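A minimal sketch of the "register multiple assignment strategies" idea described above, using the consumer's existing partition.assignment.strategy config. The versioned assignor class names are hypothetical stand-ins; Streams today registers a single internal assignor, so this only illustrates the mechanism the group coordinator already supports (it picks a protocol every member lists, preferring the first entry).

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    final class MultiAssignorSketch {

        static Properties consumerConfig() {
            final Properties props = new Properties();
            // List every supported protocol; the coordinator picks one that
            // all members have in common, preferring the first entry.
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                      "com.example.StreamsAssignorV3,"   // hypothetical new assignor, preferred
                    + "com.example.StreamsAssignorV2");  // hypothetical fallback for old members
            return props;
        }
    }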
I was thinking > > >>> about > > >>>>>>>>> avoiding the second rolling bounce all together in the future: > > (1) > > >>> the > > >>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this > > required > > >>> to > > >>>>>>>>> prepare the stores as "hot standbys" before we do the switch > and > > >>>>> delete > > >>>>>>>>> the old stores. (3) the current proposal does the switch > > >> "globally" > > >>> -- > > >>>>>>>>> this is simpler and due to the required second rebalance no > > >>>>> disadvantage. > > >>>>>>>>> However, a global consistent switch over might actually not be > > >>>>> required. > > >>>>>>>>> For "in_place" upgrade, following the protocol from above, we > > >> could > > >>>>>>>>> decouple the store switch and each instance could switch its > > store > > >>>>>>>>> independently from all other instances. After the rolling > bounce, > > >> it > > >>>>>>>>> seems to be ok to switch from the old store to the new store > > >> "under > > >>>>> the > > >>>>>>>>> hood" whenever the new store is ready (this could even be done, > > >>> before > > >>>>>>>>> we switch to the new metadata version). Each time we update the > > >> "hot > > >>>>>>>>> standby" we check if it reached the "endOffset" (or maybe X% > > that > > >>>>> could > > >>>>>>>>> either be hardcoded or configurable). If we detect this > > situation, > > >>> the > > >>>>>>>>> Streams application closes corresponding active tasks as well > as > > >>> "hot > > >>>>>>>>> standby" tasks, and re-creates the new active tasks using the > new > > >>>>> store. > > >>>>>>>>> (I need to go through the details once again, but it seems to > be > > >>>>>>>>> feasible.). > > >>>>>>>>> > > >>>>>>>>> Combining this strategy with the "multiple assignment" idea, > > might > > >>>>> even > > >>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1 -> > 1.2. > > >>>>>>>>> Applications would just use the old store, as long as the new > > >> store > > >>> is > > >>>>>>>>> not ready, even if the new metadata version is used already. > > >>>>>>>>> > > >>>>>>>>> For future upgrades, a single rebalance would be sufficient, > too, > > >>> even > > >>>>>>>>> if the stores are upgraded. We would not need any config > > >> parameters > > >>> as > > >>>>>>>>> the "probe" step allows us to detect the supported rebalance > > >>> metadata > > >>>>>>>>> version (and we would also not need multiple "assigmnent > > >> strategies" > > >>>>> as > > >>>>>>>>> out own protocol encoded everything we need). > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> Let me know what you think. > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> -Matthias > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote: > > >>>>>>>>>> @John: > > >>>>>>>>>> > > >>>>>>>>>> For the protocol version upgrade, it is only for the encoded > > >>> metadata > > >>>>>>>>> bytes > > >>>>>>>>>> protocol, which are just bytes-in bytes-out from Consumer's > pov, > > >>> so I > > >>>>>>>>> think > > >>>>>>>>>> this change should be in the Streams layer as well. > > >>>>>>>>>> > > >>>>>>>>>> @Matthias: > > >>>>>>>>>> > > >>>>>>>>>> for 2), I agree that adding a "newest supported version" > besides > > >>> the > > >>>>>>>>>> "currently used version for encoding" is a good idea to allow > > >>> either > > >>>>>>>>> case; > > >>>>>>>>>> the key is that in Streams we would likely end up with a > mapping > > >>>>> from the > > >>>>>>>>>> protocol version to the other persistent data format versions > > >> such > > >>> as > > >>>>>>>>>> rocksDB, changelog. 
So with such a map we can actually achieve > > >> both > > >>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol > > >>>>> version's > > >>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 -> > 0.10.1 > > >>>>> leaders > > >>>>>>>>>> can choose to use the newer version in the first rolling > bounce > > >>>>> directly > > >>>>>>>>>> and we can document to users that they would not need to set > > >>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded > > >> protocol > > >>>>>>>>> version > > >>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and > then > > >> we > > >>>>> can > > >>>>>>>>>> document that "upgrade.mode" needs to be set in the first > > rolling > > >>>>> bounce > > >>>>>>>>>> and reset in the second. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> Besides that, some additional comments: > > >>>>>>>>>> > > >>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for users to > > >> set > > >>>>> than > > >>>>>>>>>> "internal.protocol.version" where for the latter users only > need > > >> to > > >>>>> set a > > >>>>>>>>>> single version, while the Streams will map that version to the > > >>>>> Streams > > >>>>>>>>>> assignor's behavior as well as the data format. But maybe I > did > > >> not > > >>>>> get > > >>>>>>>>>> your idea about how the "upgrade.from" config will be set, > > >> because > > >>>>> in > > >>>>>>>>>> your Compatibility section how the upgrade.from config will be > > >> set > > >>>>> for > > >>>>>>>>>> these two rolling bounces are not very clear: for example, > > should > > >>>>> user > > >>>>>>>>>> reset it to null in the second rolling bounce? > > >>>>>>>>>> > > >>>>>>>>>> 2) In the upgrade path description, rather than talking about > > >>>>> specific > > >>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize > all > > >>> the > > >>>>>>>>>> possible scenarios, even for future upgrade versions, what > > should > > >>> be > > >>>>> the > > >>>>>>>>>> standard operations? The categorized we can summarize to would > > be > > >>>>>>>>> (assuming > > >>>>>>>>>> user upgrade from version X to version Y, where X and Y are > > Kafka > > >>>>>>>>> versions, > > >>>>>>>>>> with the corresponding supported protocol version x and y): > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and hence > no > > >>>>>>>>> persistent > > >>>>>>>>>> data formats have changed. > > >>>>>>>>>> > > >>>>>>>>>> b. x != y, but all persistent data format remains the same. > > >>>>>>>>>> > > >>>>>>>>>> b. x !=y, AND some persistene data format like RocksDB format, > > >>>>> changelog > > >>>>>>>>>> format, has been changed. > > >>>>>>>>>> > > >>>>>>>>>> c. special case: we may need some special handling logic when > > >>>>> "current > > >>>>>>>>>> version" or "newest supported version" are not available in > the > > >>>>> protocol, > > >>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> under the above scenarios, how many rolling bounces users need > > to > > >>>>>>>>> execute? > > >>>>>>>>>> how they should set the configs in each rolling bounce? and > how > > >>>>> Streams > > >>>>>>>>>> library will execute in these cases? > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> Guozhang > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. 
Sax < > > >>>>> matth...@confluent.io> > > >>>>>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> Ted, > > >>>>>>>>>>> > > >>>>>>>>>>> I still consider changing the KIP to include it right away -- > > if > > >>>>> not, > > >>>>>>>>>>> I'll create a JIRA. Need to think it through in more detail > > >> first. > > >>>>>>>>>>> > > >>>>>>>>>>> (Same for other open questions like interface names -- I > > collect > > >>>>>>>>>>> feedback and update the KIP after we reach consensus :)) > > >>>>>>>>>>> > > >>>>>>>>>>> -Matthias > > >>>>>>>>>>> > > >>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote: > > >>>>>>>>>>>> Thanks for the details, Matthias. > > >>>>>>>>>>>> > > >>>>>>>>>>>> bq. change the metadata protocol only if a future release, > > >>> encoding > > >>>>>>>>> both > > >>>>>>>>>>> used > > >>>>>>>>>>>> and supported version might be an advantage > > >>>>>>>>>>>> > > >>>>>>>>>>>> Looks like encoding both versions wouldn't be implemented in > > >> this > > >>>>> KIP. > > >>>>>>>>>>>> > > >>>>>>>>>>>> Please consider logging a JIRA with the encoding proposal. > > >>>>>>>>>>>> > > >>>>>>>>>>>> Cheers > > >>>>>>>>>>>> > > >>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax < > > >>>>> matth...@confluent.io > > >>>>>>>>>> > > >>>>>>>>>>>> wrote: > > >>>>>>>>>>>> > > >>>>>>>>>>>>> @Bill: I think a filter predicate should be part of user > > code. > > >>> And > > >>>>>>>>> even > > >>>>>>>>>>>>> if we want to add something like this, I would prefer to do > > it > > >>> in > > >>>>> a > > >>>>>>>>>>>>> separate KIP. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce. But > > >> from > > >>> my > > >>>>>>>>>>>>> understanding it would not be possible. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to > switch > > >>> from > > >>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch > from > > >> the > > >>>>> old > > >>>>>>>>>>>>> store to the new store (this happens after the last > instance > > >>>>> bounces a > > >>>>>>>>>>>>> second time). > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> The problem with one round of rolling bounces is, that it's > > >>>>> unclear > > >>>>>>>>> when > > >>>>>>>>>>>>> to which from version 2 to version 3. The > > >>>>> StreamsPartitionsAssignor is > > >>>>>>>>>>>>> stateless by design, and thus, the information which > version > > >> it > > >>>>> should > > >>>>>>>>>>>>> use must be passed in from externally -- and we want to use > > >> the > > >>>>>>>>>>>>> StreamsConfig to pass in this information. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> During upgrade, all new instanced have no information about > > >> the > > >>>>>>>>> progress > > >>>>>>>>>>>>> of the upgrade (ie, how many other instanced got upgrades > > >>>>> already). > > >>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3 > > >>>>> subscription. > > >>>>>>>>> The > > >>>>>>>>>>>>> leader also has this limited view on the world and can only > > >> send > > >>>>>>>>> version > > >>>>>>>>>>>>> 2 assignments back. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify > the > > >>>>> upgrade. 
> > >>>>>>>>>>>>> > > >>>>>>>>>>>>> We did consider to change the metadata to make later > upgrades > > >>> (ie, > > >>>>>>>>> from > > >>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the > > >> metadata > > >>> or > > >>>>>>>>>>>>> storage format again -- as long as we don't change it, a > > >> single > > >>>>>>>>> rolling > > >>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and > > >> "supported > > >>>>>>>>>>>>> version". This would allow the leader to switch to the new > > >>> version > > >>>>>>>>>>>>> earlier and without a second rebalance: leader would > receive > > >>> "used > > >>>>>>>>>>>>> version == old" and "supported version = old/new" -- as > long > > >> as > > >>> at > > >>>>>>>>> least > > >>>>>>>>>>>>> one instance sends a "supported version = old" leader sends > > >> old > > >>>>>>>>> version > > >>>>>>>>>>>>> assignment back. However, encoding both version would allow > > >> that > > >>>>> the > > >>>>>>>>>>>>> leader can send a new version assignment back, right after > > the > > >>>>> first > > >>>>>>>>>>>>> round or rebalance finished (all instances send "supported > > >>>>> version = > > >>>>>>>>>>>>> new"). However, there are still two issues with this: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 1) if we switch to the new format right after the last > > >> instance > > >>>>>>>>> bounced, > > >>>>>>>>>>>>> the new stores might not be ready to be used -- this could > > >> lead > > >>> to > > >>>>>>>>>>>>> "downtime" as store must be restored before processing can > > >>> resume. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At this > > >>>>> point, the > > >>>>>>>>>>>>> instance will still have "upgrade mode" enabled and thus > > sends > > >>>>> the old > > >>>>>>>>>>>>> protocol data. However, it would be desirable to never fall > > >> back > > >>>>> to > > >>>>>>>>> the > > >>>>>>>>>>>>> old protocol after the switch to the new protocol. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> The second issue is minor and I guess if users set-up the > > >>> instance > > >>>>>>>>>>>>> properly it could be avoided. However, the first issue > would > > >>>>> prevent > > >>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we consider > > >> that > > >>> we > > >>>>>>>>> might > > >>>>>>>>>>>>> change the metadata protocol only if a future release, > > >> encoding > > >>>>> both > > >>>>>>>>>>>>> used and supported version might be an advantage in the > > future > > >>>>> and we > > >>>>>>>>>>>>> could consider to add this information in 1.2 release to > > >> prepare > > >>>>> for > > >>>>>>>>>>> this. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Btw: monitoring the log, is also only required to give the > > >>>>> instances > > >>>>>>>>>>>>> enough time to prepare the stores in new format. If you > would > > >> do > > >>>>> the > > >>>>>>>>>>>>> second rolling bounce before this, it would still work -- > > >>>>> however, you > > >>>>>>>>>>>>> might see app "downtime" as the new store must be fully > > >> restored > > >>>>>>>>> before > > >>>>>>>>>>>>> processing can resume. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Does this make sense? > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> -Matthias > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote: > > >>>>>>>>>>>>>> Matthias, > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid of > the > > >> 2nd > > >>>>>>>>> rolling > > >>>>>>>>>>>>> bounce? 
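To illustrate the "used version" plus "supported version" idea above: a minimal sketch of how such a pair could be carried in the subscription userdata. This is not the actual Streams SubscriptionInfo wire format, only the shape of the idea.

    import java.nio.ByteBuffer;

    // Each member reports the version it currently encodes with and the newest
    // version it can understand, so the leader knows when it may switch.
    final class SubscriptionInfoSketch {
        final int usedVersion;
        final int supportedVersion;

        SubscriptionInfoSketch(final int usedVersion, final int supportedVersion) {
            this.usedVersion = usedVersion;
            this.supportedVersion = supportedVersion;
        }

        ByteBuffer encode() {
            final ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putInt(usedVersion);
            buf.putInt(supportedVersion);
            buf.flip();
            return buf;
        }

        static SubscriptionInfoSketch decode(final ByteBuffer buf) {
            return new SubscriptionInfoSketch(buf.getInt(), buf.getInt());
        }
    }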
> > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> For the in-place upgrade, it seems like primary difference > > >>>>> between > > >>>>>>>>> the > > >>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide > > >>>>> whether to > > >>>>>>>>>>> send > > >>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3. > (Actually, > > >>>>> there is > > >>>>>>>>>>>>> another difference mentioned in that the KIP says that the > > 2nd > > >>>>> rolling > > >>>>>>>>>>>>> bounce should happen after all new state stores are created > > by > > >>> the > > >>>>>>>>>>>>> background thread. However, within the 2nd rolling bounce, > we > > >>> say > > >>>>> that > > >>>>>>>>>>>>> there is still a background thread, so it seems like is no > > >>> actual > > >>>>>>>>>>>>> requirement to wait for the new state stores to be > created.) > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with > > >>> mixed-mode > > >>>>>>>>>>> (having > > >>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer group). > It > > >>> seems > > >>>>>>>>> like > > >>>>>>>>>>> we > > >>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic > > >>>>> (somehow/somewhere) > > >>>>>>>>>>> such > > >>>>>>>>>>>>> that: > > >>>>>>>>>>>>>> * Instances send Subscription Version 2 until all > instances > > >> are > > >>>>>>>>> running > > >>>>>>>>>>>>> the new code. > > >>>>>>>>>>>>>> * Once all the instances are running the new code, then > one > > >> at > > >>> a > > >>>>>>>>> time, > > >>>>>>>>>>>>> the instances start sending Subscription V3. Leader still > > >> hands > > >>>>> out > > >>>>>>>>>>>>> Assignment Version 2, until all new state stores are ready. > > >>>>>>>>>>>>>> * Once all instances report that new stores are ready, > > Leader > > >>>>> sends > > >>>>>>>>> out > > >>>>>>>>>>>>> Assignment Version 3. > > >>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3, it > can > > >>>>> delete > > >>>>>>>>> the > > >>>>>>>>>>>>> old state store. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of > > >>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling > > >> restarts. > > >>> No > > >>>>>>>>> need > > >>>>>>>>>>> to > > >>>>>>>>>>>>> monitor logs for state store rebuild. You just deploy it, > and > > >>> the > > >>>>>>>>>>> instances > > >>>>>>>>>>>>> update themselves. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> What do you think? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> The thing that made me think of this is that the "2 > rolling > > >>>>> bounces" > > >>>>>>>>> is > > >>>>>>>>>>>>> similar to what Kafka brokers have to do changes in > > >>>>>>>>>>>>> inter.broker.protocol.version and > log.message.format.version. > > >>> And > > >>>>> in > > >>>>>>>>> the > > >>>>>>>>>>>>> broker case, it seems like it would be possible (with some > > >> work > > >>> of > > >>>>>>>>>>> course) > > >>>>>>>>>>>>> to modify kafka to allow us to do similar auto-detection of > > >>> broker > > >>>>>>>>>>>>> capabilities and automatically do a switchover from old/new > > >>>>> versions. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> -James > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck < > > bbej...@gmail.com > > >>> > > >>>>> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Matthias, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me. 
> > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I do have one question regarding the retrieval methods on > > >> the > > >>>>> new > > >>>>>>>>>>>>>>> interfaces. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Would want to consider adding one method with a Predicate > > >> that > > >>>>> would > > >>>>>>>>>>>>> allow > > >>>>>>>>>>>>>>> for filtering records by the timestamp stored with the > > >> record? > > >>>>> Or > > >>>>>>>>> is > > >>>>>>>>>>>>> this > > >>>>>>>>>>>>>>> better left for users to implement themselves once the > data > > >>> has > > >>>>> been > > >>>>>>>>>>>>>>> retrieved? > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>> Bill > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu < > > yuzhih...@gmail.com > > >>> > > >>>>> wrote: > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Matthias: > > >>>>>>>>>>>>>>>> For my point #1, I don't have preference as to which > > >>> separator > > >>>>> is > > >>>>>>>>>>>>> chosen. > > >>>>>>>>>>>>>>>> Given the background you mentioned, current choice is > > good. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is closer > > to > > >>>>> English > > >>>>>>>>>>>>>>>> grammar. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Would be good to listen to what other people think. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax < > > >>>>>>>>>>> matth...@confluent.io > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> Thanks for the comments! > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> @Guozhang: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata > > upgrade > > >>> fix > > >>>>>>>>>>>>>>>>> (addressing the mentioned > > >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It > > >> give a > > >>>>> first > > >>>>>>>>>>>>>>>>> impression how the metadata upgrade works including a > > >> system > > >>>>> test: > > >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636 > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I > agree > > >>> that > > >>>>> the > > >>>>>>>>>>> KIP > > >>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>> complex am I ok with putting out more code to give > better > > >>>>>>>>> discussion > > >>>>>>>>>>>>>>>>> context. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> @Ted: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the > > >>>>>>>>> `processing.guarantee` > > >>>>>>>>>>>>>>>>> parameter that accepts `at_least_one` and > `exactly_once` > > >> as > > >>>>>>>>> values. > > >>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash but I > > >>> prefer > > >>>>>>>>>>>>>>>>> consistency. If you feel strong about it, we can also > > >> change > > >>>>> it to > > >>>>>>>>>>>>> `-`. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I > > >> stripped > > >>>>> the > > >>>>>>>>>>>>> `With` > > >>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be good to get > > >>>>> feedback > > >>>>>>>>>>> from > > >>>>>>>>>>>>>>>>> others and pick the name the majority prefers. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> @John: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would not > > >> make a > > >>>>>>>>>>>>> difference. 
> > >>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the two version > > >>> might > > >>>>>>>>>>>>> introduce > > >>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I don't hit > an > > >>>>> issue > > >>>>>>>>> with > > >>>>>>>>>>>>>>>>> putting the `-v2` to the store directory instead of > > >>>>> `rocksdb-v2` > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> -Matthias > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote: > > >>>>>>>>>>>>>>>>>> Hey Matthias, > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several questions > queued > > >>> up, > > >>>>> but > > >>>>>>>>>>> they > > >>>>>>>>>>>>>>>>> were > > >>>>>>>>>>>>>>>>>> all in the "rejected alternatives" section... oh, > well. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> One very minor thought re changing the state directory > > >> from > > >>>>>>>>>>>>>>>>> "/<state.dir>/< > > >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to > > >>>>> "/<state.dir>/< > > >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if > > you > > >>>>> put the > > >>>>>>>>>>>>> "v2" > > >>>>>>>>>>>>>>>>>> marker on the storeName part of the path (i.e., > > >>>>> "/<state.dir>/< > > >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), > then > > >>> you > > >>>>> get > > >>>>>>>>>>> the > > >>>>>>>>>>>>>>>> same > > >>>>>>>>>>>>>>>>>> benefits without altering the high-level directory > > >>> structure. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people running > > >>>>> scripts to > > >>>>>>>>>>>>>>>> monitor > > >>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other such use > > >> cases. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Thanks, > > >>>>>>>>>>>>>>>>>> -John > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu < > > >>> yuzhih...@gmail.com> > > >>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Matthias: > > >>>>>>>>>>>>>>>>>>> Nicely written KIP. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may > > >>>>> sometimes > > >>>>>>>>> be > > >>>>>>>>>>>>> miss > > >>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to > > >>> user. > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> > > { > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for > > >> the > > >>>>>>>>> class ? > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> Thanks > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang < > > >>>>>>>>> wangg...@gmail.com > > >>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP. > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch section and it > > >> looks > > >>>>> good > > >>>>>>>>> to > > >>>>>>>>>>>>> me, > > >>>>>>>>>>>>>>>> if > > >>>>>>>>>>>>>>>>>>> you > > >>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you also share it > > >> here > > >>>>> so > > >>>>>>>>> that > > >>>>>>>>>>>>>>>>> people > > >>>>>>>>>>>>>>>>>>>> can take a look? > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. 
But large KIPs like this > > >> there > > >>>>> are > > >>>>>>>>>>> always > > >>>>>>>>>>>>>>>>> some > > >>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is better > > to > > >>>>> have > > >>>>>>>>> the > > >>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design > > >>>>> discussion :) > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> Guozhang > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax < > > >>>>>>>>>>>>>>>> matth...@confluent.io > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Hi, > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to > > allow > > >>>>> storing > > >>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to > > >>>>> resolve > > >>>>>>>>>>>>> multiple > > >>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests). > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this! > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > >>>>>>>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> -Matthias > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> -- > > >>>>>>>>>>>>>>>>>>>> -- Guozhang > > >>>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>>> > > >>>>> > > >>>>> > > >>>> > > >>> > > >>> > > >> > > >> > > >> -- > > >> -- Guozhang > > >> > > > > > > > >
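On the recurring question in the thread of how operators can tell when the prepared stores have caught up enough to start the second rolling bounce: a sketch that uses only the existing StateRestoreListener API to log restoration progress. Thresholds, metrics, and alerting are left to the operator; nothing here is specific to the KIP.

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    // Logs how far each restoring store is from its changelog end offset.
    // Register via KafkaStreams#setGlobalStateRestoreListener(new RestoreProgressLogger()).
    final class RestoreProgressLogger implements StateRestoreListener {

        @Override
        public void onRestoreStart(final TopicPartition partition, final String storeName,
                                   final long startingOffset, final long endingOffset) {
            System.out.printf("restore of %s (%s) started, %d offsets to go%n",
                              storeName, partition, endingOffset - startingOffset);
        }

        @Override
        public void onBatchRestored(final TopicPartition partition, final String storeName,
                                    final long batchEndOffset, final long numRestored) {
            System.out.printf("restored %d records for %s (%s), now at offset %d%n",
                              numRestored, storeName, partition, batchEndOffset);
        }

        @Override
        public void onRestoreEnd(final TopicPartition partition, final String storeName,
                                 final long totalRestored) {
            System.out.printf("restore of %s (%s) finished after %d records%n",
                              storeName, partition, totalRestored);
        }
    }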