Hi Matthias, Thanks for the updates to the KIP. I've just read it over, and am personally quite happy with it.
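For anyone else reading along: the column-family isolation described in the updated KIP boils down to roughly the sketch below. It is only an illustration against the plain RocksDB Java API, not code from the WIP PR; the column family name, the path, and the point at which an entry gets migrated are made up.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ColumnFamilyUpgradeSketch {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // old plain key-value format stays in the default column family,
        // the new "8-byte timestamp + value" format lives in a second one
        final List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, new ColumnFamilyOptions()),
            new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8),
                                       new ColumnFamilyOptions()));
        final List<ColumnFamilyHandle> handles = new ArrayList<>();
        try (final DBOptions options = new DBOptions()
                 .setCreateIfMissing(true)
                 .setCreateMissingColumnFamilies(true);
             final RocksDB db = RocksDB.open(options, "/tmp/upgrade-sketch", descriptors, handles)) {
            final ColumnFamilyHandle oldFormat = handles.get(0);
            final ColumnFamilyHandle newFormat = handles.get(1);
            final byte[] key = "some-key".getBytes(StandardCharsets.UTF_8);
            final byte[] plainValue = db.get(oldFormat, key);
            if (plainValue != null) {
                final long timestamp = 42L; // in the real store this comes from the record context
                final byte[] valueWithTs = ByteBuffer.allocate(8 + plainValue.length)
                    .putLong(timestamp).put(plainValue).array();
                // migrate the entry "on the side" and drop the old-format copy
                db.put(newFormat, key, valueWithTs);
                db.delete(oldFormat, key);
            }
        }
    }
}

Because both formats live inside a single RocksDB instance, the on-the-side migration does not open another store, so there is no extra per-instance RocksDB overhead.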
Thanks for tackling this dicey issue and putting in a huge amount of design work to produce a smooth upgrade path for DSL users. Thanks, -John On Mon, Dec 17, 2018 at 10:35 AM Matthias J. Sax <matth...@confluent.io> wrote: > Dear all, > > I finally managed to update the KIP. > > To address the concerns about the complex upgrade path, I simplified the > design. We don't need any configs and the upgrade can be done with the > simple single rolling bounce pattern. > > The suggestion is to exploit RocksDB column families to isolate old and > new on-disk format. Furthermore, the upgrade from old to new format > happens "on the side" after an instance was upgraded. > > I also pushed a WIP PR in case you want to look into some details > (potential reviewers, don't panic: I plan to break this down into > multiple PRs for actual review if the KIP is accepted). > > https://github.com/apache/kafka/pull/6044 > > @Eno: I think I never answered your question about being future proof: > > The latest design is not generic, because it does not support changes > that need to be reflected in the changelog topic. I aimed for a > non-generic design for now to keep it as simple as possible. Thus, other > format changes might need a different design / upgrade path -- however, > because this KIP is quite encapsulated in the current design, I don't > see any issue to build this later and a generic upgrade path seems to be > an orthogonal concern atm. > > > -Matthias > > > On 11/22/18 2:50 PM, Adam Bellemare wrote: > > Thanks for the information Matthias. > > > > I will await your completion of this ticket then since it underpins the > > essential parts of a RocksDB TTL aligned with the changelog topic. I am > > eager to work on that ticket myself, so if I can help on this one in any > > way please let me know. > > > > Thanks > > Adam > > > > > > > > On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <matth...@confluent.io> > > wrote: > > > >> It's an interesting idea to use second store, to maintain the > >> timestamps. However, each RocksDB instance implies some overhead. In > >> fact, we are looking into ColumnFamilies atm to see if we can use those > >> and merge multiple RocksDBs into a single one to reduce this overhead. > >> > >> -Matthias > >> > >> On 11/20/18 5:15 AM, Patrik Kleindl wrote: > >>> Hi Adam > >>> > >>> Sounds great, I was already planning to ask around if anyone had > tackled > >>> this. > >>> We have a use case very similar to what you described in KAFKA-4212, > only > >>> with Global State Stores. > >>> I have tried a few things with the normal DSL but was not really > >> successful. > >>> Schedule/Punctuate is not possible, supplying a windowed store is also > >> not > >>> allowed and the process method has no knowledge of the timestamp of the > >>> record. > >>> And anything loaded on startup is not filtered anyway. > >>> > >>> Regarding 4212, wouldn't it be easier (although a little less > >>> space-efficient) to track the Timestamps in a separate Store with <K, > >> Long> > >>> ? > >>> This would leave the original store intact and allow a migration of the > >>> timestamps without touching the other data. > >>> > >>> So I am very interested in your PR :-) > >>> > >>> best regards > >>> > >>> Patrik > >>> > >>> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare <adam.bellem...@gmail.com > > > >>> wrote: > >>> > >>>> Hi Matthias > >>>> > >>>> Thanks - I figured that it was probably a case of just too much to do > >> and > >>>> not enough time. I know how that can go. 
I am asking about this one in > >>>> relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding > a > >> TTL > >>>> to RocksDB. I have outlined a bit about my use-case within 4212, but > for > >>>> brevity here it is: > >>>> > >>>> My case: > >>>> 1) I have a RocksDB with TTL implementation working where records are > >> aged > >>>> out using the TTL that comes with RocksDB (very simple). > >>>> 2) We prevent records from loading from the changelog if recordTime + > >> TTL < > >>>> referenceTimeStamp (default = System.currentTimeInMillis() ). > >>>> > >>>> This assumes that the records are stored with the same time reference > >> (say > >>>> UTC) as the consumer materializing the RocksDB store. > >>>> > >>>> My questions about KIP-258 are as follows: > >>>> 1) How does "we want to be able to store record timestamps in KTables" > >>>> differ from inserting records into RocksDB with TTL at consumption > >> time? I > >>>> understand that it could be a difference of some seconds, minutes, > >> hours, > >>>> days etc between when the record was published and now, but given the > >>>> nature of how RocksDB TTL works (eventual - based on compaction) I > don't > >>>> see how a precise TTL can be achieved, such as that which one can get > >> with > >>>> windowed stores. > >>>> > >>>> 2) Are you looking to change how records are inserted into a TTL > >> RocksDB, > >>>> such that the TTL would take effect from the record's published time? > If > >>>> not, what would be the ideal workflow here for a single record with > TTL > >>>> RocksDB? > >>>> ie: Record Timestamp: 100 > >>>> TTL: 50 > >>>> Record inserted into rocksDB: 110 > >>>> Record to expire at 150? > >>>> > >>>> 3) I'm not sure I fully understand the importance of the upgrade > path. I > >>>> have read the link to ( > https://issues.apache.org/jira/browse/KAFKA-3522 > >> ) > >>>> in > >>>> the KIP, and I can understand that a state-store on disk may not > >> represent > >>>> what the application is expecting. I don't think I have the full > picture > >>>> though, because that issue seems to be easy to fix with a simple > >> versioned > >>>> header or accompanying file, forcing the app to rebuild the state if > the > >>>> version is incompatible. Can you elaborate or add a scenario to the > KIP > >>>> that illustrates the need for the upgrade path? > >>>> > >>>> Thanks, > >>>> > >>>> Adam > >>>> > >>>> > >>>> > >>>> > >>>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax < > matth...@confluent.io> > >>>> wrote: > >>>> > >>>>> Adam, > >>>>> > >>>>> I am still working on it. Was pulled into a lot of other tasks lately > >> so > >>>>> this was delayed. Also had some discussions about simplifying the > >>>>> upgrade path with some colleagues and I am prototyping this atm. Hope > >> to > >>>>> update the KIP accordingly soon. > >>>>> > >>>>> -Matthias > >>>>> > >>>>> On 11/10/18 7:41 AM, Adam Bellemare wrote: > >>>>>> Hello Matthias > >>>>>> > >>>>>> I am curious as to the status of this KIP. TTL and expiry of records > >>>> will > >>>>>> be extremely useful for several of our business use-cases, as well > as > >>>>>> another KIP I had been working on. > >>>>>> > >>>>>> Thanks > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska < > eno.there...@gmail.com > >>> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Matthias, > >>>>>>> > >>>>>>> Good stuff. Could you comment a bit on how future-proof is this > >>>> change? 
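For reference, the setup Adam describes above corresponds roughly to the sketch below (class and method names are illustrative, and the reference-timestamp handling is simplified):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

public class TtlStoreSketch {
    private final TtlDB db;
    private final int ttlSeconds;

    TtlStoreSketch(final String path, final int ttlSeconds) throws RocksDBException {
        this.ttlSeconds = ttlSeconds;
        // (1) the TTL that "comes with RocksDB": entries older than ttlSeconds become
        // eligible for deletion during compaction, i.e. expiry is eventual, not precise
        this.db = TtlDB.open(new Options().setCreateIfMissing(true), path, ttlSeconds, false);
    }

    // (2) skip records that are already expired while restoring from the changelog
    void restore(final ConsumerRecord<byte[], byte[]> record) throws RocksDBException {
        final long referenceTimestamp = System.currentTimeMillis();
        if (record.timestamp() + TimeUnit.SECONDS.toMillis(ttlSeconds) >= referenceTimestamp) {
            db.put(record.key(), record.value());
        }
    }
}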
> >>>>> For > >>>>>>> example, if we want to store both event timestamp "and" processing > >>>> time > >>>>> in > >>>>>>> RocksDB will we then need another interface (e.g. called > >>>>>>> KeyValueWithTwoTimestampsStore)? > >>>>>>> > >>>>>>> Thanks > >>>>>>> Eno > >>>>>>> > >>>>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax < > >>>> matth...@confluent.io> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Thanks for your input Guozhang and John. > >>>>>>>> > >>>>>>>> I see your point, that the upgrade API is not simple. If you don't > >>>>>>>> thinks it's valuable to make generic store upgrades possible > (atm), > >>>> we > >>>>>>>> can make the API internal, too. The impact is, that we only > support > >> a > >>>>>>>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to > >>>>>>>> WindowedWithTS etc) for which we implement the internal > interfaces. > >>>>>>>> > >>>>>>>> We can keep the design generic, so if we decide to make it public, > >> we > >>>>>>>> don't need to re-invent it. This will also have the advantage, > that > >>>> we > >>>>>>>> can add upgrade pattern for other stores later, too. > >>>>>>>> > >>>>>>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, but > >> it > >>>>>>>> was the only way I could find to design a generic upgrade > interface. > >>>> If > >>>>>>>> we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` > >> would > >>>>>>>> become an internal interface I guess (don't think we can remove > it). > >>>>>>>> > >>>>>>>> I will wait for more feedback about this and if nobody wants to > keep > >>>> it > >>>>>>>> as public API I will update the KIP accordingly. Will add some > more > >>>>>>>> clarifications for different upgrade patterns in the mean time and > >>>> fix > >>>>>>>> the typos/minor issues. > >>>>>>>> > >>>>>>>> About adding a new state UPGRADING: maybe we could do that. > However, > >>>> I > >>>>>>>> find it particularly difficult to make the estimation when we > should > >>>>>>>> switch to RUNNING, thus, I am a little hesitant. Using store > >>>> callbacks > >>>>>>>> or just logging the progress including some indication about the > >>>> "lag" > >>>>>>>> might actually be sufficient. Not sure what others think? > >>>>>>>> > >>>>>>>> About "value before timestamp": no real reason and I think it does > >>>> not > >>>>>>>> make any difference. Do you want to change it? > >>>>>>>> > >>>>>>>> About upgrade robustness: yes, we cannot control if an instance > >>>> fails. > >>>>>>>> That is what I meant by "we need to write test". The upgrade > should > >>>> be > >>>>>>>> able to continuous even is an instance goes down (and we must make > >>>> sure > >>>>>>>> that we don't end up in an invalid state that forces us to wipe > out > >>>> the > >>>>>>>> whole store). Thus, we need to write system tests that fail > >> instances > >>>>>>>> during upgrade. > >>>>>>>> > >>>>>>>> For `in_place_offline` upgrade: I don't think we need this mode, > >>>>> because > >>>>>>>> people can do this via a single rolling bounce. 
> >>>>>>>> > >>>>>>>> - prepare code and switch KV-Store to KVwithTs-Store > >>>>>>>> - do a single rolling bounce (don't set any upgrade config) > >>>>>>>> > >>>>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if > we > >>>>>>>> remove the `StoreUpgradeBuilder`) will detect that there is only > an > >>>> old > >>>>>>>> local KV store w/o TS, will start to restore the new KVwithTs > store, > >>>>>>>> wipe out the old store and replace with the new store after > restore > >>>> is > >>>>>>>> finished, and start processing only afterwards. (I guess we need > to > >>>>>>>> document this case -- will also add it to the KIP.) > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> -Matthias > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On 8/9/18 1:10 PM, John Roesler wrote: > >>>>>>>>> Hi Matthias, > >>>>>>>>> > >>>>>>>>> I think this KIP is looking really good. > >>>>>>>>> > >>>>>>>>> I have a few thoughts to add to the others: > >>>>>>>>> > >>>>>>>>> 1. You mentioned at one point users needing to configure > >>>>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant to > say > >>>>>>> they > >>>>>>>>> should remove the config. If they really have to set it to a > string > >>>>>>>> "null" > >>>>>>>>> or even set it to a null value but not remove it, it would be > >>>>>>>> unfortunate. > >>>>>>>>> > >>>>>>>>> 2. In response to Bill's comment #1 , you said that "The idea is > >>>> that > >>>>>>> the > >>>>>>>>> upgrade should be robust and not fail. We need to write according > >>>>>>> tests". > >>>>>>>>> I may have misunderstood the conversation, but I don't think it's > >>>>>>> within > >>>>>>>>> our power to say that an instance won't fail. What if one of my > >>>>>>> computers > >>>>>>>>> catches on fire? What if I'm deployed in the cloud and one > instance > >>>>>>>>> disappears and is replaced by a new one? Or what if one instance > >>>> goes > >>>>>>>> AWOL > >>>>>>>>> for a long time and then suddenly returns? How will the upgrade > >>>>> process > >>>>>>>>> behave in light of such failures? > >>>>>>>>> > >>>>>>>>> 3. your thought about making in-place an offline mode is > >>>> interesting, > >>>>>>> but > >>>>>>>>> it might be a bummer for on-prem users who wish to upgrade > online, > >>>> but > >>>>>>>>> cannot just add new machines to the pool. It could be a new > upgrade > >>>>>>> mode > >>>>>>>>> "offline-in-place", though... > >>>>>>>>> > >>>>>>>>> 4. I was surprised to see that a user would need to modify the > >>>>> topology > >>>>>>>> to > >>>>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of > Guozhang's > >>>>>>>>> suggestions would remove this necessity. > >>>>>>>>> > >>>>>>>>> Thanks for taking on this very complex but necessary work. > >>>>>>>>> > >>>>>>>>> -John > >>>>>>>>> > >>>>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang < > wangg...@gmail.com> > >>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hello Matthias, > >>>>>>>>>> > >>>>>>>>>> Thanks for the updated KIP. Some more comments: > >>>>>>>>>> > >>>>>>>>>> 1. The current set of proposed API is a bit too complicated, > which > >>>>>>> makes > >>>>>>>>>> the upgrade flow from user's perspective also a bit complex. I'd > >>>> like > >>>>>>> to > >>>>>>>>>> check different APIs and discuss about their needs separately: > >>>>>>>>>> > >>>>>>>>>> 1.a. StoreProxy: needed for in-place upgrade only, between > the > >>>>>>> first > >>>>>>>>>> and second rolling bounce, where the old-versioned stores can > >>>> handle > >>>>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e. 
from > >> one > >>>>>>>> store > >>>>>>>>>> type to another) would not be very common: users may want to > >>>> upgrade > >>>>>>>> from a > >>>>>>>>>> certain store engine to another, but the interface would likely > be > >>>>>>>> staying > >>>>>>>>>> the same. Hence personally I'd suggest we keep it internally and > >>>> only > >>>>>>>>>> consider exposing it in the future if it does become a common > >>>>> pattern. > >>>>>>>>>> > >>>>>>>>>> 1.b. ConverterStore / RecordConverter: needed for both > >> in-place > >>>>>>> and > >>>>>>>>>> roll-over upgrade, between the first and second rolling bounces, > >>>> for > >>>>>>> the > >>>>>>>>>> new versioned store to be able to read old-versioned changelog > >>>>> topics. > >>>>>>>>>> Firstly I think we should not expose key in the public APIs but > >>>> only > >>>>>>> the > >>>>>>>>>> values, since allowing key format changes would break log > >>>> compaction, > >>>>>>>> and > >>>>>>>>>> hence would not be compatible anyways. As for value format > >> changes, > >>>>>>>>>> personally I think we can also keep its upgrade logic internally > >> as > >>>>> it > >>>>>>>> may > >>>>>>>>>> not worth generalizing to user customizable logic. > >>>>>>>>>> > >>>>>>>>>> 1.c. If you agrees with 2.a/b above, then we can also > remove " > >>>>>>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the > >> public > >>>>>>>> APIs. > >>>>>>>>>> > >>>>>>>>>> 1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" > >> is > >>>>>>> not > >>>>>>>>>> needed either given that we are exposing "ValueAndTimestamp" > >>>> anyways. > >>>>>>>> I.e. > >>>>>>>>>> it is just a syntax sugar and for IQ, users can always just set > a > >> " > >>>>>>>>>> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" > as > >>>> the > >>>>>>>> new > >>>>>>>>>> interface does not provide any additional functions. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 2. Could we further categorize the upgrade flow for different > use > >>>>>>> cases, > >>>>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be used > >>>>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who do > not > >>>>>>> need > >>>>>>>> to > >>>>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to > >> switch > >>>>> to > >>>>>>>>>> KeyValueWithTimestampStore. Just to give my understanding for > 3), > >>>> the > >>>>>>>>>> upgrade flow for users may be simplified as the following (for > >> both > >>>>>>>>>> in-place and roll-over): > >>>>>>>>>> > >>>>>>>>>> * Update the jar to new version, make code changes from > >>>>>>>> KeyValueStore > >>>>>>>>>> to KeyValueWithTimestampStore, set upgrade config. > >>>>>>>>>> > >>>>>>>>>> * First rolling bounce, and library code can internally use > >>>> proxy > >>>>>>> / > >>>>>>>>>> converter based on the specified config to handle new APIs with > >> old > >>>>>>>> stores, > >>>>>>>>>> while let new stores read from old changelog data. > >>>>>>>>>> > >>>>>>>>>> * Reset upgrade config. > >>>>>>>>>> > >>>>>>>>>> * Second rolling bounce, and the library code automatically > >>>> turn > >>>>>>> off > >>>>>>>>>> logic for proxy / converter. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 3. Some more detailed proposals are needed for when to recommend > >>>>> users > >>>>>>>> to > >>>>>>>>>> trigger the second rolling bounce. 
I have one idea to share > here: > >>>> we > >>>>>>>> add a > >>>>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when 1) > >>>>> upgrade > >>>>>>>>>> config is set, and 2) the new stores are still ramping up (for > the > >>>>>>>> second > >>>>>>>>>> part, we can start with some internal hard-coded heuristics to > >>>> decide > >>>>>>>> when > >>>>>>>>>> it is close to be ramped up). If either one of it is not true > any > >>>>>>> more, > >>>>>>>> it > >>>>>>>>>> should transit to RUNNING. Users can then watch on this state, > and > >>>>>>>> decide > >>>>>>>>>> to only trigger the second rebalance when the state has > transited > >>>>> from > >>>>>>>>>> UPGRADING. They can also choose to cut over while the instance > is > >>>>>>> still > >>>>>>>>>> UPGRADING, the downside is that after that the application may > >> have > >>>>>>> long > >>>>>>>>>> restoration phase which is, to user's pov, unavailability > periods. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Below are just some minor things on the wiki: > >>>>>>>>>> > >>>>>>>>>> 4. "proxy story" => "proxy store". > >>>>>>>>>> > >>>>>>>>>> 5. "use the a builder " => "use a builder" > >>>>>>>>>> > >>>>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to the > >>>>>>> value": > >>>>>>>>>> what's the rationale of putting the timestamp before the value, > >>>> than > >>>>>>>> after > >>>>>>>>>> the value? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Guozhang > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax < > >>>>>>> matth...@confluent.io> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Thanks for the feedback Bill. I just update the KIP with some > of > >>>>> your > >>>>>>>>>>> points. > >>>>>>>>>>> > >>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to > >>>> watch > >>>>>>> the > >>>>>>>>>>>>> restore process), I'm wondering if we want to provide a type > of > >>>>>>>>>>>>> StateRestoreListener that could signal when the new stores > have > >>>>>>>>>> reached > >>>>>>>>>>>>> parity with the existing old stores and that could be the > >> signal > >>>>> to > >>>>>>>>>>> start > >>>>>>>>>>>>> second rolling rebalance? > >>>>>>>>>>> > >>>>>>>>>>> I think we can reuse the existing listeners, thus, I did not > >>>> include > >>>>>>>>>>> anything in the KIP. About a signal to rebalance: this might be > >>>>>>> tricky. > >>>>>>>>>>> If we prepare the store "online", the active task will update > the > >>>>>>> state > >>>>>>>>>>> continuously, and thus, state prepare is never finished. It > will > >>>> be > >>>>>>> the > >>>>>>>>>>> users responsibility to do the second rebalance (note, that the > >>>>>>> second > >>>>>>>>>>> rebalance will first finish the last delta of the upgrade to > >>>> finish > >>>>>>> the > >>>>>>>>>>> upgrade before actual processing resumes). I clarified the KIP > >>>> with > >>>>>>>> this > >>>>>>>>>>> regard a little bit. > >>>>>>>>>>> > >>>>>>>>>>>>> 1. Out of N instances, one fails midway through the process, > >>>> would > >>>>>>> we > >>>>>>>>>>> allow > >>>>>>>>>>>>> the other instances to complete or just fail the entire > >> upgrade? > >>>>>>>>>>> > >>>>>>>>>>> The idea is that the upgrade should be robust and not fail. We > >>>> need > >>>>>>> to > >>>>>>>>>>> write according tests. > >>>>>>>>>>> > >>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename > the > >>>>>>>> current > >>>>>>>>>>>>> active directories vs. 
deleting them right away, and when > all > >>>> the > >>>>>>>>>>> prepare > >>>>>>>>>>>>> task directories are successfully migrated then delete the > >>>>> previous > >>>>>>>>>>> active > >>>>>>>>>>>>> ones. > >>>>>>>>>>> > >>>>>>>>>>> Ack. Updated the KIP. > >>>>>>>>>>> > >>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing any > new > >>>>>>>>>> records > >>>>>>>>>>> and > >>>>>>>>>>>>> just allow the prepare tasks to restore, then once all > prepare > >>>>>>> tasks > >>>>>>>>>>> have > >>>>>>>>>>>>> restored, it's a signal for the second round of rolling > bounces > >>>>> and > >>>>>>>>>>> then as > >>>>>>>>>>>>> each task successfully renames its prepare directories and > >>>> deletes > >>>>>>>> the > >>>>>>>>>>> old > >>>>>>>>>>>>> active task directories, normal processing of records > resumes. > >>>>>>>>>>> > >>>>>>>>>>> The basic idea is to do an online upgrade to avoid downtime. We > >>>> can > >>>>>>>>>>> discuss to offer both options... For the offline upgrade > option, > >>>> we > >>>>>>>>>>> could simplify user interaction and trigger the second > rebalance > >>>>>>>>>>> automatically with the requirement that a user needs to update > >> any > >>>>>>>>>> config. > >>>>>>>>>>> > >>>>>>>>>>> If might actually be worth to include this option: we know from > >>>>>>>>>>> experience with state restore, that regular processing slows > down > >>>>> the > >>>>>>>>>>> restore. For roll_over upgrade, it would be a different story > and > >>>>>>>>>>> upgrade should not be slowed down by regular processing. Thus, > we > >>>>>>>> should > >>>>>>>>>>> even make in_place an offline upgrade and force people to use > >>>>>>> roll_over > >>>>>>>>>>> if they need onlint upgrade. Might be a fair tradeoff that may > >>>>>>> simplify > >>>>>>>>>>> the upgrade for the user and for the code complexity. > >>>>>>>>>>> > >>>>>>>>>>> Let's see what other think. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -Matthias > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote: > >>>>>>>>>>>> Hi Matthias, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for the update and the working prototype, it helps with > >>>>>>>>>>>> understanding the KIP. > >>>>>>>>>>>> > >>>>>>>>>>>> I took an initial pass over this PR, and overall I find the > >>>>>>> interfaces > >>>>>>>>>>> and > >>>>>>>>>>>> approach to be reasonable. > >>>>>>>>>>>> > >>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to > >> watch > >>>>>>> the > >>>>>>>>>>>> restore process), I'm wondering if we want to provide a type > of > >>>>>>>>>>>> StateRestoreListener that could signal when the new stores > have > >>>>>>>> reached > >>>>>>>>>>>> parity with the existing old stores and that could be the > signal > >>>> to > >>>>>>>>>> start > >>>>>>>>>>>> second rolling rebalance? > >>>>>>>>>>>> > >>>>>>>>>>>> Although you solicited feedback on the interfaces involved, I > >>>>> wanted > >>>>>>>> to > >>>>>>>>>>> put > >>>>>>>>>>>> down some thoughts that have come to mind reviewing this KIP > >>>> again > >>>>>>>>>>>> > >>>>>>>>>>>> 1. Out of N instances, one fails midway through the process, > >>>> would > >>>>>>> we > >>>>>>>>>>> allow > >>>>>>>>>>>> the other instances to complete or just fail the entire > upgrade? > >>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the > >>>>>>> current > >>>>>>>>>>>> active directories vs. 
deleting them right away, and when all > >>>> the > >>>>>>>>>>> prepare > >>>>>>>>>>>> task directories are successfully migrated then delete the > >>>> previous > >>>>>>>>>>> active > >>>>>>>>>>>> ones. > >>>>>>>>>>>> 3. For the first rolling bounce we pause any processing any > new > >>>>>>>> records > >>>>>>>>>>> and > >>>>>>>>>>>> just allow the prepare tasks to restore, then once all prepare > >>>>> tasks > >>>>>>>>>> have > >>>>>>>>>>>> restored, it's a signal for the second round of rolling > bounces > >>>> and > >>>>>>>>>> then > >>>>>>>>>>> as > >>>>>>>>>>>> each task successfully renames its prepare directories and > >>>> deletes > >>>>>>> the > >>>>>>>>>>> old > >>>>>>>>>>>> active task directories, normal processing of records resumes. > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> Bill > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax < > >>>>>>>> matth...@confluent.io > >>>>>>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi, > >>>>>>>>>>>>> > >>>>>>>>>>>>> KIP-268 (rebalance meatadata) is finished and included in AK > >> 2.0 > >>>>>>>>>>>>> release. Thus, I want to pick up this KIP again to get the > >>>> RocksDB > >>>>>>>>>>>>> upgrade done for 2.1. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I updated the KIP accordingly and also have a "prove of > >> concept" > >>>>> PR > >>>>>>>>>>>>> ready (for "in place" upgrade only): > >>>>>>>>>>>>> https://github.com/apache/kafka/pull/5422/ > >>>>>>>>>>>>> > >>>>>>>>>>>>> There a still open questions, but I want to collect early > >>>> feedback > >>>>>>> on > >>>>>>>>>>>>> the proposed interfaces we need for the store upgrade. Also > >>>> note, > >>>>>>>> that > >>>>>>>>>>>>> the KIP now also aim to define a generic upgrade path from > any > >>>>>>> store > >>>>>>>>>>>>> format A to any other store format B. Adding timestamps is > just > >>>> a > >>>>>>>>>>>>> special case. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I will continue to work on the PR and refine the KIP in the > >>>>>>> meantime, > >>>>>>>>>>> too. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Looking forward to your feedback. > >>>>>>>>>>>>> > >>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote: > >>>>>>>>>>>>>> After some more thoughts, I want to follow John's suggestion > >>>> and > >>>>>>>>>> split > >>>>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I extracted the metadata upgrade into it's own KIP: > >>>>>>>>>>>>>> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268% > >>>>>>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to > >>>> consider > >>>>>>> to > >>>>>>>>>>>>>> make the store format upgrade more flexible/generic. Atm, > the > >>>> KIP > >>>>>>> is > >>>>>>>>>>> too > >>>>>>>>>>>>>> much tailored to the DSL IMHO and does not encounter PAPI > >> users > >>>>>>> that > >>>>>>>>>> we > >>>>>>>>>>>>>> should not force to upgrade the stores. I need to figure out > >>>> the > >>>>>>>>>>> details > >>>>>>>>>>>>>> and follow up later. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Please give feedback for the new KIP-268 on the > corresponding > >>>>>>>>>>> discussion > >>>>>>>>>>>>>> thread. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't > figure > >>>> out > >>>>>>> a > >>>>>>>>>> way > >>>>>>>>>>>>>> for a single rolling bounce upgrade. 
But KIP-268 proposes a > >> fix > >>>>>>> for > >>>>>>>>>>>>>> future upgrades. Please share your thoughts. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for all your feedback! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote: > >>>>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's an > >>>>>>>>>>>>>>> implementation details IMHO and thus I did not include it > in > >>>> the > >>>>>>>>>> KIP) > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> @Guozhang: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 1) I understand know what you mean. We can certainly, allow > >>>> all > >>>>>>>>>> values > >>>>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for > >>>>>>> `upgrade.from` > >>>>>>>>>>>>>>> parameter. I had a similar though once but decided to > >> collapse > >>>>>>> them > >>>>>>>>>>> into > >>>>>>>>>>>>>>> one -- will update the KIP accordingly. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 2) The idea to avoid any config would be, to always send > both > >>>>>>>>>> request. > >>>>>>>>>>>>>>> If we add a config to eventually disable the old request, > we > >>>>>>> don't > >>>>>>>>>>> gain > >>>>>>>>>>>>>>> anything with this approach. The question is really, if we > >> are > >>>>>>>>>> willing > >>>>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be > limited > >>>> to > >>>>>>> 2 > >>>>>>>>>>>>>>> versions and not grow further in future releases. More > >> details > >>>>> in > >>>>>>>>>> (3) > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and > >>>> allows > >>>>>>> us > >>>>>>>>>> to > >>>>>>>>>>>>>>> stay with 2 "assignment strategies" we need to register, as > >>>> the > >>>>>>> new > >>>>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via > >>>> "version > >>>>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a > >>>> config > >>>>>>> if > >>>>>>>>>>>>>>> people upgrade from pre-1.2 releases. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thus, I don't think we need to register new "assignment > >>>>>>> strategies" > >>>>>>>>>>> and > >>>>>>>>>>>>>>> send empty subscriptions for older version. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right with a > >>>>> single > >>>>>>>>>>>>>>> rebalance. I share the concern that an application might > >> never > >>>>>>>> catch > >>>>>>>>>>> up > >>>>>>>>>>>>>>> and thus the hot standby will never be ready. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store > upgrades. > >>>> If > >>>>>>> we > >>>>>>>>>> do > >>>>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3) in > >>>> place > >>>>>>>> for > >>>>>>>>>>>>>>> future upgrades. I also think that changes to the metadata > >> are > >>>>>>> more > >>>>>>>>>>>>>>> likely and thus allowing for single rolling bounce for this > >>>> case > >>>>>>> is > >>>>>>>>>>> more > >>>>>>>>>>>>>>> important anyway. If we assume that store upgrade a rare, > it > >>>>>>> might > >>>>>>>>>> be > >>>>>>>>>>> ok > >>>>>>>>>>>>>>> to sacrifice two rolling bounced for this case. It was just > >> an > >>>>>>> idea > >>>>>>>>>> I > >>>>>>>>>>>>>>> wanted to share (even if I see the issues). > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote: > >>>>>>>>>>>>>>>> Hello Matthias, thanks for your replies. 
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 1) About the config names: actually I was trying to not > >>>> expose > >>>>>>>>>>>>>>>> implementation details :) My main concern was that in your > >>>>>>>> proposal > >>>>>>>>>>> the > >>>>>>>>>>>>>>>> values need to cover the span of all the versions that are > >>>>>>>> actually > >>>>>>>>>>>>> using > >>>>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a > user) > >>>> am > >>>>>>>>>>>>> upgrading > >>>>>>>>>>>>>>>> from any versions within this range I need to remember to > >> use > >>>>>>> the > >>>>>>>>>>> value > >>>>>>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In > my > >>>>>>>>>>> suggestion > >>>>>>>>>>>>> I > >>>>>>>>>>>>>>>> was trying to argue the benefit of just letting users to > >>>>> specify > >>>>>>>>>> the > >>>>>>>>>>>>> actual > >>>>>>>>>>>>>>>> Kafka version she's trying to upgrade from, than > specifying > >> a > >>>>>>>> range > >>>>>>>>>>> of > >>>>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as > >> the > >>>>>>>>>> values, > >>>>>>>>>>>>> but > >>>>>>>>>>>>>>>> still using Kafka versions like broker's > `internal.version` > >>>>>>>> config. > >>>>>>>>>>>>> But if > >>>>>>>>>>>>>>>> you were suggesting the same thing, i.e. by > "0.10.1.x-1.1.x" > >>>>> you > >>>>>>>>>>> meant > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or > "0.11.0" > >>>> or > >>>>>>>>>> "1.1" > >>>>>>>>>>>>> which > >>>>>>>>>>>>>>>> are all recognizable config values then I think we are > >>>> actually > >>>>>>> on > >>>>>>>>>>> the > >>>>>>>>>>>>> same > >>>>>>>>>>>>>>>> page. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would > increase > >>>> the > >>>>>>>>>>> network > >>>>>>>>>>>>>>>> footprint, but not the message size, IF I'm not > >>>>>>> mis-understanding > >>>>>>>>>>> your > >>>>>>>>>>>>> idea > >>>>>>>>>>>>>>>> of registering multiple assignment. More details: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can > >> encode > >>>>>>>>>>> multiple > >>>>>>>>>>>>>>>> protocols each with their different metadata. The > >> coordinator > >>>>>>> will > >>>>>>>>>>>>> pick the > >>>>>>>>>>>>>>>> common one that everyone supports (if there are no common > >>>> one, > >>>>>>> it > >>>>>>>>>>> will > >>>>>>>>>>>>> send > >>>>>>>>>>>>>>>> an error back; if there are multiple ones, it will pick > the > >>>> one > >>>>>>>>>> with > >>>>>>>>>>>>> most > >>>>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded > list). > >>>>>>> Since > >>>>>>>>>> our > >>>>>>>>>>>>>>>> current Streams rebalance protocol is still based on the > >>>>>>> consumer > >>>>>>>>>>>>>>>> coordinator, it means our protocol_type would be > "consumer", > >>>>> but > >>>>>>>>>>>>> instead > >>>>>>>>>>>>>>>> the protocol type we can have multiple protocols like > >>>>> "streams", > >>>>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we > need > >>>> to > >>>>>>>>>>>>> implement a > >>>>>>>>>>>>>>>> different assignor class for each version and register all > >> of > >>>>>>> them > >>>>>>>>>> in > >>>>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the > >>>> future > >>>>>>> if > >>>>>>>>>> we > >>>>>>>>>>>>>>>> re-factor our implementation to have our own client > >>>> coordinator > >>>>>>>>>> layer > >>>>>>>>>>>>> like > >>>>>>>>>>>>>>>> Connect did, we can simplify this part of the > >> implementation. 
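To make that concrete, the client-side registration could look roughly like this (the two assignor class names are hypothetical):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class MultiAssignorConfigSketch {
    public static Properties consumerConfig() {
        final Properties props = new Properties();
        // all listed assignors are advertised in the JoinGroup request; the coordinator
        // picks one that every member supports, preferring the ones listed first
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.example.StreamsPartitionAssignorV3,org.example.StreamsPartitionAssignorV2");
        return props;
    }
}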
> >>>>>>> But > >>>>>>>>>>> even > >>>>>>>>>>>>> for > >>>>>>>>>>>>>>>> now with the above approach this is still doable. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On the broker side, the group coordinator will only > persist > >> a > >>>>>>>> group > >>>>>>>>>>>>> with > >>>>>>>>>>>>>>>> the selected protocol and its subscription metadata, e.g. > if > >>>>>>>>>>>>> coordinator > >>>>>>>>>>>>>>>> decides to pick "streams_v2" it will only sends that > >>>> protocol's > >>>>>>>>>>>>> metadata > >>>>>>>>>>>>>>>> from everyone to the leader to assign, AND when completing > >>>> the > >>>>>>>>>>>>> rebalance it > >>>>>>>>>>>>>>>> will also only write the group metadata with that protocol > >>>> and > >>>>>>> the > >>>>>>>>>>>>>>>> assignment only. In a word, although the network traffic > >>>> maybe > >>>>>>>>>>>>> increased a > >>>>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One corner > >>>>>>>>>> situation > >>>>>>>>>>> we > >>>>>>>>>>>>>>>> need to consider is how to stop registering very old > >>>> assignors > >>>>>>> to > >>>>>>>>>>>>> avoid the > >>>>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if you > >> are > >>>>>>>>>> rolling > >>>>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1 > >>>>>>> assignor > >>>>>>>>>>>>> anymore, > >>>>>>>>>>>>>>>> but that would unfortunately still require some configs. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 3) About the "version probing" idea, I think that's a > >>>>> promising > >>>>>>>>>>>>> approach > >>>>>>>>>>>>>>>> as well, but if we are going to do the multi-assignment > its > >>>>>>> value > >>>>>>>>>>> seems > >>>>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top of > >>>>>>>>>>>>> multi-assignment > >>>>>>>>>>>>>>>> to save us from still requiring the config to avoid > >>>> registering > >>>>>>>> all > >>>>>>>>>>> the > >>>>>>>>>>>>>>>> metadata for all version. More details: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the > assignor > >>>> but > >>>>>>>> for > >>>>>>>>>>>>> all old > >>>>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the encoded > >>>> data > >>>>>>>>>> would > >>>>>>>>>>>>> be: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> "streams_vN" : "encoded metadata" > >>>>>>>>>>>>>>>> "streams_vN-1":empty > >>>>>>>>>>>>>>>> "streams_vN-2":empty > >>>>>>>>>>>>>>>> .. > >>>>>>>>>>>>>>>> "streams_0":empty > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> So the coordinator can still safely choose the latest > common > >>>>>>>>>> version; > >>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> then when leaders receive the subscription (note it should > >>>>>>> always > >>>>>>>>>>>>> recognize > >>>>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of the > >>>>>>>>>>>>> subscriptions > >>>>>>>>>>>>>>>> are empty bytes, it will send the empty assignment with > that > >>>>>>>>>> version > >>>>>>>>>>>>> number > >>>>>>>>>>>>>>>> encoded in the metadata. So in the second auto-triggered > all > >>>>>>>>>> members > >>>>>>>>>>>>> would > >>>>>>>>>>>>>>>> send the metadata with that version: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> "streams_vN" : empty > >>>>>>>>>>>>>>>> "streams_vN-1" : empty > >>>>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata" > >>>>>>>>>>>>>>>> .. > >>>>>>>>>>>>>>>> "streams_0":empty > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> By doing this we would not require any configs for users. 
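Sketched in code, the subscription each member sends would look roughly like this (protocol names and the already-encoded metadata are placeholders):

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

public class VersionProbingSubscriptionSketch {
    // only the newest protocol carries real metadata; older versions are still
    // advertised, but with empty bytes, so the coordinator can select them if needed
    static Map<String, ByteBuffer> subscriptions(final ByteBuffer encodedMetadata) {
        final Map<String, ByteBuffer> protocols = new LinkedHashMap<>();
        protocols.put("streams_v3", encodedMetadata);
        protocols.put("streams_v2", ByteBuffer.allocate(0));
        protocols.put("streams", ByteBuffer.allocate(0));
        return protocols;
    }
}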
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear > >>>> about > >>>>>>>> the > >>>>>>>>>>>>> details > >>>>>>>>>>>>>>>> so probably we'd need to fill that out before making a > call. > >>>>> For > >>>>>>>>>>>>> example, > >>>>>>>>>>>>>>>> you mentioned "If we detect this situation, the Streams > >>>>>>>> application > >>>>>>>>>>>>> closes > >>>>>>>>>>>>>>>> corresponding active tasks as well as "hot standby" tasks, > >>>> and > >>>>>>>>>>>>> re-creates > >>>>>>>>>>>>>>>> the new active tasks using the new store." How could we > >>>>>>> guarantee > >>>>>>>>>>> that > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> gap between these two stores will keep decreasing than > >>>>>>> increasing > >>>>>>>>>> so > >>>>>>>>>>>>> we'll > >>>>>>>>>>>>>>>> eventually achieve the flip point? And also the longer we > >> are > >>>>>>>>>> before > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> flip point, the larger we are doubling the storage space, > >>>> etc. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax < > >>>>>>>>>>>>> matth...@confluent.io> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> @John, Guozhang, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply... > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> About upgrading the rebalance metadata: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Another possibility to do this, would be to register > >>>> multiple > >>>>>>>>>>>>> assignment > >>>>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case, new > >>>>>>> instances > >>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>> be configured to support both and the broker would pick > the > >>>>>>>>>> version > >>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>> all instances understand. The disadvantage would be, that > >> we > >>>>>>> send > >>>>>>>>>>> much > >>>>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as > long > >>>> as > >>>>>>> no > >>>>>>>>>>>>> second > >>>>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, using > >>>> this > >>>>>>>>>>>>> approach > >>>>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off an > >>>>>>> increased > >>>>>>>>>>>>>>>>> rebalance network footprint (I also assume that this > would > >>>>>>>>>> increase > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> message size that is written into __consumer_offsets > >>>> topic?). > >>>>>>>>>>>>> Overall, I > >>>>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it > could > >>>>>>> avoid > >>>>>>>> a > >>>>>>>>>>>>>>>>> second rebalance (I have some more thoughts about stores > >>>> below > >>>>>>>>>> that > >>>>>>>>>>>>> are > >>>>>>>>>>>>>>>>> relevant for single rebalance upgrade). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> For future upgrades we might be able to fix this though. > I > >>>> was > >>>>>>>>>>>>> thinking > >>>>>>>>>>>>>>>>> about the following: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> In the current implementation, the leader fails if it > gets > >> a > >>>>>>>>>>>>>>>>> subscription it does not understand (ie, newer version). 
> We > >>>>>>> could > >>>>>>>>>>>>> change > >>>>>>>>>>>>>>>>> this behavior and let the leader send an empty assignment > >>>> plus > >>>>>>>>>> error > >>>>>>>>>>>>>>>>> code (including supported version) back to the instance > >>>>> sending > >>>>>>>>>> the > >>>>>>>>>>>>>>>>> "bad" subscription. This would allow the following logic > >> for > >>>>> an > >>>>>>>>>>>>>>>>> application instance: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> - on startup, always send the latest subscription format > >>>>>>>>>>>>>>>>> - if leader understands it, we get an assignment back an > >>>>> start > >>>>>>>>>>>>> processing > >>>>>>>>>>>>>>>>> - if leader does not understand it, we get an empty > >>>>> assignment > >>>>>>>>>> and > >>>>>>>>>>>>>>>>> supported version back > >>>>>>>>>>>>>>>>> - the application unsubscribe()/subscribe()/poll() again > >>>> and > >>>>>>>>>>> sends a > >>>>>>>>>>>>>>>>> subscription using the leader's supported version > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> This protocol would allow to do a single rolling bounce, > >> and > >>>>>>>>>>>>> implements > >>>>>>>>>>>>>>>>> a "version probing" step, that might result in two > executed > >>>>>>>>>>>>> rebalances. > >>>>>>>>>>>>>>>>> The advantage would be, that the user does not need to > set > >>>> any > >>>>>>>>>>> configs > >>>>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care of > >>>> this > >>>>>>>>>>>>>>>>> automatically. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen and > >>>> that > >>>>>>>> for > >>>>>>>>>>> an > >>>>>>>>>>>>>>>>> error case during rebalance, we loose the information > about > >>>>> the > >>>>>>>>>>>>>>>>> supported leader version and the "probing step" would > >>>> happen a > >>>>>>>>>>> second > >>>>>>>>>>>>> time. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> If the leader is eventually updated, it will include it's > >>>> own > >>>>>>>>>>>>> supported > >>>>>>>>>>>>>>>>> version in all assignments, to allow a "down graded" > >>>>>>> application > >>>>>>>>>> to > >>>>>>>>>>>>>>>>> upgrade its version later. Also, if a application fails, > >> the > >>>>>>>> first > >>>>>>>>>>>>>>>>> probing would always be successful and only a single > >>>> rebalance > >>>>>>>>>>>>> happens. > >>>>>>>>>>>>>>>>> If we use this protocol, I think we don't need any > >>>>>>> configuration > >>>>>>>>>>>>>>>>> parameter for future upgrades. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version": > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version the > >>>>>>>>>>> current/old > >>>>>>>>>>>>>>>>> application is using. I think this is simpler, as users > >> know > >>>>>>> this > >>>>>>>>>>>>>>>>> version. If we use "internal.protocol.version" instead, > we > >>>>>>> expose > >>>>>>>>>>>>>>>>> implementation details and users need to know the > protocol > >>>>>>>> version > >>>>>>>>>>>>> (ie, > >>>>>>>>>>>>>>>>> they need to map from the release version to the protocol > >>>>>>>> version; > >>>>>>>>>>> ie, > >>>>>>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version > >>>> 2"). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Also the KIP states that for the second rolling bounce, > the > >>>>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- and > >>>>> thus, > >>>>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored > (I > >>>>> will > >>>>>>>>>>> update > >>>>>>>>>>>>>>>>> the KIP to point out this dependency). 
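To spell out how the two bounces would be configured under this proposal (values are examples; "upgrade.from" and "upgrade.mode" are the configs proposed here and in KIP-268, so the exact names are tentative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeConfigSketch {
    // first rolling bounce: new jar, plus the release version the old instances run
    static Properties firstBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put("upgrade.from", "0.10.1");   // release version of the old application
        props.put("upgrade.mode", "in_place"); // proposed by this KIP, not an existing config
        return props;
    }

    // second rolling bounce: both settings removed; "upgrade.from" is ignored
    // once "upgrade.mode" is no longer set
    static Properties secondBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        return props;
    }
}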
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> About your second point: I'll update the KIP accordingly > to > >>>>>>>>>> describe > >>>>>>>>>>>>>>>>> future updates as well. Both will be different. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> One more point about upgrading the store format. I was > >>>>> thinking > >>>>>>>>>>> about > >>>>>>>>>>>>>>>>> avoiding the second rolling bounce all together in the > >>>> future: > >>>>>>>> (1) > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this > >>>>>>>> required > >>>>>>>>>>> to > >>>>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the > >> switch > >>>>>>> and > >>>>>>>>>>>>> delete > >>>>>>>>>>>>>>>>> the old stores. (3) the current proposal does the switch > >>>>>>>>>> "globally" > >>>>>>>>>>> -- > >>>>>>>>>>>>>>>>> this is simpler and due to the required second rebalance > no > >>>>>>>>>>>>> disadvantage. > >>>>>>>>>>>>>>>>> However, a global consistent switch over might actually > not > >>>> be > >>>>>>>>>>>>> required. > >>>>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from > above, > >>>> we > >>>>>>>>>> could > >>>>>>>>>>>>>>>>> decouple the store switch and each instance could switch > >> its > >>>>>>>> store > >>>>>>>>>>>>>>>>> independently from all other instances. After the rolling > >>>>>>> bounce, > >>>>>>>>>> it > >>>>>>>>>>>>>>>>> seems to be ok to switch from the old store to the new > >> store > >>>>>>>>>> "under > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> hood" whenever the new store is ready (this could even be > >>>>> done, > >>>>>>>>>>> before > >>>>>>>>>>>>>>>>> we switch to the new metadata version). Each time we > update > >>>>> the > >>>>>>>>>> "hot > >>>>>>>>>>>>>>>>> standby" we check if it reached the "endOffset" (or > maybe > >>>> X% > >>>>>>>> that > >>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect this > >>>>>>>> situation, > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> Streams application closes corresponding active tasks as > >>>> well > >>>>>>> as > >>>>>>>>>>> "hot > >>>>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks using > >>>> the > >>>>>>> new > >>>>>>>>>>>>> store. > >>>>>>>>>>>>>>>>> (I need to go through the details once again, but it > seems > >>>> to > >>>>>>> be > >>>>>>>>>>>>>>>>> feasible.). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Combining this strategy with the "multiple assignment" > >> idea, > >>>>>>>> might > >>>>>>>>>>>>> even > >>>>>>>>>>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1 > >> -> > >>>>>>> 1.2. > >>>>>>>>>>>>>>>>> Applications would just use the old store, as long as the > >>>> new > >>>>>>>>>> store > >>>>>>>>>>> is > >>>>>>>>>>>>>>>>> not ready, even if the new metadata version is used > >> already. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> For future upgrades, a single rebalance would be > >> sufficient, > >>>>>>> too, > >>>>>>>>>>> even > >>>>>>>>>>>>>>>>> if the stores are upgraded. We would not need any config > >>>>>>>>>> parameters > >>>>>>>>>>> as > >>>>>>>>>>>>>>>>> the "probe" step allows us to detect the supported > >> rebalance > >>>>>>>>>>> metadata > >>>>>>>>>>>>>>>>> version (and we would also not need multiple "assigmnent > >>>>>>>>>> strategies" > >>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>> out own protocol encoded everything we need). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Let me know what you think. 
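The "hot standby is ready" check itself could be as small as the sketch below (method names and the threshold are made up; the real condition would live in the restoration code):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class HotStandbyReadySketch {
    // decide whether the prepared (new-format) store is close enough to the end of its
    // changelog partition to flip the active task over to it
    static boolean readyToSwitch(final Consumer<byte[], byte[]> restoreConsumer,
                                 final TopicPartition changelogPartition,
                                 final double threshold) {
        final Map<TopicPartition, Long> end =
            restoreConsumer.endOffsets(Collections.singleton(changelogPartition));
        final long endOffset = end.get(changelogPartition);
        final long restored = restoreConsumer.position(changelogPartition);
        return endOffset == 0 || (double) restored / endOffset >= threshold;
    }
}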
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote: > >>>>>>>>>>>>>>>>>> @John: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the > >>>> encoded > >>>>>>>>>>> metadata > >>>>>>>>>>>>>>>>> bytes > >>>>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from > >> Consumer's > >>>>>>> pov, > >>>>>>>>>>> so I > >>>>>>>>>>>>>>>>> think > >>>>>>>>>>>>>>>>>> this change should be in the Streams layer as well. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> @Matthias: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported version" > >>>>>>> besides > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea to > >>>> allow > >>>>>>>>>>> either > >>>>>>>>>>>>>>>>> case; > >>>>>>>>>>>>>>>>>> the key is that in Streams we would likely end up with a > >>>>>>> mapping > >>>>>>>>>>>>> from the > >>>>>>>>>>>>>>>>>> protocol version to the other persistent data format > >>>> versions > >>>>>>>>>> such > >>>>>>>>>>> as > >>>>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually > >>>>> achieve > >>>>>>>>>> both > >>>>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded > >>>>> protocol > >>>>>>>>>>>>> version's > >>>>>>>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 > -> > >>>>>>> 0.10.1 > >>>>>>>>>>>>> leaders > >>>>>>>>>>>>>>>>>> can choose to use the newer version in the first rolling > >>>>>>> bounce > >>>>>>>>>>>>> directly > >>>>>>>>>>>>>>>>>> and we can document to users that they would not need to > >>>> set > >>>>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the > upgraded > >>>>>>>>>> protocol > >>>>>>>>>>>>>>>>> version > >>>>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, > >> and > >>>>>>> then > >>>>>>>>>> we > >>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the > first > >>>>>>>> rolling > >>>>>>>>>>>>> bounce > >>>>>>>>>>>>>>>>>> and reset in the second. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Besides that, some additional comments: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for > >> users > >>>>> to > >>>>>>>>>> set > >>>>>>>>>>>>> than > >>>>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter users > >> only > >>>>>>> need > >>>>>>>>>> to > >>>>>>>>>>>>> set a > >>>>>>>>>>>>>>>>>> single version, while the Streams will map that version > to > >>>>> the > >>>>>>>>>>>>> Streams > >>>>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But > maybe > >> I > >>>>>>> did > >>>>>>>>>> not > >>>>>>>>>>>>> get > >>>>>>>>>>>>>>>>>> your idea about how the "upgrade.from" config will be > >> set, > >>>>>>>>>> because > >>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from config > >> will > >>>>> be > >>>>>>>>>> set > >>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for > example, > >>>>>>>> should > >>>>>>>>>>>>> user > >>>>>>>>>>>>>>>>>> reset it to null in the second rolling bounce? 
> >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than talking > >>>> about > >>>>>>>>>>>>> specific > >>>>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just > >>>> categorize > >>>>>>> all > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> possible scenarios, even for future upgrade versions, > what > >>>>>>>> should > >>>>>>>>>>> be > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> standard operations? The categorized we can summarize to > >>>>> would > >>>>>>>> be > >>>>>>>>>>>>>>>>> (assuming > >>>>>>>>>>>>>>>>>> user upgrade from version X to version Y, where X and Y > >> are > >>>>>>>> Kafka > >>>>>>>>>>>>>>>>> versions, > >>>>>>>>>>>>>>>>>> with the corresponding supported protocol version x and > >> y): > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and > >>>> hence > >>>>>>> no > >>>>>>>>>>>>>>>>> persistent > >>>>>>>>>>>>>>>>>> data formats have changed. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> b. x != y, but all persistent data format remains the > >> same. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> b. x !=y, AND some persistene data format like RocksDB > >>>>> format, > >>>>>>>>>>>>> changelog > >>>>>>>>>>>>>>>>>> format, has been changed. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> c. special case: we may need some special handling logic > >>>> when > >>>>>>>>>>>>> "current > >>>>>>>>>>>>>>>>>> version" or "newest supported version" are not available > >> in > >>>>>>> the > >>>>>>>>>>>>> protocol, > >>>>>>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> under the above scenarios, how many rolling bounces > users > >>>>> need > >>>>>>>> to > >>>>>>>>>>>>>>>>> execute? > >>>>>>>>>>>>>>>>>> how they should set the configs in each rolling bounce? > >> and > >>>>>>> how > >>>>>>>>>>>>> Streams > >>>>>>>>>>>>>>>>>> library will execute in these cases? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax < > >>>>>>>>>>>>> matth...@confluent.io> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Ted, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I still consider changing the KIP to include it right > >> away > >>>>> -- > >>>>>>>> if > >>>>>>>>>>>>> not, > >>>>>>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more > >>>> detail > >>>>>>>>>> first. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> (Same for other open questions like interface names -- > I > >>>>>>>> collect > >>>>>>>>>>>>>>>>>>> feedback and update the KIP after we reach consensus > :)) > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote: > >>>>>>>>>>>>>>>>>>>> Thanks for the details, Matthias. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> bq. change the metadata protocol only if a future > >>>> release, > >>>>>>>>>>> encoding > >>>>>>>>>>>>>>>>> both > >>>>>>>>>>>>>>>>>>> used > >>>>>>>>>>>>>>>>>>>> and supported version might be an advantage > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be > >> implemented > >>>>> in > >>>>>>>>>> this > >>>>>>>>>>>>> KIP. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding > >>>> proposal. 
> >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Cheers > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax < > >>>>>>>>>>>>> matth...@confluent.io > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of > >> user > >>>>>>>> code. > >>>>>>>>>>> And > >>>>>>>>>>>>>>>>> even > >>>>>>>>>>>>>>>>>>>>> if we want to add something like this, I would prefer > >> to > >>>>> do > >>>>>>>> it > >>>>>>>>>>> in > >>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>> separate KIP. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling > bounce. > >>>> But > >>>>>>>>>> from > >>>>>>>>>>> my > >>>>>>>>>>>>>>>>>>>>> understanding it would not be possible. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to > >>>>>>> switch > >>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to > switch > >>>>>>> from > >>>>>>>>>> the > >>>>>>>>>>>>> old > >>>>>>>>>>>>>>>>>>>>> store to the new store (this happens after the last > >>>>>>> instance > >>>>>>>>>>>>> bounces a > >>>>>>>>>>>>>>>>>>>>> second time). > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is, > that > >>>>> it's > >>>>>>>>>>>>> unclear > >>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>>> to which from version 2 to version 3. The > >>>>>>>>>>>>> StreamsPartitionsAssignor is > >>>>>>>>>>>>>>>>>>>>> stateless by design, and thus, the information which > >>>>>>> version > >>>>>>>>>> it > >>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>>>>>> use must be passed in from externally -- and we want > to > >>>>> use > >>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> StreamsConfig to pass in this information. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> During upgrade, all new instanced have no information > >>>>> about > >>>>>>>>>> the > >>>>>>>>>>>>>>>>> progress > >>>>>>>>>>>>>>>>>>>>> of the upgrade (ie, how many other instanced got > >>>> upgrades > >>>>>>>>>>>>> already). > >>>>>>>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3 > >>>>>>>>>>>>> subscription. > >>>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>>>> leader also has this limited view on the world and > can > >>>>> only > >>>>>>>>>> send > >>>>>>>>>>>>>>>>> version > >>>>>>>>>>>>>>>>>>>>> 2 assignments back. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can > >> simplify > >>>>>>> the > >>>>>>>>>>>>> upgrade. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> We did consider to change the metadata to make later > >>>>>>> upgrades > >>>>>>>>>>> (ie, > >>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change > the > >>>>>>>>>> metadata > >>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>>> storage format again -- as long as we don't change > it, > >> a > >>>>>>>>>> single > >>>>>>>>>>>>>>>>> rolling > >>>>>>>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and > >>>>>>>>>> "supported > >>>>>>>>>>>>>>>>>>>>> version". 
> This would allow the leader to switch to the new version earlier and without a second rebalance: the leader would receive "used version == old" and "supported version = old/new" -- as long as at least one instance sends "supported version = old", the leader sends an old-version assignment back. Encoding both versions would allow the leader to send a new-version assignment back right after the first round of rebalancing finishes (i.e., once all instances send "supported version = new"). However, there are still two issues with this:
>
> 1) If we switch to the new format right after the last instance bounced, the new stores might not be ready to be used -- this could lead to "downtime", as a store must be restored before processing can resume.
>
> 2) Assume an instance fails and is restarted again. At this point, the instance will still have "upgrade mode" enabled and thus sends the old protocol data. However, it would be desirable to never fall back to the old protocol after the switch to the new protocol.
>
> The second issue is minor, and I guess if users set up the instance properly it could be avoided. However, the first issue would prevent "zero downtime" upgrades. Having said this, if we consider that we might change the metadata protocol only in a future release, encoding both used and supported version might be an advantage in the future, and we could consider adding this information in the 1.2 release to prepare for this.
>
> Btw: monitoring the log is also only required to give the instances enough time to prepare the stores in the new format. If you did the second rolling bounce before this, it would still work -- however, you might see app "downtime" as the new store must be fully restored before processing can resume.
>
> Does this make sense?
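To make the "used version" / "supported version" idea just described concrete, here is a minimal sketch of the leader-side decision; the types and method names are hypothetical illustrations, not the actual StreamsPartitionAssignor code:

    import java.util.List;

    // Hypothetical sketch: the leader keeps handing out the old assignment
    // version until every subscription advertises support for the new one.
    final class VersionProbingSketch {

        static final class SubscriptionInfo {
            final int usedVersion;       // version the instance currently speaks
            final int supportedVersion;  // newest version the instance could speak
            SubscriptionInfo(final int usedVersion, final int supportedVersion) {
                this.usedVersion = usedVersion;
                this.supportedVersion = supportedVersion;
            }
        }

        static int chooseAssignmentVersion(final List<SubscriptionInfo> subscriptions,
                                           final int oldVersion,
                                           final int newVersion) {
            for (final SubscriptionInfo info : subscriptions) {
                if (info.supportedVersion < newVersion) {
                    return oldVersion;  // at least one instance cannot handle the new format yet
                }
            }
            return newVersion;  // everyone supports it, switch right after this rebalance
        }
    }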
> -Matthias
>
> On 3/9/18 11:36 AM, James Cheng wrote:
> Matthias,
>
> For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?
>
> For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce and the 2nd rolling bounce is to decide whether to send Subscription Version 2 or Subscription Version 3. (Actually, there is another difference: the KIP says that the 2nd rolling bounce should happen after all new state stores are created by the background thread. However, within the 2nd rolling bounce, we say that there is still a background thread, so it seems like there is no actual requirement to wait for the new state stores to be created.)
>
> The 2nd rolling bounce already knows how to deal with mixed mode (having both Version 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce if we added logic (somehow/somewhere) such that:
> * Instances send Subscription Version 2 until all instances are running the new code.
> * Once all the instances are running the new code, then one at a time, the instances start sending Subscription V3. The leader still hands out Assignment Version 2 until all new state stores are ready.
> * Once all instances report that the new stores are ready, the leader sends out Assignment Version 3.
> * Once an instance receives an Assignment Version 3, it can delete the old state store.
>
> Doing it that way seems like it would reduce a lot of operator/deployment overhead. No need to do 2 rolling restarts. No need to monitor logs for the state store rebuild. You just deploy it, and the instances update themselves.
>
> What do you think?
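A small sketch of the leader-side gating in the single-bounce flow outlined above; the types are made up for illustration, and how each member would learn that "all instances are running the new code" is exactly the group-wide knowledge debated earlier in the thread:

    import java.util.List;

    // Sketch of the proposed single-bounce flow: the leader keeps sending the
    // old assignment version until every member reports its new-format stores
    // as ready; only then does it switch, and members may drop the old stores.
    final class SingleBounceSketch {

        static final class MemberStatus {
            final boolean runsNewCode;
            final boolean newStoresReady;
            MemberStatus(final boolean runsNewCode, final boolean newStoresReady) {
                this.runsNewCode = runsNewCode;
                this.newStoresReady = newStoresReady;
            }
        }

        static int assignmentVersion(final List<MemberStatus> group) {
            final boolean allReady = group.stream()
                .allMatch(m -> m.runsNewCode && m.newStoresReady);
            // each member deletes its old store once it receives a V3 assignment
            return allReady ? 3 : 2;
        }
    }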
> The thing that made me think of this is that the "2 rolling bounces" approach is similar to what Kafka brokers have to do for changes to inter.broker.protocol.version and log.message.format.version. And in the broker case, it seems like it would be possible (with some work, of course) to modify Kafka to allow us to do similar auto-detection of broker capabilities and automatically do a switchover from the old to the new version.
>
> -James
>
> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>
> Matthias,
>
> Thanks for the KIP, it's a +1 from me.
>
> I do have one question regarding the retrieval methods on the new interfaces.
>
> Would we want to consider adding one method with a Predicate that would allow for filtering records by the timestamp stored with the record? Or is this better left for users to implement themselves once the data has been retrieved?
>
> Thanks,
> Bill
>
> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Matthias:
> For my point #1, I don't have a preference as to which separator is chosen. Given the background you mentioned, the current choice is good.
>
> For #2, I think my proposal is better since it is closer to English grammar.
>
> Would be good to listen to what other people think.
>
> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks for the comments!
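As an illustration of the "leave filtering to the user" option Bill raises above, here is a minimal sketch; the store and value types only mirror the interface name quoted further down in the thread, and the accessor names are assumptions rather than the KIP's actual API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiPredicate;

    // Sketch only: user-side filtering by record timestamp after retrieval,
    // assuming a read-only store that exposes the value together with the
    // timestamp stored next to it.
    final class TimestampFilterSketch {

        interface TimestampedValue<V> {
            V value();
            long timestamp();
        }

        interface ReadOnlyKeyValueTimestampStore<K, V> {
            TimestampedValue<V> get(K key);
        }

        static <K, V> List<V> valuesMatching(final ReadOnlyKeyValueTimestampStore<K, V> store,
                                             final Iterable<K> keys,
                                             final BiPredicate<V, Long> predicate) {
            final List<V> result = new ArrayList<>();
            for (final K key : keys) {
                final TimestampedValue<V> entry = store.get(key);
                if (entry != null && predicate.test(entry.value(), entry.timestamp())) {
                    result.add(entry.value());
                }
            }
            return result;
        }
    }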
> @Guozhang:
>
> So far, there is one PR for the rebalance metadata upgrade fix (addressing the mentioned https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first impression of how the metadata upgrade works, including a system test: https://github.com/apache/kafka/pull/4636
>
> I can share other PRs as soon as they are ready. I agree that the KIP is complex and I am ok with putting out more code to give better discussion context.
>
> @Ted:
>
> I picked `_` instead of `-` to align with the `processing.guarantee` parameter that accepts `at_least_once` and `exactly_once` as values. Personally, I don't care about underscore vs dash, but I prefer consistency. If you feel strongly about it, we can also change it to `-`.
>
> About the interface name: I am fine either way -- I stripped the `With` to keep the name a little shorter. Would be good to get feedback from others and pick the name the majority prefers.
>
> @John:
>
> We can certainly change it. I agree that it would not make a difference. I'll dig into the code to see if either of the two versions might introduce undesired complexity, and update the KIP if I don't hit an issue with putting the `-v2` on the store directory instead of `rocksdb-v2`.
>
> -Matthias
>
> On 3/8/18 2:44 PM, John Roesler wrote:
> Hey Matthias,
>
> The KIP looks good to me. I had several questions queued up, but they were all in the "rejected alternatives" section... oh, well.
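To make the naming-consistency point above concrete, a minimal config sketch; `upgrade.mode` is an assumed spelling for the KIP's upgrade config (only the `in_place` value is discussed in this thread), while the `processing.guarantee` values show the existing underscore convention:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeConfigSketch {

        // Sketch of the config set for the first rolling bounce of an in-place
        // upgrade; the "upgrade.mode" key name is an assumption, not confirmed
        // in this thread, and it would be removed again for the second bounce.
        public static Properties firstBounceConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            // existing parameter that established the underscore value style
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
            // hypothetical upgrade flag, underscore-style for consistency
            props.put("upgrade.mode", "in_place");
            return props;
        }
    }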
> One very minor thought re: changing the state directory from "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2" marker on the storeName part of the path (i.e., "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same benefits without altering the high-level directory structure.
>
> It may not matter, but I could imagine people running scripts to monitor rocksdb disk usage for each task, or other such use cases.
>
> Thanks,
> -John
>
> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Matthias:
> Nicely written KIP.
>
> "in_place": can this be "in-place"? Underscore may sometimes be mistyped (as '-'). I think using '-' is more user-friendly.
>
> public interface ReadOnlyKeyValueTimestampStore<K, V> {
>
> Is ReadOnlyKeyValueStoreWithTimestamp a better name for the class?
>
> Thanks
>
> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hello Matthias, thanks for the KIP.
>
> I've read through the upgrade path section and it looks good to me. If you already have a WIP PR for it, could you also share it here so that people can take a look?
>
> I'm +1 on the KIP itself.
But large KIPs like > >>>> this > >>>>>>>>>> there > >>>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>>> always > >>>>>>>>>>>>>>>>>>>>>>>>> some > >>>>>>>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is > >>>>> better > >>>>>>>> to > >>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the > design > >>>>>>>>>>>>> discussion :) > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. > Sax > >> < > >>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API > >> to > >>>>>>>> allow > >>>>>>>>>>>>> storing > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the > >> basis > >>>>> to > >>>>>>>>>>>>> resolve > >>>>>>>>>>>>>>>>>>>>> multiple > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests). > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this! > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang > >>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> -- > >>>>>>>>>> -- Guozhang > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>>> > >>>> > >>> > >> > >> > > > >