Adam,

I am still working on it. Was pulled into a lot of other tasks lately so
this was delayed. Also had some discussions about simplifying the
upgrade path with some colleagues and I am prototyping this atm. Hope to
update the KIP accordingly soon.

-Matthias

On 11/10/18 7:41 AM, Adam Bellemare wrote:
> Hello Matthias
> 
> I am curious as to the status of this KIP. TTL and expiry of records will
> be extremely useful for several of our business use-cases, as well as
> another KIP I had been working on.
> 
> Thanks
> 
> 
> 
> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> Hi Matthias,
>>
>> Good stuff. Could you comment a bit on how future-proof is this change? For
>> example, if we want to store both event timestamp "and" processing time in
>> RocksDB will we then need another interface (e.g. called
>> KeyValueWithTwoTimestampsStore)?
>>
>> Thanks
>> Eno
>>
>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for your input Guozhang and John.
>>>
>>> I see your point, that the upgrade API is not simple. If you don't
>>> thinks it's valuable to make generic store upgrades possible (atm), we
>>> can make the API internal, too. The impact is, that we only support a
>>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to
>>> WindowedWithTS etc) for which we implement the internal interfaces.
>>>
>>> We can keep the design generic, so if we decide to make it public, we
>>> don't need to re-invent it. This will also have the advantage, that we
>>> can add upgrade pattern for other stores later, too.
>>>
>>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it
>>> was the only way I could find to design a generic upgrade interface. If
>>> we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` would
>>> become an internal interface I guess (don't think we can remove it).
>>>
>>> I will wait for more feedback about this and if nobody wants to keep it
>>> as public API I will update the KIP accordingly. Will add some more
>>> clarifications for different upgrade patterns in the mean time and fix
>>> the typos/minor issues.
>>>
>>> About adding a new state UPGRADING: maybe we could do that. However, I
>>> find it particularly difficult to make the estimation when we should
>>> switch to RUNNING, thus, I am a little hesitant. Using store callbacks
>>> or just logging the progress including some indication about the "lag"
>>> might actually be sufficient. Not sure what others think?
>>>
>>> About "value before timestamp": no real reason and I think it does not
>>> make any difference. Do you want to change it?
>>>
>>> About upgrade robustness: yes, we cannot control if an instance fails.
>>> That is what I meant by "we need to write test". The upgrade should be
>>> able to continuous even is an instance goes down (and we must make sure
>>> that we don't end up in an invalid state that forces us to wipe out the
>>> whole store). Thus, we need to write system tests that fail instances
>>> during upgrade.
>>>
>>> For `in_place_offline` upgrade: I don't think we need this mode, because
>>> people can do this via a single rolling bounce.
>>>
>>>  - prepare code and switch KV-Store to KVwithTs-Store
>>>  - do a single rolling bounce (don't set any upgrade config)
>>>
>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we
>>> remove the `StoreUpgradeBuilder`) will detect that there is only an old
>>> local KV store w/o TS, will start to restore the new KVwithTs store,
>>> wipe out the old store and replace with the new store after restore is
>>> finished, and start processing only afterwards. (I guess we need to
>>> document this case -- will also add it to the KIP.)
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 8/9/18 1:10 PM, John Roesler wrote:
>>>> Hi Matthias,
>>>>
>>>> I think this KIP is looking really good.
>>>>
>>>> I have a few thoughts to add to the others:
>>>>
>>>> 1. You mentioned at one point users needing to configure
>>>> `upgrade.mode="null"`. I think this was a typo and you meant to say
>> they
>>>> should remove the config. If they really have to set it to a string
>>> "null"
>>>> or even set it to a null value but not remove it, it would be
>>> unfortunate.
>>>>
>>>> 2. In response to Bill's comment #1 , you said that "The idea is that
>> the
>>>> upgrade should be robust and not fail. We need to write according
>> tests".
>>>> I may have misunderstood the conversation, but I don't think it's
>> within
>>>> our power to say that an instance won't fail. What if one of my
>> computers
>>>> catches on fire? What if I'm deployed in the cloud and one instance
>>>> disappears and is replaced by a new one? Or what if one instance goes
>>> AWOL
>>>> for a long time and then suddenly returns? How will the upgrade process
>>>> behave in light of such failures?
>>>>
>>>> 3. your thought about making in-place an offline mode is interesting,
>> but
>>>> it might be a bummer for on-prem users who wish to upgrade online, but
>>>> cannot just add new machines to the pool. It could be a new upgrade
>> mode
>>>> "offline-in-place", though...
>>>>
>>>> 4. I was surprised to see that a user would need to modify the topology
>>> to
>>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's
>>>> suggestions would remove this necessity.
>>>>
>>>> Thanks for taking on this very complex but necessary work.
>>>>
>>>> -John
>>>>
>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>>
>>>>> Hello Matthias,
>>>>>
>>>>> Thanks for the updated KIP. Some more comments:
>>>>>
>>>>> 1. The current set of proposed API is a bit too complicated, which
>> makes
>>>>> the upgrade flow from user's perspective also a bit complex. I'd like
>> to
>>>>> check different APIs and discuss about their needs separately:
>>>>>
>>>>>     1.a. StoreProxy: needed for in-place upgrade only, between the
>> first
>>>>> and second rolling bounce, where the old-versioned stores can handle
>>>>> new-versioned store APIs. I think such upgrade paths (i.e. from one
>>> store
>>>>> type to another) would not be very common: users may want to upgrade
>>> from a
>>>>> certain store engine to another, but the interface would likely be
>>> staying
>>>>> the same. Hence personally I'd suggest we keep it internally and only
>>>>> consider exposing it in the future if it does become a common pattern.
>>>>>
>>>>>     1.b. ConverterStore / RecordConverter: needed for both in-place
>> and
>>>>> roll-over upgrade, between the first and second rolling bounces, for
>> the
>>>>> new versioned store to be able to read old-versioned changelog topics.
>>>>> Firstly I think we should not expose key in the public APIs but only
>> the
>>>>> values, since allowing key format changes would break log compaction,
>>> and
>>>>> hence would not be compatible anyways. As for value format changes,
>>>>> personally I think we can also keep its upgrade logic internally as it
>>> may
>>>>> not worth generalizing to user customizable logic.
>>>>>
>>>>>     1.c. If you agrees with 2.a/b above, then we can also remove "
>>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public
>>> APIs.
>>>>>
>>>>>     1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is
>> not
>>>>> needed either given that we are exposing "ValueAndTimestamp" anyways.
>>> I.e.
>>>>> it is just a syntax sugar and for IQ, users can always just set a "
>>>>> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as the
>>> new
>>>>> interface does not provide any additional functions.
>>>>>
>>>>>
>>>>> 2. Could we further categorize the upgrade flow for different use
>> cases,
>>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be used
>>>>> automatically for non-windowed aggregate; 2) PAPI users who do not
>> need
>>> to
>>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to
>>>>> KeyValueWithTimestampStore. Just to give my understanding for 3), the
>>>>> upgrade flow for users may be simplified as the following (for both
>>>>> in-place and roll-over):
>>>>>
>>>>>     * Update the jar to new version, make code changes from
>>> KeyValueStore
>>>>> to KeyValueWithTimestampStore, set upgrade config.
>>>>>
>>>>>     * First rolling bounce, and library code can internally use proxy
>> /
>>>>> converter based on the specified config to handle new APIs with old
>>> stores,
>>>>> while let new stores read from old changelog data.
>>>>>
>>>>>     * Reset upgrade config.
>>>>>
>>>>>     * Second rolling bounce, and the library code automatically turn
>> off
>>>>> logic for proxy / converter.
>>>>>
>>>>>
>>>>> 3. Some more detailed proposals are needed for when to recommend users
>>> to
>>>>> trigger the second rolling bounce. I have one idea to share here: we
>>> add a
>>>>> new state to KafkaStreams, say UPGRADING, which is set when 1) upgrade
>>>>> config is set, and 2) the new stores are still ramping up (for the
>>> second
>>>>> part, we can start with some internal hard-coded heuristics to decide
>>> when
>>>>> it is close to be ramped up). If either one of it is not true any
>> more,
>>> it
>>>>> should transit to RUNNING. Users can then watch on this state, and
>>> decide
>>>>> to only trigger the second rebalance when the state has transited from
>>>>> UPGRADING. They can also choose to cut over while the instance is
>> still
>>>>> UPGRADING, the downside is that after that the application may have
>> long
>>>>> restoration phase which is, to user's pov, unavailability periods.
>>>>>
>>>>>
>>>>> Below are just some minor things on the wiki:
>>>>>
>>>>> 4. "proxy story" => "proxy store".
>>>>>
>>>>> 5. "use the a builder " => "use a builder"
>>>>>
>>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to the
>> value":
>>>>> what's the rationale of putting the timestamp before the value, than
>>> after
>>>>> the value?
>>>>>
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the feedback Bill. I just update the KIP with some of your
>>>>>> points.
>>>>>>
>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch
>> the
>>>>>>>> restore process), I'm wondering if we want to provide a type of
>>>>>>>> StateRestoreListener that could signal when the new stores have
>>>>> reached
>>>>>>>> parity with the existing old stores and that could be the signal to
>>>>>> start
>>>>>>>> second rolling rebalance?
>>>>>>
>>>>>> I think we can reuse the existing listeners, thus, I did not include
>>>>>> anything in the KIP. About a signal to rebalance: this might be
>> tricky.
>>>>>> If we prepare the store "online", the active task will update the
>> state
>>>>>> continuously, and thus, state prepare is never finished. It will be
>> the
>>>>>> users responsibility to do the second rebalance (note, that the
>> second
>>>>>> rebalance will first finish the last delta of the upgrade to finish
>> the
>>>>>> upgrade before actual processing resumes). I clarified the KIP with
>>> this
>>>>>> regard a little bit.
>>>>>>
>>>>>>>> 1. Out of N instances, one fails midway through the process, would
>> we
>>>>>> allow
>>>>>>>> the other instances to complete or just fail the entire upgrade?
>>>>>>
>>>>>> The idea is that the upgrade should be robust and not fail. We need
>> to
>>>>>> write according tests.
>>>>>>
>>>>>>>> 2. During the second rolling bounce, maybe we could rename the
>>> current
>>>>>>>> active directories vs. deleting them right away,  and when all the
>>>>>> prepare
>>>>>>>> task directories are successfully migrated then delete the previous
>>>>>> active
>>>>>>>> ones.
>>>>>>
>>>>>> Ack. Updated the KIP.
>>>>>>
>>>>>>>> 3. For the first rolling bounce we pause any processing any new
>>>>> records
>>>>>> and
>>>>>>>> just allow the prepare tasks to restore, then once all prepare
>> tasks
>>>>>> have
>>>>>>>> restored, it's a signal for the second round of rolling bounces and
>>>>>> then as
>>>>>>>> each task successfully renames its prepare directories and deletes
>>> the
>>>>>> old
>>>>>>>> active task directories, normal processing of records resumes.
>>>>>>
>>>>>> The basic idea is to do an online upgrade to avoid downtime. We can
>>>>>> discuss to offer both options... For the offline upgrade option, we
>>>>>> could simplify user interaction and trigger the second rebalance
>>>>>> automatically with the requirement that a user needs to update any
>>>>> config.
>>>>>>
>>>>>> If might actually be worth to include this option: we know from
>>>>>> experience with state restore, that regular processing slows down the
>>>>>> restore. For roll_over upgrade, it would be a different story and
>>>>>> upgrade should not be slowed down by regular processing. Thus, we
>>> should
>>>>>> even make in_place an offline upgrade and force people to use
>> roll_over
>>>>>> if they need onlint upgrade. Might be a fair tradeoff that may
>> simplify
>>>>>> the upgrade for the user and for the code complexity.
>>>>>>
>>>>>> Let's see what other think.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> Thanks for the update and the working prototype, it helps with
>>>>>>> understanding the KIP.
>>>>>>>
>>>>>>> I took an initial pass over this PR, and overall I find the
>> interfaces
>>>>>> and
>>>>>>> approach to be reasonable.
>>>>>>>
>>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch
>> the
>>>>>>> restore process), I'm wondering if we want to provide a type of
>>>>>>> StateRestoreListener that could signal when the new stores have
>>> reached
>>>>>>> parity with the existing old stores and that could be the signal to
>>>>> start
>>>>>>> second rolling rebalance?
>>>>>>>
>>>>>>> Although you solicited feedback on the interfaces involved, I wanted
>>> to
>>>>>> put
>>>>>>> down some thoughts that have come to mind reviewing this KIP again
>>>>>>>
>>>>>>> 1. Out of N instances, one fails midway through the process, would
>> we
>>>>>> allow
>>>>>>> the other instances to complete or just fail the entire upgrade?
>>>>>>> 2. During the second rolling bounce, maybe we could rename the
>> current
>>>>>>> active directories vs. deleting them right away,  and when all the
>>>>>> prepare
>>>>>>> task directories are successfully migrated then delete the previous
>>>>>> active
>>>>>>> ones.
>>>>>>> 3. For the first rolling bounce we pause any processing any new
>>> records
>>>>>> and
>>>>>>> just allow the prepare tasks to restore, then once all prepare tasks
>>>>> have
>>>>>>> restored, it's a signal for the second round of rolling bounces and
>>>>> then
>>>>>> as
>>>>>>> each task successfully renames its prepare directories and deletes
>> the
>>>>>> old
>>>>>>> active task directories, normal processing of records resumes.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Bill
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <
>>> matth...@confluent.io
>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> KIP-268 (rebalance meatadata) is finished and included in AK 2.0
>>>>>>>> release. Thus, I want to pick up this KIP again to get the RocksDB
>>>>>>>> upgrade done for 2.1.
>>>>>>>>
>>>>>>>> I updated the KIP accordingly and also have a "prove of concept" PR
>>>>>>>> ready (for "in place" upgrade only):
>>>>>>>> https://github.com/apache/kafka/pull/5422/
>>>>>>>>
>>>>>>>> There a still open questions, but I want to collect early feedback
>> on
>>>>>>>> the proposed interfaces we need for the store upgrade. Also note,
>>> that
>>>>>>>> the KIP now also aim to define a generic upgrade path from any
>> store
>>>>>>>> format A to any other store format B. Adding timestamps is just a
>>>>>>>> special case.
>>>>>>>>
>>>>>>>> I will continue to work on the PR and refine the KIP in the
>> meantime,
>>>>>> too.
>>>>>>>>
>>>>>>>> Looking forward to your feedback.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
>>>>>>>>> After some more thoughts, I want to follow John's suggestion and
>>>>> split
>>>>>>>>> upgrading the rebalance metadata from the store upgrade.
>>>>>>>>>
>>>>>>>>> I extracted the metadata upgrade into it's own KIP:
>>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%
>>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>>>>>>>>
>>>>>>>>> I'll update this KIP accordingly shortly. I also want to consider
>> to
>>>>>>>>> make the store format upgrade more flexible/generic. Atm, the KIP
>> is
>>>>>> too
>>>>>>>>> much tailored to the DSL IMHO and does not encounter PAPI users
>> that
>>>>> we
>>>>>>>>> should not force to upgrade the stores. I need to figure out the
>>>>>> details
>>>>>>>>> and follow up later.
>>>>>>>>>
>>>>>>>>> Please give feedback for the new KIP-268 on the corresponding
>>>>>> discussion
>>>>>>>>> thread.
>>>>>>>>>
>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure out
>> a
>>>>> way
>>>>>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a fix
>> for
>>>>>>>>> future upgrades. Please share your thoughts.
>>>>>>>>>
>>>>>>>>> Thanks for all your feedback!
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
>>>>>>>>>> @John: yes, we would throw if configs are missing (it's an
>>>>>>>>>> implementation details IMHO and thus I did not include it in the
>>>>> KIP)
>>>>>>>>>>
>>>>>>>>>> @Guozhang:
>>>>>>>>>>
>>>>>>>>>> 1) I understand know what you mean. We can certainly, allow all
>>>>> values
>>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
>> `upgrade.from`
>>>>>>>>>> parameter. I had a similar though once but decided to collapse
>> them
>>>>>> into
>>>>>>>>>> one -- will update the KIP accordingly.
>>>>>>>>>>
>>>>>>>>>> 2) The idea to avoid any config would be, to always send both
>>>>> request.
>>>>>>>>>> If we add a config to eventually disable the old request, we
>> don't
>>>>>> gain
>>>>>>>>>> anything with this approach. The question is really, if we are
>>>>> willing
>>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be limited to
>> 2
>>>>>>>>>> versions and not grow further in future releases. More details in
>>>>> (3)
>>>>>>>>>>
>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and allows
>> us
>>>>> to
>>>>>>>>>> stay with 2 "assignment strategies" we need to register, as the
>> new
>>>>>>>>>> assignment strategy will allow to "upgrade itself" via "version
>>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a config
>> if
>>>>>>>>>> people upgrade from pre-1.2 releases.
>>>>>>>>>>
>>>>>>>>>> Thus, I don't think we need to register new "assignment
>> strategies"
>>>>>> and
>>>>>>>>>> send empty subscriptions for older version.
>>>>>>>>>>
>>>>>>>>>> 4) I agree that this is a tricky thing to get right with a single
>>>>>>>>>> rebalance. I share the concern that an application might never
>>> catch
>>>>>> up
>>>>>>>>>> and thus the hot standby will never be ready.
>>>>>>>>>>
>>>>>>>>>> Maybe it's better to go with 2 rebalances for store upgrades. If
>> we
>>>>> do
>>>>>>>>>> this, we also don't need to go with (2) and can get (3) in place
>>> for
>>>>>>>>>> future upgrades. I also think that changes to the metadata are
>> more
>>>>>>>>>> likely and thus allowing for single rolling bounce for this case
>> is
>>>>>> more
>>>>>>>>>> important anyway. If we assume that store upgrade a rare, it
>> might
>>>>> be
>>>>>> ok
>>>>>>>>>> to sacrifice two rolling bounced for this case. It was just an
>> idea
>>>>> I
>>>>>>>>>> wanted to share (even if I see the issues).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
>>>>>>>>>>> Hello Matthias, thanks for your replies.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 1) About the config names: actually I was trying to not expose
>>>>>>>>>>> implementation details :) My main concern was that in your
>>> proposal
>>>>>> the
>>>>>>>>>>> values need to cover the span of all the versions that are
>>> actually
>>>>>>>> using
>>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am
>>>>>>>> upgrading
>>>>>>>>>>> from any versions within this range I need to remember to use
>> the
>>>>>> value
>>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In my
>>>>>> suggestion
>>>>>>>> I
>>>>>>>>>>> was trying to argue the benefit of just letting users to specify
>>>>> the
>>>>>>>> actual
>>>>>>>>>>> Kafka version she's trying to upgrade from, than specifying a
>>> range
>>>>>> of
>>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as the
>>>>> values,
>>>>>>>> but
>>>>>>>>>>> still using Kafka versions like broker's `internal.version`
>>> config.
>>>>>>>> But if
>>>>>>>>>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you
>>>>>> meant
>>>>>>>> to
>>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or
>>>>> "1.1"
>>>>>>>> which
>>>>>>>>>>> are all recognizable config values then I think we are actually
>> on
>>>>>> the
>>>>>>>> same
>>>>>>>>>>> page.
>>>>>>>>>>>
>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would increase the
>>>>>> network
>>>>>>>>>>> footprint, but not the message size, IF I'm not
>> mis-understanding
>>>>>> your
>>>>>>>> idea
>>>>>>>>>>> of registering multiple assignment. More details:
>>>>>>>>>>>
>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can encode
>>>>>> multiple
>>>>>>>>>>> protocols each with their different metadata. The coordinator
>> will
>>>>>>>> pick the
>>>>>>>>>>> common one that everyone supports (if there are no common one,
>> it
>>>>>> will
>>>>>>>> send
>>>>>>>>>>> an error back; if there are multiple ones, it will pick the one
>>>>> with
>>>>>>>> most
>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded list).
>> Since
>>>>> our
>>>>>>>>>>> current Streams rebalance protocol is still based on the
>> consumer
>>>>>>>>>>> coordinator, it means our protocol_type would be "consumer", but
>>>>>>>> instead
>>>>>>>>>>> the protocol type we can have multiple protocols like "streams",
>>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we need to
>>>>>>>> implement a
>>>>>>>>>>> different assignor class for each version and register all of
>> them
>>>>> in
>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future
>> if
>>>>> we
>>>>>>>>>>> re-factor our implementation to have our own client coordinator
>>>>> layer
>>>>>>>> like
>>>>>>>>>>> Connect did, we can simplify this part of the implementation.
>> But
>>>>>> even
>>>>>>>> for
>>>>>>>>>>> now with the above approach this is still doable.
>>>>>>>>>>>
>>>>>>>>>>> On the broker side, the group coordinator will only persist a
>>> group
>>>>>>>> with
>>>>>>>>>>> the selected protocol and its subscription metadata, e.g. if
>>>>>>>> coordinator
>>>>>>>>>>> decides to pick "streams_v2" it will only sends that protocol's
>>>>>>>> metadata
>>>>>>>>>>> from everyone to the leader to assign, AND when completing the
>>>>>>>> rebalance it
>>>>>>>>>>> will also only write the group metadata with that protocol and
>> the
>>>>>>>>>>> assignment only. In a word, although the network traffic maybe
>>>>>>>> increased a
>>>>>>>>>>> bit, it would not be a bummer in our trade-off. One corner
>>>>> situation
>>>>>> we
>>>>>>>>>>> need to consider is how to stop registering very old assignors
>> to
>>>>>>>> avoid the
>>>>>>>>>>> network traffic from increasing indefinitely, e.g. if you are
>>>>> rolling
>>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1
>> assignor
>>>>>>>> anymore,
>>>>>>>>>>> but that would unfortunately still require some configs.
>>>>>>>>>>>
>>>>>>>>>>> 3) About the  "version probing" idea, I think that's a promising
>>>>>>>> approach
>>>>>>>>>>> as well, but if we are going to do the multi-assignment its
>> value
>>>>>> seems
>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top of
>>>>>>>> multi-assignment
>>>>>>>>>>> to save us from still requiring the config to avoid registering
>>> all
>>>>>> the
>>>>>>>>>>> metadata for all version. More details:
>>>>>>>>>>>
>>>>>>>>>>> In the JoinGroupRequest, we still register all the assignor but
>>> for
>>>>>>>> all old
>>>>>>>>>>> assignors we do not encode any metadata, i.e. the encoded data
>>>>> would
>>>>>>>> be:
>>>>>>>>>>>
>>>>>>>>>>> "streams_vN" : "encoded metadata"
>>>>>>>>>>> "streams_vN-1":empty
>>>>>>>>>>> "streams_vN-2":empty
>>>>>>>>>>> ..
>>>>>>>>>>> "streams_0":empty
>>>>>>>>>>>
>>>>>>>>>>> So the coordinator can still safely choose the latest common
>>>>> version;
>>>>>>>> and
>>>>>>>>>>> then when leaders receive the subscription (note it should
>> always
>>>>>>>> recognize
>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of the
>>>>>>>> subscriptions
>>>>>>>>>>> are empty bytes, it will send the empty assignment with that
>>>>> version
>>>>>>>> number
>>>>>>>>>>> encoded in the metadata. So in the second auto-triggered all
>>>>> members
>>>>>>>> would
>>>>>>>>>>> send the metadata with that version:
>>>>>>>>>>>
>>>>>>>>>>> "streams_vN" : empty
>>>>>>>>>>> "streams_vN-1" : empty
>>>>>>>>>>> "streams_vN-2" : "encoded metadata"
>>>>>>>>>>> ..
>>>>>>>>>>> "streams_0":empty
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> By doing this we would not require any configs for users.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about
>>> the
>>>>>>>> details
>>>>>>>>>>> so probably we'd need to fill that out before making a call. For
>>>>>>>> example,
>>>>>>>>>>> you mentioned "If we detect this situation, the Streams
>>> application
>>>>>>>> closes
>>>>>>>>>>> corresponding active tasks as well as "hot standby" tasks, and
>>>>>>>> re-creates
>>>>>>>>>>> the new active tasks using the new store." How could we
>> guarantee
>>>>>> that
>>>>>>>> the
>>>>>>>>>>> gap between these two stores will keep decreasing than
>> increasing
>>>>> so
>>>>>>>> we'll
>>>>>>>>>>> eventually achieve the flip point? And also the longer we are
>>>>> before
>>>>>>>> the
>>>>>>>>>>> flip point, the larger we are doubling the storage space, etc.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
>>>>>>>> matth...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> @John, Guozhang,
>>>>>>>>>>>>
>>>>>>>>>>>> thanks a lot for your comments. Very long reply...
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> About upgrading the rebalance metadata:
>>>>>>>>>>>>
>>>>>>>>>>>> Another possibility to do this, would be to register multiple
>>>>>>>> assignment
>>>>>>>>>>>> strategies for the 1.2 applications. For this case, new
>> instances
>>>>>>>> would
>>>>>>>>>>>> be configured to support both and the broker would pick the
>>>>> version
>>>>>>>> that
>>>>>>>>>>>> all instances understand. The disadvantage would be, that we
>> send
>>>>>> much
>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as long as
>> no
>>>>>>>> second
>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, using this
>>>>>>>> approach
>>>>>>>>>>>> would allow to avoid a second rebalance trading-off an
>> increased
>>>>>>>>>>>> rebalance network footprint (I also assume that this would
>>>>> increase
>>>>>>>> the
>>>>>>>>>>>> message size that is written into __consumer_offsets topic?).
>>>>>>>> Overall, I
>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it could
>> avoid
>>> a
>>>>>>>>>>>> second rebalance (I have some more thoughts about stores below
>>>>> that
>>>>>>>> are
>>>>>>>>>>>> relevant for single rebalance upgrade).
>>>>>>>>>>>>
>>>>>>>>>>>> For future upgrades we might be able to fix this though. I was
>>>>>>>> thinking
>>>>>>>>>>>> about the following:
>>>>>>>>>>>>
>>>>>>>>>>>> In the current implementation, the leader fails if it gets a
>>>>>>>>>>>> subscription it does not understand (ie, newer version). We
>> could
>>>>>>>> change
>>>>>>>>>>>> this behavior and let the leader send an empty assignment plus
>>>>> error
>>>>>>>>>>>> code (including supported version) back to the instance sending
>>>>> the
>>>>>>>>>>>> "bad" subscription. This would allow the following logic for an
>>>>>>>>>>>> application instance:
>>>>>>>>>>>>
>>>>>>>>>>>>  - on startup, always send the latest subscription format
>>>>>>>>>>>>  - if leader understands it, we get an assignment back an start
>>>>>>>> processing
>>>>>>>>>>>>  - if leader does not understand it, we get an empty assignment
>>>>> and
>>>>>>>>>>>> supported version back
>>>>>>>>>>>>  - the application unsubscribe()/subscribe()/poll() again and
>>>>>> sends a
>>>>>>>>>>>> subscription using the leader's supported version
>>>>>>>>>>>>
>>>>>>>>>>>> This protocol would allow to do a single rolling bounce, and
>>>>>>>> implements
>>>>>>>>>>>> a "version probing" step, that might result in two executed
>>>>>>>> rebalances.
>>>>>>>>>>>> The advantage would be, that the user does not need to set any
>>>>>> configs
>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care of this
>>>>>>>>>>>> automatically.
>>>>>>>>>>>>
>>>>>>>>>>>> One disadvantage would be, that two rebalances happen and that
>>> for
>>>>>> an
>>>>>>>>>>>> error case during rebalance, we loose the information about the
>>>>>>>>>>>> supported leader version and the "probing step" would happen a
>>>>>> second
>>>>>>>> time.
>>>>>>>>>>>>
>>>>>>>>>>>> If the leader is eventually updated, it will include it's own
>>>>>>>> supported
>>>>>>>>>>>> version in all assignments, to allow a "down graded"
>> application
>>>>> to
>>>>>>>>>>>> upgrade its version later. Also, if a application fails, the
>>> first
>>>>>>>>>>>> probing would always be successful and only a single rebalance
>>>>>>>> happens.
>>>>>>>>>>>> If we use this protocol, I think we don't need any
>> configuration
>>>>>>>>>>>> parameter for future upgrades.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
>>>>>>>>>>>>
>>>>>>>>>>>> Users would set "upgrade.from" to the release version the
>>>>>> current/old
>>>>>>>>>>>> application is using. I think this is simpler, as users know
>> this
>>>>>>>>>>>> version. If we use "internal.protocol.version" instead, we
>> expose
>>>>>>>>>>>> implementation details and users need to know the protocol
>>> version
>>>>>>>> (ie,
>>>>>>>>>>>> they need to map from the release version to the protocol
>>> version;
>>>>>> ie,
>>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version 2").
>>>>>>>>>>>>
>>>>>>>>>>>> Also the KIP states that for the second rolling bounce, the
>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- and thus,
>>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored (I will
>>>>>> update
>>>>>>>>>>>> the KIP to point out this dependency).
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> About your second point: I'll update the KIP accordingly to
>>>>> describe
>>>>>>>>>>>> future updates as well. Both will be different.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> One more point about upgrading the store format. I was thinking
>>>>>> about
>>>>>>>>>>>> avoiding the second rolling bounce all together in the future:
>>> (1)
>>>>>> the
>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this
>>> required
>>>>>> to
>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the switch
>> and
>>>>>>>> delete
>>>>>>>>>>>> the old stores. (3) the current proposal does the switch
>>>>> "globally"
>>>>>> --
>>>>>>>>>>>> this is simpler and due to the required second rebalance no
>>>>>>>> disadvantage.
>>>>>>>>>>>> However, a global consistent switch over might actually not be
>>>>>>>> required.
>>>>>>>>>>>> For "in_place" upgrade, following the protocol from above, we
>>>>> could
>>>>>>>>>>>> decouple the store switch and each instance could switch its
>>> store
>>>>>>>>>>>> independently from all other instances. After the rolling
>> bounce,
>>>>> it
>>>>>>>>>>>> seems to be ok to switch from the old store to the new store
>>>>> "under
>>>>>>>> the
>>>>>>>>>>>> hood" whenever the new store is ready (this could even be done,
>>>>>> before
>>>>>>>>>>>> we switch to the new metadata version). Each time we update the
>>>>> "hot
>>>>>>>>>>>> standby" we check if it reached the "endOffset"  (or maybe X%
>>> that
>>>>>>>> could
>>>>>>>>>>>> either be hardcoded or configurable). If we detect this
>>> situation,
>>>>>> the
>>>>>>>>>>>> Streams application closes corresponding active tasks as well
>> as
>>>>>> "hot
>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks using the
>> new
>>>>>>>> store.
>>>>>>>>>>>> (I need to go through the details once again, but it seems to
>> be
>>>>>>>>>>>> feasible.).
>>>>>>>>>>>>
>>>>>>>>>>>> Combining this strategy with the "multiple assignment" idea,
>>> might
>>>>>>>> even
>>>>>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1 ->
>> 1.2.
>>>>>>>>>>>> Applications would just use the old store, as long as the new
>>>>> store
>>>>>> is
>>>>>>>>>>>> not ready, even if the new metadata version is used already.
>>>>>>>>>>>>
>>>>>>>>>>>> For future upgrades, a single rebalance would be sufficient,
>> too,
>>>>>> even
>>>>>>>>>>>> if the stores are upgraded. We would not need any config
>>>>> parameters
>>>>>> as
>>>>>>>>>>>> the "probe" step allows us to detect the supported rebalance
>>>>>> metadata
>>>>>>>>>>>> version (and we would also not need multiple "assigmnent
>>>>> strategies"
>>>>>>>> as
>>>>>>>>>>>> out own protocol encoded everything we need).
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
>>>>>>>>>>>>> @John:
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the protocol version upgrade, it is only for the encoded
>>>>>> metadata
>>>>>>>>>>>> bytes
>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from Consumer's
>> pov,
>>>>>> so I
>>>>>>>>>>>> think
>>>>>>>>>>>>> this change should be in the Streams layer as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Matthias:
>>>>>>>>>>>>>
>>>>>>>>>>>>> for 2), I agree that adding a "newest supported version"
>> besides
>>>>>> the
>>>>>>>>>>>>> "currently used version for encoding" is a good idea to allow
>>>>>> either
>>>>>>>>>>>> case;
>>>>>>>>>>>>> the key is that in Streams we would likely end up with a
>> mapping
>>>>>>>> from the
>>>>>>>>>>>>> protocol version to the other persistent data format versions
>>>>> such
>>>>>> as
>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually achieve
>>>>> both
>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol
>>>>>>>> version's
>>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 ->
>> 0.10.1
>>>>>>>> leaders
>>>>>>>>>>>>> can choose to use the newer version in the first rolling
>> bounce
>>>>>>>> directly
>>>>>>>>>>>>> and we can document to users that they would not need to set
>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded
>>>>> protocol
>>>>>>>>>>>> version
>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and
>> then
>>>>> we
>>>>>>>> can
>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the first
>>> rolling
>>>>>>>> bounce
>>>>>>>>>>>>> and reset in the second.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Besides that, some additional comments:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for users to
>>>>> set
>>>>>>>> than
>>>>>>>>>>>>> "internal.protocol.version" where for the latter users only
>> need
>>>>> to
>>>>>>>> set a
>>>>>>>>>>>>> single version, while the Streams will map that version to the
>>>>>>>> Streams
>>>>>>>>>>>>> assignor's behavior as well as the data format. But maybe I
>> did
>>>>> not
>>>>>>>> get
>>>>>>>>>>>>> your idea about how the  "upgrade.from" config will be set,
>>>>> because
>>>>>>>> in
>>>>>>>>>>>>> your Compatibility section how the upgrade.from config will be
>>>>> set
>>>>>>>> for
>>>>>>>>>>>>> these two rolling bounces are not very clear: for example,
>>> should
>>>>>>>> user
>>>>>>>>>>>>> reset it to null in the second rolling bounce?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2) In the upgrade path description, rather than talking about
>>>>>>>> specific
>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize
>> all
>>>>>> the
>>>>>>>>>>>>> possible scenarios, even for future upgrade versions, what
>>> should
>>>>>> be
>>>>>>>> the
>>>>>>>>>>>>> standard operations? The categorized we can summarize to would
>>> be
>>>>>>>>>>>> (assuming
>>>>>>>>>>>>> user upgrade from version X to version Y, where X and Y are
>>> Kafka
>>>>>>>>>>>> versions,
>>>>>>>>>>>>> with the corresponding supported protocol version x and y):
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and hence
>> no
>>>>>>>>>>>> persistent
>>>>>>>>>>>>> data formats have changed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> b. x != y, but all persistent data format remains the same.
>>>>>>>>>>>>>
>>>>>>>>>>>>> b. x !=y, AND some persistene data format like RocksDB format,
>>>>>>>> changelog
>>>>>>>>>>>>> format, has been changed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> c. special case: we may need some special handling logic when
>>>>>>>> "current
>>>>>>>>>>>>> version" or "newest supported version" are not available in
>> the
>>>>>>>> protocol,
>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> under the above scenarios, how many rolling bounces users need
>>> to
>>>>>>>>>>>> execute?
>>>>>>>>>>>>> how they should set the configs in each rolling bounce? and
>> how
>>>>>>>> Streams
>>>>>>>>>>>>> library will execute in these cases?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ted,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I still consider changing the KIP to include it right away --
>>> if
>>>>>>>> not,
>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more detail
>>>>> first.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> (Same for other open questions like interface names -- I
>>> collect
>>>>>>>>>>>>>> feedback and update the KIP after we reach consensus :))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
>>>>>>>>>>>>>>> Thanks for the details, Matthias.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> bq. change the metadata protocol only if a future release,
>>>>>> encoding
>>>>>>>>>>>> both
>>>>>>>>>>>>>> used
>>>>>>>>>>>>>>> and supported version might be an advantage
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be implemented in
>>>>> this
>>>>>>>> KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding proposal.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of user
>>> code.
>>>>>> And
>>>>>>>>>>>> even
>>>>>>>>>>>>>>>> if we want to add something like this, I would prefer to do
>>> it
>>>>>> in
>>>>>>>> a
>>>>>>>>>>>>>>>> separate KIP.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce. But
>>>>> from
>>>>>> my
>>>>>>>>>>>>>>>> understanding it would not be possible.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to
>> switch
>>>>>> from
>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch
>> from
>>>>> the
>>>>>>>> old
>>>>>>>>>>>>>>>> store to the new store (this happens after the last
>> instance
>>>>>>>> bounces a
>>>>>>>>>>>>>>>> second time).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is, that it's
>>>>>>>> unclear
>>>>>>>>>>>> when
>>>>>>>>>>>>>>>> to which from version 2 to version 3. The
>>>>>>>> StreamsPartitionsAssignor is
>>>>>>>>>>>>>>>> stateless by design, and thus, the information which
>> version
>>>>> it
>>>>>>>> should
>>>>>>>>>>>>>>>> use must be passed in from externally -- and we want to use
>>>>> the
>>>>>>>>>>>>>>>> StreamsConfig to pass in this information.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> During upgrade, all new instanced have no information about
>>>>> the
>>>>>>>>>>>> progress
>>>>>>>>>>>>>>>> of the upgrade (ie, how many other instanced got upgrades
>>>>>>>> already).
>>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3
>>>>>>>> subscription.
>>>>>>>>>>>> The
>>>>>>>>>>>>>>>> leader also has this limited view on the world and can only
>>>>> send
>>>>>>>>>>>> version
>>>>>>>>>>>>>>>> 2 assignments back.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify
>> the
>>>>>>>> upgrade.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We did consider to change the metadata to make later
>> upgrades
>>>>>> (ie,
>>>>>>>>>>>> from
>>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the
>>>>> metadata
>>>>>> or
>>>>>>>>>>>>>>>> storage format again -- as long as we don't change it, a
>>>>> single
>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and
>>>>> "supported
>>>>>>>>>>>>>>>> version". This would allow the leader to switch to the new
>>>>>> version
>>>>>>>>>>>>>>>> earlier and without a second rebalance: leader would
>> receive
>>>>>> "used
>>>>>>>>>>>>>>>> version == old" and "supported version = old/new" -- as
>> long
>>>>> as
>>>>>> at
>>>>>>>>>>>> least
>>>>>>>>>>>>>>>> one instance sends a "supported version = old" leader sends
>>>>> old
>>>>>>>>>>>> version
>>>>>>>>>>>>>>>> assignment back. However, encoding both version would allow
>>>>> that
>>>>>>>> the
>>>>>>>>>>>>>>>> leader can send a new version assignment back, right after
>>> the
>>>>>>>> first
>>>>>>>>>>>>>>>> round or rebalance finished (all instances send "supported
>>>>>>>> version =
>>>>>>>>>>>>>>>> new"). However, there are still two issues with this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) if we switch to the new format right after the last
>>>>> instance
>>>>>>>>>>>> bounced,
>>>>>>>>>>>>>>>> the new stores might not be ready to be used -- this could
>>>>> lead
>>>>>> to
>>>>>>>>>>>>>>>> "downtime" as store must be restored before processing can
>>>>>> resume.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At this
>>>>>>>> point, the
>>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled and thus
>>> sends
>>>>>>>> the old
>>>>>>>>>>>>>>>> protocol data. However, it would be desirable to never fall
>>>>> back
>>>>>>>> to
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> old protocol after the switch to the new protocol.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The second issue is minor and I guess if users set-up the
>>>>>> instance
>>>>>>>>>>>>>>>> properly it could be avoided. However, the first issue
>> would
>>>>>>>> prevent
>>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we consider
>>>>> that
>>>>>> we
>>>>>>>>>>>> might
>>>>>>>>>>>>>>>> change the metadata protocol only if a future release,
>>>>> encoding
>>>>>>>> both
>>>>>>>>>>>>>>>> used and supported version might be an advantage in the
>>> future
>>>>>>>> and we
>>>>>>>>>>>>>>>> could consider to add this information in 1.2 release to
>>>>> prepare
>>>>>>>> for
>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to give the
>>>>>>>> instances
>>>>>>>>>>>>>>>> enough time to prepare the stores in new format. If you
>> would
>>>>> do
>>>>>>>> the
>>>>>>>>>>>>>>>> second rolling bounce before this, it would still work --
>>>>>>>> however, you
>>>>>>>>>>>>>>>> might see app "downtime" as the new store must be fully
>>>>> restored
>>>>>>>>>>>> before
>>>>>>>>>>>>>>>> processing can resume.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Does this make sense?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
>>>>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid of
>> the
>>>>> 2nd
>>>>>>>>>>>> rolling
>>>>>>>>>>>>>>>> bounce?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary difference
>>>>>>>> between
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide
>>>>>>>> whether to
>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
>> (Actually,
>>>>>>>> there is
>>>>>>>>>>>>>>>> another difference mentioned in that the KIP says that the
>>> 2nd
>>>>>>>> rolling
>>>>>>>>>>>>>>>> bounce should happen after all new state stores are created
>>> by
>>>>>> the
>>>>>>>>>>>>>>>> background thread. However, within the 2nd rolling bounce,
>> we
>>>>>> say
>>>>>>>> that
>>>>>>>>>>>>>>>> there is still a background thread, so it seems like is no
>>>>>> actual
>>>>>>>>>>>>>>>> requirement to wait for the new state stores to be
>> created.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with
>>>>>> mixed-mode
>>>>>>>>>>>>>> (having
>>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer group).
>> It
>>>>>> seems
>>>>>>>>>>>> like
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
>>>>>>>> (somehow/somewhere)
>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>>> that:
>>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until all
>> instances
>>>>> are
>>>>>>>>>>>> running
>>>>>>>>>>>>>>>> the new code.
>>>>>>>>>>>>>>>>> * Once all the instances are running the new code, then
>> one
>>>>> at
>>>>>> a
>>>>>>>>>>>> time,
>>>>>>>>>>>>>>>> the instances start sending Subscription V3. Leader still
>>>>> hands
>>>>>>>> out
>>>>>>>>>>>>>>>> Assignment Version 2, until all new state stores are ready.
>>>>>>>>>>>>>>>>> * Once all instances report that new stores are ready,
>>> Leader
>>>>>>>> sends
>>>>>>>>>>>> out
>>>>>>>>>>>>>>>> Assignment Version 3.
>>>>>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3, it
>> can
>>>>>>>> delete
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> old state store.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of
>>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling
>>>>> restarts.
>>>>>> No
>>>>>>>>>>>> need
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> monitor logs for state store rebuild. You just deploy it,
>> and
>>>>>> the
>>>>>>>>>>>>>> instances
>>>>>>>>>>>>>>>> update themselves.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The thing that made me think of this is that the "2
>> rolling
>>>>>>>> bounces"
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> similar to what Kafka brokers have to do changes in
>>>>>>>>>>>>>>>> inter.broker.protocol.version and
>> log.message.format.version.
>>>>>> And
>>>>>>>> in
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> broker case, it seems like it would be possible (with some
>>>>> work
>>>>>> of
>>>>>>>>>>>>>> course)
>>>>>>>>>>>>>>>> to modify kafka to allow us to do similar auto-detection of
>>>>>> broker
>>>>>>>>>>>>>>>> capabilities and automatically do a switchover from old/new
>>>>>>>> versions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -James
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
>>> bbej...@gmail.com
>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval methods on
>>>>> the
>>>>>>>> new
>>>>>>>>>>>>>>>>>> interfaces.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Would want to consider adding one method with a Predicate
>>>>> that
>>>>>>>> would
>>>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>>>>>> for filtering records by the timestamp stored with the
>>>>> record?
>>>>>>>> Or
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>> better left for users to implement themselves once the
>> data
>>>>>> has
>>>>>>>> been
>>>>>>>>>>>>>>>>>> retrieved?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
>>> yuzhih...@gmail.com
>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>>>>>>>> For my point #1, I don't have preference as to which
>>>>>> separator
>>>>>>>> is
>>>>>>>>>>>>>>>> chosen.
>>>>>>>>>>>>>>>>>>> Given the background you mentioned, current choice is
>>> good.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is closer
>>> to
>>>>>>>> English
>>>>>>>>>>>>>>>>>>> grammar.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Would be good to listen to what other people think.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @Guozhang:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata
>>> upgrade
>>>>>> fix
>>>>>>>>>>>>>>>>>>>> (addressing the mentioned
>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It
>>>>> give a
>>>>>>>> first
>>>>>>>>>>>>>>>>>>>> impression how the metadata upgrade works including a
>>>>> system
>>>>>>>> test:
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I
>> agree
>>>>>> that
>>>>>>>> the
>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>> complex am I ok with putting out more code to give
>> better
>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>>>>>>> context.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @Ted:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
>>>>>>>>>>>> `processing.guarantee`
>>>>>>>>>>>>>>>>>>>> parameter that accepts `at_least_one` and
>> `exactly_once`
>>>>> as
>>>>>>>>>>>> values.
>>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash but I
>>>>>> prefer
>>>>>>>>>>>>>>>>>>>> consistency. If you feel strong about it, we can also
>>>>> change
>>>>>>>> it to
>>>>>>>>>>>>>>>> `-`.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I
>>>>> stripped
>>>>>>>> the
>>>>>>>>>>>>>>>> `With`
>>>>>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be good to get
>>>>>>>> feedback
>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>> others and pick the name the majority prefers.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @John:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would not
>>>>> make a
>>>>>>>>>>>>>>>> difference.
>>>>>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the two version
>>>>>> might
>>>>>>>>>>>>>>>> introduce
>>>>>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I don't hit
>> an
>>>>>>>> issue
>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>> putting the `-v2` to the store directory instead of
>>>>>>>> `rocksdb-v2`
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
>>>>>>>>>>>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several questions
>> queued
>>>>>> up,
>>>>>>>> but
>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>> were
>>>>>>>>>>>>>>>>>>>>> all in the "rejected alternatives" section... oh,
>> well.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state directory
>>>>> from
>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to
>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if
>>> you
>>>>>>>> put the
>>>>>>>>>>>>>>>> "v2"
>>>>>>>>>>>>>>>>>>>>> marker on the storeName part of the path (i.e.,
>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"),
>> then
>>>>>> you
>>>>>>>> get
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>> benefits without altering the high-level directory
>>>>>> structure.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people running
>>>>>>>> scripts to
>>>>>>>>>>>>>>>>>>> monitor
>>>>>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other such use
>>>>> cases.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
>>>>>> yuzhih...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>>>>>>>>>>> Nicely written KIP.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may
>>>>>>>> sometimes
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>> miss
>>>>>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to
>>>>>> user.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V>
>>> {
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for
>>>>> the
>>>>>>>>>>>> class ?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <
>>>>>>>>>>>> wangg...@gmail.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch section and it
>>>>> looks
>>>>>>>> good
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> me,
>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you also share it
>>>>> here
>>>>>>>> so
>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> people
>>>>>>>>>>>>>>>>>>>>>>> can take a look?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this
>>>>> there
>>>>>>>> are
>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is better
>>> to
>>>>>>>> have
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design
>>>>>>>> discussion :)
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <
>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to
>>> allow
>>>>>>>> storing
>>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to
>>>>>>>> resolve
>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this!
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to