I also want to point out that Ryanne Dolan commented on the WIP PR (https://github.com/apache/kafka/pull/6044) about the naming. I asked him to reply to this thread, but this has not happened yet, so I want to point it out myself, because it seems too important.

WindowWithTimestampStore.java

> Hard to parse this class name -- sounds like "trait Window with TimestampStore" instead of WindowStore<... ValueAndTimestamp...>
>
> Maybe TimestampWindowStore or TimestampedWindowStore? shrug.

Another comment, which I personally do not necessarily agree with:

KeyValueWithTimestampStore.java

> possible alternative: implement KeyValueStore<K, V>, and then expose an additional putWithTimestamp, getWithTimestamp etc for callers that want ValueAndTimestamp<V> instead of V. This would probably require fewer code changes elsewhere.

What do you think?

-Matthias

On 1/12/19 6:43 PM, Matthias J. Sax wrote:
> Bill,
>
> I left the question about the legacy column family out because, as a matter of fact, we use the default column family atm, which cannot be deleted. Thus, this old column family will always be there.
>
> Nevertheless, as an implementation detail, it might make sense to avoid accessing both column families forever (ie, each time a key is not found). Also, we might want/need a way to "force" upgrading to the new column family, for the case that some records are not accessed for a long time. Again, this seems to be an implementation detail (and I am also not sure if we really need it). If you think both are not implementation details, I can of course extend the KIP accordingly.
>
> -Matthias
>
> On 1/11/19 1:27 PM, Bill Bejeck wrote:
>> Hi Matthias,
>>
>> Thanks for the KIP, it goes into good detail and is well done.
>>
>> Overall I'm a +1 on the KIP and have one minor question.
>>
>> Regarding the upgrade path, we'll use two column families to do a lazy conversion, which makes sense to me. What is the plan to get rid of the "legacy" column family (if ever)? Would we drop the "legacy" column family once it is empty? I'm not sure we'd ever need to, as it would just be a column family that doesn't get used.
>>
>> Maybe this is an implementation detail and doesn't need to be addressed now, but it came to mind when I read the KIP.
>>
>> Thanks again,
>> Bill
>>
>> On Fri, Jan 11, 2019 at 1:19 PM John Roesler <j...@confluent.io> wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks for the updates to the KIP. I've just read it over, and am personally quite happy with it.
>>>
>>> Thanks for tackling this dicey issue and putting in a huge amount of design work to produce a smooth upgrade path for DSL users.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, Dec 17, 2018 at 10:35 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> Dear all,
>>>>
>>>> I finally managed to update the KIP.
>>>>
>>>> To address the concerns about the complex upgrade path, I simplified the design. We don't need any configs and the upgrade can be done with the simple single rolling bounce pattern.
>>>>
>>>> The suggestion is to exploit RocksDB column families to isolate old and new on-disk format. Furthermore, the upgrade from old to new format happens "on the side" after an instance was upgraded.
>>>>
>>>> I also pushed a WIP PR in case you want to look into some details (potential reviewers, don't panic: I plan to break this down into multiple PRs for actual review if the KIP is accepted).
>>>>
>>>> https://github.com/apache/kafka/pull/6044
>>>>
>>>> @Eno: I think I never answered your question about being future proof:
>>>>
>>>> The latest design is not generic, because it does not support changes that need to be reflected in the changelog topic.
I aimed for a >>>> non-generic design for now to keep it as simple as possible. Thus, other >>>> format changes might need a different design / upgrade path -- however, >>>> because this KIP is quite encapsulated in the current design, I don't >>>> see any issue to build this later and a generic upgrade path seems to be >>>> an orthogonal concern atm. >>>> >>>> >>>> -Matthias >>>> >>>> >>>> On 11/22/18 2:50 PM, Adam Bellemare wrote: >>>>> Thanks for the information Matthias. >>>>> >>>>> I will await your completion of this ticket then since it underpins the >>>>> essential parts of a RocksDB TTL aligned with the changelog topic. I am >>>>> eager to work on that ticket myself, so if I can help on this one in >>> any >>>>> way please let me know. >>>>> >>>>> Thanks >>>>> Adam >>>>> >>>>> >>>>> >>>>> On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <matth...@confluent.io >>>> >>>>> wrote: >>>>> >>>>>> It's an interesting idea to use second store, to maintain the >>>>>> timestamps. However, each RocksDB instance implies some overhead. In >>>>>> fact, we are looking into ColumnFamilies atm to see if we can use >>> those >>>>>> and merge multiple RocksDBs into a single one to reduce this overhead. >>>>>> >>>>>> -Matthias >>>>>> >>>>>> On 11/20/18 5:15 AM, Patrik Kleindl wrote: >>>>>>> Hi Adam >>>>>>> >>>>>>> Sounds great, I was already planning to ask around if anyone had >>>> tackled >>>>>>> this. >>>>>>> We have a use case very similar to what you described in KAFKA-4212, >>>> only >>>>>>> with Global State Stores. >>>>>>> I have tried a few things with the normal DSL but was not really >>>>>> successful. >>>>>>> Schedule/Punctuate is not possible, supplying a windowed store is >>> also >>>>>> not >>>>>>> allowed and the process method has no knowledge of the timestamp of >>> the >>>>>>> record. >>>>>>> And anything loaded on startup is not filtered anyway. >>>>>>> >>>>>>> Regarding 4212, wouldn't it be easier (although a little less >>>>>>> space-efficient) to track the Timestamps in a separate Store with <K, >>>>>> Long> >>>>>>> ? >>>>>>> This would leave the original store intact and allow a migration of >>> the >>>>>>> timestamps without touching the other data. >>>>>>> >>>>>>> So I am very interested in your PR :-) >>>>>>> >>>>>>> best regards >>>>>>> >>>>>>> Patrik >>>>>>> >>>>>>> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare < >>> adam.bellem...@gmail.com >>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Matthias >>>>>>>> >>>>>>>> Thanks - I figured that it was probably a case of just too much to >>> do >>>>>> and >>>>>>>> not enough time. I know how that can go. I am asking about this one >>> in >>>>>>>> relation to https://issues.apache.org/jira/browse/KAFKA-4212, >>> adding >>>> a >>>>>> TTL >>>>>>>> to RocksDB. I have outlined a bit about my use-case within 4212, but >>>> for >>>>>>>> brevity here it is: >>>>>>>> >>>>>>>> My case: >>>>>>>> 1) I have a RocksDB with TTL implementation working where records >>> are >>>>>> aged >>>>>>>> out using the TTL that comes with RocksDB (very simple). >>>>>>>> 2) We prevent records from loading from the changelog if recordTime >>> + >>>>>> TTL < >>>>>>>> referenceTimeStamp (default = System.currentTimeInMillis() ). >>>>>>>> >>>>>>>> This assumes that the records are stored with the same time >>> reference >>>>>> (say >>>>>>>> UTC) as the consumer materializing the RocksDB store. 
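
Just to make the filter from point 2 above concrete: the check is a plain timestamp comparison applied while restoring from the changelog. A minimal sketch; the class and helper names are made up for illustration and are not part of the KIP or of any Streams API:

import java.util.function.BiPredicate;

// Minimal sketch of the restore-time TTL filter described above: a changelog record is
// skipped if it would already have expired. All names are illustrative only.
public class TtlRestoreFilterSketch {

    static BiPredicate<Long, Long> expired(final long ttlMs) {
        // recordTimestampMs: timestamp the record was written with (same clock as the consumer)
        // referenceTimeMs: defaults to the restoring instance's current wall-clock time
        return (recordTimestampMs, referenceTimeMs) -> recordTimestampMs + ttlMs < referenceTimeMs;
    }

    public static void main(final String[] args) {
        final BiPredicate<Long, Long> check = expired(50L);
        System.out.println(check.test(100L, 160L)); // true -> skip the record during restore
        System.out.println(check.test(100L, 140L)); // false -> still within its TTL, restore it
    }
}
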
>>>>>>>> >>>>>>>> My questions about KIP-258 are as follows: >>>>>>>> 1) How does "we want to be able to store record timestamps in >>> KTables" >>>>>>>> differ from inserting records into RocksDB with TTL at consumption >>>>>> time? I >>>>>>>> understand that it could be a difference of some seconds, minutes, >>>>>> hours, >>>>>>>> days etc between when the record was published and now, but given >>> the >>>>>>>> nature of how RocksDB TTL works (eventual - based on compaction) I >>>> don't >>>>>>>> see how a precise TTL can be achieved, such as that which one can >>> get >>>>>> with >>>>>>>> windowed stores. >>>>>>>> >>>>>>>> 2) Are you looking to change how records are inserted into a TTL >>>>>> RocksDB, >>>>>>>> such that the TTL would take effect from the record's published >>> time? >>>> If >>>>>>>> not, what would be the ideal workflow here for a single record with >>>> TTL >>>>>>>> RocksDB? >>>>>>>> ie: Record Timestamp: 100 >>>>>>>> TTL: 50 >>>>>>>> Record inserted into rocksDB: 110 >>>>>>>> Record to expire at 150? >>>>>>>> >>>>>>>> 3) I'm not sure I fully understand the importance of the upgrade >>>> path. I >>>>>>>> have read the link to ( >>>> https://issues.apache.org/jira/browse/KAFKA-3522 >>>>>> ) >>>>>>>> in >>>>>>>> the KIP, and I can understand that a state-store on disk may not >>>>>> represent >>>>>>>> what the application is expecting. I don't think I have the full >>>> picture >>>>>>>> though, because that issue seems to be easy to fix with a simple >>>>>> versioned >>>>>>>> header or accompanying file, forcing the app to rebuild the state if >>>> the >>>>>>>> version is incompatible. Can you elaborate or add a scenario to the >>>> KIP >>>>>>>> that illustrates the need for the upgrade path? >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> Adam >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax < >>>> matth...@confluent.io> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Adam, >>>>>>>>> >>>>>>>>> I am still working on it. Was pulled into a lot of other tasks >>> lately >>>>>> so >>>>>>>>> this was delayed. Also had some discussions about simplifying the >>>>>>>>> upgrade path with some colleagues and I am prototyping this atm. >>> Hope >>>>>> to >>>>>>>>> update the KIP accordingly soon. >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> On 11/10/18 7:41 AM, Adam Bellemare wrote: >>>>>>>>>> Hello Matthias >>>>>>>>>> >>>>>>>>>> I am curious as to the status of this KIP. TTL and expiry of >>> records >>>>>>>> will >>>>>>>>>> be extremely useful for several of our business use-cases, as well >>>> as >>>>>>>>>> another KIP I had been working on. >>>>>>>>>> >>>>>>>>>> Thanks >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska < >>>> eno.there...@gmail.com >>>>>>> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Matthias, >>>>>>>>>>> >>>>>>>>>>> Good stuff. Could you comment a bit on how future-proof is this >>>>>>>> change? >>>>>>>>> For >>>>>>>>>>> example, if we want to store both event timestamp "and" >>> processing >>>>>>>> time >>>>>>>>> in >>>>>>>>>>> RocksDB will we then need another interface (e.g. called >>>>>>>>>>> KeyValueWithTwoTimestampsStore)? >>>>>>>>>>> >>>>>>>>>>> Thanks >>>>>>>>>>> Eno >>>>>>>>>>> >>>>>>>>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax < >>>>>>>> matth...@confluent.io> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Thanks for your input Guozhang and John. >>>>>>>>>>>> >>>>>>>>>>>> I see your point, that the upgrade API is not simple. 
If you >>> don't >>>>>>>>>>>> thinks it's valuable to make generic store upgrades possible >>>> (atm), >>>>>>>> we >>>>>>>>>>>> can make the API internal, too. The impact is, that we only >>>> support >>>>>> a >>>>>>>>>>>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to >>>>>>>>>>>> WindowedWithTS etc) for which we implement the internal >>>> interfaces. >>>>>>>>>>>> >>>>>>>>>>>> We can keep the design generic, so if we decide to make it >>> public, >>>>>> we >>>>>>>>>>>> don't need to re-invent it. This will also have the advantage, >>>> that >>>>>>>> we >>>>>>>>>>>> can add upgrade pattern for other stores later, too. >>>>>>>>>>>> >>>>>>>>>>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, >>> but >>>>>> it >>>>>>>>>>>> was the only way I could find to design a generic upgrade >>>> interface. >>>>>>>> If >>>>>>>>>>>> we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` >>>>>> would >>>>>>>>>>>> become an internal interface I guess (don't think we can remove >>>> it). >>>>>>>>>>>> >>>>>>>>>>>> I will wait for more feedback about this and if nobody wants to >>>> keep >>>>>>>> it >>>>>>>>>>>> as public API I will update the KIP accordingly. Will add some >>>> more >>>>>>>>>>>> clarifications for different upgrade patterns in the mean time >>> and >>>>>>>> fix >>>>>>>>>>>> the typos/minor issues. >>>>>>>>>>>> >>>>>>>>>>>> About adding a new state UPGRADING: maybe we could do that. >>>> However, >>>>>>>> I >>>>>>>>>>>> find it particularly difficult to make the estimation when we >>>> should >>>>>>>>>>>> switch to RUNNING, thus, I am a little hesitant. Using store >>>>>>>> callbacks >>>>>>>>>>>> or just logging the progress including some indication about the >>>>>>>> "lag" >>>>>>>>>>>> might actually be sufficient. Not sure what others think? >>>>>>>>>>>> >>>>>>>>>>>> About "value before timestamp": no real reason and I think it >>> does >>>>>>>> not >>>>>>>>>>>> make any difference. Do you want to change it? >>>>>>>>>>>> >>>>>>>>>>>> About upgrade robustness: yes, we cannot control if an instance >>>>>>>> fails. >>>>>>>>>>>> That is what I meant by "we need to write test". The upgrade >>>> should >>>>>>>> be >>>>>>>>>>>> able to continuous even is an instance goes down (and we must >>> make >>>>>>>> sure >>>>>>>>>>>> that we don't end up in an invalid state that forces us to wipe >>>> out >>>>>>>> the >>>>>>>>>>>> whole store). Thus, we need to write system tests that fail >>>>>> instances >>>>>>>>>>>> during upgrade. >>>>>>>>>>>> >>>>>>>>>>>> For `in_place_offline` upgrade: I don't think we need this mode, >>>>>>>>> because >>>>>>>>>>>> people can do this via a single rolling bounce. >>>>>>>>>>>> >>>>>>>>>>>> - prepare code and switch KV-Store to KVwithTs-Store >>>>>>>>>>>> - do a single rolling bounce (don't set any upgrade config) >>>>>>>>>>>> >>>>>>>>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if >>>> we >>>>>>>>>>>> remove the `StoreUpgradeBuilder`) will detect that there is only >>>> an >>>>>>>> old >>>>>>>>>>>> local KV store w/o TS, will start to restore the new KVwithTs >>>> store, >>>>>>>>>>>> wipe out the old store and replace with the new store after >>>> restore >>>>>>>> is >>>>>>>>>>>> finished, and start processing only afterwards. (I guess we need >>>> to >>>>>>>>>>>> document this case -- will also add it to the KIP.) 
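
As a side note on what such a store upgrade actually has to rewrite: per the format discussed in the quoted mails below, the new store layout simply prepends the record timestamp as an 8-byte (long) prefix to the old value bytes. A rough sketch of that conversion; the method names are mine, not from the KIP:

import java.nio.ByteBuffer;

// Sketch of the old-format -> new-format value conversion: the new format is just
// <8-byte record timestamp><old value bytes>. Names are illustrative, not KIP APIs.
public class ValueFormatConverterSketch {

    static byte[] toTimestampedFormat(final byte[] plainValue, final long recordTimestampMs) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
            .putLong(recordTimestampMs)
            .put(plainValue)
            .array();
    }

    static long timestampOf(final byte[] timestampedValue) {
        return ByteBuffer.wrap(timestampedValue).getLong();
    }

    static byte[] plainValueOf(final byte[] timestampedValue) {
        final byte[] plain = new byte[timestampedValue.length - Long.BYTES];
        ByteBuffer.wrap(timestampedValue, Long.BYTES, plain.length).get(plain);
        return plain;
    }
}

This is the kind of per-record rewrite a proxy/converter would apply when feeding old changelog data into the new store.
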
>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 8/9/18 1:10 PM, John Roesler wrote: >>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>> >>>>>>>>>>>>> I think this KIP is looking really good. >>>>>>>>>>>>> >>>>>>>>>>>>> I have a few thoughts to add to the others: >>>>>>>>>>>>> >>>>>>>>>>>>> 1. You mentioned at one point users needing to configure >>>>>>>>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant to >>>> say >>>>>>>>>>> they >>>>>>>>>>>>> should remove the config. If they really have to set it to a >>>> string >>>>>>>>>>>> "null" >>>>>>>>>>>>> or even set it to a null value but not remove it, it would be >>>>>>>>>>>> unfortunate. >>>>>>>>>>>>> >>>>>>>>>>>>> 2. In response to Bill's comment #1 , you said that "The idea >>> is >>>>>>>> that >>>>>>>>>>> the >>>>>>>>>>>>> upgrade should be robust and not fail. We need to write >>> according >>>>>>>>>>> tests". >>>>>>>>>>>>> I may have misunderstood the conversation, but I don't think >>> it's >>>>>>>>>>> within >>>>>>>>>>>>> our power to say that an instance won't fail. What if one of my >>>>>>>>>>> computers >>>>>>>>>>>>> catches on fire? What if I'm deployed in the cloud and one >>>> instance >>>>>>>>>>>>> disappears and is replaced by a new one? Or what if one >>> instance >>>>>>>> goes >>>>>>>>>>>> AWOL >>>>>>>>>>>>> for a long time and then suddenly returns? How will the upgrade >>>>>>>>> process >>>>>>>>>>>>> behave in light of such failures? >>>>>>>>>>>>> >>>>>>>>>>>>> 3. your thought about making in-place an offline mode is >>>>>>>> interesting, >>>>>>>>>>> but >>>>>>>>>>>>> it might be a bummer for on-prem users who wish to upgrade >>>> online, >>>>>>>> but >>>>>>>>>>>>> cannot just add new machines to the pool. It could be a new >>>> upgrade >>>>>>>>>>> mode >>>>>>>>>>>>> "offline-in-place", though... >>>>>>>>>>>>> >>>>>>>>>>>>> 4. I was surprised to see that a user would need to modify the >>>>>>>>> topology >>>>>>>>>>>> to >>>>>>>>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of >>>> Guozhang's >>>>>>>>>>>>> suggestions would remove this necessity. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for taking on this very complex but necessary work. >>>>>>>>>>>>> >>>>>>>>>>>>> -John >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang < >>>> wangg...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hello Matthias, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for the updated KIP. Some more comments: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1. The current set of proposed API is a bit too complicated, >>>> which >>>>>>>>>>> makes >>>>>>>>>>>>>> the upgrade flow from user's perspective also a bit complex. >>> I'd >>>>>>>> like >>>>>>>>>>> to >>>>>>>>>>>>>> check different APIs and discuss about their needs separately: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1.a. StoreProxy: needed for in-place upgrade only, between >>>> the >>>>>>>>>>> first >>>>>>>>>>>>>> and second rolling bounce, where the old-versioned stores can >>>>>>>> handle >>>>>>>>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e. >>> from >>>>>> one >>>>>>>>>>>> store >>>>>>>>>>>>>> type to another) would not be very common: users may want to >>>>>>>> upgrade >>>>>>>>>>>> from a >>>>>>>>>>>>>> certain store engine to another, but the interface would >>> likely >>>> be >>>>>>>>>>>> staying >>>>>>>>>>>>>> the same. Hence personally I'd suggest we keep it internally >>> and >>>>>>>> only >>>>>>>>>>>>>> consider exposing it in the future if it does become a common >>>>>>>>> pattern. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> 1.b. ConverterStore / RecordConverter: needed for both >>>>>> in-place >>>>>>>>>>> and >>>>>>>>>>>>>> roll-over upgrade, between the first and second rolling >>> bounces, >>>>>>>> for >>>>>>>>>>> the >>>>>>>>>>>>>> new versioned store to be able to read old-versioned changelog >>>>>>>>> topics. >>>>>>>>>>>>>> Firstly I think we should not expose key in the public APIs >>> but >>>>>>>> only >>>>>>>>>>> the >>>>>>>>>>>>>> values, since allowing key format changes would break log >>>>>>>> compaction, >>>>>>>>>>>> and >>>>>>>>>>>>>> hence would not be compatible anyways. As for value format >>>>>> changes, >>>>>>>>>>>>>> personally I think we can also keep its upgrade logic >>> internally >>>>>> as >>>>>>>>> it >>>>>>>>>>>> may >>>>>>>>>>>>>> not worth generalizing to user customizable logic. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1.c. If you agrees with 2.a/b above, then we can also >>>> remove " >>>>>>>>>>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the >>>>>> public >>>>>>>>>>>> APIs. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 1.d. Personally I think >>> "ReadOnlyKeyValueWithTimestampStore" >>>>>> is >>>>>>>>>>> not >>>>>>>>>>>>>> needed either given that we are exposing "ValueAndTimestamp" >>>>>>>> anyways. >>>>>>>>>>>> I.e. >>>>>>>>>>>>>> it is just a syntax sugar and for IQ, users can always just >>> set >>>> a >>>>>> " >>>>>>>>>>>>>> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" >>>> as >>>>>>>> the >>>>>>>>>>>> new >>>>>>>>>>>>>> interface does not provide any additional functions. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2. Could we further categorize the upgrade flow for different >>>> use >>>>>>>>>>> cases, >>>>>>>>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be >>> used >>>>>>>>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who do >>>> not >>>>>>>>>>> need >>>>>>>>>>>> to >>>>>>>>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to >>>>>> switch >>>>>>>>> to >>>>>>>>>>>>>> KeyValueWithTimestampStore. Just to give my understanding for >>>> 3), >>>>>>>> the >>>>>>>>>>>>>> upgrade flow for users may be simplified as the following (for >>>>>> both >>>>>>>>>>>>>> in-place and roll-over): >>>>>>>>>>>>>> >>>>>>>>>>>>>> * Update the jar to new version, make code changes from >>>>>>>>>>>> KeyValueStore >>>>>>>>>>>>>> to KeyValueWithTimestampStore, set upgrade config. >>>>>>>>>>>>>> >>>>>>>>>>>>>> * First rolling bounce, and library code can internally >>> use >>>>>>>> proxy >>>>>>>>>>> / >>>>>>>>>>>>>> converter based on the specified config to handle new APIs >>> with >>>>>> old >>>>>>>>>>>> stores, >>>>>>>>>>>>>> while let new stores read from old changelog data. >>>>>>>>>>>>>> >>>>>>>>>>>>>> * Reset upgrade config. >>>>>>>>>>>>>> >>>>>>>>>>>>>> * Second rolling bounce, and the library code >>> automatically >>>>>>>> turn >>>>>>>>>>> off >>>>>>>>>>>>>> logic for proxy / converter. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> 3. Some more detailed proposals are needed for when to >>> recommend >>>>>>>>> users >>>>>>>>>>>> to >>>>>>>>>>>>>> trigger the second rolling bounce. I have one idea to share >>>> here: >>>>>>>> we >>>>>>>>>>>> add a >>>>>>>>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when 1) >>>>>>>>> upgrade >>>>>>>>>>>>>> config is set, and 2) the new stores are still ramping up (for >>>> the >>>>>>>>>>>> second >>>>>>>>>>>>>> part, we can start with some internal hard-coded heuristics to >>>>>>>> decide >>>>>>>>>>>> when >>>>>>>>>>>>>> it is close to be ramped up). 
If either one of it is not true >>>> any >>>>>>>>>>> more, >>>>>>>>>>>> it >>>>>>>>>>>>>> should transit to RUNNING. Users can then watch on this state, >>>> and >>>>>>>>>>>> decide >>>>>>>>>>>>>> to only trigger the second rebalance when the state has >>>> transited >>>>>>>>> from >>>>>>>>>>>>>> UPGRADING. They can also choose to cut over while the instance >>>> is >>>>>>>>>>> still >>>>>>>>>>>>>> UPGRADING, the downside is that after that the application may >>>>>> have >>>>>>>>>>> long >>>>>>>>>>>>>> restoration phase which is, to user's pov, unavailability >>>> periods. >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Below are just some minor things on the wiki: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 4. "proxy story" => "proxy store". >>>>>>>>>>>>>> >>>>>>>>>>>>>> 5. "use the a builder " => "use a builder" >>>>>>>>>>>>>> >>>>>>>>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to >>> the >>>>>>>>>>> value": >>>>>>>>>>>>>> what's the rationale of putting the timestamp before the >>> value, >>>>>>>> than >>>>>>>>>>>> after >>>>>>>>>>>>>> the value? >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax < >>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the feedback Bill. I just update the KIP with some >>>> of >>>>>>>>> your >>>>>>>>>>>>>>> points. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to >>>>>>>> watch >>>>>>>>>>> the >>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a >>> type >>>> of >>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new stores >>>> have >>>>>>>>>>>>>> reached >>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the >>>>>> signal >>>>>>>>> to >>>>>>>>>>>>>>> start >>>>>>>>>>>>>>>>> second rolling rebalance? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I think we can reuse the existing listeners, thus, I did not >>>>>>>> include >>>>>>>>>>>>>>> anything in the KIP. About a signal to rebalance: this might >>> be >>>>>>>>>>> tricky. >>>>>>>>>>>>>>> If we prepare the store "online", the active task will update >>>> the >>>>>>>>>>> state >>>>>>>>>>>>>>> continuously, and thus, state prepare is never finished. It >>>> will >>>>>>>> be >>>>>>>>>>> the >>>>>>>>>>>>>>> users responsibility to do the second rebalance (note, that >>> the >>>>>>>>>>> second >>>>>>>>>>>>>>> rebalance will first finish the last delta of the upgrade to >>>>>>>> finish >>>>>>>>>>> the >>>>>>>>>>>>>>> upgrade before actual processing resumes). I clarified the >>> KIP >>>>>>>> with >>>>>>>>>>>> this >>>>>>>>>>>>>>> regard a little bit. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the >>> process, >>>>>>>> would >>>>>>>>>>> we >>>>>>>>>>>>>>> allow >>>>>>>>>>>>>>>>> the other instances to complete or just fail the entire >>>>>> upgrade? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The idea is that the upgrade should be robust and not fail. >>> We >>>>>>>> need >>>>>>>>>>> to >>>>>>>>>>>>>>> write according tests. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename >>>> the >>>>>>>>>>>> current >>>>>>>>>>>>>>>>> active directories vs. deleting them right away, and when >>>> all >>>>>>>> the >>>>>>>>>>>>>>> prepare >>>>>>>>>>>>>>>>> task directories are successfully migrated then delete the >>>>>>>>> previous >>>>>>>>>>>>>>> active >>>>>>>>>>>>>>>>> ones. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Ack. Updated the KIP. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing any >>>> new >>>>>>>>>>>>>> records >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all >>>> prepare >>>>>>>>>>> tasks >>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling >>>> bounces >>>>>>>>> and >>>>>>>>>>>>>>> then as >>>>>>>>>>>>>>>>> each task successfully renames its prepare directories and >>>>>>>> deletes >>>>>>>>>>>> the >>>>>>>>>>>>>>> old >>>>>>>>>>>>>>>>> active task directories, normal processing of records >>>> resumes. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The basic idea is to do an online upgrade to avoid downtime. >>> We >>>>>>>> can >>>>>>>>>>>>>>> discuss to offer both options... For the offline upgrade >>>> option, >>>>>>>> we >>>>>>>>>>>>>>> could simplify user interaction and trigger the second >>>> rebalance >>>>>>>>>>>>>>> automatically with the requirement that a user needs to >>> update >>>>>> any >>>>>>>>>>>>>> config. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> If might actually be worth to include this option: we know >>> from >>>>>>>>>>>>>>> experience with state restore, that regular processing slows >>>> down >>>>>>>>> the >>>>>>>>>>>>>>> restore. For roll_over upgrade, it would be a different story >>>> and >>>>>>>>>>>>>>> upgrade should not be slowed down by regular processing. >>> Thus, >>>> we >>>>>>>>>>>> should >>>>>>>>>>>>>>> even make in_place an offline upgrade and force people to use >>>>>>>>>>> roll_over >>>>>>>>>>>>>>> if they need onlint upgrade. Might be a fair tradeoff that >>> may >>>>>>>>>>> simplify >>>>>>>>>>>>>>> the upgrade for the user and for the code complexity. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Let's see what other think. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote: >>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the update and the working prototype, it helps >>> with >>>>>>>>>>>>>>>> understanding the KIP. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I took an initial pass over this PR, and overall I find the >>>>>>>>>>> interfaces >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>> approach to be reasonable. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to >>>>>> watch >>>>>>>>>>> the >>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a type >>>> of >>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new stores >>>> have >>>>>>>>>>>> reached >>>>>>>>>>>>>>>> parity with the existing old stores and that could be the >>>> signal >>>>>>>> to >>>>>>>>>>>>>> start >>>>>>>>>>>>>>>> second rolling rebalance? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Although you solicited feedback on the interfaces involved, >>> I >>>>>>>>> wanted >>>>>>>>>>>> to >>>>>>>>>>>>>>> put >>>>>>>>>>>>>>>> down some thoughts that have come to mind reviewing this KIP >>>>>>>> again >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the process, >>>>>>>> would >>>>>>>>>>> we >>>>>>>>>>>>>>> allow >>>>>>>>>>>>>>>> the other instances to complete or just fail the entire >>>> upgrade? >>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename >>> the >>>>>>>>>>> current >>>>>>>>>>>>>>>> active directories vs. 
deleting them right away, and when >>> all >>>>>>>> the >>>>>>>>>>>>>>> prepare >>>>>>>>>>>>>>>> task directories are successfully migrated then delete the >>>>>>>> previous >>>>>>>>>>>>>>> active >>>>>>>>>>>>>>>> ones. >>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing any >>>> new >>>>>>>>>>>> records >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all >>> prepare >>>>>>>>> tasks >>>>>>>>>>>>>> have >>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling >>>> bounces >>>>>>>> and >>>>>>>>>>>>>> then >>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>> each task successfully renames its prepare directories and >>>>>>>> deletes >>>>>>>>>>> the >>>>>>>>>>>>>>> old >>>>>>>>>>>>>>>> active task directories, normal processing of records >>> resumes. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax < >>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> KIP-268 (rebalance meatadata) is finished and included in >>> AK >>>>>> 2.0 >>>>>>>>>>>>>>>>> release. Thus, I want to pick up this KIP again to get the >>>>>>>> RocksDB >>>>>>>>>>>>>>>>> upgrade done for 2.1. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I updated the KIP accordingly and also have a "prove of >>>>>> concept" >>>>>>>>> PR >>>>>>>>>>>>>>>>> ready (for "in place" upgrade only): >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5422/ >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> There a still open questions, but I want to collect early >>>>>>>> feedback >>>>>>>>>>> on >>>>>>>>>>>>>>>>> the proposed interfaces we need for the store upgrade. Also >>>>>>>> note, >>>>>>>>>>>> that >>>>>>>>>>>>>>>>> the KIP now also aim to define a generic upgrade path from >>>> any >>>>>>>>>>> store >>>>>>>>>>>>>>>>> format A to any other store format B. Adding timestamps is >>>> just >>>>>>>> a >>>>>>>>>>>>>>>>> special case. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I will continue to work on the PR and refine the KIP in the >>>>>>>>>>> meantime, >>>>>>>>>>>>>>> too. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Looking forward to your feedback. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote: >>>>>>>>>>>>>>>>>> After some more thoughts, I want to follow John's >>> suggestion >>>>>>>> and >>>>>>>>>>>>>> split >>>>>>>>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I extracted the metadata upgrade into it's own KIP: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268% >>>>>>>>>>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to >>>>>>>> consider >>>>>>>>>>> to >>>>>>>>>>>>>>>>>> make the store format upgrade more flexible/generic. Atm, >>>> the >>>>>>>> KIP >>>>>>>>>>> is >>>>>>>>>>>>>>> too >>>>>>>>>>>>>>>>>> much tailored to the DSL IMHO and does not encounter PAPI >>>>>> users >>>>>>>>>>> that >>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>> should not force to upgrade the stores. I need to figure >>> out >>>>>>>> the >>>>>>>>>>>>>>> details >>>>>>>>>>>>>>>>>> and follow up later. 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Please give feedback for the new KIP-268 on the >>>> corresponding >>>>>>>>>>>>>>> discussion >>>>>>>>>>>>>>>>>> thread. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't >>>> figure >>>>>>>> out >>>>>>>>>>> a >>>>>>>>>>>>>> way >>>>>>>>>>>>>>>>>> for a single rolling bounce upgrade. But KIP-268 proposes >>> a >>>>>> fix >>>>>>>>>>> for >>>>>>>>>>>>>>>>>> future upgrades. Please share your thoughts. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for all your feedback! >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote: >>>>>>>>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's >>> an >>>>>>>>>>>>>>>>>>> implementation details IMHO and thus I did not include it >>>> in >>>>>>>> the >>>>>>>>>>>>>> KIP) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> @Guozhang: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1) I understand know what you mean. We can certainly, >>> allow >>>>>>>> all >>>>>>>>>>>>>> values >>>>>>>>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for >>>>>>>>>>> `upgrade.from` >>>>>>>>>>>>>>>>>>> parameter. I had a similar though once but decided to >>>>>> collapse >>>>>>>>>>> them >>>>>>>>>>>>>>> into >>>>>>>>>>>>>>>>>>> one -- will update the KIP accordingly. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 2) The idea to avoid any config would be, to always send >>>> both >>>>>>>>>>>>>> request. >>>>>>>>>>>>>>>>>>> If we add a config to eventually disable the old request, >>>> we >>>>>>>>>>> don't >>>>>>>>>>>>>>> gain >>>>>>>>>>>>>>>>>>> anything with this approach. The question is really, if >>> we >>>>>> are >>>>>>>>>>>>>> willing >>>>>>>>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be >>>> limited >>>>>>>> to >>>>>>>>>>> 2 >>>>>>>>>>>>>>>>>>> versions and not grow further in future releases. More >>>>>> details >>>>>>>>> in >>>>>>>>>>>>>> (3) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and >>>>>>>> allows >>>>>>>>>>> us >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> stay with 2 "assignment strategies" we need to register, >>> as >>>>>>>> the >>>>>>>>>>> new >>>>>>>>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via >>>>>>>> "version >>>>>>>>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a >>>>>>>> config >>>>>>>>>>> if >>>>>>>>>>>>>>>>>>> people upgrade from pre-1.2 releases. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thus, I don't think we need to register new "assignment >>>>>>>>>>> strategies" >>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>> send empty subscriptions for older version. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right with >>> a >>>>>>>>> single >>>>>>>>>>>>>>>>>>> rebalance. I share the concern that an application might >>>>>> never >>>>>>>>>>>> catch >>>>>>>>>>>>>>> up >>>>>>>>>>>>>>>>>>> and thus the hot standby will never be ready. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store >>>> upgrades. >>>>>>>> If >>>>>>>>>>> we >>>>>>>>>>>>>> do >>>>>>>>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3) >>> in >>>>>>>> place >>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>> future upgrades. 
I also think that changes to the >>> metadata >>>>>> are >>>>>>>>>>> more >>>>>>>>>>>>>>>>>>> likely and thus allowing for single rolling bounce for >>> this >>>>>>>> case >>>>>>>>>>> is >>>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>> important anyway. If we assume that store upgrade a rare, >>>> it >>>>>>>>>>> might >>>>>>>>>>>>>> be >>>>>>>>>>>>>>> ok >>>>>>>>>>>>>>>>>>> to sacrifice two rolling bounced for this case. It was >>> just >>>>>> an >>>>>>>>>>> idea >>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>> wanted to share (even if I see the issues). >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote: >>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for your replies. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 1) About the config names: actually I was trying to not >>>>>>>> expose >>>>>>>>>>>>>>>>>>>> implementation details :) My main concern was that in >>> your >>>>>>>>>>>> proposal >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> values need to cover the span of all the versions that >>> are >>>>>>>>>>>> actually >>>>>>>>>>>>>>>>> using >>>>>>>>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a >>>> user) >>>>>>>> am >>>>>>>>>>>>>>>>> upgrading >>>>>>>>>>>>>>>>>>>> from any versions within this range I need to remember >>> to >>>>>> use >>>>>>>>>>> the >>>>>>>>>>>>>>> value >>>>>>>>>>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In >>>> my >>>>>>>>>>>>>>> suggestion >>>>>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>>> was trying to argue the benefit of just letting users to >>>>>>>>> specify >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> actual >>>>>>>>>>>>>>>>>>>> Kafka version she's trying to upgrade from, than >>>> specifying >>>>>> a >>>>>>>>>>>> range >>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc >>> as >>>>>> the >>>>>>>>>>>>>> values, >>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>>>>> still using Kafka versions like broker's >>>> `internal.version` >>>>>>>>>>>> config. >>>>>>>>>>>>>>>>> But if >>>>>>>>>>>>>>>>>>>> you were suggesting the same thing, i.e. by >>>> "0.10.1.x-1.1.x" >>>>>>>>> you >>>>>>>>>>>>>>> meant >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or >>>> "0.11.0" >>>>>>>> or >>>>>>>>>>>>>> "1.1" >>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>> are all recognizable config values then I think we are >>>>>>>> actually >>>>>>>>>>> on >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> same >>>>>>>>>>>>>>>>>>>> page. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would >>>> increase >>>>>>>> the >>>>>>>>>>>>>>> network >>>>>>>>>>>>>>>>>>>> footprint, but not the message size, IF I'm not >>>>>>>>>>> mis-understanding >>>>>>>>>>>>>>> your >>>>>>>>>>>>>>>>> idea >>>>>>>>>>>>>>>>>>>> of registering multiple assignment. More details: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can >>>>>> encode >>>>>>>>>>>>>>> multiple >>>>>>>>>>>>>>>>>>>> protocols each with their different metadata. The >>>>>> coordinator >>>>>>>>>>> will >>>>>>>>>>>>>>>>> pick the >>>>>>>>>>>>>>>>>>>> common one that everyone supports (if there are no >>> common >>>>>>>> one, >>>>>>>>>>> it >>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>> send >>>>>>>>>>>>>>>>>>>> an error back; if there are multiple ones, it will pick >>>> the >>>>>>>> one >>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>> most >>>>>>>>>>>>>>>>>>>> votes, i.e. 
the one which was earlier in the encoded >>>> list). >>>>>>>>>>> Since >>>>>>>>>>>>>> our >>>>>>>>>>>>>>>>>>>> current Streams rebalance protocol is still based on the >>>>>>>>>>> consumer >>>>>>>>>>>>>>>>>>>> coordinator, it means our protocol_type would be >>>> "consumer", >>>>>>>>> but >>>>>>>>>>>>>>>>> instead >>>>>>>>>>>>>>>>>>>> the protocol type we can have multiple protocols like >>>>>>>>> "streams", >>>>>>>>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we >>>> need >>>>>>>> to >>>>>>>>>>>>>>>>> implement a >>>>>>>>>>>>>>>>>>>> different assignor class for each version and register >>> all >>>>>> of >>>>>>>>>>> them >>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the >>>>>>>> future >>>>>>>>>>> if >>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>> re-factor our implementation to have our own client >>>>>>>> coordinator >>>>>>>>>>>>>> layer >>>>>>>>>>>>>>>>> like >>>>>>>>>>>>>>>>>>>> Connect did, we can simplify this part of the >>>>>> implementation. >>>>>>>>>>> But >>>>>>>>>>>>>>> even >>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>> now with the above approach this is still doable. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On the broker side, the group coordinator will only >>>> persist >>>>>> a >>>>>>>>>>>> group >>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>> the selected protocol and its subscription metadata, >>> e.g. >>>> if >>>>>>>>>>>>>>>>> coordinator >>>>>>>>>>>>>>>>>>>> decides to pick "streams_v2" it will only sends that >>>>>>>> protocol's >>>>>>>>>>>>>>>>> metadata >>>>>>>>>>>>>>>>>>>> from everyone to the leader to assign, AND when >>> completing >>>>>>>> the >>>>>>>>>>>>>>>>> rebalance it >>>>>>>>>>>>>>>>>>>> will also only write the group metadata with that >>> protocol >>>>>>>> and >>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> assignment only. In a word, although the network traffic >>>>>>>> maybe >>>>>>>>>>>>>>>>> increased a >>>>>>>>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One >>> corner >>>>>>>>>>>>>> situation >>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>> need to consider is how to stop registering very old >>>>>>>> assignors >>>>>>>>>>> to >>>>>>>>>>>>>>>>> avoid the >>>>>>>>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if >>> you >>>>>> are >>>>>>>>>>>>>> rolling >>>>>>>>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1 >>>>>>>>>>> assignor >>>>>>>>>>>>>>>>> anymore, >>>>>>>>>>>>>>>>>>>> but that would unfortunately still require some configs. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 3) About the "version probing" idea, I think that's a >>>>>>>>> promising >>>>>>>>>>>>>>>>> approach >>>>>>>>>>>>>>>>>>>> as well, but if we are going to do the multi-assignment >>>> its >>>>>>>>>>> value >>>>>>>>>>>>>>> seems >>>>>>>>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top >>> of >>>>>>>>>>>>>>>>> multi-assignment >>>>>>>>>>>>>>>>>>>> to save us from still requiring the config to avoid >>>>>>>> registering >>>>>>>>>>>> all >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> metadata for all version. More details: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the >>>> assignor >>>>>>>> but >>>>>>>>>>>> for >>>>>>>>>>>>>>>>> all old >>>>>>>>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. 
the >>> encoded >>>>>>>> data >>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>> be: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> "streams_vN" : "encoded metadata" >>>>>>>>>>>>>>>>>>>> "streams_vN-1":empty >>>>>>>>>>>>>>>>>>>> "streams_vN-2":empty >>>>>>>>>>>>>>>>>>>> .. >>>>>>>>>>>>>>>>>>>> "streams_0":empty >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> So the coordinator can still safely choose the latest >>>> common >>>>>>>>>>>>>> version; >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>> then when leaders receive the subscription (note it >>> should >>>>>>>>>>> always >>>>>>>>>>>>>>>>> recognize >>>>>>>>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of >>> the >>>>>>>>>>>>>>>>> subscriptions >>>>>>>>>>>>>>>>>>>> are empty bytes, it will send the empty assignment with >>>> that >>>>>>>>>>>>>> version >>>>>>>>>>>>>>>>> number >>>>>>>>>>>>>>>>>>>> encoded in the metadata. So in the second auto-triggered >>>> all >>>>>>>>>>>>>> members >>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>> send the metadata with that version: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> "streams_vN" : empty >>>>>>>>>>>>>>>>>>>> "streams_vN-1" : empty >>>>>>>>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata" >>>>>>>>>>>>>>>>>>>> .. >>>>>>>>>>>>>>>>>>>> "streams_0":empty >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> By doing this we would not require any configs for >>> users. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not >>> clear >>>>>>>> about >>>>>>>>>>>> the >>>>>>>>>>>>>>>>> details >>>>>>>>>>>>>>>>>>>> so probably we'd need to fill that out before making a >>>> call. >>>>>>>>> For >>>>>>>>>>>>>>>>> example, >>>>>>>>>>>>>>>>>>>> you mentioned "If we detect this situation, the Streams >>>>>>>>>>>> application >>>>>>>>>>>>>>>>> closes >>>>>>>>>>>>>>>>>>>> corresponding active tasks as well as "hot standby" >>> tasks, >>>>>>>> and >>>>>>>>>>>>>>>>> re-creates >>>>>>>>>>>>>>>>>>>> the new active tasks using the new store." How could we >>>>>>>>>>> guarantee >>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> gap between these two stores will keep decreasing than >>>>>>>>>>> increasing >>>>>>>>>>>>>> so >>>>>>>>>>>>>>>>> we'll >>>>>>>>>>>>>>>>>>>> eventually achieve the flip point? And also the longer >>> we >>>>>> are >>>>>>>>>>>>>> before >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> flip point, the larger we are doubling the storage >>> space, >>>>>>>> etc. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax < >>>>>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> @John, Guozhang, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply... >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> About upgrading the rebalance metadata: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Another possibility to do this, would be to register >>>>>>>> multiple >>>>>>>>>>>>>>>>> assignment >>>>>>>>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case, new >>>>>>>>>>> instances >>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>>>>>>> be configured to support both and the broker would pick >>>> the >>>>>>>>>>>>>> version >>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>> all instances understand. 
The disadvantage would be, >>> that >>>>>> we >>>>>>>>>>> send >>>>>>>>>>>>>>> much >>>>>>>>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as >>>> long >>>>>>>> as >>>>>>>>>>> no >>>>>>>>>>>>>>>>> second >>>>>>>>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, >>> using >>>>>>>> this >>>>>>>>>>>>>>>>> approach >>>>>>>>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off an >>>>>>>>>>> increased >>>>>>>>>>>>>>>>>>>>> rebalance network footprint (I also assume that this >>>> would >>>>>>>>>>>>>> increase >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> message size that is written into __consumer_offsets >>>>>>>> topic?). >>>>>>>>>>>>>>>>> Overall, I >>>>>>>>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it >>>> could >>>>>>>>>>> avoid >>>>>>>>>>>> a >>>>>>>>>>>>>>>>>>>>> second rebalance (I have some more thoughts about >>> stores >>>>>>>> below >>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>>>>> relevant for single rebalance upgrade). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> For future upgrades we might be able to fix this >>> though. >>>> I >>>>>>>> was >>>>>>>>>>>>>>>>> thinking >>>>>>>>>>>>>>>>>>>>> about the following: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> In the current implementation, the leader fails if it >>>> gets >>>>>> a >>>>>>>>>>>>>>>>>>>>> subscription it does not understand (ie, newer >>> version). >>>> We >>>>>>>>>>> could >>>>>>>>>>>>>>>>> change >>>>>>>>>>>>>>>>>>>>> this behavior and let the leader send an empty >>> assignment >>>>>>>> plus >>>>>>>>>>>>>> error >>>>>>>>>>>>>>>>>>>>> code (including supported version) back to the instance >>>>>>>>> sending >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> "bad" subscription. This would allow the following >>> logic >>>>>> for >>>>>>>>> an >>>>>>>>>>>>>>>>>>>>> application instance: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> - on startup, always send the latest subscription >>> format >>>>>>>>>>>>>>>>>>>>> - if leader understands it, we get an assignment back >>> an >>>>>>>>> start >>>>>>>>>>>>>>>>> processing >>>>>>>>>>>>>>>>>>>>> - if leader does not understand it, we get an empty >>>>>>>>> assignment >>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>> supported version back >>>>>>>>>>>>>>>>>>>>> - the application unsubscribe()/subscribe()/poll() >>> again >>>>>>>> and >>>>>>>>>>>>>>> sends a >>>>>>>>>>>>>>>>>>>>> subscription using the leader's supported version >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> This protocol would allow to do a single rolling >>> bounce, >>>>>> and >>>>>>>>>>>>>>>>> implements >>>>>>>>>>>>>>>>>>>>> a "version probing" step, that might result in two >>>> executed >>>>>>>>>>>>>>>>> rebalances. >>>>>>>>>>>>>>>>>>>>> The advantage would be, that the user does not need to >>>> set >>>>>>>> any >>>>>>>>>>>>>>> configs >>>>>>>>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care >>> of >>>>>>>> this >>>>>>>>>>>>>>>>>>>>> automatically. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen >>> and >>>>>>>> that >>>>>>>>>>>> for >>>>>>>>>>>>>>> an >>>>>>>>>>>>>>>>>>>>> error case during rebalance, we loose the information >>>> about >>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> supported leader version and the "probing step" would >>>>>>>> happen a >>>>>>>>>>>>>>> second >>>>>>>>>>>>>>>>> time. 
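
Spelled out as code, the probing logic from the bullet list above is roughly the following; every type and method name here is a placeholder for illustration, none of it is an existing Streams class:

// Rough sketch of the "version probing" flow described above. All names are placeholders.
public class VersionProbingSketch {

    interface GroupClient {
        // Re-join the group with a subscription encoded in the given metadata version;
        // an empty assignment carries the leader's supported version as the error response.
        Assignment rejoin(int subscriptionVersion);
    }

    interface Assignment {
        boolean isEmpty();
        int leaderSupportedVersion();
    }

    static int probeAndJoin(final GroupClient client, final int latestSupportedVersion) {
        int usedVersion = latestSupportedVersion;          // always start with the newest format
        Assignment assignment = client.rejoin(usedVersion);
        if (assignment.isEmpty()) {                        // leader did not understand the subscription
            usedVersion = assignment.leaderSupportedVersion();
            assignment = client.rejoin(usedVersion);       // second rebalance with the downgraded version
        }
        return usedVersion;
    }
}
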
>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> If the leader is eventually updated, it will include >>> it's >>>>>>>> own >>>>>>>>>>>>>>>>> supported >>>>>>>>>>>>>>>>>>>>> version in all assignments, to allow a "down graded" >>>>>>>>>>> application >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>> upgrade its version later. Also, if a application >>> fails, >>>>>> the >>>>>>>>>>>> first >>>>>>>>>>>>>>>>>>>>> probing would always be successful and only a single >>>>>>>> rebalance >>>>>>>>>>>>>>>>> happens. >>>>>>>>>>>>>>>>>>>>> If we use this protocol, I think we don't need any >>>>>>>>>>> configuration >>>>>>>>>>>>>>>>>>>>> parameter for future upgrades. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version": >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version >>> the >>>>>>>>>>>>>>> current/old >>>>>>>>>>>>>>>>>>>>> application is using. I think this is simpler, as users >>>>>> know >>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>> version. If we use "internal.protocol.version" instead, >>>> we >>>>>>>>>>> expose >>>>>>>>>>>>>>>>>>>>> implementation details and users need to know the >>>> protocol >>>>>>>>>>>> version >>>>>>>>>>>>>>>>> (ie, >>>>>>>>>>>>>>>>>>>>> they need to map from the release version to the >>> protocol >>>>>>>>>>>> version; >>>>>>>>>>>>>>> ie, >>>>>>>>>>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol >>> version >>>>>>>> 2"). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Also the KIP states that for the second rolling bounce, >>>> the >>>>>>>>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- >>> and >>>>>>>>> thus, >>>>>>>>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored >>>> (I >>>>>>>>> will >>>>>>>>>>>>>>> update >>>>>>>>>>>>>>>>>>>>> the KIP to point out this dependency). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> About your second point: I'll update the KIP >>> accordingly >>>> to >>>>>>>>>>>>>> describe >>>>>>>>>>>>>>>>>>>>> future updates as well. Both will be different. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> One more point about upgrading the store format. I was >>>>>>>>> thinking >>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>>>> avoiding the second rolling bounce all together in the >>>>>>>> future: >>>>>>>>>>>> (1) >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) >>> this >>>>>>>>>>>> required >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the >>>>>> switch >>>>>>>>>>> and >>>>>>>>>>>>>>>>> delete >>>>>>>>>>>>>>>>>>>>> the old stores. (3) the current proposal does the >>> switch >>>>>>>>>>>>>> "globally" >>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>>>> this is simpler and due to the required second >>> rebalance >>>> no >>>>>>>>>>>>>>>>> disadvantage. >>>>>>>>>>>>>>>>>>>>> However, a global consistent switch over might actually >>>> not >>>>>>>> be >>>>>>>>>>>>>>>>> required. >>>>>>>>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from >>>> above, >>>>>>>> we >>>>>>>>>>>>>> could >>>>>>>>>>>>>>>>>>>>> decouple the store switch and each instance could >>> switch >>>>>> its >>>>>>>>>>>> store >>>>>>>>>>>>>>>>>>>>> independently from all other instances. 
After the >>> rolling >>>>>>>>>>> bounce, >>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>>> seems to be ok to switch from the old store to the new >>>>>> store >>>>>>>>>>>>>> "under >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> hood" whenever the new store is ready (this could even >>> be >>>>>>>>> done, >>>>>>>>>>>>>>> before >>>>>>>>>>>>>>>>>>>>> we switch to the new metadata version). Each time we >>>> update >>>>>>>>> the >>>>>>>>>>>>>> "hot >>>>>>>>>>>>>>>>>>>>> standby" we check if it reached the "endOffset" (or >>>> maybe >>>>>>>> X% >>>>>>>>>>>> that >>>>>>>>>>>>>>>>> could >>>>>>>>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect this >>>>>>>>>>>> situation, >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> Streams application closes corresponding active tasks >>> as >>>>>>>> well >>>>>>>>>>> as >>>>>>>>>>>>>>> "hot >>>>>>>>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks >>> using >>>>>>>> the >>>>>>>>>>> new >>>>>>>>>>>>>>>>> store. >>>>>>>>>>>>>>>>>>>>> (I need to go through the details once again, but it >>>> seems >>>>>>>> to >>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>> feasible.). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Combining this strategy with the "multiple assignment" >>>>>> idea, >>>>>>>>>>>> might >>>>>>>>>>>>>>>>> even >>>>>>>>>>>>>>>>>>>>> enable us to do an single rolling bounce upgrade from >>> 1.1 >>>>>> -> >>>>>>>>>>> 1.2. >>>>>>>>>>>>>>>>>>>>> Applications would just use the old store, as long as >>> the >>>>>>>> new >>>>>>>>>>>>>> store >>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>>> not ready, even if the new metadata version is used >>>>>> already. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> For future upgrades, a single rebalance would be >>>>>> sufficient, >>>>>>>>>>> too, >>>>>>>>>>>>>>> even >>>>>>>>>>>>>>>>>>>>> if the stores are upgraded. We would not need any >>> config >>>>>>>>>>>>>> parameters >>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>>> the "probe" step allows us to detect the supported >>>>>> rebalance >>>>>>>>>>>>>>> metadata >>>>>>>>>>>>>>>>>>>>> version (and we would also not need multiple >>> "assigmnent >>>>>>>>>>>>>> strategies" >>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>>> out own protocol encoded everything we need). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Let me know what you think. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote: >>>>>>>>>>>>>>>>>>>>>> @John: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the >>>>>>>> encoded >>>>>>>>>>>>>>> metadata >>>>>>>>>>>>>>>>>>>>> bytes >>>>>>>>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from >>>>>> Consumer's >>>>>>>>>>> pov, >>>>>>>>>>>>>>> so I >>>>>>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>>>>>> this change should be in the Streams layer as well. 
>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> @Matthias: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported >>> version" >>>>>>>>>>> besides >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea >>> to >>>>>>>> allow >>>>>>>>>>>>>>> either >>>>>>>>>>>>>>>>>>>>> case; >>>>>>>>>>>>>>>>>>>>>> the key is that in Streams we would likely end up >>> with a >>>>>>>>>>> mapping >>>>>>>>>>>>>>>>> from the >>>>>>>>>>>>>>>>>>>>>> protocol version to the other persistent data format >>>>>>>> versions >>>>>>>>>>>>>> such >>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually >>>>>>>>> achieve >>>>>>>>>>>>>> both >>>>>>>>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded >>>>>>>>> protocol >>>>>>>>>>>>>>>>> version's >>>>>>>>>>>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 >>>> -> >>>>>>>>>>> 0.10.1 >>>>>>>>>>>>>>>>> leaders >>>>>>>>>>>>>>>>>>>>>> can choose to use the newer version in the first >>> rolling >>>>>>>>>>> bounce >>>>>>>>>>>>>>>>> directly >>>>>>>>>>>>>>>>>>>>>> and we can document to users that they would not need >>> to >>>>>>>> set >>>>>>>>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the >>>> upgraded >>>>>>>>>>>>>> protocol >>>>>>>>>>>>>>>>>>>>> version >>>>>>>>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> >>> 1.2, >>>>>> and >>>>>>>>>>> then >>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>> can >>>>>>>>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the >>>> first >>>>>>>>>>>> rolling >>>>>>>>>>>>>>>>> bounce >>>>>>>>>>>>>>>>>>>>>> and reset in the second. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Besides that, some additional comments: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for >>>>>> users >>>>>>>>> to >>>>>>>>>>>>>> set >>>>>>>>>>>>>>>>> than >>>>>>>>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter users >>>>>> only >>>>>>>>>>> need >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> set a >>>>>>>>>>>>>>>>>>>>>> single version, while the Streams will map that >>> version >>>> to >>>>>>>>> the >>>>>>>>>>>>>>>>> Streams >>>>>>>>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But >>>> maybe >>>>>> I >>>>>>>>>>> did >>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>> get >>>>>>>>>>>>>>>>>>>>>> your idea about how the "upgrade.from" config will be >>>>>> set, >>>>>>>>>>>>>> because >>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from config >>>>>> will >>>>>>>>> be >>>>>>>>>>>>>> set >>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for >>>> example, >>>>>>>>>>>> should >>>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>>>>>>>> reset it to null in the second rolling bounce? >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than >>> talking >>>>>>>> about >>>>>>>>>>>>>>>>> specific >>>>>>>>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just >>>>>>>> categorize >>>>>>>>>>> all >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> possible scenarios, even for future upgrade versions, >>>> what >>>>>>>>>>>> should >>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> standard operations? 
> The categories we can summarize would be (assuming the user upgrades from version X to version Y, where X and Y are Kafka versions, with corresponding supported protocol versions x and y):
>
> a. x == y, i.e. the metadata protocol does not change, and hence no persistent data formats have changed.
>
> b. x != y, but all persistent data formats remain the same.
>
> c. x != y, AND some persistent data format, like the RocksDB format or changelog format, has changed.
>
> d. special case: we may need some special handling logic when "current version" or "newest supported version" are not available in the protocol, i.e. for X as old as 0.10.0 and before 1.2.
>
> Under the above scenarios, how many rolling bounces do users need to execute? How should they set the configs in each rolling bounce? And how will the Streams library execute in these cases?
>
> Guozhang
>
> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Ted,
>
> I still consider changing the KIP to include it right away -- if not, I'll create a JIRA. Need to think it through in more detail first.
>
> (Same for other open questions like interface names -- I collect feedback and update the KIP after we reach consensus :))
>
> -Matthias
>
> On 3/9/18 3:35 PM, Ted Yu wrote:
> Thanks for the details, Matthias.
>
> bq. change the metadata protocol only in a future release, encoding both used and supported version might be an advantage
>
> Looks like encoding both versions wouldn't be implemented in this KIP.
>
> Please consider logging a JIRA with the encoding proposal.
>
> Cheers
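To make the "which configs in which bounce" question above concrete, the two-bounce procedure for the format-changing scenario (c) could look roughly like the following from the operator's side. This is a sketch only: it assumes the plain string property "upgrade.from" discussed in this thread and assumes "resetting" simply means removing the property again.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class TwoBounceUpgradeConfigs {

        // First rolling bounce: new code, but pinned to the old (1.1) rebalance metadata/format.
        public static Properties firstBounce() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
            props.put("upgrade.from", "1.1"); // stay compatible with not-yet-bounced instances
            return props;
        }

        // Second rolling bounce: drop the flag so instances switch to the new protocol/format.
        public static Properties secondBounce() {
            final Properties props = firstBounce();
            props.remove("upgrade.from");
            return props;
        }
    }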
> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> @Bill: I think a filter predicate should be part of user code. And even if we want to add something like this, I would prefer to do it in a separate KIP.
>
> @James: I would love to avoid a second rolling bounce. But from my understanding it would not be possible.
>
> The purpose of the second rolling bounce is indeed to switch from version 2 to 3. It also has a second purpose, to switch from the old store to the new store (this happens after the last instance bounces a second time).
>
> The problem with one round of rolling bounces is that it's unclear when to switch from version 2 to version 3. The StreamsPartitionAssignor is stateless by design, and thus the information which version it should use must be passed in from externally -- and we want to use the StreamsConfig to pass in this information.
>
> During the upgrade, all new instances have no information about the progress of the upgrade (ie, how many other instances got upgraded already). Therefore, it's not safe for them to send a version 3 subscription. The leader also has this limited view on the world and can only send version 2 assignments back.
>
> Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.
>
> We did consider changing the metadata to make later upgrades (ie, from 1.2 to 1.x) simpler though (for the case we change the metadata or storage format again -- as long as we don't change it, a single rolling bounce is sufficient), by encoding "used version" and "supported version".
> This would allow the leader to switch to the new version earlier and without a second rebalance: the leader would receive "used version == old" and "supported version = old/new" -- as long as at least one instance sends "supported version = old", the leader sends an old-version assignment back. However, encoding both versions would allow the leader to send a new-version assignment back right after the first round of rebalance finished (ie, once all instances send "supported version = new"). However, there are still two issues with this:
>
> 1) If we switch to the new format right after the last instance bounced, the new stores might not be ready to be used -- this could lead to "downtime", as a store must be restored before processing can resume.
>
> 2) Assume an instance fails and is restarted again. At this point, the instance will still have "upgrade mode" enabled and thus sends the old protocol data. However, it would be desirable to never fall back to the old protocol after the switch to the new protocol.
>
> The second issue is minor, and I guess if users set up the instances properly it could be avoided. However, the first issue would prevent "zero downtime" upgrades. Having said this, if we consider that we might change the metadata protocol only in a future release, encoding both used and supported version might be an advantage in the future, and we could consider adding this information in the 1.2 release to prepare for this.
>
> Btw: monitoring the log is also only required to give the instances enough time to prepare the stores in the new format. If you do the second rolling bounce before this, it would still work -- however, you might see app "downtime" as the new store must be fully restored before processing can resume.
>
> Does this make sense?
>
> -Matthias
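The leader-side choice described above essentially boils down to taking the minimum supported version across the group. A toy sketch with invented names (the real StreamsPartitionAssignor logic is considerably more involved):

    // Illustrative only: invented names, not the real StreamsPartitionAssignor code.
    import java.util.Collection;

    public class VersionedAssignmentSketch {

        // Minimal stand-in for the per-member metadata discussed above.
        public record MemberVersions(int usedVersion, int supportedVersion) { }

        // The leader may only hand out the newest assignment version once every member
        // advertises support for it; otherwise it sticks to the smallest supported version.
        public static int assignmentVersion(final Collection<MemberVersions> members,
                                            final int latestVersion) {
            int chosen = latestVersion;
            for (final MemberVersions member : members) {
                chosen = Math.min(chosen, member.supportedVersion());
            }
            return chosen;
        }
    }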
> On 3/9/18 11:36 AM, James Cheng wrote:
> Matthias,
>
> For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?
>
> For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce and the 2nd rolling bounce is to decide whether to send Subscription Version 2 or Subscription Version 3. (Actually, there is another difference, in that the KIP says that the 2nd rolling bounce should happen after all new state stores are created by the background thread. However, within the 2nd rolling bounce, we say that there is still a background thread, so it seems like there is no actual requirement to wait for the new state stores to be created.)
>
> The 2nd rolling bounce already knows how to deal with mixed-mode (having both Version 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce if we added logic (somehow/somewhere) such that:
> * Instances send Subscription Version 2 until all instances are running the new code.
> * Once all the instances are running the new code, then one at a time, the instances start sending Subscription V3. The leader still hands out Assignment Version 2, until all new state stores are ready.
> * Once all instances report that new stores are ready, the leader sends out Assignment Version 3.
> * Once an instance receives an Assignment Version 3, it can delete the old state store.
>
> Doing it that way seems like it would reduce a lot of operator/deployment overhead. No need to do 2 rolling restarts.
> No need to monitor logs for state store rebuild. You just deploy it, and the instances update themselves.
>
> What do you think?
>
> The thing that made me think of this is that the "2 rolling bounces" approach is similar to what Kafka brokers have to do for changes to inter.broker.protocol.version and log.message.format.version. And in the broker case, it seems like it would be possible (with some work of course) to modify Kafka to allow us to do similar auto-detection of broker capabilities and automatically do a switchover from old to new versions.
>
> -James
>
> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> Matthias,
>
> Thanks for the KIP, it's a +1 from me.
>
> I do have one question regarding the retrieval methods on the new interfaces.
>
> Would we want to consider adding a method with a Predicate that would allow for filtering records by the timestamp stored with the record? Or is this better left for users to implement themselves once the data has been retrieved?
>
> Thanks,
> Bill
>
> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Matthias:
> For my point #1, I don't have a preference as to which separator is chosen. Given the background you mentioned, the current choice is good.
>
> For #2, I think my proposal is better since it is closer to English grammar.
>
> Would be good to hear what other people think.
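As a side note on Bill's Predicate question above (and Matthias's answer that such filtering should stay in user code), a purely client-side filter over the stored timestamps could look roughly like this, assuming the store hands back the ValueAndTimestamp wrapper proposed by the KIP:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.ValueAndTimestamp;

    public class TimestampFilterExample {

        // Collects all entries whose stored timestamp matches the given predicate;
        // the iterator is closed when done.
        public static <K, V> List<KeyValue<K, ValueAndTimestamp<V>>> filterByTimestamp(
                final KeyValueIterator<K, ValueAndTimestamp<V>> iterator,
                final Predicate<Long> timestampPredicate) {
            final List<KeyValue<K, ValueAndTimestamp<V>>> result = new ArrayList<>();
            try (KeyValueIterator<K, ValueAndTimestamp<V>> it = iterator) {
                while (it.hasNext()) {
                    final KeyValue<K, ValueAndTimestamp<V>> entry = it.next();
                    if (timestampPredicate.test(entry.value.timestamp())) {
                        result.add(entry);
                    }
                }
            }
            return result;
        }
    }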
> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Thanks for the comments!
>
> @Guozhang:
>
> So far, there is one PR for the rebalance metadata upgrade fix (addressing the mentioned https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first impression of how the metadata upgrade works, including a system test: https://github.com/apache/kafka/pull/4636
>
> I can share other PRs as soon as they are ready. I agree that the KIP is complex and I am ok with putting out more code to give better discussion context.
>
> @Ted:
>
> I picked `_` instead of `-` to align with the `processing.guarantee` parameter that accepts `at_least_once` and `exactly_once` as values. Personally, I don't care about underscore vs dash, but I prefer consistency. If you feel strongly about it, we can also change it to `-`.
>
> About the interface name: I am fine either way -- I stripped the `With` to keep the name a little shorter. Would be good to get feedback from others and pick the name the majority prefers.
>
> @John:
>
> We can certainly change it. I agree that it would not make a difference. I'll dig into the code to see if either of the two versions might introduce undesired complexity, and update the KIP if I don't hit an issue with putting the `-v2` on the store directory instead of `rocksdb-v2`.
>
> -Matthias
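For illustration, the two directory layouts being weighed here (the `-v2` marker on the rocksdb segment versus on the store name, as John suggests in his quoted message below) differ only in where the suffix lands. A small sketch with hypothetical helper names:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class StateDirLayoutSketch {

        // KIP draft: version marker on the "rocksdb" segment.
        static Path versionOnSegment(String stateDir, String appId, String taskId, String store) {
            return Paths.get(stateDir, appId, taskId, "rocksdb-v2", store);
        }

        // John's alternative: version marker on the store name, directory structure unchanged.
        static Path versionOnStoreName(String stateDir, String appId, String taskId, String store) {
            return Paths.get(stateDir, appId, taskId, "rocksdb", store + "-v2");
        }

        public static void main(String[] args) {
            System.out.println(versionOnSegment("/tmp/kafka-streams", "my-app", "1_0", "counts"));
            System.out.println(versionOnStoreName("/tmp/kafka-streams", "my-app", "1_0", "counts"));
        }
    }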
> On 3/8/18 2:44 PM, John Roesler wrote:
> Hey Matthias,
>
> The KIP looks good to me. I had several questions queued up, but they were all in the "rejected alternatives" section... oh, well.
>
> One very minor thought re: changing the state directory from "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2" marker on the storeName part of the path (i.e., "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same benefits without altering the high-level directory structure.
>
> It may not matter, but I could imagine people running scripts to monitor rocksdb disk usage for each task, or other such use cases.
>
> Thanks,
> -John
>
> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> Matthias:
> Nicely written KIP.
>
> "in_place": can this be "in-place"? Underscores may sometimes be mistyped (as '-'). I think using '-' is more user friendly.
>
> public interface ReadOnlyKeyValueTimestampStore<K, V> {
>
> Is ReadOnlyKeyValueStoreWithTimestamp a better name for the class?
>
> Thanks
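Purely to make the naming debate concrete, the shape under discussion is roughly the following; this is a sketch, not the final KIP API, and the method set is illustrative only:

    import org.apache.kafka.streams.state.ValueAndTimestamp;

    // Sketch only: the name is exactly what the thread is debating; the final interface may differ.
    public interface ReadOnlyKeyValueTimestampStore<K, V> {

        // Returns the latest value together with the timestamp it was written with,
        // or null if the key is not present.
        ValueAndTimestamp<V> get(K key);
    }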
> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Matthias, thanks for the KIP.
>
> I've read through the upgrade path section and it looks good to me. If you already have a WIP PR for it, could you also share it here so that people can take a look?
>
> I'm +1 on the KIP itself. But with large KIPs like this there are always some devils hidden in the details, so I think it is better to have the implementation in parallel along with the design discussion :)
>
> Guozhang
>
> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hi,
>
> I want to propose KIP-258 for the Streams API to allow storing timestamps in RocksDB. This feature is the basis to resolve multiple tickets (issues and feature requests).
>
> Looking forward to your comments about this!
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>
> -Matthias
>
> --
> -- Guozhang