Thanks for the information, Matthias. I will await your completion of this ticket, then, since it underpins the essential parts of a RocksDB TTL that is aligned with the changelog topic. I am eager to work on that ticket myself, so if I can help with this one in any way, please let me know.
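To make point 2 of the use case I describe below concrete, the filtering we apply during changelog restore looks roughly like this -- a simplified sketch with illustrative names, not our actual code:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch only: a changelog record is restored into the store only if it has
// not already aged out relative to a reference time (e.g. "now").
public final class TtlRestoreFilter {

    private final long ttlMs;

    public TtlRestoreFilter(final Duration ttl) {
        this.ttlMs = ttl.toMillis();
    }

    // Returns true if the record should still be loaded into RocksDB.
    public boolean shouldRestore(final ConsumerRecord<byte[], byte[]> record,
                                 final long referenceTimeMs) {
        return record.timestamp() + ttlMs >= referenceTimeMs;
    }
}

The reference timestamp defaults to System.currentTimeMillis(), as described in point 2 below.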
Thanks Adam On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <matth...@confluent.io> wrote: > It's an interesting idea to use second store, to maintain the > timestamps. However, each RocksDB instance implies some overhead. In > fact, we are looking into ColumnFamilies atm to see if we can use those > and merge multiple RocksDBs into a single one to reduce this overhead. > > -Matthias > > On 11/20/18 5:15 AM, Patrik Kleindl wrote: > > Hi Adam > > > > Sounds great, I was already planning to ask around if anyone had tackled > > this. > > We have a use case very similar to what you described in KAFKA-4212, only > > with Global State Stores. > > I have tried a few things with the normal DSL but was not really > successful. > > Schedule/Punctuate is not possible, supplying a windowed store is also > not > > allowed and the process method has no knowledge of the timestamp of the > > record. > > And anything loaded on startup is not filtered anyway. > > > > Regarding 4212, wouldn't it be easier (although a little less > > space-efficient) to track the Timestamps in a separate Store with <K, > Long> > > ? > > This would leave the original store intact and allow a migration of the > > timestamps without touching the other data. > > > > So I am very interested in your PR :-) > > > > best regards > > > > Patrik > > > > On Tue, 20 Nov 2018 at 04:46, Adam Bellemare <adam.bellem...@gmail.com> > > wrote: > > > >> Hi Matthias > >> > >> Thanks - I figured that it was probably a case of just too much to do > and > >> not enough time. I know how that can go. I am asking about this one in > >> relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding a > TTL > >> to RocksDB. I have outlined a bit about my use-case within 4212, but for > >> brevity here it is: > >> > >> My case: > >> 1) I have a RocksDB with TTL implementation working where records are > aged > >> out using the TTL that comes with RocksDB (very simple). > >> 2) We prevent records from loading from the changelog if recordTime + > TTL < > >> referenceTimeStamp (default = System.currentTimeInMillis() ). > >> > >> This assumes that the records are stored with the same time reference > (say > >> UTC) as the consumer materializing the RocksDB store. > >> > >> My questions about KIP-258 are as follows: > >> 1) How does "we want to be able to store record timestamps in KTables" > >> differ from inserting records into RocksDB with TTL at consumption > time? I > >> understand that it could be a difference of some seconds, minutes, > hours, > >> days etc between when the record was published and now, but given the > >> nature of how RocksDB TTL works (eventual - based on compaction) I don't > >> see how a precise TTL can be achieved, such as that which one can get > with > >> windowed stores. > >> > >> 2) Are you looking to change how records are inserted into a TTL > RocksDB, > >> such that the TTL would take effect from the record's published time? If > >> not, what would be the ideal workflow here for a single record with TTL > >> RocksDB? > >> ie: Record Timestamp: 100 > >> TTL: 50 > >> Record inserted into rocksDB: 110 > >> Record to expire at 150? > >> > >> 3) I'm not sure I fully understand the importance of the upgrade path. I > >> have read the link to (https://issues.apache.org/jira/browse/KAFKA-3522 > ) > >> in > >> the KIP, and I can understand that a state-store on disk may not > represent > >> what the application is expecting. 
I don't think I have the full picture > >> though, because that issue seems to be easy to fix with a simple > versioned > >> header or accompanying file, forcing the app to rebuild the state if the > >> version is incompatible. Can you elaborate or add a scenario to the KIP > >> that illustrates the need for the upgrade path? > >> > >> Thanks, > >> > >> Adam > >> > >> > >> > >> > >> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <matth...@confluent.io> > >> wrote: > >> > >>> Adam, > >>> > >>> I am still working on it. Was pulled into a lot of other tasks lately > so > >>> this was delayed. Also had some discussions about simplifying the > >>> upgrade path with some colleagues and I am prototyping this atm. Hope > to > >>> update the KIP accordingly soon. > >>> > >>> -Matthias > >>> > >>> On 11/10/18 7:41 AM, Adam Bellemare wrote: > >>>> Hello Matthias > >>>> > >>>> I am curious as to the status of this KIP. TTL and expiry of records > >> will > >>>> be extremely useful for several of our business use-cases, as well as > >>>> another KIP I had been working on. > >>>> > >>>> Thanks > >>>> > >>>> > >>>> > >>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com > > > >>>> wrote: > >>>> > >>>>> Hi Matthias, > >>>>> > >>>>> Good stuff. Could you comment a bit on how future-proof is this > >> change? > >>> For > >>>>> example, if we want to store both event timestamp "and" processing > >> time > >>> in > >>>>> RocksDB will we then need another interface (e.g. called > >>>>> KeyValueWithTwoTimestampsStore)? > >>>>> > >>>>> Thanks > >>>>> Eno > >>>>> > >>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax < > >> matth...@confluent.io> > >>>>> wrote: > >>>>> > >>>>>> Thanks for your input Guozhang and John. > >>>>>> > >>>>>> I see your point, that the upgrade API is not simple. If you don't > >>>>>> thinks it's valuable to make generic store upgrades possible (atm), > >> we > >>>>>> can make the API internal, too. The impact is, that we only support > a > >>>>>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to > >>>>>> WindowedWithTS etc) for which we implement the internal interfaces. > >>>>>> > >>>>>> We can keep the design generic, so if we decide to make it public, > we > >>>>>> don't need to re-invent it. This will also have the advantage, that > >> we > >>>>>> can add upgrade pattern for other stores later, too. > >>>>>> > >>>>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, but > it > >>>>>> was the only way I could find to design a generic upgrade interface. > >> If > >>>>>> we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` > would > >>>>>> become an internal interface I guess (don't think we can remove it). > >>>>>> > >>>>>> I will wait for more feedback about this and if nobody wants to keep > >> it > >>>>>> as public API I will update the KIP accordingly. Will add some more > >>>>>> clarifications for different upgrade patterns in the mean time and > >> fix > >>>>>> the typos/minor issues. > >>>>>> > >>>>>> About adding a new state UPGRADING: maybe we could do that. However, > >> I > >>>>>> find it particularly difficult to make the estimation when we should > >>>>>> switch to RUNNING, thus, I am a little hesitant. Using store > >> callbacks > >>>>>> or just logging the progress including some indication about the > >> "lag" > >>>>>> might actually be sufficient. Not sure what others think? > >>>>>> > >>>>>> About "value before timestamp": no real reason and I think it does > >> not > >>>>>> make any difference. 
Do you want to change it? > >>>>>> > >>>>>> About upgrade robustness: yes, we cannot control if an instance > >> fails. > >>>>>> That is what I meant by "we need to write test". The upgrade should > >> be > >>>>>> able to continuous even is an instance goes down (and we must make > >> sure > >>>>>> that we don't end up in an invalid state that forces us to wipe out > >> the > >>>>>> whole store). Thus, we need to write system tests that fail > instances > >>>>>> during upgrade. > >>>>>> > >>>>>> For `in_place_offline` upgrade: I don't think we need this mode, > >>> because > >>>>>> people can do this via a single rolling bounce. > >>>>>> > >>>>>> - prepare code and switch KV-Store to KVwithTs-Store > >>>>>> - do a single rolling bounce (don't set any upgrade config) > >>>>>> > >>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we > >>>>>> remove the `StoreUpgradeBuilder`) will detect that there is only an > >> old > >>>>>> local KV store w/o TS, will start to restore the new KVwithTs store, > >>>>>> wipe out the old store and replace with the new store after restore > >> is > >>>>>> finished, and start processing only afterwards. (I guess we need to > >>>>>> document this case -- will also add it to the KIP.) > >>>>>> > >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> > >>>>>> > >>>>>> On 8/9/18 1:10 PM, John Roesler wrote: > >>>>>>> Hi Matthias, > >>>>>>> > >>>>>>> I think this KIP is looking really good. > >>>>>>> > >>>>>>> I have a few thoughts to add to the others: > >>>>>>> > >>>>>>> 1. You mentioned at one point users needing to configure > >>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant to say > >>>>> they > >>>>>>> should remove the config. If they really have to set it to a string > >>>>>> "null" > >>>>>>> or even set it to a null value but not remove it, it would be > >>>>>> unfortunate. > >>>>>>> > >>>>>>> 2. In response to Bill's comment #1 , you said that "The idea is > >> that > >>>>> the > >>>>>>> upgrade should be robust and not fail. We need to write according > >>>>> tests". > >>>>>>> I may have misunderstood the conversation, but I don't think it's > >>>>> within > >>>>>>> our power to say that an instance won't fail. What if one of my > >>>>> computers > >>>>>>> catches on fire? What if I'm deployed in the cloud and one instance > >>>>>>> disappears and is replaced by a new one? Or what if one instance > >> goes > >>>>>> AWOL > >>>>>>> for a long time and then suddenly returns? How will the upgrade > >>> process > >>>>>>> behave in light of such failures? > >>>>>>> > >>>>>>> 3. your thought about making in-place an offline mode is > >> interesting, > >>>>> but > >>>>>>> it might be a bummer for on-prem users who wish to upgrade online, > >> but > >>>>>>> cannot just add new machines to the pool. It could be a new upgrade > >>>>> mode > >>>>>>> "offline-in-place", though... > >>>>>>> > >>>>>>> 4. I was surprised to see that a user would need to modify the > >>> topology > >>>>>> to > >>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's > >>>>>>> suggestions would remove this necessity. > >>>>>>> > >>>>>>> Thanks for taking on this very complex but necessary work. > >>>>>>> > >>>>>>> -John > >>>>>>> > >>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com> > >>>>>> wrote: > >>>>>>> > >>>>>>>> Hello Matthias, > >>>>>>>> > >>>>>>>> Thanks for the updated KIP. Some more comments: > >>>>>>>> > >>>>>>>> 1. 
The current set of proposed API is a bit too complicated, which > >>>>> makes > >>>>>>>> the upgrade flow from user's perspective also a bit complex. I'd > >> like > >>>>> to > >>>>>>>> check different APIs and discuss about their needs separately: > >>>>>>>> > >>>>>>>> 1.a. StoreProxy: needed for in-place upgrade only, between the > >>>>> first > >>>>>>>> and second rolling bounce, where the old-versioned stores can > >> handle > >>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e. from > one > >>>>>> store > >>>>>>>> type to another) would not be very common: users may want to > >> upgrade > >>>>>> from a > >>>>>>>> certain store engine to another, but the interface would likely be > >>>>>> staying > >>>>>>>> the same. Hence personally I'd suggest we keep it internally and > >> only > >>>>>>>> consider exposing it in the future if it does become a common > >>> pattern. > >>>>>>>> > >>>>>>>> 1.b. ConverterStore / RecordConverter: needed for both > in-place > >>>>> and > >>>>>>>> roll-over upgrade, between the first and second rolling bounces, > >> for > >>>>> the > >>>>>>>> new versioned store to be able to read old-versioned changelog > >>> topics. > >>>>>>>> Firstly I think we should not expose key in the public APIs but > >> only > >>>>> the > >>>>>>>> values, since allowing key format changes would break log > >> compaction, > >>>>>> and > >>>>>>>> hence would not be compatible anyways. As for value format > changes, > >>>>>>>> personally I think we can also keep its upgrade logic internally > as > >>> it > >>>>>> may > >>>>>>>> not worth generalizing to user customizable logic. > >>>>>>>> > >>>>>>>> 1.c. If you agrees with 2.a/b above, then we can also remove " > >>>>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the > public > >>>>>> APIs. > >>>>>>>> > >>>>>>>> 1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" > is > >>>>> not > >>>>>>>> needed either given that we are exposing "ValueAndTimestamp" > >> anyways. > >>>>>> I.e. > >>>>>>>> it is just a syntax sugar and for IQ, users can always just set a > " > >>>>>>>> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as > >> the > >>>>>> new > >>>>>>>> interface does not provide any additional functions. > >>>>>>>> > >>>>>>>> > >>>>>>>> 2. Could we further categorize the upgrade flow for different use > >>>>> cases, > >>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be used > >>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who do not > >>>>> need > >>>>>> to > >>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to > switch > >>> to > >>>>>>>> KeyValueWithTimestampStore. Just to give my understanding for 3), > >> the > >>>>>>>> upgrade flow for users may be simplified as the following (for > both > >>>>>>>> in-place and roll-over): > >>>>>>>> > >>>>>>>> * Update the jar to new version, make code changes from > >>>>>> KeyValueStore > >>>>>>>> to KeyValueWithTimestampStore, set upgrade config. > >>>>>>>> > >>>>>>>> * First rolling bounce, and library code can internally use > >> proxy > >>>>> / > >>>>>>>> converter based on the specified config to handle new APIs with > old > >>>>>> stores, > >>>>>>>> while let new stores read from old changelog data. > >>>>>>>> > >>>>>>>> * Reset upgrade config. > >>>>>>>> > >>>>>>>> * Second rolling bounce, and the library code automatically > >> turn > >>>>> off > >>>>>>>> logic for proxy / converter. > >>>>>>>> > >>>>>>>> > >>>>>>>> 3. 
Some more detailed proposals are needed for when to recommend > >>> users > >>>>>> to > >>>>>>>> trigger the second rolling bounce. I have one idea to share here: > >> we > >>>>>> add a > >>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when 1) > >>> upgrade > >>>>>>>> config is set, and 2) the new stores are still ramping up (for the > >>>>>> second > >>>>>>>> part, we can start with some internal hard-coded heuristics to > >> decide > >>>>>> when > >>>>>>>> it is close to be ramped up). If either one of it is not true any > >>>>> more, > >>>>>> it > >>>>>>>> should transit to RUNNING. Users can then watch on this state, and > >>>>>> decide > >>>>>>>> to only trigger the second rebalance when the state has transited > >>> from > >>>>>>>> UPGRADING. They can also choose to cut over while the instance is > >>>>> still > >>>>>>>> UPGRADING, the downside is that after that the application may > have > >>>>> long > >>>>>>>> restoration phase which is, to user's pov, unavailability periods. > >>>>>>>> > >>>>>>>> > >>>>>>>> Below are just some minor things on the wiki: > >>>>>>>> > >>>>>>>> 4. "proxy story" => "proxy store". > >>>>>>>> > >>>>>>>> 5. "use the a builder " => "use a builder" > >>>>>>>> > >>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to the > >>>>> value": > >>>>>>>> what's the rationale of putting the timestamp before the value, > >> than > >>>>>> after > >>>>>>>> the value? > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> Guozhang > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax < > >>>>> matth...@confluent.io> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Thanks for the feedback Bill. I just update the KIP with some of > >>> your > >>>>>>>>> points. > >>>>>>>>> > >>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to > >> watch > >>>>> the > >>>>>>>>>>> restore process), I'm wondering if we want to provide a type of > >>>>>>>>>>> StateRestoreListener that could signal when the new stores have > >>>>>>>> reached > >>>>>>>>>>> parity with the existing old stores and that could be the > signal > >>> to > >>>>>>>>> start > >>>>>>>>>>> second rolling rebalance? > >>>>>>>>> > >>>>>>>>> I think we can reuse the existing listeners, thus, I did not > >> include > >>>>>>>>> anything in the KIP. About a signal to rebalance: this might be > >>>>> tricky. > >>>>>>>>> If we prepare the store "online", the active task will update the > >>>>> state > >>>>>>>>> continuously, and thus, state prepare is never finished. It will > >> be > >>>>> the > >>>>>>>>> users responsibility to do the second rebalance (note, that the > >>>>> second > >>>>>>>>> rebalance will first finish the last delta of the upgrade to > >> finish > >>>>> the > >>>>>>>>> upgrade before actual processing resumes). I clarified the KIP > >> with > >>>>>> this > >>>>>>>>> regard a little bit. > >>>>>>>>> > >>>>>>>>>>> 1. Out of N instances, one fails midway through the process, > >> would > >>>>> we > >>>>>>>>> allow > >>>>>>>>>>> the other instances to complete or just fail the entire > upgrade? > >>>>>>>>> > >>>>>>>>> The idea is that the upgrade should be robust and not fail. We > >> need > >>>>> to > >>>>>>>>> write according tests. > >>>>>>>>> > >>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the > >>>>>> current > >>>>>>>>>>> active directories vs. 
deleting them right away, and when all > >> the > >>>>>>>>> prepare > >>>>>>>>>>> task directories are successfully migrated then delete the > >>> previous > >>>>>>>>> active > >>>>>>>>>>> ones. > >>>>>>>>> > >>>>>>>>> Ack. Updated the KIP. > >>>>>>>>> > >>>>>>>>>>> 3. For the first rolling bounce we pause any processing any new > >>>>>>>> records > >>>>>>>>> and > >>>>>>>>>>> just allow the prepare tasks to restore, then once all prepare > >>>>> tasks > >>>>>>>>> have > >>>>>>>>>>> restored, it's a signal for the second round of rolling bounces > >>> and > >>>>>>>>> then as > >>>>>>>>>>> each task successfully renames its prepare directories and > >> deletes > >>>>>> the > >>>>>>>>> old > >>>>>>>>>>> active task directories, normal processing of records resumes. > >>>>>>>>> > >>>>>>>>> The basic idea is to do an online upgrade to avoid downtime. We > >> can > >>>>>>>>> discuss to offer both options... For the offline upgrade option, > >> we > >>>>>>>>> could simplify user interaction and trigger the second rebalance > >>>>>>>>> automatically with the requirement that a user needs to update > any > >>>>>>>> config. > >>>>>>>>> > >>>>>>>>> If might actually be worth to include this option: we know from > >>>>>>>>> experience with state restore, that regular processing slows down > >>> the > >>>>>>>>> restore. For roll_over upgrade, it would be a different story and > >>>>>>>>> upgrade should not be slowed down by regular processing. Thus, we > >>>>>> should > >>>>>>>>> even make in_place an offline upgrade and force people to use > >>>>> roll_over > >>>>>>>>> if they need onlint upgrade. Might be a fair tradeoff that may > >>>>> simplify > >>>>>>>>> the upgrade for the user and for the code complexity. > >>>>>>>>> > >>>>>>>>> Let's see what other think. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -Matthias > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote: > >>>>>>>>>> Hi Matthias, > >>>>>>>>>> > >>>>>>>>>> Thanks for the update and the working prototype, it helps with > >>>>>>>>>> understanding the KIP. > >>>>>>>>>> > >>>>>>>>>> I took an initial pass over this PR, and overall I find the > >>>>> interfaces > >>>>>>>>> and > >>>>>>>>>> approach to be reasonable. > >>>>>>>>>> > >>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to > watch > >>>>> the > >>>>>>>>>> restore process), I'm wondering if we want to provide a type of > >>>>>>>>>> StateRestoreListener that could signal when the new stores have > >>>>>> reached > >>>>>>>>>> parity with the existing old stores and that could be the signal > >> to > >>>>>>>> start > >>>>>>>>>> second rolling rebalance? > >>>>>>>>>> > >>>>>>>>>> Although you solicited feedback on the interfaces involved, I > >>> wanted > >>>>>> to > >>>>>>>>> put > >>>>>>>>>> down some thoughts that have come to mind reviewing this KIP > >> again > >>>>>>>>>> > >>>>>>>>>> 1. Out of N instances, one fails midway through the process, > >> would > >>>>> we > >>>>>>>>> allow > >>>>>>>>>> the other instances to complete or just fail the entire upgrade? > >>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the > >>>>> current > >>>>>>>>>> active directories vs. deleting them right away, and when all > >> the > >>>>>>>>> prepare > >>>>>>>>>> task directories are successfully migrated then delete the > >> previous > >>>>>>>>> active > >>>>>>>>>> ones. > >>>>>>>>>> 3. 
For the first rolling bounce we pause any processing any new > >>>>>> records > >>>>>>>>> and > >>>>>>>>>> just allow the prepare tasks to restore, then once all prepare > >>> tasks > >>>>>>>> have > >>>>>>>>>> restored, it's a signal for the second round of rolling bounces > >> and > >>>>>>>> then > >>>>>>>>> as > >>>>>>>>>> each task successfully renames its prepare directories and > >> deletes > >>>>> the > >>>>>>>>> old > >>>>>>>>>> active task directories, normal processing of records resumes. > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> Bill > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax < > >>>>>> matth...@confluent.io > >>>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi, > >>>>>>>>>>> > >>>>>>>>>>> KIP-268 (rebalance meatadata) is finished and included in AK > 2.0 > >>>>>>>>>>> release. Thus, I want to pick up this KIP again to get the > >> RocksDB > >>>>>>>>>>> upgrade done for 2.1. > >>>>>>>>>>> > >>>>>>>>>>> I updated the KIP accordingly and also have a "prove of > concept" > >>> PR > >>>>>>>>>>> ready (for "in place" upgrade only): > >>>>>>>>>>> https://github.com/apache/kafka/pull/5422/ > >>>>>>>>>>> > >>>>>>>>>>> There a still open questions, but I want to collect early > >> feedback > >>>>> on > >>>>>>>>>>> the proposed interfaces we need for the store upgrade. Also > >> note, > >>>>>> that > >>>>>>>>>>> the KIP now also aim to define a generic upgrade path from any > >>>>> store > >>>>>>>>>>> format A to any other store format B. Adding timestamps is just > >> a > >>>>>>>>>>> special case. > >>>>>>>>>>> > >>>>>>>>>>> I will continue to work on the PR and refine the KIP in the > >>>>> meantime, > >>>>>>>>> too. > >>>>>>>>>>> > >>>>>>>>>>> Looking forward to your feedback. > >>>>>>>>>>> > >>>>>>>>>>> -Matthias > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote: > >>>>>>>>>>>> After some more thoughts, I want to follow John's suggestion > >> and > >>>>>>>> split > >>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade. > >>>>>>>>>>>> > >>>>>>>>>>>> I extracted the metadata upgrade into it's own KIP: > >>>>>>>>>>>> > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268% > >>>>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade > >>>>>>>>>>>> > >>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to > >> consider > >>>>> to > >>>>>>>>>>>> make the store format upgrade more flexible/generic. Atm, the > >> KIP > >>>>> is > >>>>>>>>> too > >>>>>>>>>>>> much tailored to the DSL IMHO and does not encounter PAPI > users > >>>>> that > >>>>>>>> we > >>>>>>>>>>>> should not force to upgrade the stores. I need to figure out > >> the > >>>>>>>>> details > >>>>>>>>>>>> and follow up later. > >>>>>>>>>>>> > >>>>>>>>>>>> Please give feedback for the new KIP-268 on the corresponding > >>>>>>>>> discussion > >>>>>>>>>>>> thread. > >>>>>>>>>>>> > >>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure > >> out > >>>>> a > >>>>>>>> way > >>>>>>>>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a > fix > >>>>> for > >>>>>>>>>>>> future upgrades. Please share your thoughts. > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for all your feedback! > >>>>>>>>>>>> > >>>>>>>>>>>> -Matthias > >>>>>>>>>>>> > >>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. 
Sax wrote: > >>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's an > >>>>>>>>>>>>> implementation details IMHO and thus I did not include it in > >> the > >>>>>>>> KIP) > >>>>>>>>>>>>> > >>>>>>>>>>>>> @Guozhang: > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1) I understand know what you mean. We can certainly, allow > >> all > >>>>>>>> values > >>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for > >>>>> `upgrade.from` > >>>>>>>>>>>>> parameter. I had a similar though once but decided to > collapse > >>>>> them > >>>>>>>>> into > >>>>>>>>>>>>> one -- will update the KIP accordingly. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 2) The idea to avoid any config would be, to always send both > >>>>>>>> request. > >>>>>>>>>>>>> If we add a config to eventually disable the old request, we > >>>>> don't > >>>>>>>>> gain > >>>>>>>>>>>>> anything with this approach. The question is really, if we > are > >>>>>>>> willing > >>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be limited > >> to > >>>>> 2 > >>>>>>>>>>>>> versions and not grow further in future releases. More > details > >>> in > >>>>>>>> (3) > >>>>>>>>>>>>> > >>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and > >> allows > >>>>> us > >>>>>>>> to > >>>>>>>>>>>>> stay with 2 "assignment strategies" we need to register, as > >> the > >>>>> new > >>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via > >> "version > >>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a > >> config > >>>>> if > >>>>>>>>>>>>> people upgrade from pre-1.2 releases. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thus, I don't think we need to register new "assignment > >>>>> strategies" > >>>>>>>>> and > >>>>>>>>>>>>> send empty subscriptions for older version. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right with a > >>> single > >>>>>>>>>>>>> rebalance. I share the concern that an application might > never > >>>>>> catch > >>>>>>>>> up > >>>>>>>>>>>>> and thus the hot standby will never be ready. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store upgrades. > >> If > >>>>> we > >>>>>>>> do > >>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3) in > >> place > >>>>>> for > >>>>>>>>>>>>> future upgrades. I also think that changes to the metadata > are > >>>>> more > >>>>>>>>>>>>> likely and thus allowing for single rolling bounce for this > >> case > >>>>> is > >>>>>>>>> more > >>>>>>>>>>>>> important anyway. If we assume that store upgrade a rare, it > >>>>> might > >>>>>>>> be > >>>>>>>>> ok > >>>>>>>>>>>>> to sacrifice two rolling bounced for this case. It was just > an > >>>>> idea > >>>>>>>> I > >>>>>>>>>>>>> wanted to share (even if I see the issues). > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote: > >>>>>>>>>>>>>> Hello Matthias, thanks for your replies. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1) About the config names: actually I was trying to not > >> expose > >>>>>>>>>>>>>> implementation details :) My main concern was that in your > >>>>>> proposal > >>>>>>>>> the > >>>>>>>>>>>>>> values need to cover the span of all the versions that are > >>>>>> actually > >>>>>>>>>>> using > >>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". 
So if I (as a user) > >> am > >>>>>>>>>>> upgrading > >>>>>>>>>>>>>> from any versions within this range I need to remember to > use > >>>>> the > >>>>>>>>> value > >>>>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In my > >>>>>>>>> suggestion > >>>>>>>>>>> I > >>>>>>>>>>>>>> was trying to argue the benefit of just letting users to > >>> specify > >>>>>>>> the > >>>>>>>>>>> actual > >>>>>>>>>>>>>> Kafka version she's trying to upgrade from, than specifying > a > >>>>>> range > >>>>>>>>> of > >>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as > the > >>>>>>>> values, > >>>>>>>>>>> but > >>>>>>>>>>>>>> still using Kafka versions like broker's `internal.version` > >>>>>> config. > >>>>>>>>>>> But if > >>>>>>>>>>>>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" > >>> you > >>>>>>>>> meant > >>>>>>>>>>> to > >>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" > >> or > >>>>>>>> "1.1" > >>>>>>>>>>> which > >>>>>>>>>>>>>> are all recognizable config values then I think we are > >> actually > >>>>> on > >>>>>>>>> the > >>>>>>>>>>> same > >>>>>>>>>>>>>> page. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would increase > >> the > >>>>>>>>> network > >>>>>>>>>>>>>> footprint, but not the message size, IF I'm not > >>>>> mis-understanding > >>>>>>>>> your > >>>>>>>>>>> idea > >>>>>>>>>>>>>> of registering multiple assignment. More details: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can > encode > >>>>>>>>> multiple > >>>>>>>>>>>>>> protocols each with their different metadata. The > coordinator > >>>>> will > >>>>>>>>>>> pick the > >>>>>>>>>>>>>> common one that everyone supports (if there are no common > >> one, > >>>>> it > >>>>>>>>> will > >>>>>>>>>>> send > >>>>>>>>>>>>>> an error back; if there are multiple ones, it will pick the > >> one > >>>>>>>> with > >>>>>>>>>>> most > >>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded list). > >>>>> Since > >>>>>>>> our > >>>>>>>>>>>>>> current Streams rebalance protocol is still based on the > >>>>> consumer > >>>>>>>>>>>>>> coordinator, it means our protocol_type would be "consumer", > >>> but > >>>>>>>>>>> instead > >>>>>>>>>>>>>> the protocol type we can have multiple protocols like > >>> "streams", > >>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we need > >> to > >>>>>>>>>>> implement a > >>>>>>>>>>>>>> different assignor class for each version and register all > of > >>>>> them > >>>>>>>> in > >>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the > >> future > >>>>> if > >>>>>>>> we > >>>>>>>>>>>>>> re-factor our implementation to have our own client > >> coordinator > >>>>>>>> layer > >>>>>>>>>>> like > >>>>>>>>>>>>>> Connect did, we can simplify this part of the > implementation. > >>>>> But > >>>>>>>>> even > >>>>>>>>>>> for > >>>>>>>>>>>>>> now with the above approach this is still doable. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On the broker side, the group coordinator will only persist > a > >>>>>> group > >>>>>>>>>>> with > >>>>>>>>>>>>>> the selected protocol and its subscription metadata, e.g. 
if > >>>>>>>>>>> coordinator > >>>>>>>>>>>>>> decides to pick "streams_v2" it will only sends that > >> protocol's > >>>>>>>>>>> metadata > >>>>>>>>>>>>>> from everyone to the leader to assign, AND when completing > >> the > >>>>>>>>>>> rebalance it > >>>>>>>>>>>>>> will also only write the group metadata with that protocol > >> and > >>>>> the > >>>>>>>>>>>>>> assignment only. In a word, although the network traffic > >> maybe > >>>>>>>>>>> increased a > >>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One corner > >>>>>>>> situation > >>>>>>>>> we > >>>>>>>>>>>>>> need to consider is how to stop registering very old > >> assignors > >>>>> to > >>>>>>>>>>> avoid the > >>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if you > are > >>>>>>>> rolling > >>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1 > >>>>> assignor > >>>>>>>>>>> anymore, > >>>>>>>>>>>>>> but that would unfortunately still require some configs. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 3) About the "version probing" idea, I think that's a > >>> promising > >>>>>>>>>>> approach > >>>>>>>>>>>>>> as well, but if we are going to do the multi-assignment its > >>>>> value > >>>>>>>>> seems > >>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top of > >>>>>>>>>>> multi-assignment > >>>>>>>>>>>>>> to save us from still requiring the config to avoid > >> registering > >>>>>> all > >>>>>>>>> the > >>>>>>>>>>>>>> metadata for all version. More details: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the assignor > >> but > >>>>>> for > >>>>>>>>>>> all old > >>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the encoded > >> data > >>>>>>>> would > >>>>>>>>>>> be: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> "streams_vN" : "encoded metadata" > >>>>>>>>>>>>>> "streams_vN-1":empty > >>>>>>>>>>>>>> "streams_vN-2":empty > >>>>>>>>>>>>>> .. > >>>>>>>>>>>>>> "streams_0":empty > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> So the coordinator can still safely choose the latest common > >>>>>>>> version; > >>>>>>>>>>> and > >>>>>>>>>>>>>> then when leaders receive the subscription (note it should > >>>>> always > >>>>>>>>>>> recognize > >>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of the > >>>>>>>>>>> subscriptions > >>>>>>>>>>>>>> are empty bytes, it will send the empty assignment with that > >>>>>>>> version > >>>>>>>>>>> number > >>>>>>>>>>>>>> encoded in the metadata. So in the second auto-triggered all > >>>>>>>> members > >>>>>>>>>>> would > >>>>>>>>>>>>>> send the metadata with that version: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> "streams_vN" : empty > >>>>>>>>>>>>>> "streams_vN-1" : empty > >>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata" > >>>>>>>>>>>>>> .. > >>>>>>>>>>>>>> "streams_0":empty > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> By doing this we would not require any configs for users. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear > >> about > >>>>>> the > >>>>>>>>>>> details > >>>>>>>>>>>>>> so probably we'd need to fill that out before making a call. > >>> For > >>>>>>>>>>> example, > >>>>>>>>>>>>>> you mentioned "If we detect this situation, the Streams > >>>>>> application > >>>>>>>>>>> closes > >>>>>>>>>>>>>> corresponding active tasks as well as "hot standby" tasks, > >> and > >>>>>>>>>>> re-creates > >>>>>>>>>>>>>> the new active tasks using the new store." 
How could we > >>>>> guarantee > >>>>>>>>> that > >>>>>>>>>>> the > >>>>>>>>>>>>>> gap between these two stores will keep decreasing than > >>>>> increasing > >>>>>>>> so > >>>>>>>>>>> we'll > >>>>>>>>>>>>>> eventually achieve the flip point? And also the longer we > are > >>>>>>>> before > >>>>>>>>>>> the > >>>>>>>>>>>>>> flip point, the larger we are doubling the storage space, > >> etc. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax < > >>>>>>>>>>> matth...@confluent.io> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> @John, Guozhang, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply... > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> About upgrading the rebalance metadata: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Another possibility to do this, would be to register > >> multiple > >>>>>>>>>>> assignment > >>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case, new > >>>>> instances > >>>>>>>>>>> would > >>>>>>>>>>>>>>> be configured to support both and the broker would pick the > >>>>>>>> version > >>>>>>>>>>> that > >>>>>>>>>>>>>>> all instances understand. The disadvantage would be, that > we > >>>>> send > >>>>>>>>> much > >>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as long > >> as > >>>>> no > >>>>>>>>>>> second > >>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, using > >> this > >>>>>>>>>>> approach > >>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off an > >>>>> increased > >>>>>>>>>>>>>>> rebalance network footprint (I also assume that this would > >>>>>>>> increase > >>>>>>>>>>> the > >>>>>>>>>>>>>>> message size that is written into __consumer_offsets > >> topic?). > >>>>>>>>>>> Overall, I > >>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it could > >>>>> avoid > >>>>>> a > >>>>>>>>>>>>>>> second rebalance (I have some more thoughts about stores > >> below > >>>>>>>> that > >>>>>>>>>>> are > >>>>>>>>>>>>>>> relevant for single rebalance upgrade). > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> For future upgrades we might be able to fix this though. I > >> was > >>>>>>>>>>> thinking > >>>>>>>>>>>>>>> about the following: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> In the current implementation, the leader fails if it gets > a > >>>>>>>>>>>>>>> subscription it does not understand (ie, newer version). We > >>>>> could > >>>>>>>>>>> change > >>>>>>>>>>>>>>> this behavior and let the leader send an empty assignment > >> plus > >>>>>>>> error > >>>>>>>>>>>>>>> code (including supported version) back to the instance > >>> sending > >>>>>>>> the > >>>>>>>>>>>>>>> "bad" subscription. 
This would allow the following logic > for > >>> an > >>>>>>>>>>>>>>> application instance: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> - on startup, always send the latest subscription format > >>>>>>>>>>>>>>> - if leader understands it, we get an assignment back an > >>> start > >>>>>>>>>>> processing > >>>>>>>>>>>>>>> - if leader does not understand it, we get an empty > >>> assignment > >>>>>>>> and > >>>>>>>>>>>>>>> supported version back > >>>>>>>>>>>>>>> - the application unsubscribe()/subscribe()/poll() again > >> and > >>>>>>>>> sends a > >>>>>>>>>>>>>>> subscription using the leader's supported version > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> This protocol would allow to do a single rolling bounce, > and > >>>>>>>>>>> implements > >>>>>>>>>>>>>>> a "version probing" step, that might result in two executed > >>>>>>>>>>> rebalances. > >>>>>>>>>>>>>>> The advantage would be, that the user does not need to set > >> any > >>>>>>>>> configs > >>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care of > >> this > >>>>>>>>>>>>>>> automatically. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen and > >> that > >>>>>> for > >>>>>>>>> an > >>>>>>>>>>>>>>> error case during rebalance, we loose the information about > >>> the > >>>>>>>>>>>>>>> supported leader version and the "probing step" would > >> happen a > >>>>>>>>> second > >>>>>>>>>>> time. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> If the leader is eventually updated, it will include it's > >> own > >>>>>>>>>>> supported > >>>>>>>>>>>>>>> version in all assignments, to allow a "down graded" > >>>>> application > >>>>>>>> to > >>>>>>>>>>>>>>> upgrade its version later. Also, if a application fails, > the > >>>>>> first > >>>>>>>>>>>>>>> probing would always be successful and only a single > >> rebalance > >>>>>>>>>>> happens. > >>>>>>>>>>>>>>> If we use this protocol, I think we don't need any > >>>>> configuration > >>>>>>>>>>>>>>> parameter for future upgrades. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version": > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version the > >>>>>>>>> current/old > >>>>>>>>>>>>>>> application is using. I think this is simpler, as users > know > >>>>> this > >>>>>>>>>>>>>>> version. If we use "internal.protocol.version" instead, we > >>>>> expose > >>>>>>>>>>>>>>> implementation details and users need to know the protocol > >>>>>> version > >>>>>>>>>>> (ie, > >>>>>>>>>>>>>>> they need to map from the release version to the protocol > >>>>>> version; > >>>>>>>>> ie, > >>>>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version > >> 2"). > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Also the KIP states that for the second rolling bounce, the > >>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- and > >>> thus, > >>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored (I > >>> will > >>>>>>>>> update > >>>>>>>>>>>>>>> the KIP to point out this dependency). > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> About your second point: I'll update the KIP accordingly to > >>>>>>>> describe > >>>>>>>>>>>>>>> future updates as well. Both will be different. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> One more point about upgrading the store format. 
I was > >>> thinking > >>>>>>>>> about > >>>>>>>>>>>>>>> avoiding the second rolling bounce all together in the > >> future: > >>>>>> (1) > >>>>>>>>> the > >>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this > >>>>>> required > >>>>>>>>> to > >>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the > switch > >>>>> and > >>>>>>>>>>> delete > >>>>>>>>>>>>>>> the old stores. (3) the current proposal does the switch > >>>>>>>> "globally" > >>>>>>>>> -- > >>>>>>>>>>>>>>> this is simpler and due to the required second rebalance no > >>>>>>>>>>> disadvantage. > >>>>>>>>>>>>>>> However, a global consistent switch over might actually not > >> be > >>>>>>>>>>> required. > >>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from above, > >> we > >>>>>>>> could > >>>>>>>>>>>>>>> decouple the store switch and each instance could switch > its > >>>>>> store > >>>>>>>>>>>>>>> independently from all other instances. After the rolling > >>>>> bounce, > >>>>>>>> it > >>>>>>>>>>>>>>> seems to be ok to switch from the old store to the new > store > >>>>>>>> "under > >>>>>>>>>>> the > >>>>>>>>>>>>>>> hood" whenever the new store is ready (this could even be > >>> done, > >>>>>>>>> before > >>>>>>>>>>>>>>> we switch to the new metadata version). Each time we update > >>> the > >>>>>>>> "hot > >>>>>>>>>>>>>>> standby" we check if it reached the "endOffset" (or maybe > >> X% > >>>>>> that > >>>>>>>>>>> could > >>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect this > >>>>>> situation, > >>>>>>>>> the > >>>>>>>>>>>>>>> Streams application closes corresponding active tasks as > >> well > >>>>> as > >>>>>>>>> "hot > >>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks using > >> the > >>>>> new > >>>>>>>>>>> store. > >>>>>>>>>>>>>>> (I need to go through the details once again, but it seems > >> to > >>>>> be > >>>>>>>>>>>>>>> feasible.). > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Combining this strategy with the "multiple assignment" > idea, > >>>>>> might > >>>>>>>>>>> even > >>>>>>>>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1 > -> > >>>>> 1.2. > >>>>>>>>>>>>>>> Applications would just use the old store, as long as the > >> new > >>>>>>>> store > >>>>>>>>> is > >>>>>>>>>>>>>>> not ready, even if the new metadata version is used > already. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> For future upgrades, a single rebalance would be > sufficient, > >>>>> too, > >>>>>>>>> even > >>>>>>>>>>>>>>> if the stores are upgraded. We would not need any config > >>>>>>>> parameters > >>>>>>>>> as > >>>>>>>>>>>>>>> the "probe" step allows us to detect the supported > rebalance > >>>>>>>>> metadata > >>>>>>>>>>>>>>> version (and we would also not need multiple "assigmnent > >>>>>>>> strategies" > >>>>>>>>>>> as > >>>>>>>>>>>>>>> out own protocol encoded everything we need). > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Let me know what you think. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote: > >>>>>>>>>>>>>>>> @John: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the > >> encoded > >>>>>>>>> metadata > >>>>>>>>>>>>>>> bytes > >>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from > Consumer's > >>>>> pov, > >>>>>>>>> so I > >>>>>>>>>>>>>>> think > >>>>>>>>>>>>>>>> this change should be in the Streams layer as well. 
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> @Matthias: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported version" > >>>>> besides > >>>>>>>>> the > >>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea to > >> allow > >>>>>>>>> either > >>>>>>>>>>>>>>> case; > >>>>>>>>>>>>>>>> the key is that in Streams we would likely end up with a > >>>>> mapping > >>>>>>>>>>> from the > >>>>>>>>>>>>>>>> protocol version to the other persistent data format > >> versions > >>>>>>>> such > >>>>>>>>> as > >>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually > >>> achieve > >>>>>>>> both > >>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded > >>> protocol > >>>>>>>>>>> version's > >>>>>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 -> > >>>>> 0.10.1 > >>>>>>>>>>> leaders > >>>>>>>>>>>>>>>> can choose to use the newer version in the first rolling > >>>>> bounce > >>>>>>>>>>> directly > >>>>>>>>>>>>>>>> and we can document to users that they would not need to > >> set > >>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded > >>>>>>>> protocol > >>>>>>>>>>>>>>> version > >>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, > and > >>>>> then > >>>>>>>> we > >>>>>>>>>>> can > >>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the first > >>>>>> rolling > >>>>>>>>>>> bounce > >>>>>>>>>>>>>>>> and reset in the second. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Besides that, some additional comments: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for > users > >>> to > >>>>>>>> set > >>>>>>>>>>> than > >>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter users > only > >>>>> need > >>>>>>>> to > >>>>>>>>>>> set a > >>>>>>>>>>>>>>>> single version, while the Streams will map that version to > >>> the > >>>>>>>>>>> Streams > >>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But maybe > I > >>>>> did > >>>>>>>> not > >>>>>>>>>>> get > >>>>>>>>>>>>>>>> your idea about how the "upgrade.from" config will be > set, > >>>>>>>> because > >>>>>>>>>>> in > >>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from config > will > >>> be > >>>>>>>> set > >>>>>>>>>>> for > >>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for example, > >>>>>> should > >>>>>>>>>>> user > >>>>>>>>>>>>>>>> reset it to null in the second rolling bounce? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than talking > >> about > >>>>>>>>>>> specific > >>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just > >> categorize > >>>>> all > >>>>>>>>> the > >>>>>>>>>>>>>>>> possible scenarios, even for future upgrade versions, what > >>>>>> should > >>>>>>>>> be > >>>>>>>>>>> the > >>>>>>>>>>>>>>>> standard operations? The categorized we can summarize to > >>> would > >>>>>> be > >>>>>>>>>>>>>>> (assuming > >>>>>>>>>>>>>>>> user upgrade from version X to version Y, where X and Y > are > >>>>>> Kafka > >>>>>>>>>>>>>>> versions, > >>>>>>>>>>>>>>>> with the corresponding supported protocol version x and > y): > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and > >> hence > >>>>> no > >>>>>>>>>>>>>>> persistent > >>>>>>>>>>>>>>>> data formats have changed. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> b. x != y, but all persistent data format remains the > same. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> b. 
x !=y, AND some persistene data format like RocksDB > >>> format, > >>>>>>>>>>> changelog > >>>>>>>>>>>>>>>> format, has been changed. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> c. special case: we may need some special handling logic > >> when > >>>>>>>>>>> "current > >>>>>>>>>>>>>>>> version" or "newest supported version" are not available > in > >>>>> the > >>>>>>>>>>> protocol, > >>>>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> under the above scenarios, how many rolling bounces users > >>> need > >>>>>> to > >>>>>>>>>>>>>>> execute? > >>>>>>>>>>>>>>>> how they should set the configs in each rolling bounce? > and > >>>>> how > >>>>>>>>>>> Streams > >>>>>>>>>>>>>>>> library will execute in these cases? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax < > >>>>>>>>>>> matth...@confluent.io> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Ted, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I still consider changing the KIP to include it right > away > >>> -- > >>>>>> if > >>>>>>>>>>> not, > >>>>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more > >> detail > >>>>>>>> first. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> (Same for other open questions like interface names -- I > >>>>>> collect > >>>>>>>>>>>>>>>>> feedback and update the KIP after we reach consensus :)) > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote: > >>>>>>>>>>>>>>>>>> Thanks for the details, Matthias. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> bq. change the metadata protocol only if a future > >> release, > >>>>>>>>> encoding > >>>>>>>>>>>>>>> both > >>>>>>>>>>>>>>>>> used > >>>>>>>>>>>>>>>>>> and supported version might be an advantage > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be > implemented > >>> in > >>>>>>>> this > >>>>>>>>>>> KIP. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding > >> proposal. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Cheers > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax < > >>>>>>>>>>> matth...@confluent.io > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of > user > >>>>>> code. > >>>>>>>>> And > >>>>>>>>>>>>>>> even > >>>>>>>>>>>>>>>>>>> if we want to add something like this, I would prefer > to > >>> do > >>>>>> it > >>>>>>>>> in > >>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>> separate KIP. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce. > >> But > >>>>>>>> from > >>>>>>>>> my > >>>>>>>>>>>>>>>>>>> understanding it would not be possible. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to > >>>>> switch > >>>>>>>>> from > >>>>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch > >>>>> from > >>>>>>>> the > >>>>>>>>>>> old > >>>>>>>>>>>>>>>>>>> store to the new store (this happens after the last > >>>>> instance > >>>>>>>>>>> bounces a > >>>>>>>>>>>>>>>>>>> second time). 
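As a rough illustration of the two bounces for the in-place upgrade, using the config names as currently proposed in this KIP (they may still change before release):

import java.util.Properties;

// Sketch of the configs for the two rolling bounces (KIP-proposed names).
final class UpgradeConfigSketch {

    static Properties firstBounce() {
        final Properties props = new Properties();
        props.put("upgrade.mode", "in_place"); // enable the upgrade code path
        props.put("upgrade.from", "1.1");      // release the application upgrades from
        return props;
    }

    static Properties secondBounce() {
        // Both settings are removed again; with upgrade.mode unset,
        // upgrade.from is ignored and the new protocol and stores are used.
        return new Properties();
    }
}

The point is only that the second bounce carries no upgrade-specific configuration at all.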
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is, that > >>> it's > >>>>>>>>>>> unclear > >>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>> to which from version 2 to version 3. The > >>>>>>>>>>> StreamsPartitionsAssignor is > >>>>>>>>>>>>>>>>>>> stateless by design, and thus, the information which > >>>>> version > >>>>>>>> it > >>>>>>>>>>> should > >>>>>>>>>>>>>>>>>>> use must be passed in from externally -- and we want to > >>> use > >>>>>>>> the > >>>>>>>>>>>>>>>>>>> StreamsConfig to pass in this information. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> During upgrade, all new instanced have no information > >>> about > >>>>>>>> the > >>>>>>>>>>>>>>> progress > >>>>>>>>>>>>>>>>>>> of the upgrade (ie, how many other instanced got > >> upgrades > >>>>>>>>>>> already). > >>>>>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3 > >>>>>>>>>>> subscription. > >>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>> leader also has this limited view on the world and can > >>> only > >>>>>>>> send > >>>>>>>>>>>>>>> version > >>>>>>>>>>>>>>>>>>> 2 assignments back. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can > simplify > >>>>> the > >>>>>>>>>>> upgrade. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> We did consider to change the metadata to make later > >>>>> upgrades > >>>>>>>>> (ie, > >>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the > >>>>>>>> metadata > >>>>>>>>> or > >>>>>>>>>>>>>>>>>>> storage format again -- as long as we don't change it, > a > >>>>>>>> single > >>>>>>>>>>>>>>> rolling > >>>>>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and > >>>>>>>> "supported > >>>>>>>>>>>>>>>>>>> version". This would allow the leader to switch to the > >> new > >>>>>>>>> version > >>>>>>>>>>>>>>>>>>> earlier and without a second rebalance: leader would > >>>>> receive > >>>>>>>>> "used > >>>>>>>>>>>>>>>>>>> version == old" and "supported version = old/new" -- as > >>>>> long > >>>>>>>> as > >>>>>>>>> at > >>>>>>>>>>>>>>> least > >>>>>>>>>>>>>>>>>>> one instance sends a "supported version = old" leader > >>> sends > >>>>>>>> old > >>>>>>>>>>>>>>> version > >>>>>>>>>>>>>>>>>>> assignment back. However, encoding both version would > >>> allow > >>>>>>>> that > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> leader can send a new version assignment back, right > >> after > >>>>>> the > >>>>>>>>>>> first > >>>>>>>>>>>>>>>>>>> round or rebalance finished (all instances send > >> "supported > >>>>>>>>>>> version = > >>>>>>>>>>>>>>>>>>> new"). However, there are still two issues with this: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 1) if we switch to the new format right after the last > >>>>>>>> instance > >>>>>>>>>>>>>>> bounced, > >>>>>>>>>>>>>>>>>>> the new stores might not be ready to be used -- this > >> could > >>>>>>>> lead > >>>>>>>>> to > >>>>>>>>>>>>>>>>>>> "downtime" as store must be restored before processing > >> can > >>>>>>>>> resume. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At > >>> this > >>>>>>>>>>> point, the > >>>>>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled and > thus > >>>>>> sends > >>>>>>>>>>> the old > >>>>>>>>>>>>>>>>>>> protocol data. However, it would be desirable to never > >>> fall > >>>>>>>> back > >>>>>>>>>>> to > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> old protocol after the switch to the new protocol. 
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> The second issue is minor and I guess if users set-up > >> the > >>>>>>>>> instance > >>>>>>>>>>>>>>>>>>> properly it could be avoided. However, the first issue > >>>>> would > >>>>>>>>>>> prevent > >>>>>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we > >> consider > >>>>>>>> that > >>>>>>>>> we > >>>>>>>>>>>>>>> might > >>>>>>>>>>>>>>>>>>> change the metadata protocol only if a future release, > >>>>>>>> encoding > >>>>>>>>>>> both > >>>>>>>>>>>>>>>>>>> used and supported version might be an advantage in the > >>>>>> future > >>>>>>>>>>> and we > >>>>>>>>>>>>>>>>>>> could consider to add this information in 1.2 release > to > >>>>>>>> prepare > >>>>>>>>>>> for > >>>>>>>>>>>>>>>>> this. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to give > >> the > >>>>>>>>>>> instances > >>>>>>>>>>>>>>>>>>> enough time to prepare the stores in new format. If you > >>>>> would > >>>>>>>> do > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> second rolling bounce before this, it would still work > >> -- > >>>>>>>>>>> however, you > >>>>>>>>>>>>>>>>>>> might see app "downtime" as the new store must be fully > >>>>>>>> restored > >>>>>>>>>>>>>>> before > >>>>>>>>>>>>>>>>>>> processing can resume. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Does this make sense? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote: > >>>>>>>>>>>>>>>>>>>> Matthias, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid > of > >>>>> the > >>>>>>>> 2nd > >>>>>>>>>>>>>>> rolling > >>>>>>>>>>>>>>>>>>> bounce? > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary > >>> difference > >>>>>>>>>>> between > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to > >> decide > >>>>>>>>>>> whether to > >>>>>>>>>>>>>>>>> send > >>>>>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3. > >>>>> (Actually, > >>>>>>>>>>> there is > >>>>>>>>>>>>>>>>>>> another difference mentioned in that the KIP says that > >> the > >>>>>> 2nd > >>>>>>>>>>> rolling > >>>>>>>>>>>>>>>>>>> bounce should happen after all new state stores are > >>> created > >>>>>> by > >>>>>>>>> the > >>>>>>>>>>>>>>>>>>> background thread. However, within the 2nd rolling > >> bounce, > >>>>> we > >>>>>>>>> say > >>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> there is still a background thread, so it seems like is > >> no > >>>>>>>>> actual > >>>>>>>>>>>>>>>>>>> requirement to wait for the new state stores to be > >>>>> created.) > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with > >>>>>>>>> mixed-mode > >>>>>>>>>>>>>>>>> (having > >>>>>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer > >> group). > >>>>> It > >>>>>>>>> seems > >>>>>>>>>>>>>>> like > >>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic > >>>>>>>>>>> (somehow/somewhere) > >>>>>>>>>>>>>>>>> such > >>>>>>>>>>>>>>>>>>> that: > >>>>>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until all > >>>>> instances > >>>>>>>> are > >>>>>>>>>>>>>>> running > >>>>>>>>>>>>>>>>>>> the new code. 
On 3/9/18 11:36 AM, James Cheng wrote:

Matthias,

For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?

For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce and the 2nd rolling bounce is deciding whether to send Subscription Version 2 or Subscription Version 3. (Actually, there is another difference mentioned, in that the KIP says the 2nd rolling bounce should happen after all new state stores are created by the background thread. However, within the 2nd rolling bounce, we say that there is still a background thread, so it seems like there is no actual requirement to wait for the new state stores to be created.)

The 2nd rolling bounce already knows how to deal with mixed mode (having both Version 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce if we added logic (somehow/somewhere) such that:

* Instances send Subscription Version 2 until all instances are running the new code.
* Once all the instances are running the new code, then one at a time, the instances start sending Subscription V3. The leader still hands out Assignment Version 2 until all new state stores are ready.
* Once all instances report that the new stores are ready, the leader sends out Assignment Version 3.
* Once an instance receives an Assignment Version 3, it can delete the old state store.

Doing it that way seems like it would reduce a lot of operator/deployment overhead. No need to do 2 rolling restarts. No need to monitor logs for the state store rebuild. You just deploy it, and the instances update themselves.

What do you think?

The thing that made me think of this is that the "2 rolling bounces" is similar to what Kafka brokers have to do for changes in inter.broker.protocol.version and log.message.format.version. And in the broker case, it seems like it would be possible (with some work, of course) to modify Kafka to allow similar auto-detection of broker capabilities and automatically do a switchover from old to new versions.

-James

On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Matthias,

Thanks for the KIP, it's a +1 from me.

I do have one question regarding the retrieval methods on the new interfaces.

Would you want to consider adding one method with a Predicate that would allow for filtering records by the timestamp stored with the record? Or is this better left for users to implement themselves once the data has been retrieved?

Thanks,
Bill
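As a purely hypothetical illustration of the kind of retrieval method Bill describes: the interface name below is taken from this discussion, but the method signatures are invented for illustration and are not part of the KIP.

import java.util.function.Predicate;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

// Hypothetical shape only -- not the interface actually proposed in the KIP.
public interface ReadOnlyKeyValueTimestampStore<K, V> {

    // Value together with the timestamp stored next to it (pairing the value
    // and timestamp via KeyValue is just one possible convention).
    KeyValue<V, Long> getWithTimestamp(K key);

    // Possible filtering variant: return only entries whose stored timestamp
    // satisfies the predicate, e.g. ts -> ts >= cutoff.
    KeyValueIterator<K, KeyValue<V, Long>> allWithTimestamp(Predicate<Long> timestampFilter);
}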
On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Matthias:
For my point #1, I don't have a preference as to which separator is chosen. Given the background you mentioned, the current choice is good.

For #2, I think my proposal is better since it is closer to English grammar.

It would be good to listen to what other people think.

On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the comments!

@Guozhang:

So far, there is one PR for the rebalance metadata upgrade fix (addressing the mentioned https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first impression of how the metadata upgrade works, including a system test: https://github.com/apache/kafka/pull/4636

I can share other PRs as soon as they are ready. I agree that the KIP is complex, and I am ok with putting out more code to give better discussion context.

@Ted:

I picked `_` instead of `-` to align with the `processing.guarantee` parameter that accepts `at_least_once` and `exactly_once` as values. Personally, I don't care about underscore vs dash, but I prefer consistency. If you feel strongly about it, we can also change it to `-`.

About the interface name: I am fine either way -- I stripped the `With` to keep the name a little shorter. It would be good to get feedback from others and pick the name the majority prefers.

@John:

We can certainly change it. I agree that it would not make a difference. I'll dig into the code to see if either of the two variants might introduce undesired complexity, and I'll update the KIP if I don't hit an issue with putting the `-v2` on the store directory instead of using `rocksdb-v2`.

-Matthias

On 3/8/18 2:44 PM, John Roesler wrote:
Hey Matthias,

The KIP looks good to me. I had several questions queued up, but they were all in the "rejected alternatives" section... oh, well.

One very minor thought re changing the state directory from "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2" marker on the storeName part of the path (i.e., "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same benefits without altering the high-level directory structure.

It may not matter, but I could imagine people running scripts to monitor rocksdb disk usage for each task, or other such use cases.

Thanks,
-John
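A small sketch of the two layouts being compared, with purely illustrative path construction:

// Illustrative comparison of the two candidate state-directory layouts.
public class StateDirLayoutSketch {

    // Layout from the KIP: the version marker sits on the "rocksdb" segment.
    static String kipLayout(final String stateDir, final String appId,
                            final String taskId, final String storeName) {
        return stateDir + "/" + appId + "/" + taskId + "/rocksdb-v2/" + storeName;
    }

    // Alternative raised above: the version marker sits on the store name, so the
    // higher-level directory structure (and any monitoring scripts keyed on it)
    // stays the same.
    static String storeNameLayout(final String stateDir, final String appId,
                                  final String taskId, final String storeName) {
        return stateDir + "/" + appId + "/" + taskId + "/rocksdb/" + storeName + "-v2";
    }
}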
But large KIPs like > >> this > >>>>>>>> there > >>>>>>>>>>> are > >>>>>>>>>>>>>>>>> always > >>>>>>>>>>>>>>>>>>>>>>> some > >>>>>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is > >>> better > >>>>>> to > >>>>>>>>>>> have > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design > >>>>>>>>>>> discussion :) > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax > < > >>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API > to > >>>>>> allow > >>>>>>>>>>> storing > >>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the > basis > >>> to > >>>>>>>>>>> resolve > >>>>>>>>>>>>>>>>>>> multiple > >>>>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests). > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this! > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> -- > >>>>>>>> -- Guozhang > >>>>>>>> > >>>>>>> > >>>>>> > >>>>>> > >>>>> > >>>> > >>> > >>> > >> > > > >