Hi Matthias,

Thanks for the updates to the KIP. I've just read it over, and am
personally quite happy with it.

Thanks for tackling this dicey issue and putting in a huge amount of
design work to produce a smooth upgrade path for DSL users.

Thanks,
-John

On Mon, Dec 17, 2018 at 10:35 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Dear all,
>
> I finally managed to update the KIP.
>
> To address the concerns about the complex upgrade path, I simplified the
> design. We don't need any configs and the upgrade can be done with the
> simple single rolling bounce pattern.
>
> The suggestion is to exploit RocksDB column families to isolate old and
> new on-disk formats. Furthermore, the upgrade from the old to the new
> format happens "on the side" after an instance has been upgraded.
>
> I also pushed a WIP PR in case you want to look into some details
> (potential reviewers, don't panic: I plan to break this down into
> multiple PRs for actual review if the KIP is accepted).
>
> https://github.com/apache/kafka/pull/6044
>
> @Eno: I think I never answered your question about being future-proof:
>
> The latest design is not generic, because it does not support changes
> that need to be reflected in the changelog topic. I aimed for a
> non-generic design for now to keep it as simple as possible. Thus, other
> format changes might need a different design / upgrade path -- however,
> because this KIP is quite encapsulated in the current design, I don't
> see any issue with building this later; a generic upgrade path seems to
> be an orthogonal concern atm.
>
>
> -Matthias
>
>
> On 11/22/18 2:50 PM, Adam Bellemare wrote:
> > Thanks for the information Matthias.
> >
> > I will await your completion of this ticket then since it underpins the
> > essential parts of a RocksDB TTL aligned with the changelog topic. I am
> > eager to work on that ticket myself, so if I can help on this one in any
> > way please let me know.
> >
> > Thanks
> > Adam
> >
> >
> >
> > On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> It's an interesting idea to use a second store to maintain the
> >> timestamps. However, each RocksDB instance implies some overhead. In
> >> fact, we are looking into ColumnFamilies atm to see if we can use those
> >> and merge multiple RocksDBs into a single one to reduce this overhead.
> >>
> >> -Matthias
> >>
> >> On 11/20/18 5:15 AM, Patrik Kleindl wrote:
> >>> Hi Adam
> >>>
> >>> Sounds great, I was already planning to ask around if anyone had
> tackled
> >>> this.
> >>> We have a use case very similar to what you described in KAFKA-4212,
> only
> >>> with Global State Stores.
> >>> I have tried a few things with the normal DSL but was not really
> >> successful.
> >>> Schedule/Punctuate is not possible, supplying a windowed store is also
> >> not
> >>> allowed and the process method has no knowledge of the timestamp of the
> >>> record.
> >>> And anything loaded on startup is not filtered anyway.
> >>>
> >>> Regarding 4212, wouldn't it be easier (although a little less
> >>> space-efficient) to track the Timestamps in a separate Store with <K,
> >> Long>
> >>> ?
> >>> This would leave the original store intact and allow a migration of the
> >>> timestamps without touching the other data.
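> >>>
> >>> Just to make the idea concrete, a rough PAPI sketch (store names are
> >>> made up, and this is written against a regular store where the context
> >>> does expose the record timestamp, unlike our global-store case):
> >>>
> >>>     import org.apache.kafka.streams.processor.AbstractProcessor;
> >>>     import org.apache.kafka.streams.processor.ProcessorContext;
> >>>     import org.apache.kafka.streams.state.KeyValueStore;
> >>>
> >>>     public class TimestampTrackingProcessor extends AbstractProcessor<String, String> {
> >>>         private KeyValueStore<String, String> dataStore;
> >>>         private KeyValueStore<String, Long> timestampStore;
> >>>
> >>>         @SuppressWarnings("unchecked")
> >>>         @Override
> >>>         public void init(final ProcessorContext context) {
> >>>             super.init(context);
> >>>             dataStore = (KeyValueStore<String, String>) context.getStateStore("data-store");
> >>>             timestampStore = (KeyValueStore<String, Long>) context.getStateStore("timestamp-store");
> >>>         }
> >>>
> >>>         @Override
> >>>         public void process(final String key, final String value) {
> >>>             dataStore.put(key, value);
> >>>             // keep the record timestamp next to the data, without touching the data format
> >>>             timestampStore.put(key, context().timestamp());
> >>>         }
> >>>     }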
> >>>
> >>> So I am very interested in your PR :-)
> >>>
> >>> best regards
> >>>
> >>> Patrik
> >>>
> >>> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare <adam.bellem...@gmail.com
> >
> >>> wrote:
> >>>
> >>>> Hi Matthias
> >>>>
> >>>> Thanks - I figured that it was probably a case of just too much to do
> >> and
> >>>> not enough time. I know how that can go. I am asking about this one in
> >>>> relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding
> a
> >> TTL
> >>>> to RocksDB. I have outlined a bit about my use-case within 4212, but
> for
> >>>> brevity here it is:
> >>>>
> >>>> My case:
> >>>> 1) I have a RocksDB with TTL implementation working where records are
> >> aged
> >>>> out using the TTL that comes with RocksDB (very simple).
> >>>> 2) We prevent records from loading from the changelog if recordTime +
> >> TTL <
> >>>> referenceTimeStamp (default = System.currentTimeMillis()).
> >>>>
> >>>> This assumes that the records are stored with the same time reference
> >> (say
> >>>> UTC) as the consumer materializing the RocksDB store.
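> >>>>
> >>>> Spelled out as code (just the rule from point 2 above, nothing
> >>>> Streams-specific, names made up):
> >>>>
> >>>>     public final class TtlFilter {
> >>>>
> >>>>         private TtlFilter() {}
> >>>>
> >>>>         // true if the record should NOT be loaded from the changelog
> >>>>         public static boolean isExpired(final long recordTimestampMs,
> >>>>                                         final long ttlMs,
> >>>>                                         final long referenceTimeMs) {
> >>>>             return recordTimestampMs + ttlMs < referenceTimeMs;
> >>>>         }
> >>>>
> >>>>         public static boolean isExpired(final long recordTimestampMs, final long ttlMs) {
> >>>>             return isExpired(recordTimestampMs, ttlMs, System.currentTimeMillis());
> >>>>         }
> >>>>     }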
> >>>>
> >>>> My questions about KIP-258 are as follows:
> >>>> 1) How does "we want to be able to store record timestamps in KTables"
> >>>> differ from inserting records into RocksDB with TTL at consumption
> >> time? I
> >>>> understand that it could be a difference of some seconds, minutes,
> >> hours,
> >>>> days etc between when the record was published and now, but given the
> >>>> nature of how RocksDB TTL works (eventual - based on compaction) I
> don't
> >>>> see how a precise TTL can be achieved, such as that which one can get
> >> with
> >>>> windowed stores.
> >>>>
> >>>> 2) Are you looking to change how records are inserted into a TTL
> >> RocksDB,
> >>>> such that the TTL would take effect from the record's published time?
> If
> >>>> not, what would be the ideal workflow here for a single record with
> TTL
> >>>> RocksDB?
> >>>> ie: Record Timestamp: 100
> >>>> TTL: 50
> >>>> Record inserted into rocksDB: 110
> >>>> Record to expire at 150?
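> >>>>
> >>>> (To make the arithmetic concrete: event-time expiry would be
> >>>> recordTimestamp + TTL = 100 + 50 = 150, while insert-time expiry --
> >>>> which is roughly what RocksDB's own TTL gives today -- would be
> >>>> 110 + 50 = 160. I am asking whether the KIP targets the former.)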
> >>>>
> >>>> 3) I'm not sure I fully understand the importance of the upgrade
> path. I
> >>>> have read the link to (
> https://issues.apache.org/jira/browse/KAFKA-3522
> >> )
> >>>> in
> >>>> the KIP, and I can understand that a state-store on disk may not
> >> represent
> >>>> what the application is expecting. I don't think I have the full
> picture
> >>>> though, because that issue seems to be easy to fix with a simple
> >> versioned
> >>>> header or accompanying file, forcing the app to rebuild the state if
> the
> >>>> version is incompatible. Can you elaborate or add a scenario to the
> KIP
> >>>> that illustrates the need for the upgrade path?
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Adam
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <
> matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Adam,
> >>>>>
> >>>>> I am still working on it. Was pulled into a lot of other tasks lately
> >> so
> >>>>> this was delayed. Also had some discussions about simplifying the
> >>>>> upgrade path with some colleagues and I am prototyping this atm. Hope
> >> to
> >>>>> update the KIP accordingly soon.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 11/10/18 7:41 AM, Adam Bellemare wrote:
> >>>>>> Hello Matthias
> >>>>>>
> >>>>>> I am curious as to the status of this KIP. TTL and expiry of records
> >>>> will
> >>>>>> be extremely useful for several of our business use-cases, as well
> as
> >>>>>> another KIP I had been working on.
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <
> eno.there...@gmail.com
> >>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Good stuff. Could you comment a bit on how future-proof this change
> >>>>>>> is? For example, if we want to store both event timestamp "and"
> >>>>>>> processing time in RocksDB, will we then need another interface
> >>>>>>> (e.g. called KeyValueWithTwoTimestampsStore)?
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Eno
> >>>>>>>
> >>>>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <
> >>>> matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for your input Guozhang and John.
> >>>>>>>>
> >>>>>>>> I see your point, that the upgrade API is not simple. If you don't
> >>>>>>>> think it's valuable to make generic store upgrades possible (atm), we
> >>>>>>>> can make the API internal, too. The impact is, that we only support a
> >>>>>>>> predefined set of upgrades (ie, KV to KVwithTs, Windowed to
> >>>>>>>> WindowedWithTS etc) for which we implement the internal interfaces.
> >>>>>>>>
> >>>>>>>> We can keep the design generic, so if we decide to make it public, we
> >>>>>>>> don't need to re-invent it. This will also have the advantage, that we
> >>>>>>>> can add upgrade patterns for other stores later, too.
> >>>>>>>>
> >>>>>>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it
> >>>>>>>> was the only way I could find to design a generic upgrade interface. If
> >>>>>>>> we decide to hide all the upgrade stuff, `StoreUpgradeBuilder` would
> >>>>>>>> become an internal interface I guess (don't think we can remove it).
> >>>>>>>>
> >>>>>>>> I will wait for more feedback about this and if nobody wants to
> keep
> >>>> it
> >>>>>>>> as public API I will update the KIP accordingly. Will add some
> more
> >>>>>>>> clarifications for different upgrade patterns in the mean time and
> >>>> fix
> >>>>>>>> the typos/minor issues.
> >>>>>>>>
> >>>>>>>> About adding a new state UPGRADING: maybe we could do that.
> However,
> >>>> I
> >>>>>>>> find it particularly difficult to make the estimation when we
> should
> >>>>>>>> switch to RUNNING, thus, I am a little hesitant. Using store
> >>>> callbacks
> >>>>>>>> or just logging the progress including some indication about the
> >>>> "lag"
> >>>>>>>> might actually be sufficient. Not sure what others think?
> >>>>>>>>
> >>>>>>>> About "value before timestamp": no real reason and I think it does
> >>>> not
> >>>>>>>> make any difference. Do you want to change it?
> >>>>>>>>
> >>>>>>>> About upgrade robustness: yes, we cannot control if an instance fails.
> >>>>>>>> That is what I meant by "we need to write tests". The upgrade should be
> >>>>>>>> able to continue even if an instance goes down (and we must make sure
> >>>>>>>> that we don't end up in an invalid state that forces us to wipe out the
> >>>>>>>> whole store). Thus, we need to write system tests that fail instances
> >>>>>>>> during upgrade.
> >>>>>>>>
> >>>>>>>> For `in_place_offline` upgrade: I don't think we need this mode,
> >>>>> because
> >>>>>>>> people can do this via a single rolling bounce.
> >>>>>>>>
> >>>>>>>>  - prepare code and switch KV-Store to KVwithTs-Store
> >>>>>>>>  - do a single rolling bounce (don't set any upgrade config)
> >>>>>>>>
> >>>>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if
> we
> >>>>>>>> remove the `StoreUpgradeBuilder`) will detect that there is only
> an
> >>>> old
> >>>>>>>> local KV store w/o TS, will start to restore the new KVwithTs
> store,
> >>>>>>>> wipe out the old store and replace with the new store after
> restore
> >>>> is
> >>>>>>>> finished, and start processing only afterwards. (I guess we need
> to
> >>>>>>>> document this case -- will also add it to the KIP.)
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 8/9/18 1:10 PM, John Roesler wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> I think this KIP is looking really good.
> >>>>>>>>>
> >>>>>>>>> I have a few thoughts to add to the others:
> >>>>>>>>>
> >>>>>>>>> 1. You mentioned at one point users needing to configure
> >>>>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant to
> say
> >>>>>>> they
> >>>>>>>>> should remove the config. If they really have to set it to a
> string
> >>>>>>>> "null"
> >>>>>>>>> or even set it to a null value but not remove it, it would be
> >>>>>>>> unfortunate.
> >>>>>>>>>
> >>>>>>>>> 2. In response to Bill's comment #1 , you said that "The idea is
> >>>> that
> >>>>>>> the
> >>>>>>>>> upgrade should be robust and not fail. We need to write according
> >>>>>>> tests".
> >>>>>>>>> I may have misunderstood the conversation, but I don't think it's
> >>>>>>> within
> >>>>>>>>> our power to say that an instance won't fail. What if one of my
> >>>>>>> computers
> >>>>>>>>> catches on fire? What if I'm deployed in the cloud and one
> instance
> >>>>>>>>> disappears and is replaced by a new one? Or what if one instance
> >>>> goes
> >>>>>>>> AWOL
> >>>>>>>>> for a long time and then suddenly returns? How will the upgrade
> >>>>> process
> >>>>>>>>> behave in light of such failures?
> >>>>>>>>>
> >>>>>>>>> 3. your thought about making in-place an offline mode is
> >>>> interesting,
> >>>>>>> but
> >>>>>>>>> it might be a bummer for on-prem users who wish to upgrade
> online,
> >>>> but
> >>>>>>>>> cannot just add new machines to the pool. It could be a new
> upgrade
> >>>>>>> mode
> >>>>>>>>> "offline-in-place", though...
> >>>>>>>>>
> >>>>>>>>> 4. I was surprised to see that a user would need to modify the
> >>>>> topology
> >>>>>>>> to
> >>>>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of
> Guozhang's
> >>>>>>>>> suggestions would remove this necessity.
> >>>>>>>>>
> >>>>>>>>> Thanks for taking on this very complex but necessary work.
> >>>>>>>>>
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <
> wangg...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello Matthias,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the updated KIP. Some more comments:
> >>>>>>>>>>
> >>>>>>>>>> 1. The current set of proposed API is a bit too complicated,
> which
> >>>>>>> makes
> >>>>>>>>>> the upgrade flow from user's perspective also a bit complex. I'd
> >>>> like
> >>>>>>> to
> >>>>>>>>>> check different APIs and discuss about their needs separately:
> >>>>>>>>>>
> >>>>>>>>>>     1.a. StoreProxy: needed for in-place upgrade only, between
> the
> >>>>>>> first
> >>>>>>>>>> and second rolling bounce, where the old-versioned stores can
> >>>> handle
> >>>>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e. from
> >> one
> >>>>>>>> store
> >>>>>>>>>> type to another) would not be very common: users may want to
> >>>> upgrade
> >>>>>>>> from a
> >>>>>>>>>> certain store engine to another, but the interface would likely
> be
> >>>>>>>> staying
> >>>>>>>>>> the same. Hence personally I'd suggest we keep it internally and
> >>>> only
> >>>>>>>>>> consider exposing it in the future if it does become a common
> >>>>> pattern.
> >>>>>>>>>>
> >>>>>>>>>>     1.b. ConverterStore / RecordConverter: needed for both
> >> in-place
> >>>>>>> and
> >>>>>>>>>> roll-over upgrade, between the first and second rolling bounces,
> >>>> for
> >>>>>>> the
> >>>>>>>>>> new versioned store to be able to read old-versioned changelog
> >>>>> topics.
> >>>>>>>>>> Firstly I think we should not expose key in the public APIs but
> >>>> only
> >>>>>>> the
> >>>>>>>>>> values, since allowing key format changes would break log
> >>>> compaction,
> >>>>>>>> and
> >>>>>>>>>> hence would not be compatible anyways. As for value format
> >> changes,
> >>>>>>>>>> personally I think we can also keep its upgrade logic internally
> >> as
> >>>>> it
> >>>>>>>> may
> >>>>>>>>>> not worth generalizing to user customizable logic.
> >>>>>>>>>>
> >>>>>>>>>>     1.c. If you agree with 2.a/b above, then we can also
> remove "
> >>>>>>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the
> >> public
> >>>>>>>> APIs.
> >>>>>>>>>>
> >>>>>>>>>>     1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore"
> >> is
> >>>>>>> not
> >>>>>>>>>> needed either given that we are exposing "ValueAndTimestamp"
> >>>> anyways.
> >>>>>>>> I.e.
> >>>>>>>>>> it is just a syntax sugar and for IQ, users can always just set
> a
> >> "
> >>>>>>>>>> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>"
> as
> >>>> the
> >>>>>>>> new
> >>>>>>>>>> interface does not provide any additional functions.
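> >>>>>>>>>>
> >>>>>>>>>> As a sketch of what that plain-IQ usage would look like (using the
> >>>>>>>>>> proposed ValueAndTimestamp type and a made-up store name; package
> >>>>>>>>>> names are assumptions, not final API):
> >>>>>>>>>>
> >>>>>>>>>>     import org.apache.kafka.streams.KafkaStreams;
> >>>>>>>>>>     import org.apache.kafka.streams.state.QueryableStoreTypes;
> >>>>>>>>>>     import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
> >>>>>>>>>>     import org.apache.kafka.streams.state.ValueAndTimestamp;
> >>>>>>>>>>
> >>>>>>>>>>     public class IqSketch {
> >>>>>>>>>>         public static void lookup(final KafkaStreams streams) {
> >>>>>>>>>>             final ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> store =
> >>>>>>>>>>                 streams.store("counts-with-ts",
> >>>>>>>>>>                     QueryableStoreTypes.<String, ValueAndTimestamp<Long>>keyValueStore());
> >>>>>>>>>>             final ValueAndTimestamp<Long> result = store.get("some-key");
> >>>>>>>>>>             if (result != null) {
> >>>>>>>>>>                 System.out.println(result.value() + " @ " + result.timestamp());
> >>>>>>>>>>             }
> >>>>>>>>>>         }
> >>>>>>>>>>     }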
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 2. Could we further categorize the upgrade flow for different
> use
> >>>>>>> cases,
> >>>>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be used
> >>>>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who do
> not
> >>>>>>> need
> >>>>>>>> to
> >>>>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to
> >> switch
> >>>>> to
> >>>>>>>>>> KeyValueWithTimestampStore. Just to give my understanding for
> 3),
> >>>> the
> >>>>>>>>>> upgrade flow for users may be simplified as the following (for
> >> both
> >>>>>>>>>> in-place and roll-over):
> >>>>>>>>>>
> >>>>>>>>>>     * Update the jar to new version, make code changes from
> >>>>>>>> KeyValueStore
> >>>>>>>>>> to KeyValueWithTimestampStore, set upgrade config.
> >>>>>>>>>>
> >>>>>>>>>>     * First rolling bounce, and library code can internally use
> >>>> proxy
> >>>>>>> /
> >>>>>>>>>> converter based on the specified config to handle new APIs with
> >> old
> >>>>>>>> stores,
> >>>>>>>>>> while let new stores read from old changelog data.
> >>>>>>>>>>
> >>>>>>>>>>     * Reset upgrade config.
> >>>>>>>>>>
> >>>>>>>>>>     * Second rolling bounce, and the library code automatically
> >>>> turn
> >>>>>>> off
> >>>>>>>>>> logic for proxy / converter.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 3. Some more detailed proposals are needed for when to recommend
> >>>>> users
> >>>>>>>> to
> >>>>>>>>>> trigger the second rolling bounce. I have one idea to share
> here:
> >>>> we
> >>>>>>>> add a
> >>>>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when 1)
> >>>>> upgrade
> >>>>>>>>>> config is set, and 2) the new stores are still ramping up (for
> the
> >>>>>>>> second
> >>>>>>>>>> part, we can start with some internal hard-coded heuristics to
> >>>> decide
> >>>>>>>> when
> >>>>>>>>>> it is close to be ramped up). If either one of it is not true
> any
> >>>>>>> more,
> >>>>>>>> it
> >>>>>>>>>> should transit to RUNNING. Users can then watch on this state,
> and
> >>>>>>>> decide
> >>>>>>>>>> to only trigger the second rebalance when the state has
> transited
> >>>>> from
> >>>>>>>>>> UPGRADING. They can also choose to cut over while the instance
> is
> >>>>>>> still
> >>>>>>>>>> UPGRADING, the downside is that after that the application may
> >> have
> >>>>>>> long
> >>>>>>>>>> restoration phase which is, to user's pov, unavailability
> periods.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Below are just some minor things on the wiki:
> >>>>>>>>>>
> >>>>>>>>>> 4. "proxy story" => "proxy store".
> >>>>>>>>>>
> >>>>>>>>>> 5. "use the a builder " => "use a builder"
> >>>>>>>>>>
> >>>>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to the
> >>>>>>> value":
> >>>>>>>>>> what's the rationale of putting the timestamp before the value,
> >>>> than
> >>>>>>>> after
> >>>>>>>>>> the value?
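> >>>>>>>>>>
> >>>>>>>>>> For reference, my reading of the proposed layout as a sketch (not the
> >>>>>>>>>> KIP's actual serialization code); putting the timestamp after the
> >>>>>>>>>> value would just mirror these offsets:
> >>>>>>>>>>
> >>>>>>>>>>     import java.nio.ByteBuffer;
> >>>>>>>>>>
> >>>>>>>>>>     final class TimestampedValue {
> >>>>>>>>>>
> >>>>>>>>>>         // timestamp first, then the plain value
> >>>>>>>>>>         static byte[] encode(final long timestampMs, final byte[] value) {
> >>>>>>>>>>             return ByteBuffer.allocate(Long.BYTES + value.length)
> >>>>>>>>>>                 .putLong(timestampMs)
> >>>>>>>>>>                 .put(value)
> >>>>>>>>>>                 .array();
> >>>>>>>>>>         }
> >>>>>>>>>>
> >>>>>>>>>>         static long timestamp(final byte[] rawValue) {
> >>>>>>>>>>             return ByteBuffer.wrap(rawValue).getLong();
> >>>>>>>>>>         }
> >>>>>>>>>>
> >>>>>>>>>>         static byte[] plainValue(final byte[] rawValue) {
> >>>>>>>>>>             final byte[] value = new byte[rawValue.length - Long.BYTES];
> >>>>>>>>>>             System.arraycopy(rawValue, Long.BYTES, value, 0, value.length);
> >>>>>>>>>>             return value;
> >>>>>>>>>>         }
> >>>>>>>>>>     }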
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <
> >>>>>>> matth...@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks for the feedback Bill. I just updated the KIP with some of
> >>>>>>>>>>> your points.
> >>>>>>>>>>>
> >>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to
> >>>> watch
> >>>>>>> the
> >>>>>>>>>>>>> restore process), I'm wondering if we want to provide a type
> of
> >>>>>>>>>>>>> StateRestoreListener that could signal when the new stores
> have
> >>>>>>>>>> reached
> >>>>>>>>>>>>> parity with the existing old stores and that could be the
> >> signal
> >>>>> to
> >>>>>>>>>>> start
> >>>>>>>>>>>>> second rolling rebalance?
> >>>>>>>>>>>
> >>>>>>>>>>> I think we can reuse the existing listeners, thus, I did not
> >>>> include
> >>>>>>>>>>> anything in the KIP. About a signal to rebalance: this might be
> >>>>>>> tricky.
> >>>>>>>>>>> If we prepare the store "online", the active task will update
> the
> >>>>>>> state
> >>>>>>>>>>> continuously, and thus, state prepare is never finished. It
> will
> >>>> be
> >>>>>>> the
> >>>>>>>>>>> user's responsibility to do the second rebalance (note, that the
> >>>>>>> second
> >>>>>>>>>>> rebalance will first finish the last delta of the upgrade to
> >>>> finish
> >>>>>>> the
> >>>>>>>>>>> upgrade before actual processing resumes). I clarified the KIP
> >>>> with
> >>>>>>>> this
> >>>>>>>>>>> regard a little bit.
> >>>>>>>>>>>
> >>>>>>>>>>>>> 1. Out of N instances, one fails midway through the process,
> >>>> would
> >>>>>>> we
> >>>>>>>>>>> allow
> >>>>>>>>>>>>> the other instances to complete or just fail the entire
> >> upgrade?
> >>>>>>>>>>>
> >>>>>>>>>>> The idea is that the upgrade should be robust and not fail. We
> >>>> need
> >>>>>>> to
> >>>>>>>>>>> write according tests.
> >>>>>>>>>>>
> >>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename
> the
> >>>>>>>> current
> >>>>>>>>>>>>> active directories vs. deleting them right away,  and when
> all
> >>>> the
> >>>>>>>>>>> prepare
> >>>>>>>>>>>>> task directories are successfully migrated then delete the
> >>>>> previous
> >>>>>>>>>>> active
> >>>>>>>>>>>>> ones.
> >>>>>>>>>>>
> >>>>>>>>>>> Ack. Updated the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing any
> new
> >>>>>>>>>> records
> >>>>>>>>>>> and
> >>>>>>>>>>>>> just allow the prepare tasks to restore, then once all
> prepare
> >>>>>>> tasks
> >>>>>>>>>>> have
> >>>>>>>>>>>>> restored, it's a signal for the second round of rolling
> bounces
> >>>>> and
> >>>>>>>>>>> then as
> >>>>>>>>>>>>> each task successfully renames its prepare directories and
> >>>> deletes
> >>>>>>>> the
> >>>>>>>>>>> old
> >>>>>>>>>>>>> active task directories, normal processing of records
> resumes.
> >>>>>>>>>>>
> >>>>>>>>>>> The basic idea is to do an online upgrade to avoid downtime. We
> >>>> can
> >>>>>>>>>>> discuss to offer both options... For the offline upgrade
> option,
> >>>> we
> >>>>>>>>>>> could simplify user interaction and trigger the second
> rebalance
> >>>>>>>>>>> automatically with the requirement that a user needs to update
> >> any
> >>>>>>>>>> config.
> >>>>>>>>>>>
> >>>>>>>>>>> It might actually be worth including this option: we know from
> >>>>>>>>>>> experience with state restore, that regular processing slows
> down
> >>>>> the
> >>>>>>>>>>> restore. For roll_over upgrade, it would be a different story
> and
> >>>>>>>>>>> upgrade should not be slowed down by regular processing. Thus,
> we
> >>>>>>>> should
> >>>>>>>>>>> even make in_place an offline upgrade and force people to use
> >>>>>>> roll_over
> >>>>>>>>>>> if they need an online upgrade. Might be a fair tradeoff that may
> >>>>>>> simplify
> >>>>>>>>>>> the upgrade for the user and for the code complexity.
> >>>>>>>>>>>
> >>>>>>>>>>> Let's see what other think.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
> >>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the update and the working prototype, it helps with
> >>>>>>>>>>>> understanding the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I took an initial pass over this PR, and overall I find the
> >>>>>>> interfaces
> >>>>>>>>>>> and
> >>>>>>>>>>>> approach to be reasonable.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to
> >> watch
> >>>>>>> the
> >>>>>>>>>>>> restore process), I'm wondering if we want to provide a type
> of
> >>>>>>>>>>>> StateRestoreListener that could signal when the new stores
> have
> >>>>>>>> reached
> >>>>>>>>>>>> parity with the existing old stores and that could be the
> signal
> >>>> to
> >>>>>>>>>> start
> >>>>>>>>>>>> second rolling rebalance?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Although you solicited feedback on the interfaces involved, I
> >>>>> wanted
> >>>>>>>> to
> >>>>>>>>>>> put
> >>>>>>>>>>>> down some thoughts that have come to mind reviewing this KIP
> >>>> again
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Out of N instances, one fails midway through the process,
> >>>> would
> >>>>>>> we
> >>>>>>>>>>> allow
> >>>>>>>>>>>> the other instances to complete or just fail the entire
> upgrade?
> >>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the
> >>>>>>> current
> >>>>>>>>>>>> active directories vs. deleting them right away,  and when all
> >>>> the
> >>>>>>>>>>> prepare
> >>>>>>>>>>>> task directories are successfully migrated then delete the
> >>>> previous
> >>>>>>>>>>> active
> >>>>>>>>>>>> ones.
> >>>>>>>>>>>> 3. For the first rolling bounce we pause any processing any
> new
> >>>>>>>> records
> >>>>>>>>>>> and
> >>>>>>>>>>>> just allow the prepare tasks to restore, then once all prepare
> >>>>> tasks
> >>>>>>>>>> have
> >>>>>>>>>>>> restored, it's a signal for the second round of rolling
> bounces
> >>>> and
> >>>>>>>>>> then
> >>>>>>>>>>> as
> >>>>>>>>>>>> each task successfully renames its prepare directories and
> >>>> deletes
> >>>>>>> the
> >>>>>>>>>>> old
> >>>>>>>>>>>> active task directories, normal processing of records resumes.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Bill
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <
> >>>>>>>> matth...@confluent.io
> >>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> KIP-268 (rebalance metadata) is finished and included in AK
> >> 2.0
> >>>>>>>>>>>>> release. Thus, I want to pick up this KIP again to get the
> >>>> RocksDB
> >>>>>>>>>>>>> upgrade done for 2.1.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I updated the KIP accordingly and also have a "proof of
> >> concept"
> >>>>> PR
> >>>>>>>>>>>>> ready (for "in place" upgrade only):
> >>>>>>>>>>>>> https://github.com/apache/kafka/pull/5422/
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> There are still open questions, but I want to collect early
> >>>> feedback
> >>>>>>> on
> >>>>>>>>>>>>> the proposed interfaces we need for the store upgrade. Also
> >>>> note,
> >>>>>>>> that
> >>>>>>>>>>>>> the KIP now also aims to define a generic upgrade path from
> any
> >>>>>>> store
> >>>>>>>>>>>>> format A to any other store format B. Adding timestamps is
> just
> >>>> a
> >>>>>>>>>>>>> special case.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I will continue to work on the PR and refine the KIP in the
> >>>>>>> meantime,
> >>>>>>>>>>> too.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> >>>>>>>>>>>>>> After some more thoughts, I want to follow John's suggestion
> >>>> and
> >>>>>>>>>> split
> >>>>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I extracted the metadata upgrade into its own KIP:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%
> >>>>>>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to
> >>>> consider
> >>>>>>> to
> >>>>>>>>>>>>>> make the store format upgrade more flexible/generic. Atm,
> the
> >>>> KIP
> >>>>>>> is
> >>>>>>>>>>> too
> >>>>>>>>>>>>>> much tailored to the DSL IMHO and does not encounter PAPI
> >> users
> >>>>>>> that
> >>>>>>>>>> we
> >>>>>>>>>>>>>> should not force to upgrade the stores. I need to figure out
> >>>> the
> >>>>>>>>>>> details
> >>>>>>>>>>>>>> and follow up later.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Please give feedback for the new KIP-268 on the
> corresponding
> >>>>>>>>>>> discussion
> >>>>>>>>>>>>>> thread.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't
> figure
> >>>> out
> >>>>>>> a
> >>>>>>>>>> way
> >>>>>>>>>>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a
> >> fix
> >>>>>>> for
> >>>>>>>>>>>>>> future upgrades. Please share your thoughts.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for all your feedback!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
> >>>>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's an
> >>>>>>>>>>>>>>> implementation details IMHO and thus I did not include it
> in
> >>>> the
> >>>>>>>>>> KIP)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) I understand know what you mean. We can certainly, allow
> >>>> all
> >>>>>>>>>> values
> >>>>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
> >>>>>>> `upgrade.from`
> >>>>>>>>>>>>>>> parameter. I had a similar though once but decided to
> >> collapse
> >>>>>>> them
> >>>>>>>>>>> into
> >>>>>>>>>>>>>>> one -- will update the KIP accordingly.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) The idea to avoid any config would be, to always send
> both
> >>>>>>>>>> request.
> >>>>>>>>>>>>>>> If we add a config to eventually disable the old request,
> we
> >>>>>>> don't
> >>>>>>>>>>> gain
> >>>>>>>>>>>>>>> anything with this approach. The question is really, if we
> >> are
> >>>>>>>>>> willing
> >>>>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be
> limited
> >>>> to
> >>>>>>> 2
> >>>>>>>>>>>>>>> versions and not grow further in future releases. More
> >> details
> >>>>> in
> >>>>>>>>>> (3)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and
> >>>> allows
> >>>>>>> us
> >>>>>>>>>> to
> >>>>>>>>>>>>>>> stay with 2 "assignment strategies" we need to register, as
> >>>> the
> >>>>>>> new
> >>>>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via
> >>>> "version
> >>>>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a
> >>>> config
> >>>>>>> if
> >>>>>>>>>>>>>>> people upgrade from pre-1.2 releases.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thus, I don't think we need to register new "assignment
> >>>>>>> strategies"
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>> send empty subscriptions for older version.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right with a
> >>>>> single
> >>>>>>>>>>>>>>> rebalance. I share the concern that an application might
> >> never
> >>>>>>>> catch
> >>>>>>>>>>> up
> >>>>>>>>>>>>>>> and thus the hot standby will never be ready.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store
> upgrades.
> >>>> If
> >>>>>>> we
> >>>>>>>>>> do
> >>>>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3) in
> >>>> place
> >>>>>>>> for
> >>>>>>>>>>>>>>> future upgrades. I also think that changes to the metadata
> >> are
> >>>>>>> more
> >>>>>>>>>>>>>>> likely and thus allowing for single rolling bounce for this
> >>>> case
> >>>>>>> is
> >>>>>>>>>>> more
> >>>>>>>>>>>>>>> important anyway. If we assume that store upgrade a rare,
> it
> >>>>>>> might
> >>>>>>>>>> be
> >>>>>>>>>>> ok
> >>>>>>>>>>>>>>> to sacrifice two rolling bounced for this case. It was just
> >> an
> >>>>>>> idea
> >>>>>>>>>> I
> >>>>>>>>>>>>>>> wanted to share (even if I see the issues).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>> Hello Matthias, thanks for your replies.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1) About the config names: actually I was trying to not
> >>>> expose
> >>>>>>>>>>>>>>>> implementation details :) My main concern was that in your
> >>>>>>>> proposal
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> values need to cover the span of all the versions that are
> >>>>>>>> actually
> >>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a
> user)
> >>>> am
> >>>>>>>>>>>>> upgrading
> >>>>>>>>>>>>>>>> from any versions within this range I need to remember to
> >> use
> >>>>>>> the
> >>>>>>>>>>> value
> >>>>>>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In
> my
> >>>>>>>>>>> suggestion
> >>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>> was trying to argue the benefit of just letting users to
> >>>>> specify
> >>>>>>>>>> the
> >>>>>>>>>>>>> actual
> >>>>>>>>>>>>>>>> Kafka version she's trying to upgrade from, than
> specifying
> >> a
> >>>>>>>> range
> >>>>>>>>>>> of
> >>>>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as
> >> the
> >>>>>>>>>> values,
> >>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>> still using Kafka versions like broker's
> `internal.version`
> >>>>>>>> config.
> >>>>>>>>>>>>> But if
> >>>>>>>>>>>>>>>> you were suggesting the same thing, i.e. by
> "0.10.1.x-1.1.x"
> >>>>> you
> >>>>>>>>>>> meant
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or
> "0.11.0"
> >>>> or
> >>>>>>>>>> "1.1"
> >>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>> are all recognizable config values then I think we are
> >>>> actually
> >>>>>>> on
> >>>>>>>>>>> the
> >>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>> page.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would
> increase
> >>>> the
> >>>>>>>>>>> network
> >>>>>>>>>>>>>>>> footprint, but not the message size, IF I'm not
> >>>>>>> mis-understanding
> >>>>>>>>>>> your
> >>>>>>>>>>>>> idea
> >>>>>>>>>>>>>>>> of registering multiple assignment. More details:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can
> >> encode
> >>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>> protocols each with their different metadata. The
> >> coordinator
> >>>>>>> will
> >>>>>>>>>>>>> pick the
> >>>>>>>>>>>>>>>> common one that everyone supports (if there are no common
> >>>> one,
> >>>>>>> it
> >>>>>>>>>>> will
> >>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>> an error back; if there are multiple ones, it will pick
> the
> >>>> one
> >>>>>>>>>> with
> >>>>>>>>>>>>> most
> >>>>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded
> list).
> >>>>>>> Since
> >>>>>>>>>> our
> >>>>>>>>>>>>>>>> current Streams rebalance protocol is still based on the
> >>>>>>> consumer
> >>>>>>>>>>>>>>>> coordinator, it means our protocol_type would be
> "consumer",
> >>>>> but
> >>>>>>>>>>>>> instead
> >>>>>>>>>>>>>>>> the protocol type we can have multiple protocols like
> >>>>> "streams",
> >>>>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we
> need
> >>>> to
> >>>>>>>>>>>>> implement a
> >>>>>>>>>>>>>>>> different assignor class for each version and register all
> >> of
> >>>>>>> them
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the
> >>>> future
> >>>>>>> if
> >>>>>>>>>> we
> >>>>>>>>>>>>>>>> re-factor our implementation to have our own client
> >>>> coordinator
> >>>>>>>>>> layer
> >>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>> Connect did, we can simplify this part of the
> >> implementation.
> >>>>>>> But
> >>>>>>>>>>> even
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> now with the above approach this is still doable.
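> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In consumer-config terms the registration itself would just be
> >>>>>>>>>>>>>>>> something like the following (the assignor class names are made
> >>>>>>>>>>>>>>>> up):
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>     import java.util.Properties;
> >>>>>>>>>>>>>>>>     import org.apache.kafka.clients.consumer.ConsumerConfig;
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>     public class MultiAssignorConfigSketch {
> >>>>>>>>>>>>>>>>         public static Properties consumerConfig() {
> >>>>>>>>>>>>>>>>             final Properties props = new Properties();
> >>>>>>>>>>>>>>>>             // the earlier entry is preferred when the coordinator picks the common protocol
> >>>>>>>>>>>>>>>>             props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
> >>>>>>>>>>>>>>>>                 "org.example.StreamsAssignorV3,org.example.StreamsAssignorV2");
> >>>>>>>>>>>>>>>>             return props;
> >>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>     }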
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On the broker side, the group coordinator will only
> persist
> >> a
> >>>>>>>> group
> >>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>> the selected protocol and its subscription metadata, e.g.
> if
> >>>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>> decides to pick "streams_v2" it will only sends that
> >>>> protocol's
> >>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>> from everyone to the leader to assign, AND when completing
> >>>> the
> >>>>>>>>>>>>> rebalance it
> >>>>>>>>>>>>>>>> will also only write the group metadata with that protocol
> >>>> and
> >>>>>>> the
> >>>>>>>>>>>>>>>> assignment only. In a word, although the network traffic
> >>>> maybe
> >>>>>>>>>>>>> increased a
> >>>>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One corner
> >>>>>>>>>> situation
> >>>>>>>>>>> we
> >>>>>>>>>>>>>>>> need to consider is how to stop registering very old
> >>>> assignors
> >>>>>>> to
> >>>>>>>>>>>>> avoid the
> >>>>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if you
> >> are
> >>>>>>>>>> rolling
> >>>>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1
> >>>>>>> assignor
> >>>>>>>>>>>>> anymore,
> >>>>>>>>>>>>>>>> but that would unfortunately still require some configs.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 3) About the  "version probing" idea, I think that's a
> >>>>> promising
> >>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>>> as well, but if we are going to do the multi-assignment
> its
> >>>>>>> value
> >>>>>>>>>>> seems
> >>>>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top of
> >>>>>>>>>>>>> multi-assignment
> >>>>>>>>>>>>>>>> to save us from still requiring the config to avoid
> >>>> registering
> >>>>>>>> all
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> metadata for all version. More details:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the
> assignor
> >>>> but
> >>>>>>>> for
> >>>>>>>>>>>>> all old
> >>>>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the encoded
> >>>> data
> >>>>>>>>>> would
> >>>>>>>>>>>>> be:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> "streams_vN" : "encoded metadata"
> >>>>>>>>>>>>>>>> "streams_vN-1":empty
> >>>>>>>>>>>>>>>> "streams_vN-2":empty
> >>>>>>>>>>>>>>>> ..
> >>>>>>>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> So the coordinator can still safely choose the latest
> common
> >>>>>>>>>> version;
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> then when leaders receive the subscription (note it should
> >>>>>>> always
> >>>>>>>>>>>>> recognize
> >>>>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of the
> >>>>>>>>>>>>> subscriptions
> >>>>>>>>>>>>>>>> are empty bytes, it will send the empty assignment with
> that
> >>>>>>>>>> version
> >>>>>>>>>>>>> number
> >>>>>>>>>>>>>>>> encoded in the metadata. So in the second auto-triggered
> all
> >>>>>>>>>> members
> >>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> send the metadata with that version:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> "streams_vN" : empty
> >>>>>>>>>>>>>>>> "streams_vN-1" : empty
> >>>>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata"
> >>>>>>>>>>>>>>>> ..
> >>>>>>>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> By doing this we would not require any configs for users.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear
> >>>> about
> >>>>>>>> the
> >>>>>>>>>>>>> details
> >>>>>>>>>>>>>>>> so probably we'd need to fill that out before making a
> call.
> >>>>> For
> >>>>>>>>>>>>> example,
> >>>>>>>>>>>>>>>> you mentioned "If we detect this situation, the Streams
> >>>>>>>> application
> >>>>>>>>>>>>> closes
> >>>>>>>>>>>>>>>> corresponding active tasks as well as "hot standby" tasks,
> >>>> and
> >>>>>>>>>>>>> re-creates
> >>>>>>>>>>>>>>>> the new active tasks using the new store." How could we
> >>>>>>> guarantee
> >>>>>>>>>>> that
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> gap between these two stores will keep decreasing than
> >>>>>>> increasing
> >>>>>>>>>> so
> >>>>>>>>>>>>> we'll
> >>>>>>>>>>>>>>>> eventually achieve the flip point? And also the longer we
> >> are
> >>>>>>>>>> before
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> flip point, the larger we are doubling the storage space,
> >>>> etc.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
> >>>>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> @John, Guozhang,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply...
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> About upgrading the rebalance metadata:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Another possibility to do this, would be to register
> >>>> multiple
> >>>>>>>>>>>>> assignment
> >>>>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case, new
> >>>>>>> instances
> >>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> be configured to support both and the broker would pick
> the
> >>>>>>>>>> version
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> all instances understand. The disadvantage would be, that
> >> we
> >>>>>>> send
> >>>>>>>>>>> much
> >>>>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as
> long
> >>>> as
> >>>>>>> no
> >>>>>>>>>>>>> second
> >>>>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, using
> >>>> this
> >>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off an
> >>>>>>> increased
> >>>>>>>>>>>>>>>>> rebalance network footprint (I also assume that this
> would
> >>>>>>>>>> increase
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> message size that is written into __consumer_offsets
> >>>> topic?).
> >>>>>>>>>>>>> Overall, I
> >>>>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it
> could
> >>>>>>> avoid
> >>>>>>>> a
> >>>>>>>>>>>>>>>>> second rebalance (I have some more thoughts about stores
> >>>> below
> >>>>>>>>>> that
> >>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>> relevant for single rebalance upgrade).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For future upgrades we might be able to fix this though.
> I
> >>>> was
> >>>>>>>>>>>>> thinking
> >>>>>>>>>>>>>>>>> about the following:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In the current implementation, the leader fails if it
> gets
> >> a
> >>>>>>>>>>>>>>>>> subscription it does not understand (ie, newer version).
> We
> >>>>>>> could
> >>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>> this behavior and let the leader send an empty assignment
> >>>> plus
> >>>>>>>>>> error
> >>>>>>>>>>>>>>>>> code (including supported version) back to the instance
> >>>>> sending
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> "bad" subscription. This would allow the following logic
> >> for
> >>>>> an
> >>>>>>>>>>>>>>>>> application instance:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>  - on startup, always send the latest subscription format
> >>>>>>>>>>>>>>>>>  - if leader understands it, we get an assignment back and
> >>>>> start
> >>>>>>>>>>>>> processing
> >>>>>>>>>>>>>>>>>  - if leader does not understand it, we get an empty
> >>>>> assignment
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>> supported version back
> >>>>>>>>>>>>>>>>>  - the application unsubscribe()/subscribe()/poll() again
> >>>> and
> >>>>>>>>>>> sends a
> >>>>>>>>>>>>>>>>> subscription using the leader's supported version
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This protocol would allow to do a single rolling bounce,
> >> and
> >>>>>>>>>>>>> implements
> >>>>>>>>>>>>>>>>> a "version probing" step, that might result in two
> executed
> >>>>>>>>>>>>> rebalances.
> >>>>>>>>>>>>>>>>> The advantage would be, that the user does not need to
> set
> >>>> any
> >>>>>>>>>>> configs
> >>>>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care of
> >>>> this
> >>>>>>>>>>>>>>>>> automatically.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen and
> >>>> that
> >>>>>>>> for
> >>>>>>>>>>> an
> >>>>>>>>>>>>>>>>> error case during rebalance, we loose the information
> about
> >>>>> the
> >>>>>>>>>>>>>>>>> supported leader version and the "probing step" would
> >>>> happen a
> >>>>>>>>>>> second
> >>>>>>>>>>>>> time.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If the leader is eventually updated, it will include its
> >>>> own
> >>>>>>>>>>>>> supported
> >>>>>>>>>>>>>>>>> version in all assignments, to allow a "down graded"
> >>>>>>> application
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> upgrade its version later. Also, if an application fails,
> >> the
> >>>>>>>> first
> >>>>>>>>>>>>>>>>> probing would always be successful and only a single
> >>>> rebalance
> >>>>>>>>>>>>> happens.
> >>>>>>>>>>>>>>>>> If we use this protocol, I think we don't need any
> >>>>>>> configuration
> >>>>>>>>>>>>>>>>> parameter for future upgrades.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version the
> >>>>>>>>>>> current/old
> >>>>>>>>>>>>>>>>> application is using. I think this is simpler, as users
> >> know
> >>>>>>> this
> >>>>>>>>>>>>>>>>> version. If we use "internal.protocol.version" instead,
> we
> >>>>>>> expose
> >>>>>>>>>>>>>>>>> implementation details and users need to know the
> protocol
> >>>>>>>> version
> >>>>>>>>>>>>> (ie,
> >>>>>>>>>>>>>>>>> they need to map from the release version to the protocol
> >>>>>>>> version;
> >>>>>>>>>>> ie,
> >>>>>>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version
> >>>> 2").
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Also the KIP states that for the second rolling bounce,
> the
> >>>>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- and
> >>>>> thus,
> >>>>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored
> (I
> >>>>> will
> >>>>>>>>>>> update
> >>>>>>>>>>>>>>>>> the KIP to point out this dependency).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> About your second point: I'll update the KIP accordingly
> to
> >>>>>>>>>> describe
> >>>>>>>>>>>>>>>>> future updates as well. Both will be different.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> One more point about upgrading the store format. I was
> >>>>> thinking
> >>>>>>>>>>> about
> >>>>>>>>>>>>>>>>> avoiding the second rolling bounce all together in the
> >>>> future:
> >>>>>>>> (1)
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this
> >>>>>>>> required
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the
> >> switch
> >>>>>>> and
> >>>>>>>>>>>>> delete
> >>>>>>>>>>>>>>>>> the old stores. (3) the current proposal does the switch
> >>>>>>>>>> "globally"
> >>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> this is simpler and due to the required second rebalance
> no
> >>>>>>>>>>>>> disadvantage.
> >>>>>>>>>>>>>>>>> However, a global consistent switch over might actually
> not
> >>>> be
> >>>>>>>>>>>>> required.
> >>>>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from
> above,
> >>>> we
> >>>>>>>>>> could
> >>>>>>>>>>>>>>>>> decouple the store switch and each instance could switch
> >> its
> >>>>>>>> store
> >>>>>>>>>>>>>>>>> independently from all other instances. After the rolling
> >>>>>>> bounce,
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>>> seems to be ok to switch from the old store to the new
> >> store
> >>>>>>>>>> "under
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> hood" whenever the new store is ready (this could even be
> >>>>> done,
> >>>>>>>>>>> before
> >>>>>>>>>>>>>>>>> we switch to the new metadata version). Each time we
> update
> >>>>> the
> >>>>>>>>>> "hot
> >>>>>>>>>>>>>>>>> standby" we check if it reached the "endOffset"  (or
> maybe
> >>>> X%
> >>>>>>>> that
> >>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect this
> >>>>>>>> situation,
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> Streams application closes corresponding active tasks as
> >>>> well
> >>>>>>> as
> >>>>>>>>>>> "hot
> >>>>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks using
> >>>> the
> >>>>>>> new
> >>>>>>>>>>>>> store.
> >>>>>>>>>>>>>>>>> (I need to go through the details once again, but it
> seems
> >>>> to
> >>>>>>> be
> >>>>>>>>>>>>>>>>> feasible.).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Combining this strategy with the "multiple assignment"
> >> idea,
> >>>>>>>> might
> >>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1
> >> ->
> >>>>>>> 1.2.
> >>>>>>>>>>>>>>>>> Applications would just use the old store, as long as the
> >>>> new
> >>>>>>>>>> store
> >>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> not ready, even if the new metadata version is used
> >> already.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For future upgrades, a single rebalance would be
> >> sufficient,
> >>>>>>> too,
> >>>>>>>>>>> even
> >>>>>>>>>>>>>>>>> if the stores are upgraded. We would not need any config
> >>>>>>>>>> parameters
> >>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> the "probe" step allows us to detect the supported
> >> rebalance
> >>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>> version (and we would also not need multiple "assigmnent
> >>>>>>>>>> strategies"
> >>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> out own protocol encoded everything we need).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the
> >>>> encoded
> >>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>> bytes
> >>>>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from
> >> Consumer's
> >>>>>>> pov,
> >>>>>>>>>>> so I
> >>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>> this change should be in the Streams layer as well.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @Matthias:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported version"
> >>>>>>> besides
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea to
> >>>> allow
> >>>>>>>>>>> either
> >>>>>>>>>>>>>>>>> case;
> >>>>>>>>>>>>>>>>>> the key is that in Streams we would likely end up with a
> >>>>>>> mapping
> >>>>>>>>>>>>> from the
> >>>>>>>>>>>>>>>>>> protocol version to the other persistent data format
> >>>> versions
> >>>>>>>>>> such
> >>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually
> >>>>> achieve
> >>>>>>>>>> both
> >>>>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded
> >>>>> protocol
> >>>>>>>>>>>>> version's
> >>>>>>>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0
> ->
> >>>>>>> 0.10.1
> >>>>>>>>>>>>> leaders
> >>>>>>>>>>>>>>>>>> can choose to use the newer version in the first rolling
> >>>>>>> bounce
> >>>>>>>>>>>>> directly
> >>>>>>>>>>>>>>>>>> and we can document to users that they would not need to
> >>>> set
> >>>>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the
> upgraded
> >>>>>>>>>> protocol
> >>>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2,
> >> and
> >>>>>>> then
> >>>>>>>>>> we
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the
> first
> >>>>>>>> rolling
> >>>>>>>>>>>>> bounce
> >>>>>>>>>>>>>>>>>> and reset in the second.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Besides that, some additional comments:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for
> >> users
> >>>>> to
> >>>>>>>>>> set
> >>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter users
> >> only
> >>>>>>> need
> >>>>>>>>>> to
> >>>>>>>>>>>>> set a
> >>>>>>>>>>>>>>>>>> single version, while the Streams will map that version
> to
> >>>>> the
> >>>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But
> maybe
> >> I
> >>>>>>> did
> >>>>>>>>>> not
> >>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>> your idea about how the  "upgrade.from" config will be
> >> set,
> >>>>>>>>>> because
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from config
> >> will
> >>>>> be
> >>>>>>>>>> set
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for
> example,
> >>>>>>>> should
> >>>>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>> reset it to null in the second rolling bounce?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than talking
> >>>> about
> >>>>>>>>>>>>> specific
> >>>>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just
> >>>> categorize
> >>>>>>> all
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> possible scenarios, even for future upgrade versions,
> what
> >>>>>>>> should
> >>>>>>>>>>> be
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> standard operations? The categorized we can summarize to
> >>>>> would
> >>>>>>>> be
> >>>>>>>>>>>>>>>>> (assuming
> >>>>>>>>>>>>>>>>>> user upgrade from version X to version Y, where X and Y
> >> are
> >>>>>>>> Kafka
> >>>>>>>>>>>>>>>>> versions,
> >>>>>>>>>>>>>>>>>> with the corresponding supported protocol version x and
> >> y):
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and
> >>>> hence
> >>>>>>> no
> >>>>>>>>>>>>>>>>> persistent
> >>>>>>>>>>>>>>>>>> data formats have changed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> b. x != y, but all persistent data format remains the
> >> same.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> b. x != y, AND some persistent data format like RocksDB
> >>>>> format,
> >>>>>>>>>>>>> changelog
> >>>>>>>>>>>>>>>>>> format, has been changed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> c. special case: we may need some special handling logic
> >>>> when
> >>>>>>>>>>>>> "current
> >>>>>>>>>>>>>>>>>> version" or "newest supported version" are not available
> >> in
> >>>>>>> the
> >>>>>>>>>>>>> protocol,
> >>>>>>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> under the above scenarios, how many rolling bounces
> users
> >>>>> need
> >>>>>>>> to
> >>>>>>>>>>>>>>>>> execute?
> >>>>>>>>>>>>>>>>>> how they should set the configs in each rolling bounce?
> >> and
> >>>>>>> how
> >>>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>>>> library will execute in these cases?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
> >>>>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Ted,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I still consider changing the KIP to include it right
> >> away
> >>>>> --
> >>>>>>>> if
> >>>>>>>>>>>>> not,
> >>>>>>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more
> >>>> detail
> >>>>>>>>>> first.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> (Same for other open questions like interface names --
> I
> >>>>>>>> collect
> >>>>>>>>>>>>>>>>>>> feedback and update the KIP after we reach consensus
> :))
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
> >>>>>>>>>>>>>>>>>>>> Thanks for the details, Matthias.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> bq. change the metadata protocol only if a future
> >>>> release,
> >>>>>>>>>>> encoding
> >>>>>>>>>>>>>>>>> both
> >>>>>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>>>> and supported version might be an advantage
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be
> >> implemented
> >>>>> in
> >>>>>>>>>> this
> >>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding
> >>>> proposal.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
> >>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of
> >> user
> >>>>>>>> code.
> >>>>>>>>>>> And
> >>>>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>>>> if we want to add something like this, I would prefer
> >> to
> >>>>> do
> >>>>>>>> it
> >>>>>>>>>>> in
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>> separate KIP.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling
> bounce.
> >>>> But
> >>>>>>>>>> from
> >>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>> understanding it would not be possible.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to
> >>>>>>> switch
> >>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to
> switch
> >>>>>>> from
> >>>>>>>>>> the
> >>>>>>>>>>>>> old
> >>>>>>>>>>>>>>>>>>>>> store to the new store (this happens after the last
> >>>>>>> instance
> >>>>>>>>>>>>> bounces a
> >>>>>>>>>>>>>>>>>>>>> second time).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is,
> that
> >>>>> it's
> >>>>>>>>>>>>> unclear
> >>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>> to which from version 2 to version 3. The
> >>>>>>>>>>>>> StreamsPartitionsAssignor is
> >>>>>>>>>>>>>>>>>>>>> stateless by design, and thus, the information which
> >>>>>>> version
> >>>>>>>>>> it
> >>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>> use must be passed in from externally -- and we want
> to
> >>>>> use
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> StreamsConfig to pass in this information.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> During the upgrade, all new instances have no
> >>>>>>>>>>>>>>>>>>>>> information about the progress of the upgrade (ie, how
> >>>>>>>>>>>>>>>>>>>>> many other instances got upgraded already). Therefore,
> >>>>>>>>>>>>>>>>>>>>> it's not safe for them to send a version 3
> >>>>>>>>>>>>>>>>>>>>> subscription. The leader also has this limited view of
> >>>>>>>>>>>>>>>>>>>>> the world and can only send version 2 assignments back.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can
> >>>>>>>>>>>>>>>>>>>>> simplify the upgrade.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> We did consider changing the metadata to make later
> >>>>>>>>>>>>>>>>>>>>> upgrades (ie, from 1.2 to 1.x) simpler though (for the
> >>>>>>>>>>>>>>>>>>>>> case that we change the metadata or storage format
> >>>>>>>>>>>>>>>>>>>>> again -- as long as we don't change it, a single
> >>>>>>>>>>>>>>>>>>>>> rolling bounce is sufficient), by encoding "used
> >>>>>>>>>>>>>>>>>>>>> version" and "supported version". This would allow the
> >>>>>>>>>>>>>>>>>>>>> leader to switch to the new version earlier and without
> >>>>>>>>>>>>>>>>>>>>> a second rebalance: the leader would receive "used
> >>>>>>>>>>>>>>>>>>>>> version == old" and "supported version = old/new" -- as
> >>>>>>>>>>>>>>>>>>>>> long as at least one instance sends "supported version
> >>>>>>>>>>>>>>>>>>>>> = old", the leader sends the old version assignment
> >>>>>>>>>>>>>>>>>>>>> back. Encoding both versions would allow the leader to
> >>>>>>>>>>>>>>>>>>>>> send a new version assignment back right after the
> >>>>>>>>>>>>>>>>>>>>> first round of rebalance finishes (ie, once all
> >>>>>>>>>>>>>>>>>>>>> instances send "supported version = new"). However,
> >>>>>>>>>>>>>>>>>>>>> there are still two issues with this:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 1) If we switch to the new format right after the last
> >>>>>>>>>>>>>>>>>>>>> instance bounced, the new stores might not be ready to
> >>>>>>>>>>>>>>>>>>>>> be used -- this could lead to "downtime" as the store
> >>>>>>>>>>>>>>>>>>>>> must be restored before processing can resume.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At
> >>>>>>>>>>>>>>>>>>>>> this point, the instance will still have "upgrade mode"
> >>>>>>>>>>>>>>>>>>>>> enabled and thus sends the old protocol data. However,
> >>>>>>>>>>>>>>>>>>>>> it would be desirable to never fall back to the old
> >>>>>>>>>>>>>>>>>>>>> protocol after the switch to the new protocol.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> The second issue is minor, and I guess that if users
> >>>>>>>>>>>>>>>>>>>>> set up the instances properly it could be avoided.
> >>>>>>>>>>>>>>>>>>>>> However, the first issue would prevent "zero downtime"
> >>>>>>>>>>>>>>>>>>>>> upgrades. Having said this, if we consider that we
> >>>>>>>>>>>>>>>>>>>>> might change the metadata protocol only in a future
> >>>>>>>>>>>>>>>>>>>>> release, encoding both used and supported versions
> >>>>>>>>>>>>>>>>>>>>> might be an advantage in the future, and we could
> >>>>>>>>>>>>>>>>>>>>> consider adding this information in the 1.2 release to
> >>>>>>>>>>>>>>>>>>>>> prepare for this.
> >>>>>>>>>>>>>>>>>>>>>
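> >>>>>>>>>>>>>>>>>>>>> (To make the "used version" / "supported version" idea
> >>>>>>>>>>>>>>>>>>>>> a bit more concrete, here is a rough sketch -- class
> >>>>>>>>>>>>>>>>>>>>> and field names are made up and this is not the actual
> >>>>>>>>>>>>>>>>>>>>> assignor code:)
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> import java.util.List;
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> final class VersionResolutionSketch {
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>     static final class SubscriptionInfo {
> >>>>>>>>>>>>>>>>>>>>>         final int usedVersion;       // version encoded in the subscription
> >>>>>>>>>>>>>>>>>>>>>         final int supportedVersion;  // highest version the instance understands
> >>>>>>>>>>>>>>>>>>>>>         SubscriptionInfo(final int used, final int supported) {
> >>>>>>>>>>>>>>>>>>>>>             this.usedVersion = used;
> >>>>>>>>>>>>>>>>>>>>>             this.supportedVersion = supported;
> >>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>     // The leader picks the highest version every member supports;
> >>>>>>>>>>>>>>>>>>>>>     // as long as one member only supports the old version, everybody
> >>>>>>>>>>>>>>>>>>>>>     // keeps receiving the old assignment format.
> >>>>>>>>>>>>>>>>>>>>>     static int versionToAssign(final List<SubscriptionInfo> subscriptions,
> >>>>>>>>>>>>>>>>>>>>>                                final int leaderSupportedVersion) {
> >>>>>>>>>>>>>>>>>>>>>         int common = leaderSupportedVersion;
> >>>>>>>>>>>>>>>>>>>>>         for (final SubscriptionInfo s : subscriptions) {
> >>>>>>>>>>>>>>>>>>>>>             common = Math.min(common, s.supportedVersion);
> >>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>         return common;
> >>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>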
> >>>>>>>>>>>>>>>>>>>>> Btw: monitoring the log is also only required to give
> >>>>>>>>>>>>>>>>>>>>> the instances enough time to prepare the stores in the
> >>>>>>>>>>>>>>>>>>>>> new format. If you did the second rolling bounce before
> >>>>>>>>>>>>>>>>>>>>> this, it would still work -- however, you might see app
> >>>>>>>>>>>>>>>>>>>>> "downtime" as the new store must be fully restored
> >>>>>>>>>>>>>>>>>>>>> before processing can resume.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Does this make sense?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
> >>>>>>>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid
> >>>>>>>>>>>>>>>>>>>>>> of the 2nd rolling bounce?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like the primary
> >>>>>>>>>>>>>>>>>>>>>> difference between the 1st rolling bounce and the 2nd
> >>>>>>>>>>>>>>>>>>>>>> rolling bounce is to decide whether to send
> >>>>>>>>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
> >>>>>>>>>>>>>>>>>>>>>> (Actually, another difference is mentioned: the KIP
> >>>>>>>>>>>>>>>>>>>>>> says that the 2nd rolling bounce should happen after
> >>>>>>>>>>>>>>>>>>>>>> all new state stores are created by the background
> >>>>>>>>>>>>>>>>>>>>>> thread. However, within the 2nd rolling bounce, we say
> >>>>>>>>>>>>>>>>>>>>>> that there is still a background thread, so it seems
> >>>>>>>>>>>>>>>>>>>>>> like there is no actual requirement to wait for the
> >>>>>>>>>>>>>>>>>>>>>> new state stores to be created.)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with
> >>>>>>>>>>>>>>>>>>>>>> mixed-mode (having both Version 2 and Version 3 in the
> >>>>>>>>>>>>>>>>>>>>>> same consumer group). It seems like we could get rid
> >>>>>>>>>>>>>>>>>>>>>> of the 2nd bounce if we added logic (somehow/somewhere)
> >>>>>>>>>>>>>>>>>>>>>> such that:
> >>>>>>>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until all
> >>>>>>>>>>>>>>>>>>>>>> instances are running the new code.
> >>>>>>>>>>>>>>>>>>>>>> * Once all the instances are running the new code,
> >>>>>>>>>>>>>>>>>>>>>> then, one at a time, the instances start sending
> >>>>>>>>>>>>>>>>>>>>>> Subscription V3. The Leader still hands out Assignment
> >>>>>>>>>>>>>>>>>>>>>> Version 2 until all new state stores are ready.
> >>>>>>>>>>>>>>>>>>>>>> * Once all instances report that the new stores are
> >>>>>>>>>>>>>>>>>>>>>> ready, the Leader sends out Assignment Version 3.
> >>>>>>>>>>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3,
> >>>>>>>>>>>>>>>>>>>>>> it can delete the old state store.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of
> >>>>>>>>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling
> >>>>>>>>>>>>>>>>>>>>>> restarts. No need to monitor logs for state store
> >>>>>>>>>>>>>>>>>>>>>> rebuild. You just deploy it, and the instances update
> >>>>>>>>>>>>>>>>>>>>>> themselves.
> >>>>>>>>>>>>>>>>>>>>>>
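> >>>>>>>>>>>>>>>>>>>>>> (Roughly the leader-side decision I'm imagining -- all
> >>>>>>>>>>>>>>>>>>>>>> names below are made up, just to illustrate the logic
> >>>>>>>>>>>>>>>>>>>>>> above, not actual assignor code:)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> import java.util.Collection;
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> final class SingleBounceSketch {
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     static final class MemberState {
> >>>>>>>>>>>>>>>>>>>>>>         final int supportedVersion;   // highest version the instance can send
> >>>>>>>>>>>>>>>>>>>>>>         final boolean newStoresReady; // reported once background copy finished
> >>>>>>>>>>>>>>>>>>>>>>         MemberState(final int version, final boolean ready) {
> >>>>>>>>>>>>>>>>>>>>>>             this.supportedVersion = version;
> >>>>>>>>>>>>>>>>>>>>>>             this.newStoresReady = ready;
> >>>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>     // Hand out Assignment Version 3 only once every instance runs the
> >>>>>>>>>>>>>>>>>>>>>>     // new code AND reports its new stores as ready; otherwise stay on 2.
> >>>>>>>>>>>>>>>>>>>>>>     static int assignmentVersion(final Collection<MemberState> members) {
> >>>>>>>>>>>>>>>>>>>>>>         final boolean allOnNewCode =
> >>>>>>>>>>>>>>>>>>>>>>             members.stream().allMatch(m -> m.supportedVersion >= 3);
> >>>>>>>>>>>>>>>>>>>>>>         final boolean allStoresReady =
> >>>>>>>>>>>>>>>>>>>>>>             members.stream().allMatch(m -> m.newStoresReady);
> >>>>>>>>>>>>>>>>>>>>>>         return (allOnNewCode && allStoresReady) ? 3 : 2;
> >>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>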
> >>>>>>>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The thing that made me think of this is that the "2
> >>>>>>>>>>>>>>>>>>>>>> rolling bounces" approach is similar to what Kafka
> >>>>>>>>>>>>>>>>>>>>>> brokers have to do for changes to
> >>>>>>>>>>>>>>>>>>>>>> inter.broker.protocol.version and
> >>>>>>>>>>>>>>>>>>>>>> log.message.format.version. And in the broker case, it
> >>>>>>>>>>>>>>>>>>>>>> seems like it would be possible (with some work, of
> >>>>>>>>>>>>>>>>>>>>>> course) to modify Kafka to allow us to do similar
> >>>>>>>>>>>>>>>>>>>>>> auto-detection of broker capabilities and
> >>>>>>>>>>>>>>>>>>>>>> automatically do a switchover from the old to the new
> >>>>>>>>>>>>>>>>>>>>>> version.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> -James
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
> >>>>>>>> bbej...@gmail.com
> >>>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval
> >>>>>>>>>>>>>>>>>>>>>>> methods on the new interfaces.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Would you want to consider adding a method with a
> >>>>>>>>>>>>>>>>>>>>>>> Predicate that would allow for filtering records by
> >>>>>>>>>>>>>>>>>>>>>>> the timestamp stored with the record? Or is this
> >>>>>>>>>>>>>>>>>>>>>>> better left for users to implement themselves once
> >>>>>>>>>>>>>>>>>>>>>>> the data has been retrieved?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
> >>>>>>>> yuzhih...@gmail.com
> >>>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>>>>>>> For my point #1, I don't have a preference as to
> >>>>>>>>>>>>>>>>>>>>>>>> which separator is chosen. Given the background you
> >>>>>>>>>>>>>>>>>>>>>>>> mentioned, the current choice is good.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is
> >>>>>>>>>>>>>>>>>>>>>>>> closer to English grammar.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> It would be good to hear what other people think.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comments!
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata
> >>>>>>>>>>>>>>>>>>>>>>>>> upgrade fix (addressing the mentioned
> >>>>>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054).
> >>>>>>>>>>>>>>>>>>>>>>>>> It gives a first impression of how the metadata
> >>>>>>>>>>>>>>>>>>>>>>>>> upgrade works, including a system test:
> >>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I
> >>>>>>>>>>>>>>>>>>>>>>>>> agree that the KIP is complex and I am ok with
> >>>>>>>>>>>>>>>>>>>>>>>>> putting out more code to give better discussion
> >>>>>>>>>>>>>>>>>>>>>>>>> context.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> @Ted:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
> >>>>>>>>>>>>>>>>>>>>>>>>> `processing.guarantee` parameter that accepts
> >>>>>>>>>>>>>>>>>>>>>>>>> `at_least_once` and `exactly_once` as values.
> >>>>>>>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash,
> >>>>>>>>>>>>>>>>>>>>>>>>> but I prefer consistency. If you feel strongly
> >>>>>>>>>>>>>>>>>>>>>>>>> about it, we can also change it to `-`.
> >>>>>>>>>>>>>>>>>>>>>>>>>
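> >>>>>>>>>>>>>>>>>>>>>>>>> (For illustration only -- the existing value already
> >>>>>>>>>>>>>>>>>>>>>>>>> uses underscores, and the new option would just
> >>>>>>>>>>>>>>>>>>>>>>>>> follow the same convention; "upgrade.mode" below is
> >>>>>>>>>>>>>>>>>>>>>>>>> a placeholder key, not a name fixed by the KIP:)
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> import java.util.Properties;
> >>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.kafka.streams.StreamsConfig;
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> public class ConfigNamingExample {
> >>>>>>>>>>>>>>>>>>>>>>>>>     public static Properties streamsProps() {
> >>>>>>>>>>>>>>>>>>>>>>>>>         final Properties props = new Properties();
> >>>>>>>>>>>>>>>>>>>>>>>>>         // existing parameter, underscore-separated value:
> >>>>>>>>>>>>>>>>>>>>>>>>>         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
> >>>>>>>>>>>>>>>>>>>>>>>>>         // hypothetical upgrade option following the same convention:
> >>>>>>>>>>>>>>>>>>>>>>>>>         props.put("upgrade.mode", "in_place");
> >>>>>>>>>>>>>>>>>>>>>>>>>         return props;
> >>>>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>>>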
> >>>>>>>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I
> >>>>>>>>>>>>>>>>>>>>>>>>> stripped the `With` to keep the name a little
> >>>>>>>>>>>>>>>>>>>>>>>>> shorter. It would be good to get feedback from
> >>>>>>>>>>>>>>>>>>>>>>>>> others and pick the name the majority prefers.
> >>>>>>>>>>>>>>>>>>>>>>>>>
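> >>>>>>>>>>>>>>>>>>>>>>>>> (Whatever name we pick, the read-only interface
> >>>>>>>>>>>>>>>>>>>>>>>>> would have roughly this shape -- the methods shown
> >>>>>>>>>>>>>>>>>>>>>>>>> here are only an illustration, not the final KIP
> >>>>>>>>>>>>>>>>>>>>>>>>> API:)
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.kafka.streams.state.KeyValueIterator;
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> // Read-only view of a key-value store that also exposes
> >>>>>>>>>>>>>>>>>>>>>>>>> // the stored record timestamp.
> >>>>>>>>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>     // plain value lookup, or null if the key is absent
> >>>>>>>>>>>>>>>>>>>>>>>>>     V get(K key);
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>     // timestamp stored with the key, or -1 if the key is absent
> >>>>>>>>>>>>>>>>>>>>>>>>>     long timestamp(K key);
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>     // iterate over all key-value entries
> >>>>>>>>>>>>>>>>>>>>>>>>>     KeyValueIterator<K, V> all();
> >>>>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>>>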
> >>>>>>>>>>>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would
> >>>>>>>>>>>>>>>>>>>>>>>>> not make a difference. I'll dig into the code to
> >>>>>>>>>>>>>>>>>>>>>>>>> see if either of the two versions might introduce
> >>>>>>>>>>>>>>>>>>>>>>>>> undesired complexity, and update the KIP if I don't
> >>>>>>>>>>>>>>>>>>>>>>>>> hit an issue with putting the `-v2` on the store
> >>>>>>>>>>>>>>>>>>>>>>>>> directory instead of `rocksdb-v2`.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hey Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several questions
> >>>>>>>>>>>>>>>>>>>>>>>>>> queued up, but they were all in the "rejected
> >>>>>>>>>>>>>>>>>>>>>>>>>> alternatives" section... oh, well.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state
> >>>>>>>>>>>>>>>>>>>>>>>>>> directory from
> >>>>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to
> >>>>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/":
> >>>>>>>>>>>>>>>>>>>>>>>>>> if you put the "v2" marker on the storeName part
> >>>>>>>>>>>>>>>>>>>>>>>>>> of the path (i.e.,
> >>>>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"),
> >>>>>>>>>>>>>>>>>>>>>>>>>> then you get the same benefits without altering
> >>>>>>>>>>>>>>>>>>>>>>>>>> the high-level directory structure.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people
> >>>>>>>>>>>>>>>>>>>>>>>>>> running scripts to monitor rocksdb disk usage for
> >>>>>>>>>>>>>>>>>>>>>>>>>> each task, or other such use cases.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
> >>>>>>>>>>> yuzhih...@gmail.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Nicely written KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> "in_place": can this be "in-place"? Underscore
> >>>>>>>>>>>>>>>>>>>>>>>>>>> may sometimes be mistyped (as '-'). I think using
> >>>>>>>>>>>>>>>>>>>>>>>>>>> '-' is more user-friendly.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp a better
> >>>>>>>>>>>>>>>>>>>>>>>>>>> name for the class?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <
> >>>>>>>>>>>>>>>>> wangg...@gmail.com
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade path section and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> it looks good to me. If you already have a WIP PR
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> for it, could you also share it here so that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> people can take a look?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But with large KIPs
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> like this there is always some devil hidden in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> the details, so I think it is better to have the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion :)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J.
> Sax
> >> <
> >>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to allow storing timestamps in RocksDB. This
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature is the basis to resolve multiple
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
