Hello Matthias

I am curious as to the status of this KIP. TTL and expiry of records will
be extremely useful for several of our business use-cases, as well as
another KIP I had been working on.

Thanks



On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Matthias,
>
> Good stuff. Could you comment a bit on how future-proof this change is? For
> example, if we want to store both event timestamp "and" processing time in
> RocksDB will we then need another interface (e.g. called
> KeyValueWithTwoTimestampsStore)?
>
> Thanks
> Eno
>
> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Thanks for your input Guozhang and John.
> >
> > I see your point, that the upgrade API is not simple. If you don't
> > think it's valuable to make generic store upgrades possible (atm), we
> > can make the API internal, too. The impact is, that we only support a
> > predefined set of upgrades (ie, KV to KVwithTs, Windowed to
> > WindowedWithTS etc) for which we implement the internal interfaces.
> >
> > We can keep the design generic, so if we decide to make it public, we
> > don't need to re-invent it. This will also have the advantage, that we
> > can add upgrade patterns for other stores later, too.
> >
> > I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it
> > was the only way I could find to design a generic upgrade interface. If
> > we decide to hide all the upgrade stuff, `StoreUpgradeBuilder` would
> > become an internal interface I guess (don't think we can remove it).
> >
> > I will wait for more feedback about this and if nobody wants to keep it
> > as public API I will update the KIP accordingly. Will add some more
> > clarifications for different upgrade patterns in the mean time and fix
> > the typos/minor issues.
> >
> > About adding a new state UPGRADING: maybe we could do that. However, I
> > find it particularly difficult to make the estimation when we should
> > switch to RUNNING, thus, I am a little hesitant. Using store callbacks
> > or just logging the progress including some indication about the "lag"
> > might actually be sufficient. Not sure what others think?
> >
> > About "value before timestamp": no real reason and I think it does not
> > make any difference. Do you want to change it?
> >
> > About upgrade robustness: yes, we cannot control if an instance fails.
> > That is what I meant by "we need to write test". The upgrade should be
> > able to continue even if an instance goes down (and we must make sure
> > that we don't end up in an invalid state that forces us to wipe out the
> > whole store). Thus, we need to write system tests that fail instances
> > during upgrade.
> >
> > For `in_place_offline` upgrade: I don't think we need this mode, because
> > people can do this via a single rolling bounce.
> >
> >  - prepare code and switch KV-Store to KVwithTs-Store
> >  - do a single rolling bounce (don't set any upgrade config)
> >
> > For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we
> > remove the `StoreUpgradeBuilder`) will detect that there is only an old
> > local KV store w/o TS, will start to restore the new KVwithTs store,
> > wipe out the old store and replace with the new store after restore is
> > finished, and start processing only afterwards. (I guess we need to
> > document this case -- will also add it to the KIP.)
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 8/9/18 1:10 PM, John Roesler wrote:
> > > Hi Matthias,
> > >
> > > I think this KIP is looking really good.
> > >
> > > I have a few thoughts to add to the others:
> > >
> > > 1. You mentioned at one point users needing to configure
> > > `upgrade.mode="null"`. I think this was a typo and you meant to say
> they
> > > should remove the config. If they really have to set it to a string
> > "null"
> > > or even set it to a null value but not remove it, it would be
> > unfortunate.
> > >
> > > 2. In response to Bill's comment #1 , you said that "The idea is that
> the
> > > upgrade should be robust and not fail. We need to write according
> tests".
> > > I may have misunderstood the conversation, but I don't think it's
> within
> > > our power to say that an instance won't fail. What if one of my
> computers
> > > catches on fire? What if I'm deployed in the cloud and one instance
> > > disappears and is replaced by a new one? Or what if one instance goes
> > AWOL
> > > for a long time and then suddenly returns? How will the upgrade process
> > > behave in light of such failures?
> > >
> > > 3. your thought about making in-place an offline mode is interesting,
> but
> > > it might be a bummer for on-prem users who wish to upgrade online, but
> > > cannot just add new machines to the pool. It could be a new upgrade
> mode
> > > "offline-in-place", though...
> > >
> > > 4. I was surprised to see that a user would need to modify the topology
> > to
> > > do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's
> > > suggestions would remove this necessity.
> > >
> > > Thanks for taking on this very complex but necessary work.
> > >
> > > -John
> > >
> > > On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Hello Matthias,
> > >>
> > >> Thanks for the updated KIP. Some more comments:
> > >>
> > >> 1. The current set of proposed API is a bit too complicated, which
> makes
> > >> the upgrade flow from user's perspective also a bit complex. I'd like
> to
> > >> check different APIs and discuss about their needs separately:
> > >>
> > >>     1.a. StoreProxy: needed for in-place upgrade only, between the
> first
> > >> and second rolling bounce, where the old-versioned stores can handle
> > >> new-versioned store APIs. I think such upgrade paths (i.e. from one
> > store
> > >> type to another) would not be very common: users may want to upgrade
> > from a
> > >> certain store engine to another, but the interface would likely be
> > staying
> > >> the same. Hence personally I'd suggest we keep it internal and only
> > >> consider exposing it in the future if it does become a common pattern.
> > >>
> > >>     1.b. ConverterStore / RecordConverter: needed for both in-place
> and
> > >> roll-over upgrade, between the first and second rolling bounces, for
> the
> > >> new versioned store to be able to read old-versioned changelog topics.
> > >> Firstly I think we should not expose key in the public APIs but only
> the
> > >> values, since allowing key format changes would break log compaction,
> > and
> > >> hence would not be compatible anyways. As for value format changes,
> > >> personally I think we can also keep its upgrade logic internally as it
> > may
> > >> not be worth generalizing to user-customizable logic.
> > >>
> > >>     1.c. If you agree with 1.a/b above, then we can also remove "
> > >> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public
> > APIs.
> > >>
> > >>     1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is
> not
> > >> needed either given that we are exposing "ValueAndTimestamp" anyways.
> > I.e.
> > >> it is just syntactic sugar and for IQ, users can always just set a "
> > >> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>>" as the
> > new
> > >> interface does not provide any additional functions.
> > >>
> > >>
> > >> 2. Could we further categorize the upgrade flow for different use
> cases,
> > >> e.g. 1) DSL users where KeyValueWithTimestampStore will be used
> > >> automatically for non-windowed aggregate; 2) PAPI users who do not
> need
> > to
> > >> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to
> > >> KeyValueWithTimestampStore. Just to give my understanding for 3), the
> > >> upgrade flow for users may be simplified as the following (for both
> > >> in-place and roll-over):
> > >>
> > >>     * Update the jar to new version, make code changes from
> > KeyValueStore
> > >> to KeyValueWithTimestampStore, set upgrade config.
> > >>
> > >>     * First rolling bounce, and library code can internally use proxy
> /
> > >> converter based on the specified config to handle new APIs with old
> > stores,
> > >> while letting new stores read from old changelog data.
> > >>
> > >>     * Reset upgrade config.
> > >>
> > >>     * Second rolling bounce, and the library code automatically turns
> off
> > >> logic for proxy / converter.
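
To make the two-bounce flow above concrete, here is a minimal sketch of the config handling only. It assumes the config keys `upgrade.mode` and `upgrade.from` as they are named elsewhere in this thread; the class and method names are hypothetical, and nothing here touches the actual Streams API.

```java
import java.util.Properties;

// Hypothetical helper: builds the configs for the two rolling bounces.
final class UpgradeConfigSketch {

    // First rolling bounce: new jar, new store type, upgrade config set.
    static Properties firstBounce(Properties base, String oldVersion) {
        Properties p = new Properties();
        p.putAll(base);
        p.setProperty("upgrade.mode", "in_place"); // key/value as proposed in this thread
        p.setProperty("upgrade.from", oldVersion); // e.g. "1.1"
        return p;
    }

    // Second rolling bounce: the upgrade config is removed again
    // (removed entirely, rather than set to a literal "null").
    static Properties secondBounce(Properties firstBounceProps) {
        Properties p = new Properties();
        p.putAll(firstBounceProps);
        p.remove("upgrade.mode");
        p.remove("upgrade.from");
        return p;
    }
}
```

Everything else in the application config stays untouched between the two bounces; only the two upgrade keys appear and disappear.
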
> > >>
> > >>
> > >> 3. Some more detailed proposals are needed for when to recommend users
> > to
> > >> trigger the second rolling bounce. I have one idea to share here: we
> > add a
> > >> new state to KafkaStreams, say UPGRADING, which is set when 1) upgrade
> > >> config is set, and 2) the new stores are still ramping up (for the
> > second
> > >> part, we can start with some internal hard-coded heuristics to decide
> > when
> > >> it is close to being ramped up). If either one of them is not true any
> more,
> > it
> > >> should transition to RUNNING. Users can then watch this state, and
> > decide
> > >> to only trigger the second rebalance when the state has transitioned from
> > >> UPGRADING. They can also choose to cut over while the instance is
> still
> > >> UPGRADING; the downside is that afterwards the application may have a
> long
> > >> restoration phase which is, from the user's point of view, a period of unavailability.
> > >>
> > >>
> > >> Below are just some minor things on the wiki:
> > >>
> > >> 4. "proxy story" => "proxy store".
> > >>
> > >> 5. "use the a builder " => "use a builder"
> > >>
> > >> 6: "we add the record timestamp as a 8-byte (long) prefix to the
> value":
> > >> what's the rationale for putting the timestamp before the value, rather than
> > after
> > >> the value?
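
For reference, the "8-byte (long) prefix" layout can be sketched as below. This is only an illustration of the byte layout described in the KIP wording quoted above; the class and method names are hypothetical, and the big-endian byte order is simply the `ByteBuffer` default, not something the thread specifies.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical codec: stored bytes = [8-byte timestamp][original value bytes].
final class TimestampedValueCodec {
    static final int TIMESTAMP_BYTES = Long.BYTES; // 8

    // Prepend the record timestamp to the serialized value.
    static byte[] encode(long timestamp, byte[] value) {
        return ByteBuffer.allocate(TIMESTAMP_BYTES + value.length)
                .putLong(timestamp)
                .put(value)
                .array();
    }

    // Read the timestamp back from the first 8 bytes.
    static long timestampOf(byte[] stored) {
        return ByteBuffer.wrap(stored).getLong();
    }

    // Strip the prefix to recover the original value bytes.
    static byte[] valueOf(byte[] stored) {
        return Arrays.copyOfRange(stored, TIMESTAMP_BYTES, stored.length);
    }
}
```

With a fixed-width prefix, both fields sit at known offsets, so reading either one needs no length field; a fixed-width suffix would work the same way, measured from the end of the array.
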
> > >>
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <
> matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for the feedback Bill. I just updated the KIP with some of your
> > >>> points.
> > >>>
> > >>>>> Regarding step 3C of the in-place upgrade (users needing to watch
> the
> > >>>>> restore process), I'm wondering if we want to provide a type of
> > >>>>> StateRestoreListener that could signal when the new stores have
> > >> reached
> > >>>>> parity with the existing old stores and that could be the signal to
> > >>> start
> > >>>>> second rolling rebalance?
> > >>>
> > >>> I think we can reuse the existing listeners, thus, I did not include
> > >>> anything in the KIP. About a signal to rebalance: this might be
> tricky.
> > >>> If we prepare the store "online", the active task will update the
> state
> > >>> continuously, and thus, state prepare is never finished. It will be
> the
> > >>> user's responsibility to do the second rebalance (note, that the
> second
> > >>> rebalance will first finish the last delta of the upgrade to finish
> the
> > >>> upgrade before actual processing resumes). I clarified the KIP in
> > this
> > >>> regard a little bit.
> > >>>
> > >>>>> 1. Out of N instances, one fails midway through the process, would
> we
> > >>> allow
> > >>>>> the other instances to complete or just fail the entire upgrade?
> > >>>
> > >>> The idea is that the upgrade should be robust and not fail. We need
> to
> > >>> write according tests.
> > >>>
> > >>>>> 2. During the second rolling bounce, maybe we could rename the
> > current
> > >>>>> active directories vs. deleting them right away,  and when all the
> > >>> prepare
> > >>>>> task directories are successfully migrated then delete the previous
> > >>> active
> > >>>>> ones.
> > >>>
> > >>> Ack. Updated the KIP.
> > >>>
> > >>>>> 3. For the first rolling bounce we pause processing of any new
> > >> records
> > >>> and
> > >>>>> just allow the prepare tasks to restore, then once all prepare
> tasks
> > >>> have
> > >>>>> restored, it's a signal for the second round of rolling bounces and
> > >>> then as
> > >>>>> each task successfully renames its prepare directories and deletes
> > the
> > >>> old
> > >>>>> active task directories, normal processing of records resumes.
> > >>>
> > >>> The basic idea is to do an online upgrade to avoid downtime. We can
> > >>> discuss offering both options... For the offline upgrade option, we
> > >>> could simplify user interaction and trigger the second rebalance
> > >>> automatically without the requirement that a user needs to update any
> > >> config.
> > >>>
> > >>> It might actually be worth including this option: we know from
> > >>> experience with state restore, that regular processing slows down the
> > >>> restore. For roll_over upgrade, it would be a different story and
> > >>> upgrade should not be slowed down by regular processing. Thus, we
> > should
> > >>> even make in_place an offline upgrade and force people to use
> roll_over
> > >>> if they need online upgrade. Might be a fair tradeoff that may
> simplify
> > >>> the upgrade for the user and for the code complexity.
> > >>>
> > >>> Let's see what others think.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
> > >>>> Hi Matthias,
> > >>>>
> > >>>> Thanks for the update and the working prototype, it helps with
> > >>>> understanding the KIP.
> > >>>>
> > >>>> I took an initial pass over this PR, and overall I find the
> interfaces
> > >>> and
> > >>>> approach to be reasonable.
> > >>>>
> > >>>> Regarding step 3C of the in-place upgrade (users needing to watch
> the
> > >>>> restore process), I'm wondering if we want to provide a type of
> > >>>> StateRestoreListener that could signal when the new stores have
> > reached
> > >>>> parity with the existing old stores and that could be the signal to
> > >> start
> > >>>> second rolling rebalance?
> > >>>>
> > >>>> Although you solicited feedback on the interfaces involved, I wanted
> > to
> > >>> put
> > >>>> down some thoughts that have come to mind reviewing this KIP again
> > >>>>
> > >>>> 1. Out of N instances, one fails midway through the process, would
> we
> > >>> allow
> > >>>> the other instances to complete or just fail the entire upgrade?
> > >>>> 2. During the second rolling bounce, maybe we could rename the
> current
> > >>>> active directories vs. deleting them right away,  and when all the
> > >>> prepare
> > >>>> task directories are successfully migrated then delete the previous
> > >>> active
> > >>>> ones.
> > >>>> 3. For the first rolling bounce we pause processing of any new
> > records
> > >>> and
> > >>>> just allow the prepare tasks to restore, then once all prepare tasks
> > >> have
> > >>>> restored, it's a signal for the second round of rolling bounces and
> > >> then
> > >>> as
> > >>>> each task successfully renames its prepare directories and deletes
> the
> > >>> old
> > >>>> active task directories, normal processing of records resumes.
> > >>>>
> > >>>> Thanks,
> > >>>> Bill
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <
> > matth...@confluent.io
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> KIP-268 (rebalance metadata) is finished and included in AK 2.0
> > >>>>> release. Thus, I want to pick up this KIP again to get the RocksDB
> > >>>>> upgrade done for 2.1.
> > >>>>>
> > >>>>> I updated the KIP accordingly and also have a "proof of concept" PR
> > >>>>> ready (for "in place" upgrade only):
> > >>>>> https://github.com/apache/kafka/pull/5422/
> > >>>>>
> > >>>>> There are still open questions, but I want to collect early feedback
> on
> > >>>>> the proposed interfaces we need for the store upgrade. Also note,
> > that
> > >>>>> the KIP now also aims to define a generic upgrade path from any
> store
> > >>>>> format A to any other store format B. Adding timestamps is just a
> > >>>>> special case.
> > >>>>>
> > >>>>> I will continue to work on the PR and refine the KIP in the
> meantime,
> > >>> too.
> > >>>>>
> > >>>>> Looking forward to your feedback.
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>>
> > >>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> > >>>>>> After some more thoughts, I want to follow John's suggestion and
> > >> split
> > >>>>>> upgrading the rebalance metadata from the store upgrade.
> > >>>>>>
> > >>>>>> I extracted the metadata upgrade into its own KIP:
> > >>>>>>
> > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> > >>>>>>
> > >>>>>> I'll update this KIP accordingly shortly. I also want to consider
> to
> > >>>>>> make the store format upgrade more flexible/generic. Atm, the KIP
> is
> > >>> too
> > >>>>>> much tailored to the DSL IMHO and does not account for PAPI users
> that
> > >> we
> > >>>>>> should not force to upgrade the stores. I need to figure out the
> > >>> details
> > >>>>>> and follow up later.
> > >>>>>>
> > >>>>>> Please give feedback for the new KIP-268 on the corresponding
> > >>> discussion
> > >>>>>> thread.
> > >>>>>>
> > >>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure out
> a
> > >> way
> > >>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a fix
> for
> > >>>>>> future upgrades. Please share your thoughts.
> > >>>>>>
> > >>>>>> Thanks for all your feedback!
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
> > >>>>>>> @John: yes, we would throw if configs are missing (it's an
> > >>>>>>> implementation detail IMHO and thus I did not include it in the
> > >> KIP)
> > >>>>>>>
> > >>>>>>> @Guozhang:
> > >>>>>>>
> > >>>>>>> 1) I understand now what you mean. We can certainly allow all
> > >> values
> > >>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
> `upgrade.from`
> > >>>>>>> parameter. I had a similar thought once but decided to collapse
> them
> > >>> into
> > >>>>>>> one -- will update the KIP accordingly.
> > >>>>>>>
> > >>>>>>> 2) The idea to avoid any config would be, to always send both
> > >> requests.
> > >>>>>>> If we add a config to eventually disable the old request, we
> don't
> > >>> gain
> > >>>>>>> anything with this approach. The question is really, if we are
> > >> willing
> > >>>>>>> to pay this overhead from 1.2 on -- note, it would be limited to
> 2
> > >>>>>>> versions and not grow further in future releases. More details in
> > >> (3)
> > >>>>>>>
> > >>>>>>> 3) Yes, this approach subsumes (2) for later releases and allows
> us
> > >> to
> > >>>>>>> stay with 2 "assignment strategies" we need to register, as the
> new
> > >>>>>>> assignment strategy will allow to "upgrade itself" via "version
> > >>>>>>> probing". Thus, (2) would only be a workaround to avoid a config
> if
> > >>>>>>> people upgrade from pre-1.2 releases.
> > >>>>>>>
> > >>>>>>> Thus, I don't think we need to register new "assignment
> strategies"
> > >>> and
> > >>>>>>> send empty subscriptions for older version.
> > >>>>>>>
> > >>>>>>> 4) I agree that this is a tricky thing to get right with a single
> > >>>>>>> rebalance. I share the concern that an application might never
> > catch
> > >>> up
> > >>>>>>> and thus the hot standby will never be ready.
> > >>>>>>>
> > >>>>>>> Maybe it's better to go with 2 rebalances for store upgrades. If
> we
> > >> do
> > >>>>>>> this, we also don't need to go with (2) and can get (3) in place
> > for
> > >>>>>>> future upgrades. I also think that changes to the metadata are
> more
> > >>>>>>> likely and thus allowing for single rolling bounce for this case
> is
> > >>> more
> > >>>>>>> important anyway. If we assume that store upgrades are rare, it
> might
> > >> be
> > >>> ok
> > >>>>>>> to sacrifice two rolling bounces for this case. It was just an
> idea
> > >> I
> > >>>>>>> wanted to share (even if I see the issues).
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> -Matthias
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
> > >>>>>>>> Hello Matthias, thanks for your replies.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 1) About the config names: actually I was trying to not expose
> > >>>>>>>> implementation details :) My main concern was that in your
> > proposal
> > >>> the
> > >>>>>>>> values need to cover the span of all the versions that are
> > actually
> > >>>>> using
> > >>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am
> > >>>>> upgrading
> > >>>>>>>> from any versions within this range I need to remember to use
> the
> > >>> value
> > >>>>>>>> "0.10.1.x-1.1.x" rather than just specifying my old version. In my
> > >>> suggestion
> > >>>>> I
> > >>>>>>>> was trying to argue the benefit of just letting users specify
> > >> the
> > >>>>> actual
> > >>>>>>>> Kafka version she's trying to upgrade from, rather than specifying a
> > range
> > >>> of
> > >>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as the
> > >> values,
> > >>>>> but
> > >>>>>>>> still using Kafka versions like broker's `internal.version`
> > config.
> > >>>>> But if
> > >>>>>>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you
> > >>> meant
> > >>>>> to
> > >>>>>>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or
> > >> "1.1"
> > >>>>> which
> > >>>>>>>> are all recognizable config values then I think we are actually
> on
> > >>> the
> > >>>>> same
> > >>>>>>>> page.
> > >>>>>>>>
> > >>>>>>>> 2) About the "multi-assignment" idea: yes it would increase the
> > >>> network
> > >>>>>>>> footprint, but not the message size, IF I'm not
> mis-understanding
> > >>> your
> > >>>>> idea
> > >>>>>>>> of registering multiple assignments. More details:
> > >>>>>>>>
> > >>>>>>>> In the JoinGroupRequest, in the protocols field we can encode
> > >>> multiple
> > >>>>>>>> protocols each with their different metadata. The coordinator
> will
> > >>>>> pick the
> > >>>>>>>> common one that everyone supports (if there is no common one,
> it
> > >>> will
> > >>>>> send
> > >>>>>>>> an error back; if there are multiple ones, it will pick the one
> > >> with
> > >>>>> most
> > >>>>>>>> votes, i.e. the one which was earlier in the encoded list).
> Since
> > >> our
> > >>>>>>>> current Streams rebalance protocol is still based on the
> consumer
> > >>>>>>>> coordinator, it means our protocol_type would be "consumer", but
> > >>>>> inside
> > >>>>>>>> the protocol type we can have multiple protocols like "streams",
> > >>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we need to
> > >>>>> implement a
> > >>>>>>>> different assignor class for each version and register all of
> them
> > >> in
> > >>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future
> if
> > >> we
> > >>>>>>>> re-factor our implementation to have our own client coordinator
> > >> layer
> > >>>>> like
> > >>>>>>>> Connect did, we can simplify this part of the implementation.
> But
> > >>> even
> > >>>>> for
> > >>>>>>>> now with the above approach this is still doable.
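
A rough sketch of the selection rule described above, as I read it: only protocols supported by every member are candidates, each member votes for the first candidate in its own preference list, and the candidate with the most votes wins. The class name and the tie-breaking (first candidate to reach the maximum) are assumptions for illustration, not the broker's actual code.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of how a coordinator could pick one protocol for the group.
final class ProtocolSelectionSketch {

    // Each inner list is one member's protocols, most preferred first.
    static Optional<String> select(List<List<String>> memberProtocols) {
        if (memberProtocols.isEmpty()) {
            return Optional.empty();
        }
        // Candidates: protocols that every member supports.
        Set<String> common = new LinkedHashSet<>(memberProtocols.get(0));
        for (List<String> protocols : memberProtocols) {
            common.retainAll(protocols);
        }
        if (common.isEmpty()) {
            return Optional.empty(); // the coordinator would send an error back
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> protocols : memberProtocols) {
            for (String protocol : protocols) {
                if (common.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // Pick the candidate with the most votes.
        String best = null;
        int bestVotes = -1;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > bestVotes) {
                best = e.getKey();
                bestVotes = e.getValue();
            }
        }
        return Optional.of(best);
    }
}
```

During a rolling bounce, an upgraded member registering ["streams_v2", "streams"] next to an old member registering only ["streams"] leaves "streams" as the sole candidate, so the group stays on the old protocol until every member has been bounced.
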
> > >>>>>>>>
> > >>>>>>>> On the broker side, the group coordinator will only persist a
> > group
> > >>>>> with
> > >>>>>>>> the selected protocol and its subscription metadata, e.g. if
> > >>>>> coordinator
> > >>>>>>>> decides to pick "streams_v2" it will only send that protocol's
> > >>>>> metadata
> > >>>>>>>> from everyone to the leader to assign, AND when completing the
> > >>>>> rebalance it
> > >>>>>>>> will also only write the group metadata with that protocol and
> the
> > >>>>>>>> assignment only. In a word, although the network traffic may be
> > >>>>> increased a
> > >>>>>>>> bit, it would not be a bummer in our trade-off. One corner
> > >> situation
> > >>> we
> > >>>>>>>> need to consider is how to stop registering very old assignors
> to
> > >>>>> avoid the
> > >>>>>>>> network traffic from increasing indefinitely, e.g. if you are
> > >> rolling
> > >>>>>>>> bouncing from v2 to v3, then you'd not need to register v1
> assignor
> > >>>>> anymore,
> > >>>>>>>> but that would unfortunately still require some configs.
> > >>>>>>>>
> > >>>>>>>> 3) About the  "version probing" idea, I think that's a promising
> > >>>>> approach
> > >>>>>>>> as well, but if we are going to do the multi-assignment its
> value
> > >>> seems
> > >>>>>>>> subsumed? But I'm thinking maybe it can be added on top of
> > >>>>> multi-assignment
> > >>>>>>>> to save us from still requiring the config to avoid registering
> > all
> > >>> the
> > >>>>>>>> metadata for all versions. More details:
> > >>>>>>>>
> > >>>>>>>> In the JoinGroupRequest, we still register all the assignors but
> > for
> > >>>>> all old
> > >>>>>>>> assignors we do not encode any metadata, i.e. the encoded data
> > >> would
> > >>>>> be:
> > >>>>>>>>
> > >>>>>>>> "streams_vN" : "encoded metadata"
> > >>>>>>>> "streams_vN-1":empty
> > >>>>>>>> "streams_vN-2":empty
> > >>>>>>>> ..
> > >>>>>>>> "streams_0":empty
> > >>>>>>>>
> > >>>>>>>> So the coordinator can still safely choose the latest common
> > >> version;
> > >>>>> and
> > >>>>>>>> then when the leader receives the subscriptions (note it should
> always
> > >>>>> recognize
> > >>>>>>>> that version), let's say it is streams_vN-2, if one of the
> > >>>>> subscriptions
> > >>>>>>>> is empty bytes, it will send an empty assignment with that
> > >> version
> > >>>>> number
> > >>>>>>>> encoded in the metadata. So in the second auto-triggered rebalance all
> > >> members
> > >>>>> would
> > >>>>>>>> send the metadata with that version:
> > >>>>>>>>
> > >>>>>>>> "streams_vN" : empty
> > >>>>>>>> "streams_vN-1" : empty
> > >>>>>>>> "streams_vN-2" : "encoded metadata"
> > >>>>>>>> ..
> > >>>>>>>> "streams_0":empty
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> By doing this we would not require any configs for users.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about
> > the
> > >>>>> details
> > >>>>>>>> so probably we'd need to fill that out before making a call. For
> > >>>>> example,
> > >>>>>>>> you mentioned "If we detect this situation, the Streams
> > application
> > >>>>> closes
> > >>>>>>>> corresponding active tasks as well as "hot standby" tasks, and
> > >>>>> re-creates
> > >>>>>>>> the new active tasks using the new store." How could we
> guarantee
> > >>> that
> > >>>>> the
> > >>>>>>>> gap between these two stores will keep decreasing rather than
> increasing
> > >> so
> > >>>>> we'll
> > >>>>>>>> eventually achieve the flip point? And also the longer we are
> > >> before
> > >>>>> the
> > >>>>>>>> flip point, the longer we are doubling the storage space, etc.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
> > >>>>> matth...@confluent.io>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> @John, Guozhang,
> > >>>>>>>>>
> > >>>>>>>>> thanks a lot for your comments. Very long reply...
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> About upgrading the rebalance metadata:
> > >>>>>>>>>
> > >>>>>>>>> Another possibility would be to register multiple
> > >>>>> assignment
> > >>>>>>>>> strategies for the 1.2 applications. For this case, new
> instances
> > >>>>> would
> > >>>>>>>>> be configured to support both and the broker would pick the
> > >> version
> > >>>>> that
> > >>>>>>>>> all instances understand. The disadvantage would be, that we
> send
> > >>> much
> > >>>>>>>>> more data (ie, two subscriptions) in each rebalance as long as
> no
> > >>>>> second
> > >>>>>>>>> rebalance is done disabling the old protocol. Thus, using this
> > >>>>> approach
> > >>>>>>>>> would allow us to avoid a second rebalance, trading it off against an
> increased
> > >>>>>>>>> rebalance network footprint (I also assume that this would
> > >> increase
> > >>>>> the
> > >>>>>>>>> message size that is written into __consumer_offsets topic?).
> > >>>>> Overall, I
> > >>>>>>>>> am not sure if this would be a good tradeoff, but it could
> avoid
> > a
> > >>>>>>>>> second rebalance (I have some more thoughts about stores below
> > >> that
> > >>>>> are
> > >>>>>>>>> relevant for single rebalance upgrade).
> > >>>>>>>>>
> > >>>>>>>>> For future upgrades we might be able to fix this though. I was
> > >>>>> thinking
> > >>>>>>>>> about the following:
> > >>>>>>>>>
> > >>>>>>>>> In the current implementation, the leader fails if it gets a
> > >>>>>>>>> subscription it does not understand (ie, newer version). We
> could
> > >>>>> change
> > >>>>>>>>> this behavior and let the leader send an empty assignment plus
> > >> error
> > >>>>>>>>> code (including supported version) back to the instance sending
> > >> the
> > >>>>>>>>> "bad" subscription. This would allow the following logic for an
> > >>>>>>>>> application instance:
> > >>>>>>>>>
> > >>>>>>>>>  - on startup, always send the latest subscription format
> > >>>>>>>>>  - if leader understands it, we get an assignment back and start
> > >>>>> processing
> > >>>>>>>>>  - if leader does not understand it, we get an empty assignment
> > >> and
> > >>>>>>>>> supported version back
> > >>>>>>>>>  - the application calls unsubscribe()/subscribe()/poll() again and
> > >>> sends a
> > >>>>>>>>> subscription using the leader's supported version
> > >>>>>>>>>
> > >>>>>>>>> This protocol would allow a single rolling bounce, and
> > >>>>> implements
> > >>>>>>>>> a "version probing" step, that might result in two executed
> > >>>>> rebalances.
> > >>>>>>>>> The advantage would be, that the user does not need to set any
> > >>> configs
> > >>>>>>>>> or do multiple rolling bounces, as Streams takes care of this
> > >>>>>>>>> automatically.
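
The "version probing" handshake above can be modeled in a few lines. This is a toy simulation under the assumptions stated in the list: the leader answers a subscription it does not understand with an empty assignment plus its own supported version, and the member then rejoins with that version. All names are hypothetical; no consumer or coordinator API is involved.

```java
// Hypothetical model of the probing step between one member and the leader.
final class VersionProbingSketch {

    // Leader reply: a real assignment, or an empty one plus the leader's version.
    static final class Reply {
        final boolean hasAssignment;
        final int leaderVersion;

        Reply(boolean hasAssignment, int leaderVersion) {
            this.hasAssignment = hasAssignment;
            this.leaderVersion = leaderVersion;
        }
    }

    // Leader side: understands every subscription version up to its own.
    static Reply leaderHandle(int leaderVersion, int subscriptionVersion) {
        return new Reply(subscriptionVersion <= leaderVersion, leaderVersion);
    }

    // Member side: returns how many rebalances were needed to get an assignment.
    static int joinGroup(int memberLatestVersion, int leaderVersion) {
        int rebalances = 1;
        // On startup, always send the latest subscription format.
        Reply reply = leaderHandle(leaderVersion, memberLatestVersion);
        if (!reply.hasAssignment) {
            // Empty assignment: rejoin using the leader's supported version.
            reply = leaderHandle(leaderVersion, reply.leaderVersion);
            rebalances++;
        }
        if (!reply.hasAssignment) {
            throw new IllegalStateException("leader rejected its own supported version");
        }
        return rebalances;
    }
}
```

In this model a member that is newer than the leader needs two rebalances, while a member at or below the leader's version needs one, which matches the "might result in two executed rebalances" remark.
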
> > >>>>>>>>>
> > >>>>>>>>> One disadvantage would be, that two rebalances happen and that
> > for
> > >>> an
> > >>>>>>>>> error case during rebalance, we lose the information about the
> > >>>>>>>>> supported leader version and the "probing step" would happen a
> > >>> second
> > >>>>> time.
> > >>>>>>>>>
> > >>>>>>>>> If the leader is eventually updated, it will include its own
> > >>>>> supported
> > >>>>>>>>> version in all assignments, to allow a "down graded"
> application
> > >> to
> > >>>>>>>>> upgrade its version later. Also, if an application fails, the
> > first
> > >>>>>>>>> probing would always be successful and only a single rebalance
> > >>>>> happens.
> > >>>>>>>>> If we use this protocol, I think we don't need any
> configuration
> > >>>>>>>>> parameter for future upgrades.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
> > >>>>>>>>>
> > >>>>>>>>> Users would set "upgrade.from" to the release version the
> > >>> current/old
> > >>>>>>>>> application is using. I think this is simpler, as users know
> this
> > >>>>>>>>> version. If we use "internal.protocol.version" instead, we
> expose
> > >>>>>>>>> implementation details and users need to know the protocol
> > version
> > >>>>> (ie,
> > >>>>>>>>> they need to map from the release version to the protocol
> > version;
> > >>> ie,
> > >>>>>>>>> "I am running 0.11.0 that runs with metadata protocol version 2").
> > >>>>>>>>>
> > >>>>>>>>> Also the KIP states that for the second rolling bounce, the
> > >>>>>>>>> "upgrade.mode" config should be set back to `null` -- and thus,
> > >>>>>>>>> "upgrade.from" would not have any effect and is ignored (I will
> > >>> update
> > >>>>>>>>> the KIP to point out this dependency).
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> About your second point: I'll update the KIP accordingly to
> > >> describe
> > >>>>>>>>> future updates as well. Both will be different.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> One more point about upgrading the store format. I was thinking
> > >>> about
> > >>>>>>>>> avoiding the second rolling bounce altogether in the future:
> > (1)
> > >>> the
> > >>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this
> > requires us
> > >>> to
> > >>>>>>>>> prepare the stores as "hot standbys" before we do the switch
> and
> > >>>>> delete
> > >>>>>>>>> the old stores. (3) the current proposal does the switch
> > >> "globally"
> > >>> --
> > >>>>>>>>> this is simpler and due to the required second rebalance no
> > >>>>> disadvantage.
> > >>>>>>>>> However, a globally consistent switch-over might actually not be
> > >>>>> required.
> > >>>>>>>>> For "in_place" upgrade, following the protocol from above, we
> > >> could
> > >>>>>>>>> decouple the store switch and each instance could switch its
> > store
> > >>>>>>>>> independently from all other instances. After the rolling
> bounce,
> > >> it
> > >>>>>>>>> seems to be ok to switch from the old store to the new store
> > >> "under
> > >>>>> the
> > >>>>>>>>> hood" whenever the new store is ready (this could even be done,
> > >>> before
> > >>>>>>>>> we switch to the new metadata version). Each time we update the
> > >> "hot
> > >>>>>>>>> standby" we check if it reached the "endOffset"  (or maybe X%
> > that
> > >>>>> could
> > >>>>>>>>> either be hardcoded or configurable). If we detect this
> > situation,
> > >>> the
> > >>>>>>>>> Streams application closes corresponding active tasks as well
> as
> > >>> "hot
> > >>>>>>>>> standby" tasks, and re-creates the new active tasks using the
> new
> > >>>>> store.
> > >>>>>>>>> (I need to go through the details once again, but it seems to
> be
> > >>>>>>>>> feasible.).
> > >>>>>>>>>
> > >>>>>>>>> Combining this strategy with the "multiple assignment" idea,
> > might
> > >>>>> even
> > >>>>>>>>> enable us to do a single rolling bounce upgrade from 1.1 ->
> 1.2.
> > >>>>>>>>> Applications would just use the old store, as long as the new
> > >> store
> > >>> is
> > >>>>>>>>> not ready, even if the new metadata version is used already.
> > >>>>>>>>>
> > >>>>>>>>> For future upgrades, a single rebalance would be sufficient,
> too,
> > >>> even
> > >>>>>>>>> if the stores are upgraded. We would not need any config
> > >> parameters
> > >>> as
> > >>>>>>>>> the "probe" step allows us to detect the supported rebalance
> > >>> metadata
> > >>>>>>>>> version (and we would also not need multiple "assignment
> > >> strategies"
> > >>>>> as
> > >>>>>>>>> our own protocol encodes everything we need).
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Let me know what you think.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
> > >>>>>>>>>> @John:
> > >>>>>>>>>>
> > >>>>>>>>>> For the protocol version upgrade, it is only for the encoded
> > >>> metadata
> > >>>>>>>>> bytes
> > >>>>>>>>>> protocol, which are just bytes-in bytes-out from Consumer's
> pov,
> > >>> so I
> > >>>>>>>>> think
> > >>>>>>>>>> this change should be in the Streams layer as well.
> > >>>>>>>>>>
> > >>>>>>>>>> @Matthias:
> > >>>>>>>>>>
> > >>>>>>>>>> for 2), I agree that adding a "newest supported version"
> besides
> > >>> the
> > >>>>>>>>>> "currently used version for encoding" is a good idea to allow
> > >>> either
> > >>>>>>>>> case;
> > >>>>>>>>>> the key is that in Streams we would likely end up with a
> mapping
> > >>>>> from the
> > >>>>>>>>>> protocol version to the other persistent data format versions
> > >> such
> > >>> as
> > >>>>>>>>>> rocksDB, changelog. So with such a map we can actually achieve
> > >> both
> > >>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol
> > >>>>> version's
> > >>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 ->
> 0.10.1
> > >>>>> leaders
> > >>>>>>>>>> can choose to use the newer version in the first rolling
> bounce
> > >>>>> directly
> > >>>>>>>>>> and we can document to users that they would not need to set
> > >>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded
> > >> protocol
> > >>>>>>>>> version
> > >>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and
> then
> > >> we
> > >>>>> can
> > >>>>>>>>>> document that "upgrade.mode" needs to be set in the first
> > rolling
> > >>>>> bounce
> > >>>>>>>>>> and reset in the second.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Besides that, some additional comments:
> > >>>>>>>>>>
> > >>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for users to
> > >> set
> > >>>>> than
> > >>>>>>>>>> "internal.protocol.version" where for the latter users only
> need
> > >> to
> > >>>>> set a
> > >>>>>>>>>> single version, while the Streams will map that version to the
> > >>>>> Streams
> > >>>>>>>>>> assignor's behavior as well as the data format. But maybe I
> did
> > >> not
> > >>>>> get
> > >>>>>>>>>> your idea about how the  "upgrade.from" config will be set,
> > >> because
> > >>>>> in
> > >>>>>>>>>> your Compatibility section how the upgrade.from config will be
> > >> set
> > >>>>> for
> > >>>>>>>>>> these two rolling bounces are not very clear: for example,
> > should
> > >>>>> user
> > >>>>>>>>>> reset it to null in the second rolling bounce?
> > >>>>>>>>>>
> > >>>>>>>>>> 2) In the upgrade path description, rather than talking about
> > >>>>> specific
> > >>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize
> all
> > >>> the
> > >>>>>>>>>> possible scenarios, even for future upgrade versions, what
> > should
> > >>> be
> > >>>>> the
> > >>>>>>>>>> standard operations? The categories we can summarize to would
> > be
> > >>>>>>>>> (assuming
> > >>>>>>>>>> user upgrade from version X to version Y, where X and Y are
> > Kafka
> > >>>>>>>>> versions,
> > >>>>>>>>>> with the corresponding supported protocol version x and y):
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and hence
> no
> > >>>>>>>>> persistent
> > >>>>>>>>>> data formats have changed.
> > >>>>>>>>>>
> > >>>>>>>>>> b. x != y, but all persistent data format remains the same.
> > >>>>>>>>>>
> > >>>>>>>>>> c. x != y, AND some persistent data format like RocksDB format,
> > >>>>> changelog
> > >>>>>>>>>> format, has been changed.
> > >>>>>>>>>>
> > >>>>>>>>>> d. special case: we may need some special handling logic when
> > >>>>> "current
> > >>>>>>>>>> version" or "newest supported version" are not available in
> the
> > >>>>> protocol,
> > >>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> under the above scenarios, how many rolling bounces users need
> > to
> > >>>>>>>>> execute?
> > >>>>>>>>>> how they should set the configs in each rolling bounce? and
> how
> > >>>>> Streams
> > >>>>>>>>>> library will execute in these cases?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Guozhang
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
> > >>>>> matth...@confluent.io>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Ted,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I still consider changing the KIP to include it right away --
> > if
> > >>>>> not,
> > >>>>>>>>>>> I'll create a JIRA. Need to think it through in more detail
> > >> first.
> > >>>>>>>>>>>
> > >>>>>>>>>>> (Same for other open questions like interface names -- I
> > collect
> > >>>>>>>>>>> feedback and update the KIP after we reach consensus :))
> > >>>>>>>>>>>
> > >>>>>>>>>>> -Matthias
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
> > >>>>>>>>>>>> Thanks for the details, Matthias.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> bq. change the metadata protocol only in a future release,
> > >>> encoding
> > >>>>>>>>> both
> > >>>>>>>>>>> used
> > >>>>>>>>>>>> and supported version might be an advantage
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Looks like encoding both versions wouldn't be implemented in
> > >> this
> > >>>>> KIP.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Please consider logging a JIRA with the encoding proposal.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
> > >>>>> matth...@confluent.io
> > >>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> @Bill: I think a filter predicate should be part of user
> > code.
> > >>> And
> > >>>>>>>>> even
> > >>>>>>>>>>>>> if we want to add something like this, I would prefer to do
> > it
> > >>> in
> > >>>>> a
> > >>>>>>>>>>>>> separate KIP.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce. But
> > >> from
> > >>> my
> > >>>>>>>>>>>>> understanding it would not be possible.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to
> switch
> > >>> from
> > >>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch
> from
> > >> the
> > >>>>> old
> > >>>>>>>>>>>>> store to the new store (this happens after the last
> instance
> > >>>>> bounces a
> > >>>>>>>>>>>>> second time).
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The problem with one round of rolling bounces is, that it's
> > >>>>> unclear
> > >>>>>>>>> when
> > >>>>>>>>>>>>> to switch from version 2 to version 3. The
> > >>>>> StreamsPartitionsAssignor is
> > >>>>>>>>>>>>> stateless by design, and thus, the information which
> version
> > >> it
> > >>>>> should
> > >>>>>>>>>>>>> use must be passed in from externally -- and we want to use
> > >> the
> > >>>>>>>>>>>>> StreamsConfig to pass in this information.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> During upgrade, all new instances have no information about
> > >> the
> > >>>>>>>>> progress
> > >>>>>>>>>>>>> of the upgrade (ie, how many other instances got upgraded
> > >>>>> already).
> > >>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3
> > >>>>> subscription.
> > >>>>>>>>> The
> > >>>>>>>>>>>>> leader also has this limited view on the world and can only
> > >> send
> > >>>>>>>>> version
> > >>>>>>>>>>>>> 2 assignments back.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify
> the
> > >>>>> upgrade.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> We did consider to change the metadata to make later
> upgrades
> > >>> (ie,
> > >>>>>>>>> from
> > >>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the
> > >> metadata
> > >>> or
> > >>>>>>>>>>>>> storage format again -- as long as we don't change it, a
> > >> single
> > >>>>>>>>> rolling
> > >>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and
> > >> "supported
> > >>>>>>>>>>>>> version". This would allow the leader to switch to the new
> > >>> version
> > >>>>>>>>>>>>> earlier and without a second rebalance: leader would
> receive
> > >>> "used
> > >>>>>>>>>>>>> version == old" and "supported version = old/new" -- as
> long
> > >> as
> > >>> at
> > >>>>>>>>> least
> > >>>>>>>>>>>>> one instance sends a "supported version = old" leader sends
> > >> old
> > >>>>>>>>> version
> > >>>>>>>>>>>>> assignment back. However, encoding both versions would allow
> > >> that
> > >>>>> the
> > >>>>>>>>>>>>> leader can send a new version assignment back, right after
> > the
> > >>>>> first
> > >>>>>>>>>>>>> round of rebalance finished (all instances send "supported
> > >>>>> version =
> > >>>>>>>>>>>>> new"). However, there are still two issues with this:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1) if we switch to the new format right after the last
> > >> instance
> > >>>>>>>>> bounced,
> > >>>>>>>>>>>>> the new stores might not be ready to be used -- this could
> > >> lead
> > >>> to
> > >>>>>>>>>>>>> "downtime" as store must be restored before processing can
> > >>> resume.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At this
> > >>>>> point, the
> > >>>>>>>>>>>>> instance will still have "upgrade mode" enabled and thus
> > sends
> > >>>>> the old
> > >>>>>>>>>>>>> protocol data. However, it would be desirable to never fall
> > >> back
> > >>>>> to
> > >>>>>>>>> the
> > >>>>>>>>>>>>> old protocol after the switch to the new protocol.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The second issue is minor and I guess if users set-up the
> > >>> instance
> > >>>>>>>>>>>>> properly it could be avoided. However, the first issue
> would
> > >>>>> prevent
> > >>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we consider
> > >> that
> > >>> we
> > >>>>>>>>> might
> > >>>>>>>>>>>>> change the metadata protocol only in a future release,
> > >> encoding
> > >>>>> both
> > >>>>>>>>>>>>> used and supported version might be an advantage in the
> > future
> > >>>>> and we
> > >>>>>>>>>>>>> could consider to add this information in 1.2 release to
> > >> prepare
> > >>>>> for
> > >>>>>>>>>>> this.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Btw: monitoring the log, is also only required to give the
> > >>>>> instances
> > >>>>>>>>>>>>> enough time to prepare the stores in new format. If you
> would
> > >> do
> > >>>>> the
> > >>>>>>>>>>>>> second rolling bounce before this, it would still work --
> > >>>>> however, you
> > >>>>>>>>>>>>> might see app "downtime" as the new store must be fully
> > >> restored
> > >>>>>>>>> before
> > >>>>>>>>>>>>> processing can resume.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Does this make sense?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
> > >>>>>>>>>>>>>> Matthias,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid of
> the
> > >> 2nd
> > >>>>>>>>> rolling
> > >>>>>>>>>>>>> bounce?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> For the in-place upgrade, it seems like primary difference
> > >>>>> between
> > >>>>>>>>> the
> > >>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide
> > >>>>> whether to
> > >>>>>>>>>>> send
> > >>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
> (Actually,
> > >>>>> there is
> > >>>>>>>>>>>>> another difference mentioned in that the KIP says that the
> > 2nd
> > >>>>> rolling
> > >>>>>>>>>>>>> bounce should happen after all new state stores are created
> > by
> > >>> the
> > >>>>>>>>>>>>> background thread. However, within the 2nd rolling bounce,
> we
> > >>> say
> > >>>>> that
> > >>>>>>>>>>>>> there is still a background thread, so it seems like there is no
> > >>> actual
> > >>>>>>>>>>>>> requirement to wait for the new state stores to be
> created.)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with
> > >>> mixed-mode
> > >>>>>>>>>>> (having
> > >>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer group).
> It
> > >>> seems
> > >>>>>>>>> like
> > >>>>>>>>>>> we
> > >>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
> > >>>>> (somehow/somewhere)
> > >>>>>>>>>>> such
> > >>>>>>>>>>>>> that:
> > >>>>>>>>>>>>>> * Instances send Subscription Version 2 until all
> instances
> > >> are
> > >>>>>>>>> running
> > >>>>>>>>>>>>> the new code.
> > >>>>>>>>>>>>>> * Once all the instances are running the new code, then
> one
> > >> at
> > >>> a
> > >>>>>>>>> time,
> > >>>>>>>>>>>>> the instances start sending Subscription V3. Leader still
> > >> hands
> > >>>>> out
> > >>>>>>>>>>>>> Assignment Version 2, until all new state stores are ready.
> > >>>>>>>>>>>>>> * Once all instances report that new stores are ready,
> > Leader
> > >>>>> sends
> > >>>>>>>>> out
> > >>>>>>>>>>>>> Assignment Version 3.
> > >>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3, it
> can
> > >>>>> delete
> > >>>>>>>>> the
> > >>>>>>>>>>>>> old state store.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of
> > >>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling
> > >> restarts.
> > >>> No
> > >>>>>>>>> need
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>> monitor logs for state store rebuild. You just deploy it,
> and
> > >>> the
> > >>>>>>>>>>> instances
> > >>>>>>>>>>>>> update themselves.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> What do you think?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> The thing that made me think of this is that the "2
> rolling
> > >>>>> bounces"
> > >>>>>>>>> is
> > >>>>>>>>>>>>> similar to what Kafka brokers have to do for changes in
> > >>>>>>>>>>>>> inter.broker.protocol.version and
> log.message.format.version.
> > >>> And
> > >>>>> in
> > >>>>>>>>> the
> > >>>>>>>>>>>>> broker case, it seems like it would be possible (with some
> > >> work
> > >>> of
> > >>>>>>>>>>> course)
> > >>>>>>>>>>>>> to modify kafka to allow us to do similar auto-detection of
> > >>> broker
> > >>>>>>>>>>>>> capabilities and automatically do a switchover from old/new
> > >>>>> versions.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> -James
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
> > bbej...@gmail.com
> > >>>
> > >>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Matthias,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I do have one question regarding the retrieval methods on
> > >> the
> > >>>>> new
> > >>>>>>>>>>>>>>> interfaces.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Would you want to consider adding one method with a Predicate
> > >> that
> > >>>>> would
> > >>>>>>>>>>>>> allow
> > >>>>>>>>>>>>>>> for filtering records by the timestamp stored with the
> > >> record?
> > >>>>> Or
> > >>>>>>>>> is
> > >>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>> better left for users to implement themselves once the
> data
> > >>> has
> > >>>>> been
> > >>>>>>>>>>>>>>> retrieved?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>> Bill
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
> > yuzhih...@gmail.com
> > >>>
> > >>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Matthias:
> > >>>>>>>>>>>>>>>> For my point #1, I don't have preference as to which
> > >>> separator
> > >>>>> is
> > >>>>>>>>>>>>> chosen.
> > >>>>>>>>>>>>>>>> Given the background you mentioned, current choice is
> > good.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is closer
> > to
> > >>>>> English
> > >>>>>>>>>>>>>>>> grammar.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Would be good to listen to what other people think.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
> > >>>>>>>>>>> matth...@confluent.io
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks for the comments!
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> @Guozhang:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata
> > upgrade
> > >>> fix
> > >>>>>>>>>>>>>>>>> (addressing the mentioned
> > >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It
> > >> gives a
> > >>>>> first
> > >>>>>>>>>>>>>>>>> impression how the metadata upgrade works including a
> > >> system
> > >>>>> test:
> > >>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I
> agree
> > >>> that
> > >>>>> the
> > >>>>>>>>>>> KIP
> > >>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>> complex and I am ok with putting out more code to give
> better
> > >>>>>>>>> discussion
> > >>>>>>>>>>>>>>>>> context.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> @Ted:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
> > >>>>>>>>> `processing.guarantee`
> > >>>>>>>>>>>>>>>>> parameter that accepts `at_least_once` and
> `exactly_once`
> > >> as
> > >>>>>>>>> values.
> > >>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash but I
> > >>> prefer
> > >>>>>>>>>>>>>>>>> consistency. If you feel strongly about it, we can also
> > >> change
> > >>>>> it to
> > >>>>>>>>>>>>> `-`.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I
> > >> stripped
> > >>>>> the
> > >>>>>>>>>>>>> `With`
> > >>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be good to get
> > >>>>> feedback
> > >>>>>>>>>>> from
> > >>>>>>>>>>>>>>>>> others and pick the name the majority prefers.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> @John:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would not
> > >> make a
> > >>>>>>>>>>>>> difference.
> > >>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the two versions
> > >>> might
> > >>>>>>>>>>>>> introduce
> > >>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I don't hit
> an
> > >>>>> issue
> > >>>>>>>>> with
> > >>>>>>>>>>>>>>>>> putting the `-v2` to the store directory instead of
> > >>>>> `rocksdb-v2`
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
> > >>>>>>>>>>>>>>>>>> Hey Matthias,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several questions
> queued
> > >>> up,
> > >>>>> but
> > >>>>>>>>>>> they
> > >>>>>>>>>>>>>>>>> were
> > >>>>>>>>>>>>>>>>>> all in the "rejected alternatives" section... oh,
> well.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> One very minor thought re changing the state directory
> > >> from
> > >>>>>>>>>>>>>>>>> "/<state.dir>/<
> > >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to
> > >>>>> "/<state.dir>/<
> > >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if
> > you
> > >>>>> put the
> > >>>>>>>>>>>>> "v2"
> > >>>>>>>>>>>>>>>>>> marker on the storeName part of the path (i.e.,
> > >>>>> "/<state.dir>/<
> > >>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"),
> then
> > >>> you
> > >>>>> get
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> same
> > >>>>>>>>>>>>>>>>>> benefits without altering the high-level directory
> > >>> structure.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people running
> > >>>>> scripts to
> > >>>>>>>>>>>>>>>> monitor
> > >>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other such use
> > >> cases.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>> -John
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
> > >>> yuzhih...@gmail.com>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Matthias:
> > >>>>>>>>>>>>>>>>>>> Nicely written KIP.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may
> > >>>>> sometimes
> > >>>>>>>>> be
> > >>>>>>>>>>>>> miss
> > >>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to
> > >>> user.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V>
> > {
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for
> > >> the
> > >>>>>>>>> class ?
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <
> > >>>>>>>>> wangg...@gmail.com
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I've read through the upgrade path section and it
> > >> looks
> > >>>>> good
> > >>>>>>>>> to
> > >>>>>>>>>>>>> me,
> > >>>>>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>>>> you
> > >>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you also share it
> > >> here
> > >>>>> so
> > >>>>>>>>> that
> > >>>>>>>>>>>>>>>>> people
> > >>>>>>>>>>>>>>>>>>>> can take a look?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this
> > >> there
> > >>>>> are
> > >>>>>>>>>>> always
> > >>>>>>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is better
> > to
> > >>>>> have
> > >>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design
> > >>>>> discussion :)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <
> > >>>>>>>>>>>>>>>> matth...@confluent.io
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to
> > allow
> > >>>>> storing
> > >>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to
> > >>>>> resolve
> > >>>>>>>>>>>>> multiple
> > >>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this!
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
> >
>
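
For readers skimming the quoted thread: the "used version" / "supported version" idea discussed above (the leader keeps sending the old assignment version as long as at least one instance reports only the old version as supported) can be sketched roughly as follows. This is only an illustration of the rule as described in the thread, not the actual Kafka Streams assignor code; the class `VersionProbeSketch`, the `Subscription` holder and the method `chooseAssignmentVersion` are hypothetical names made up for this sketch.

```java
import java.util.List;

final class VersionProbeSketch {

    /** Hypothetical per-instance metadata; not the real Kafka Streams subscription class. */
    static final class Subscription {
        final int usedVersion;       // version the instance encoded this subscription with
        final int supportedVersion;  // newest version the instance is able to understand

        Subscription(int usedVersion, int supportedVersion) {
            this.usedVersion = usedVersion;
            this.supportedVersion = supportedVersion;
        }
    }

    /**
     * Leader-side choice: the newest version that every instance supports,
     * i.e. the minimum over all reported "supported versions".
     */
    static int chooseAssignmentVersion(List<Subscription> subscriptions) {
        if (subscriptions.isEmpty()) {
            throw new IllegalArgumentException("at least one subscription is required");
        }
        int version = Integer.MAX_VALUE;
        for (Subscription s : subscriptions) {
            version = Math.min(version, s.supportedVersion);
        }
        return version;
    }

    public static void main(String[] args) {
        // Mid-upgrade: one instance still runs old code, so the leader stays on version 2.
        List<Subscription> mixed = List.of(new Subscription(2, 3), new Subscription(2, 2));
        System.out.println(chooseAssignmentVersion(mixed));

        // All instances bounced: everyone supports version 3, so the leader may switch.
        List<Subscription> upgraded = List.of(new Subscription(2, 3), new Subscription(2, 3));
        System.out.println(chooseAssignmentVersion(upgraded));
    }
}
```

As the thread points out, picking the version this way does not by itself guarantee that the new stores are ready when the leader switches, so it addresses only the metadata part of the upgrade.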
