All,

Thanks for the comments. I'll respond to them in the order received.

Guozhang,

> And if [join-name] not specified, stay the same, which is:
> * [previous-processor-name]-repartition for both Stream-Stream (S-S) join and S-T join

I believe the current approach is [appId]-[previous-processor-name]-repartition, but the main point is that if no name is provided, the current naming scheme will remain in place.

> I think it is more natural to rename it to
> * [app-id]-[join-name]-left-repartition
> * [app-id]-[join-name]-right-repartition

I agree and I'll update the KIP accordingly.

> 2 I'd suggest to use the name to also define the corresponding processor

I'll address this in conjunction with comments from Matthias and your second round of comments.

> 3. Could you also specify how this would affect the optimization for merging multiple repartition topics?

With an optimization that reduces the number of repartition topics, if the user has not named the repartition topics, we'll generate a name for the optimized repartition topic. If users have provided names for the repartition topics, we'll reuse the name from the first of the named repartition topics. Optimizing a topology by reducing the number of repartition topics may require an application reset, but that is beyond the scope of this KIP. I'll update the KIP with this case.

I've also addressed 4.a and 4.b in the "Compatibility, Deprecation and Migration plan" section.

Matthias,

> 1) We should either use `[app-id]-this|other-[join-name]-repartition` or
> `app-id]-[join-name]-left|right-repartition` but we should not change
> the pattern depending if the user specifies a name of not.

I've updated the KIP to say that for joins, when a name is provided, we'll go with `app-id-name-left|right-repartition` in all cases. If no name is provided, we will continue to use the current naming process.

> 2) I didn't see why we would need to do this in this KIP. KIP-307 seems
> to be orthogonal, and thus KIP-372 should not change any processor
> names, but KIP-307 should define a holistic strategy for all processor.
> Otherwise, we might up with different strategies or revert what we
> decide in this KIP if it's not compatible with KIP-307.

I agree and have updated the name of this KIP to be more precise.

Eno,

> I know we don't normally have a "Related work" section in KIPs, but
> sometimes I find it useful to see what others have done in similar cases.
> Since this will be important for rolling re-deployments, I wonder what
> other frameworks like Flink (or Samza) have done in these cases. Perhaps
> they have done nothing, in which case it's fine to do this from first
> principles, but IMO it would be good to know just to make sure we're
> heading in the right direction.
> Also I don't get a good feel for how much work this will be for an end user
> who is doing the rolling deployment, perhaps an end-to-end example would
> help.

I can't speak for the other projects, so I'll have to take a look and see. As for getting a feel for how much work this will be for an end user, could you look at the updated KIP and see if it clears things up?

Matthias,

> (1) For `Grouped` should we add `with(String name, Serde<K> key,
> Serde<V> value)` to allow specifying all parameters at once?
> Produced/Consumed/Serialized etc follow a similar pattern. There is one
> static method for each config parameter, plus one static method that
> accepts all parameters. Might be more consistent if we follow this pattern.
> (2) It seems that `Serialized` is only used in `groupBy` and
> `groupByKey` -- because both methods accepting `Serialized` parameter
> are deprecated and replaced with methods accepting `Grouped`, it seems
> that we also want to deprecate `Serialized`?
> (3) About naming repartition topics: thinking about this once more, I
> actually prefer to use `left|right` instead of `this|other` :)

For 1, I agree and I'll update the KIP.
For 2, yes, that seems to make sense and I'll update the KIP.
Part 3 was addressed in earlier comments, and the KIP has been clarified.

Guozhang,

> Just to clarify on 2): currently KIP-307 do not have proposed APIs for
> `groupBy/groupByKey` naming schemes, and for joins its current proposal is
> to extend ValueJoiner with Named and hence this part is what I meant to
> have "overlaps".
> Thinking about it a bit more, since Joined is only used for S-S and S-T
> joins but not T-T joins, having the naming schemes on Joined would not be
> sufficient, and extending ValueJoiner would indeed be a good choice.
> As for groupBy, since it is using KeyValueMapper which is supposed to be
> extended with Named in KIP-307, it does not require extending to processor
> nodes as well.
> Given this, I'm fine with limiting the scope to only repartition topics.

I agree with limiting the scope of this KIP to only repartition topics and have updated the KIP.

John,

> I think it's slightly out of scope for this KIP, but I'm not sure it's
> right to add a name to ValueJoiner or KeyValueMapper.
> Both of those are "functional interfaces", that is, they are basically
> named functions.
>
> It seems like we should preserve this property both to provide a clean
> separation between the operation *logic* and the operator *config*.
>
> It's a good point that `Joined` doesn't appear in the KTable join. However,
> KTable join does have the variant that accepts "Materialized".
> This makes it similar to the grouping operators that currently accept
> "Serialized", namely the operator is already configurable, but only in
> a narrow way (we can only configure the materialization of the table or the
> serialization of the grouping).
>
> Consider as an alternative the KStream.join operation that takes a config
> object called "Joined". This implies no more than that we are
> configuring the join operation. If we are deciding to allow adding a name
> to the operation, it's natural to add it to the operation's config.
>
> Conversely, we also want to name the KTable.join operation, not the
> materialization itself (which already has a name with separate semantics).
>
> To me, this suggests that, just like we propose to subsume the Serialized
> config for grouping into a more general config called Grouped, we should
> also want to do something similar and replace `KTable#join(KTable<>,
> ValueJoiner<>, Materialized<>)` with one taking a more general
> configuration, maybe:
> `KTable#join(KTable<>, ValueJoiner<>, TableJoined<>)`?

I'm not sure I completely follow what you are suggesting, but it seems like you are in favor of combining the naming of join and grouping operations and processors with this KIP. I don't think we should do that renaming in this KIP; I'd rather restrict it to naming repartition topics, for the following reasons:

1. It gives us a good head start for users to be able to do rolling updates once repartition topics have been named.
2. Any KIP change almost always introduces some uncertainty about what the changes will bring, so a smaller scope helps ensure the success of the KIP.
3. We have an existing proposal, KIP-307, for processor naming, and I think it's best to do it all in one KIP so we don't have to undo any inconsistent previous work.

So I'll have to agree with Matthias and Guozhang on limiting the scope of this KIP to naming repartition topics. To that end, I'd also defer adding another general configuration object, `TableJoined`, to the work of naming operations and processors in KIP-307.

On Thu, Sep 13, 2018 at 6:49 PM John Roesler <j...@confluent.io> wrote:

> Hey all,
>
> I think it's slightly out of scope for this KIP, but I'm not sure it's
> right to add a name to ValueJoiner or KeyValueMapper.
> Both of those are "functional interfaces", that is, they are basically
> named functions.
>
> It seems like we should preserve this property both to provide a clean
> separation between the operation *logic* and the operator *config*.
>
> It's a good point that `Joined` doesn't appear in the KTable join. However,
> KTable join does have the variant that accepts "Materialized".
> This makes it similar to the grouping operators that currently accept
> "Serialized", namely the operator is already configurable, but only in
> a narrow way (we can only configure the materialization of the table or the
> serialization of the grouping).
>
> Consider as an alternative the KStream.join operation that takes a config
> object called "Joined". This implies no more than that we are
> configuring the join operation. If we are deciding to allow adding a name
> to the operation, it's natural to add it to the operation's config.
>
> Conversely, we also want to name the KTable.join operation, not the
> materialization itself (which already has a name with separate semantics).
>
> To me, this suggests that, just like we propose to subsume the Serialized
> config for grouping into a more general config called Grouped, we should
> also want to do something similar and replace `KTable#join(KTable<>,
> ValueJoiner<>, Materialized<>)` with one taking a more general
> configuration, maybe:
> `KTable#join(KTable<>, ValueJoiner<>, TableJoined<>)`?
>
> Does this make sense?
>
> Thanks,
> -John
>
> On Thu, Sep 13, 2018 at 3:45 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Just to clarify on 2): currently KIP-307 do not have proposed APIs for
> > `groupBy/groupByKey` naming schemes, and for joins its current proposal is
> > to extend ValueJoiner with Named and hence this part is what I meant to
> > have "overlaps".
> >
> > Thinking about it a bit more, since Joined is only used for S-S and S-T
> > joins but not T-T joins, having the naming schemes on Joined would not be
> > sufficient, and extending ValueJoiner would indeed be a good choice.
> >
> > As for groupBy, since it is using KeyValueMapper which is supposed to be
> > extended with Named in KIP-307, it does not require extending to processor
> > nodes as well.
> >
> > Given this, I'm fine with limiting the scope to only repartition topics.
> >
> >
> > Guozhang
> >
> > On Wed, Sep 12, 2018 at 10:22 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > Follow up comments:
> > >
> > > 1) We should either use `[app-id]-this|other-[join-name]-repartition` or
> > > `app-id]-[join-name]-left|right-repartition` but we should not change
> > > the pattern depending if the user specifies a name of not. I am fine
> > > with both patterns---just want to make sure with stick with one.
> > >
> > > 2) I didn't see why we would need to do this in this KIP. KIP-307 seems
> > > to be orthogonal, and thus KIP-372 should not change any processor
> > > names, but KIP-307 should define a holistic strategy for all processor.
> > > Otherwise, we might up with different strategies or revert what we
> > > decide in this KIP if it's not compatible with KIP-307.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 9/12/18 6:28 PM, Guozhang Wang wrote:
> > > > Hello Bill,
> > > >
> > > > I made a pass over your proposal and here are some questions:
> > > >
> > > > 1. For Joined names, the current proposal is to define the repartition
> > > > topic names as
> > > >
> > > > * [app-id]-this-[join-name]-repartition
> > > >
> > > > * [app-id]-other-[join-name]-repartition
> > > >
> > > >
> > > > And if [join-name] not specified, stay the same, which is:
> > > >
> > > > * [previous-processor-name]-repartition for both Stream-Stream (S-S) join
> > > > and S-T join
> > > >
> > > > I think it is more natural to rename it to
> > > >
> > > > * [app-id]-[join-name]-left-repartition
> > > >
> > > > * [app-id]-[join-name]-right-repartition
> > > >
> > > >
> > > > 2. I'd suggest to use the name to also define the corresponding processor
> > > > names accordingly, in addition to the repartition topic names. Note that
> > > > for joins, this may be overlapping with KIP-307
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL>
> > > > as it also have proposals for defining processor names for join operators as
> > > > well.
> > > >
> > > > 3. Could you also specify how this would affect the optimization for
> > > > merging multiple repartition topics?
> > > >
> > > > 4. In the "Compatibility, Deprecation, and Migration Plan" section, could
> > > > you also mention the following scenarios, if any of the upgrade path would
> > > > be changed:
> > > >
> > > > a) changing user DSL code: under which scenarios users can now do a
> > > > rolling bounce instead of resetting applications.
> > > >
> > > > b) upgrading from older version to new version, with all the names
> > > > specified, and with optimization turned on. E.g. say we have the code
> > > > written in 2.1 with all names specified, and now upgrading to 2.2 with new
> > > > optimizations that may potentially change the repartition topics. Is that
> > > > always safe to do?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Sep 12, 2018 at 4:52 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> > > >
> > > >> All I'd like to start a discussion on KIP-372 for the naming of joins and
> > > >> grouping operations in Kafka Streams.
> > > >>
> > > >> The KIP page can be found here:
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Joins+and+Grouping
> > > >>
> > > >> I look forward to feedback and comments.
> > > >>
> > > >> Thanks,
> > > >> Bill
> > > >>
> > > >
> > >
> >
> > --
> > -- Guozhang
> >