Hi Walker,

Thanks for taking on KIP-150; the co-group operator will be very useful.

Regarding the naming, IMHO we should stick to the current pattern: provide
overloads taking a "Named" parameter for the "aggregate" methods (I believe
those are the only ones that create a processor).
That is what we currently have for the various operators that perform
aggregations.
I understand your concern about adding methods, but IMHO it would be very
confusing to users if we broke the current pattern at an arbitrary point
in time.
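
To make the pattern concrete, here is a rough, self-contained sketch. The
types below are stand-ins made up for illustration: CogroupedSketch, the
generated-name prefix, and the String return value are assumptions and are
not part of Kafka Streams or of the KIP. Only the idea of pairing each
aggregate() with a second overload that takes a Named comes from the
existing API.

```java
import java.util.function.Supplier;

// Stand-in for Kafka Streams' Named: it only carries an operator name.
final class Named {
    final String name;

    private Named(String name) {
        this.name = name;
    }

    static Named as(String name) {
        return new Named(name);
    }
}

// Hypothetical, heavily simplified cogrouped stream (not the KIP-150 interface).
// aggregate() returns the processor name so the effect of each overload is visible.
final class CogroupedSketch<V> {
    private int generatedIndex = 0;

    // Overload without Named: the library generates the processor name.
    // The "COGROUP-AGGREGATE-" prefix is invented for this sketch.
    String aggregate(Supplier<V> initializer) {
        return aggregate(initializer, Named.as("COGROUP-AGGREGATE-" + generatedIndex++));
    }

    // Overload with Named: the caller chooses the processor name.
    String aggregate(Supplier<V> initializer, Named named) {
        initializer.get(); // a real operator would seed the aggregate here
        return named.name;
    }
}

final class NamedOverloadDemo {
    public static void main(String[] args) {
        CogroupedSketch<Long> cogrouped = new CogroupedSketch<>();
        System.out.println(cogrouped.aggregate(() -> 0L));
        System.out.println(cogrouped.aggregate(() -> 0L, Named.as("customer-totals")));
    }
}
```

The point is only that users who do not care about names keep the short
call, while users who want a readable topology pass a Named, the same way
they already do for the other aggregation operators.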

As for Sophie's suggestion of adding a "CoGrouped" configuration object, I
can see the merits of that approach.  But IMHO, instead of doing so for one
operation, maybe we should take a step back and consider refactoring to one
configuration object overall (I believe John has suggested something
similar in the past).  That is well beyond the scope of this KIP, but I
think it would be better to stick with our current pattern and consider
changes we can apply to the entire API in a later KIP.

Just my 2 cents.

Thanks,
Bill


On Fri, Oct 25, 2019 at 4:34 PM Walker Carlson <wcarl...@confluent.io>
wrote:

> Hi Guozhang,
>
> 1. I am familiar with Spark's cogroup. It is very similar to their join
> operator, but instead it makes the values iterable. I think that the use
> cases are different enough that it makes sense to specify the aggregator
> when we do.
>
> I like the idea of "absorb" and I think it could be useful, although I do
> not think it is as intuitive.
>
> If we were to go that route we would either use more processors or do
> essentially the same thing but would have to store the information
> required to cogroup inside that KTable. I think this would violate some
> design principles. I would argue that we should consider adding absorb as
> well and auto re-write it to use cogroup.
>
> 2. We have not considered this, though that would be a convenient
> operation.
>
> 3. There is only one processor made. We are actually having the naming
> conversation right now in the above thread.
>
> 4, 5. fair points
>
> Walker
>
> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Walker, thanks for the KIP! I made a pass on the writeup and have some
> > comments below:
> >
> > Meta:
> >
> > 1. Syntax-wise, I'm wondering if we have compared our current proposal
> > with Spark's co-group syntax (I know they are targeting different use
> > cases, but wondering if their syntax is closer to the join operator). What
> > are the syntax / semantics trade-offs here?
> >
> > Just playing devil's advocate here: if the main motivation is to provide
> > a more convenient multi-way join syntax, and in order to only have one
> > materialized store we need to specify the final joined format at the
> > beginning, then what about the following alternative (with the given
> > example in your wiki page):
> >
> >
> > KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >
> > KTable<K, CG> aggregated = grouped1.aggregate(initializer, aggregator1,
> >                                               materialized);
> >
> > // "absorb" is just a random name off the top of my head
> > aggregated.absorb(grouped2, aggregator2)
> >           .absorb(grouped3, aggregator3);
> >
> > In this way, we just add a new API to the KTable to "absorb" new streams
> > as aggregated results without needing to introduce new first-class
> > citizen classes.
> >
> > 2. From the DSL optimization perspective, have we considered whether we
> > can automatically rewrite a user-written, old-fashioned multi-way join
> > into this new DSL operator?
> >
> > 3. Although it is not needed for the wiki page itself, for the internal
> > implementation, how many processor nodes would we create for the new
> > operator, and how can we allow users to name them?
> >
> > Minor:
> >
> > 4. In "Public Interfaces", better add the templated generics to
> > "KGroupedStream" as "KGroupedStream<K, V>".
> >
> > 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> > e.g. "TimeWindowed*CogroupedKStream*<K, V>".
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Walker,
> > >
> > > I am not sure if I can follow your argument. What exactly do you mean by
> > >
> > > > I also think that in this case it would be better to separate the 2
> > > > options out into separate overloads.
> > >
> > > Maybe you can give an example of what method signature you have in mind?
> > >
> > > > We could take a named parameter from upstream or add an extra naming
> > > > option however I don't really see the advantage that would give.
> > >
> > > Are you familiar with KIP-307? Before KIP-307, KS generated all names
> > > for all Processors. This makes it hard to reason about a Topology if
> > > it's getting complex. Adding `Named` to the new co-group operator would
> > > actually align with KIP-307.
> > >
> > > > It seems to go in the opposite direction from the cogroup
> > > > configuration idea you proposed.
> > >
> > > Can you elaborate? Not sure if I can follow.
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 10/24/19 10:20 AM, Walker Carlson wrote:
> > > > While I like the idea, Sophie, I don't think that it is necessary. I
> > > > also think that in this case it would be better to separate the 2
> > > > options out into separate overloads.
> > > > We could take a named parameter from upstream or add an extra naming
> > > > option; however, I don't really see the advantage that would give. It
> > > > seems to go in the opposite direction from the cogroup configuration
> > > > idea you proposed.
> > > >
> > > > John, I think it could be both. It depends on when you aggregate and
> > > > what kind of data you have. In the example it is aggregating before
> > > > joining, but there are probably some cases where you could join before
> > > > aggregating. IMHO it would be easier to group all the streams together
> > > > and then perform the one operation that results in a single KTable.
> > > >
> > > >
> > > >
> > > > On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman
> > > > <sop...@confluent.io> wrote:
> > > >
> > > >>> I can personally not see any need to add other configuration
> > > >> Famous last words?
> > > >>
> > > >> Just kidding, 95% confidence is more than enough for me (and better to
> > > >> optimize for the current design than for hypothetical future changes).
> > > >>
> > > >> One last question I have then is about the operator/store/repartition
> > > >> naming: it seems like we can name the underlying store/changelog
> > > >> through the Materialized parameter, but do we also want to include an
> > > >> overload taking a Named parameter for the operator name (as in the
> > > >> KTable#join variations)?
> > > >>
> > > >> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <matth...@confluent.io>
> > > >> wrote:
> > > >>
> > > >>> Interesting idea, Sophie.
> > > >>>
> > > >>> So far, we tried to reuse existing config objects and only add new
> > > >>> ones when needed to avoid creating "redundant" classes. This is of
> > > >>> course a reactive approach (with the drawback of having to deprecate
> > > >>> stuff if we change it, as you described).
> > > >>>
> > > >>> I can personally not see any need to add other configuration
> > > >>> parameters atm, so it's a 95% obvious "no" IMHO. The final
> > > >>> `aggregate()` has only a single state store that we need to
> > > >>> configure, and reusing `Materialized` seems to be appropriate.
> > > >>>
> > > >>> Also note that the `Initializer` is a mandatory parameter and not a
> > > >>> configuration, so it should be passed directly and not via a
> > > >>> configuration object.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
> > > >>>> Thanks for the explanation, makes sense to me! As for the API, one
> > > >>>> other thought I had is: might we ever want or need to introduce any
> > > >>>> other configs or parameters in the future? Obviously that's difficult
> > > >>>> to say now (or maybe the answer seems obviously "no") but we seem to
> > > >>>> often end up needing to add new overloads and/or deprecate old ones
> > > >>>> as new features or requirements come into play.
> > > >>>>
> > > >>>> What do you (and others?) think about wrapping the config parameters
> > > >>>> (ie everything except the actual grouped streams) in a new config
> > > >>>> object? For example, the CogroupedStream#aggregate method could take
> > > >>>> a single Cogrouped object, which itself would have an initializer and
> > > >>>> a materialized. If we ever need to add a new parameter, we can just
> > > >>>> add it to the Cogrouped class.
> > > >>>>
> > > >>>> Also, will the backing store be available for IQ if a Materialized is
> > > >>>> passed in?
> > > >>>>
> > > >>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson
> > > >>>> <wcarl...@confluent.io> wrote:
> > > >>>>
> > > >>>>> Hi Sophie,
> > > >>>>>
> > > >>>>> Thank you for your comments. As for the different method signatures,
> > > >>>>> I have not really considered any other options, and while I do agree
> > > >>>>> it is confusing, I don't see any obvious solutions. The problem is
> > > >>>>> that the cogroup essentially pairs a grouped stream with an
> > > >>>>> aggregator, and when it is first made the method is already called on
> > > >>>>> a grouped stream. However, each subsequent stream-aggregator pair is
> > > >>>>> added on to a cogrouped stream, so it needs both arguments.
> > > >>>>>
> > > >>>>> For the second question, you should not need a joiner. The idea is
> > > >>>>> that you can collect many grouped streams with overlapping key spaces
> > > >>>>> and any kind of value types. Once aggregated, the values will be
> > > >>>>> reduced into one type. This is why you need only one initializer.
> > > >>>>> Each aggregator will need to integrate the new value with the new
> > > >>>>> object made in the initializer.
> > > >>>>> Does that make sense?
> > > >>>>>
> > > >>>>> This is a good question and I will include this explanation in the
> > > >>>>> KIP as well.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Walker
> > > >>>>>
> > > >>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman
> > > >>>>> <sop...@confluent.io> wrote:
> > > >>>>>
> > > >>>>>> Hey Walker,
> > > >>>>>>
> > > >>>>>> Thanks for the KIP! I have just a couple of questions:
> > > >>>>>>
> > > >>>>>> 1) It seems a little awkward to me that with the current API, we
> > > >>>>>> have a nearly identical "add stream to cogroup" method, except for
> > > >>>>>> the first which has a different signature (ie the first stream is
> > > >>>>>> joined as stream.cogroup(Aggregator) while the subsequent ones are
> > > >>>>>> joined as .cogroup(Stream, Aggregator) ). I'm not sure what it
> > > >>>>>> would look like exactly, but I was just wondering if you'd
> > > >>>>>> considered and/or rejected any alternative APIs?
> > > >>>>>>
> > > >>>>>> 2) This might just be my lack of familiarity with "cogroup" as a
> > > >>>>>> concept, but with the current (non-optimal) API the user seems to
> > > >>>>>> have some control over how exactly the different streams are joined
> > > >>>>>> through the ValueJoiners. Would this new cogroup simply concatenate
> > > >>>>>> the values from the different cogroup streams, or could users
> > > >>>>>> potentially pass some kind of Joiner to the cogroup/aggregate
> > > >>>>>> methods? Or, is the whole point of cogroups that you no longer ever
> > > >>>>>> need to specify a Joiner? If so, you should add a short line to the
> > > >>>>>> KIP explaining that for those of us who aren't fluent in cogroup
> > > >>>>>> semantics :)
> > > >>>>>>
> > > >>>>>> Cheers,
> > > >>>>>> Sophie
> > > >>>>>>
> > > >>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson
> > > >>>>>> <wcarl...@confluent.io> wrote:
> > > >>>>>>
> > > >>>>>>> Good catch, I updated that.
> > > >>>>>>>
> > > >>>>>>> I have made a PR for this KIP.
> > > >>>>>>>
> > > >>>>>>> I am then splitting it into 3 parts: first cogroup for a key-value
> > > >>>>>>> store (here <https://github.com/apache/kafka/pull/7538>), then for
> > > >>>>>>> a timeWindowedStore, and then a sessionWindowedStore + ensuring
> > > >>>>>>> partitioning.
> > > >>>>>>>
> > > >>>>>>> Walker
> > > >>>>>>>
> > > >>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax
> > > >>>>>>> <matth...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>>> Walker,
> > > >>>>>>>>
> > > >>>>>>>> thanks for picking up the KIP and reworking it for the changed
> > > >>>>>>>> API.
> > > >>>>>>>>
> > > >>>>>>>> Overall, the updated API suggestions make sense to me. They seem
> > > >>>>>>>> to align quite nicely with our current API design.
> > > >>>>>>>>
> > > >>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the type parameter
> > > >>>>>>>> of `Materialized` should be `V`, not `VR`?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
> > > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > >>>>>>>>> here is a link
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson
> > > >>>>>>>>> <wcarl...@confluent.io> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hello all,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I have picked up and updated KIP-150. Due to changes to the
> > > >>>>>>>>>> project since KIP-150 was written, there are a few items that
> > > >>>>>>>>>> need to be updated.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The first item that changed is the adoption of the Materialized
> > > >>>>>>>>>> parameter.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The second item is the WindowedBy. The old KIP handles windowing
> > > >>>>>>>>>> by overloading the aggregate function to take in a Window
> > > >>>>>>>>>> object as well as the other parameters. The current practice to
> > > >>>>>>>>>> window grouped streams is to call windowedBy and receive a
> > > >>>>>>>>>> windowed stream object. The existing interface for a windowed
> > > >>>>>>>>>> stream made from a grouped stream will not work for cogrouped
> > > >>>>>>>>>> streams. Hence, we have to make new interfaces for cogrouped
> > > >>>>>>>>>> windowed streams.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Please take a look, I would like to hear your feedback,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Please take a look, I would like to hear your feedback,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Walker
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>
