Matthias,

Thanks for the updated wiki page. I made another pass and here are some
minor comments I have (mostly about API names and writing itself):

1. The scope of the RecordConverter may not be very clear from the wiki
page reads. Here's my understanding:

1.a) For build-in implementations (rocksdb, memory) of the defined types of
StateStore (k-v, window, session), Streams will implement the
RecordConverter itself *so there's nothing that users need to do*;
1.b) For customized implementations of the defined types of StateStore,
Streams will use a proxy store internally to always "down-convert" the new
APIs to the old formats, *so there' nothing that users need to do still*;
but users can opt-in to let their custom impl to also extend RecordConver.
1.c) For user-defined StateStore (e.g. Alice has her own interface named
Database extending StateStore with her own impl). *This is the only place
users are required to do extra work by implementing the RecordConverter
interface*.

2. Naming.

2.a) I'd also prefer TimestampedXXXStore over XXXWithTimestampStore.
2.b) RecordConverter: I felt having a more specific name is better if we
believe future format changes cannot reuse it anyways. Maybe
"TimestampedRecordConverter"?

3. Regarding Bill's question above. One idea is that we can add an extra
check logic during the starting up phase, to check if the old CF is empty
or not already, and if yes set a flag so that we can skip that CF for state
store access. The rationale is that Streams apps are expected to be bounced
/ re-launched frequently so just having this check logic upon starting up
should be a good trade-off between complexity and efficiency.


Guozhang




Guozhang


On Sat, Jan 12, 2019 at 6:46 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I also want to point out, that Ryanne Dolan commented on the WIP PR
> (https://github.com/apache/kafka/pull/6044) about the naming. I asked
> him to reply to this thread, but this did no happen yet, thus I want to
> point it out myself, because it seems to important.
>
>
> WindowWithTimestampStore.java
>
> > Hard to parse this class name -- sounds like "trait Window with
> TimestampStore" instead of WindowStore<... ValueAndTimestamp...>
> >
> > Maybe TimestampWindowStore or TimestampedWindowStore? shrug.
> >
>
>
>
> Another comment, that I personally do not necessarily agree:
>
>
> KeyValueWithTimestampStore.java
>
> > possible alternative: implement KeyValueStore<K, V>, and then expose an
> additional putWithTimestamp, getWithTimestamp etc for callers that want
> ValueAndTimestamp<V> instead of V. This would probably require fewer code
> changes elsewhere.
>
>
>
> What do you think?
>
>
> -Matthias
>
>
>
> On 1/12/19 6:43 PM, Matthias J. Sax wrote:
> > Bill,
> >
> > I left the question about legacy column family out, because as a matter
> > of fact, we use the default column family atm that cannot be deleted.
> > Thus, this old column family will always be there.
> >
> > Nevertheless, as an implementation detail, it might make sense to avoid
> > accessing both column families forever (ie, each time a key is not
> > found). Also, we might want/need a way, to "force" upgrading to the new
> > column family, for the case that some records are not accessed for a
> > long time. Again, this seems to be an implementation detail (and I am
> > also not sure if we really need it). If you thing both are not
> > implementation details, I can of course extend the KIP accordingly.
> >
> >
> > -Matthias
> >
> > On 1/11/19 1:27 PM, Bill Bejeck wrote:
> >> Hi Matthias,
> >>
> >> Thanks for the KIP, it goes into good detail and is well done.
> >>
> >> Overall I'm a +1 on the KIP and have one minor question.
> >>
> >> Regarding the upgrade path, we'll use two column families to do a lazy
> >> conversion which makes sense to me.  What is the plan to get
> >> rid of the "legacy" column family (if ever)?  Would we drop the "legacy"
> >> column family once it is empty? I'm not sure we'd ever need to as it
> would
> >> just be a column family that doesn't get used.
> >>
> >> Maybe this is an implementation detail and doesn't need to be addressed
> >> now, but it came to mind when I read the KIP.
> >>
> >> Thanks again,
> >> Bill
> >>
> >> On Fri, Jan 11, 2019 at 1:19 PM John Roesler <j...@confluent.io> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Thanks for the updates to the KIP. I've just read it over, and am
> >>> personally quite happy with it.
> >>>
> >>> Thanks for tackling this dicey issue and putting in a huge amount of
> design
> >>> work to produce
> >>> a smooth upgrade path for DSL users.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Mon, Dec 17, 2018 at 10:35 AM Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Dear all,
> >>>>
> >>>> I finally managed to update the KIP.
> >>>>
> >>>> To address the concerns about the complex upgrade path, I simplified
> the
> >>>> design. We don't need any configs and the upgrade can be done with the
> >>>> simple single rolling bounce pattern.
> >>>>
> >>>> The suggestion is to exploit RocksDB column families to isolate old
> and
> >>>> new on-disk format. Furthermore, the upgrade from old to new format
> >>>> happens "on the side" after an instance was upgraded.
> >>>>
> >>>> I also pushed a WIP PR in case you want to look into some details
> >>>> (potential reviewers, don't panic: I plan to break this down into
> >>>> multiple PRs for actual review if the KIP is accepted).
> >>>>
> >>>> https://github.com/apache/kafka/pull/6044
> >>>>
> >>>> @Eno: I think I never answered your question about being future proof:
> >>>>
> >>>> The latest design is not generic, because it does not support changes
> >>>> that need to be reflected in the changelog topic. I aimed for a
> >>>> non-generic design for now to keep it as simple as possible. Thus,
> other
> >>>> format changes might need a different design / upgrade path --
> however,
> >>>> because this KIP is quite encapsulated in the current design, I don't
> >>>> see any issue to build this later and a generic upgrade path seems to
> be
> >>>> an orthogonal concern atm.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 11/22/18 2:50 PM, Adam Bellemare wrote:
> >>>>> Thanks for the information Matthias.
> >>>>>
> >>>>> I will await your completion of this ticket then since it underpins
> the
> >>>>> essential parts of a RocksDB TTL aligned with the changelog topic. I
> am
> >>>>> eager to work on that ticket myself, so if I can help on this one in
> >>> any
> >>>>> way please let me know.
> >>>>>
> >>>>> Thanks
> >>>>> Adam
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <
> matth...@confluent.io
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> It's an interesting idea to use second store, to maintain the
> >>>>>> timestamps. However, each RocksDB instance implies some overhead. In
> >>>>>> fact, we are looking into ColumnFamilies atm to see if we can use
> >>> those
> >>>>>> and merge multiple RocksDBs into a single one to reduce this
> overhead.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 11/20/18 5:15 AM, Patrik Kleindl wrote:
> >>>>>>> Hi Adam
> >>>>>>>
> >>>>>>> Sounds great, I was already planning to ask around if anyone had
> >>>> tackled
> >>>>>>> this.
> >>>>>>> We have a use case very similar to what you described in
> KAFKA-4212,
> >>>> only
> >>>>>>> with Global State Stores.
> >>>>>>> I have tried a few things with the normal DSL but was not really
> >>>>>> successful.
> >>>>>>> Schedule/Punctuate is not possible, supplying a windowed store is
> >>> also
> >>>>>> not
> >>>>>>> allowed and the process method has no knowledge of the timestamp of
> >>> the
> >>>>>>> record.
> >>>>>>> And anything loaded on startup is not filtered anyway.
> >>>>>>>
> >>>>>>> Regarding 4212, wouldn't it be easier (although a little less
> >>>>>>> space-efficient) to track the Timestamps in a separate Store with
> <K,
> >>>>>> Long>
> >>>>>>> ?
> >>>>>>> This would leave the original store intact and allow a migration of
> >>> the
> >>>>>>> timestamps without touching the other data.
> >>>>>>>
> >>>>>>> So I am very interested in your PR :-)
> >>>>>>>
> >>>>>>> best regards
> >>>>>>>
> >>>>>>> Patrik
> >>>>>>>
> >>>>>>> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare <
> >>> adam.bellem...@gmail.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Matthias
> >>>>>>>>
> >>>>>>>> Thanks - I figured that it was probably a case of just too much to
> >>> do
> >>>>>> and
> >>>>>>>> not enough time. I know how that can go. I am asking about this
> one
> >>> in
> >>>>>>>> relation to https://issues.apache.org/jira/browse/KAFKA-4212,
> >>> adding
> >>>> a
> >>>>>> TTL
> >>>>>>>> to RocksDB. I have outlined a bit about my use-case within 4212,
> but
> >>>> for
> >>>>>>>> brevity here it is:
> >>>>>>>>
> >>>>>>>> My case:
> >>>>>>>> 1) I have a RocksDB with TTL implementation working where records
> >>> are
> >>>>>> aged
> >>>>>>>> out using the TTL that comes with RocksDB (very simple).
> >>>>>>>> 2) We prevent records from loading from the changelog if
> recordTime
> >>> +
> >>>>>> TTL <
> >>>>>>>> referenceTimeStamp (default = System.currentTimeInMillis() ).
> >>>>>>>>
> >>>>>>>> This assumes that the records are stored with the same time
> >>> reference
> >>>>>> (say
> >>>>>>>> UTC) as the consumer materializing the RocksDB store.
> >>>>>>>>
> >>>>>>>> My questions about KIP-258 are as follows:
> >>>>>>>> 1) How does "we want to be able to store record timestamps in
> >>> KTables"
> >>>>>>>> differ from inserting records into RocksDB with TTL at consumption
> >>>>>> time? I
> >>>>>>>> understand that it could be a difference of some seconds, minutes,
> >>>>>> hours,
> >>>>>>>> days etc between when the record was published and now, but given
> >>> the
> >>>>>>>> nature of how RocksDB TTL works (eventual - based on compaction) I
> >>>> don't
> >>>>>>>> see how a precise TTL can be achieved, such as that which one can
> >>> get
> >>>>>> with
> >>>>>>>> windowed stores.
> >>>>>>>>
> >>>>>>>> 2) Are you looking to change how records are inserted into a TTL
> >>>>>> RocksDB,
> >>>>>>>> such that the TTL would take effect from the record's published
> >>> time?
> >>>> If
> >>>>>>>> not, what would be the ideal workflow here for a single record
> with
> >>>> TTL
> >>>>>>>> RocksDB?
> >>>>>>>> ie: Record Timestamp: 100
> >>>>>>>> TTL: 50
> >>>>>>>> Record inserted into rocksDB: 110
> >>>>>>>> Record to expire at 150?
> >>>>>>>>
> >>>>>>>> 3) I'm not sure I fully understand the importance of the upgrade
> >>>> path. I
> >>>>>>>> have read the link to (
> >>>> https://issues.apache.org/jira/browse/KAFKA-3522
> >>>>>> )
> >>>>>>>> in
> >>>>>>>> the KIP, and I can understand that a state-store on disk may not
> >>>>>> represent
> >>>>>>>> what the application is expecting. I don't think I have the full
> >>>> picture
> >>>>>>>> though, because that issue seems to be easy to fix with a simple
> >>>>>> versioned
> >>>>>>>> header or accompanying file, forcing the app to rebuild the state
> if
> >>>> the
> >>>>>>>> version is incompatible. Can you elaborate or add a scenario to
> the
> >>>> KIP
> >>>>>>>> that illustrates the need for the upgrade path?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> Adam
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <
> >>>> matth...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Adam,
> >>>>>>>>>
> >>>>>>>>> I am still working on it. Was pulled into a lot of other tasks
> >>> lately
> >>>>>> so
> >>>>>>>>> this was delayed. Also had some discussions about simplifying the
> >>>>>>>>> upgrade path with some colleagues and I am prototyping this atm.
> >>> Hope
> >>>>>> to
> >>>>>>>>> update the KIP accordingly soon.
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 11/10/18 7:41 AM, Adam Bellemare wrote:
> >>>>>>>>>> Hello Matthias
> >>>>>>>>>>
> >>>>>>>>>> I am curious as to the status of this KIP. TTL and expiry of
> >>> records
> >>>>>>>> will
> >>>>>>>>>> be extremely useful for several of our business use-cases, as
> well
> >>>> as
> >>>>>>>>>> another KIP I had been working on.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <
> >>>> eno.there...@gmail.com
> >>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>
> >>>>>>>>>>> Good stuff. Could you comment a bit on how future-proof is this
> >>>>>>>> change?
> >>>>>>>>> For
> >>>>>>>>>>> example, if we want to store both event timestamp "and"
> >>> processing
> >>>>>>>> time
> >>>>>>>>> in
> >>>>>>>>>>> RocksDB will we then need another interface (e.g. called
> >>>>>>>>>>> KeyValueWithTwoTimestampsStore)?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
> >>>>>>>>>>> Eno
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <
> >>>>>>>> matth...@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for your input Guozhang and John.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I see your point, that the upgrade API is not simple. If you
> >>> don't
> >>>>>>>>>>>> thinks it's valuable to make generic store upgrades possible
> >>>> (atm),
> >>>>>>>> we
> >>>>>>>>>>>> can make the API internal, too. The impact is, that we only
> >>>> support
> >>>>>> a
> >>>>>>>>>>>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to
> >>>>>>>>>>>> WindowedWithTS etc) for which we implement the internal
> >>>> interfaces.
> >>>>>>>>>>>>
> >>>>>>>>>>>> We can keep the design generic, so if we decide to make it
> >>> public,
> >>>>>> we
> >>>>>>>>>>>> don't need to re-invent it. This will also have the advantage,
> >>>> that
> >>>>>>>> we
> >>>>>>>>>>>> can add upgrade pattern for other stores later, too.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly,
> >>> but
> >>>>>> it
> >>>>>>>>>>>> was the only way I could find to design a generic upgrade
> >>>> interface.
> >>>>>>>> If
> >>>>>>>>>>>> we decide the hide all the upgrade stuff,
> `StoreUpgradeBuilder`
> >>>>>> would
> >>>>>>>>>>>> become an internal interface I guess (don't think we can
> remove
> >>>> it).
> >>>>>>>>>>>>
> >>>>>>>>>>>> I will wait for more feedback about this and if nobody wants
> to
> >>>> keep
> >>>>>>>> it
> >>>>>>>>>>>> as public API I will update the KIP accordingly. Will add some
> >>>> more
> >>>>>>>>>>>> clarifications for different upgrade patterns in the mean time
> >>> and
> >>>>>>>> fix
> >>>>>>>>>>>> the typos/minor issues.
> >>>>>>>>>>>>
> >>>>>>>>>>>> About adding a new state UPGRADING: maybe we could do that.
> >>>> However,
> >>>>>>>> I
> >>>>>>>>>>>> find it particularly difficult to make the estimation when we
> >>>> should
> >>>>>>>>>>>> switch to RUNNING, thus, I am a little hesitant. Using store
> >>>>>>>> callbacks
> >>>>>>>>>>>> or just logging the progress including some indication about
> the
> >>>>>>>> "lag"
> >>>>>>>>>>>> might actually be sufficient. Not sure what others think?
> >>>>>>>>>>>>
> >>>>>>>>>>>> About "value before timestamp": no real reason and I think it
> >>> does
> >>>>>>>> not
> >>>>>>>>>>>> make any difference. Do you want to change it?
> >>>>>>>>>>>>
> >>>>>>>>>>>> About upgrade robustness: yes, we cannot control if an
> instance
> >>>>>>>> fails.
> >>>>>>>>>>>> That is what I meant by "we need to write test". The upgrade
> >>>> should
> >>>>>>>> be
> >>>>>>>>>>>> able to continuous even is an instance goes down (and we must
> >>> make
> >>>>>>>> sure
> >>>>>>>>>>>> that we don't end up in an invalid state that forces us to
> wipe
> >>>> out
> >>>>>>>> the
> >>>>>>>>>>>> whole store). Thus, we need to write system tests that fail
> >>>>>> instances
> >>>>>>>>>>>> during upgrade.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For `in_place_offline` upgrade: I don't think we need this
> mode,
> >>>>>>>>> because
> >>>>>>>>>>>> people can do this via a single rolling bounce.
> >>>>>>>>>>>>
> >>>>>>>>>>>>  - prepare code and switch KV-Store to KVwithTs-Store
> >>>>>>>>>>>>  - do a single rolling bounce (don't set any upgrade config)
> >>>>>>>>>>>>
> >>>>>>>>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store`
> if
> >>>> we
> >>>>>>>>>>>> remove the `StoreUpgradeBuilder`) will detect that there is
> only
> >>>> an
> >>>>>>>> old
> >>>>>>>>>>>> local KV store w/o TS, will start to restore the new KVwithTs
> >>>> store,
> >>>>>>>>>>>> wipe out the old store and replace with the new store after
> >>>> restore
> >>>>>>>> is
> >>>>>>>>>>>> finished, and start processing only afterwards. (I guess we
> need
> >>>> to
> >>>>>>>>>>>> document this case -- will also add it to the KIP.)
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 8/9/18 1:10 PM, John Roesler wrote:
> >>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think this KIP is looking really good.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I have a few thoughts to add to the others:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. You mentioned at one point users needing to configure
> >>>>>>>>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant
> to
> >>>> say
> >>>>>>>>>>> they
> >>>>>>>>>>>>> should remove the config. If they really have to set it to a
> >>>> string
> >>>>>>>>>>>> "null"
> >>>>>>>>>>>>> or even set it to a null value but not remove it, it would be
> >>>>>>>>>>>> unfortunate.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. In response to Bill's comment #1 , you said that "The idea
> >>> is
> >>>>>>>> that
> >>>>>>>>>>> the
> >>>>>>>>>>>>> upgrade should be robust and not fail. We need to write
> >>> according
> >>>>>>>>>>> tests".
> >>>>>>>>>>>>> I may have misunderstood the conversation, but I don't think
> >>> it's
> >>>>>>>>>>> within
> >>>>>>>>>>>>> our power to say that an instance won't fail. What if one of
> my
> >>>>>>>>>>> computers
> >>>>>>>>>>>>> catches on fire? What if I'm deployed in the cloud and one
> >>>> instance
> >>>>>>>>>>>>> disappears and is replaced by a new one? Or what if one
> >>> instance
> >>>>>>>> goes
> >>>>>>>>>>>> AWOL
> >>>>>>>>>>>>> for a long time and then suddenly returns? How will the
> upgrade
> >>>>>>>>> process
> >>>>>>>>>>>>> behave in light of such failures?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3. your thought about making in-place an offline mode is
> >>>>>>>> interesting,
> >>>>>>>>>>> but
> >>>>>>>>>>>>> it might be a bummer for on-prem users who wish to upgrade
> >>>> online,
> >>>>>>>> but
> >>>>>>>>>>>>> cannot just add new machines to the pool. It could be a new
> >>>> upgrade
> >>>>>>>>>>> mode
> >>>>>>>>>>>>> "offline-in-place", though...
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4. I was surprised to see that a user would need to modify
> the
> >>>>>>>>> topology
> >>>>>>>>>>>> to
> >>>>>>>>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of
> >>>> Guozhang's
> >>>>>>>>>>>>> suggestions would remove this necessity.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for taking on this very complex but necessary work.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -John
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <
> >>>> wangg...@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hello Matthias,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the updated KIP. Some more comments:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. The current set of proposed API is a bit too complicated,
> >>>> which
> >>>>>>>>>>> makes
> >>>>>>>>>>>>>> the upgrade flow from user's perspective also a bit complex.
> >>> I'd
> >>>>>>>> like
> >>>>>>>>>>> to
> >>>>>>>>>>>>>> check different APIs and discuss about their needs
> separately:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     1.a. StoreProxy: needed for in-place upgrade only,
> between
> >>>> the
> >>>>>>>>>>> first
> >>>>>>>>>>>>>> and second rolling bounce, where the old-versioned stores
> can
> >>>>>>>> handle
> >>>>>>>>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e.
> >>> from
> >>>>>> one
> >>>>>>>>>>>> store
> >>>>>>>>>>>>>> type to another) would not be very common: users may want to
> >>>>>>>> upgrade
> >>>>>>>>>>>> from a
> >>>>>>>>>>>>>> certain store engine to another, but the interface would
> >>> likely
> >>>> be
> >>>>>>>>>>>> staying
> >>>>>>>>>>>>>> the same. Hence personally I'd suggest we keep it internally
> >>> and
> >>>>>>>> only
> >>>>>>>>>>>>>> consider exposing it in the future if it does become a
> common
> >>>>>>>>> pattern.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     1.b. ConverterStore / RecordConverter: needed for both
> >>>>>> in-place
> >>>>>>>>>>> and
> >>>>>>>>>>>>>> roll-over upgrade, between the first and second rolling
> >>> bounces,
> >>>>>>>> for
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> new versioned store to be able to read old-versioned
> changelog
> >>>>>>>>> topics.
> >>>>>>>>>>>>>> Firstly I think we should not expose key in the public APIs
> >>> but
> >>>>>>>> only
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> values, since allowing key format changes would break log
> >>>>>>>> compaction,
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>> hence would not be compatible anyways. As for value format
> >>>>>> changes,
> >>>>>>>>>>>>>> personally I think we can also keep its upgrade logic
> >>> internally
> >>>>>> as
> >>>>>>>>> it
> >>>>>>>>>>>> may
> >>>>>>>>>>>>>> not worth generalizing to user customizable logic.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     1.c. If you agrees with 2.a/b above, then we can also
> >>>> remove "
> >>>>>>>>>>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the
> >>>>>> public
> >>>>>>>>>>>> APIs.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     1.d. Personally I think
> >>> "ReadOnlyKeyValueWithTimestampStore"
> >>>>>> is
> >>>>>>>>>>> not
> >>>>>>>>>>>>>> needed either given that we are exposing "ValueAndTimestamp"
> >>>>>>>> anyways.
> >>>>>>>>>>>> I.e.
> >>>>>>>>>>>>>> it is just a syntax sugar and for IQ, users can always just
> >>> set
> >>>> a
> >>>>>> "
> >>>>>>>>>>>>>> QueryableStoreType<ReadOnlyKeyValue<K,
> ValueAndTimestamp<V>>>"
> >>>> as
> >>>>>>>> the
> >>>>>>>>>>>> new
> >>>>>>>>>>>>>> interface does not provide any additional functions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. Could we further categorize the upgrade flow for
> different
> >>>> use
> >>>>>>>>>>> cases,
> >>>>>>>>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be
> >>> used
> >>>>>>>>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who
> do
> >>>> not
> >>>>>>>>>>> need
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to
> >>>>>> switch
> >>>>>>>>> to
> >>>>>>>>>>>>>> KeyValueWithTimestampStore. Just to give my understanding
> for
> >>>> 3),
> >>>>>>>> the
> >>>>>>>>>>>>>> upgrade flow for users may be simplified as the following
> (for
> >>>>>> both
> >>>>>>>>>>>>>> in-place and roll-over):
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     * Update the jar to new version, make code changes from
> >>>>>>>>>>>> KeyValueStore
> >>>>>>>>>>>>>> to KeyValueWithTimestampStore, set upgrade config.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     * First rolling bounce, and library code can internally
> >>> use
> >>>>>>>> proxy
> >>>>>>>>>>> /
> >>>>>>>>>>>>>> converter based on the specified config to handle new APIs
> >>> with
> >>>>>> old
> >>>>>>>>>>>> stores,
> >>>>>>>>>>>>>> while let new stores read from old changelog data.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     * Reset upgrade config.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>     * Second rolling bounce, and the library code
> >>> automatically
> >>>>>>>> turn
> >>>>>>>>>>> off
> >>>>>>>>>>>>>> logic for proxy / converter.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. Some more detailed proposals are needed for when to
> >>> recommend
> >>>>>>>>> users
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> trigger the second rolling bounce. I have one idea to share
> >>>> here:
> >>>>>>>> we
> >>>>>>>>>>>> add a
> >>>>>>>>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when
> 1)
> >>>>>>>>> upgrade
> >>>>>>>>>>>>>> config is set, and 2) the new stores are still ramping up
> (for
> >>>> the
> >>>>>>>>>>>> second
> >>>>>>>>>>>>>> part, we can start with some internal hard-coded heuristics
> to
> >>>>>>>> decide
> >>>>>>>>>>>> when
> >>>>>>>>>>>>>> it is close to be ramped up). If either one of it is not
> true
> >>>> any
> >>>>>>>>>>> more,
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>> should transit to RUNNING. Users can then watch on this
> state,
> >>>> and
> >>>>>>>>>>>> decide
> >>>>>>>>>>>>>> to only trigger the second rebalance when the state has
> >>>> transited
> >>>>>>>>> from
> >>>>>>>>>>>>>> UPGRADING. They can also choose to cut over while the
> instance
> >>>> is
> >>>>>>>>>>> still
> >>>>>>>>>>>>>> UPGRADING, the downside is that after that the application
> may
> >>>>>> have
> >>>>>>>>>>> long
> >>>>>>>>>>>>>> restoration phase which is, to user's pov, unavailability
> >>>> periods.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Below are just some minor things on the wiki:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4. "proxy story" => "proxy store".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5. "use the a builder " => "use a builder"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to
> >>> the
> >>>>>>>>>>> value":
> >>>>>>>>>>>>>> what's the rationale of putting the timestamp before the
> >>> value,
> >>>>>>>> than
> >>>>>>>>>>>> after
> >>>>>>>>>>>>>> the value?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <
> >>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the feedback Bill. I just update the KIP with
> some
> >>>> of
> >>>>>>>>> your
> >>>>>>>>>>>>>>> points.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing
> to
> >>>>>>>> watch
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a
> >>> type
> >>>> of
> >>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new
> stores
> >>>> have
> >>>>>>>>>>>>>> reached
> >>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the
> >>>>>> signal
> >>>>>>>>> to
> >>>>>>>>>>>>>>> start
> >>>>>>>>>>>>>>>>> second rolling rebalance?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think we can reuse the existing listeners, thus, I did
> not
> >>>>>>>> include
> >>>>>>>>>>>>>>> anything in the KIP. About a signal to rebalance: this
> might
> >>> be
> >>>>>>>>>>> tricky.
> >>>>>>>>>>>>>>> If we prepare the store "online", the active task will
> update
> >>>> the
> >>>>>>>>>>> state
> >>>>>>>>>>>>>>> continuously, and thus, state prepare is never finished. It
> >>>> will
> >>>>>>>> be
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> users responsibility to do the second rebalance (note, that
> >>> the
> >>>>>>>>>>> second
> >>>>>>>>>>>>>>> rebalance will first finish the last delta of the upgrade
> to
> >>>>>>>> finish
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> upgrade before actual processing resumes). I clarified the
> >>> KIP
> >>>>>>>> with
> >>>>>>>>>>>> this
> >>>>>>>>>>>>>>> regard a little bit.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the
> >>> process,
> >>>>>>>> would
> >>>>>>>>>>> we
> >>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>> the other instances to complete or just fail the entire
> >>>>>> upgrade?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The idea is that the upgrade should be robust and not fail.
> >>> We
> >>>>>>>> need
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>> write according tests.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could
> rename
> >>>> the
> >>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>> active directories vs. deleting them right away,  and
> when
> >>>> all
> >>>>>>>> the
> >>>>>>>>>>>>>>> prepare
> >>>>>>>>>>>>>>>>> task directories are successfully migrated then delete
> the
> >>>>>>>>> previous
> >>>>>>>>>>>>>>> active
> >>>>>>>>>>>>>>>>> ones.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Ack. Updated the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing
> any
> >>>> new
> >>>>>>>>>>>>>> records
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all
> >>>> prepare
> >>>>>>>>>>> tasks
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling
> >>>> bounces
> >>>>>>>>> and
> >>>>>>>>>>>>>>> then as
> >>>>>>>>>>>>>>>>> each task successfully renames its prepare directories
> and
> >>>>>>>> deletes
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> old
> >>>>>>>>>>>>>>>>> active task directories, normal processing of records
> >>>> resumes.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The basic idea is to do an online upgrade to avoid
> downtime.
> >>> We
> >>>>>>>> can
> >>>>>>>>>>>>>>> discuss to offer both options... For the offline upgrade
> >>>> option,
> >>>>>>>> we
> >>>>>>>>>>>>>>> could simplify user interaction and trigger the second
> >>>> rebalance
> >>>>>>>>>>>>>>> automatically with the requirement that a user needs to
> >>> update
> >>>>>> any
> >>>>>>>>>>>>>> config.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If might actually be worth to include this option: we know
> >>> from
> >>>>>>>>>>>>>>> experience with state restore, that regular processing
> slows
> >>>> down
> >>>>>>>>> the
> >>>>>>>>>>>>>>> restore. For roll_over upgrade, it would be a different
> story
> >>>> and
> >>>>>>>>>>>>>>> upgrade should not be slowed down by regular processing.
> >>> Thus,
> >>>> we
> >>>>>>>>>>>> should
> >>>>>>>>>>>>>>> even make in_place an offline upgrade and force people to
> use
> >>>>>>>>>>> roll_over
> >>>>>>>>>>>>>>> if they need onlint upgrade. Might be a fair tradeoff that
> >>> may
> >>>>>>>>>>> simplify
> >>>>>>>>>>>>>>> the upgrade for the user and for the code complexity.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let's see what other think.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
> >>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the update and the working prototype, it helps
> >>> with
> >>>>>>>>>>>>>>>> understanding the KIP.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I took an initial pass over this PR, and overall I find
> the
> >>>>>>>>>>> interfaces
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> approach to be reasonable.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing
> to
> >>>>>> watch
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a
> type
> >>>> of
> >>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new stores
> >>>> have
> >>>>>>>>>>>> reached
> >>>>>>>>>>>>>>>> parity with the existing old stores and that could be the
> >>>> signal
> >>>>>>>> to
> >>>>>>>>>>>>>> start
> >>>>>>>>>>>>>>>> second rolling rebalance?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Although you solicited feedback on the interfaces
> involved,
> >>> I
> >>>>>>>>> wanted
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> put
> >>>>>>>>>>>>>>>> down some thoughts that have come to mind reviewing this
> KIP
> >>>>>>>> again
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the
> process,
> >>>>>>>> would
> >>>>>>>>>>> we
> >>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>> the other instances to complete or just fail the entire
> >>>> upgrade?
> >>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename
> >>> the
> >>>>>>>>>>> current
> >>>>>>>>>>>>>>>> active directories vs. deleting them right away,  and when
> >>> all
> >>>>>>>> the
> >>>>>>>>>>>>>>> prepare
> >>>>>>>>>>>>>>>> task directories are successfully migrated then delete the
> >>>>>>>> previous
> >>>>>>>>>>>>>>> active
> >>>>>>>>>>>>>>>> ones.
> >>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing
> any
> >>>> new
> >>>>>>>>>>>> records
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all
> >>> prepare
> >>>>>>>>> tasks
> >>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling
> >>>> bounces
> >>>>>>>> and
> >>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>> each task successfully renames its prepare directories and
> >>>>>>>> deletes
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> old
> >>>>>>>>>>>>>>>> active task directories, normal processing of records
> >>> resumes.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <
> >>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KIP-268 (rebalance meatadata) is finished and included in
> >>> AK
> >>>>>> 2.0
> >>>>>>>>>>>>>>>>> release. Thus, I want to pick up this KIP again to get
> the
> >>>>>>>> RocksDB
> >>>>>>>>>>>>>>>>> upgrade done for 2.1.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I updated the KIP accordingly and also have a "prove of
> >>>>>> concept"
> >>>>>>>>> PR
> >>>>>>>>>>>>>>>>> ready (for "in place" upgrade only):
> >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5422/
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> There a still open questions, but I want to collect early
> >>>>>>>> feedback
> >>>>>>>>>>> on
> >>>>>>>>>>>>>>>>> the proposed interfaces we need for the store upgrade.
> Also
> >>>>>>>> note,
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> the KIP now also aim to define a generic upgrade path
> from
> >>>> any
> >>>>>>>>>>> store
> >>>>>>>>>>>>>>>>> format A to any other store format B. Adding timestamps
> is
> >>>> just
> >>>>>>>> a
> >>>>>>>>>>>>>>>>> special case.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I will continue to work on the PR and refine the KIP in
> the
> >>>>>>>>>>> meantime,
> >>>>>>>>>>>>>>> too.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> >>>>>>>>>>>>>>>>>> After some more thoughts, I want to follow John's
> >>> suggestion
> >>>>>>>> and
> >>>>>>>>>>>>>> split
> >>>>>>>>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I extracted the metadata upgrade into it's own KIP:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%
> >>>>>>>>>>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to
> >>>>>>>> consider
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> make the store format upgrade more flexible/generic.
> Atm,
> >>>> the
> >>>>>>>> KIP
> >>>>>>>>>>> is
> >>>>>>>>>>>>>>> too
> >>>>>>>>>>>>>>>>>> much tailored to the DSL IMHO and does not encounter
> PAPI
> >>>>>> users
> >>>>>>>>>>> that
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> should not force to upgrade the stores. I need to figure
> >>> out
> >>>>>>>> the
> >>>>>>>>>>>>>>> details
> >>>>>>>>>>>>>>>>>> and follow up later.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Please give feedback for the new KIP-268 on the
> >>>> corresponding
> >>>>>>>>>>>>>>> discussion
> >>>>>>>>>>>>>>>>>> thread.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't
> >>>> figure
> >>>>>>>> out
> >>>>>>>>>>> a
> >>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>> for a single rolling bounce upgrade. But KIP-268
> proposes
> >>> a
> >>>>>> fix
> >>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> future upgrades. Please share your thoughts.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for all your feedback!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
> >>>>>>>>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's
> >>> an
> >>>>>>>>>>>>>>>>>>> implementation details IMHO and thus I did not include
> it
> >>>> in
> >>>>>>>> the
> >>>>>>>>>>>>>> KIP)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1) I understand know what you mean. We can certainly,
> >>> allow
> >>>>>>>> all
> >>>>>>>>>>>>>> values
> >>>>>>>>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
> >>>>>>>>>>> `upgrade.from`
> >>>>>>>>>>>>>>>>>>> parameter. I had a similar though once but decided to
> >>>>>> collapse
> >>>>>>>>>>> them
> >>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>> one -- will update the KIP accordingly.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2) The idea to avoid any config would be, to always
> send
> >>>> both
> >>>>>>>>>>>>>> request.
> >>>>>>>>>>>>>>>>>>> If we add a config to eventually disable the old
> request,
> >>>> we
> >>>>>>>>>>> don't
> >>>>>>>>>>>>>>> gain
> >>>>>>>>>>>>>>>>>>> anything with this approach. The question is really, if
> >>> we
> >>>>>> are
> >>>>>>>>>>>>>> willing
> >>>>>>>>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be
> >>>> limited
> >>>>>>>> to
> >>>>>>>>>>> 2
> >>>>>>>>>>>>>>>>>>> versions and not grow further in future releases. More
> >>>>>> details
> >>>>>>>>> in
> >>>>>>>>>>>>>> (3)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases
> and
> >>>>>>>> allows
> >>>>>>>>>>> us
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> stay with 2 "assignment strategies" we need to
> register,
> >>> as
> >>>>>>>> the
> >>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via
> >>>>>>>> "version
> >>>>>>>>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to
> avoid a
> >>>>>>>> config
> >>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>> people upgrade from pre-1.2 releases.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thus, I don't think we need to register new "assignment
> >>>>>>>>>>> strategies"
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> send empty subscriptions for older version.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right
> with
> >>> a
> >>>>>>>>> single
> >>>>>>>>>>>>>>>>>>> rebalance. I share the concern that an application
> might
> >>>>>> never
> >>>>>>>>>>>> catch
> >>>>>>>>>>>>>>> up
> >>>>>>>>>>>>>>>>>>> and thus the hot standby will never be ready.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store
> >>>> upgrades.
> >>>>>>>> If
> >>>>>>>>>>> we
> >>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3)
> >>> in
> >>>>>>>> place
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>> future upgrades. I also think that changes to the
> >>> metadata
> >>>>>> are
> >>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>> likely and thus allowing for single rolling bounce for
> >>> this
> >>>>>>>> case
> >>>>>>>>>>> is
> >>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>> important anyway. If we assume that store upgrade a
> rare,
> >>>> it
> >>>>>>>>>>> might
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>> ok
> >>>>>>>>>>>>>>>>>>> to sacrifice two rolling bounced for this case. It was
> >>> just
> >>>>>> an
> >>>>>>>>>>> idea
> >>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>> wanted to share (even if I see the issues).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for your replies.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1) About the config names: actually I was trying to
> not
> >>>>>>>> expose
> >>>>>>>>>>>>>>>>>>>> implementation details :) My main concern was that in
> >>> your
> >>>>>>>>>>>> proposal
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> values need to cover the span of all the versions that
> >>> are
> >>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a
> >>>> user)
> >>>>>>>> am
> >>>>>>>>>>>>>>>>> upgrading
> >>>>>>>>>>>>>>>>>>>> from any versions within this range I need to remember
> >>> to
> >>>>>> use
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> value
> >>>>>>>>>>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version.
> In
> >>>> my
> >>>>>>>>>>>>>>> suggestion
> >>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>> was trying to argue the benefit of just letting users
> to
> >>>>>>>>> specify
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> actual
> >>>>>>>>>>>>>>>>>>>> Kafka version she's trying to upgrade from, than
> >>>> specifying
> >>>>>> a
> >>>>>>>>>>>> range
> >>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc
> >>> as
> >>>>>> the
> >>>>>>>>>>>>>> values,
> >>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>> still using Kafka versions like broker's
> >>>> `internal.version`
> >>>>>>>>>>>> config.
> >>>>>>>>>>>>>>>>> But if
> >>>>>>>>>>>>>>>>>>>> you were suggesting the same thing, i.e. by
> >>>> "0.10.1.x-1.1.x"
> >>>>>>>>> you
> >>>>>>>>>>>>>>> meant
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or
> >>>> "0.11.0"
> >>>>>>>> or
> >>>>>>>>>>>>>> "1.1"
> >>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>> are all recognizable config values then I think we are
> >>>>>>>> actually
> >>>>>>>>>>> on
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>> page.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would
> >>>> increase
> >>>>>>>> the
> >>>>>>>>>>>>>>> network
> >>>>>>>>>>>>>>>>>>>> footprint, but not the message size, IF I'm not
> >>>>>>>>>>> mis-understanding
> >>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>> idea
> >>>>>>>>>>>>>>>>>>>> of registering multiple assignment. More details:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can
> >>>>>> encode
> >>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>> protocols each with their different metadata. The
> >>>>>> coordinator
> >>>>>>>>>>> will
> >>>>>>>>>>>>>>>>> pick the
> >>>>>>>>>>>>>>>>>>>> common one that everyone supports (if there are no
> >>> common
> >>>>>>>> one,
> >>>>>>>>>>> it
> >>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>>>>> an error back; if there are multiple ones, it will
> pick
> >>>> the
> >>>>>>>> one
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>> most
> >>>>>>>>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded
> >>>> list).
> >>>>>>>>>>> Since
> >>>>>>>>>>>>>> our
> >>>>>>>>>>>>>>>>>>>> current Streams rebalance protocol is still based on
> the
> >>>>>>>>>>> consumer
> >>>>>>>>>>>>>>>>>>>> coordinator, it means our protocol_type would be
> >>>> "consumer",
> >>>>>>>>> but
> >>>>>>>>>>>>>>>>> instead
> >>>>>>>>>>>>>>>>>>>> the protocol type we can have multiple protocols like
> >>>>>>>>> "streams",
> >>>>>>>>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that
> we
> >>>> need
> >>>>>>>> to
> >>>>>>>>>>>>>>>>> implement a
> >>>>>>>>>>>>>>>>>>>> different assignor class for each version and register
> >>> all
> >>>>>> of
> >>>>>>>>>>> them
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In
> the
> >>>>>>>> future
> >>>>>>>>>>> if
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> re-factor our implementation to have our own client
> >>>>>>>> coordinator
> >>>>>>>>>>>>>> layer
> >>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>> Connect did, we can simplify this part of the
> >>>>>> implementation.
> >>>>>>>>>>> But
> >>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> now with the above approach this is still doable.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On the broker side, the group coordinator will only
> >>>> persist
> >>>>>> a
> >>>>>>>>>>>> group
> >>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>> the selected protocol and its subscription metadata,
> >>> e.g.
> >>>> if
> >>>>>>>>>>>>>>>>> coordinator
> >>>>>>>>>>>>>>>>>>>> decides to pick "streams_v2" it will only sends that
> >>>>>>>> protocol's
> >>>>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>>>> from everyone to the leader to assign, AND when
> >>> completing
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> rebalance it
> >>>>>>>>>>>>>>>>>>>> will also only write the group metadata with that
> >>> protocol
> >>>>>>>> and
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> assignment only. In a word, although the network
> traffic
> >>>>>>>> maybe
> >>>>>>>>>>>>>>>>> increased a
> >>>>>>>>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One
> >>> corner
> >>>>>>>>>>>>>> situation
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> need to consider is how to stop registering very old
> >>>>>>>> assignors
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> avoid the
> >>>>>>>>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if
> >>> you
> >>>>>> are
> >>>>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register
> v1
> >>>>>>>>>>> assignor
> >>>>>>>>>>>>>>>>> anymore,
> >>>>>>>>>>>>>>>>>>>> but that would unfortunately still require some
> configs.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3) About the  "version probing" idea, I think that's a
> >>>>>>>>> promising
> >>>>>>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>>>>>>> as well, but if we are going to do the
> multi-assignment
> >>>> its
> >>>>>>>>>>> value
> >>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on
> top
> >>> of
> >>>>>>>>>>>>>>>>> multi-assignment
> >>>>>>>>>>>>>>>>>>>> to save us from still requiring the config to avoid
> >>>>>>>> registering
> >>>>>>>>>>>> all
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> metadata for all version. More details:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the
> >>>> assignor
> >>>>>>>> but
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> all old
> >>>>>>>>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the
> >>> encoded
> >>>>>>>> data
> >>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> be:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> "streams_vN" : "encoded metadata"
> >>>>>>>>>>>>>>>>>>>> "streams_vN-1":empty
> >>>>>>>>>>>>>>>>>>>> "streams_vN-2":empty
> >>>>>>>>>>>>>>>>>>>> ..
> >>>>>>>>>>>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So the coordinator can still safely choose the latest
> >>>> common
> >>>>>>>>>>>>>> version;
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> then when leaders receive the subscription (note it
> >>> should
> >>>>>>>>>>> always
> >>>>>>>>>>>>>>>>> recognize
> >>>>>>>>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of
> >>> the
> >>>>>>>>>>>>>>>>> subscriptions
> >>>>>>>>>>>>>>>>>>>> are empty bytes, it will send the empty assignment
> with
> >>>> that
> >>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>> number
> >>>>>>>>>>>>>>>>>>>> encoded in the metadata. So in the second
> auto-triggered
> >>>> all
> >>>>>>>>>>>>>> members
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>> send the metadata with that version:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> "streams_vN" : empty
> >>>>>>>>>>>>>>>>>>>> "streams_vN-1" : empty
> >>>>>>>>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata"
> >>>>>>>>>>>>>>>>>>>> ..
> >>>>>>>>>>>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> By doing this we would not require any configs for
> >>> users.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not
> >>> clear
> >>>>>>>> about
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> details
> >>>>>>>>>>>>>>>>>>>> so probably we'd need to fill that out before making a
> >>>> call.
> >>>>>>>>> For
> >>>>>>>>>>>>>>>>> example,
> >>>>>>>>>>>>>>>>>>>> you mentioned "If we detect this situation, the
> Streams
> >>>>>>>>>>>> application
> >>>>>>>>>>>>>>>>> closes
> >>>>>>>>>>>>>>>>>>>> corresponding active tasks as well as "hot standby"
> >>> tasks,
> >>>>>>>> and
> >>>>>>>>>>>>>>>>> re-creates
> >>>>>>>>>>>>>>>>>>>> the new active tasks using the new store." How could
> we
> >>>>>>>>>>> guarantee
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> gap between these two stores will keep decreasing than
> >>>>>>>>>>> increasing
> >>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>> we'll
> >>>>>>>>>>>>>>>>>>>> eventually achieve the flip point? And also the longer
> >>> we
> >>>>>> are
> >>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> flip point, the larger we are doubling the storage
> >>> space,
> >>>>>>>> etc.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> @John, Guozhang,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply...
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> About upgrading the rebalance metadata:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Another possibility to do this, would be to register
> >>>>>>>> multiple
> >>>>>>>>>>>>>>>>> assignment
> >>>>>>>>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case,
> new
> >>>>>>>>>>> instances
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>> be configured to support both and the broker would
> pick
> >>>> the
> >>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>> all instances understand. The disadvantage would be,
> >>> that
> >>>>>> we
> >>>>>>>>>>> send
> >>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance
> as
> >>>> long
> >>>>>>>> as
> >>>>>>>>>>> no
> >>>>>>>>>>>>>>>>> second
> >>>>>>>>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus,
> >>> using
> >>>>>>>> this
> >>>>>>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off
> an
> >>>>>>>>>>> increased
> >>>>>>>>>>>>>>>>>>>>> rebalance network footprint (I also assume that this
> >>>> would
> >>>>>>>>>>>>>> increase
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> message size that is written into __consumer_offsets
> >>>>>>>> topic?).
> >>>>>>>>>>>>>>>>> Overall, I
> >>>>>>>>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it
> >>>> could
> >>>>>>>>>>> avoid
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>> second rebalance (I have some more thoughts about
> >>> stores
> >>>>>>>> below
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>> relevant for single rebalance upgrade).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> For future upgrades we might be able to fix this
> >>> though.
> >>>> I
> >>>>>>>> was
> >>>>>>>>>>>>>>>>> thinking
> >>>>>>>>>>>>>>>>>>>>> about the following:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> In the current implementation, the leader fails if it
> >>>> gets
> >>>>>> a
> >>>>>>>>>>>>>>>>>>>>> subscription it does not understand (ie, newer
> >>> version).
> >>>> We
> >>>>>>>>>>> could
> >>>>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>>>>>> this behavior and let the leader send an empty
> >>> assignment
> >>>>>>>> plus
> >>>>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>>> code (including supported version) back to the
> instance
> >>>>>>>>> sending
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> "bad" subscription. This would allow the following
> >>> logic
> >>>>>> for
> >>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>> application instance:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>  - on startup, always send the latest subscription
> >>> format
> >>>>>>>>>>>>>>>>>>>>>  - if leader understands it, we get an assignment
> back
> >>> an
> >>>>>>>>> start
> >>>>>>>>>>>>>>>>> processing
> >>>>>>>>>>>>>>>>>>>>>  - if leader does not understand it, we get an empty
> >>>>>>>>> assignment
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>> supported version back
> >>>>>>>>>>>>>>>>>>>>>  - the application unsubscribe()/subscribe()/poll()
> >>> again
> >>>>>>>> and
> >>>>>>>>>>>>>>> sends a
> >>>>>>>>>>>>>>>>>>>>> subscription using the leader's supported version
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> This protocol would allow to do a single rolling
> >>> bounce,
> >>>>>> and
> >>>>>>>>>>>>>>>>> implements
> >>>>>>>>>>>>>>>>>>>>> a "version probing" step, that might result in two
> >>>> executed
> >>>>>>>>>>>>>>>>> rebalances.
> >>>>>>>>>>>>>>>>>>>>> The advantage would be, that the user does not need
> to
> >>>> set
> >>>>>>>> any
> >>>>>>>>>>>>>>> configs
> >>>>>>>>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care
> >>> of
> >>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>> automatically.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen
> >>> and
> >>>>>>>> that
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>> error case during rebalance, we loose the information
> >>>> about
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> supported leader version and the "probing step" would
> >>>>>>>> happen a
> >>>>>>>>>>>>>>> second
> >>>>>>>>>>>>>>>>> time.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> If the leader is eventually updated, it will include
> >>> it's
> >>>>>>>> own
> >>>>>>>>>>>>>>>>> supported
> >>>>>>>>>>>>>>>>>>>>> version in all assignments, to allow a "down graded"
> >>>>>>>>>>> application
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> upgrade its version later. Also, if a application
> >>> fails,
> >>>>>> the
> >>>>>>>>>>>> first
> >>>>>>>>>>>>>>>>>>>>> probing would always be successful and only a single
> >>>>>>>> rebalance
> >>>>>>>>>>>>>>>>> happens.
> >>>>>>>>>>>>>>>>>>>>> If we use this protocol, I think we don't need any
> >>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>>>>> parameter for future upgrades.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version
> >>> the
> >>>>>>>>>>>>>>> current/old
> >>>>>>>>>>>>>>>>>>>>> application is using. I think this is simpler, as
> users
> >>>>>> know
> >>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>> version. If we use "internal.protocol.version"
> instead,
> >>>> we
> >>>>>>>>>>> expose
> >>>>>>>>>>>>>>>>>>>>> implementation details and users need to know the
> >>>> protocol
> >>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>> (ie,
> >>>>>>>>>>>>>>>>>>>>> they need to map from the release version to the
> >>> protocol
> >>>>>>>>>>>> version;
> >>>>>>>>>>>>>>> ie,
> >>>>>>>>>>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol
> >>> version
> >>>>>>>> 2").
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Also the KIP states that for the second rolling
> bounce,
> >>>> the
> >>>>>>>>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` --
> >>> and
> >>>>>>>>> thus,
> >>>>>>>>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is
> ignored
> >>>> (I
> >>>>>>>>> will
> >>>>>>>>>>>>>>> update
> >>>>>>>>>>>>>>>>>>>>> the KIP to point out this dependency).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> About your second point: I'll update the KIP
> >>> accordingly
> >>>> to
> >>>>>>>>>>>>>> describe
> >>>>>>>>>>>>>>>>>>>>> future updates as well. Both will be different.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> One more point about upgrading the store format. I
> was
> >>>>>>>>> thinking
> >>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>> avoiding the second rolling bounce all together in
> the
> >>>>>>>> future:
> >>>>>>>>>>>> (1)
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2)
> >>> this
> >>>>>>>>>>>> required
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the
> >>>>>> switch
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> delete
> >>>>>>>>>>>>>>>>>>>>> the old stores. (3) the current proposal does the
> >>> switch
> >>>>>>>>>>>>>> "globally"
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>> this is simpler and due to the required second
> >>> rebalance
> >>>> no
> >>>>>>>>>>>>>>>>> disadvantage.
> >>>>>>>>>>>>>>>>>>>>> However, a global consistent switch over might
> actually
> >>>> not
> >>>>>>>> be
> >>>>>>>>>>>>>>>>> required.
> >>>>>>>>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from
> >>>> above,
> >>>>>>>> we
> >>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>> decouple the store switch and each instance could
> >>> switch
> >>>>>> its
> >>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>>> independently from all other instances. After the
> >>> rolling
> >>>>>>>>>>> bounce,
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>> seems to be ok to switch from the old store to the
> new
> >>>>>> store
> >>>>>>>>>>>>>> "under
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> hood" whenever the new store is ready (this could
> even
> >>> be
> >>>>>>>>> done,
> >>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>>>> we switch to the new metadata version). Each time we
> >>>> update
> >>>>>>>>> the
> >>>>>>>>>>>>>> "hot
> >>>>>>>>>>>>>>>>>>>>> standby" we check if it reached the "endOffset"  (or
> >>>> maybe
> >>>>>>>> X%
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect
> this
> >>>>>>>>>>>> situation,
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> Streams application closes corresponding active tasks
> >>> as
> >>>>>>>> well
> >>>>>>>>>>> as
> >>>>>>>>>>>>>>> "hot
> >>>>>>>>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks
> >>> using
> >>>>>>>> the
> >>>>>>>>>>> new
> >>>>>>>>>>>>>>>>> store.
> >>>>>>>>>>>>>>>>>>>>> (I need to go through the details once again, but it
> >>>> seems
> >>>>>>>> to
> >>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>> feasible.).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Combining this strategy with the "multiple
> assignment"
> >>>>>> idea,
> >>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>>>> enable us to do an single rolling bounce upgrade from
> >>> 1.1
> >>>>>> ->
> >>>>>>>>>>> 1.2.
> >>>>>>>>>>>>>>>>>>>>> Applications would just use the old store, as long as
> >>> the
> >>>>>>>> new
> >>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>> not ready, even if the new metadata version is used
> >>>>>> already.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> For future upgrades, a single rebalance would be
> >>>>>> sufficient,
> >>>>>>>>>>> too,
> >>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>>>> if the stores are upgraded. We would not need any
> >>> config
> >>>>>>>>>>>>>> parameters
> >>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>> the "probe" step allows us to detect the supported
> >>>>>> rebalance
> >>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>>>>> version (and we would also not need multiple
> >>> "assigmnent
> >>>>>>>>>>>>>> strategies"
> >>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>> out own protocol encoded everything we need).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the
> >>>>>>>> encoded
> >>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>>>>> bytes
> >>>>>>>>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from
> >>>>>> Consumer's
> >>>>>>>>>>> pov,
> >>>>>>>>>>>>>>> so I
> >>>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>> this change should be in the Streams layer as well.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> @Matthias:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported
> >>> version"
> >>>>>>>>>>> besides
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea
> >>> to
> >>>>>>>> allow
> >>>>>>>>>>>>>>> either
> >>>>>>>>>>>>>>>>>>>>> case;
> >>>>>>>>>>>>>>>>>>>>>> the key is that in Streams we would likely end up
> >>> with a
> >>>>>>>>>>> mapping
> >>>>>>>>>>>>>>>>> from the
> >>>>>>>>>>>>>>>>>>>>>> protocol version to the other persistent data format
> >>>>>>>> versions
> >>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can
> actually
> >>>>>>>>> achieve
> >>>>>>>>>>>>>> both
> >>>>>>>>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the
> upgraded
> >>>>>>>>> protocol
> >>>>>>>>>>>>>>>>> version's
> >>>>>>>>>>>>>>>>>>>>>> corresponding data format does not change, e.g.
> 0.10.0
> >>>> ->
> >>>>>>>>>>> 0.10.1
> >>>>>>>>>>>>>>>>> leaders
> >>>>>>>>>>>>>>>>>>>>>> can choose to use the newer version in the first
> >>> rolling
> >>>>>>>>>>> bounce
> >>>>>>>>>>>>>>>>> directly
> >>>>>>>>>>>>>>>>>>>>>> and we can document to users that they would not
> need
> >>> to
> >>>>>>>> set
> >>>>>>>>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the
> >>>> upgraded
> >>>>>>>>>>>>>> protocol
> >>>>>>>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 ->
> >>> 1.2,
> >>>>>> and
> >>>>>>>>>>> then
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the
> >>>> first
> >>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>> bounce
> >>>>>>>>>>>>>>>>>>>>>> and reset in the second.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Besides that, some additional comments:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive
> for
> >>>>>> users
> >>>>>>>>> to
> >>>>>>>>>>>>>> set
> >>>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter
> users
> >>>>>> only
> >>>>>>>>>>> need
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> set a
> >>>>>>>>>>>>>>>>>>>>>> single version, while the Streams will map that
> >>> version
> >>>> to
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But
> >>>> maybe
> >>>>>> I
> >>>>>>>>>>> did
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>>>> your idea about how the  "upgrade.from" config will
> be
> >>>>>> set,
> >>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from
> config
> >>>>>> will
> >>>>>>>>> be
> >>>>>>>>>>>>>> set
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for
> >>>> example,
> >>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>>>>>> reset it to null in the second rolling bounce?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than
> >>> talking
> >>>>>>>> about
> >>>>>>>>>>>>>>>>> specific
> >>>>>>>>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just
> >>>>>>>> categorize
> >>>>>>>>>>> all
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> possible scenarios, even for future upgrade
> versions,
> >>>> what
> >>>>>>>>>>>> should
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> standard operations? The categorized we can
> summarize
> >>> to
> >>>>>>>>> would
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>> (assuming
> >>>>>>>>>>>>>>>>>>>>>> user upgrade from version X to version Y, where X
> and
> >>> Y
> >>>>>> are
> >>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>> versions,
> >>>>>>>>>>>>>>>>>>>>>> with the corresponding supported protocol version x
> >>> and
> >>>>>> y):
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change,
> and
> >>>>>>>> hence
> >>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>> persistent
> >>>>>>>>>>>>>>>>>>>>>> data formats have changed.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> b. x != y, but all persistent data format remains
> the
> >>>>>> same.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> b. x !=y, AND some persistene data format like
> RocksDB
> >>>>>>>>> format,
> >>>>>>>>>>>>>>>>> changelog
> >>>>>>>>>>>>>>>>>>>>>> format, has been changed.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> c. special case: we may need some special handling
> >>> logic
> >>>>>>>> when
> >>>>>>>>>>>>>>>>> "current
> >>>>>>>>>>>>>>>>>>>>>> version" or "newest supported version" are not
> >>> available
> >>>>>> in
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> protocol,
> >>>>>>>>>>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> under the above scenarios, how many rolling bounces
> >>>> users
> >>>>>>>>> need
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> execute?
> >>>>>>>>>>>>>>>>>>>>>> how they should set the configs in each rolling
> >>> bounce?
> >>>>>> and
> >>>>>>>>>>> how
> >>>>>>>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>>>>>>>> library will execute in these cases?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Ted,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I still consider changing the KIP to include it
> right
> >>>>>> away
> >>>>>>>>> --
> >>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>> not,
> >>>>>>>>>>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in
> more
> >>>>>>>> detail
> >>>>>>>>>>>>>> first.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> (Same for other open questions like interface names
> >>> --
> >>>> I
> >>>>>>>>>>>> collect
> >>>>>>>>>>>>>>>>>>>>>>> feedback and update the KIP after we reach
> consensus
> >>>> :))
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the details, Matthias.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> bq. change the metadata protocol only if a future
> >>>>>>>> release,
> >>>>>>>>>>>>>>> encoding
> >>>>>>>>>>>>>>>>>>>>> both
> >>>>>>>>>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>>>>>>>> and supported version might be an advantage
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be
> >>>>>> implemented
> >>>>>>>>> in
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding
> >>>>>>>> proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Cheers
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part
> of
> >>>>>> user
> >>>>>>>>>>>> code.
> >>>>>>>>>>>>>>> And
> >>>>>>>>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>>>>>>>> if we want to add something like this, I would
> >>> prefer
> >>>>>> to
> >>>>>>>>> do
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>> separate KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling
> >>>> bounce.
> >>>>>>>> But
> >>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>>>>> understanding it would not be possible.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is
> indeed
> >>> to
> >>>>>>>>>>> switch
> >>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to
> >>>> switch
> >>>>>>>>>>> from
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> old
> >>>>>>>>>>>>>>>>>>>>>>>>> store to the new store (this happens after the
> last
> >>>>>>>>>>> instance
> >>>>>>>>>>>>>>>>> bounces a
> >>>>>>>>>>>>>>>>>>>>>>>>> second time).
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is,
> >>>> that
> >>>>>>>>> it's
> >>>>>>>>>>>>>>>>> unclear
> >>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>>>>> to which from version 2 to version 3. The
> >>>>>>>>>>>>>>>>> StreamsPartitionsAssignor is
> >>>>>>>>>>>>>>>>>>>>>>>>> stateless by design, and thus, the information
> >>> which
> >>>>>>>>>>> version
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>>>> use must be passed in from externally -- and we
> >>> want
> >>>> to
> >>>>>>>>> use
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> StreamsConfig to pass in this information.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> During upgrade, all new instanced have no
> >>> information
> >>>>>>>>> about
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> progress
> >>>>>>>>>>>>>>>>>>>>>>>>> of the upgrade (ie, how many other instanced got
> >>>>>>>> upgrades
> >>>>>>>>>>>>>>>>> already).
> >>>>>>>>>>>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a
> >>> version 3
> >>>>>>>>>>>>>>>>> subscription.
> >>>>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>>>>> leader also has this limited view on the world
> and
> >>>> can
> >>>>>>>>> only
> >>>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>>>>>>> 2 assignments back.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can
> >>>>>> simplify
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> upgrade.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> We did consider to change the metadata to make
> >>> later
> >>>>>>>>>>> upgrades
> >>>>>>>>>>>>>>> (ie,
> >>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we
> change
> >>>> the
> >>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>>>> storage format again -- as long as we don't
> change
> >>>> it,
> >>>>>> a
> >>>>>>>>>>>>>> single
> >>>>>>>>>>>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version"
> >>> and
> >>>>>>>>>>>>>> "supported
> >>>>>>>>>>>>>>>>>>>>>>>>> version". This would allow the leader to switch
> to
> >>>> the
> >>>>>>>> new
> >>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>>>>>>> earlier and without a second rebalance: leader
> >>> would
> >>>>>>>>>>> receive
> >>>>>>>>>>>>>>> "used
> >>>>>>>>>>>>>>>>>>>>>>>>> version == old" and "supported version = old/new"
> >>> --
> >>>> as
> >>>>>>>>>>> long
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>>>>>> least
> >>>>>>>>>>>>>>>>>>>>>>>>> one instance sends a "supported version = old"
> >>> leader
> >>>>>>>>> sends
> >>>>>>>>>>>>>> old
> >>>>>>>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>>>>>>> assignment back. However, encoding both version
> >>> would
> >>>>>>>>> allow
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> leader can send a new version assignment back,
> >>> right
> >>>>>>>> after
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> first
> >>>>>>>>>>>>>>>>>>>>>>>>> round or rebalance finished (all instances send
> >>>>>>>> "supported
> >>>>>>>>>>>>>>>>> version =
> >>>>>>>>>>>>>>>>>>>>>>>>> new"). However, there are still two issues with
> >>> this:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 1) if we switch to the new format right after the
> >>>> last
> >>>>>>>>>>>>>> instance
> >>>>>>>>>>>>>>>>>>>>> bounced,
> >>>>>>>>>>>>>>>>>>>>>>>>> the new stores might not be ready to be used --
> >>> this
> >>>>>>>> could
> >>>>>>>>>>>>>> lead
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>> "downtime" as store must be restored before
> >>>> processing
> >>>>>>>> can
> >>>>>>>>>>>>>>> resume.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted
> again.
> >>>> At
> >>>>>>>>> this
> >>>>>>>>>>>>>>>>> point, the
> >>>>>>>>>>>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled
> and
> >>>>>> thus
> >>>>>>>>>>>> sends
> >>>>>>>>>>>>>>>>> the old
> >>>>>>>>>>>>>>>>>>>>>>>>> protocol data. However, it would be desirable to
> >>>> never
> >>>>>>>>> fall
> >>>>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> old protocol after the switch to the new
> protocol.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> The second issue is minor and I guess if users
> >>> set-up
> >>>>>>>> the
> >>>>>>>>>>>>>>> instance
> >>>>>>>>>>>>>>>>>>>>>>>>> properly it could be avoided. However, the first
> >>>> issue
> >>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> prevent
> >>>>>>>>>>>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we
> >>>>>>>> consider
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>>>>>>>> change the metadata protocol only if a future
> >>>> release,
> >>>>>>>>>>>>>> encoding
> >>>>>>>>>>>>>>>>> both
> >>>>>>>>>>>>>>>>>>>>>>>>> used and supported version might be an advantage
> in
> >>>> the
> >>>>>>>>>>>> future
> >>>>>>>>>>>>>>>>> and we
> >>>>>>>>>>>>>>>>>>>>>>>>> could consider to add this information in 1.2
> >>> release
> >>>>>> to
> >>>>>>>>>>>>>> prepare
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to
> >>>> give
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> instances
> >>>>>>>>>>>>>>>>>>>>>>>>> enough time to prepare the stores in new format.
> If
> >>>> you
> >>>>>>>>>>> would
> >>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> second rolling bounce before this, it would still
> >>>> work
> >>>>>>>> --
> >>>>>>>>>>>>>>>>> however, you
> >>>>>>>>>>>>>>>>>>>>>>>>> might see app "downtime" as the new store must be
> >>>> fully
> >>>>>>>>>>>>>> restored
> >>>>>>>>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>>>>>>>> processing can resume.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Does this make sense?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get
> >>> rid
> >>>>>> of
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> 2nd
> >>>>>>>>>>>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>>>>>>>>>> bounce?
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary
> >>>>>>>>> difference
> >>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is
> to
> >>>>>>>> decide
> >>>>>>>>>>>>>>>>> whether to
> >>>>>>>>>>>>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
> >>>>>>>>>>> (Actually,
> >>>>>>>>>>>>>>>>> there is
> >>>>>>>>>>>>>>>>>>>>>>>>> another difference mentioned in that the KIP says
> >>>> that
> >>>>>>>> the
> >>>>>>>>>>>> 2nd
> >>>>>>>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>>>>>>>>>> bounce should happen after all new state stores
> are
> >>>>>>>>> created
> >>>>>>>>>>>> by
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> background thread. However, within the 2nd
> rolling
> >>>>>>>> bounce,
> >>>>>>>>>>> we
> >>>>>>>>>>>>>>> say
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>> there is still a background thread, so it seems
> >>> like
> >>>> is
> >>>>>>>> no
> >>>>>>>>>>>>>>> actual
> >>>>>>>>>>>>>>>>>>>>>>>>> requirement to wait for the new state stores to
> be
> >>>>>>>>>>> created.)
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal
> >>>> with
> >>>>>>>>>>>>>>> mixed-mode
> >>>>>>>>>>>>>>>>>>>>>>> (having
> >>>>>>>>>>>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer
> >>>>>>>> group).
> >>>>>>>>>>> It
> >>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
> >>>>>>>>>>>>>>>>> (somehow/somewhere)
> >>>>>>>>>>>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>>>>>>>>>>> that:
> >>>>>>>>>>>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until
> all
> >>>>>>>>>>> instances
> >>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>> running
> >>>>>>>>>>>>>>>>>>>>>>>>> the new code.
> >>>>>>>>>>>>>>>>>>>>>>>>>> * Once all the instances are running the new
> code,
> >>>>>> then
> >>>>>>>>>>> one
> >>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>> time,
> >>>>>>>>>>>>>>>>>>>>>>>>> the instances start sending Subscription V3.
> Leader
> >>>>>>>> still
> >>>>>>>>>>>>>> hands
> >>>>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>>>>>>>> Assignment Version 2, until all new state stores
> >>> are
> >>>>>>>>> ready.
> >>>>>>>>>>>>>>>>>>>>>>>>>> * Once all instances report that new stores are
> >>>> ready,
> >>>>>>>>>>>> Leader
> >>>>>>>>>>>>>>>>> sends
> >>>>>>>>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>>>>>>>> Assignment Version 3.
> >>>>>>>>>>>>>>>>>>>>>>>>>> * Once an instance receives an Assignment
> Version
> >>> 3,
> >>>>>> it
> >>>>>>>>>>> can
> >>>>>>>>>>>>>>>>> delete
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> old state store.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a
> lot
> >>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2
> >>> rolling
> >>>>>>>>>>>>>> restarts.
> >>>>>>>>>>>>>>> No
> >>>>>>>>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>> monitor logs for state store rebuild. You just
> >>> deploy
> >>>>>>>> it,
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> instances
> >>>>>>>>>>>>>>>>>>>>>>>>> update themselves.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> The thing that made me think of this is that the
> >>> "2
> >>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>> bounces"
> >>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>> similar to what Kafka brokers have to do changes
> in
> >>>>>>>>>>>>>>>>>>>>>>>>> inter.broker.protocol.version and
> >>>>>>>>>>> log.message.format.version.
> >>>>>>>>>>>>>>> And
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> broker case, it seems like it would be possible
> >>> (with
> >>>>>>>> some
> >>>>>>>>>>>>>> work
> >>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>> course)
> >>>>>>>>>>>>>>>>>>>>>>>>> to modify kafka to allow us to do similar
> >>>>>> auto-detection
> >>>>>>>>> of
> >>>>>>>>>>>>>>> broker
> >>>>>>>>>>>>>>>>>>>>>>>>> capabilities and automatically do a switchover
> from
> >>>>>>>>> old/new
> >>>>>>>>>>>>>>>>> versions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> -James
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
> >>>>>>>>>>>> bbej...@gmail.com
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval
> >>>>>> methods
> >>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>>>>> interfaces.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Would want to consider adding one method with a
> >>>>>>>>> Predicate
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>>>>>>>>>>>> for filtering records by the timestamp stored
> >>> with
> >>>>>> the
> >>>>>>>>>>>>>> record?
> >>>>>>>>>>>>>>>>> Or
> >>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>> better left for users to implement themselves
> >>> once
> >>>>>> the
> >>>>>>>>>>> data
> >>>>>>>>>>>>>>> has
> >>>>>>>>>>>>>>>>> been
> >>>>>>>>>>>>>>>>>>>>>>>>>>> retrieved?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
> >>>>>>>>>>>> yuzhih...@gmail.com
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> For my point #1, I don't have preference as to
> >>>> which
> >>>>>>>>>>>>>>> separator
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>> chosen.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Given the background you mentioned, current
> >>> choice
> >>>>>> is
> >>>>>>>>>>>> good.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it
> >>> is
> >>>>>>>>> closer
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> English
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> grammar.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be good to listen to what other people
> >>>> think.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J.
> Sax
> >>> <
> >>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comments!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance
> >>>> metadata
> >>>>>>>>>>>> upgrade
> >>>>>>>>>>>>>>> fix
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (addressing the mentioned
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>> https://issues.apache.org/jira/browse/KAFKA-6054
> >>>> )
> >>>>>>>> It
> >>>>>>>>>>>>>> give a
> >>>>>>>>>>>>>>>>> first
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> impression how the metadata upgrade works
> >>>> including
> >>>>>>>> a
> >>>>>>>>>>>>>> system
> >>>>>>>>>>>>>>>>> test:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are
> >>> ready.
> >>>> I
> >>>>>>>>>>> agree
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> complex am I ok with putting out more code to
> >>>> give
> >>>>>>>>>>> better
> >>>>>>>>>>>>>>>>>>>>> discussion
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> context.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Ted:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
> >>>>>>>>>>>>>>>>>>>>> `processing.guarantee`
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameter that accepts `at_least_one` and
> >>>>>>>>>>> `exactly_once`
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>> values.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs
> >>> dash
> >>>>>>>> but
> >>>>>>>>> I
> >>>>>>>>>>>>>>> prefer
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> consistency. If you feel strong about it, we
> >>> can
> >>>>>>>> also
> >>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>> it to
> >>>>>>>>>>>>>>>>>>>>>>>>> `-`.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either
> way
> >>>> -- I
> >>>>>>>>>>>>>> stripped
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> `With`
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be
> >>> good
> >>>> to
> >>>>>>>>> get
> >>>>>>>>>>>>>>>>> feedback
> >>>>>>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> others and pick the name the majority
> prefers.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it
> >>> would
> >>>>>>>> not
> >>>>>>>>>>>>>> make a
> >>>>>>>>>>>>>>>>>>>>>>>>> difference.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the
> two
> >>>>>>>>> version
> >>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>>>>>>>> introduce
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I
> >>>> don't
> >>>>>>>> hit
> >>>>>>>>>>> an
> >>>>>>>>>>>>>>>>> issue
> >>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> putting the `-v2` to the store directory
> >>> instead
> >>>> of
> >>>>>>>>>>>>>>>>> `rocksdb-v2`
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several
> >>>> questions
> >>>>>>>>>>> queued
> >>>>>>>>>>>>>>> up,
> >>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> were
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> all in the "rejected alternatives"
> section...
> >>>> oh,
> >>>>>>>>>>> well.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state
> >>>>>>>>> directory
> >>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
> >/rocksdb/storeName/"
> >>>> to
> >>>>>>>>>>>>>>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
> >>>>> /rocksdb-v2/storeName/":
> >>>>>>>> if
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>> put the
> >>>>>>>>>>>>>>>>>>>>>>>>> "v2"
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> marker on the storeName part of the path
> >>> (i.e.,
> >>>>>>>>>>>>>>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
> >>>>>>> /rocksdb/storeName-v2/"),
> >>>>>>>>>>> then
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> benefits without altering the high-level
> >>>> directory
> >>>>>>>>>>>>>>> structure.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine
> people
> >>>>>>>> running
> >>>>>>>>>>>>>>>>> scripts to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> monitor
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other
> >>> such
> >>>>>> use
> >>>>>>>>>>>>>> cases.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
> >>>>>>>>>>>>>>> yuzhih...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nicely written KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ?
> >>>> Underscore
> >>>>>>>> may
> >>>>>>>>>>>>>>>>> sometimes
> >>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> miss
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more
> >>>>>> friendly
> >>>>>>>>> to
> >>>>>>>>>>>>>>> user.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface
> >>>>>> ReadOnlyKeyValueTimestampStore<K,
> >>>>>>>>> V>
> >>>>>>>>>>>> {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp
> better
> >>>> name
> >>>>>>>>> for
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> class ?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang
> >>> Wang <
> >>>>>>>>>>>>>>>>>>>>> wangg...@gmail.com
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch
> section
> >>>> and
> >>>>>>>> it
> >>>>>>>>>>>>>> looks
> >>>>>>>>>>>>>>>>> good
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>> me,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you
> also
> >>>>>> share
> >>>>>>>>> it
> >>>>>>>>>>>>>> here
> >>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> people
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can take a look?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs
> >>> like
> >>>>>>>> this
> >>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>> always
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it
> >>> is
> >>>>>>>>> better
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the
> >>>> design
> >>>>>>>>>>>>>>>>> discussion :)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias
> J.
> >>>> Sax
> >>>>>> <
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams
> >>> API
> >>>>>> to
> >>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>> storing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is
> the
> >>>>>> basis
> >>>>>>>>> to
> >>>>>>>>>>>>>>>>> resolve
> >>>>>>>>>>>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about
> >>> this!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>

-- 
-- Guozhang

Reply via email to