I also want to point out that Ryanne Dolan commented on the WIP PR
(https://github.com/apache/kafka/pull/6044) about the naming. I asked
him to reply to this thread, but this has not happened yet, so I want to
point it out myself, because it seems too important to leave out.


WindowWithTimestampStore.java

> Hard to parse this class name -- sounds like "trait Window with 
> TimestampStore" instead of WindowStore<... ValueAndTimestamp...>
> 
> Maybe TimestampWindowStore or TimestampedWindowStore? shrug.
> 
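
To make the naming question concrete, here is a minimal sketch of what the suggested name would stand for. It uses simplified, hypothetical stand-ins for `WindowStore` and `ValueAndTimestamp` (these are not the actual Kafka Streams interfaces, and `InMemoryTimestampedWindowStore` and `NamingSketch` are made up for illustration). It only shows that `TimestampedWindowStore<K, V>` would read as shorthand for `WindowStore<K, ValueAndTimestamp<V>>`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in; not the real Kafka Streams class.
final class ValueAndTimestamp<V> {
    final V value;
    final long timestamp;

    ValueAndTimestamp(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical, simplified stand-in for a window store interface.
interface WindowStore<K, V> {
    void put(K key, V value, long windowStartTimestamp);

    V fetch(K key, long windowStartTimestamp);
}

// The suggested name: a window store whose values carry a timestamp.
interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> {
}

// Toy in-memory implementation, only here to make the sketch runnable.
final class InMemoryTimestampedWindowStore<K, V> implements TimestampedWindowStore<K, V> {
    private final Map<K, Map<Long, ValueAndTimestamp<V>>> windowsByKey = new HashMap<>();

    @Override
    public void put(K key, ValueAndTimestamp<V> value, long windowStartTimestamp) {
        windowsByKey.computeIfAbsent(key, k -> new HashMap<>()).put(windowStartTimestamp, value);
    }

    @Override
    public ValueAndTimestamp<V> fetch(K key, long windowStartTimestamp) {
        Map<Long, ValueAndTimestamp<V>> windows = windowsByKey.get(key);
        return windows == null ? null : windows.get(windowStartTimestamp);
    }
}

final class NamingSketch {
    public static void main(String[] args) {
        TimestampedWindowStore<String, Long> store = new InMemoryTimestampedWindowStore<>();
        store.put("page-views", new ValueAndTimestamp<>(7L, 1_000L), 0L);
        ValueAndTimestamp<Long> result = store.fetch("page-views", 0L);
        System.out.println(result.value + " @ " + result.timestamp);
    }
}
```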



Another comment, which I personally do not necessarily agree with:


KeyValueWithTimestampStore.java

> possible alternative: implement KeyValueStore<K, V>, and then expose an 
> additional putWithTimestamp, getWithTimestamp etc for callers that want 
> ValueAndTimestamp<V> instead of V. This would probably require fewer code 
> changes elsewhere.
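
For reference, a rough sketch of how I read this alternative. Everything in it is hypothetical: the interfaces are simplified stand-ins rather than the real Kafka Streams API, and the `NO_TIMESTAMP` placeholder is my own assumption, not something proposed in the PR comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in; not the real Kafka Streams class.
final class ValueAndTimestamp<V> {
    final V value;
    final long timestamp;

    ValueAndTimestamp(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical, simplified stand-in for the existing store interface.
interface KeyValueStore<K, V> {
    void put(K key, V value);

    V get(K key);
}

// The alternative: remain a KeyValueStore<K, V> and add timestamp-aware methods on the side.
interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, V> {
    void putWithTimestamp(K key, V value, long timestamp);

    ValueAndTimestamp<V> getWithTimestamp(K key);
}

// Toy in-memory implementation, only here to make the sketch runnable.
final class InMemoryKeyValueWithTimestampStore<K, V> implements KeyValueWithTimestampStore<K, V> {
    // Assumption: a plain put() has no timestamp available, so a placeholder is stored.
    static final long NO_TIMESTAMP = -1L;

    private final Map<K, ValueAndTimestamp<V>> data = new HashMap<>();

    @Override
    public void put(K key, V value) {
        putWithTimestamp(key, value, NO_TIMESTAMP);
    }

    @Override
    public V get(K key) {
        ValueAndTimestamp<V> stored = data.get(key);
        return stored == null ? null : stored.value;
    }

    @Override
    public void putWithTimestamp(K key, V value, long timestamp) {
        data.put(key, new ValueAndTimestamp<>(value, timestamp));
    }

    @Override
    public ValueAndTimestamp<V> getWithTimestamp(K key) {
        return data.get(key);
    }
}

final class AlternativeSketch {
    public static void main(String[] args) {
        KeyValueWithTimestampStore<String, String> store = new InMemoryKeyValueWithTimestampStore<>();
        store.putWithTimestamp("k", "v", 100L);
        System.out.println(store.get("k") + " @ " + store.getWithTimestamp("k").timestamp);
    }
}
```

In this sketch, existing callers of `put`/`get` keep compiling unchanged, but a plain `put` has no timestamp to record and needs some placeholder.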



What do you think?


-Matthias



On 1/12/19 6:43 PM, Matthias J. Sax wrote:
> Bill,
> 
> I left the question about the legacy column family out because, as a matter
> of fact, we use the default column family atm, which cannot be deleted.
> Thus, this old column family will always be there.
> 
> Nevertheless, as an implementation detail, it might make sense to avoid
> accessing both column families forever (ie, each time a key is not
> found). Also, we might want/need a way to "force" upgrading to the new
> column family, for the case that some records are not accessed for a
> long time. Again, this seems to be an implementation detail (and I am
> also not sure if we really need it). If you think both are not
> implementation details, I can of course extend the KIP accordingly.
> 
> 
> -Matthias
> 
> On 1/11/19 1:27 PM, Bill Bejeck wrote:
>> Hi Matthias,
>>
>> Thanks for the KIP, it goes into good detail and is well done.
>>
>> Overall I'm a +1 on the KIP and have one minor question.
>>
>> Regarding the upgrade path, we'll use two column families to do a lazy
>> conversion which makes sense to me.  What is the plan to get
>> rid of the "legacy" column family (if ever)?  Would we drop the "legacy"
>> column family once it is empty? I'm not sure we'd ever need to as it would
>> just be a column family that doesn't get used.
>>
>> Maybe this is an implementation detail and doesn't need to be addressed
>> now, but it came to mind when I read the KIP.
>>
>> Thanks again,
>> Bill
>>
>> On Fri, Jan 11, 2019 at 1:19 PM John Roesler <j...@confluent.io> wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks for the updates to the KIP. I've just read it over, and am
>>> personally quite happy with it.
>>>
>>> Thanks for tackling this dicey issue and putting in a huge amount of design
>>> work to produce a smooth upgrade path for DSL users.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, Dec 17, 2018 at 10:35 AM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Dear all,
>>>>
>>>> I finally managed to update the KIP.
>>>>
>>>> To address the concerns about the complex upgrade path, I simplified the
>>>> design. We don't need any configs and the upgrade can be done with the
>>>> simple single rolling bounce pattern.
>>>>
>>>> The suggestion is to exploit RocksDB column families to isolate old and
>>>> new on-disk format. Furthermore, the upgrade from old to new format
>>>> happens "on the side" after an instance was upgraded.
>>>>
>>>> I also pushed a WIP PR in case you want to look into some details
>>>> (potential reviewers, don't panic: I plan to break this down into
>>>> multiple PRs for actual review if the KIP is accepted).
>>>>
>>>> https://github.com/apache/kafka/pull/6044
>>>>
>>>> @Eno: I think I never answered your question about being future proof:
>>>>
>>>> The latest design is not generic, because it does not support changes
>>>> that need to be reflected in the changelog topic. I aimed for a
>>>> non-generic design for now to keep it as simple as possible. Thus, other
>>>> format changes might need a different design / upgrade path -- however,
>>>> because this KIP is quite encapsulated in the current design, I don't
>>>> see any issue to build this later and a generic upgrade path seems to be
>>>> an orthogonal concern atm.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 11/22/18 2:50 PM, Adam Bellemare wrote:
>>>>> Thanks for the information Matthias.
>>>>>
>>>>> I will await your completion of this ticket then since it underpins the
>>>>> essential parts of a RocksDB TTL aligned with the changelog topic. I am
>>>>> eager to work on that ticket myself, so if I can help on this one in any
>>>>> way please let me know.
>>>>>
>>>>> Thanks
>>>>> Adam
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> It's an interesting idea to use a second store to maintain the
>>>>>> timestamps. However, each RocksDB instance implies some overhead. In
>>>>>> fact, we are looking into ColumnFamilies atm to see if we can use those
>>>>>> and merge multiple RocksDBs into a single one to reduce this overhead.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 11/20/18 5:15 AM, Patrik Kleindl wrote:
>>>>>>> Hi Adam
>>>>>>>
>>>>>>> Sounds great, I was already planning to ask around if anyone had tackled
>>>>>>> this.
>>>>>>> We have a use case very similar to what you described in KAFKA-4212, only
>>>>>>> with Global State Stores.
>>>>>>> I have tried a few things with the normal DSL but was not really
>>>>>>> successful.
>>>>>>> Schedule/Punctuate is not possible, supplying a windowed store is also not
>>>>>>> allowed and the process method has no knowledge of the timestamp of the
>>>>>>> record.
>>>>>>> And anything loaded on startup is not filtered anyway.
>>>>>>>
>>>>>>> Regarding 4212, wouldn't it be easier (although a little less
>>>>>>> space-efficient) to track the Timestamps in a separate Store with
>>>>>>> <K, Long>?
>>>>>>> This would leave the original store intact and allow a migration of the
>>>>>>> timestamps without touching the other data.
>>>>>>>
>>>>>>> So I am very interested in your PR :-)
>>>>>>>
>>>>>>> best regards
>>>>>>>
>>>>>>> Patrik
>>>>>>>
>>>>>>> On Tue, 20 Nov 2018 at 04:46, Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Matthias
>>>>>>>>
>>>>>>>> Thanks - I figured that it was probably a case of just too much to do and
>>>>>>>> not enough time. I know how that can go. I am asking about this one in
>>>>>>>> relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding a TTL
>>>>>>>> to RocksDB. I have outlined a bit about my use-case within 4212, but for
>>>>>>>> brevity here it is:
>>>>>>>>
>>>>>>>> My case:
>>>>>>>> 1) I have a RocksDB with TTL implementation working where records are aged
>>>>>>>> out using the TTL that comes with RocksDB (very simple).
>>>>>>>> 2) We prevent records from loading from the changelog if recordTime + TTL <
>>>>>>>> referenceTimeStamp (default = System.currentTimeMillis()).
>>>>>>>>
>>>>>>>> This assumes that the records are stored with the same time reference (say
>>>>>>>> UTC) as the consumer materializing the RocksDB store.
>>>>>>>>
>>>>>>>> My questions about KIP-258 are as follows:
>>>>>>>> 1) How does "we want to be able to store record timestamps in KTables"
>>>>>>>> differ from inserting records into RocksDB with TTL at consumption time? I
>>>>>>>> understand that it could be a difference of some seconds, minutes, hours,
>>>>>>>> days etc between when the record was published and now, but given the
>>>>>>>> nature of how RocksDB TTL works (eventual - based on compaction) I don't
>>>>>>>> see how a precise TTL can be achieved, such as that which one can get with
>>>>>>>> windowed stores.
>>>>>>>>
>>>>>>>> 2) Are you looking to change how records are inserted into a TTL RocksDB,
>>>>>>>> such that the TTL would take effect from the record's published time? If
>>>>>>>> not, what would be the ideal workflow here for a single record with TTL
>>>>>>>> RocksDB?
>>>>>>>> ie: Record Timestamp: 100
>>>>>>>> TTL: 50
>>>>>>>> Record inserted into rocksDB: 110
>>>>>>>> Record to expire at 150?
>>>>>>>>
>>>>>>>> 3) I'm not sure I fully understand the importance of the upgrade path. I
>>>>>>>> have read the link to (https://issues.apache.org/jira/browse/KAFKA-3522) in
>>>>>>>> the KIP, and I can understand that a state-store on disk may not represent
>>>>>>>> what the application is expecting. I don't think I have the full picture
>>>>>>>> though, because that issue seems to be easy to fix with a simple versioned
>>>>>>>> header or accompanying file, forcing the app to rebuild the state if the
>>>>>>>> version is incompatible. Can you elaborate or add a scenario to the KIP
>>>>>>>> that illustrates the need for the upgrade path?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Adam
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Adam,
>>>>>>>>>
>>>>>>>>> I am still working on it. Was pulled into a lot of other tasks lately so
>>>>>>>>> this was delayed. Also had some discussions about simplifying the
>>>>>>>>> upgrade path with some colleagues and I am prototyping this atm. Hope to
>>>>>>>>> update the KIP accordingly soon.
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 11/10/18 7:41 AM, Adam Bellemare wrote:
>>>>>>>>>> Hello Matthias
>>>>>>>>>>
>>>>>>>>>> I am curious as to the status of this KIP. TTL and expiry of records will
>>>>>>>>>> be extremely useful for several of our business use-cases, as well as
>>>>>>>>>> another KIP I had been working on.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>
>>>>>>>>>>> Good stuff. Could you comment a bit on how future-proof this change is?
>>>>>>>>>>> For example, if we want to store both event timestamp "and" processing
>>>>>>>>>>> time in RocksDB will we then need another interface (e.g. called
>>>>>>>>>>> KeyValueWithTwoTimestampsStore)?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your input Guozhang and John.
>>>>>>>>>>>>
>>>>>>>>>>>> I see your point that the upgrade API is not simple. If you don't
>>>>>>>>>>>> think it's valuable to make generic store upgrades possible (atm), we
>>>>>>>>>>>> can make the API internal, too. The impact is that we only support a
>>>>>>>>>>>> predefined set of upgrades (ie, KV to KVwithTs, Windowed to
>>>>>>>>>>>> WindowedWithTS etc) for which we implement the internal interfaces.
>>>>>>>>>>>>
>>>>>>>>>>>> We can keep the design generic, so if we decide to make it public, we
>>>>>>>>>>>> don't need to re-invent it. This will also have the advantage that we
>>>>>>>>>>>> can add upgrade patterns for other stores later, too.
>>>>>>>>>>>>
>>>>>>>>>>>> I also agree that the `StoreUpgradeBuilder` is a little ugly, but it
>>>>>>>>>>>> was the only way I could find to design a generic upgrade interface. If
>>>>>>>>>>>> we decide to hide all the upgrade stuff, `StoreUpgradeBuilder` would
>>>>>>>>>>>> become an internal interface I guess (don't think we can remove it).
>>>>>>>>>>>>
>>>>>>>>>>>> I will wait for more feedback about this and if nobody wants to keep it
>>>>>>>>>>>> as public API I will update the KIP accordingly. Will add some more
>>>>>>>>>>>> clarifications for different upgrade patterns in the meantime and fix
>>>>>>>>>>>> the typos/minor issues.
>>>>>>>>>>>>
>>>>>>>>>>>> About adding a new state UPGRADING: maybe we could do that. However, I
>>>>>>>>>>>> find it particularly difficult to estimate when we should
>>>>>>>>>>>> switch to RUNNING, thus, I am a little hesitant. Using store callbacks
>>>>>>>>>>>> or just logging the progress including some indication about the "lag"
>>>>>>>>>>>> might actually be sufficient. Not sure what others think?
>>>>>>>>>>>>
>>>>>>>>>>>> About "value before timestamp": no real reason and I think it does not
>>>>>>>>>>>> make any difference. Do you want to change it?
>>>>>>>>>>>>
>>>>>>>>>>>> About upgrade robustness: yes, we cannot control if an instance fails.
>>>>>>>>>>>> That is what I meant by "we need to write tests". The upgrade should be
>>>>>>>>>>>> able to continue even if an instance goes down (and we must make sure
>>>>>>>>>>>> that we don't end up in an invalid state that forces us to wipe out the
>>>>>>>>>>>> whole store). Thus, we need to write system tests that fail instances
>>>>>>>>>>>> during upgrade.
>>>>>>>>>>>>
>>>>>>>>>>>> For `in_place_offline` upgrade: I don't think we need this mode, because
>>>>>>>>>>>> people can do this via a single rolling bounce.
>>>>>>>>>>>>
>>>>>>>>>>>>  - prepare code and switch KV-Store to KVwithTs-Store
>>>>>>>>>>>>  - do a single rolling bounce (don't set any upgrade config)
>>>>>>>>>>>>
>>>>>>>>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we
>>>>>>>>>>>> remove the `StoreUpgradeBuilder`) will detect that there is only an old
>>>>>>>>>>>> local KV store w/o TS, will start to restore the new KVwithTs store,
>>>>>>>>>>>> wipe out the old store and replace with the new store after restore is
>>>>>>>>>>>> finished, and start processing only afterwards. (I guess we need to
>>>>>>>>>>>> document this case -- will also add it to the KIP.)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 8/9/18 1:10 PM, John Roesler wrote:
>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think this KIP is looking really good.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a few thoughts to add to the others:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. You mentioned at one point users needing to configure
>>>>>>>>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant to say they
>>>>>>>>>>>>> should remove the config. If they really have to set it to a string "null"
>>>>>>>>>>>>> or even set it to a null value but not remove it, it would be unfortunate.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. In response to Bill's comment #1, you said that "The idea is that the
>>>>>>>>>>>>> upgrade should be robust and not fail. We need to write according tests".
>>>>>>>>>>>>> I may have misunderstood the conversation, but I don't think it's within
>>>>>>>>>>>>> our power to say that an instance won't fail. What if one of my computers
>>>>>>>>>>>>> catches on fire? What if I'm deployed in the cloud and one instance
>>>>>>>>>>>>> disappears and is replaced by a new one? Or what if one instance goes AWOL
>>>>>>>>>>>>> for a long time and then suddenly returns? How will the upgrade process
>>>>>>>>>>>>> behave in light of such failures?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Your thought about making in-place an offline mode is interesting, but
>>>>>>>>>>>>> it might be a bummer for on-prem users who wish to upgrade online, but
>>>>>>>>>>>>> cannot just add new machines to the pool. It could be a new upgrade mode
>>>>>>>>>>>>> "offline-in-place", though...
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4. I was surprised to see that a user would need to modify the topology to
>>>>>>>>>>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's
>>>>>>>>>>>>> suggestions would remove this necessity.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for taking on this very complex but necessary work.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Matthias,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the updated KIP. Some more comments:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. The current set of proposed APIs is a bit too complicated, which makes
>>>>>>>>>>>>>> the upgrade flow from the user's perspective also a bit complex. I'd like
>>>>>>>>>>>>>> to check different APIs and discuss their needs separately:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     1.a. StoreProxy: needed for in-place upgrade only, between the first
>>>>>>>>>>>>>> and second rolling bounce, where the old-versioned stores can handle
>>>>>>>>>>>>>> new-versioned store APIs. I think such upgrade paths (i.e. from one store
>>>>>>>>>>>>>> type to another) would not be very common: users may want to upgrade from a
>>>>>>>>>>>>>> certain store engine to another, but the interface would likely be staying
>>>>>>>>>>>>>> the same. Hence personally I'd suggest we keep it internal and only
>>>>>>>>>>>>>> consider exposing it in the future if it does become a common pattern.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     1.b. ConverterStore / RecordConverter: needed for both in-place and
>>>>>>>>>>>>>> roll-over upgrade, between the first and second rolling bounces, for the
>>>>>>>>>>>>>> new versioned store to be able to read old-versioned changelog topics.
>>>>>>>>>>>>>> Firstly I think we should not expose key in the public APIs but only the
>>>>>>>>>>>>>> values, since allowing key format changes would break log compaction, and
>>>>>>>>>>>>>> hence would not be compatible anyways. As for value format changes,
>>>>>>>>>>>>>> personally I think we can also keep its upgrade logic internal as it may
>>>>>>>>>>>>>> not be worth generalizing to user customizable logic.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     1.c. If you agree with 1.a/b above, then we can also remove
>>>>>>>>>>>>>> "keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public APIs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is not
>>>>>>>>>>>>>> needed either given that we are exposing "ValueAndTimestamp" anyways. I.e.
>>>>>>>>>>>>>> it is just syntactic sugar and for IQ, users can always just set a
>>>>>>>>>>>>>> "QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as the new
>>>>>>>>>>>>>> interface does not provide any additional functions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Could we further categorize the upgrade flow for different use cases,
>>>>>>>>>>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be used
>>>>>>>>>>>>>> automatically for non-windowed aggregate; 2) PAPI users who do not need to
>>>>>>>>>>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to
>>>>>>>>>>>>>> KeyValueWithTimestampStore. Just to give my understanding for 3), the
>>>>>>>>>>>>>> upgrade flow for users may be simplified as the following (for both
>>>>>>>>>>>>>> in-place and roll-over):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     * Update the jar to new version, make code changes from KeyValueStore
>>>>>>>>>>>>>> to KeyValueWithTimestampStore, set upgrade config.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     * First rolling bounce, and library code can internally use proxy /
>>>>>>>>>>>>>> converter based on the specified config to handle new APIs with old stores,
>>>>>>>>>>>>>> while letting new stores read from old changelog data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     * Reset upgrade config.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     * Second rolling bounce, and the library code automatically turns off
>>>>>>>>>>>>>> logic for proxy / converter.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. Some more detailed proposals are needed for when to recommend users to
>>>>>>>>>>>>>> trigger the second rolling bounce. I have one idea to share here: we add a
>>>>>>>>>>>>>> new state to KafkaStreams, say UPGRADING, which is set when 1) upgrade
>>>>>>>>>>>>>> config is set, and 2) the new stores are still ramping up (for the second
>>>>>>>>>>>>>> part, we can start with some internal hard-coded heuristics to decide when
>>>>>>>>>>>>>> it is close to being ramped up). If either one of them is not true any
>>>>>>>>>>>>>> more, it should transition to RUNNING. Users can then watch this state, and
>>>>>>>>>>>>>> decide to only trigger the second rebalance when the state has transitioned
>>>>>>>>>>>>>> from UPGRADING. They can also choose to cut over while the instance is
>>>>>>>>>>>>>> still UPGRADING; the downside is that after that the application may have a
>>>>>>>>>>>>>> long restoration phase which is, from the user's pov, a period of
>>>>>>>>>>>>>> unavailability.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Below are just some minor things on the wiki:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4. "proxy story" => "proxy store".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 5. "use the a builder " => "use a builder"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to the value":
>>>>>>>>>>>>>> what's the rationale of putting the timestamp before the value, rather than
>>>>>>>>>>>>>> after the value?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the feedback Bill. I just updated the KIP with some of your
>>>>>>>>>>>>>>> points.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch the
>>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a type of
>>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new stores have reached
>>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the signal to start
>>>>>>>>>>>>>>>>> second rolling rebalance?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think we can reuse the existing listeners, thus, I did not include
>>>>>>>>>>>>>>> anything in the KIP. About a signal to rebalance: this might be tricky.
>>>>>>>>>>>>>>> If we prepare the store "online", the active task will update the state
>>>>>>>>>>>>>>> continuously, and thus, state prepare is never finished. It will be the
>>>>>>>>>>>>>>> user's responsibility to do the second rebalance (note that the second
>>>>>>>>>>>>>>> rebalance will first finish the last delta of the upgrade to finish the
>>>>>>>>>>>>>>> upgrade before actual processing resumes). I clarified the KIP in this
>>>>>>>>>>>>>>> regard a little bit.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the process, would we
>>>>>>>>>>>>>>>>> allow the other instances to complete or just fail the entire upgrade?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The idea is that the upgrade should be robust and not fail. We need to
>>>>>>>>>>>>>>> write according tests.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the current
>>>>>>>>>>>>>>>>> active directories vs. deleting them right away, and when all the prepare
>>>>>>>>>>>>>>>>> task directories are successfully migrated then delete the previous active
>>>>>>>>>>>>>>>>> ones.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ack. Updated the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause processing of any new records and
>>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all prepare tasks have
>>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling bounces and then as
>>>>>>>>>>>>>>>>> each task successfully renames its prepare directories and deletes the old
>>>>>>>>>>>>>>>>> active task directories, normal processing of records resumes.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The basic idea is to do an online upgrade to avoid downtime. We can
>>>>>>>>>>>>>>> discuss to offer both options... For the offline upgrade option, we
>>>>>>>>>>>>>>> could simplify user interaction and trigger the second rebalance
>>>>>>>>>>>>>>> automatically with the requirement that a user needs to update any
>>>>>>>>>>>>>>> config.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It might actually be worth including this option: we know from
>>>>>>>>>>>>>>> experience with state restore that regular processing slows down the
>>>>>>>>>>>>>>> restore. For roll_over upgrade, it would be a different story and
>>>>>>>>>>>>>>> upgrade should not be slowed down by regular processing. Thus, we should
>>>>>>>>>>>>>>> even make in_place an offline upgrade and force people to use roll_over
>>>>>>>>>>>>>>> if they need online upgrade. Might be a fair tradeoff that may simplify
>>>>>>>>>>>>>>> the upgrade for the user and for the code complexity.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let's see what others think.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the update and the working prototype, it helps with
>>>>>>>>>>>>>>>> understanding the KIP.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I took an initial pass over this PR, and overall I find the interfaces and
>>>>>>>>>>>>>>>> approach to be reasonable.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch the
>>>>>>>>>>>>>>>> restore process), I'm wondering if we want to provide a type of
>>>>>>>>>>>>>>>> StateRestoreListener that could signal when the new stores have reached
>>>>>>>>>>>>>>>> parity with the existing old stores and that could be the signal to start
>>>>>>>>>>>>>>>> second rolling rebalance?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Although you solicited feedback on the interfaces involved, I wanted to put
>>>>>>>>>>>>>>>> down some thoughts that have come to mind reviewing this KIP again
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Out of N instances, one fails midway through the process, would we allow
>>>>>>>>>>>>>>>> the other instances to complete or just fail the entire upgrade?
>>>>>>>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the current
>>>>>>>>>>>>>>>> active directories vs. deleting them right away, and when all the prepare
>>>>>>>>>>>>>>>> task directories are successfully migrated then delete the previous active
>>>>>>>>>>>>>>>> ones.
>>>>>>>>>>>>>>>> 3. For the first rolling bounce we pause processing of any new records and
>>>>>>>>>>>>>>>> just allow the prepare tasks to restore, then once all prepare tasks have
>>>>>>>>>>>>>>>> restored, it's a signal for the second round of rolling bounces and then as
>>>>>>>>>>>>>>>> each task successfully renames its prepare directories and deletes the old
>>>>>>>>>>>>>>>> active task directories, normal processing of records resumes.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> KIP-268 (rebalance metadata) is finished and included in AK 2.0
>>>>>>>>>>>>>>>>> release. Thus, I want to pick up this KIP again to get the RocksDB
>>>>>>>>>>>>>>>>> upgrade done for 2.1.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I updated the KIP accordingly and also have a "proof of concept" PR
>>>>>>>>>>>>>>>>> ready (for "in place" upgrade only):
>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5422/
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> There are still open questions, but I want to collect early feedback on
>>>>>>>>>>>>>>>>> the proposed interfaces we need for the store upgrade. Also note that
>>>>>>>>>>>>>>>>> the KIP now also aims to define a generic upgrade path from any store
>>>>>>>>>>>>>>>>> format A to any other store format B. Adding timestamps is just a
>>>>>>>>>>>>>>>>> special case.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I will continue to work on the PR and refine the KIP in the meantime,
>>>>>>>>>>>>>>>>> too.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>>>> After some more thoughts, I want to follow John's suggestion and split
>>>>>>>>>>>>>>>>>> upgrading the rebalance metadata from the store upgrade.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I extracted the metadata upgrade into its own KIP:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to consider
>>>>>>>>>>>>>>>>>> making the store format upgrade more flexible/generic. Atm, the KIP is too
>>>>>>>>>>>>>>>>>> much tailored to the DSL IMHO and does not account for PAPI users that we
>>>>>>>>>>>>>>>>>> should not force to upgrade the stores. I need to figure out the details
>>>>>>>>>>>>>>>>>> and follow up later.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Please give feedback for the new KIP-268 on the corresponding discussion
>>>>>>>>>>>>>>>>>> thread.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure out a way
>>>>>>>>>>>>>>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a fix for
>>>>>>>>>>>>>>>>>> future upgrades. Please share your thoughts.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for all your feedback!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's
>>> an
>>>>>>>>>>>>>>>>>>> implementation detail IMHO and thus I did not include it
>>>> in
>>>>>>>> the
>>>>>>>>>>>>>> KIP)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Guozhang:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1) I understand now what you mean. We can certainly
>>> allow
>>>>>>>> all
>>>>>>>>>>>>>> values
>>>>>>>>>>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
>>>>>>>>>>> `upgrade.from`
>>>>>>>>>>>>>>>>>>> parameter. I had a similar thought once but decided to
>>>>>> collapse
>>>>>>>>>>> them
>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>>>> one -- will update the KIP accordingly.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2) The idea to avoid any config would be, to always send
>>>> both
>>>>>>>>>>>>>> requests.
>>>>>>>>>>>>>>>>>>> If we add a config to eventually disable the old request,
>>>> we
>>>>>>>>>>> don't
>>>>>>>>>>>>>>> gain
>>>>>>>>>>>>>>>>>>> anything with this approach. The question is really, if
>>> we
>>>>>> are
>>>>>>>>>>>>>> willing
>>>>>>>>>>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be
>>>> limited
>>>>>>>> to
>>>>>>>>>>> 2
>>>>>>>>>>>>>>>>>>> versions and not grow further in future releases. More
>>>>>> details
>>>>>>>>> in
>>>>>>>>>>>>>> (3)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and
>>>>>>>> allows
>>>>>>>>>>> us
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> stay with 2 "assignment strategies" we need to register,
>>> as
>>>>>>>> the
>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>> assignment strategy will allow to "upgrade itself" via
>>>>>>>> "version
>>>>>>>>>>>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a
>>>>>>>> config
>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> people upgrade from pre-1.2 releases.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thus, I don't think we need to register new "assignment
>>>>>>>>>>> strategies"
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> send empty subscriptions for older version.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right with
>>> a
>>>>>>>>> single
>>>>>>>>>>>>>>>>>>> rebalance. I share the concern that an application might
>>>>>> never
>>>>>>>>>>>> catch
>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>>> and thus the hot standby will never be ready.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store
>>>> upgrades.
>>>>>>>> If
>>>>>>>>>>> we
>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>> this, we also don't need to go with (2) and can get (3)
>>> in
>>>>>>>> place
>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> future upgrades. I also think that changes to the
>>> metadata
>>>>>> are
>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>> likely and thus allowing for single rolling bounce for
>>> this
>>>>>>>> case
>>>>>>>>>>> is
>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>> important anyway. If we assume that store upgrades are rare,
>>>> it
>>>>>>>>>>> might
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> ok
>>>>>>>>>>>>>>>>>>> to sacrifice two rolling bounces for this case. It was
>>> just
>>>>>> an
>>>>>>>>>>> idea
>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>> wanted to share (even if I see the issues).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for your replies.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 1) About the config names: actually I was trying to not
>>>>>>>> expose
>>>>>>>>>>>>>>>>>>>> implementation details :) My main concern was that in
>>> your
>>>>>>>>>>>> proposal
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> values need to cover the span of all the versions that
>>> are
>>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a
>>>> user)
>>>>>>>> am
>>>>>>>>>>>>>>>>> upgrading
>>>>>>>>>>>>>>>>>>>> from any versions within this range I need to remember
>>> to
>>>>>> use
>>>>>>>>>>> the
>>>>>>>>>>>>>>> value
>>>>>>>>>>>>>>>>>>>> "0.10.1.x-1.1.x" rather than just specifying my old version. In
>>>> my
>>>>>>>>>>>>>>> suggestion
>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>> was trying to argue the benefit of just letting users
>>>>>>>>> specify
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> actual
>>>>>>>>>>>>>>>>>>>> Kafka version she's trying to upgrade from, rather than
>>>> specifying
>>>>>> a
>>>>>>>>>>>> range
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc
>>> as
>>>>>> the
>>>>>>>>>>>>>> values,
>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>> still using Kafka versions like broker's
>>>> `internal.version`
>>>>>>>>>>>> config.
>>>>>>>>>>>>>>>>> But if
>>>>>>>>>>>>>>>>>>>> you were suggesting the same thing, i.e. by
>>>> "0.10.1.x-1.1.x"
>>>>>>>>> you
>>>>>>>>>>>>>>> meant
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or
>>>> "0.11.0"
>>>>>>>> or
>>>>>>>>>>>>>> "1.1"
>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>> are all recognizable config values then I think we are
>>>>>>>> actually
>>>>>>>>>>> on
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>> page.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would
>>>> increase
>>>>>>>> the
>>>>>>>>>>>>>>> network
>>>>>>>>>>>>>>>>>>>> footprint, but not the message size, IF I'm not
>>>>>>>>>>> mis-understanding
>>>>>>>>>>>>>>> your
>>>>>>>>>>>>>>>>> idea
>>>>>>>>>>>>>>>>>>>> of registering multiple assignments. More details:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can
>>>>>> encode
>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>>> protocols each with their different metadata. The
>>>>>> coordinator
>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> pick the
>>>>>>>>>>>>>>>>>>>> common one that everyone supports (if there is no
>>> common
>>>>>>>> one,
>>>>>>>>>>> it
>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>>> an error back; if there are multiple ones, it will pick
>>>> the
>>>>>>>> one
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded
>>>> list).
>>>>>>>>>>> Since
>>>>>>>>>>>>>> our
>>>>>>>>>>>>>>>>>>>> current Streams rebalance protocol is still based on the
>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>>>>> coordinator, it means our protocol_type would be
>>>> "consumer",
>>>>>>>>> but
>>>>>>>>>>>>>>>>> inside
>>>>>>>>>>>>>>>>>>>> the protocol type we can have multiple protocols like
>>>>>>>>> "streams",
>>>>>>>>>>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we
>>>> need
>>>>>>>> to
>>>>>>>>>>>>>>>>> implement a
>>>>>>>>>>>>>>>>>>>> different assignor class for each version and register
>>> all
>>>>>> of
>>>>>>>>>>> them
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the
>>>>>>>> future
>>>>>>>>>>> if
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> re-factor our implementation to have our own client
>>>>>>>> coordinator
>>>>>>>>>>>>>> layer
>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>> Connect did, we can simplify this part of the
>>>>>> implementation.
>>>>>>>>>>> But
>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> now with the above approach this is still doable.
>>>>>>>>>>>>>>>>>>>>
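As a rough illustration of the selection rule described above (pick a protocol that every member supports, and let votes decide when several qualify), here is a minimal Python sketch. The function name `select_protocol`, the input shape, and the tie-break by name are assumptions made for this example only; it is not the group coordinator's actual code.

```python
from collections import Counter


def select_protocol(member_protocols):
    """Pick one protocol for the whole group.

    member_protocols maps a member id to the list of protocol names it
    registered, most preferred first (hypothetical input shape).
    """
    if not member_protocols:
        raise ValueError("group has no members")

    # Only protocols that every member registered are candidates.
    common = set.intersection(*(set(p) for p in member_protocols.values()))
    if not common:
        raise ValueError("no protocol is supported by every member")

    # Each member votes for the first candidate in its own preference list.
    votes = Counter(
        next(name for name in protocols if name in common)
        for protocols in member_protocols.values()
    )

    # Most votes wins; ties are broken by name (an assumption of this sketch).
    return max(sorted(votes), key=lambda name: votes[name])


mixed_group = {
    "old-instance": ["streams_v2"],
    "new-instance-1": ["streams_v3", "streams_v2"],
    "new-instance-2": ["streams_v3", "streams_v2"],
}
print(select_protocol(mixed_group))
```

While one old instance is still in the group, only "streams_v2" is common to all members, so it is selected; once every member registers "streams_v3" first, that one wins.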
>>>>>>>>>>>>>>>>>>>> On the broker side, the group coordinator will only
>>>> persist
>>>>>> a
>>>>>>>>>>>> group
>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>> the selected protocol and its subscription metadata,
>>> e.g.
>>>> if
>>>>>>>>>>>>>>>>> coordinator
>>>>>>>>>>>>>>>>>>>> decides to pick "streams_v2" it will only send that
>>>>>>>> protocol's
>>>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>> from everyone to the leader to assign, AND when
>>> completing
>>>>>>>> the
>>>>>>>>>>>>>>>>> rebalance it
>>>>>>>>>>>>>>>>>>>> will also only write the group metadata with that
>>> protocol
>>>>>>>> and
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> assignment only. In a word, although the network traffic
>>>>>>>> may be
>>>>>>>>>>>>>>>>> increased a
>>>>>>>>>>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One
>>> corner
>>>>>>>>>>>>>> situation
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> need to consider is how to stop registering very old
>>>>>>>> assignors
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> avoid the
>>>>>>>>>>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if
>>> you
>>>>>> are
>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1
>>>>>>>>>>> assignor
>>>>>>>>>>>>>>>>> anymore,
>>>>>>>>>>>>>>>>>>>> but that would unfortunately still require some configs.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 3) About the  "version probing" idea, I think that's a
>>>>>>>>> promising
>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>> as well, but if we are going to do the multi-assignment
>>>> its
>>>>>>>>>>> value
>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top
>>> of
>>>>>>>>>>>>>>>>> multi-assignment
>>>>>>>>>>>>>>>>>>>> to save us from still requiring the config to avoid
>>>>>>>> registering
>>>>>>>>>>>> all
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> metadata for all versions. More details:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the
>>>> assignor
>>>>>>>> but
>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> all old
>>>>>>>>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the
>>> encoded
>>>>>>>> data
>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>> be:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> "streams_vN" : "encoded metadata"
>>>>>>>>>>>>>>>>>>>> "streams_vN-1":empty
>>>>>>>>>>>>>>>>>>>> "streams_vN-2":empty
>>>>>>>>>>>>>>>>>>>> ..
>>>>>>>>>>>>>>>>>>>> "streams_0":empty
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So the coordinator can still safely choose the latest
>>>> common
>>>>>>>>>>>>>> version;
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> then when leaders receive the subscription (note it
>>> should
>>>>>>>>>>> always
>>>>>>>>>>>>>>>>> recognize
>>>>>>>>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of
>>> the
>>>>>>>>>>>>>>>>> subscriptions
>>>>>>>>>>>>>>>>>>>> is empty bytes, it will send the empty assignment with
>>>> that
>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>> number
>>>>>>>>>>>>>>>>>>>> encoded in the metadata. So in the second auto-triggered rebalance
>>>> all
>>>>>>>>>>>>>> members
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>> send the metadata with that version:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> "streams_vN" : empty
>>>>>>>>>>>>>>>>>>>> "streams_vN-1" : empty
>>>>>>>>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata"
>>>>>>>>>>>>>>>>>>>> ..
>>>>>>>>>>>>>>>>>>>> "streams_0":empty
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> By doing this we would not require any configs for
>>> users.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not
>>> clear
>>>>>>>> about
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> details
>>>>>>>>>>>>>>>>>>>> so probably we'd need to fill that out before making a
>>>> call.
>>>>>>>>> For
>>>>>>>>>>>>>>>>> example,
>>>>>>>>>>>>>>>>>>>> you mentioned "If we detect this situation, the Streams
>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>> closes
>>>>>>>>>>>>>>>>>>>> corresponding active tasks as well as "hot standby"
>>> tasks,
>>>>>>>> and
>>>>>>>>>>>>>>>>> re-creates
>>>>>>>>>>>>>>>>>>>> the new active tasks using the new store." How could we
>>>>>>>>>>> guarantee
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> gap between these two stores will keep decreasing rather than
>>>>>>>>>>> increasing
>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>> we'll
>>>>>>>>>>>>>>>>>>>> eventually achieve the flip point? And also the longer
>>> we
>>>>>> are
>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> flip point, the larger we are doubling the storage
>>> space,
>>>>>>>> etc.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> @John, Guozhang,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply...
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> About upgrading the rebalance metadata:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Another possibility to do this, would be to register
>>>>>>>> multiple
>>>>>>>>>>>>>>>>> assignment
>>>>>>>>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case, new
>>>>>>>>>>> instances
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>> be configured to support both and the broker would pick
>>>> the
>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>> all instances understand. The disadvantage would be,
>>> that
>>>>>> we
>>>>>>>>>>> send
>>>>>>>>>>>>>>> much
>>>>>>>>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as
>>>> long
>>>>>>>> as
>>>>>>>>>>> no
>>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus,
>>> using
>>>>>>>> this
>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off an
>>>>>>>>>>> increased
>>>>>>>>>>>>>>>>>>>>> rebalance network footprint (I also assume that this
>>>> would
>>>>>>>>>>>>>> increase
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> message size that is written into __consumer_offsets
>>>>>>>> topic?).
>>>>>>>>>>>>>>>>> Overall, I
>>>>>>>>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it
>>>> could
>>>>>>>>>>> avoid
>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> second rebalance (I have some more thoughts about
>>> stores
>>>>>>>> below
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>> relevant for single rebalance upgrade).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For future upgrades we might be able to fix this
>>> though.
>>>> I
>>>>>>>> was
>>>>>>>>>>>>>>>>> thinking
>>>>>>>>>>>>>>>>>>>>> about the following:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In the current implementation, the leader fails if it
>>>> gets
>>>>>> a
>>>>>>>>>>>>>>>>>>>>> subscription it does not understand (ie, newer
>>> version).
>>>> We
>>>>>>>>>>> could
>>>>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>>>>>> this behavior and let the leader send an empty
>>> assignment
>>>>>>>> plus
>>>>>>>>>>>>>> error
>>>>>>>>>>>>>>>>>>>>> code (including supported version) back to the instance
>>>>>>>>> sending
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> "bad" subscription. This would allow the following
>>> logic
>>>>>> for
>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>> application instance:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>  - on startup, always send the latest subscription
>>> format
>>>>>>>>>>>>>>>>>>>>>  - if leader understands it, we get an assignment back
>>> and
>>>>>>>>> start
>>>>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>>>>>>>>>  - if leader does not understand it, we get an empty
>>>>>>>>> assignment
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> supported version back
>>>>>>>>>>>>>>>>>>>>>  - the application unsubscribe()/subscribe()/poll()
>>> again
>>>>>>>> and
>>>>>>>>>>>>>>> sends a
>>>>>>>>>>>>>>>>>>>>> subscription using the leader's supported version
>>>>>>>>>>>>>>>>>>>>>
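The probing steps listed above could be sketched roughly as follows. This is a toy model in Python rather than Streams code: the names `Leader`, `Assignment`, and `join`, and the placeholder task id, are all hypothetical and exist only to show the control flow of "send latest, fall back to the leader's supported version".

```python
from dataclasses import dataclass, field


@dataclass
class Assignment:
    supported_version: int              # newest version the leader understands
    tasks: list = field(default_factory=list)
    error: bool = False                 # True means "I did not understand you"


class Leader:
    def __init__(self, supported_version):
        self.supported_version = supported_version

    def assign(self, subscription_version):
        # Instead of failing on a newer subscription, answer with an empty
        # assignment plus an error flag and the leader's supported version.
        if subscription_version > self.supported_version:
            return Assignment(self.supported_version, [], error=True)
        return Assignment(self.supported_version, ["task_0"])


def join(leader, latest_version):
    """Return (version finally used, number of rebalances, assignment)."""
    version = latest_version            # on startup, always try the latest format
    rebalances = 0
    while True:
        rebalances += 1
        assignment = leader.assign(version)
        if not assignment.error:
            return version, rebalances, assignment
        # Probing failed: resubscribe with the version the leader reported.
        version = assignment.supported_version


print(join(Leader(supported_version=2), latest_version=3)[:2])
print(join(Leader(supported_version=3), latest_version=3)[:2])
```

Against an old leader the instance needs two rebalances and ends up on the leader's version; against an upgraded leader the first attempt succeeds.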
>>>>>>>>>>>>>>>>>>>>> This protocol would allow to do a single rolling
>>> bounce,
>>>>>> and
>>>>>>>>>>>>>>>>> implements
>>>>>>>>>>>>>>>>>>>>> a "version probing" step, that might result in two
>>>> executed
>>>>>>>>>>>>>>>>> rebalances.
>>>>>>>>>>>>>>>>>>>>> The advantage would be, that the user does not need to
>>>> set
>>>>>>>> any
>>>>>>>>>>>>>>> configs
>>>>>>>>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care
>>> of
>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> automatically.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen
>>> and
>>>>>>>> that
>>>>>>>>>>>> for
>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>> error case during rebalance, we lose the information
>>>> about
>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> supported leader version and the "probing step" would
>>>>>>>> happen a
>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>> time.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If the leader is eventually updated, it will include
>>> its
>>>>>>>> own
>>>>>>>>>>>>>>>>> supported
>>>>>>>>>>>>>>>>>>>>> version in all assignments, to allow a "down graded"
>>>>>>>>>>> application
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> upgrade its version later. Also, if an application
>>> fails,
>>>>>> the
>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>>>> probing would always be successful and only a single
>>>>>>>> rebalance
>>>>>>>>>>>>>>>>> happens.
>>>>>>>>>>>>>>>>>>>>> If we use this protocol, I think we don't need any
>>>>>>>>>>> configuration
>>>>>>>>>>>>>>>>>>>>> parameter for future upgrades.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version
>>> the
>>>>>>>>>>>>>>> current/old
>>>>>>>>>>>>>>>>>>>>> application is using. I think this is simpler, as users
>>>>>> know
>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> version. If we use "internal.protocol.version" instead,
>>>> we
>>>>>>>>>>> expose
>>>>>>>>>>>>>>>>>>>>> implementation details and users need to know the
>>>> protocol
>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>> (ie,
>>>>>>>>>>>>>>>>>>>>> they need to map from the release version to the
>>> protocol
>>>>>>>>>>>> version;
>>>>>>>>>>>>>>> ie,
>>>>>>>>>>>>>>>>>>>>> "I am running 0.11.0 that runs with metadata protocol
>>> version
>>>>>>>> 2").
>>>>>>>>>>>>>>>>>>>>>
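To make the "release version in, protocol version out" idea concrete, here is a small Python sketch of the lookup that Streams would do internally so that users never have to. The table is an assumption pieced together from this thread (0.11.0 runs metadata version 2, the 0.10.1.x to 1.1.x range shares one version, and 1.2 moves from 2 to 3); the value for 0.10.0 and the names `UPGRADE_FROM_TO_METADATA_VERSION` and `metadata_version_for` are illustrative only.

```python
# Hypothetical mapping from an "upgrade.from" release to the rebalance
# metadata version that release speaks.
UPGRADE_FROM_TO_METADATA_VERSION = {
    "0.10.0": 1,   # assumption: the only release on the first metadata version
    "0.10.1": 2,
    "0.10.2": 2,
    "0.11.0": 2,
    "1.0": 2,
    "1.1": 2,
}

LATEST_METADATA_VERSION = 3  # the version introduced with the 1.2 upgrade


def metadata_version_for(upgrade_from):
    """Return the metadata version to encode subscriptions with."""
    if upgrade_from is None:
        # No upgrade in progress: use the newest format.
        return LATEST_METADATA_VERSION
    try:
        return UPGRADE_FROM_TO_METADATA_VERSION[upgrade_from]
    except KeyError:
        raise ValueError(f"unknown upgrade.from value: {upgrade_from!r}")


print(metadata_version_for("0.11.0"))
print(metadata_version_for(None))
```

The user only states the release being upgraded from; the protocol version stays an internal detail.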
>>>>>>>>>>>>>>>>>>>>> Also the KIP states that for the second rolling bounce,
>>>> the
>>>>>>>>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` --
>>> and
>>>>>>>>> thus,
>>>>>>>>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored
>>>> (I
>>>>>>>>> will
>>>>>>>>>>>>>>> update
>>>>>>>>>>>>>>>>>>>>> the KIP to point out this dependency).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> About your second point: I'll update the KIP
>>> accordingly
>>>> to
>>>>>>>>>>>>>> describe
>>>>>>>>>>>>>>>>>>>>> future updates as well. Both will be different.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> One more point about upgrading the store format. I was
>>>>>>>>> thinking
>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>> avoiding the second rolling bounce altogether in the
>>>>>>>> future:
>>>>>>>>>>>> (1)
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2)
>>> this
>>>>>>>>>>>> requires us
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the
>>>>>> switch
>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> delete
>>>>>>>>>>>>>>>>>>>>> the old stores. (3) the current proposal does the
>>> switch
>>>>>>>>>>>>>> "globally"
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> this is simpler and due to the required second
>>> rebalance
>>>> no
>>>>>>>>>>>>>>>>> disadvantage.
>>>>>>>>>>>>>>>>>>>>> However, a global consistent switch over might actually
>>>> not
>>>>>>>> be
>>>>>>>>>>>>>>>>> required.
>>>>>>>>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from
>>>> above,
>>>>>>>> we
>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>> decouple the store switch and each instance could
>>> switch
>>>>>> its
>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>>>>>> independently from all other instances. After the
>>> rolling
>>>>>>>>>>> bounce,
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>> seems to be ok to switch from the old store to the new
>>>>>> store
>>>>>>>>>>>>>> "under
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> hood" whenever the new store is ready (this could even
>>> be
>>>>>>>>> done,
>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>>> we switch to the new metadata version). Each time we
>>>> update
>>>>>>>>> the
>>>>>>>>>>>>>> "hot
>>>>>>>>>>>>>>>>>>>>> standby" we check if it reached the "endOffset"  (or
>>>> maybe
>>>>>>>> X%
>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect this
>>>>>>>>>>>> situation,
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> Streams application closes corresponding active tasks
>>> as
>>>>>>>> well
>>>>>>>>>>> as
>>>>>>>>>>>>>>> "hot
>>>>>>>>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks
>>> using
>>>>>>>> the
>>>>>>>>>>> new
>>>>>>>>>>>>>>>>> store.
>>>>>>>>>>>>>>>>>>>>> (I need to go through the details once again, but it
>>>> seems
>>>>>>>> to
>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>> feasible.).
>>>>>>>>>>>>>>>>>>>>>
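The readiness check mentioned above (has the "hot standby" reached the end offset, or some X% of it) might look like the following Python sketch. The function name `standby_is_ready`, its parameters, and the treatment of an empty changelog are assumptions for illustration, not part of the KIP.

```python
def standby_is_ready(restored_offset, end_offset, ratio=1.0):
    """Decide whether the new-format store has caught up enough to switch.

    restored_offset: changelog offset the hot standby has applied so far
    end_offset:      current end offset of the changelog partition
    ratio:           fraction of end_offset that counts as "caught up"
                     (1.0 means fully caught up; 0.9 would be the "X%" variant)
    """
    if not 0.0 < ratio <= 1.0:
        raise ValueError("ratio must be in (0, 1]")
    if end_offset <= 0:
        # Empty changelog: nothing to restore, so the store is trivially ready.
        return True
    return restored_offset >= end_offset * ratio


print(standby_is_ready(95, 100))
print(standby_is_ready(95, 100, ratio=0.9))
```

Because the end offset keeps moving while the old store is still processing, a check like this would have to be re-evaluated on every standby update, which is exactly where the concern about never reaching the flip point comes from.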
>>>>>>>>>>>>>>>>>>>>> Combining this strategy with the "multiple assignment"
>>>>>> idea,
>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>> enable us to do a single rolling bounce upgrade from
>>> 1.1
>>>>>> ->
>>>>>>>>>>> 1.2.
>>>>>>>>>>>>>>>>>>>>> Applications would just use the old store, as long as
>>> the
>>>>>>>> new
>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>> not ready, even if the new metadata version is used
>>>>>> already.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For future upgrades, a single rebalance would be
>>>>>> sufficient,
>>>>>>>>>>> too,
>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>> if the stores are upgraded. We would not need any
>>> config
>>>>>>>>>>>>>> parameters
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>> the "probe" step allows us to detect the supported
>>>>>> rebalance
>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>> version (and we would also not need multiple
>>> "assignment
>>>>>>>>>>>>>> strategies"
>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>> our own protocol encodes everything we need).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>>>> @John:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the
>>>>>>>> encoded
>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>>> bytes
>>>>>>>>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from
>>>>>> Consumer's
>>>>>>>>>>> pov,
>>>>>>>>>>>>>>> so I
>>>>>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>>>>>>> this change should be in the Streams layer as well.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> @Matthias:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported
>>> version"
>>>>>>>>>>> besides
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea
>>> to
>>>>>>>> allow
>>>>>>>>>>>>>>> either
>>>>>>>>>>>>>>>>>>>>> case;
>>>>>>>>>>>>>>>>>>>>>> the key is that in Streams we would likely end up
>>> with a
>>>>>>>>>>> mapping
>>>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>>>>>>>>> protocol version to the other persistent data format
>>>>>>>> versions
>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually
>>>>>>>>> achieve
>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded
>>>>>>>>> protocol
>>>>>>>>>>>>>>>>> version's
>>>>>>>>>>>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0
>>>> ->
>>>>>>>>>>> 0.10.1
>>>>>>>>>>>>>>>>> leaders
>>>>>>>>>>>>>>>>>>>>>> can choose to use the newer version in the first
>>> rolling
>>>>>>>>>>> bounce
>>>>>>>>>>>>>>>>> directly
>>>>>>>>>>>>>>>>>>>>>> and we can document to users that they would not need
>>> to
>>>>>>>> set
>>>>>>>>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the
>>>> upgraded
>>>>>>>>>>>>>> protocol
>>>>>>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 ->
>>> 1.2,
>>>>>> and
>>>>>>>>>>> then
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the
>>>> first
>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>> bounce
>>>>>>>>>>>>>>>>>>>>>> and reset in the second.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Besides that, some additional comments:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for
>>>>>> users
>>>>>>>>> to
>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter users
>>>>>> only
>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> set a
>>>>>>>>>>>>>>>>>>>>>> single version, while the Streams will map that
>>> version
>>>> to
>>>>>>>>> the
>>>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But
>>>> maybe
>>>>>> I
>>>>>>>>>>> did
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>> your idea about how the  "upgrade.from" config will be
>>>>>> set,
>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from config
>>>>>> will
>>>>>>>>> be
>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for
>>>> example,
>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>> reset it to null in the second rolling bounce?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than
>>> talking
>>>>>>>> about
>>>>>>>>>>>>>>>>> specific
>>>>>>>>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just
>>>>>>>> categorize
>>>>>>>>>>> all
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> possible scenarios, even for future upgrade versions,
>>>> what
>>>>>>>>>>>> should
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> standard operations? The categories we can summarize
>>> to
>>>>>>>>> would
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>> (assuming
>>>>>>>>>>>>>>>>>>>>>> users upgrade from version X to version Y, where X and
>>> Y
>>>>>> are
>>>>>>>>>>>> Kafka
>>>>>>>>>>>>>>>>>>>>> versions,
>>>>>>>>>>>>>>>>>>>>>> with the corresponding supported protocol version x
>>> and
>>>>>> y):
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and
>>>>>>>> hence
>>>>>>>>>>> no
>>>>>>>>>>>>>>>>>>>>> persistent
>>>>>>>>>>>>>>>>>>>>>> data formats have changed.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> b. x != y, but all persistent data formats remain the
>>>>>> same.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> b. x != y, AND some persistent data format like RocksDB
>>>>>>>>> format,
>>>>>>>>>>>>>>>>> changelog
>>>>>>>>>>>>>>>>>>>>>> format, has been changed.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> c. special case: we may need some special handling
>>> logic
>>>>>>>> when
>>>>>>>>>>>>>>>>> "current
>>>>>>>>>>>>>>>>>>>>>> version" or "newest supported version" are not
>>> available
>>>>>> in
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> protocol,
>>>>>>>>>>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> under the above scenarios, how many rolling bounces
>>>> users
>>>>>>>>> need
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> execute?
>>>>>>>>>>>>>>>>>>>>>> how they should set the configs in each rolling
>>> bounce?
>>>>>> and
>>>>>>>>>>> how
>>>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>>>>>>>> library will execute in these cases?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Ted,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I still consider changing the KIP to include it right
>>>>>> away
>>>>>>>>> --
>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>> not,
>>>>>>>>>>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more
>>>>>>>> detail
>>>>>>>>>>>>>> first.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> (Same for other open questions like interface names
>>> --
>>>> I
>>>>>>>>>>>> collect
>>>>>>>>>>>>>>>>>>>>>>> feedback and update the KIP after we reach consensus
>>>> :))
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the details, Matthias.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> bq. change the metadata protocol only if a future
>>>>>>>> release,
>>>>>>>>>>>>>>> encoding
>>>>>>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>>>>>>>>>>> and supported version might be an advantage
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be
>>>>>> implemented
>>>>>>>>> in
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> KIP.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding
>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of
>>>>>> user
>>>>>>>>>>>> code.
>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>>>>>> if we want to add something like this, I would
>>> prefer
>>>>>> to
>>>>>>>>> do
>>>>>>>>>>>> it
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>>>>> separate KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling
>>>> bounce.
>>>>>>>> But
>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>>>>> understanding it would not be possible.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed
>>> to
>>>>>>>>>>> switch
>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to
>>>> switch
>>>>>>>>>>> from
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>>>>>>>>> store to the new store (this happens after the last
>>>>>>>>>>> instance
>>>>>>>>>>>>>>>>> bounces a
>>>>>>>>>>>>>>>>>>>>>>>>> second time).
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is,
>>>> that
>>>>>>>>> it's
>>>>>>>>>>>>>>>>> unclear
>>>>>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>>>>>>>> to switch from version 2 to version 3. The
>>>>>>>>>>>>>>>>> StreamsPartitionsAssignor is
>>>>>>>>>>>>>>>>>>>>>>>>> stateless by design, and thus, the information
>>> which
>>>>>>>>>>> version
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>>>>> use must be passed in from externally -- and we
>>> want
>>>> to
>>>>>>>>> use
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> StreamsConfig to pass in this information.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> During upgrade, all new instances have no
>>> information
>>>>>>>>> about
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> progress
>>>>>>>>>>>>>>>>>>>>>>>>> of the upgrade (ie, how many other instances got
>>>>>>>> upgraded
>>>>>>>>>>>>>>>>> already).
>>>>>>>>>>>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a
>>> version 3
>>>>>>>>>>>>>>>>> subscription.
>>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>>>>> leader also has this limited view on the world and
>>>> can
>>>>>>>>> only
>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>>>>> 2 assignments back.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can
>>>>>> simplify
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> upgrade.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> We did consider to change the metadata to make
>>> later
>>>>>>>>>>> upgrades
>>>>>>>>>>>>>>> (ie,
>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change
>>>> the
>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>>> storage format again -- as long as we don't change
>>>> it,
>>>>>> a
>>>>>>>>>>>>>> single
>>>>>>>>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version"
>>> and
>>>>>>>>>>>>>> "supported
>>>>>>>>>>>>>>>>>>>>>>>>> version". This would allow the leader to switch to
>>>> the
>>>>>>>> new
>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>>>>> earlier and without a second rebalance: leader
>>> would
>>>>>>>>>>> receive
>>>>>>>>>>>>>>> "used
>>>>>>>>>>>>>>>>>>>>>>>>> version == old" and "supported version = old/new"
>>> --
>>>> as
>>>>>>>>>>> long
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>> least
>>>>>>>>>>>>>>>>>>>>>>>>> one instance sends a "supported version = old"
>>> leader
>>>>>>>>> sends
>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>>>>>>> assignment back. However, encoding both versions
>>> would
>>>>>>>>> allow
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> leader can send a new version assignment back,
>>> right
>>>>>>>> after
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>>>>>>>> round of rebalance finished (all instances send
>>>>>>>> "supported
>>>>>>>>>>>>>>>>> version =
>>>>>>>>>>>>>>>>>>>>>>>>> new"). However, there are still two issues with
>>> this:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 1) if we switch to the new format right after the
>>>> last
>>>>>>>>>>>>>> instance
>>>>>>>>>>>>>>>>>>>>> bounced,
>>>>>>>>>>>>>>>>>>>>>>>>> the new stores might not be ready to be used --
>>> this
>>>>>>>> could
>>>>>>>>>>>>>> lead
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> "downtime" as store must be restored before
>>>> processing
>>>>>>>> can
>>>>>>>>>>>>>>> resume.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted again.
>>>> At
>>>>>>>>> this
>>>>>>>>>>>>>>>>> point, the
>>>>>>>>>>>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled and
>>>>>> thus
>>>>>>>>>>>> sends
>>>>>>>>>>>>>>>>> the old
>>>>>>>>>>>>>>>>>>>>>>>>> protocol data. However, it would be desirable to
>>>> never
>>>>>>>>> fall
>>>>>>>>>>>>>> back
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> old protocol after the switch to the new protocol.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The second issue is minor and I guess if users
>>> set-up
>>>>>>>> the
>>>>>>>>>>>>>>> instance
>>>>>>>>>>>>>>>>>>>>>>>>> properly it could be avoided. However, the first
>>>> issue
>>>>>>>>>>> would
>>>>>>>>>>>>>>>>> prevent
>>>>>>>>>>>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we
>>>>>>>> consider
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>>>>> change the metadata protocol only in a future
>>>> release,
>>>>>>>>>>>>>> encoding
>>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>>>> used and supported version might be an advantage in
>>>> the
>>>>>>>>>>>> future
>>>>>>>>>>>>>>>>> and we
>>>>>>>>>>>>>>>>>>>>>>>>> could consider to add this information in 1.2
>>> release
>>>>>> to
>>>>>>>>>>>>>> prepare
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to
>>>> give
>>>>>>>> the
>>>>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>>>>>>>>>>> enough time to prepare the stores in new format. If
>>>> you
>>>>>>>>>>> would
>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> second rolling bounce before this, it would still
>>>> work
>>>>>>>> --
>>>>>>>>>>>>>>>>> however, you
>>>>>>>>>>>>>>>>>>>>>>>>> might see app "downtime" as the new store must be
>>>> fully
>>>>>>>>>>>>>> restored
>>>>>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>>>>>>> processing can resume.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Does this make sense?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get
>>> rid
>>>>>> of
>>>>>>>>>>> the
>>>>>>>>>>>>>> 2nd
>>>>>>>>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>>>>>>> bounce?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary
>>>>>>>>> difference
>>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to
>>>>>>>> decide
>>>>>>>>>>>>>>>>> whether to
>>>>>>>>>>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
>>>>>>>>>>> (Actually,
>>>>>>>>>>>>>>>>> there is
>>>>>>>>>>>>>>>>>>>>>>>>> another difference mentioned in that the KIP says
>>>> that
>>>>>>>> the
>>>>>>>>>>>> 2nd
>>>>>>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>>>>>>>>>> bounce should happen after all new state stores are
>>>>>>>>> created
>>>>>>>>>>>> by
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> background thread. However, within the 2nd rolling
>>>>>>>> bounce,
>>>>>>>>>>> we
>>>>>>>>>>>>>>> say
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> there is still a background thread, so it seems
>>> like
>>>> there is
>>>>>>>> no
>>>>>>>>>>>>>>> actual
>>>>>>>>>>>>>>>>>>>>>>>>> requirement to wait for the new state stores to be
>>>>>>>>>>> created.)
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal
>>>> with
>>>>>>>>>>>>>>> mixed-mode
>>>>>>>>>>>>>>>>>>>>>>> (having
>>>>>>>>>>>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer
>>>>>>>> group).
>>>>>>>>>>> It
>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
>>>>>>>>>>>>>>>>> (somehow/somewhere)
>>>>>>>>>>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>>>>>>>>>>>> that:
>>>>>>>>>>>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until all
>>>>>>>>>>> instances
>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>> running
>>>>>>>>>>>>>>>>>>>>>>>>> the new code.
>>>>>>>>>>>>>>>>>>>>>>>>>> * Once all the instances are running the new code,
>>>>>> then
>>>>>>>>>>> one
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> time,
>>>>>>>>>>>>>>>>>>>>>>>>> the instances start sending Subscription V3. Leader
>>>>>>>> still
>>>>>>>>>>>>>> hands
>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>>>>>>> Assignment Version 2, until all new state stores
>>> are
>>>>>>>>> ready.
>>>>>>>>>>>>>>>>>>>>>>>>>> * Once all instances report that new stores are
>>>> ready,
>>>>>>>>>>>> Leader
>>>>>>>>>>>>>>>>> sends
>>>>>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>>>>>>>>> Assignment Version 3.
>>>>>>>>>>>>>>>>>>>>>>>>>> * Once an instance receives an Assignment Version
>>> 3,
>>>>>> it
>>>>>>>>>>> can
>>>>>>>>>>>>>>>>> delete
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> old state store.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot
>>>> of
>>>>>>>>>>>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2
>>> rolling
>>>>>>>>>>>>>> restarts.
>>>>>>>>>>>>>>> No
>>>>>>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> monitor logs for state store rebuild. You just
>>> deploy
>>>>>>>> it,
>>>>>>>>>>> and
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>>>>>>>>>>> update themselves.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> The thing that made me think of this is that the
>>> "2
>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>>> bounces"
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>> similar to what Kafka brokers have to do for changes in
>>>>>>>>>>>>>>>>>>>>>>>>> inter.broker.protocol.version and
>>>>>>>>>>> log.message.format.version.
>>>>>>>>>>>>>>> And
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> broker case, it seems like it would be possible
>>> (with
>>>>>>>> some
>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>> course)
>>>>>>>>>>>>>>>>>>>>>>>>> to modify kafka to allow us to do similar
>>>>>> auto-detection
>>>>>>>>> of
>>>>>>>>>>>>>>> broker
>>>>>>>>>>>>>>>>>>>>>>>>> capabilities and automatically do a switchover from
>>>>>>>>> old/new
>>>>>>>>>>>>>>>>> versions.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> -James
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
>>>>>>>>>>>> bbej...@gmail.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval
>>>>>> methods
>>>>>>>>> on
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>>>>>>> interfaces.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Would we want to consider adding one method with a
>>>>>>>>> Predicate
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>>>>>>>>>>> for filtering records by the timestamp stored
>>> with
>>>>>> the
>>>>>>>>>>>>>> record?
>>>>>>>>>>>>>>>>> Or
>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>> better left for users to implement themselves
>>> once
>>>>>> the
>>>>>>>>>>> data
>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>> been
>>>>>>>>>>>>>>>>>>>>>>>>>>> retrieved?
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
>>>>>>>>>>>> yuzhih...@gmail.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For my point #1, I don't have preference as to
>>>> which
>>>>>>>>>>>>>>> separator
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>> chosen.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Given the background you mentioned, current
>>> choice
>>>>>> is
>>>>>>>>>>>> good.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it
>>> is
>>>>>>>>> closer
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> English
>>>>>>>>>>>>>>>>>>>>>>>>>>>> grammar.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would be good to listen to what other people
>>>> think.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax
>>> <
>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance
>>>> metadata
>>>>>>>>>>>> upgrade
>>>>>>>>>>>>>>> fix
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (addressing the mentioned
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>> https://issues.apache.org/jira/browse/KAFKA-6054
>>>> )
>>>>>>>> It
>>>>>>>>>>>>>> gives a
>>>>>>>>>>>>>>>>> first
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> impression how the metadata upgrade works
>>>> including
>>>>>>>> a
>>>>>>>>>>>>>> system
>>>>>>>>>>>>>>>>> test:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are
>>> ready.
>>>> I
>>>>>>>>>>> agree
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> complex and I am ok with putting out more code to
>>>> give
>>>>>>>>>>> better
>>>>>>>>>>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> context.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Ted:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
>>>>>>>>>>>>>>>>>>>>> `processing.guarantee`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameter that accepts `at_least_once` and
>>>>>>>>>>> `exactly_once`
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>> values.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs
>>> dash
>>>>>>>> but
>>>>>>>>> I
>>>>>>>>>>>>>>> prefer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consistency. If you feel strongly about it, we
>>> can
>>>>>>>> also
>>>>>>>>>>>>>> change
>>>>>>>>>>>>>>>>> it to
>>>>>>>>>>>>>>>>>>>>>>>>> `-`.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either way
>>>> -- I
>>>>>>>>>>>>>> stripped
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> `With`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be
>>> good
>>>> to
>>>>>>>>> get
>>>>>>>>>>>>>>>>> feedback
>>>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> others and pick the name the majority prefers.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @John:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it
>>> would
>>>>>>>> not
>>>>>>>>>>>>>> make a
>>>>>>>>>>>>>>>>>>>>>>>>> difference.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the two
>>>>>>>>> versions
>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>>>>> introduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I
>>>> don't
>>>>>>>> hit
>>>>>>>>>>> an
>>>>>>>>>>>>>>>>> issue
>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> putting the `-v2` to the store directory
>>> instead
>>>> of
>>>>>>>>>>>>>>>>> `rocksdb-v2`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several
>>>> questions
>>>>>>>>>>> queued
>>>>>>>>>>>>>>> up,
>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> were
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> all in the "rejected alternatives" section...
>>>> oh,
>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state
>>>>>>>>> directory
>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/"
>>>> to
>>>>>>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
>>>>> /rocksdb-v2/storeName/":
>>>>>>>> if
>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>> put the
>>>>>>>>>>>>>>>>>>>>>>>>> "v2"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> marker on the storeName part of the path
>>> (i.e.,
>>>>>>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id
>>>>>>> /rocksdb/storeName-v2/"),
>>>>>>>>>>> then
>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> benefits without altering the high-level
>>>> directory
>>>>>>>>>>>>>>> structure.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people
>>>>>>>> running
>>>>>>>>>>>>>>>>> scripts to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> monitor
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other
>>> such
>>>>>> use
>>>>>>>>>>>>>> cases.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
>>>>>>>>>>>>>>> yuzhih...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Nicely written KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ?
>>>> Underscore
>>>>>>>> may
>>>>>>>>>>>>>>>>> sometimes
>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mistyped (as '-'). I think using '-' is more
>>>>>> friendly
>>>>>>>>> to
>>>>>>>>>>>>>>> user.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface
>>>>>> ReadOnlyKeyValueTimestampStore<K,
>>>>>>>>> V>
>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better
>>>> name
>>>>>>>>> for
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> class ?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang
>>> Wang <
>>>>>>>>>>>>>>>>>>>>> wangg...@gmail.com
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade path section
>>>> and
>>>>>>>> it
>>>>>>>>>>>>>> looks
>>>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>> me,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you also
>>>>>> share
>>>>>>>>> it
>>>>>>>>>>>>>> here
>>>>>>>>>>>>>>>>> so
>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> people
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can take a look?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs
>>> like
>>>>>>>> this
>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it
>>> is
>>>>>>>>> better
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the
>>>> design
>>>>>>>>>>>>>>>>> discussion :)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J.
>>>> Sax
>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams
>>> API
>>>>>> to
>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>> storing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the
>>>>>> basis
>>>>>>>>> to
>>>>>>>>>>>>>>>>> resolve
>>>>>>>>>>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about
>>> this!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 
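
As an aside on the "used version" / "supported version" idea from the
quoted discussion: the leader-side decision could be sketched roughly as
below. This is only an illustration under assumptions. The class, field,
and method names are hypothetical and are not taken from the actual
assignor code; the only rule it encodes is the one described in the
thread, namely that the leader keeps sending the old assignment version
as long as at least one instance reports that it only supports the old
version.

```java
import java.util.List;

// Hypothetical sketch of the "used version" / "supported version" idea.
// None of these names come from the Kafka code base.
final class VersionNegotiation {

    /** What one instance would encode in its subscription (assumed shape). */
    static final class Subscription {
        final int usedVersion;      // version this subscription is encoded with
        final int supportedVersion; // highest version this instance can decode

        Subscription(int usedVersion, int supportedVersion) {
            if (usedVersion > supportedVersion) {
                throw new IllegalArgumentException("used version exceeds supported version");
            }
            this.usedVersion = usedVersion;
            this.supportedVersion = supportedVersion;
        }
    }

    /**
     * The leader picks the highest version that every member supports,
     * i.e. the minimum over all reported supported versions.
     */
    static int chooseAssignmentVersion(List<Subscription> subscriptions) {
        if (subscriptions.isEmpty()) {
            throw new IllegalArgumentException("no subscriptions received");
        }
        int version = Integer.MAX_VALUE;
        for (Subscription s : subscriptions) {
            version = Math.min(version, s.supportedVersion);
        }
        return version;
    }
}
```

With one not-yet-bounced instance (supported version 2) in the group, the
result stays at 2; once every instance reports supported version 3, the
result becomes 3 without a second bounce. The sketch does not address the
two open issues from the thread (stores not yet restored, and an instance
restarting in "upgrade mode").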
