Thanks for the feedback. I updated the interface names to `TimestampedXxxStore`.
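(As a quick illustration of what the rename means for the key-value case -- a sketch only; the exact signatures are in the KIP, and the `ValueAndTimestamp` package and factory method shown here are assumptions on my side:)

import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Same KeyValueStore contract as before, but the value type is wrapped so that
// callers read and write the record timestamp together with the value.
public interface TimestampedKeyValueStore<K, V>
        extends KeyValueStore<K, ValueAndTimestamp<V>> { }

A caller would then write something like `store.put(key, ValueAndTimestamp.make(value, timestamp))` instead of `store.put(key, value)` (the `make(...)` factory name is illustrative).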
About `RecordConverter` I would like to keep the name, because it's a generic concept and we will have a special implementation that does the timestamp conversion -- however, this should be reflected in the (internal) class name, not the interface name. I also added more details about the `RecordConverter` interface and its impact on custom store implementations. About Bill's question: happy to address this in the PR, but it seems to be an implementation detail and thus I did not add anything to the KIP. -Matthias On 1/16/19 10:00 AM, Guozhang Wang wrote: > Matthias, > > Thanks for the updated wiki page. I made another pass and here are some > minor comments I have (mostly about API names and writing itself): > > 1. The scope of the RecordConverter may not be very clear from how the wiki > page reads. Here's my understanding: > > 1.a) For built-in implementations (rocksdb, memory) of the defined types of > StateStore (k-v, window, session), Streams will implement the > RecordConverter itself *so there's nothing that users need to do*; > 1.b) For customized implementations of the defined types of StateStore, > Streams will use a proxy store internally to always "down-convert" the new > APIs to the old formats, *so there's nothing that users need to do still*; > but users can opt-in to let their custom impl also extend RecordConverter. > 1.c) For user-defined StateStore (e.g. Alice has her own interface named > Database extending StateStore with her own impl). *This is the only place > users are required to do extra work by implementing the RecordConverter > interface*. > > 2. Naming. > > 2.a) I'd also prefer TimestampedXXXStore over XXXWithTimestampStore. > 2.b) RecordConverter: I felt having a more specific name is better if we > believe future format changes cannot reuse it anyways. Maybe > "TimestampedRecordConverter"? > > 3. Regarding Bill's question above. One idea is that we can add an extra > check logic during the starting up phase, to check if the old CF is empty > or not already, and if yes set a flag so that we can skip that CF for state > store access. The rationale is that Streams apps are expected to be bounced > / re-launched frequently so just having this check logic upon starting up > should be a good trade-off between complexity and efficiency. > > > Guozhang > > > > > Guozhang > > > On Sat, Jan 12, 2019 at 6:46 PM Matthias J. Sax <matth...@confluent.io> > wrote: > >> I also want to point out that Ryanne Dolan commented on the WIP PR >> (https://github.com/apache/kafka/pull/6044) about the naming. I asked >> him to reply to this thread, but this did not happen yet, thus I want to >> point it out myself, because it seems too important. >> >> >> WindowWithTimestampStore.java >> >>> Hard to parse this class name -- sounds like "trait Window with >> TimestampStore" instead of WindowStore<... ValueAndTimestamp...> >>> >>> Maybe TimestampWindowStore or TimestampedWindowStore? shrug. >>> >> >> >> >> Another comment, with which I personally do not necessarily agree: >> >> >> KeyValueWithTimestampStore.java >> >>> possible alternative: implement KeyValueStore<K, V>, and then expose an >> additional putWithTimestamp, getWithTimestamp etc for callers that want >> ValueAndTimestamp<V> instead of V. This would probably require fewer code >> changes elsewhere. >> >> >> >> What do you think? >> >> >> -Matthias >> >> >> >> On 1/12/19 6:43 PM, Matthias J.
Sax wrote: >>> Bill, >>> >>> I left the question about legacy column family out, because as a matter >>> of fact, we use the default column family atm that cannot be deleted. >>> Thus, this old column family will always be there. >>> >>> Nevertheless, as an implementation detail, it might make sense to avoid >>> accessing both column families forever (ie, each time a key is not >>> found). Also, we might want/need a way, to "force" upgrading to the new >>> column family, for the case that some records are not accessed for a >>> long time. Again, this seems to be an implementation detail (and I am >>> also not sure if we really need it). If you thing both are not >>> implementation details, I can of course extend the KIP accordingly. >>> >>> >>> -Matthias >>> >>> On 1/11/19 1:27 PM, Bill Bejeck wrote: >>>> Hi Matthias, >>>> >>>> Thanks for the KIP, it goes into good detail and is well done. >>>> >>>> Overall I'm a +1 on the KIP and have one minor question. >>>> >>>> Regarding the upgrade path, we'll use two column families to do a lazy >>>> conversion which makes sense to me. What is the plan to get >>>> rid of the "legacy" column family (if ever)? Would we drop the "legacy" >>>> column family once it is empty? I'm not sure we'd ever need to as it >> would >>>> just be a column family that doesn't get used. >>>> >>>> Maybe this is an implementation detail and doesn't need to be addressed >>>> now, but it came to mind when I read the KIP. >>>> >>>> Thanks again, >>>> Bill >>>> >>>> On Fri, Jan 11, 2019 at 1:19 PM John Roesler <j...@confluent.io> wrote: >>>> >>>>> Hi Matthias, >>>>> >>>>> Thanks for the updates to the KIP. I've just read it over, and am >>>>> personally quite happy with it. >>>>> >>>>> Thanks for tackling this dicey issue and putting in a huge amount of >> design >>>>> work to produce >>>>> a smooth upgrade path for DSL users. >>>>> >>>>> Thanks, >>>>> -John >>>>> >>>>> On Mon, Dec 17, 2018 at 10:35 AM Matthias J. Sax < >> matth...@confluent.io> >>>>> wrote: >>>>> >>>>>> Dear all, >>>>>> >>>>>> I finally managed to update the KIP. >>>>>> >>>>>> To address the concerns about the complex upgrade path, I simplified >> the >>>>>> design. We don't need any configs and the upgrade can be done with the >>>>>> simple single rolling bounce pattern. >>>>>> >>>>>> The suggestion is to exploit RocksDB column families to isolate old >> and >>>>>> new on-disk format. Furthermore, the upgrade from old to new format >>>>>> happens "on the side" after an instance was upgraded. >>>>>> >>>>>> I also pushed a WIP PR in case you want to look into some details >>>>>> (potential reviewers, don't panic: I plan to break this down into >>>>>> multiple PRs for actual review if the KIP is accepted). >>>>>> >>>>>> https://github.com/apache/kafka/pull/6044 >>>>>> >>>>>> @Eno: I think I never answered your question about being future proof: >>>>>> >>>>>> The latest design is not generic, because it does not support changes >>>>>> that need to be reflected in the changelog topic. I aimed for a >>>>>> non-generic design for now to keep it as simple as possible. Thus, >> other >>>>>> format changes might need a different design / upgrade path -- >> however, >>>>>> because this KIP is quite encapsulated in the current design, I don't >>>>>> see any issue to build this later and a generic upgrade path seems to >> be >>>>>> an orthogonal concern atm. >>>>>> >>>>>> >>>>>> -Matthias >>>>>> >>>>>> >>>>>> On 11/22/18 2:50 PM, Adam Bellemare wrote: >>>>>>> Thanks for the information Matthias. 
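(To make the lazy column-family conversion discussed above a bit more concrete, here is a rough sketch of what the read path could look like -- pure illustration, not the actual Streams code; the surrogate timestamp for old entries and the class/helper names are assumptions:)

import java.nio.ByteBuffer;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Reads try the new column family first and fall back to the old (default) one;
// entries found in the old format are migrated on access.
class DualColumnFamilyAccessor {
    private final RocksDB db;
    private final ColumnFamilyHandle oldCf; // default CF, plain value format
    private final ColumnFamilyHandle newCf; // new CF, 8-byte timestamp prefix + value

    DualColumnFamilyAccessor(final RocksDB db,
                             final ColumnFamilyHandle oldCf,
                             final ColumnFamilyHandle newCf) {
        this.db = db;
        this.oldCf = oldCf;
        this.newCf = newCf;
    }

    byte[] get(final byte[] key) throws RocksDBException {
        final byte[] newFormat = db.get(newCf, key);
        if (newFormat != null) {
            return newFormat;
        }
        final byte[] oldFormat = db.get(oldCf, key);
        if (oldFormat == null) {
            return null;
        }
        // no timestamp is known for an old entry, so a surrogate (here -1) is prepended
        final byte[] upgraded = ByteBuffer.allocate(8 + oldFormat.length)
                .putLong(-1L)
                .put(oldFormat)
                .array();
        db.put(newCf, key, upgraded);
        db.delete(oldCf, key);
        return upgraded;
    }
}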
>>>>>>> >>>>>>> I will await your completion of this ticket then since it underpins >> the >>>>>>> essential parts of a RocksDB TTL aligned with the changelog topic. I >> am >>>>>>> eager to work on that ticket myself, so if I can help on this one in >>>>> any >>>>>>> way please let me know. >>>>>>> >>>>>>> Thanks >>>>>>> Adam >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax < >> matth...@confluent.io >>>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> It's an interesting idea to use second store, to maintain the >>>>>>>> timestamps. However, each RocksDB instance implies some overhead. In >>>>>>>> fact, we are looking into ColumnFamilies atm to see if we can use >>>>> those >>>>>>>> and merge multiple RocksDBs into a single one to reduce this >> overhead. >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> On 11/20/18 5:15 AM, Patrik Kleindl wrote: >>>>>>>>> Hi Adam >>>>>>>>> >>>>>>>>> Sounds great, I was already planning to ask around if anyone had >>>>>> tackled >>>>>>>>> this. >>>>>>>>> We have a use case very similar to what you described in >> KAFKA-4212, >>>>>> only >>>>>>>>> with Global State Stores. >>>>>>>>> I have tried a few things with the normal DSL but was not really >>>>>>>> successful. >>>>>>>>> Schedule/Punctuate is not possible, supplying a windowed store is >>>>> also >>>>>>>> not >>>>>>>>> allowed and the process method has no knowledge of the timestamp of >>>>> the >>>>>>>>> record. >>>>>>>>> And anything loaded on startup is not filtered anyway. >>>>>>>>> >>>>>>>>> Regarding 4212, wouldn't it be easier (although a little less >>>>>>>>> space-efficient) to track the Timestamps in a separate Store with >> <K, >>>>>>>> Long> >>>>>>>>> ? >>>>>>>>> This would leave the original store intact and allow a migration of >>>>> the >>>>>>>>> timestamps without touching the other data. >>>>>>>>> >>>>>>>>> So I am very interested in your PR :-) >>>>>>>>> >>>>>>>>> best regards >>>>>>>>> >>>>>>>>> Patrik >>>>>>>>> >>>>>>>>> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare < >>>>> adam.bellem...@gmail.com >>>>>>> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Matthias >>>>>>>>>> >>>>>>>>>> Thanks - I figured that it was probably a case of just too much to >>>>> do >>>>>>>> and >>>>>>>>>> not enough time. I know how that can go. I am asking about this >> one >>>>> in >>>>>>>>>> relation to https://issues.apache.org/jira/browse/KAFKA-4212, >>>>> adding >>>>>> a >>>>>>>> TTL >>>>>>>>>> to RocksDB. I have outlined a bit about my use-case within 4212, >> but >>>>>> for >>>>>>>>>> brevity here it is: >>>>>>>>>> >>>>>>>>>> My case: >>>>>>>>>> 1) I have a RocksDB with TTL implementation working where records >>>>> are >>>>>>>> aged >>>>>>>>>> out using the TTL that comes with RocksDB (very simple). >>>>>>>>>> 2) We prevent records from loading from the changelog if >> recordTime >>>>> + >>>>>>>> TTL < >>>>>>>>>> referenceTimeStamp (default = System.currentTimeInMillis() ). >>>>>>>>>> >>>>>>>>>> This assumes that the records are stored with the same time >>>>> reference >>>>>>>> (say >>>>>>>>>> UTC) as the consumer materializing the RocksDB store. >>>>>>>>>> >>>>>>>>>> My questions about KIP-258 are as follows: >>>>>>>>>> 1) How does "we want to be able to store record timestamps in >>>>> KTables" >>>>>>>>>> differ from inserting records into RocksDB with TTL at consumption >>>>>>>> time? 
I >>>>>>>>>> understand that it could be a difference of some seconds, minutes, >>>>>>>> hours, >>>>>>>>>> days etc between when the record was published and now, but given >>>>> the >>>>>>>>>> nature of how RocksDB TTL works (eventual - based on compaction) I >>>>>> don't >>>>>>>>>> see how a precise TTL can be achieved, such as that which one can >>>>> get >>>>>>>> with >>>>>>>>>> windowed stores. >>>>>>>>>> >>>>>>>>>> 2) Are you looking to change how records are inserted into a TTL >>>>>>>> RocksDB, >>>>>>>>>> such that the TTL would take effect from the record's published >>>>> time? >>>>>> If >>>>>>>>>> not, what would be the ideal workflow here for a single record >> with >>>>>> TTL >>>>>>>>>> RocksDB? >>>>>>>>>> ie: Record Timestamp: 100 >>>>>>>>>> TTL: 50 >>>>>>>>>> Record inserted into rocksDB: 110 >>>>>>>>>> Record to expire at 150? >>>>>>>>>> >>>>>>>>>> 3) I'm not sure I fully understand the importance of the upgrade >>>>>> path. I >>>>>>>>>> have read the link to ( >>>>>> https://issues.apache.org/jira/browse/KAFKA-3522 >>>>>>>> ) >>>>>>>>>> in >>>>>>>>>> the KIP, and I can understand that a state-store on disk may not >>>>>>>> represent >>>>>>>>>> what the application is expecting. I don't think I have the full >>>>>> picture >>>>>>>>>> though, because that issue seems to be easy to fix with a simple >>>>>>>> versioned >>>>>>>>>> header or accompanying file, forcing the app to rebuild the state >> if >>>>>> the >>>>>>>>>> version is incompatible. Can you elaborate or add a scenario to >> the >>>>>> KIP >>>>>>>>>> that illustrates the need for the upgrade path? >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> >>>>>>>>>> Adam >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax < >>>>>> matth...@confluent.io> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Adam, >>>>>>>>>>> >>>>>>>>>>> I am still working on it. Was pulled into a lot of other tasks >>>>> lately >>>>>>>> so >>>>>>>>>>> this was delayed. Also had some discussions about simplifying the >>>>>>>>>>> upgrade path with some colleagues and I am prototyping this atm. >>>>> Hope >>>>>>>> to >>>>>>>>>>> update the KIP accordingly soon. >>>>>>>>>>> >>>>>>>>>>> -Matthias >>>>>>>>>>> >>>>>>>>>>> On 11/10/18 7:41 AM, Adam Bellemare wrote: >>>>>>>>>>>> Hello Matthias >>>>>>>>>>>> >>>>>>>>>>>> I am curious as to the status of this KIP. TTL and expiry of >>>>> records >>>>>>>>>> will >>>>>>>>>>>> be extremely useful for several of our business use-cases, as >> well >>>>>> as >>>>>>>>>>>> another KIP I had been working on. >>>>>>>>>>>> >>>>>>>>>>>> Thanks >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska < >>>>>> eno.there...@gmail.com >>>>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>> >>>>>>>>>>>>> Good stuff. Could you comment a bit on how future-proof is this >>>>>>>>>> change? >>>>>>>>>>> For >>>>>>>>>>>>> example, if we want to store both event timestamp "and" >>>>> processing >>>>>>>>>> time >>>>>>>>>>> in >>>>>>>>>>>>> RocksDB will we then need another interface (e.g. called >>>>>>>>>>>>> KeyValueWithTwoTimestampsStore)? >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks >>>>>>>>>>>>> Eno >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax < >>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for your input Guozhang and John. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I see your point, that the upgrade API is not simple. 
If you >>>>> don't >>>>>>>>>>>>>> thinks it's valuable to make generic store upgrades possible >>>>>> (atm), >>>>>>>>>> we >>>>>>>>>>>>>> can make the API internal, too. The impact is, that we only >>>>>> support >>>>>>>> a >>>>>>>>>>>>>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to >>>>>>>>>>>>>> WindowedWithTS etc) for which we implement the internal >>>>>> interfaces. >>>>>>>>>>>>>> >>>>>>>>>>>>>> We can keep the design generic, so if we decide to make it >>>>> public, >>>>>>>> we >>>>>>>>>>>>>> don't need to re-invent it. This will also have the advantage, >>>>>> that >>>>>>>>>> we >>>>>>>>>>>>>> can add upgrade pattern for other stores later, too. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, >>>>> but >>>>>>>> it >>>>>>>>>>>>>> was the only way I could find to design a generic upgrade >>>>>> interface. >>>>>>>>>> If >>>>>>>>>>>>>> we decide the hide all the upgrade stuff, >> `StoreUpgradeBuilder` >>>>>>>> would >>>>>>>>>>>>>> become an internal interface I guess (don't think we can >> remove >>>>>> it). >>>>>>>>>>>>>> >>>>>>>>>>>>>> I will wait for more feedback about this and if nobody wants >> to >>>>>> keep >>>>>>>>>> it >>>>>>>>>>>>>> as public API I will update the KIP accordingly. Will add some >>>>>> more >>>>>>>>>>>>>> clarifications for different upgrade patterns in the mean time >>>>> and >>>>>>>>>> fix >>>>>>>>>>>>>> the typos/minor issues. >>>>>>>>>>>>>> >>>>>>>>>>>>>> About adding a new state UPGRADING: maybe we could do that. >>>>>> However, >>>>>>>>>> I >>>>>>>>>>>>>> find it particularly difficult to make the estimation when we >>>>>> should >>>>>>>>>>>>>> switch to RUNNING, thus, I am a little hesitant. Using store >>>>>>>>>> callbacks >>>>>>>>>>>>>> or just logging the progress including some indication about >> the >>>>>>>>>> "lag" >>>>>>>>>>>>>> might actually be sufficient. Not sure what others think? >>>>>>>>>>>>>> >>>>>>>>>>>>>> About "value before timestamp": no real reason and I think it >>>>> does >>>>>>>>>> not >>>>>>>>>>>>>> make any difference. Do you want to change it? >>>>>>>>>>>>>> >>>>>>>>>>>>>> About upgrade robustness: yes, we cannot control if an >> instance >>>>>>>>>> fails. >>>>>>>>>>>>>> That is what I meant by "we need to write test". The upgrade >>>>>> should >>>>>>>>>> be >>>>>>>>>>>>>> able to continuous even is an instance goes down (and we must >>>>> make >>>>>>>>>> sure >>>>>>>>>>>>>> that we don't end up in an invalid state that forces us to >> wipe >>>>>> out >>>>>>>>>> the >>>>>>>>>>>>>> whole store). Thus, we need to write system tests that fail >>>>>>>> instances >>>>>>>>>>>>>> during upgrade. >>>>>>>>>>>>>> >>>>>>>>>>>>>> For `in_place_offline` upgrade: I don't think we need this >> mode, >>>>>>>>>>> because >>>>>>>>>>>>>> people can do this via a single rolling bounce. >>>>>>>>>>>>>> >>>>>>>>>>>>>> - prepare code and switch KV-Store to KVwithTs-Store >>>>>>>>>>>>>> - do a single rolling bounce (don't set any upgrade config) >>>>>>>>>>>>>> >>>>>>>>>>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` >> if >>>>>> we >>>>>>>>>>>>>> remove the `StoreUpgradeBuilder`) will detect that there is >> only >>>>>> an >>>>>>>>>> old >>>>>>>>>>>>>> local KV store w/o TS, will start to restore the new KVwithTs >>>>>> store, >>>>>>>>>>>>>> wipe out the old store and replace with the new store after >>>>>> restore >>>>>>>>>> is >>>>>>>>>>>>>> finished, and start processing only afterwards. 
(I guess we >> need >>>>>> to >>>>>>>>>>>>>> document this case -- will also add it to the KIP.) >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 8/9/18 1:10 PM, John Roesler wrote: >>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I think this KIP is looking really good. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I have a few thoughts to add to the others: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1. You mentioned at one point users needing to configure >>>>>>>>>>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant >> to >>>>>> say >>>>>>>>>>>>> they >>>>>>>>>>>>>>> should remove the config. If they really have to set it to a >>>>>> string >>>>>>>>>>>>>> "null" >>>>>>>>>>>>>>> or even set it to a null value but not remove it, it would be >>>>>>>>>>>>>> unfortunate. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2. In response to Bill's comment #1 , you said that "The idea >>>>> is >>>>>>>>>> that >>>>>>>>>>>>> the >>>>>>>>>>>>>>> upgrade should be robust and not fail. We need to write >>>>> according >>>>>>>>>>>>> tests". >>>>>>>>>>>>>>> I may have misunderstood the conversation, but I don't think >>>>> it's >>>>>>>>>>>>> within >>>>>>>>>>>>>>> our power to say that an instance won't fail. What if one of >> my >>>>>>>>>>>>> computers >>>>>>>>>>>>>>> catches on fire? What if I'm deployed in the cloud and one >>>>>> instance >>>>>>>>>>>>>>> disappears and is replaced by a new one? Or what if one >>>>> instance >>>>>>>>>> goes >>>>>>>>>>>>>> AWOL >>>>>>>>>>>>>>> for a long time and then suddenly returns? How will the >> upgrade >>>>>>>>>>> process >>>>>>>>>>>>>>> behave in light of such failures? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 3. your thought about making in-place an offline mode is >>>>>>>>>> interesting, >>>>>>>>>>>>> but >>>>>>>>>>>>>>> it might be a bummer for on-prem users who wish to upgrade >>>>>> online, >>>>>>>>>> but >>>>>>>>>>>>>>> cannot just add new machines to the pool. It could be a new >>>>>> upgrade >>>>>>>>>>>>> mode >>>>>>>>>>>>>>> "offline-in-place", though... >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 4. I was surprised to see that a user would need to modify >> the >>>>>>>>>>> topology >>>>>>>>>>>>>> to >>>>>>>>>>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of >>>>>> Guozhang's >>>>>>>>>>>>>>> suggestions would remove this necessity. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for taking on this very complex but necessary work. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang < >>>>>> wangg...@gmail.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hello Matthias, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the updated KIP. Some more comments: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1. The current set of proposed API is a bit too complicated, >>>>>> which >>>>>>>>>>>>> makes >>>>>>>>>>>>>>>> the upgrade flow from user's perspective also a bit complex. >>>>> I'd >>>>>>>>>> like >>>>>>>>>>>>> to >>>>>>>>>>>>>>>> check different APIs and discuss about their needs >> separately: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1.a. StoreProxy: needed for in-place upgrade only, >> between >>>>>> the >>>>>>>>>>>>> first >>>>>>>>>>>>>>>> and second rolling bounce, where the old-versioned stores >> can >>>>>>>>>> handle >>>>>>>>>>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e. 
>>>>> from >>>>>>>> one >>>>>>>>>>>>>> store >>>>>>>>>>>>>>>> type to another) would not be very common: users may want to >>>>>>>>>> upgrade >>>>>>>>>>>>>> from a >>>>>>>>>>>>>>>> certain store engine to another, but the interface would >>>>> likely >>>>>> be >>>>>>>>>>>>>> staying >>>>>>>>>>>>>>>> the same. Hence personally I'd suggest we keep it internally >>>>> and >>>>>>>>>> only >>>>>>>>>>>>>>>> consider exposing it in the future if it does become a >> common >>>>>>>>>>> pattern. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1.b. ConverterStore / RecordConverter: needed for both >>>>>>>> in-place >>>>>>>>>>>>> and >>>>>>>>>>>>>>>> roll-over upgrade, between the first and second rolling >>>>> bounces, >>>>>>>>>> for >>>>>>>>>>>>> the >>>>>>>>>>>>>>>> new versioned store to be able to read old-versioned >> changelog >>>>>>>>>>> topics. >>>>>>>>>>>>>>>> Firstly I think we should not expose key in the public APIs >>>>> but >>>>>>>>>> only >>>>>>>>>>>>> the >>>>>>>>>>>>>>>> values, since allowing key format changes would break log >>>>>>>>>> compaction, >>>>>>>>>>>>>> and >>>>>>>>>>>>>>>> hence would not be compatible anyways. As for value format >>>>>>>> changes, >>>>>>>>>>>>>>>> personally I think we can also keep its upgrade logic >>>>> internally >>>>>>>> as >>>>>>>>>>> it >>>>>>>>>>>>>> may >>>>>>>>>>>>>>>> not worth generalizing to user customizable logic. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1.c. If you agrees with 2.a/b above, then we can also >>>>>> remove " >>>>>>>>>>>>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the >>>>>>>> public >>>>>>>>>>>>>> APIs. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1.d. Personally I think >>>>> "ReadOnlyKeyValueWithTimestampStore" >>>>>>>> is >>>>>>>>>>>>> not >>>>>>>>>>>>>>>> needed either given that we are exposing "ValueAndTimestamp" >>>>>>>>>> anyways. >>>>>>>>>>>>>> I.e. >>>>>>>>>>>>>>>> it is just a syntax sugar and for IQ, users can always just >>>>> set >>>>>> a >>>>>>>> " >>>>>>>>>>>>>>>> QueryableStoreType<ReadOnlyKeyValue<K, >> ValueAndTimestamp<V>>>" >>>>>> as >>>>>>>>>> the >>>>>>>>>>>>>> new >>>>>>>>>>>>>>>> interface does not provide any additional functions. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2. Could we further categorize the upgrade flow for >> different >>>>>> use >>>>>>>>>>>>> cases, >>>>>>>>>>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be >>>>> used >>>>>>>>>>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who >> do >>>>>> not >>>>>>>>>>>>> need >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to >>>>>>>> switch >>>>>>>>>>> to >>>>>>>>>>>>>>>> KeyValueWithTimestampStore. Just to give my understanding >> for >>>>>> 3), >>>>>>>>>> the >>>>>>>>>>>>>>>> upgrade flow for users may be simplified as the following >> (for >>>>>>>> both >>>>>>>>>>>>>>>> in-place and roll-over): >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> * Update the jar to new version, make code changes from >>>>>>>>>>>>>> KeyValueStore >>>>>>>>>>>>>>>> to KeyValueWithTimestampStore, set upgrade config. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> * First rolling bounce, and library code can internally >>>>> use >>>>>>>>>> proxy >>>>>>>>>>>>> / >>>>>>>>>>>>>>>> converter based on the specified config to handle new APIs >>>>> with >>>>>>>> old >>>>>>>>>>>>>> stores, >>>>>>>>>>>>>>>> while let new stores read from old changelog data. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> * Reset upgrade config. 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> * Second rolling bounce, and the library code >>>>> automatically >>>>>>>>>> turn >>>>>>>>>>>>> off >>>>>>>>>>>>>>>> logic for proxy / converter. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 3. Some more detailed proposals are needed for when to >>>>> recommend >>>>>>>>>>> users >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> trigger the second rolling bounce. I have one idea to share >>>>>> here: >>>>>>>>>> we >>>>>>>>>>>>>> add a >>>>>>>>>>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when >> 1) >>>>>>>>>>> upgrade >>>>>>>>>>>>>>>> config is set, and 2) the new stores are still ramping up >> (for >>>>>> the >>>>>>>>>>>>>> second >>>>>>>>>>>>>>>> part, we can start with some internal hard-coded heuristics >> to >>>>>>>>>> decide >>>>>>>>>>>>>> when >>>>>>>>>>>>>>>> it is close to be ramped up). If either one of it is not >> true >>>>>> any >>>>>>>>>>>>> more, >>>>>>>>>>>>>> it >>>>>>>>>>>>>>>> should transit to RUNNING. Users can then watch on this >> state, >>>>>> and >>>>>>>>>>>>>> decide >>>>>>>>>>>>>>>> to only trigger the second rebalance when the state has >>>>>> transited >>>>>>>>>>> from >>>>>>>>>>>>>>>> UPGRADING. They can also choose to cut over while the >> instance >>>>>> is >>>>>>>>>>>>> still >>>>>>>>>>>>>>>> UPGRADING, the downside is that after that the application >> may >>>>>>>> have >>>>>>>>>>>>> long >>>>>>>>>>>>>>>> restoration phase which is, to user's pov, unavailability >>>>>> periods. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Below are just some minor things on the wiki: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 4. "proxy story" => "proxy store". >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 5. "use the a builder " => "use a builder" >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to >>>>> the >>>>>>>>>>>>> value": >>>>>>>>>>>>>>>> what's the rationale of putting the timestamp before the >>>>> value, >>>>>>>>>> than >>>>>>>>>>>>>> after >>>>>>>>>>>>>>>> the value? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax < >>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for the feedback Bill. I just update the KIP with >> some >>>>>> of >>>>>>>>>>> your >>>>>>>>>>>>>>>>> points. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing >> to >>>>>>>>>> watch >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a >>>>> type >>>>>> of >>>>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new >> stores >>>>>> have >>>>>>>>>>>>>>>> reached >>>>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the >>>>>>>> signal >>>>>>>>>>> to >>>>>>>>>>>>>>>>> start >>>>>>>>>>>>>>>>>>> second rolling rebalance? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I think we can reuse the existing listeners, thus, I did >> not >>>>>>>>>> include >>>>>>>>>>>>>>>>> anything in the KIP. About a signal to rebalance: this >> might >>>>> be >>>>>>>>>>>>> tricky. >>>>>>>>>>>>>>>>> If we prepare the store "online", the active task will >> update >>>>>> the >>>>>>>>>>>>> state >>>>>>>>>>>>>>>>> continuously, and thus, state prepare is never finished. 
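(To sketch what "reuse the existing listeners" could look like from the user's side -- this is illustrative code only, not part of the KIP, and deciding which restore callbacks belong to the prepare tasks is an open detail:)

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Registered via KafkaStreams#setGlobalStateRestoreListener(); the offsets reported
// here show how far a store is behind its changelog end offset, which can serve as
// a rough signal for when to trigger the second rolling bounce.
public class UpgradeProgressListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        System.out.printf("restore of %s (%s) starts: offsets %d to %d%n",
                          storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // remaining lag is endingOffset (from onRestoreStart) minus batchEndOffset;
        // it could be exposed as a metric or log line
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        System.out.printf("restore of %s (%s) finished after %d records%n",
                          storeName, partition, totalRestored);
    }
}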
It >>>>>> will >>>>>>>>>> be >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> users responsibility to do the second rebalance (note, that >>>>> the >>>>>>>>>>>>> second >>>>>>>>>>>>>>>>> rebalance will first finish the last delta of the upgrade >> to >>>>>>>>>> finish >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> upgrade before actual processing resumes). I clarified the >>>>> KIP >>>>>>>>>> with >>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>> regard a little bit. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the >>>>> process, >>>>>>>>>> would >>>>>>>>>>>>> we >>>>>>>>>>>>>>>>> allow >>>>>>>>>>>>>>>>>>> the other instances to complete or just fail the entire >>>>>>>> upgrade? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The idea is that the upgrade should be robust and not fail. >>>>> We >>>>>>>>>> need >>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> write according tests. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could >> rename >>>>>> the >>>>>>>>>>>>>> current >>>>>>>>>>>>>>>>>>> active directories vs. deleting them right away, and >> when >>>>>> all >>>>>>>>>> the >>>>>>>>>>>>>>>>> prepare >>>>>>>>>>>>>>>>>>> task directories are successfully migrated then delete >> the >>>>>>>>>>> previous >>>>>>>>>>>>>>>>> active >>>>>>>>>>>>>>>>>>> ones. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Ack. Updated the KIP. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing >> any >>>>>> new >>>>>>>>>>>>>>>> records >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all >>>>>> prepare >>>>>>>>>>>>> tasks >>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling >>>>>> bounces >>>>>>>>>>> and >>>>>>>>>>>>>>>>> then as >>>>>>>>>>>>>>>>>>> each task successfully renames its prepare directories >> and >>>>>>>>>> deletes >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> old >>>>>>>>>>>>>>>>>>> active task directories, normal processing of records >>>>>> resumes. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The basic idea is to do an online upgrade to avoid >> downtime. >>>>> We >>>>>>>>>> can >>>>>>>>>>>>>>>>> discuss to offer both options... For the offline upgrade >>>>>> option, >>>>>>>>>> we >>>>>>>>>>>>>>>>> could simplify user interaction and trigger the second >>>>>> rebalance >>>>>>>>>>>>>>>>> automatically with the requirement that a user needs to >>>>> update >>>>>>>> any >>>>>>>>>>>>>>>> config. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> If might actually be worth to include this option: we know >>>>> from >>>>>>>>>>>>>>>>> experience with state restore, that regular processing >> slows >>>>>> down >>>>>>>>>>> the >>>>>>>>>>>>>>>>> restore. For roll_over upgrade, it would be a different >> story >>>>>> and >>>>>>>>>>>>>>>>> upgrade should not be slowed down by regular processing. >>>>> Thus, >>>>>> we >>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>> even make in_place an offline upgrade and force people to >> use >>>>>>>>>>>>> roll_over >>>>>>>>>>>>>>>>> if they need onlint upgrade. Might be a fair tradeoff that >>>>> may >>>>>>>>>>>>> simplify >>>>>>>>>>>>>>>>> the upgrade for the user and for the code complexity. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Let's see what other think. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote: >>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the update and the working prototype, it helps >>>>> with >>>>>>>>>>>>>>>>>> understanding the KIP. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I took an initial pass over this PR, and overall I find >> the >>>>>>>>>>>>> interfaces >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> approach to be reasonable. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing >> to >>>>>>>> watch >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a >> type >>>>>> of >>>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new stores >>>>>> have >>>>>>>>>>>>>> reached >>>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the >>>>>> signal >>>>>>>>>> to >>>>>>>>>>>>>>>> start >>>>>>>>>>>>>>>>>> second rolling rebalance? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Although you solicited feedback on the interfaces >> involved, >>>>> I >>>>>>>>>>> wanted >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> put >>>>>>>>>>>>>>>>>> down some thoughts that have come to mind reviewing this >> KIP >>>>>>>>>> again >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the >> process, >>>>>>>>>> would >>>>>>>>>>>>> we >>>>>>>>>>>>>>>>> allow >>>>>>>>>>>>>>>>>> the other instances to complete or just fail the entire >>>>>> upgrade? >>>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename >>>>> the >>>>>>>>>>>>> current >>>>>>>>>>>>>>>>>> active directories vs. deleting them right away, and when >>>>> all >>>>>>>>>> the >>>>>>>>>>>>>>>>> prepare >>>>>>>>>>>>>>>>>> task directories are successfully migrated then delete the >>>>>>>>>> previous >>>>>>>>>>>>>>>>> active >>>>>>>>>>>>>>>>>> ones. >>>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing >> any >>>>>> new >>>>>>>>>>>>>> records >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all >>>>> prepare >>>>>>>>>>> tasks >>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling >>>>>> bounces >>>>>>>>>> and >>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>> each task successfully renames its prepare directories and >>>>>>>>>> deletes >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> old >>>>>>>>>>>>>>>>>> active task directories, normal processing of records >>>>> resumes. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax < >>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> KIP-268 (rebalance meatadata) is finished and included in >>>>> AK >>>>>>>> 2.0 >>>>>>>>>>>>>>>>>>> release. Thus, I want to pick up this KIP again to get >> the >>>>>>>>>> RocksDB >>>>>>>>>>>>>>>>>>> upgrade done for 2.1. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I updated the KIP accordingly and also have a "prove of >>>>>>>> concept" >>>>>>>>>>> PR >>>>>>>>>>>>>>>>>>> ready (for "in place" upgrade only): >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5422/ >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> There a still open questions, but I want to collect early >>>>>>>>>> feedback >>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>>> the proposed interfaces we need for the store upgrade. >> Also >>>>>>>>>> note, >>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>> the KIP now also aim to define a generic upgrade path >> from >>>>>> any >>>>>>>>>>>>> store >>>>>>>>>>>>>>>>>>> format A to any other store format B. Adding timestamps >> is >>>>>> just >>>>>>>>>> a >>>>>>>>>>>>>>>>>>> special case. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I will continue to work on the PR and refine the KIP in >> the >>>>>>>>>>>>> meantime, >>>>>>>>>>>>>>>>> too. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Looking forward to your feedback. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote: >>>>>>>>>>>>>>>>>>>> After some more thoughts, I want to follow John's >>>>> suggestion >>>>>>>>>> and >>>>>>>>>>>>>>>> split >>>>>>>>>>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I extracted the metadata upgrade into it's own KIP: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268% >>>>>>>>>>>>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to >>>>>>>>>> consider >>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>> make the store format upgrade more flexible/generic. >> Atm, >>>>>> the >>>>>>>>>> KIP >>>>>>>>>>>>> is >>>>>>>>>>>>>>>>> too >>>>>>>>>>>>>>>>>>>> much tailored to the DSL IMHO and does not encounter >> PAPI >>>>>>>> users >>>>>>>>>>>>> that >>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>> should not force to upgrade the stores. I need to figure >>>>> out >>>>>>>>>> the >>>>>>>>>>>>>>>>> details >>>>>>>>>>>>>>>>>>>> and follow up later. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Please give feedback for the new KIP-268 on the >>>>>> corresponding >>>>>>>>>>>>>>>>> discussion >>>>>>>>>>>>>>>>>>>> thread. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't >>>>>> figure >>>>>>>>>> out >>>>>>>>>>>>> a >>>>>>>>>>>>>>>> way >>>>>>>>>>>>>>>>>>>> for a single rolling bounce upgrade. But KIP-268 >> proposes >>>>> a >>>>>>>> fix >>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>> future upgrades. Please share your thoughts. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for all your feedback! >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote: >>>>>>>>>>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's >>>>> an >>>>>>>>>>>>>>>>>>>>> implementation details IMHO and thus I did not include >> it >>>>>> in >>>>>>>>>> the >>>>>>>>>>>>>>>> KIP) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> @Guozhang: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 1) I understand know what you mean. We can certainly, >>>>> allow >>>>>>>>>> all >>>>>>>>>>>>>>>> values >>>>>>>>>>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for >>>>>>>>>>>>> `upgrade.from` >>>>>>>>>>>>>>>>>>>>> parameter. 
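(From the user's side the two bounces would then look roughly like the sketch below; the config key spelling and the accepted value format follow this discussion and may differ from what the KIP finally ships:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromExample {
    // first rolling bounce: the new jar is told which old release it upgrades from
    static Properties firstBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put("upgrade.from", "0.10.1");
        return props;
    }

    // second rolling bounce: the upgrade config is removed again
    static Properties secondBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        return props;
    }
}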
I had a similar though once but decided to >>>>>>>> collapse >>>>>>>>>>>>> them >>>>>>>>>>>>>>>>> into >>>>>>>>>>>>>>>>>>>>> one -- will update the KIP accordingly. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 2) The idea to avoid any config would be, to always >> send >>>>>> both >>>>>>>>>>>>>>>> request. >>>>>>>>>>>>>>>>>>>>> If we add a config to eventually disable the old >> request, >>>>>> we >>>>>>>>>>>>> don't >>>>>>>>>>>>>>>>> gain >>>>>>>>>>>>>>>>>>>>> anything with this approach. The question is really, if >>>>> we >>>>>>>> are >>>>>>>>>>>>>>>> willing >>>>>>>>>>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be >>>>>> limited >>>>>>>>>> to >>>>>>>>>>>>> 2 >>>>>>>>>>>>>>>>>>>>> versions and not grow further in future releases. More >>>>>>>> details >>>>>>>>>>> in >>>>>>>>>>>>>>>> (3) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases >> and >>>>>>>>>> allows >>>>>>>>>>>>> us >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>> stay with 2 "assignment strategies" we need to >> register, >>>>> as >>>>>>>>>> the >>>>>>>>>>>>> new >>>>>>>>>>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via >>>>>>>>>> "version >>>>>>>>>>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to >> avoid a >>>>>>>>>> config >>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>>>> people upgrade from pre-1.2 releases. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thus, I don't think we need to register new "assignment >>>>>>>>>>>>> strategies" >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>> send empty subscriptions for older version. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right >> with >>>>> a >>>>>>>>>>> single >>>>>>>>>>>>>>>>>>>>> rebalance. I share the concern that an application >> might >>>>>>>> never >>>>>>>>>>>>>> catch >>>>>>>>>>>>>>>>> up >>>>>>>>>>>>>>>>>>>>> and thus the hot standby will never be ready. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store >>>>>> upgrades. >>>>>>>>>> If >>>>>>>>>>>>> we >>>>>>>>>>>>>>>> do >>>>>>>>>>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3) >>>>> in >>>>>>>>>> place >>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>> future upgrades. I also think that changes to the >>>>> metadata >>>>>>>> are >>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>>>> likely and thus allowing for single rolling bounce for >>>>> this >>>>>>>>>> case >>>>>>>>>>>>> is >>>>>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>>>> important anyway. If we assume that store upgrade a >> rare, >>>>>> it >>>>>>>>>>>>> might >>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>> ok >>>>>>>>>>>>>>>>>>>>> to sacrifice two rolling bounced for this case. It was >>>>> just >>>>>>>> an >>>>>>>>>>>>> idea >>>>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>>>> wanted to share (even if I see the issues). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote: >>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for your replies. 
>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 1) About the config names: actually I was trying to >> not >>>>>>>>>> expose >>>>>>>>>>>>>>>>>>>>>> implementation details :) My main concern was that in >>>>> your >>>>>>>>>>>>>> proposal >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> values need to cover the span of all the versions that >>>>> are >>>>>>>>>>>>>> actually >>>>>>>>>>>>>>>>>>> using >>>>>>>>>>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a >>>>>> user) >>>>>>>>>> am >>>>>>>>>>>>>>>>>>> upgrading >>>>>>>>>>>>>>>>>>>>>> from any versions within this range I need to remember >>>>> to >>>>>>>> use >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> value >>>>>>>>>>>>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. >> In >>>>>> my >>>>>>>>>>>>>>>>> suggestion >>>>>>>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>>>>> was trying to argue the benefit of just letting users >> to >>>>>>>>>>> specify >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> actual >>>>>>>>>>>>>>>>>>>>>> Kafka version she's trying to upgrade from, than >>>>>> specifying >>>>>>>> a >>>>>>>>>>>>>> range >>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc >>>>> as >>>>>>>> the >>>>>>>>>>>>>>>> values, >>>>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>>>>>>> still using Kafka versions like broker's >>>>>> `internal.version` >>>>>>>>>>>>>> config. >>>>>>>>>>>>>>>>>>> But if >>>>>>>>>>>>>>>>>>>>>> you were suggesting the same thing, i.e. by >>>>>> "0.10.1.x-1.1.x" >>>>>>>>>>> you >>>>>>>>>>>>>>>>> meant >>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or >>>>>> "0.11.0" >>>>>>>>>> or >>>>>>>>>>>>>>>> "1.1" >>>>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>>>> are all recognizable config values then I think we are >>>>>>>>>> actually >>>>>>>>>>>>> on >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>>>> page. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would >>>>>> increase >>>>>>>>>> the >>>>>>>>>>>>>>>>> network >>>>>>>>>>>>>>>>>>>>>> footprint, but not the message size, IF I'm not >>>>>>>>>>>>> mis-understanding >>>>>>>>>>>>>>>>> your >>>>>>>>>>>>>>>>>>> idea >>>>>>>>>>>>>>>>>>>>>> of registering multiple assignment. More details: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can >>>>>>>> encode >>>>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>>>>>>>>> protocols each with their different metadata. The >>>>>>>> coordinator >>>>>>>>>>>>> will >>>>>>>>>>>>>>>>>>> pick the >>>>>>>>>>>>>>>>>>>>>> common one that everyone supports (if there are no >>>>> common >>>>>>>>>> one, >>>>>>>>>>>>> it >>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>>>> send >>>>>>>>>>>>>>>>>>>>>> an error back; if there are multiple ones, it will >> pick >>>>>> the >>>>>>>>>> one >>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>> most >>>>>>>>>>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded >>>>>> list). >>>>>>>>>>>>> Since >>>>>>>>>>>>>>>> our >>>>>>>>>>>>>>>>>>>>>> current Streams rebalance protocol is still based on >> the >>>>>>>>>>>>> consumer >>>>>>>>>>>>>>>>>>>>>> coordinator, it means our protocol_type would be >>>>>> "consumer", >>>>>>>>>>> but >>>>>>>>>>>>>>>>>>> instead >>>>>>>>>>>>>>>>>>>>>> the protocol type we can have multiple protocols like >>>>>>>>>>> "streams", >>>>>>>>>>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. 
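(For illustration: the registration described here is just the consumer's existing assignment-strategy list, which Streams sets internally today; the versioned assignor class names below are made up for this sketch:)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class MultiAssignorExample {
    static Properties consumerOverrides() {
        final Properties props = new Properties();
        // list of supported "assignment strategies"; the group coordinator picks the
        // newest protocol that every member of the group advertises
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.example.StreamsAssignorV3,org.example.StreamsAssignorV2,org.example.StreamsAssignor");
        return props;
    }
}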
The downside is that >> we >>>>>> need >>>>>>>>>> to >>>>>>>>>>>>>>>>>>> implement a >>>>>>>>>>>>>>>>>>>>>> different assignor class for each version and register >>>>> all >>>>>>>> of >>>>>>>>>>>>> them >>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In >> the >>>>>>>>>> future >>>>>>>>>>>>> if >>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>>>> re-factor our implementation to have our own client >>>>>>>>>> coordinator >>>>>>>>>>>>>>>> layer >>>>>>>>>>>>>>>>>>> like >>>>>>>>>>>>>>>>>>>>>> Connect did, we can simplify this part of the >>>>>>>> implementation. >>>>>>>>>>>>> But >>>>>>>>>>>>>>>>> even >>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> now with the above approach this is still doable. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On the broker side, the group coordinator will only >>>>>> persist >>>>>>>> a >>>>>>>>>>>>>> group >>>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>>>> the selected protocol and its subscription metadata, >>>>> e.g. >>>>>> if >>>>>>>>>>>>>>>>>>> coordinator >>>>>>>>>>>>>>>>>>>>>> decides to pick "streams_v2" it will only sends that >>>>>>>>>> protocol's >>>>>>>>>>>>>>>>>>> metadata >>>>>>>>>>>>>>>>>>>>>> from everyone to the leader to assign, AND when >>>>> completing >>>>>>>>>> the >>>>>>>>>>>>>>>>>>> rebalance it >>>>>>>>>>>>>>>>>>>>>> will also only write the group metadata with that >>>>> protocol >>>>>>>>>> and >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> assignment only. In a word, although the network >> traffic >>>>>>>>>> maybe >>>>>>>>>>>>>>>>>>> increased a >>>>>>>>>>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One >>>>> corner >>>>>>>>>>>>>>>> situation >>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>>>> need to consider is how to stop registering very old >>>>>>>>>> assignors >>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> avoid the >>>>>>>>>>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if >>>>> you >>>>>>>> are >>>>>>>>>>>>>>>> rolling >>>>>>>>>>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register >> v1 >>>>>>>>>>>>> assignor >>>>>>>>>>>>>>>>>>> anymore, >>>>>>>>>>>>>>>>>>>>>> but that would unfortunately still require some >> configs. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 3) About the "version probing" idea, I think that's a >>>>>>>>>>> promising >>>>>>>>>>>>>>>>>>> approach >>>>>>>>>>>>>>>>>>>>>> as well, but if we are going to do the >> multi-assignment >>>>>> its >>>>>>>>>>>>> value >>>>>>>>>>>>>>>>> seems >>>>>>>>>>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on >> top >>>>> of >>>>>>>>>>>>>>>>>>> multi-assignment >>>>>>>>>>>>>>>>>>>>>> to save us from still requiring the config to avoid >>>>>>>>>> registering >>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> metadata for all version. More details: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the >>>>>> assignor >>>>>>>>>> but >>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>> all old >>>>>>>>>>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the >>>>> encoded >>>>>>>>>> data >>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>> be: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> "streams_vN" : "encoded metadata" >>>>>>>>>>>>>>>>>>>>>> "streams_vN-1":empty >>>>>>>>>>>>>>>>>>>>>> "streams_vN-2":empty >>>>>>>>>>>>>>>>>>>>>> .. 
>>>>>>>>>>>>>>>>>>>>>> "streams_0":empty >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> So the coordinator can still safely choose the latest >>>>>> common >>>>>>>>>>>>>>>> version; >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>> then when leaders receive the subscription (note it >>>>> should >>>>>>>>>>>>> always >>>>>>>>>>>>>>>>>>> recognize >>>>>>>>>>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of >>>>> the >>>>>>>>>>>>>>>>>>> subscriptions >>>>>>>>>>>>>>>>>>>>>> are empty bytes, it will send the empty assignment >> with >>>>>> that >>>>>>>>>>>>>>>> version >>>>>>>>>>>>>>>>>>> number >>>>>>>>>>>>>>>>>>>>>> encoded in the metadata. So in the second >> auto-triggered >>>>>> all >>>>>>>>>>>>>>>> members >>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>> send the metadata with that version: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> "streams_vN" : empty >>>>>>>>>>>>>>>>>>>>>> "streams_vN-1" : empty >>>>>>>>>>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata" >>>>>>>>>>>>>>>>>>>>>> .. >>>>>>>>>>>>>>>>>>>>>> "streams_0":empty >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> By doing this we would not require any configs for >>>>> users. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not >>>>> clear >>>>>>>>>> about >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> details >>>>>>>>>>>>>>>>>>>>>> so probably we'd need to fill that out before making a >>>>>> call. >>>>>>>>>>> For >>>>>>>>>>>>>>>>>>> example, >>>>>>>>>>>>>>>>>>>>>> you mentioned "If we detect this situation, the >> Streams >>>>>>>>>>>>>> application >>>>>>>>>>>>>>>>>>> closes >>>>>>>>>>>>>>>>>>>>>> corresponding active tasks as well as "hot standby" >>>>> tasks, >>>>>>>>>> and >>>>>>>>>>>>>>>>>>> re-creates >>>>>>>>>>>>>>>>>>>>>> the new active tasks using the new store." How could >> we >>>>>>>>>>>>> guarantee >>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> gap between these two stores will keep decreasing than >>>>>>>>>>>>> increasing >>>>>>>>>>>>>>>> so >>>>>>>>>>>>>>>>>>> we'll >>>>>>>>>>>>>>>>>>>>>> eventually achieve the flip point? And also the longer >>>>> we >>>>>>>> are >>>>>>>>>>>>>>>> before >>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> flip point, the larger we are doubling the storage >>>>> space, >>>>>>>>>> etc. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax < >>>>>>>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> @John, Guozhang, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply... >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> About upgrading the rebalance metadata: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Another possibility to do this, would be to register >>>>>>>>>> multiple >>>>>>>>>>>>>>>>>>> assignment >>>>>>>>>>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case, >> new >>>>>>>>>>>>> instances >>>>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>>>> be configured to support both and the broker would >> pick >>>>>> the >>>>>>>>>>>>>>>> version >>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>> all instances understand. 
The disadvantage would be, >>>>> that >>>>>>>> we >>>>>>>>>>>>> send >>>>>>>>>>>>>>>>> much >>>>>>>>>>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance >> as >>>>>> long >>>>>>>>>> as >>>>>>>>>>>>> no >>>>>>>>>>>>>>>>>>> second >>>>>>>>>>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, >>>>> using >>>>>>>>>> this >>>>>>>>>>>>>>>>>>> approach >>>>>>>>>>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off >> an >>>>>>>>>>>>> increased >>>>>>>>>>>>>>>>>>>>>>> rebalance network footprint (I also assume that this >>>>>> would >>>>>>>>>>>>>>>> increase >>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>> message size that is written into __consumer_offsets >>>>>>>>>> topic?). >>>>>>>>>>>>>>>>>>> Overall, I >>>>>>>>>>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it >>>>>> could >>>>>>>>>>>>> avoid >>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>>>>>>> second rebalance (I have some more thoughts about >>>>> stores >>>>>>>>>> below >>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>>>>>>> relevant for single rebalance upgrade). >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> For future upgrades we might be able to fix this >>>>> though. >>>>>> I >>>>>>>>>> was >>>>>>>>>>>>>>>>>>> thinking >>>>>>>>>>>>>>>>>>>>>>> about the following: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> In the current implementation, the leader fails if it >>>>>> gets >>>>>>>> a >>>>>>>>>>>>>>>>>>>>>>> subscription it does not understand (ie, newer >>>>> version). >>>>>> We >>>>>>>>>>>>> could >>>>>>>>>>>>>>>>>>> change >>>>>>>>>>>>>>>>>>>>>>> this behavior and let the leader send an empty >>>>> assignment >>>>>>>>>> plus >>>>>>>>>>>>>>>> error >>>>>>>>>>>>>>>>>>>>>>> code (including supported version) back to the >> instance >>>>>>>>>>> sending >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>> "bad" subscription. This would allow the following >>>>> logic >>>>>>>> for >>>>>>>>>>> an >>>>>>>>>>>>>>>>>>>>>>> application instance: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> - on startup, always send the latest subscription >>>>> format >>>>>>>>>>>>>>>>>>>>>>> - if leader understands it, we get an assignment >> back >>>>> an >>>>>>>>>>> start >>>>>>>>>>>>>>>>>>> processing >>>>>>>>>>>>>>>>>>>>>>> - if leader does not understand it, we get an empty >>>>>>>>>>> assignment >>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>> supported version back >>>>>>>>>>>>>>>>>>>>>>> - the application unsubscribe()/subscribe()/poll() >>>>> again >>>>>>>>>> and >>>>>>>>>>>>>>>>> sends a >>>>>>>>>>>>>>>>>>>>>>> subscription using the leader's supported version >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> This protocol would allow to do a single rolling >>>>> bounce, >>>>>>>> and >>>>>>>>>>>>>>>>>>> implements >>>>>>>>>>>>>>>>>>>>>>> a "version probing" step, that might result in two >>>>>> executed >>>>>>>>>>>>>>>>>>> rebalances. >>>>>>>>>>>>>>>>>>>>>>> The advantage would be, that the user does not need >> to >>>>>> set >>>>>>>>>> any >>>>>>>>>>>>>>>>> configs >>>>>>>>>>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care >>>>> of >>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>> automatically. 
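(A compact sketch of that probing loop, just to make the control flow explicit -- this logic would live inside the Streams partition assignor, not in user code, and all names here are made up:)

public final class VersionProbingSketch {
    static final int LATEST_SUPPORTED_VERSION = 3;

    // stand-in for the assignment the group leader returns
    static final class Assignment {
        final boolean empty;
        final int leaderSupportedVersion;
        Assignment(final boolean empty, final int leaderSupportedVersion) {
            this.empty = empty;
            this.leaderSupportedVersion = leaderSupportedVersion;
        }
    }

    int probe() {
        int usedVersion = LATEST_SUPPORTED_VERSION;
        while (true) {
            // stand-in for unsubscribe()/subscribe()/poll() with this metadata version
            final Assignment assignment = rejoinGroup(usedVersion);
            if (!assignment.empty) {
                return usedVersion;   // the leader understood our subscription format
            }
            // empty assignment plus error code: downgrade to the leader's version and retry
            usedVersion = assignment.leaderSupportedVersion;
        }
    }

    private Assignment rejoinGroup(final int subscriptionVersion) {
        return new Assignment(false, subscriptionVersion);   // placeholder
    }
}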
>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen >>>>> and >>>>>>>>>> that >>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>> an >>>>>>>>>>>>>>>>>>>>>>> error case during rebalance, we loose the information >>>>>> about >>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>> supported leader version and the "probing step" would >>>>>>>>>> happen a >>>>>>>>>>>>>>>>> second >>>>>>>>>>>>>>>>>>> time. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> If the leader is eventually updated, it will include >>>>> it's >>>>>>>>>> own >>>>>>>>>>>>>>>>>>> supported >>>>>>>>>>>>>>>>>>>>>>> version in all assignments, to allow a "down graded" >>>>>>>>>>>>> application >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> upgrade its version later. Also, if a application >>>>> fails, >>>>>>>> the >>>>>>>>>>>>>> first >>>>>>>>>>>>>>>>>>>>>>> probing would always be successful and only a single >>>>>>>>>> rebalance >>>>>>>>>>>>>>>>>>> happens. >>>>>>>>>>>>>>>>>>>>>>> If we use this protocol, I think we don't need any >>>>>>>>>>>>> configuration >>>>>>>>>>>>>>>>>>>>>>> parameter for future upgrades. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version": >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version >>>>> the >>>>>>>>>>>>>>>>> current/old >>>>>>>>>>>>>>>>>>>>>>> application is using. I think this is simpler, as >> users >>>>>>>> know >>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>> version. If we use "internal.protocol.version" >> instead, >>>>>> we >>>>>>>>>>>>> expose >>>>>>>>>>>>>>>>>>>>>>> implementation details and users need to know the >>>>>> protocol >>>>>>>>>>>>>> version >>>>>>>>>>>>>>>>>>> (ie, >>>>>>>>>>>>>>>>>>>>>>> they need to map from the release version to the >>>>> protocol >>>>>>>>>>>>>> version; >>>>>>>>>>>>>>>>> ie, >>>>>>>>>>>>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol >>>>> version >>>>>>>>>> 2"). >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Also the KIP states that for the second rolling >> bounce, >>>>>> the >>>>>>>>>>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- >>>>> and >>>>>>>>>>> thus, >>>>>>>>>>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is >> ignored >>>>>> (I >>>>>>>>>>> will >>>>>>>>>>>>>>>>> update >>>>>>>>>>>>>>>>>>>>>>> the KIP to point out this dependency). >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> About your second point: I'll update the KIP >>>>> accordingly >>>>>> to >>>>>>>>>>>>>>>> describe >>>>>>>>>>>>>>>>>>>>>>> future updates as well. Both will be different. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> One more point about upgrading the store format. I >> was >>>>>>>>>>> thinking >>>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>>>>>> avoiding the second rolling bounce all together in >> the >>>>>>>>>> future: >>>>>>>>>>>>>> (1) >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) >>>>> this >>>>>>>>>>>>>> required >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the >>>>>>>> switch >>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> delete >>>>>>>>>>>>>>>>>>>>>>> the old stores. (3) the current proposal does the >>>>> switch >>>>>>>>>>>>>>>> "globally" >>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>>>>>> this is simpler and due to the required second >>>>> rebalance >>>>>> no >>>>>>>>>>>>>>>>>>> disadvantage. 
About your second point: I'll update the KIP accordingly to describe future upgrades as well. Both will be different.

One more point about upgrading the store format. I was thinking about avoiding the second rolling bounce altogether in the future: (1) the goal is to achieve an upgrade with zero downtime; (2) this requires preparing the stores as "hot standbys" before we do the switch and delete the old stores; (3) the current proposal does the switch "globally" -- this is simpler and, due to the required second rebalance, no disadvantage.

However, a globally consistent switch-over might actually not be required. For "in_place" upgrade, following the protocol from above, we could decouple the store switch, and each instance could switch its store independently from all other instances. After the rolling bounce, it seems to be ok to switch from the old store to the new store "under the hood" whenever the new store is ready (this could even be done before we switch to the new metadata version). Each time we update the "hot standby" we check if it reached the "endOffset" (or maybe X% of it; this could either be hardcoded or configurable). If we detect this situation, the Streams application closes the corresponding active task as well as the "hot standby" task, and re-creates the new active task using the new store. (I need to go through the details once again, but it seems to be feasible.)

Combining this strategy with the "multiple assignment" idea might even enable us to do a single rolling bounce upgrade from 1.1 -> 1.2. Applications would just use the old store as long as the new store is not ready, even if the new metadata version is used already.

For future upgrades, a single rebalance would be sufficient, too, even if the stores are upgraded. We would not need any config parameters, as the "probe" step allows us to detect the supported rebalance metadata version (and we would also not need multiple "assignment strategies", as our own protocol encodes everything we need).
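A minimal sketch of the per-instance "switch once the hot standby has caught up" check described above; the threshold, the task handle, and all names are hypothetical illustrations, not real Streams internals:

// Sketch: once the new-format "hot standby" store has (almost) reached the
// changelog end offset, the instance swaps it in for the old-format store.
public class StoreSwitchSketch {

    static final double CAUGHT_UP_FRACTION = 0.99; // the "X%" -- hardcoded or configurable

    interface TaskHandle {
        long changelogEndOffset();       // end offset of the store's changelog
        long newStoreRestoredOffset();   // how far the new-format standby has restored
        void closeOldActiveAndStandby(); // close old-format active task + new-format standby
        void recreateActiveWithNewStore();
    }

    /** Called periodically while the new-format standby is being maintained. */
    static void maybeSwitch(TaskHandle task) {
        long end = task.changelogEndOffset();
        long restored = task.newStoreRestoredOffset();
        boolean caughtUp = end == 0 || (double) restored / end >= CAUGHT_UP_FRACTION;
        if (caughtUp) {
            // swap stores "under the hood", independently of all other instances
            task.closeOldActiveAndStandby();
            task.recreateActiveWithNewStore();
        }
    }
}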
Let me know what you think.

-Matthias

On 3/9/18 10:33 PM, Guozhang Wang wrote:

@John:

For the protocol version upgrade, it is only for the encoded metadata bytes protocol, which is just bytes-in/bytes-out from the Consumer's pov, so I think this change should be in the Streams layer as well.

@Matthias:

For 2), I agree that adding a "newest supported version" besides the "currently used version for encoding" is a good idea to allow either case; the key is that in Streams we would likely end up with a mapping from the protocol version to the other persistent data format versions such as RocksDB and changelog. With such a map we can actually achieve both scenarios, i.e. 1) one rolling bounce if the upgraded protocol version's corresponding data format does not change (e.g. 0.10.0 -> 0.10.1): leaders can choose to use the newer version in the first rolling bounce directly and we can document to users that they do not need to set "upgrade.mode"; and 2) two rolling bounces if the upgraded protocol version does indicate a data format change (e.g. 1.1 -> 1.2): then we can document that "upgrade.mode" needs to be set in the first rolling bounce and reset in the second.

Besides that, some additional comments:

1) I still think "upgrade.from" is less intuitive for users to set than "internal.protocol.version", where for the latter users only need to set a single version, while Streams will map that version to the Streams assignor's behavior as well as the data format.
But maybe I did not get your idea about how the "upgrade.from" config will be set, because in your Compatibility section it is not very clear how the "upgrade.from" config should be set for these two rolling bounces: for example, should the user reset it to null in the second rolling bounce?

2) In the upgrade path description, rather than talking about specific versions (version 0.10.0 -> version 0.10.1 etc), can we just categorize all the possible scenarios, even for future upgrade versions, and state what the standard operations should be? The categories we could summarize would be (assuming the user upgrades from version X to version Y, where X and Y are Kafka versions, with corresponding supported protocol versions x and y):

a. x == y, i.e. the metadata protocol does not change, and hence no persistent data formats have changed.

b. x != y, but all persistent data formats remain the same.

c. x != y, AND some persistent data format (RocksDB format, changelog format) has changed.

d. special case: we may need some special handling logic when "current version" or "newest supported version" are not available in the protocol, i.e. for X as old as 0.10.0 and before 1.2.

Under the above scenarios, how many rolling bounces do users need to execute? How should they set the configs in each rolling bounce? And how will the Streams library execute in these cases?

Guozhang

On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Ted,

I still consider changing the KIP to include it right away -- if not, I'll create a JIRA.
Need to think it through in more detail first.

(Same for other open questions like interface names -- I collect feedback and update the KIP after we reach consensus :))

-Matthias

On 3/9/18 3:35 PM, Ted Yu wrote:

Thanks for the details, Matthias.

bq. change the metadata protocol only in a future release, encoding both used and supported version might be an advantage

Looks like encoding both versions wouldn't be implemented in this KIP.

Please consider logging a JIRA with the encoding proposal.

Cheers

On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:

@Bill: I think a filter predicate should be part of user code. And even if we want to add something like this, I would prefer to do it in a separate KIP.

@James: I would love to avoid a second rolling bounce. But from my understanding it would not be possible.

The purpose of the second rolling bounce is indeed to switch from version 2 to 3. It also has a second purpose: to switch from the old store to the new store (this happens after the last instance bounces a second time).

The problem with one round of rolling bounces is that it's unclear when to switch from version 2 to version 3. The StreamsPartitionAssignor is stateless by design, and thus the information about which version it should use must be passed in from externally -- and we want to use the StreamsConfig to pass in this information.

During the upgrade, newly bounced instances have no information about the progress of the upgrade (ie, how many other instances got upgraded already).
Therefore, it's not safe for them to send a version 3 subscription. The leader also has this limited view of the world and can only send version 2 assignments back.

Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.

We did consider changing the metadata to make later upgrades (ie, from 1.2 to 1.x) simpler though (for the case that we change the metadata or storage format again -- as long as we don't change it, a single rolling bounce is sufficient), by encoding "used version" and "supported version". This would allow the leader to switch to the new version earlier and without a second rebalance: the leader would receive "used version == old" and "supported version = old/new" -- as long as at least one instance sends "supported version = old", the leader sends an old-version assignment back. Encoding both versions would allow the leader to send a new-version assignment back right after the first round of rebalancing finished (ie, once all instances send "supported version = new"). However, there are still two issues with this:

1) If we switch to the new format right after the last instance bounced, the new stores might not be ready to be used -- this could lead to "downtime", as a store must be restored before processing can resume.

2) Assume an instance fails and is restarted again. At this point, the instance will still have "upgrade mode" enabled and thus sends the old protocol data. However, it would be desirable to never fall back to the old protocol after the switch to the new protocol.

The second issue is minor, and I guess if users set up the instances properly it could be avoided.
However, the first issue would prevent "zero downtime" upgrades. Having said this, if we consider that we might change the metadata protocol only in a future release, encoding both used and supported version might be an advantage in the future, and we could consider adding this information in the 1.2 release to prepare for it.

Btw: monitoring the log is also only required to give the instances enough time to prepare the stores in the new format. If you do the second rolling bounce before this, it would still work -- however, you might see app "downtime", as the new store must be fully restored before processing can resume.
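For clarity, the leader-side rule for the "used version" / "supported version" encoding discussed above could be sketched as follows; the types and names are hypothetical, not actual assignor code:

import java.util.List;

// Sketch: with both "used version" and "supported version" in each subscription,
// the leader only hands out the new assignment version once every member supports it.
public class LeaderVersionDecisionSketch {

    record Subscription(int usedVersion, int supportedVersion) {}

    static int chooseAssignmentVersion(List<Subscription> subscriptions,
                                       int newVersion,
                                       int oldVersion) {
        for (Subscription s : subscriptions) {
            if (s.supportedVersion() < newVersion) {
                // at least one instance only supports the old version -> stay on it
                return oldVersion;
            }
        }
        // all instances report "supported version = new" -> switch right away
        return newVersion;
    }
}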
Does this make sense?

-Matthias

On 3/9/18 11:36 AM, James Cheng wrote:

Matthias,

For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?

For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce and the 2nd rolling bounce is to decide whether to send Subscription Version 2 or Subscription Version 3. (Actually, there is another difference mentioned, in that the KIP says the 2nd rolling bounce should happen after all new state stores are created by the background thread. However, within the 2nd rolling bounce we say that there is still a background thread, so it seems like there is no actual requirement to wait for the new state stores to be created.)

The 2nd rolling bounce already knows how to deal with mixed mode (having both Version 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce if we added logic (somehow/somewhere) such that:
* Instances send Subscription Version 2 until all instances are running the new code.
* Once all the instances are running the new code, then one at a time, the instances start sending Subscription V3. The leader still hands out Assignment Version 2 until all new state stores are ready.
* Once all instances report that the new stores are ready, the leader sends out Assignment Version 3.
* Once an instance receives an Assignment Version 3, it can delete the old state store.

Doing it that way seems like it would reduce a lot of operator/deployment overhead. No need to do 2 rolling restarts. No need to monitor logs for the state store rebuild. You just deploy it, and the instances update themselves.

What do you think?

The thing that made me think of this is that the "2 rolling bounces" approach is similar to what Kafka brokers have to do for changes to inter.broker.protocol.version and log.message.format.version. And in the broker case, it seems like it would be possible (with some work, of course) to modify Kafka to allow similar auto-detection of broker capabilities and automatically do a switchover from old to new versions.

-James

On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Matthias,

Thanks for the KIP, it's a +1 from me.

I do have one question regarding the retrieval methods on the new interfaces.

Would we want to consider adding one method with a Predicate that would allow for filtering records by the timestamp stored with the record? Or is this better left for users to implement themselves once the data has been retrieved?

Thanks,
Bill
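As a rough illustration of the second alternative mentioned above -- filtering in user code once the data has been retrieved -- a sketch could look like the following; the ValueAndTimestamp class here is a simplified stand-in for the value wrapper proposed by the KIP, and the helper name is made up:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of timestamp-based filtering done in user code after retrieval,
// rather than via an extra store method.
public class TimestampFilterSketch {

    record ValueAndTimestamp<V>(V value, long timestamp) {}

    /** Keep only entries whose stored timestamp matches the given predicate. */
    static <K, V> List<Map.Entry<K, ValueAndTimestamp<V>>> filterByTimestamp(
            Iterable<Map.Entry<K, ValueAndTimestamp<V>>> range,
            Predicate<Long> timestampPredicate) {
        List<Map.Entry<K, ValueAndTimestamp<V>>> result = new ArrayList<>();
        for (Map.Entry<K, ValueAndTimestamp<V>> entry : range) {
            if (timestampPredicate.test(entry.getValue().timestamp())) {
                result.add(entry);
            }
        }
        return result;
    }
}

For example, filterByTimestamp(rangeResult, ts -> ts >= cutoff) would keep only records at or after a given cutoff timestamp.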
On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Matthias:

For my point #1, I don't have a preference as to which separator is chosen. Given the background you mentioned, the current choice is good.

For #2, I think my proposal is better since it is closer to English grammar.

Would be good to listen to what other people think.

On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the comments!

@Guozhang:

So far, there is one PR for the rebalance metadata upgrade fix (addressing the mentioned https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first impression of how the metadata upgrade works, including a system test: https://github.com/apache/kafka/pull/4636

I can share other PRs as soon as they are ready. I agree that the KIP is complex, and I am ok with putting out more code to give better discussion context.

@Ted:

I picked `_` instead of `-` to align with the `processing.guarantee` parameter that accepts `at_least_once` and `exactly_once` as values.
Personally, I don't care about underscore vs dash, but I prefer consistency. If you feel strongly about it, we can also change it to `-`.

About the interface name: I am fine either way -- I stripped the `With` to keep the name a little shorter. Would be good to get feedback from others and pick the name the majority prefers.

@John:

We can certainly change it. I agree that it would not make a difference. I'll dig into the code to see if either of the two versions might introduce undesired complexity, and I'll update the KIP if I don't hit an issue with putting the `-v2` on the store directory instead of `rocksdb-v2`.

-Matthias

On 3/8/18 2:44 PM, John Roesler wrote:

Hey Matthias,

The KIP looks good to me. I had several questions queued up, but they were all in the "rejected alternatives" section... oh, well.

One very minor thought re changing the state directory from "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2" marker on the storeName part of the path (i.e., "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same benefits without altering the high-level directory structure.

It may not matter, but I could imagine people running scripts to monitor rocksdb disk usage for each task, or other such use cases.
Thanks,
-John

On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Matthias:

Nicely written KIP.

"in_place": can this be "in-place"? Underscore may sometimes be mistyped (as '-'). I think using '-' is more friendly to the user.

public interface ReadOnlyKeyValueTimestampStore<K, V> {

Is ReadOnlyKeyValueStoreWithTimestamp a better name for the class?

Thanks

On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Matthias, thanks for the KIP.

I've read through the upgrade path section and it looks good to me. If you already have a WIP PR for it, could you also share it here so that people can take a look?

I'm +1 on the KIP itself. But with large KIPs like this there is always some devil hidden in the details, so I think it is better to have the implementation in parallel along with the design discussion :)

Guozhang

On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I want to propose KIP-258 for the Streams API to allow storing timestamps in RocksDB.
This feature is >> the >>>>>>>> basis >>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> resolve >>>>>>>>>>>>>>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests). >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about >>>>> this! >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>> -- Guozhang >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> >> >