After some more thoughts, I want to follow John's suggestion and split
upgrading the rebalance metadata from the store upgrade.

I extracted the metadata upgrade into it's own KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade

I'll update this KIP accordingly shortly. I also want to consider to
make the store format upgrade more flexible/generic. Atm, the KIP is too
much tailored to the DSL IMHO and does not encounter PAPI users that we
should not force to upgrade the stores. I need to figure out the details
and follow up later.

Please give feedback for the new KIP-268 on the corresponding discussion
thread.

@James: unfortunately, for upgrading to 1.2 I couldn't figure out a way
for a single rolling bounce upgrade. But KIP-268 proposes a fix for
future upgrades. Please share your thoughts.

Thanks for all your feedback!

-Matthias

On 3/12/18 11:56 PM, Matthias J. Sax wrote:
> @John: yes, we would throw if configs are missing (it's an
> implementation details IMHO and thus I did not include it in the KIP)
> 
> @Guozhang:
> 
> 1) I understand know what you mean. We can certainly, allow all values
> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for `upgrade.from`
> parameter. I had a similar though once but decided to collapse them into
> one -- will update the KIP accordingly.
> 
> 2) The idea to avoid any config would be, to always send both request.
> If we add a config to eventually disable the old request, we don't gain
> anything with this approach. The question is really, if we are willing
> to pay this overhead from 1.2 on -- note, it would be limited to 2
> versions and not grow further in future releases. More details in (3)
> 
> 3) Yes, this approach subsumes (2) for later releases and allows us to
> stay with 2 "assignment strategies" we need to register, as the new
> assignment strategy will allow to "upgrade itself" via "version
> probing". Thus, (2) would only be a workaround to avoid a config if
> people upgrade from pre-1.2 releases.
> 
> Thus, I don't think we need to register new "assignment strategies" and
> send empty subscriptions for older version.
> 
> 4) I agree that this is a tricky thing to get right with a single
> rebalance. I share the concern that an application might never catch up
> and thus the hot standby will never be ready.
> 
> Maybe it's better to go with 2 rebalances for store upgrades. If we do
> this, we also don't need to go with (2) and can get (3) in place for
> future upgrades. I also think that changes to the metadata are more
> likely and thus allowing for single rolling bounce for this case is more
> important anyway. If we assume that store upgrade a rare, it might be ok
> to sacrifice two rolling bounced for this case. It was just an idea I
> wanted to share (even if I see the issues).
> 
> 
> -Matthias
> 
> 
> 
> On 3/12/18 11:45 AM, Guozhang Wang wrote:
>> Hello Matthias, thanks for your replies.
>>
>>
>> 1) About the config names: actually I was trying to not expose
>> implementation details :) My main concern was that in your proposal the
>> values need to cover the span of all the versions that are actually using
>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am upgrading
>> from any versions within this range I need to remember to use the value
>> "0.10.1.x-1.1.x" than just specifying my old version. In my suggestion I
>> was trying to argue the benefit of just letting users to specify the actual
>> Kafka version she's trying to upgrade from, than specifying a range of
>> versions. I was not suggesting to use "v1, v2, v3" etc as the values, but
>> still using Kafka versions like broker's `internal.version` config. But if
>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you meant to
>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or "1.1" which
>> are all recognizable config values then I think we are actually on the same
>> page.
>>
>> 2) About the "multi-assignment" idea: yes it would increase the network
>> footprint, but not the message size, IF I'm not mis-understanding your idea
>> of registering multiple assignment. More details:
>>
>> In the JoinGroupRequest, in the protocols field we can encode multiple
>> protocols each with their different metadata. The coordinator will pick the
>> common one that everyone supports (if there are no common one, it will send
>> an error back; if there are multiple ones, it will pick the one with most
>> votes, i.e. the one which was earlier in the encoded list). Since our
>> current Streams rebalance protocol is still based on the consumer
>> coordinator, it means our protocol_type would be "consumer", but instead
>> the protocol type we can have multiple protocols like "streams",
>> "streams_v2", "streams_v3" etc. The downside is that we need to implement a
>> different assignor class for each version and register all of them in
>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future if we
>> re-factor our implementation to have our own client coordinator layer like
>> Connect did, we can simplify this part of the implementation. But even for
>> now with the above approach this is still doable.
>>
>> On the broker side, the group coordinator will only persist a group with
>> the selected protocol and its subscription metadata, e.g. if coordinator
>> decides to pick "streams_v2" it will only sends that protocol's metadata
>> from everyone to the leader to assign, AND when completing the rebalance it
>> will also only write the group metadata with that protocol and the
>> assignment only. In a word, although the network traffic maybe increased a
>> bit, it would not be a bummer in our trade-off. One corner situation we
>> need to consider is how to stop registering very old assignors to avoid the
>> network traffic from increasing indefinitely, e.g. if you are rolling
>> bounce from v2 to v3, then you'd not need to register v1 assignor anymore,
>> but that would unfortunately still require some configs.
>>
>> 3) About the  "version probing" idea, I think that's a promising approach
>> as well, but if we are going to do the multi-assignment its value seems
>> subsumed? But I'm thinking maybe it can be added on top of multi-assignment
>> to save us from still requiring the config to avoid registering all the
>> metadata for all version. More details:
>>
>> In the JoinGroupRequest, we still register all the assignor but for all old
>> assignors we do not encode any metadata, i.e. the encoded data would be:
>>
>> "streams_vN" : "encoded metadata"
>> "streams_vN-1":empty
>> "streams_vN-2":empty
>> ..
>> "streams_0":empty
>>
>> So the coordinator can still safely choose the latest common version; and
>> then when leaders receive the subscription (note it should always recognize
>> that version), let's say it is streams_vN-2, if one of the subscriptions
>> are empty bytes, it will send the empty assignment with that version number
>> encoded in the metadata. So in the second auto-triggered all members would
>> send the metadata with that version:
>>
>> "streams_vN" : empty
>> "streams_vN-1" : empty
>> "streams_vN-2" : "encoded metadata"
>> ..
>> "streams_0":empty
>>
>>
>> By doing this we would not require any configs for users.
>>
>>
>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about the details
>> so probably we'd need to fill that out before making a call. For example,
>> you mentioned "If we detect this situation, the Streams application closes
>> corresponding active tasks as well as "hot standby" tasks, and re-creates
>> the new active tasks using the new store." How could we guarantee that the
>> gap between these two stores will keep decreasing than increasing so we'll
>> eventually achieve the flip point? And also the longer we are before the
>> flip point, the larger we are doubling the storage space, etc.
>>
>>
>>
>> Guozhang
>>
>>
>>
>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> @John, Guozhang,
>>>
>>> thanks a lot for your comments. Very long reply...
>>>
>>>
>>> About upgrading the rebalance metadata:
>>>
>>> Another possibility to do this, would be to register multiple assignment
>>> strategies for the 1.2 applications. For this case, new instances would
>>> be configured to support both and the broker would pick the version that
>>> all instances understand. The disadvantage would be, that we send much
>>> more data (ie, two subscriptions) in each rebalance as long as no second
>>> rebalance is done disabling the old protocol. Thus, using this approach
>>> would allow to avoid a second rebalance trading-off an increased
>>> rebalance network footprint (I also assume that this would increase the
>>> message size that is written into __consumer_offsets topic?). Overall, I
>>> am not sure if this would be a good tradeoff, but it could avoid a
>>> second rebalance (I have some more thoughts about stores below that are
>>> relevant for single rebalance upgrade).
>>>
>>> For future upgrades we might be able to fix this though. I was thinking
>>> about the following:
>>>
>>> In the current implementation, the leader fails if it gets a
>>> subscription it does not understand (ie, newer version). We could change
>>> this behavior and let the leader send an empty assignment plus error
>>> code (including supported version) back to the instance sending the
>>> "bad" subscription. This would allow the following logic for an
>>> application instance:
>>>
>>>  - on startup, always send the latest subscription format
>>>  - if leader understands it, we get an assignment back an start processing
>>>  - if leader does not understand it, we get an empty assignment and
>>> supported version back
>>>  - the application unsubscribe()/subscribe()/poll() again and sends a
>>> subscription using the leader's supported version
>>>
>>> This protocol would allow to do a single rolling bounce, and implements
>>> a "version probing" step, that might result in two executed rebalances.
>>> The advantage would be, that the user does not need to set any configs
>>> or do multiple rolling bounces, as Streams takes care of this
>>> automatically.
>>>
>>> One disadvantage would be, that two rebalances happen and that for an
>>> error case during rebalance, we loose the information about the
>>> supported leader version and the "probing step" would happen a second time.
>>>
>>> If the leader is eventually updated, it will include it's own supported
>>> version in all assignments, to allow a "down graded" application to
>>> upgrade its version later. Also, if a application fails, the first
>>> probing would always be successful and only a single rebalance happens.
>>> If we use this protocol, I think we don't need any configuration
>>> parameter for future upgrades.
>>>
>>>
>>> About "upgrade.from" vs "internal.protocol.version":
>>>
>>> Users would set "upgrade.from" to the release version the current/old
>>> application is using. I think this is simpler, as users know this
>>> version. If we use "internal.protocol.version" instead, we expose
>>> implementation details and users need to know the protocol version (ie,
>>> they need to map from the release version to the protocol version; ie,
>>> "I am run 0.11.0 that runs with metadata protocol version 2").
>>>
>>> Also the KIP states that for the second rolling bounce, the
>>> "upgrade.mode" config should be set back to `null` -- and thus,
>>> "upgrade.from" would not have any effect and is ignored (I will update
>>> the KIP to point out this dependency).
>>>
>>>
>>>
>>> About your second point: I'll update the KIP accordingly to describe
>>> future updates as well. Both will be different.
>>>
>>>
>>>
>>> One more point about upgrading the store format. I was thinking about
>>> avoiding the second rolling bounce all together in the future: (1) the
>>> goal is to achieve an upgrade with zero downtime (2) this required to
>>> prepare the stores as "hot standbys" before we do the switch and delete
>>> the old stores. (3) the current proposal does the switch "globally" --
>>> this is simpler and due to the required second rebalance no disadvantage.
>>> However, a global consistent switch over might actually not be required.
>>> For "in_place" upgrade, following the protocol from above, we could
>>> decouple the store switch and each instance could switch its store
>>> independently from all other instances. After the rolling bounce, it
>>> seems to be ok to switch from the old store to the new store "under the
>>> hood" whenever the new store is ready (this could even be done, before
>>> we switch to the new metadata version). Each time we update the "hot
>>> standby" we check if it reached the "endOffset"  (or maybe X% that could
>>> either be hardcoded or configurable). If we detect this situation, the
>>> Streams application closes corresponding active tasks as well as "hot
>>> standby" tasks, and re-creates the new active tasks using the new store.
>>> (I need to go through the details once again, but it seems to be
>>> feasible.).
>>>
>>> Combining this strategy with the "multiple assignment" idea, might even
>>> enable us to do an single rolling bounce upgrade from 1.1 -> 1.2.
>>> Applications would just use the old store, as long as the new store is
>>> not ready, even if the new metadata version is used already.
>>>
>>> For future upgrades, a single rebalance would be sufficient, too, even
>>> if the stores are upgraded. We would not need any config parameters as
>>> the "probe" step allows us to detect the supported rebalance metadata
>>> version (and we would also not need multiple "assigmnent strategies" as
>>> out own protocol encoded everything we need).
>>>
>>>
>>> Let me know what you think.
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
>>>> @John:
>>>>
>>>> For the protocol version upgrade, it is only for the encoded metadata
>>> bytes
>>>> protocol, which are just bytes-in bytes-out from Consumer's pov, so I
>>> think
>>>> this change should be in the Streams layer as well.
>>>>
>>>> @Matthias:
>>>>
>>>> for 2), I agree that adding a "newest supported version" besides the
>>>> "currently used version for encoding" is a good idea to allow either
>>> case;
>>>> the key is that in Streams we would likely end up with a mapping from the
>>>> protocol version to the other persistent data format versions such as
>>>> rocksDB, changelog. So with such a map we can actually achieve both
>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol version's
>>>> corresponding data format does not change, e.g. 0.10.0 -> 0.10.1 leaders
>>>> can choose to use the newer version in the first rolling bounce directly
>>>> and we can document to users that they would not need to set
>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded protocol
>>> version
>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and then we can
>>>> document that "upgrade.mode" needs to be set in the first rolling bounce
>>>> and reset in the second.
>>>>
>>>>
>>>> Besides that, some additional comments:
>>>>
>>>> 1) I still think "upgrade.from" is less intuitive for users to set than
>>>> "internal.protocol.version" where for the latter users only need to set a
>>>> single version, while the Streams will map that version to the Streams
>>>> assignor's behavior as well as the data format. But maybe I did not get
>>>> your idea about how the  "upgrade.from" config will be set, because in
>>>> your Compatibility section how the upgrade.from config will be set for
>>>> these two rolling bounces are not very clear: for example, should user
>>>> reset it to null in the second rolling bounce?
>>>>
>>>> 2) In the upgrade path description, rather than talking about specific
>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize all the
>>>> possible scenarios, even for future upgrade versions, what should be the
>>>> standard operations? The categorized we can summarize to would be
>>> (assuming
>>>> user upgrade from version X to version Y, where X and Y are Kafka
>>> versions,
>>>> with the corresponding supported protocol version x and y):
>>>>
>>>>
>>>> a. x == y, i.e. metadata protocol does not change, and hence no
>>> persistent
>>>> data formats have changed.
>>>>
>>>> b. x != y, but all persistent data format remains the same.
>>>>
>>>> b. x !=y, AND some persistene data format like RocksDB format, changelog
>>>> format, has been changed.
>>>>
>>>> c. special case: we may need some special handling logic when "current
>>>> version" or "newest supported version" are not available in the protocol,
>>>> i.e. for X as old as 0.10.0 and before 1.2.
>>>>
>>>>
>>>> under the above scenarios, how many rolling bounces users need to
>>> execute?
>>>> how they should set the configs in each rolling bounce? and how Streams
>>>> library will execute in these cases?
>>>>
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Ted,
>>>>>
>>>>> I still consider changing the KIP to include it right away -- if not,
>>>>> I'll create a JIRA. Need to think it through in more detail first.
>>>>>
>>>>> (Same for other open questions like interface names -- I collect
>>>>> feedback and update the KIP after we reach consensus :))
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
>>>>>> Thanks for the details, Matthias.
>>>>>>
>>>>>> bq. change the metadata protocol only if a future release, encoding
>>> both
>>>>> used
>>>>>> and supported version might be an advantage
>>>>>>
>>>>>> Looks like encoding both versions wouldn't be implemented in this KIP.
>>>>>>
>>>>>> Please consider logging a JIRA with the encoding proposal.
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io
>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> @Bill: I think a filter predicate should be part of user code. And
>>> even
>>>>>>> if we want to add something like this, I would prefer to do it in a
>>>>>>> separate KIP.
>>>>>>>
>>>>>>>
>>>>>>> @James: I would love to avoid a second rolling bounce. But from my
>>>>>>> understanding it would not be possible.
>>>>>>>
>>>>>>> The purpose of the second rolling bounce is indeed to switch from
>>>>>>> version 2 to 3. It also has a second purpose, to switch from the old
>>>>>>> store to the new store (this happens after the last instance bounces a
>>>>>>> second time).
>>>>>>>
>>>>>>> The problem with one round of rolling bounces is, that it's unclear
>>> when
>>>>>>> to which from version 2 to version 3. The StreamsPartitionsAssignor is
>>>>>>> stateless by design, and thus, the information which version it should
>>>>>>> use must be passed in from externally -- and we want to use the
>>>>>>> StreamsConfig to pass in this information.
>>>>>>>
>>>>>>> During upgrade, all new instanced have no information about the
>>> progress
>>>>>>> of the upgrade (ie, how many other instanced got upgrades already).
>>>>>>> Therefore, it's not safe for them to send a version 3 subscription.
>>> The
>>>>>>> leader also has this limited view on the world and can only send
>>> version
>>>>>>> 2 assignments back.
>>>>>>>
>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.
>>>>>>>
>>>>>>> We did consider to change the metadata to make later upgrades (ie,
>>> from
>>>>>>> 1.2 to 1.x) simpler though (for the case we change the metadata or
>>>>>>> storage format again -- as long as we don't change it, a single
>>> rolling
>>>>>>> bounce is sufficient), by encoding "used version" and "supported
>>>>>>> version". This would allow the leader to switch to the new version
>>>>>>> earlier and without a second rebalance: leader would receive "used
>>>>>>> version == old" and "supported version = old/new" -- as long as at
>>> least
>>>>>>> one instance sends a "supported version = old" leader sends old
>>> version
>>>>>>> assignment back. However, encoding both version would allow that the
>>>>>>> leader can send a new version assignment back, right after the first
>>>>>>> round or rebalance finished (all instances send "supported version =
>>>>>>> new"). However, there are still two issues with this:
>>>>>>>
>>>>>>> 1) if we switch to the new format right after the last instance
>>> bounced,
>>>>>>> the new stores might not be ready to be used -- this could lead to
>>>>>>> "downtime" as store must be restored before processing can resume.
>>>>>>>
>>>>>>> 2) Assume an instance fails and is restarted again. At this point, the
>>>>>>> instance will still have "upgrade mode" enabled and thus sends the old
>>>>>>> protocol data. However, it would be desirable to never fall back to
>>> the
>>>>>>> old protocol after the switch to the new protocol.
>>>>>>>
>>>>>>> The second issue is minor and I guess if users set-up the instance
>>>>>>> properly it could be avoided. However, the first issue would prevent
>>>>>>> "zero downtime" upgrades. Having said this, if we consider that we
>>> might
>>>>>>> change the metadata protocol only if a future release, encoding both
>>>>>>> used and supported version might be an advantage in the future and we
>>>>>>> could consider to add this information in 1.2 release to prepare for
>>>>> this.
>>>>>>>
>>>>>>> Btw: monitoring the log, is also only required to give the instances
>>>>>>> enough time to prepare the stores in new format. If you would do the
>>>>>>> second rolling bounce before this, it would still work -- however, you
>>>>>>> might see app "downtime" as the new store must be fully restored
>>> before
>>>>>>> processing can resume.
>>>>>>>
>>>>>>>
>>>>>>> Does this make sense?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
>>>>>>>> Matthias,
>>>>>>>>
>>>>>>>> For all the upgrade paths, is it possible to get rid of the 2nd
>>> rolling
>>>>>>> bounce?
>>>>>>>>
>>>>>>>> For the in-place upgrade, it seems like primary difference between
>>> the
>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide whether to
>>>>> send
>>>>>>> Subscription Version 2 or Subscription Version 3.  (Actually, there is
>>>>>>> another difference mentioned in that the KIP says that the 2nd rolling
>>>>>>> bounce should happen after all new state stores are created by the
>>>>>>> background thread. However, within the 2nd rolling bounce, we say that
>>>>>>> there is still a background thread, so it seems like is no actual
>>>>>>> requirement to wait for the new state stores to be created.)
>>>>>>>>
>>>>>>>> The 2nd rolling bounce already knows how to deal with mixed-mode
>>>>> (having
>>>>>>> both Version 2 and Version 3 in the same consumer group). It seems
>>> like
>>>>> we
>>>>>>> could get rid of the 2nd bounce if we added logic (somehow/somewhere)
>>>>> such
>>>>>>> that:
>>>>>>>> * Instances send Subscription Version 2 until all instances are
>>> running
>>>>>>> the new code.
>>>>>>>> * Once all the instances are running the new code, then one at a
>>> time,
>>>>>>> the instances start sending Subscription V3. Leader still hands out
>>>>>>> Assignment Version 2, until all new state stores are ready.
>>>>>>>> * Once all instances report that new stores are ready, Leader sends
>>> out
>>>>>>> Assignment Version 3.
>>>>>>>> * Once an instance receives an Assignment Version 3, it can delete
>>> the
>>>>>>> old state store.
>>>>>>>>
>>>>>>>> Doing it that way seems like it would reduce a lot of
>>>>>>> operator/deployment overhead. No need to do 2 rolling restarts. No
>>> need
>>>>> to
>>>>>>> monitor logs for state store rebuild. You just deploy it, and the
>>>>> instances
>>>>>>> update themselves.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> The thing that made me think of this is that the "2 rolling bounces"
>>> is
>>>>>>> similar to what Kafka brokers have to do changes in
>>>>>>> inter.broker.protocol.version and log.message.format.version. And in
>>> the
>>>>>>> broker case, it seems like it would be possible (with some work of
>>>>> course)
>>>>>>> to modify kafka to allow us to do similar auto-detection of broker
>>>>>>> capabilities and automatically do a switchover from old/new versions.
>>>>>>>>
>>>>>>>> -James
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Matthias,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP, it's a +1 from me.
>>>>>>>>>
>>>>>>>>> I do have one question regarding the retrieval methods on the new
>>>>>>>>> interfaces.
>>>>>>>>>
>>>>>>>>> Would want to consider adding one method with a Predicate that would
>>>>>>> allow
>>>>>>>>> for filtering records by the timestamp stored with the record?  Or
>>> is
>>>>>>> this
>>>>>>>>> better left for users to implement themselves once the data has been
>>>>>>>>> retrieved?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Matthias:
>>>>>>>>>> For my point #1, I don't have preference as to which separator is
>>>>>>> chosen.
>>>>>>>>>> Given the background you mentioned, current choice is good.
>>>>>>>>>>
>>>>>>>>>> For #2, I think my proposal is better since it is closer to English
>>>>>>>>>> grammar.
>>>>>>>>>>
>>>>>>>>>> Would be good to listen to what other people think.
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
>>>>> matth...@confluent.io
>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>
>>>>>>>>>>> @Guozhang:
>>>>>>>>>>>
>>>>>>>>>>> So far, there is one PR for the rebalance metadata upgrade fix
>>>>>>>>>>> (addressing the mentioned
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It give a first
>>>>>>>>>>> impression how the metadata upgrade works including a system test:
>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
>>>>>>>>>>>
>>>>>>>>>>> I can share other PRs as soon as they are ready. I agree that the
>>>>> KIP
>>>>>>> is
>>>>>>>>>>> complex am I ok with putting out more code to give better
>>> discussion
>>>>>>>>>>> context.
>>>>>>>>>>>
>>>>>>>>>>> @Ted:
>>>>>>>>>>>
>>>>>>>>>>> I picked `_` instead of `-` to align with the
>>> `processing.guarantee`
>>>>>>>>>>> parameter that accepts `at_least_one` and `exactly_once` as
>>> values.
>>>>>>>>>>> Personally, I don't care about underscore vs dash but I prefer
>>>>>>>>>>> consistency. If you feel strong about it, we can also change it to
>>>>>>> `-`.
>>>>>>>>>>>
>>>>>>>>>>> About the interface name: I am fine either way -- I stripped the
>>>>>>> `With`
>>>>>>>>>>> to keep the name a little shorter. Would be good to get feedback
>>>>> from
>>>>>>>>>>> others and pick the name the majority prefers.
>>>>>>>>>>>
>>>>>>>>>>> @John:
>>>>>>>>>>>
>>>>>>>>>>> We can certainly change it. I agree that it would not make a
>>>>>>> difference.
>>>>>>>>>>> I'll dig into the code to see if any of the two version might
>>>>>>> introduce
>>>>>>>>>>> undesired complexity and update the KIP if I don't hit an issue
>>> with
>>>>>>>>>>> putting the `-v2` to the store directory instead of `rocksdb-v2`
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
>>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>>>
>>>>>>>>>>>> The KIP looks good to me. I had several questions queued up, but
>>>>> they
>>>>>>>>>>> were
>>>>>>>>>>>> all in the "rejected alternatives" section... oh, well.
>>>>>>>>>>>>
>>>>>>>>>>>> One very minor thought re changing the state directory from
>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<
>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if you put the
>>>>>>> "v2"
>>>>>>>>>>>> marker on the storeName part of the path (i.e., "/<state.dir>/<
>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), then you get
>>>>> the
>>>>>>>>>> same
>>>>>>>>>>>> benefits without altering the high-level directory structure.
>>>>>>>>>>>>
>>>>>>>>>>>> It may not matter, but I could imagine people running scripts to
>>>>>>>>>> monitor
>>>>>>>>>>>> rocksdb disk usage for each task, or other such use cases.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> -John
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com>
>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>> Nicely written KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may sometimes
>>> be
>>>>>>> miss
>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to user.
>>>>>>>>>>>>>
>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for the
>>> class ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <
>>> wangg...@gmail.com
>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've read through the upgrade patch section and it looks good
>>> to
>>>>>>> me,
>>>>>>>>>> if
>>>>>>>>>>>>> you
>>>>>>>>>>>>>> already have a WIP PR for it could you also share it here so
>>> that
>>>>>>>>>>> people
>>>>>>>>>>>>>> can take a look?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this there are
>>>>> always
>>>>>>>>>>> some
>>>>>>>>>>>>>> devil hidden in the details, so I think it is better to have
>>> the
>>>>>>>>>>>>>> implementation in parallel along with the design discussion :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <
>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to allow storing
>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to resolve
>>>>>>> multiple
>>>>>>>>>>>>>>> tickets (issues and feature requests).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looking forward to your comments about this!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to