Thanks for the feedback.

I updated the interface names to `TimestampedXxxStore`.

About `RecordConverter` I would like to keep the name, because it's a
generic concept and we will have a special implementation that does the
timestamp conversion -- however, this should be reflected in the
(internal) class name, not the interface name.

I also added more details about `RecordConverter` interface and impact
on custom store implementations.

About Bill's question. Happy to address this in the PR. But it seems to
be an implementation details and thus, I did not add anything to the KIP.


-Matthias


On 1/16/19 10:00 AM, Guozhang Wang wrote:
> Matthias,
> 
> Thanks for the updated wiki page. I made another pass and here are some
> minor comments I have (mostly about API names and writing itself):
> 
> 1. The scope of the RecordConverter may not be very clear from the wiki
> page reads. Here's my understanding:
> 
> 1.a) For build-in implementations (rocksdb, memory) of the defined types of
> StateStore (k-v, window, session), Streams will implement the
> RecordConverter itself *so there's nothing that users need to do*;
> 1.b) For customized implementations of the defined types of StateStore,
> Streams will use a proxy store internally to always "down-convert" the new
> APIs to the old formats, *so there' nothing that users need to do still*;
> but users can opt-in to let their custom impl to also extend RecordConver.
> 1.c) For user-defined StateStore (e.g. Alice has her own interface named
> Database extending StateStore with her own impl). *This is the only place
> users are required to do extra work by implementing the RecordConverter
> interface*.
> 
> 2. Naming.
> 
> 2.a) I'd also prefer TimestampedXXXStore over XXXWithTimestampStore.
> 2.b) RecordConverter: I felt having a more specific name is better if we
> believe future format changes cannot reuse it anyways. Maybe
> "TimestampedRecordConverter"?
> 
> 3. Regarding Bill's question above. One idea is that we can add an extra
> check logic during the starting up phase, to check if the old CF is empty
> or not already, and if yes set a flag so that we can skip that CF for state
> store access. The rationale is that Streams apps are expected to be bounced
> / re-launched frequently so just having this check logic upon starting up
> should be a good trade-off between complexity and efficiency.
> 
> 
> Guozhang
> 
> 
> 
> 
> Guozhang
> 
> 
> On Sat, Jan 12, 2019 at 6:46 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I also want to point out, that Ryanne Dolan commented on the WIP PR
>> (https://github.com/apache/kafka/pull/6044) about the naming. I asked
>> him to reply to this thread, but this did no happen yet, thus I want to
>> point it out myself, because it seems to important.
>>
>>
>> WindowWithTimestampStore.java
>>
>>> Hard to parse this class name -- sounds like "trait Window with
>> TimestampStore" instead of WindowStore<... ValueAndTimestamp...>
>>>
>>> Maybe TimestampWindowStore or TimestampedWindowStore? shrug.
>>>
>>
>>
>>
>> Another comment, that I personally do not necessarily agree:
>>
>>
>> KeyValueWithTimestampStore.java
>>
>>> possible alternative: implement KeyValueStore<K, V>, and then expose an
>> additional putWithTimestamp, getWithTimestamp etc for callers that want
>> ValueAndTimestamp<V> instead of V. This would probably require fewer code
>> changes elsewhere.
>>
>>
>>
>> What do you think?
>>
>>
>> -Matthias
>>
>>
>>
>> On 1/12/19 6:43 PM, Matthias J. Sax wrote:
>>> Bill,
>>>
>>> I left the question about legacy column family out, because as a matter
>>> of fact, we use the default column family atm that cannot be deleted.
>>> Thus, this old column family will always be there.
>>>
>>> Nevertheless, as an implementation detail, it might make sense to avoid
>>> accessing both column families forever (ie, each time a key is not
>>> found). Also, we might want/need a way, to "force" upgrading to the new
>>> column family, for the case that some records are not accessed for a
>>> long time. Again, this seems to be an implementation detail (and I am
>>> also not sure if we really need it). If you thing both are not
>>> implementation details, I can of course extend the KIP accordingly.
>>>
>>>
>>> -Matthias
>>>
>>> On 1/11/19 1:27 PM, Bill Bejeck wrote:
>>>> Hi Matthias,
>>>>
>>>> Thanks for the KIP, it goes into good detail and is well done.
>>>>
>>>> Overall I'm a +1 on the KIP and have one minor question.
>>>>
>>>> Regarding the upgrade path, we'll use two column families to do a lazy
>>>> conversion which makes sense to me.  What is the plan to get
>>>> rid of the "legacy" column family (if ever)?  Would we drop the "legacy"
>>>> column family once it is empty? I'm not sure we'd ever need to as it
>> would
>>>> just be a column family that doesn't get used.
>>>>
>>>> Maybe this is an implementation detail and doesn't need to be addressed
>>>> now, but it came to mind when I read the KIP.
>>>>
>>>> Thanks again,
>>>> Bill
>>>>
>>>> On Fri, Jan 11, 2019 at 1:19 PM John Roesler <j...@confluent.io> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for the updates to the KIP. I've just read it over, and am
>>>>> personally quite happy with it.
>>>>>
>>>>> Thanks for tackling this dicey issue and putting in a huge amount of
>> design
>>>>> work to produce
>>>>> a smooth upgrade path for DSL users.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Mon, Dec 17, 2018 at 10:35 AM Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Dear all,
>>>>>>
>>>>>> I finally managed to update the KIP.
>>>>>>
>>>>>> To address the concerns about the complex upgrade path, I simplified
>> the
>>>>>> design. We don't need any configs and the upgrade can be done with the
>>>>>> simple single rolling bounce pattern.
>>>>>>
>>>>>> The suggestion is to exploit RocksDB column families to isolate old
>> and
>>>>>> new on-disk format. Furthermore, the upgrade from old to new format
>>>>>> happens "on the side" after an instance was upgraded.
>>>>>>
>>>>>> I also pushed a WIP PR in case you want to look into some details
>>>>>> (potential reviewers, don't panic: I plan to break this down into
>>>>>> multiple PRs for actual review if the KIP is accepted).
>>>>>>
>>>>>> https://github.com/apache/kafka/pull/6044
>>>>>>
>>>>>> @Eno: I think I never answered your question about being future proof:
>>>>>>
>>>>>> The latest design is not generic, because it does not support changes
>>>>>> that need to be reflected in the changelog topic. I aimed for a
>>>>>> non-generic design for now to keep it as simple as possible. Thus,
>> other
>>>>>> format changes might need a different design / upgrade path --
>> however,
>>>>>> because this KIP is quite encapsulated in the current design, I don't
>>>>>> see any issue to build this later and a generic upgrade path seems to
>> be
>>>>>> an orthogonal concern atm.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 11/22/18 2:50 PM, Adam Bellemare wrote:
>>>>>>> Thanks for the information Matthias.
>>>>>>>
>>>>>>> I will await your completion of this ticket then since it underpins
>> the
>>>>>>> essential parts of a RocksDB TTL aligned with the changelog topic. I
>> am
>>>>>>> eager to work on that ticket myself, so if I can help on this one in
>>>>> any
>>>>>>> way please let me know.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Adam
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <
>> matth...@confluent.io
>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> It's an interesting idea to use second store, to maintain the
>>>>>>>> timestamps. However, each RocksDB instance implies some overhead. In
>>>>>>>> fact, we are looking into ColumnFamilies atm to see if we can use
>>>>> those
>>>>>>>> and merge multiple RocksDBs into a single one to reduce this
>> overhead.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 11/20/18 5:15 AM, Patrik Kleindl wrote:
>>>>>>>>> Hi Adam
>>>>>>>>>
>>>>>>>>> Sounds great, I was already planning to ask around if anyone had
>>>>>> tackled
>>>>>>>>> this.
>>>>>>>>> We have a use case very similar to what you described in
>> KAFKA-4212,
>>>>>> only
>>>>>>>>> with Global State Stores.
>>>>>>>>> I have tried a few things with the normal DSL but was not really
>>>>>>>> successful.
>>>>>>>>> Schedule/Punctuate is not possible, supplying a windowed store is
>>>>> also
>>>>>>>> not
>>>>>>>>> allowed and the process method has no knowledge of the timestamp of
>>>>> the
>>>>>>>>> record.
>>>>>>>>> And anything loaded on startup is not filtered anyway.
>>>>>>>>>
>>>>>>>>> Regarding 4212, wouldn't it be easier (although a little less
>>>>>>>>> space-efficient) to track the Timestamps in a separate Store with
>> <K,
>>>>>>>> Long>
>>>>>>>>> ?
>>>>>>>>> This would leave the original store intact and allow a migration of
>>>>> the
>>>>>>>>> timestamps without touching the other data.
>>>>>>>>>
>>>>>>>>> So I am very interested in your PR :-)
>>>>>>>>>
>>>>>>>>> best regards
>>>>>>>>>
>>>>>>>>> Patrik
>>>>>>>>>
>>>>>>>>> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare <
>>>>> adam.bellem...@gmail.com
>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Matthias
>>>>>>>>>>
>>>>>>>>>> Thanks - I figured that it was probably a case of just too much to
>>>>> do
>>>>>>>> and
>>>>>>>>>> not enough time. I know how that can go. I am asking about this
>> one
>>>>> in
>>>>>>>>>> relation to https://issues.apache.org/jira/browse/KAFKA-4212,
>>>>> adding
>>>>>> a
>>>>>>>> TTL
>>>>>>>>>> to RocksDB. I have outlined a bit about my use-case within 4212,
>> but
>>>>>> for
>>>>>>>>>> brevity here it is:
>>>>>>>>>>
>>>>>>>>>> My case:
>>>>>>>>>> 1) I have a RocksDB with TTL implementation working where records
>>>>> are
>>>>>>>> aged
>>>>>>>>>> out using the TTL that comes with RocksDB (very simple).
>>>>>>>>>> 2) We prevent records from loading from the changelog if
>> recordTime
>>>>> +
>>>>>>>> TTL <
>>>>>>>>>> referenceTimeStamp (default = System.currentTimeInMillis() ).
>>>>>>>>>>
>>>>>>>>>> This assumes that the records are stored with the same time
>>>>> reference
>>>>>>>> (say
>>>>>>>>>> UTC) as the consumer materializing the RocksDB store.
>>>>>>>>>>
>>>>>>>>>> My questions about KIP-258 are as follows:
>>>>>>>>>> 1) How does "we want to be able to store record timestamps in
>>>>> KTables"
>>>>>>>>>> differ from inserting records into RocksDB with TTL at consumption
>>>>>>>> time? I
>>>>>>>>>> understand that it could be a difference of some seconds, minutes,
>>>>>>>> hours,
>>>>>>>>>> days etc between when the record was published and now, but given
>>>>> the
>>>>>>>>>> nature of how RocksDB TTL works (eventual - based on compaction) I
>>>>>> don't
>>>>>>>>>> see how a precise TTL can be achieved, such as that which one can
>>>>> get
>>>>>>>> with
>>>>>>>>>> windowed stores.
>>>>>>>>>>
>>>>>>>>>> 2) Are you looking to change how records are inserted into a TTL
>>>>>>>> RocksDB,
>>>>>>>>>> such that the TTL would take effect from the record's published
>>>>> time?
>>>>>> If
>>>>>>>>>> not, what would be the ideal workflow here for a single record
>> with
>>>>>> TTL
>>>>>>>>>> RocksDB?
>>>>>>>>>> ie: Record Timestamp: 100
>>>>>>>>>> TTL: 50
>>>>>>>>>> Record inserted into rocksDB: 110
>>>>>>>>>> Record to expire at 150?
>>>>>>>>>>
>>>>>>>>>> 3) I'm not sure I fully understand the importance of the upgrade
>>>>>> path. I
>>>>>>>>>> have read the link to (
>>>>>> https://issues.apache.org/jira/browse/KAFKA-3522
>>>>>>>> )
>>>>>>>>>> in
>>>>>>>>>> the KIP, and I can understand that a state-store on disk may not
>>>>>>>> represent
>>>>>>>>>> what the application is expecting. I don't think I have the full
>>>>>> picture
>>>>>>>>>> though, because that issue seems to be easy to fix with a simple
>>>>>>>> versioned
>>>>>>>>>> header or accompanying file, forcing the app to rebuild the state
>> if
>>>>>> the
>>>>>>>>>> version is incompatible. Can you elaborate or add a scenario to
>> the
>>>>>> KIP
>>>>>>>>>> that illustrates the need for the upgrade path?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Adam
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <
>>>>>> matth...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Adam,
>>>>>>>>>>>
>>>>>>>>>>> I am still working on it. Was pulled into a lot of other tasks
>>>>> lately
>>>>>>>> so
>>>>>>>>>>> this was delayed. Also had some discussions about simplifying the
>>>>>>>>>>> upgrade path with some colleagues and I am prototyping this atm.
>>>>> Hope
>>>>>>>> to
>>>>>>>>>>> update the KIP accordingly soon.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 11/10/18 7:41 AM, Adam Bellemare wrote:
>>>>>>>>>>>> Hello Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> I am curious as to the status of this KIP. TTL and expiry of
>>>>> records
>>>>>>>>>> will
>>>>>>>>>>>> be extremely useful for several of our business use-cases, as
>> well
>>>>>> as
>>>>>>>>>>>> another KIP I had been working on.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <
>>>>>> eno.there...@gmail.com
>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Good stuff. Could you comment a bit on how future-proof is this
>>>>>>>>>> change?
>>>>>>>>>>> For
>>>>>>>>>>>>> example, if we want to store both event timestamp "and"
>>>>> processing
>>>>>>>>>> time
>>>>>>>>>>> in
>>>>>>>>>>>>> RocksDB will we then need another interface (e.g. called
>>>>>>>>>>>>> KeyValueWithTwoTimestampsStore)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <
>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your input Guozhang and John.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see your point, that the upgrade API is not simple. If you
>>>>> don't
>>>>>>>>>>>>>> thinks it's valuable to make generic store upgrades possible
>>>>>> (atm),
>>>>>>>>>> we
>>>>>>>>>>>>>> can make the API internal, too. The impact is, that we only
>>>>>> support
>>>>>>>> a
>>>>>>>>>>>>>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to
>>>>>>>>>>>>>> WindowedWithTS etc) for which we implement the internal
>>>>>> interfaces.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We can keep the design generic, so if we decide to make it
>>>>> public,
>>>>>>>> we
>>>>>>>>>>>>>> don't need to re-invent it. This will also have the advantage,
>>>>>> that
>>>>>>>>>> we
>>>>>>>>>>>>>> can add upgrade pattern for other stores later, too.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly,
>>>>> but
>>>>>>>> it
>>>>>>>>>>>>>> was the only way I could find to design a generic upgrade
>>>>>> interface.
>>>>>>>>>> If
>>>>>>>>>>>>>> we decide the hide all the upgrade stuff,
>> `StoreUpgradeBuilder`
>>>>>>>> would
>>>>>>>>>>>>>> become an internal interface I guess (don't think we can
>> remove
>>>>>> it).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I will wait for more feedback about this and if nobody wants
>> to
>>>>>> keep
>>>>>>>>>> it
>>>>>>>>>>>>>> as public API I will update the KIP accordingly. Will add some
>>>>>> more
>>>>>>>>>>>>>> clarifications for different upgrade patterns in the mean time
>>>>> and
>>>>>>>>>> fix
>>>>>>>>>>>>>> the typos/minor issues.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> About adding a new state UPGRADING: maybe we could do that.
>>>>>> However,
>>>>>>>>>> I
>>>>>>>>>>>>>> find it particularly difficult to make the estimation when we
>>>>>> should
>>>>>>>>>>>>>> switch to RUNNING, thus, I am a little hesitant. Using store
>>>>>>>>>> callbacks
>>>>>>>>>>>>>> or just logging the progress including some indication about
>> the
>>>>>>>>>> "lag"
>>>>>>>>>>>>>> might actually be sufficient. Not sure what others think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> About "value before timestamp": no real reason and I think it
>>>>> does
>>>>>>>>>> not
>>>>>>>>>>>>>> make any difference. Do you want to change it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> About upgrade robustness: yes, we cannot control if an
>> instance
>>>>>>>>>> fails.
>>>>>>>>>>>>>> That is what I meant by "we need to write test". The upgrade
>>>>>> should
>>>>>>>>>> be
>>>>>>>>>>>>>> able to continuous even is an instance goes down (and we must
>>>>> make
>>>>>>>>>> sure
>>>>>>>>>>>>>> that we don't end up in an invalid state that forces us to
>> wipe
>>>>>> out
>>>>>>>>>> the
>>>>>>>>>>>>>> whole store). Thus, we need to write system tests that fail
>>>>>>>> instances
>>>>>>>>>>>>>> during upgrade.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For `in_place_offline` upgrade: I don't think we need this
>> mode,
>>>>>>>>>>> because
>>>>>>>>>>>>>> people can do this via a single rolling bounce.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  - prepare code and switch KV-Store to KVwithTs-Store
>>>>>>>>>>>>>>  - do a single rolling bounce (don't set any upgrade config)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store`
>> if
>>>>>> we
>>>>>>>>>>>>>> remove the `StoreUpgradeBuilder`) will detect that there is
>> only
>>>>>> an
>>>>>>>>>> old
>>>>>>>>>>>>>> local KV store w/o TS, will start to restore the new KVwithTs
>>>>>> store,
>>>>>>>>>>>>>> wipe out the old store and replace with the new store after
>>>>>> restore
>>>>>>>>>> is
>>>>>>>>>>>>>> finished, and start processing only afterwards. (I guess we
>> need
>>>>>> to
>>>>>>>>>>>>>> document this case -- will also add it to the KIP.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 8/9/18 1:10 PM, John Roesler wrote:
>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this KIP is looking really good.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a few thoughts to add to the others:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. You mentioned at one point users needing to configure
>>>>>>>>>>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant
>> to
>>>>>> say
>>>>>>>>>>>>> they
>>>>>>>>>>>>>>> should remove the config. If they really have to set it to a
>>>>>> string
>>>>>>>>>>>>>> "null"
>>>>>>>>>>>>>>> or even set it to a null value but not remove it, it would be
>>>>>>>>>>>>>> unfortunate.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2. In response to Bill's comment #1 , you said that "The idea
>>>>> is
>>>>>>>>>> that
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> upgrade should be robust and not fail. We need to write
>>>>> according
>>>>>>>>>>>>> tests".
>>>>>>>>>>>>>>> I may have misunderstood the conversation, but I don't think
>>>>> it's
>>>>>>>>>>>>> within
>>>>>>>>>>>>>>> our power to say that an instance won't fail. What if one of
>> my
>>>>>>>>>>>>> computers
>>>>>>>>>>>>>>> catches on fire? What if I'm deployed in the cloud and one
>>>>>> instance
>>>>>>>>>>>>>>> disappears and is replaced by a new one? Or what if one
>>>>> instance
>>>>>>>>>> goes
>>>>>>>>>>>>>> AWOL
>>>>>>>>>>>>>>> for a long time and then suddenly returns? How will the
>> upgrade
>>>>>>>>>>> process
>>>>>>>>>>>>>>> behave in light of such failures?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3. your thought about making in-place an offline mode is
>>>>>>>>>> interesting,
>>>>>>>>>>>>> but
>>>>>>>>>>>>>>> it might be a bummer for on-prem users who wish to upgrade
>>>>>> online,
>>>>>>>>>> but
>>>>>>>>>>>>>>> cannot just add new machines to the pool. It could be a new
>>>>>> upgrade
>>>>>>>>>>>>> mode
>>>>>>>>>>>>>>> "offline-in-place", though...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 4. I was surprised to see that a user would need to modify
>> the
>>>>>>>>>>> topology
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of
>>>>>> Guozhang's
>>>>>>>>>>>>>>> suggestions would remove this necessity.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for taking on this very complex but necessary work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <
>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the updated KIP. Some more comments:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. The current set of proposed API is a bit too complicated,
>>>>>> which
>>>>>>>>>>>>> makes
>>>>>>>>>>>>>>>> the upgrade flow from user's perspective also a bit complex.
>>>>> I'd
>>>>>>>>>> like
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> check different APIs and discuss about their needs
>> separately:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     1.a. StoreProxy: needed for in-place upgrade only,
>> between
>>>>>> the
>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>> and second rolling bounce, where the old-versioned stores
>> can
>>>>>>>>>> handle
>>>>>>>>>>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e.
>>>>> from
>>>>>>>> one
>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>> type to another) would not be very common: users may want to
>>>>>>>>>> upgrade
>>>>>>>>>>>>>> from a
>>>>>>>>>>>>>>>> certain store engine to another, but the interface would
>>>>> likely
>>>>>> be
>>>>>>>>>>>>>> staying
>>>>>>>>>>>>>>>> the same. Hence personally I'd suggest we keep it internally
>>>>> and
>>>>>>>>>> only
>>>>>>>>>>>>>>>> consider exposing it in the future if it does become a
>> common
>>>>>>>>>>> pattern.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     1.b. ConverterStore / RecordConverter: needed for both
>>>>>>>> in-place
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> roll-over upgrade, between the first and second rolling
>>>>> bounces,
>>>>>>>>>> for
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> new versioned store to be able to read old-versioned
>> changelog
>>>>>>>>>>> topics.
>>>>>>>>>>>>>>>> Firstly I think we should not expose key in the public APIs
>>>>> but
>>>>>>>>>> only
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> values, since allowing key format changes would break log
>>>>>>>>>> compaction,
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> hence would not be compatible anyways. As for value format
>>>>>>>> changes,
>>>>>>>>>>>>>>>> personally I think we can also keep its upgrade logic
>>>>> internally
>>>>>>>> as
>>>>>>>>>>> it
>>>>>>>>>>>>>> may
>>>>>>>>>>>>>>>> not worth generalizing to user customizable logic.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     1.c. If you agrees with 2.a/b above, then we can also
>>>>>> remove "
>>>>>>>>>>>>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the
>>>>>>>> public
>>>>>>>>>>>>>> APIs.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     1.d. Personally I think
>>>>> "ReadOnlyKeyValueWithTimestampStore"
>>>>>>>> is
>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>> needed either given that we are exposing "ValueAndTimestamp"
>>>>>>>>>> anyways.
>>>>>>>>>>>>>> I.e.
>>>>>>>>>>>>>>>> it is just a syntax sugar and for IQ, users can always just
>>>>> set
>>>>>> a
>>>>>>>> "
>>>>>>>>>>>>>>>> QueryableStoreType<ReadOnlyKeyValue<K,
>> ValueAndTimestamp<V>>>"
>>>>>> as
>>>>>>>>>> the
>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>> interface does not provide any additional functions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2. Could we further categorize the upgrade flow for
>> different
>>>>>> use
>>>>>>>>>>>>> cases,
>>>>>>>>>>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be
>>>>> used
>>>>>>>>>>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who
>> do
>>>>>> not
>>>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to
>>>>>>>> switch
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> KeyValueWithTimestampStore. Just to give my understanding
>> for
>>>>>> 3),
>>>>>>>>>> the
>>>>>>>>>>>>>>>> upgrade flow for users may be simplified as the following
>> (for
>>>>>>>> both
>>>>>>>>>>>>>>>> in-place and roll-over):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     * Update the jar to new version, make code changes from
>>>>>>>>>>>>>> KeyValueStore
>>>>>>>>>>>>>>>> to KeyValueWithTimestampStore, set upgrade config.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     * First rolling bounce, and library code can internally
>>>>> use
>>>>>>>>>> proxy
>>>>>>>>>>>>> /
>>>>>>>>>>>>>>>> converter based on the specified config to handle new APIs
>>>>> with
>>>>>>>> old
>>>>>>>>>>>>>> stores,
>>>>>>>>>>>>>>>> while let new stores read from old changelog data.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     * Reset upgrade config.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     * Second rolling bounce, and the library code
>>>>> automatically
>>>>>>>>>> turn
>>>>>>>>>>>>> off
>>>>>>>>>>>>>>>> logic for proxy / converter.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3. Some more detailed proposals are needed for when to
>>>>> recommend
>>>>>>>>>>> users
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> trigger the second rolling bounce. I have one idea to share
>>>>>> here:
>>>>>>>>>> we
>>>>>>>>>>>>>> add a
>>>>>>>>>>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when
>> 1)
>>>>>>>>>>> upgrade
>>>>>>>>>>>>>>>> config is set, and 2) the new stores are still ramping up
>> (for
>>>>>> the
>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>> part, we can start with some internal hard-coded heuristics
>> to
>>>>>>>>>> decide
>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>> it is close to be ramped up). If either one of it is not
>> true
>>>>>> any
>>>>>>>>>>>>> more,
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> should transit to RUNNING. Users can then watch on this
>> state,
>>>>>> and
>>>>>>>>>>>>>> decide
>>>>>>>>>>>>>>>> to only trigger the second rebalance when the state has
>>>>>> transited
>>>>>>>>>>> from
>>>>>>>>>>>>>>>> UPGRADING. They can also choose to cut over while the
>> instance
>>>>>> is
>>>>>>>>>>>>> still
>>>>>>>>>>>>>>>> UPGRADING, the downside is that after that the application
>> may
>>>>>>>> have
>>>>>>>>>>>>> long
>>>>>>>>>>>>>>>> restoration phase which is, to user's pov, unavailability
>>>>>> periods.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Below are just some minor things on the wiki:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 4. "proxy story" => "proxy store".
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 5. "use the a builder " => "use a builder"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to
>>>>> the
>>>>>>>>>>>>> value":
>>>>>>>>>>>>>>>> what's the rationale of putting the timestamp before the
>>>>> value,
>>>>>>>>>> than
>>>>>>>>>>>>>> after
>>>>>>>>>>>>>>>> the value?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <
>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the feedback Bill. I just update the KIP with
>> some
>>>>>> of
>>>>>>>>>>> your
>>>>>>>>>>>>>>>>> points.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing
>> to
>>>>>>>>>> watch
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a
>>>>> type
>>>>>> of
>>>>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new
>> stores
>>>>>> have
>>>>>>>>>>>>>>>> reached
>>>>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the
>>>>>>>> signal
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> start
>>>>>>>>>>>>>>>>>>> second rolling rebalance?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think we can reuse the existing listeners, thus, I did
>> not
>>>>>>>>>> include
>>>>>>>>>>>>>>>>> anything in the KIP. About a signal to rebalance: this
>> might
>>>>> be
>>>>>>>>>>>>> tricky.
>>>>>>>>>>>>>>>>> If we prepare the store "online", the active task will
>> update
>>>>>> the
>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>> continuously, and thus, state prepare is never finished. It
>>>>>> will
>>>>>>>>>> be
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> users responsibility to do the second rebalance (note, that
>>>>> the
>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>> rebalance will first finish the last delta of the upgrade
>> to
>>>>>>>>>> finish
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> upgrade before actual processing resumes). I clarified the
>>>>> KIP
>>>>>>>>>> with
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> regard a little bit.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the
>>>>> process,
>>>>>>>>>> would
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>>> the other instances to complete or just fail the entire
>>>>>>>> upgrade?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The idea is that the upgrade should be robust and not fail.
>>>>> We
>>>>>>>>>> need
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> write according tests.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could
>> rename
>>>>>> the
>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>> active directories vs. deleting them right away,  and
>> when
>>>>>> all
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> prepare
>>>>>>>>>>>>>>>>>>> task directories are successfully migrated then delete
>> the
>>>>>>>>>>> previous
>>>>>>>>>>>>>>>>> active
>>>>>>>>>>>>>>>>>>> ones.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ack. Updated the KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing
>> any
>>>>>> new
>>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all
>>>>>> prepare
>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling
>>>>>> bounces
>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> then as
>>>>>>>>>>>>>>>>>>> each task successfully renames its prepare directories
>> and
>>>>>>>>>> deletes
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>>> active task directories, normal processing of records
>>>>>> resumes.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The basic idea is to do an online upgrade to avoid
>> downtime.
>>>>> We
>>>>>>>>>> can
>>>>>>>>>>>>>>>>> discuss to offer both options... For the offline upgrade
>>>>>> option,
>>>>>>>>>> we
>>>>>>>>>>>>>>>>> could simplify user interaction and trigger the second
>>>>>> rebalance
>>>>>>>>>>>>>>>>> automatically with the requirement that a user needs to
>>>>> update
>>>>>>>> any
>>>>>>>>>>>>>>>> config.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If might actually be worth to include this option: we know
>>>>> from
>>>>>>>>>>>>>>>>> experience with state restore, that regular processing
>> slows
>>>>>> down
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> restore. For roll_over upgrade, it would be a different
>> story
>>>>>> and
>>>>>>>>>>>>>>>>> upgrade should not be slowed down by regular processing.
>>>>> Thus,
>>>>>> we
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>> even make in_place an offline upgrade and force people to
>> use
>>>>>>>>>>>>> roll_over
>>>>>>>>>>>>>>>>> if they need onlint upgrade. Might be a fair tradeoff that
>>>>> may
>>>>>>>>>>>>> simplify
>>>>>>>>>>>>>>>>> the upgrade for the user and for the code complexity.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let's see what other think.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the update and the working prototype, it helps
>>>>> with
>>>>>>>>>>>>>>>>>> understanding the KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I took an initial pass over this PR, and overall I find
>> the
>>>>>>>>>>>>> interfaces
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> approach to be reasonable.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing
>> to
>>>>>>>> watch
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a
>> type
>>>>>> of
>>>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new stores
>>>>>> have
>>>>>>>>>>>>>> reached
>>>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the
>>>>>> signal
>>>>>>>>>> to
>>>>>>>>>>>>>>>> start
>>>>>>>>>>>>>>>>>> second rolling rebalance?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Although you solicited feedback on the interfaces
>> involved,
>>>>> I
>>>>>>>>>>> wanted
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> put
>>>>>>>>>>>>>>>>>> down some thoughts that have come to mind reviewing this
>> KIP
>>>>>>>>>> again
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the
>> process,
>>>>>>>>>> would
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>> the other instances to complete or just fail the entire
>>>>>> upgrade?
>>>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename
>>>>> the
>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>> active directories vs. deleting them right away,  and when
>>>>> all
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> prepare
>>>>>>>>>>>>>>>>>> task directories are successfully migrated then delete the
>>>>>>>>>> previous
>>>>>>>>>>>>>>>>> active
>>>>>>>>>>>>>>>>>> ones.
>>>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause any processing
>> any
>>>>>> new
>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all
>>>>> prepare
>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling
>>>>>> bounces
>>>>>>>>>> and
>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>> each task successfully renames its prepare directories and
>>>>>>>>>> deletes
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>> active task directories, normal processing of records
>>>>> resumes.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <
>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> KIP-268 (rebalance meatadata) is finished and included in
>>>>> AK
>>>>>>>> 2.0
>>>>>>>>>>>>>>>>>>> release. Thus, I want to pick up this KIP again to get
>> the
>>>>>>>>>> RocksDB
>>>>>>>>>>>>>>>>>>> upgrade done for 2.1.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I updated the KIP accordingly and also have a "prove of
>>>>>>>> concept"
>>>>>>>>>>> PR
>>>>>>>>>>>>>>>>>>> ready (for "in place" upgrade only):
>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5422/
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> There a still open questions, but I want to collect early
>>>>>>>>>> feedback
>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>> the proposed interfaces we need for the store upgrade.
>> Also
>>>>>>>>>> note,
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> the KIP now also aim to define a generic upgrade path
>> from
>>>>>> any
>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>>>> format A to any other store format B. Adding timestamps
>> is
>>>>>> just
>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>> special case.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I will continue to work on the PR and refine the KIP in
>> the
>>>>>>>>>>>>> meantime,
>>>>>>>>>>>>>>>>> too.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>>>>>> After some more thoughts, I want to follow John's
>>>>> suggestion
>>>>>>>>>> and
>>>>>>>>>>>>>>>> split
>>>>>>>>>>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I extracted the metadata upgrade into it's own KIP:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%
>>>>>>>>>>>>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to
>>>>>>>>>> consider
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> make the store format upgrade more flexible/generic.
>> Atm,
>>>>>> the
>>>>>>>>>> KIP
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> too
>>>>>>>>>>>>>>>>>>>> much tailored to the DSL IMHO and does not encounter
>> PAPI
>>>>>>>> users
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> should not force to upgrade the stores. I need to figure
>>>>> out
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> details
>>>>>>>>>>>>>>>>>>>> and follow up later.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Please give feedback for the new KIP-268 on the
>>>>>> corresponding
>>>>>>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>>>>>>> thread.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't
>>>>>> figure
>>>>>>>>>> out
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> way
>>>>>>>>>>>>>>>>>>>> for a single rolling bounce upgrade. But KIP-268
>> proposes
>>>>> a
>>>>>>>> fix
>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> future upgrades. Please share your thoughts.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for all your feedback!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's
>>>>> an
>>>>>>>>>>>>>>>>>>>>> implementation details IMHO and thus I did not include
>> it
>>>>>> in
>>>>>>>>>> the
>>>>>>>>>>>>>>>> KIP)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> @Guozhang:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 1) I understand know what you mean. We can certainly,
>>>>> allow
>>>>>>>>>> all
>>>>>>>>>>>>>>>> values
>>>>>>>>>>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
>>>>>>>>>>>>> `upgrade.from`
>>>>>>>>>>>>>>>>>>>>> parameter. I had a similar though once but decided to
>>>>>>>> collapse
>>>>>>>>>>>>> them
>>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>>>>>> one -- will update the KIP accordingly.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 2) The idea to avoid any config would be, to always
>> send
>>>>>> both
>>>>>>>>>>>>>>>> request.
>>>>>>>>>>>>>>>>>>>>> If we add a config to eventually disable the old
>> request,
>>>>>> we
>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> gain
>>>>>>>>>>>>>>>>>>>>> anything with this approach. The question is really, if
>>>>> we
>>>>>>>> are
>>>>>>>>>>>>>>>> willing
>>>>>>>>>>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be
>>>>>> limited
>>>>>>>>>> to
>>>>>>>>>>>>> 2
>>>>>>>>>>>>>>>>>>>>> versions and not grow further in future releases. More
>>>>>>>> details
>>>>>>>>>>> in
>>>>>>>>>>>>>>>> (3)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases
>> and
>>>>>>>>>> allows
>>>>>>>>>>>>> us
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> stay with 2 "assignment strategies" we need to
>> register,
>>>>> as
>>>>>>>>>> the
>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via
>>>>>>>>>> "version
>>>>>>>>>>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to
>> avoid a
>>>>>>>>>> config
>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>> people upgrade from pre-1.2 releases.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thus, I don't think we need to register new "assignment
>>>>>>>>>>>>> strategies"
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> send empty subscriptions for older version.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right
>> with
>>>>> a
>>>>>>>>>>> single
>>>>>>>>>>>>>>>>>>>>> rebalance. I share the concern that an application
>> might
>>>>>>>> never
>>>>>>>>>>>>>> catch
>>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>>>>> and thus the hot standby will never be ready.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store
>>>>>> upgrades.
>>>>>>>>>> If
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3)
>>>>> in
>>>>>>>>>> place
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>> future upgrades. I also think that changes to the
>>>>> metadata
>>>>>>>> are
>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>> likely and thus allowing for single rolling bounce for
>>>>> this
>>>>>>>>>> case
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>> important anyway. If we assume that store upgrade a
>> rare,
>>>>>> it
>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> ok
>>>>>>>>>>>>>>>>>>>>> to sacrifice two rolling bounced for this case. It was
>>>>> just
>>>>>>>> an
>>>>>>>>>>>>> idea
>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>> wanted to share (even if I see the issues).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for your replies.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 1) About the config names: actually I was trying to
>> not
>>>>>>>>>> expose
>>>>>>>>>>>>>>>>>>>>>> implementation details :) My main concern was that in
>>>>> your
>>>>>>>>>>>>>> proposal
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> values need to cover the span of all the versions that
>>>>> are
>>>>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a
>>>>>> user)
>>>>>>>>>> am
>>>>>>>>>>>>>>>>>>> upgrading
>>>>>>>>>>>>>>>>>>>>>> from any versions within this range I need to remember
>>>>> to
>>>>>>>> use
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> value
>>>>>>>>>>>>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version.
>> In
>>>>>> my
>>>>>>>>>>>>>>>>> suggestion
>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>> was trying to argue the benefit of just letting users
>> to
>>>>>>>>>>> specify
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> actual
>>>>>>>>>>>>>>>>>>>>>> Kafka version she's trying to upgrade from, than
>>>>>> specifying
>>>>>>>> a
>>>>>>>>>>>>>> range
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc
>>>>> as
>>>>>>>> the
>>>>>>>>>>>>>>>> values,
>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>> still using Kafka versions like broker's
>>>>>> `internal.version`
>>>>>>>>>>>>>> config.
>>>>>>>>>>>>>>>>>>> But if
>>>>>>>>>>>>>>>>>>>>>> you were suggesting the same thing, i.e. by
>>>>>> "0.10.1.x-1.1.x"
>>>>>>>>>>> you
>>>>>>>>>>>>>>>>> meant
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or
>>>>>> "0.11.0"
>>>>>>>>>> or
>>>>>>>>>>>>>>>> "1.1"
>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>> are all recognizable config values then I think we are
>>>>>>>>>> actually
>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>> page.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would
>>>>>> increase
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> network
>>>>>>>>>>>>>>>>>>>>>> footprint, but not the message size, IF I'm not
>>>>>>>>>>>>> mis-understanding
>>>>>>>>>>>>>>>>> your
>>>>>>>>>>>>>>>>>>> idea
>>>>>>>>>>>>>>>>>>>>>> of registering multiple assignment. More details:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can
>>>>>>>> encode
>>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>>>>> protocols each with their different metadata. The
>>>>>>>> coordinator
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> pick the
>>>>>>>>>>>>>>>>>>>>>> common one that everyone supports (if there are no
>>>>> common
>>>>>>>>>> one,
>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>>>>> an error back; if there are multiple ones, it will
>> pick
>>>>>> the
>>>>>>>>>> one
>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded
>>>>>> list).
>>>>>>>>>>>>> Since
>>>>>>>>>>>>>>>> our
>>>>>>>>>>>>>>>>>>>>>> current Streams rebalance protocol is still based on
>> the
>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>>>>>>> coordinator, it means our protocol_type would be
>>>>>> "consumer",
>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>> instead
>>>>>>>>>>>>>>>>>>>>>> the protocol type we can have multiple protocols like
>>>>>>>>>>> "streams",
>>>>>>>>>>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that
>> we
>>>>>> need
>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> implement a
>>>>>>>>>>>>>>>>>>>>>> different assignor class for each version and register
>>>>> all
>>>>>>>> of
>>>>>>>>>>>>> them
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In
>> the
>>>>>>>>>> future
>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> re-factor our implementation to have our own client
>>>>>>>>>> coordinator
>>>>>>>>>>>>>>>> layer
>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>> Connect did, we can simplify this part of the
>>>>>>>> implementation.
>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> now with the above approach this is still doable.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On the broker side, the group coordinator will only
>>>>>> persist
>>>>>>>> a
>>>>>>>>>>>>>> group
>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>> the selected protocol and its subscription metadata,
>>>>> e.g.
>>>>>> if
>>>>>>>>>>>>>>>>>>> coordinator
>>>>>>>>>>>>>>>>>>>>>> decides to pick "streams_v2" it will only sends that
>>>>>>>>>> protocol's
>>>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>>> from everyone to the leader to assign, AND when
>>>>> completing
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> rebalance it
>>>>>>>>>>>>>>>>>>>>>> will also only write the group metadata with that
>>>>> protocol
>>>>>>>>>> and
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> assignment only. In a word, although the network
>> traffic
>>>>>>>>>> maybe
>>>>>>>>>>>>>>>>>>> increased a
>>>>>>>>>>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One
>>>>> corner
>>>>>>>>>>>>>>>> situation
>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>> need to consider is how to stop registering very old
>>>>>>>>>> assignors
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> avoid the
>>>>>>>>>>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if
>>>>> you
>>>>>>>> are
>>>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register
>> v1
>>>>>>>>>>>>> assignor
>>>>>>>>>>>>>>>>>>> anymore,
>>>>>>>>>>>>>>>>>>>>>> but that would unfortunately still require some
>> configs.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 3) About the  "version probing" idea, I think that's a
>>>>>>>>>>> promising
>>>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>>>> as well, but if we are going to do the
>> multi-assignment
>>>>>> its
>>>>>>>>>>>>> value
>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on
>> top
>>>>> of
>>>>>>>>>>>>>>>>>>> multi-assignment
>>>>>>>>>>>>>>>>>>>>>> to save us from still requiring the config to avoid
>>>>>>>>>> registering
>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> metadata for all version. More details:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the
>>>>>> assignor
>>>>>>>>>> but
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> all old
>>>>>>>>>>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the
>>>>> encoded
>>>>>>>>>> data
>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> be:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> "streams_vN" : "encoded metadata"
>>>>>>>>>>>>>>>>>>>>>> "streams_vN-1":empty
>>>>>>>>>>>>>>>>>>>>>> "streams_vN-2":empty
>>>>>>>>>>>>>>>>>>>>>> ..
>>>>>>>>>>>>>>>>>>>>>> "streams_0":empty
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So the coordinator can still safely choose the latest
>>>>>> common
>>>>>>>>>>>>>>>> version;
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> then when leaders receive the subscription (note it
>>>>> should
>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>> recognize
>>>>>>>>>>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of
>>>>> the
>>>>>>>>>>>>>>>>>>> subscriptions
>>>>>>>>>>>>>>>>>>>>>> are empty bytes, it will send the empty assignment
>> with
>>>>>> that
>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>> number
>>>>>>>>>>>>>>>>>>>>>> encoded in the metadata. So in the second
>> auto-triggered
>>>>>> all
>>>>>>>>>>>>>>>> members
>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>> send the metadata with that version:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> "streams_vN" : empty
>>>>>>>>>>>>>>>>>>>>>> "streams_vN-1" : empty
>>>>>>>>>>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata"
>>>>>>>>>>>>>>>>>>>>>> ..
>>>>>>>>>>>>>>>>>>>>>> "streams_0":empty
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> By doing this we would not require any configs for
>>>>> users.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not
>>>>> clear
>>>>>>>>>> about
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> details
>>>>>>>>>>>>>>>>>>>>>> so probably we'd need to fill that out before making a
>>>>>> call.
>>>>>>>>>>> For
>>>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>>>>>>>> you mentioned "If we detect this situation, the
>> Streams
>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>>>> closes
>>>>>>>>>>>>>>>>>>>>>> corresponding active tasks as well as "hot standby"
>>>>> tasks,
>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> re-creates
>>>>>>>>>>>>>>>>>>>>>> the new active tasks using the new store." How could
>> we
>>>>>>>>>>>>> guarantee
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> gap between these two stores will keep decreasing than
>>>>>>>>>>>>> increasing
>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>> we'll
>>>>>>>>>>>>>>>>>>>>>> eventually achieve the flip point? And also the longer
>>>>> we
>>>>>>>> are
>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> flip point, the larger we are doubling the storage
>>>>> space,
>>>>>>>>>> etc.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> @John, Guozhang,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply...
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> About upgrading the rebalance metadata:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Another possibility to do this, would be to register
>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>> assignment
>>>>>>>>>>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case,
>> new
>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>> be configured to support both and the broker would
>> pick
>>>>>> the
>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>> all instances understand. The disadvantage would be,
>>>>> that
>>>>>>>> we
>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>> much
>>>>>>>>>>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance
>> as
>>>>>> long
>>>>>>>>>> as
>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus,
>>>>> using
>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off
>> an
>>>>>>>>>>>>> increased
>>>>>>>>>>>>>>>>>>>>>>> rebalance network footprint (I also assume that this
>>>>>> would
>>>>>>>>>>>>>>>> increase
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> message size that is written into __consumer_offsets
>>>>>>>>>> topic?).
>>>>>>>>>>>>>>>>>>> Overall, I
>>>>>>>>>>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it
>>>>>> could
>>>>>>>>>>>>> avoid
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>> second rebalance (I have some more thoughts about
>>>>> stores
>>>>>>>>>> below
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>> relevant for single rebalance upgrade).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> For future upgrades we might be able to fix this
>>>>> though.
>>>>>> I
>>>>>>>>>> was
>>>>>>>>>>>>>>>>>>> thinking
>>>>>>>>>>>>>>>>>>>>>>> about the following:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> In the current implementation, the leader fails if it
>>>>>> gets
>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>> subscription it does not understand (ie, newer
>>>>> version).
>>>>>> We
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>>>>>>> this behavior and let the leader send an empty
>>>>> assignment
>>>>>>>>>> plus
>>>>>>>>>>>>>>>> error
>>>>>>>>>>>>>>>>>>>>>>> code (including supported version) back to the
>> instance
>>>>>>>>>>> sending
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> "bad" subscription. This would allow the following
>>>>> logic
>>>>>>>> for
>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>> application instance:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>  - on startup, always send the latest subscription
>>>>> format
>>>>>>>>>>>>>>>>>>>>>>>  - if leader understands it, we get an assignment
>> back
>>>>> an
>>>>>>>>>>> start
>>>>>>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>>>>>>>>>>>  - if leader does not understand it, we get an empty
>>>>>>>>>>> assignment
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>>> supported version back
>>>>>>>>>>>>>>>>>>>>>>>  - the application unsubscribe()/subscribe()/poll()
>>>>> again
>>>>>>>>>> and
>>>>>>>>>>>>>>>>> sends a
>>>>>>>>>>>>>>>>>>>>>>> subscription using the leader's supported version
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> This protocol would allow to do a single rolling
>>>>> bounce,
>>>>>>>> and
>>>>>>>>>>>>>>>>>>> implements
>>>>>>>>>>>>>>>>>>>>>>> a "version probing" step, that might result in two
>>>>>> executed
>>>>>>>>>>>>>>>>>>> rebalances.
>>>>>>>>>>>>>>>>>>>>>>> The advantage would be, that the user does not need
>> to
>>>>>> set
>>>>>>>>>> any
>>>>>>>>>>>>>>>>> configs
>>>>>>>>>>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care
>>>>> of
>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>> automatically.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen
>>>>> and
>>>>>>>>>> that
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>> error case during rebalance, we loose the information
>>>>>> about
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> supported leader version and the "probing step" would
>>>>>>>>>> happen a
>>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>>>> time.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> If the leader is eventually updated, it will include
>>>>> it's
>>>>>>>>>> own
>>>>>>>>>>>>>>>>>>> supported
>>>>>>>>>>>>>>>>>>>>>>> version in all assignments, to allow a "down graded"
>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> upgrade its version later. Also, if a application
>>>>> fails,
>>>>>>>> the
>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>>>>>> probing would always be successful and only a single
>>>>>>>>>> rebalance
>>>>>>>>>>>>>>>>>>> happens.
>>>>>>>>>>>>>>>>>>>>>>> If we use this protocol, I think we don't need any
>>>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>>>>>>>> parameter for future upgrades.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version
>>>>> the
>>>>>>>>>>>>>>>>> current/old
>>>>>>>>>>>>>>>>>>>>>>> application is using. I think this is simpler, as
>> users
>>>>>>>> know
>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>> version. If we use "internal.protocol.version"
>> instead,
>>>>>> we
>>>>>>>>>>>>> expose
>>>>>>>>>>>>>>>>>>>>>>> implementation details and users need to know the
>>>>>> protocol
>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>> (ie,
>>>>>>>>>>>>>>>>>>>>>>> they need to map from the release version to the
>>>>> protocol
>>>>>>>>>>>>>> version;
>>>>>>>>>>>>>>>>> ie,
>>>>>>>>>>>>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol
>>>>> version
>>>>>>>>>> 2").
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Also the KIP states that for the second rolling
>> bounce,
>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` --
>>>>> and
>>>>>>>>>>> thus,
>>>>>>>>>>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is
>> ignored
>>>>>> (I
>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> update
>>>>>>>>>>>>>>>>>>>>>>> the KIP to point out this dependency).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> About your second point: I'll update the KIP
>>>>> accordingly
>>>>>> to
>>>>>>>>>>>>>>>> describe
>>>>>>>>>>>>>>>>>>>>>>> future updates as well. Both will be different.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> One more point about upgrading the store format. I
>> was
>>>>>>>>>>> thinking
>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>>> avoiding the second rolling bounce all together in
>> the
>>>>>>>>>> future:
>>>>>>>>>>>>>> (1)
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2)
>>>>> this
>>>>>>>>>>>>>> required
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the
>>>>>>>> switch
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> delete
>>>>>>>>>>>>>>>>>>>>>>> the old stores. (3) the current proposal does the
>>>>> switch
>>>>>>>>>>>>>>>> "globally"
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>> this is simpler and due to the required second
>>>>> rebalance
>>>>>> no
>>>>>>>>>>>>>>>>>>> disadvantage.
>>>>>>>>>>>>>>>>>>>>>>> However, a global consistent switch over might
>> actually
>>>>>> not
>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> required.
>>>>>>>>>>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from
>>>>>> above,
>>>>>>>>>> we
>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>>> decouple the store switch and each instance could
>>>>> switch
>>>>>>>> its
>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>>>>>>>> independently from all other instances. After the
>>>>> rolling
>>>>>>>>>>>>> bounce,
>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>>> seems to be ok to switch from the old store to the
>> new
>>>>>>>> store
>>>>>>>>>>>>>>>> "under
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> hood" whenever the new store is ready (this could
>> even
>>>>> be
>>>>>>>>>>> done,
>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>>>>> we switch to the new metadata version). Each time we
>>>>>> update
>>>>>>>>>>> the
>>>>>>>>>>>>>>>> "hot
>>>>>>>>>>>>>>>>>>>>>>> standby" we check if it reached the "endOffset"  (or
>>>>>> maybe
>>>>>>>>>> X%
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect
>> this
>>>>>>>>>>>>>> situation,
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> Streams application closes corresponding active tasks
>>>>> as
>>>>>>>>>> well
>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> "hot
>>>>>>>>>>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks
>>>>> using
>>>>>>>>>> the
>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>> store.
>>>>>>>>>>>>>>>>>>>>>>> (I need to go through the details once again, but it
>>>>>> seems
>>>>>>>>>> to
>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>> feasible.).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Combining this strategy with the "multiple
>> assignment"
>>>>>>>> idea,
>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>>>> enable us to do an single rolling bounce upgrade from
>>>>> 1.1
>>>>>>>> ->
>>>>>>>>>>>>> 1.2.
>>>>>>>>>>>>>>>>>>>>>>> Applications would just use the old store, as long as
>>>>> the
>>>>>>>>>> new
>>>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> not ready, even if the new metadata version is used
>>>>>>>> already.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> For future upgrades, a single rebalance would be
>>>>>>>> sufficient,
>>>>>>>>>>>>> too,
>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>>>> if the stores are upgraded. We would not need any
>>>>> config
>>>>>>>>>>>>>>>> parameters
>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>> the "probe" step allows us to detect the supported
>>>>>>>> rebalance
>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>>>> version (and we would also not need multiple
>>>>> "assigmnent
>>>>>>>>>>>>>>>> strategies"
>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>> out own protocol encoded everything we need).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>>>>>> @John:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the
>>>>>>>>>> encoded
>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>>>> bytes
>>>>>>>>>>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from
>>>>>>>> Consumer's
>>>>>>>>>>>>> pov,
>>>>>>>>>>>>>>>>> so I
>>>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>>>> this change should be in the Streams layer as well.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> @Matthias:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported
>>>>> version"
>>>>>>>>>>>>> besides
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea
>>>>> to
>>>>>>>>>> allow
>>>>>>>>>>>>>>>>> either
>>>>>>>>>>>>>>>>>>>>>>> case;
>>>>>>>>>>>>>>>>>>>>>>>> the key is that in Streams we would likely end up
>>>>> with a
>>>>>>>>>>>>> mapping
>>>>>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>>>>>>>>>>> protocol version to the other persistent data format
>>>>>>>>>> versions
>>>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can
>> actually
>>>>>>>>>>> achieve
>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the
>> upgraded
>>>>>>>>>>> protocol
>>>>>>>>>>>>>>>>>>> version's
>>>>>>>>>>>>>>>>>>>>>>>> corresponding data format does not change, e.g.
>> 0.10.0
>>>>>> ->
>>>>>>>>>>>>> 0.10.1
>>>>>>>>>>>>>>>>>>> leaders
>>>>>>>>>>>>>>>>>>>>>>>> can choose to use the newer version in the first
>>>>> rolling
>>>>>>>>>>>>> bounce
>>>>>>>>>>>>>>>>>>> directly
>>>>>>>>>>>>>>>>>>>>>>>> and we can document to users that they would not
>> need
>>>>> to
>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the
>>>>>> upgraded
>>>>>>>>>>>>>>>> protocol
>>>>>>>>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 ->
>>>>> 1.2,
>>>>>>>> and
>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the
>>>>>> first
>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>> bounce
>>>>>>>>>>>>>>>>>>>>>>>> and reset in the second.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Besides that, some additional comments:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive
>> for
>>>>>>>> users
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter
>> users
>>>>>>>> only
>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> set a
>>>>>>>>>>>>>>>>>>>>>>>> single version, while the Streams will map that
>>>>> version
>>>>>> to
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But
>>>>>> maybe
>>>>>>>> I
>>>>>>>>>>>>> did
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>>>> your idea about how the  "upgrade.from" config will
>> be
>>>>>>>> set,
>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from
>> config
>>>>>>>> will
>>>>>>>>>>> be
>>>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for
>>>>>> example,
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>>>> reset it to null in the second rolling bounce?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than
>>>>> talking
>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>> specific
>>>>>>>>>>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just
>>>>>>>>>> categorize
>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> possible scenarios, even for future upgrade
>> versions,
>>>>>> what
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> standard operations? The categorized we can
>> summarize
>>>>> to
>>>>>>>>>>> would
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>> (assuming
>>>>>>>>>>>>>>>>>>>>>>>> user upgrade from version X to version Y, where X
>> and
>>>>> Y
>>>>>>>> are
>>>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>>>>>>>>>> versions,
>>>>>>>>>>>>>>>>>>>>>>>> with the corresponding supported protocol version x
>>>>> and
>>>>>>>> y):
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change,
>> and
>>>>>>>>>> hence
>>>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>>>>>> persistent
>>>>>>>>>>>>>>>>>>>>>>>> data formats have changed.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> b. x != y, but all persistent data format remains
>> the
>>>>>>>> same.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> b. x !=y, AND some persistene data format like
>> RocksDB
>>>>>>>>>>> format,
>>>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>>>> format, has been changed.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> c. special case: we may need some special handling
>>>>> logic
>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>> "current
>>>>>>>>>>>>>>>>>>>>>>>> version" or "newest supported version" are not
>>>>> available
>>>>>>>> in
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> protocol,
>>>>>>>>>>>>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> under the above scenarios, how many rolling bounces
>>>>>> users
>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> execute?
>>>>>>>>>>>>>>>>>>>>>>>> how they should set the configs in each rolling
>>>>> bounce?
>>>>>>>> and
>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>>>>>>>>>> library will execute in these cases?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Ted,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I still consider changing the KIP to include it
>> right
>>>>>>>> away
>>>>>>>>>>> --
>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> not,
>>>>>>>>>>>>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in
>> more
>>>>>>>>>> detail
>>>>>>>>>>>>>>>> first.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> (Same for other open questions like interface names
>>>>> --
>>>>>> I
>>>>>>>>>>>>>> collect
>>>>>>>>>>>>>>>>>>>>>>>>> feedback and update the KIP after we reach
>> consensus
>>>>>> :))
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the details, Matthias.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> bq. change the metadata protocol only if a future
>>>>>>>>>> release,
>>>>>>>>>>>>>>>>> encoding
>>>>>>>>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>>>>>>>>>> and supported version might be an advantage
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be
>>>>>>>> implemented
>>>>>>>>>>> in
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding
>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part
>> of
>>>>>>>> user
>>>>>>>>>>>>>> code.
>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>>>>>>>> if we want to add something like this, I would
>>>>> prefer
>>>>>>>> to
>>>>>>>>>>> do
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>>>> separate KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling
>>>>>> bounce.
>>>>>>>>>> But
>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>>>>>>> understanding it would not be possible.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is
>> indeed
>>>>> to
>>>>>>>>>>>>> switch
>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to
>>>>>> switch
>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>>>>>>>>>>> store to the new store (this happens after the
>> last
>>>>>>>>>>>>> instance
>>>>>>>>>>>>>>>>>>> bounces a
>>>>>>>>>>>>>>>>>>>>>>>>>>> second time).
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is,
>>>>>> that
>>>>>>>>>>> it's
>>>>>>>>>>>>>>>>>>> unclear
>>>>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>>>> to which from version 2 to version 3. The
>>>>>>>>>>>>>>>>>>> StreamsPartitionsAssignor is
>>>>>>>>>>>>>>>>>>>>>>>>>>> stateless by design, and thus, the information
>>>>> which
>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>>>> use must be passed in from externally -- and we
>>>>> want
>>>>>> to
>>>>>>>>>>> use
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> StreamsConfig to pass in this information.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> During upgrade, all new instanced have no
>>>>> information
>>>>>>>>>>> about
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> progress
>>>>>>>>>>>>>>>>>>>>>>>>>>> of the upgrade (ie, how many other instanced got
>>>>>>>>>> upgrades
>>>>>>>>>>>>>>>>>>> already).
>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a
>>>>> version 3
>>>>>>>>>>>>>>>>>>> subscription.
>>>>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>>>>>>> leader also has this limited view on the world
>> and
>>>>>> can
>>>>>>>>>>> only
>>>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>>>>>>> 2 assignments back.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can
>>>>>>>> simplify
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> upgrade.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> We did consider to change the metadata to make
>>>>> later
>>>>>>>>>>>>> upgrades
>>>>>>>>>>>>>>>>> (ie,
>>>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we
>> change
>>>>>> the
>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>>>>> storage format again -- as long as we don't
>> change
>>>>>> it,
>>>>>>>> a
>>>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version"
>>>>> and
>>>>>>>>>>>>>>>> "supported
>>>>>>>>>>>>>>>>>>>>>>>>>>> version". This would allow the leader to switch
>> to
>>>>>> the
>>>>>>>>>> new
>>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>>>>>>> earlier and without a second rebalance: leader
>>>>> would
>>>>>>>>>>>>> receive
>>>>>>>>>>>>>>>>> "used
>>>>>>>>>>>>>>>>>>>>>>>>>>> version == old" and "supported version = old/new"
>>>>> --
>>>>>> as
>>>>>>>>>>>>> long
>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>>>> least
>>>>>>>>>>>>>>>>>>>>>>>>>>> one instance sends a "supported version = old"
>>>>> leader
>>>>>>>>>>> sends
>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>>>>>>> assignment back. However, encoding both version
>>>>> would
>>>>>>>>>>> allow
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> leader can send a new version assignment back,
>>>>> right
>>>>>>>>>> after
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>>>>>>>>>> round or rebalance finished (all instances send
>>>>>>>>>> "supported
>>>>>>>>>>>>>>>>>>> version =
>>>>>>>>>>>>>>>>>>>>>>>>>>> new"). However, there are still two issues with
>>>>> this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) if we switch to the new format right after the
>>>>>> last
>>>>>>>>>>>>>>>> instance
>>>>>>>>>>>>>>>>>>>>>>> bounced,
>>>>>>>>>>>>>>>>>>>>>>>>>>> the new stores might not be ready to be used --
>>>>> this
>>>>>>>>>> could
>>>>>>>>>>>>>>>> lead
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>> "downtime" as store must be restored before
>>>>>> processing
>>>>>>>>>> can
>>>>>>>>>>>>>>>>> resume.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted
>> again.
>>>>>> At
>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> point, the
>>>>>>>>>>>>>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled
>> and
>>>>>>>> thus
>>>>>>>>>>>>>> sends
>>>>>>>>>>>>>>>>>>> the old
>>>>>>>>>>>>>>>>>>>>>>>>>>> protocol data. However, it would be desirable to
>>>>>> never
>>>>>>>>>>> fall
>>>>>>>>>>>>>>>> back
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> old protocol after the switch to the new
>> protocol.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> The second issue is minor and I guess if users
>>>>> set-up
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> instance
>>>>>>>>>>>>>>>>>>>>>>>>>>> properly it could be avoided. However, the first
>>>>>> issue
>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>> prevent
>>>>>>>>>>>>>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we
>>>>>>>>>> consider
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>>>>>>> change the metadata protocol only if a future
>>>>>> release,
>>>>>>>>>>>>>>>> encoding
>>>>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>>>>>> used and supported version might be an advantage
>> in
>>>>>> the
>>>>>>>>>>>>>> future
>>>>>>>>>>>>>>>>>>> and we
>>>>>>>>>>>>>>>>>>>>>>>>>>> could consider to add this information in 1.2
>>>>> release
>>>>>>>> to
>>>>>>>>>>>>>>>> prepare
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to
>>>>>> give
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>>>>>>>>>>>>> enough time to prepare the stores in new format.
>> If
>>>>>> you
>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> second rolling bounce before this, it would still
>>>>>> work
>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> however, you
>>>>>>>>>>>>>>>>>>>>>>>>>>> might see app "downtime" as the new store must be
>>>>>> fully
>>>>>>>>>>>>>>>> restored
>>>>>>>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>>>>>>>>> processing can resume.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Does this make sense?
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get
>>>>> rid
>>>>>>>> of
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> 2nd
>>>>>>>>>>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>>>>>>>>> bounce?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary
>>>>>>>>>>> difference
>>>>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is
>> to
>>>>>>>>>> decide
>>>>>>>>>>>>>>>>>>> whether to
>>>>>>>>>>>>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
>>>>>>>>>>>>> (Actually,
>>>>>>>>>>>>>>>>>>> there is
>>>>>>>>>>>>>>>>>>>>>>>>>>> another difference mentioned in that the KIP says
>>>>>> that
>>>>>>>>>> the
>>>>>>>>>>>>>> 2nd
>>>>>>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>>>>>>>>> bounce should happen after all new state stores
>> are
>>>>>>>>>>> created
>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> background thread. However, within the 2nd
>> rolling
>>>>>>>>>> bounce,
>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> say
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>> there is still a background thread, so it seems
>>>>> like
>>>>>> is
>>>>>>>>>> no
>>>>>>>>>>>>>>>>> actual
>>>>>>>>>>>>>>>>>>>>>>>>>>> requirement to wait for the new state stores to
>> be
>>>>>>>>>>>>> created.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal
>>>>>> with
>>>>>>>>>>>>>>>>> mixed-mode
>>>>>>>>>>>>>>>>>>>>>>>>> (having
>>>>>>>>>>>>>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer
>>>>>>>>>> group).
>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
>>>>>>>>>>>>>>>>>>> (somehow/somewhere)
>>>>>>>>>>>>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>>>>>>>>>>>>>> that:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until
>> all
>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>> running
>>>>>>>>>>>>>>>>>>>>>>>>>>> the new code.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Once all the instances are running the new
>> code,
>>>>>>>> then
>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>> time,
>>>>>>>>>>>>>>>>>>>>>>>>>>> the instances start sending Subscription V3.
>> Leader
>>>>>>>>>> still
>>>>>>>>>>>>>>>> hands
>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>>>>>>>>> Assignment Version 2, until all new state stores
>>>>> are
>>>>>>>>>>> ready.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Once all instances report that new stores are
>>>>>> ready,
>>>>>>>>>>>>>> Leader
>>>>>>>>>>>>>>>>>>> sends
>>>>>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>>>>>>>>> Assignment Version 3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> * Once an instance receives an Assignment
>> Version
>>>>> 3,
>>>>>>>> it
>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>> delete
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> old state store.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a
>> lot
>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2
>>>>> rolling
>>>>>>>>>>>>>>>> restarts.
>>>>>>>>>>>>>>>>> No
>>>>>>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>> monitor logs for state store rebuild. You just
>>>>> deploy
>>>>>>>>>> it,
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>>>>>>>>>>>>> update themselves.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The thing that made me think of this is that the
>>>>> "2
>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>> bounces"
>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>> similar to what Kafka brokers have to do changes
>> in
>>>>>>>>>>>>>>>>>>>>>>>>>>> inter.broker.protocol.version and
>>>>>>>>>>>>> log.message.format.version.
>>>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> broker case, it seems like it would be possible
>>>>> (with
>>>>>>>>>> some
>>>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>> course)
>>>>>>>>>>>>>>>>>>>>>>>>>>> to modify kafka to allow us to do similar
>>>>>>>> auto-detection
>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> broker
>>>>>>>>>>>>>>>>>>>>>>>>>>> capabilities and automatically do a switchover
>> from
>>>>>>>>>>> old/new
>>>>>>>>>>>>>>>>>>> versions.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> -James
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
>>>>>>>>>>>>>> bbej...@gmail.com
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval
>>>>>>>> methods
>>>>>>>>>>> on
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interfaces.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would want to consider adding one method with a
>>>>>>>>>>> Predicate
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for filtering records by the timestamp stored
>>>>> with
>>>>>>>> the
>>>>>>>>>>>>>>>> record?
>>>>>>>>>>>>>>>>>>> Or
>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> better left for users to implement themselves
>>>>> once
>>>>>>>> the
>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>>>> been
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> retrieved?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
>>>>>>>>>>>>>> yuzhih...@gmail.com
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For my point #1, I don't have preference as to
>>>>>> which
>>>>>>>>>>>>>>>>> separator
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>> chosen.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Given the background you mentioned, current
>>>>> choice
>>>>>>>> is
>>>>>>>>>>>>>> good.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it
>>>>> is
>>>>>>>>>>> closer
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> English
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> grammar.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be good to listen to what other people
>>>>>> think.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J.
>> Sax
>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance
>>>>>> metadata
>>>>>>>>>>>>>> upgrade
>>>>>>>>>>>>>>>>> fix
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (addressing the mentioned
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> https://issues.apache.org/jira/browse/KAFKA-6054
>>>>>> )
>>>>>>>>>> It
>>>>>>>>>>>>>>>> give a
>>>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> impression how the metadata upgrade works
>>>>>> including
>>>>>>>>>> a
>>>>>>>>>>>>>>>> system
>>>>>>>>>>>>>>>>>>> test:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are
>>>>> ready.
>>>>>> I
>>>>>>>>>>>>> agree
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> complex am I ok with putting out more code to
>>>>>> give
>>>>>>>>>>>>> better
>>>>>>>>>>>>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> context.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Ted:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
>>>>>>>>>>>>>>>>>>>>>>> `processing.guarantee`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameter that accepts `at_least_one` and
>>>>>>>>>>>>> `exactly_once`
>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>> values.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs
>>>>> dash
>>>>>>>>>> but
>>>>>>>>>>> I
>>>>>>>>>>>>>>>>> prefer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consistency. If you feel strong about it, we
>>>>> can
>>>>>>>>>> also
>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>>> it to
>>>>>>>>>>>>>>>>>>>>>>>>>>> `-`.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either
>> way
>>>>>> -- I
>>>>>>>>>>>>>>>> stripped
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> `With`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be
>>>>> good
>>>>>> to
>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>> feedback
>>>>>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> others and pick the name the majority
>> prefers.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @John:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it
>>>>> would
>>>>>>>>>> not
>>>>>>>>>>>>>>>> make a
>>>>>>>>>>>>>>>>>>>>>>>>>>> difference.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the
>> two
>>>>>>>>>>> version
>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>>>>>>> introduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I
>>>>>> don't
>>>>>>>>>> hit
>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>> issue
>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> putting the `-v2` to the store directory
>>>>> instead
>>>>>> of
>>>>>>>>>>>>>>>>>>> `rocksdb-v2`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several
>>>>>> questions
>>>>>>>>>>>>> queued
>>>>>>>>>>>>>>>>> up,
>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> were
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> all in the "rejected alternatives"
>> section...
>>>>>> oh,
>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state
>>>>>>>>>>> directory
>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
>>> /rocksdb/storeName/"
>>>>>> to
>>>>>>>>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
>>>>>>> /rocksdb-v2/storeName/":
>>>>>>>>>> if
>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>> put the
>>>>>>>>>>>>>>>>>>>>>>>>>>> "v2"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> marker on the storeName part of the path
>>>>> (i.e.,
>>>>>>>>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
>>>>>>>>> /rocksdb/storeName-v2/"),
>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> benefits without altering the high-level
>>>>>> directory
>>>>>>>>>>>>>>>>> structure.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine
>> people
>>>>>>>>>> running
>>>>>>>>>>>>>>>>>>> scripts to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> monitor
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other
>>>>> such
>>>>>>>> use
>>>>>>>>>>>>>>>> cases.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
>>>>>>>>>>>>>>>>> yuzhih...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nicely written KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ?
>>>>>> Underscore
>>>>>>>>>> may
>>>>>>>>>>>>>>>>>>> sometimes
>>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>>> miss
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more
>>>>>>>> friendly
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> user.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface
>>>>>>>> ReadOnlyKeyValueTimestampStore<K,
>>>>>>>>>>> V>
>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp
>> better
>>>>>> name
>>>>>>>>>>> for
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> class ?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang
>>>>> Wang <
>>>>>>>>>>>>>>>>>>>>>>> wangg...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch
>> section
>>>>>> and
>>>>>>>>>> it
>>>>>>>>>>>>>>>> looks
>>>>>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>> me,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you
>> also
>>>>>>>> share
>>>>>>>>>>> it
>>>>>>>>>>>>>>>> here
>>>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> people
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can take a look?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs
>>>>> like
>>>>>>>>>> this
>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it
>>>>> is
>>>>>>>>>>> better
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the
>>>>>> design
>>>>>>>>>>>>>>>>>>> discussion :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias
>> J.
>>>>>> Sax
>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams
>>>>> API
>>>>>>>> to
>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>>> storing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is
>> the
>>>>>>>> basis
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> resolve
>>>>>>>>>>>>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about
>>>>> this!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to