Thank you, Bill. I can get behind keeping the current pattern and adding the Named parameter to aggregate. Adding something like a Cogrouped object is beyond the scope of this KIP.
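To make the pattern Bill describes concrete, here is a small, library-free Java sketch of an `aggregate` overload pair. One variant falls back to a generated processor name; the other takes the name from a `Named`-like argument, which is the KIP-307 style Matthias refers to. Everything here (`ToyNamed`, `ToyTopology`, `ToyCogroupedStream`, the `COGROUP-AGGREGATE-` prefix) is hypothetical. It is not the Kafka Streams API, and the generated names are not the ones Kafka Streams actually produces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a Named parameter: it only carries a user-chosen name.
final class ToyNamed {
    final String name;

    private ToyNamed(String name) {
        this.name = name;
    }

    static ToyNamed as(String name) {
        return new ToyNamed(name);
    }
}

// Hypothetical topology that records the name of every processor it creates.
final class ToyTopology {
    final List<String> processorNames = new ArrayList<>();
    private int counter = 0;

    // Fallback name: fixed prefix plus a zero-padded, 10-digit counter.
    String generateName(String prefix) {
        return String.format("%s%010d", prefix, counter++);
    }
}

// Hypothetical cogrouped stream exposing the two overloads. The initializer is
// accepted only to mirror the method shape; this toy records names and does
// not perform any aggregation.
final class ToyCogroupedStream<VOut> {
    private final ToyTopology topology;

    ToyCogroupedStream(ToyTopology topology) {
        this.topology = topology;
    }

    // Overload without a name: the processor name is generated.
    String aggregate(Supplier<VOut> initializer) {
        return register(topology.generateName("COGROUP-AGGREGATE-"));
    }

    // Overload with a name: the user-supplied name is used as-is.
    String aggregate(Supplier<VOut> initializer, ToyNamed named) {
        return register(named.name);
    }

    private String register(String name) {
        topology.processorNames.add(name);
        return name;
    }
}
```

Keeping both overloads means existing callers never have to pass a name, while anyone inspecting a complex topology can pin stable, readable processor names.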
I updated the KIP to include the Named parameter, along with some minor text fixes. If there are no other comments, I believe we can take this to a vote.

Thanks,
Walker

On Mon, Oct 28, 2019 at 8:28 AM Bill Bejeck <bbej...@gmail.com> wrote:

Hi Walker,

Thanks for taking on KIP-150; the co-group will be very useful.

Regarding the naming, IMHO we should stick to the current pattern: we provide overloads with a "Named" parameter for the "aggregate" methods (I believe those are the only ones that create a processor). That is what we currently have for the various operators that perform aggregations. I understand your concern about adding methods, but IMHO it would be very confusing to users if we broke the current pattern at an arbitrary point in time.

As for Sophie's suggestion of adding a "CoGrouped" configuration object, I can see the merits of that approach. But IMHO, instead of doing so for one operation, maybe we should take a step back and consider refactoring to one configuration object overall (I believe John has suggested something similar in the past). That is well beyond the scope of this KIP, but I think it would be better to stick with our current pattern and consider changes we can apply to the entire API in a later KIP.

Just my 2 cents.

Thanks,
Bill

On Fri, Oct 25, 2019 at 4:34 PM Walker Carlson <wcarl...@confluent.io> wrote:

Hi Guozhang,

1. I am familiar with Spark's cogroup. It is very similar to their join operator, except that it makes the values iterable. I think the use cases are different enough that it makes sense to specify the aggregator when we do.

I like the idea of "absorb" and I think it could be useful, although I do not think it is as intuitive.
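As a rough illustration of the difference Walker describes, the toy Java sketch below contrasts two approaches. The first is a Spark-like cogroup, which hands back every value per key from each input (Spark's `RDD.cogroup` yields iterables of values per key). The second is the aggregator-based shape KIP-150 proposes, where one initializer plus one aggregator per input fold everything into a single value per key. It is plain Java over in-memory lists; `CogroupStyles`, `Rec`, `Agg`, and `Grouped` are hypothetical names, not Spark or Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class CogroupStyles {
    // A keyed input record (hypothetical).
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Shaped like a Kafka Streams Aggregator: (key, new value, current aggregate) -> new aggregate.
    interface Agg<K, V, R> {
        R apply(K key, V value, R aggregate);
    }

    // Spark-like result: per key, every value from each input, left for the caller to combine.
    static final class Grouped<A, B> {
        final List<A> first = new ArrayList<>();
        final List<B> second = new ArrayList<>();
    }

    static <K, A, B> Map<K, Grouped<A, B>> collectValues(List<Rec<K, A>> in1, List<Rec<K, B>> in2) {
        Map<K, Grouped<A, B>> out = new HashMap<>();
        for (Rec<K, A> r : in1) out.computeIfAbsent(r.key, k -> new Grouped<>()).first.add(r.value);
        for (Rec<K, B> r : in2) out.computeIfAbsent(r.key, k -> new Grouped<>()).second.add(r.value);
        return out;
    }

    // Aggregator-based cogroup: one initializer, one aggregator per input, and one
    // result map standing in for the single state store. Inputs are processed one
    // after the other, which only matters for order-sensitive aggregators.
    static <K, A, B, R> Map<K, R> aggregate(List<Rec<K, A>> in1, Agg<K, A, R> agg1,
                                            List<Rec<K, B>> in2, Agg<K, B, R> agg2,
                                            Supplier<R> initializer) {
        Map<K, R> store = new HashMap<>();
        for (Rec<K, A> r : in1) store.put(r.key, agg1.apply(r.key, r.value, current(store, r.key, initializer)));
        for (Rec<K, B> r : in2) store.put(r.key, agg2.apply(r.key, r.value, current(store, r.key, initializer)));
        return store;
    }

    // Existing aggregate for the key, or a fresh one from the initializer.
    private static <K, R> R current(Map<K, R> store, K key, Supplier<R> initializer) {
        return store.containsKey(key) ? store.get(key) : initializer.get();
    }
}
```

With the first style, every stored value stays around until the caller combines them; with the second, only the folded result per key is kept, which is why the aggregators have to be specified up front.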

If we were to go that route, we would either use more processors, or do essentially the same thing but have to store the information required to cogroup inside that KTable. I think this would violate some design principles. I would argue that we should consider adding absorb as well and automatically rewriting it to use cogroup.

2. We have not considered this, though it would be a convenient operation.

3. Only one processor is created. We are actually having the naming conversation right now in the thread above.

4, 5. Fair points.

Walker

On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <wangg...@gmail.com> wrote:

Hi Walker, thanks for the KIP! I made a pass on the writeup and have some comments below:

Meta:

1. Syntax-wise, I'm wondering if we have compared our current proposal with Spark's co-group syntax (I know they target different use cases, but I wonder if their syntax is closer to the join operator). What are the syntax / semantics trade-offs here?

Just playing devil's advocate here: if the main motivation is to provide a more convenient multi-way join syntax, and to have only one materialized store we need to specify the final joined format at the beginning, then what about the following alternative (using the example in your wiki page):

    KGroupedStream<K, V1> grouped1 = builder.<K, V1>stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.<K, V2>stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.<K, V3>stream("topic3").groupByKey();

    KTable<K, CG> aggregated = grouped1.aggregate(initializer, aggregator1, materialized);

    aggregated.absorb(grouped2, aggregator2)  // I'm just using a random name off the top of my head here
              .absorb(grouped3, aggregator3);

In this way, we just add a new API to KTable to "absorb" new streams as aggregated results, without needing to introduce new first-class citizen classes.

2. From the DSL optimization perspective, have we considered whether we can automatically rewrite a user-written, old-fashioned multi-join into this new DSL operator?

3. Although it is not needed for the wiki page itself, for the internal implementation, how many processor nodes would we create for the new operator, and how can we allow users to name them?

Minor:

4. In "Public Interfaces", it would be better to add the templated generics to "KGroupedStream", as in "KGroupedStream<K, V>".

5. Naming-wise, I'd suggest we keep the "K" together with Stream/Table, e.g. "TimeWindowed*CogroupedKStream*<K, V>".

Guozhang

On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <matth...@confluent.io> wrote:

Walker,

I am not sure if I can follow your argument.
What exactly do you mean by

> I also think that in this case it would be better to separate the 2 options out into separate overloads.

Maybe you can give an example of the method signature you have in mind?

> We could take a named parameter from upstream or add an extra naming option; however, I don't really see the advantage that would give.

Are you familiar with KIP-307? Before KIP-307, KS generated all names for all Processors. This makes it hard to reason about a Topology if it's getting complex. Adding `Named` to the new co-group operator would actually align with KIP-307.

> It seems to go in the opposite direction from the cogroup configuration idea you proposed.

Can you elaborate? Not sure if I can follow.

-Matthias

On 10/24/19 10:20 AM, Walker Carlson wrote:

While I like the idea, Sophie, I don't think that it is necessary. I also think that in this case it would be better to separate the 2 options out into separate overloads. We could take a named parameter from upstream or add an extra naming option; however, I don't really see the advantage that would give. It seems to go in the opposite direction from the cogroup configuration idea you proposed.

John, I think it could be both. It depends on when you aggregate and what kind of data you have. The example aggregates before joining; there are probably some cases where you could join before aggregating. IMHO it would be easier to group all the streams together, then perform the one operation that results in a single KTable.
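A toy, in-memory Java model of the two shapes Walker contrasts: aggregating each stream into its own table and then joining the tables, versus cogrouping all streams into one aggregation. Missing inputs count as zero, i.e. outer-join semantics. Both produce the same per-key result here, but the first keeps three intermediate maps (standing in for three state stores) while the cogroup keeps one. The cart/wishlist/purchase inputs and all class names are hypothetical; this models the data flow only, not Kafka Streams itself.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

final class JoinVsCogroup {
    // Final per-key result type (plays the role of "CG" in the KIP's example).
    static final class Summary {
        final int cart;
        final int wishlist;
        final int purchases;

        Summary(int cart, int wishlist, int purchases) {
            this.cart = cart;
            this.wishlist = wishlist;
            this.purchases = purchases;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Summary)) return false;
            Summary s = (Summary) o;
            return cart == s.cart && wishlist == s.wishlist && purchases == s.purchases;
        }

        @Override
        public int hashCode() {
            return Objects.hash(cart, wishlist, purchases);
        }
    }

    private static Map<String, Integer> countByKey(List<String> keys) {
        Map<String, Integer> store = new HashMap<>();
        for (String k : keys) store.merge(k, 1, Integer::sum);
        return store;
    }

    // Aggregate first, then join: three separate stores plus an outer-join step.
    static Map<String, Summary> aggregateThenJoin(List<String> cart, List<String> wish, List<String> buy) {
        Map<String, Integer> c = countByKey(cart);
        Map<String, Integer> w = countByKey(wish);
        Map<String, Integer> b = countByKey(buy);
        Set<String> keys = new TreeSet<>(c.keySet());
        keys.addAll(w.keySet());
        keys.addAll(b.keySet());
        Map<String, Summary> joined = new HashMap<>();
        for (String k : keys) {
            joined.put(k, new Summary(c.getOrDefault(k, 0), w.getOrDefault(k, 0), b.getOrDefault(k, 0)));
        }
        return joined;
    }

    // Cogroup: one store, one initial value, and one aggregator per input.
    static Map<String, Summary> cogroup(List<String> cart, List<String> wish, List<String> buy) {
        Map<String, Summary> store = new HashMap<>();
        Summary initial = new Summary(0, 0, 0); // immutable, so safe to share
        for (String k : cart) {
            Summary s = store.getOrDefault(k, initial);
            store.put(k, new Summary(s.cart + 1, s.wishlist, s.purchases));
        }
        for (String k : wish) {
            Summary s = store.getOrDefault(k, initial);
            store.put(k, new Summary(s.cart, s.wishlist + 1, s.purchases));
        }
        for (String k : buy) {
            Summary s = store.getOrDefault(k, initial);
            store.put(k, new Summary(s.cart, s.wishlist, s.purchases + 1));
        }
        return store;
    }
}
```

The results match; what differs is how much intermediate state each shape has to keep.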

On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

> I can personally not see any need to add other configuration

Famous last words?

Just kidding, 95% confidence is more than enough for me (and it's better to optimize for the current design than for hypothetical future changes).

One last question I have, then, is about the operator/store/repartition naming. It seems like we can name the underlying store/changelog through the Materialized parameter, but do we also want to include an overload taking a Named parameter for the operator name (as in the KTable#join variations)?

On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <matth...@confluent.io> wrote:

Interesting idea, Sophie.

So far, we have tried to reuse existing config objects and only add new ones when needed, to avoid creating "redundant" classes. This is of course a reactive approach (with the drawback of having to deprecate things if we change it, as you described).

I can personally not see any need to add other configuration parameters atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only a single state store that we need to configure, and reusing `Materialized` seems appropriate.

Also note that the `Initializer` is a mandatory parameter, not a configuration, and should be passed directly rather than via a configuration object.
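To weigh Sophie's suggestion against Matthias's point that the `Initializer` is mandatory, here is a hypothetical parameter object in plain Java. The required initializer goes through the only factory method and is null-checked, while optional settings are added through `with...` methods that each return a new immutable instance. `ToyCogroupedConfig` and all of its methods are illustrative only; they are not a Kafka Streams class, and the thread settled on keeping the existing overload pattern instead.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical parameter object in the spirit of the "Cogrouped" idea.
final class ToyCogroupedConfig<VOut> {
    private final Supplier<VOut> initializer; // mandatory
    private final String storeName;           // optional: null means "generate one"
    private final String processorName;       // optional: null means "generate one"

    private ToyCogroupedConfig(Supplier<VOut> initializer, String storeName, String processorName) {
        this.initializer = Objects.requireNonNull(initializer, "initializer is mandatory");
        this.storeName = storeName;
        this.processorName = processorName;
    }

    // The only entry point requires the initializer, so it cannot be forgotten.
    static <VOut> ToyCogroupedConfig<VOut> withInitializer(Supplier<VOut> initializer) {
        return new ToyCogroupedConfig<>(initializer, null, null);
    }

    // Optional settings return a new instance instead of mutating this one.
    ToyCogroupedConfig<VOut> withStoreName(String name) {
        return new ToyCogroupedConfig<>(initializer, name, processorName);
    }

    ToyCogroupedConfig<VOut> withProcessorName(String name) {
        return new ToyCogroupedConfig<>(initializer, storeName, name);
    }

    VOut newAggregate() {
        return initializer.get();
    }

    Optional<String> storeName() {
        return Optional.ofNullable(storeName);
    }

    Optional<String> processorName() {
        return Optional.ofNullable(processorName);
    }
}
```

Adding a setting later would mean one more `with...` method rather than another `aggregate` overload; the cost is an extra configuration class per operator, which is the trade-off Bill and Matthias weigh in the thread.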

-Matthias

On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:

Thanks for the explanation, makes sense to me! As for the API, one other thought I had: might we ever want or need to introduce any other configs or parameters in the future? Obviously that's difficult to say now (or maybe the answer seems obviously "no"), but we often seem to end up needing to add new overloads and/or deprecate old ones as new features or requirements come into play.

What do you (and others) think about wrapping the config parameters (i.e., everything except the actual grouped streams) in a new config object? For example, the CogroupedStream#aggregate method could take a single Cogrouped object, which itself would have an initializer and a materialized. If we ever need to add a new parameter, we can just add it to the Cogrouped class.

Also, will the backing store be available for IQ if a Materialized is passed in?

On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <wcarl...@confluent.io> wrote:

Hi Sophie,

Thank you for your comments. As for the different method signatures, I have not really considered any other options, and while I do agree it is confusing, I don't see any obvious solutions.
The problem is that the cogroup essentially pairs a grouped stream with an aggregator, and when the cogroup is first created, the method is already being called on a grouped stream. However, each subsequent stream-aggregator pair is added onto a cogrouped stream, so that method needs both arguments.

For the second question, you should not need a joiner. The idea is that you can collect many grouped streams with overlapping key spaces and any kind of value types. Once aggregated, their values are reduced into one type. This is why you need only one initializer: each aggregator integrates its new value into the object created by the initializer. Does that make sense?

This is a good question, and I will include this explanation in the KIP as well.

Thanks,
Walker

On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

Hey Walker,

Thanks for the KIP! I have just a couple of questions:

1) It seems a little awkward to me that with the current API, we have a nearly identical "add stream to cogroup" method, except for the first, which has a different signature (i.e., the first stream is joined as stream.cogroup(Aggregator) while the subsequent ones are joined as .cogroup(Stream, Aggregator)).
I'm not sure what it would look like exactly, but I was just wondering if you'd considered and/or rejected any alternative APIs?

2) This might just be my lack of familiarity with "cogroup" as a concept, but with the current (non-optimal) API the user seems to have some control over exactly how the different streams are joined, through the ValueJoiners. Would this new cogroup simply concatenate the values from the different cogroup streams, or could users potentially pass some kind of Joiner to the cogroup/aggregate methods? Or is the whole point of cogroups that you no longer ever need to specify a Joiner? If so, you should add a short line to the KIP explaining that for those of us who aren't fluent in cogroup semantics :)

Cheers,
Sophie

On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <wcarl...@confluent.io> wrote:

Good catch, I updated that.

I have made a PR for this KIP. I am splitting it into 3 parts: first cogroup for a key-value store (here <https://github.com/apache/kafka/pull/7538>), then for a timeWindowedStore, and then a sessionWindowedStore + ensuring partitioning.

Walker

On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <matth...@confluent.io> wrote:

Walker,

thanks for picking up the KIP and reworking it for the changed API.

Overall, the updated API suggestions make sense to me. They seem to align quite nicely with our current API design.

One nit: In `CogroupedKStream#aggregate(...)`, the type parameter of `Materialized` should be `V`, not `VR`?

-Matthias

On 10/14/19 2:57 PM, Walker Carlson wrote:

Here is a link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup

On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <wcarl...@confluent.io> wrote:

Hello all,

I have picked up and updated KIP-150. Due to changes to the project since KIP-150 was written, there are a few items that need to be updated.

The first item that changed is the adoption of the Materialized parameter.

The second item is windowedBy. The old KIP handles windowing by overloading the aggregate function to take in a Window object as well as the other parameters.
The current practice for windowing grouped streams is to call windowedBy and receive a windowed stream object. The existing interface for a windowed stream made from a grouped stream will not work for cogrouped streams. Hence, we have to make new interfaces for cogrouped windowed streams.

Please take a look; I would like to hear your feedback.

Walker
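To show what a windowed cogroup computes, the toy Java sketch below keys each aggregate by the record key plus the start of an epoch-aligned tumbling window. It applies one initializer per (key, window) and one aggregator per input stream, all writing into a single store. It is an in-memory model with hypothetical names (`ToyWindowedCogroup`, `Event`, the `key@windowStart` key format), not the cogrouped windowed interfaces the KIP defines, and it assumes non-negative timestamps.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongBinaryOperator;
import java.util.function.LongSupplier;

final class ToyWindowedCogroup {
    // One input record; 'source' is the index of the grouped stream it came from.
    static final class Event {
        final String key;
        final long timestampMs;
        final int source;
        final long value;

        Event(String key, long timestampMs, int source, long value) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.source = source;
            this.value = value;
        }
    }

    // Start of the epoch-aligned tumbling window containing ts (assumes ts >= 0).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // One store keyed by (key, window start); aggregators.get(i) handles events from source i.
    static Map<String, Long> aggregate(List<Event> events, long sizeMs,
                                       LongSupplier initializer,
                                       List<LongBinaryOperator> aggregators) {
        Map<String, Long> store = new TreeMap<>();
        for (Event e : events) {
            String windowedKey = e.key + "@" + windowStart(e.timestampMs, sizeMs);
            long current = store.containsKey(windowedKey)
                    ? store.get(windowedKey)
                    : initializer.getAsLong();
            long updated = aggregators.get(e.source).applyAsLong(current, e.value);
            store.put(windowedKey, updated);
        }
        return store;
    }
}
```

For example, with 60-second windows, a click at 1 s and a purchase of 30 at 59 s for the same key land in one window (aggregate 31), while a click at 61 s starts a new window for that key.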