Hi Walker,

On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <wcarl...@confluent.io>
wrote:

> Hi Guozhang,
>
> 1. I am familiar with the cogroup of spark, it is very similar to
> their join operator but instead it makes the values iterable. I think that
> the use cases are different enough that it makes sense to specify the
> aggregator when we do.
>
> I like the idea of "absorb" and I think it could be useful. Although I do
> not think it is as intuitive.
>
> If we were to go that route we would either use more processors or do
> essentially the same thing but would have to store the information
> required to cogroup inside that KTable. I think this would violate some
> design principles. I would argue that we should consider adding absorb as
> well and auto re-write it to use cogroup.
>

Yeah I think I agree with you about the internal design complexity with
"absorb"; I was primarily thinking if we can save ourselves from
introducing 3 more public classes with co-group. But it seems that without
introducing new classes there's no easy way for us to bound the scope of
co-grouping (like how many streams will be co-grouped together).

LMK if you have some better ideas: generally speaking the less new public
interfaces we are introducing to fulfill a new feature the better, so I'd
push us to think twice and carefully before we go down the route.


>
> 2. We have not considered this thought that would be a convenient
> operation.
>
> 3. There is only one processor made. We are actually having the naming
> conversation right now in the above thread
>
> 4, 5. fair points
>
> Walker
>
> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Walker, thanks for the KIP! I made a pass on the writeup and have some
> > comments below:
> >
> > Meta:
> >
> > 1. Syntax-wise, I'm wondering if we have compared our current proposal
> with
> > Spark's co-group syntax (I know they are targeting for different use
> cases,
> > but wondering if their syntax is closer to the join operator), what are
> the
> > syntax / semantics trade-off here?
> >
> > Just playing a devil's advocate here, if the main motivation is to
> provide
> > a more convienent multi-way join syntax, and in order to only have one
> > materialized store we need to specify the final joined format at the
> > beginning, then what about the following alternative (with the given
> > example in your wiki page):
> >
> >
> > KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >
> > KTable<K, CG> aggregated = grouped1.aggregate(initializer, materialized,
> > aggregator1);
> >
> > aggregated.absorb(grouped2, aggregator2);  // I'm just using a random
> name
> > on top of my head here
> >                   .absorb(grouped3, aggregator3);
> >
> > In this way, we just add a new API to the KTable to "absorb" new streams
> as
> > aggregated results without needing to introduce new first citizen
> classes.
> >
> > 2. From the DSL optimization, have we considered if we can auto re-write
> > the user written old fashioned multi-join into this new DSL operator?
> >
> > 3. Although it is not needed for the wiki page itself, for internal
> > implementation how many processor nodes would we create for the new
> > operator, and how we can allow users to name them?
> >
> > Minor:
> >
> > 4. In "Public Interfaces", better add the templated generics to
> > "KGroupedStream" as "KGroupedStream<K, V>".
> >
> > 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table,
> > e.g. "TimeWindowed*CogroupedKStream*<K, V>".
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Walker,
> > >
> > > I am not sure if I can follow your argument. What do you exactly mean
> by
> > >
> > > > I also
> > > >> think that in this case it would be better to separate the 2 option
> > out
> > > >> into separate overloads.
> > >
> > > Maybe you can give an example what method signature you have in mind?
> > >
> > > >> We could take a named parameter from upstream or add an extra naming
> > > option
> > > >> however I don't really see the advantage that would give.
> > >
> > > Are you familiar with KIP-307? Before KIP-307, KS generated all names
> > > for all Processors. This makes it hard to reason about a Topology if
> > > it's getting complex. Adding `Named` to the new co-group operator would
> > > actually align with KIP-307.
> > >
> > > > It seems to go in
> > > >> the opposite direction from the cogroup configuration idea you
> > proposed.
> > >
> > > Can you elaborate? Not sure if I can follow.
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 10/24/19 10:20 AM, Walker Carlson wrote:
> > > > While I like the idea Sophie I don't think that it is necessary. I
> also
> > > > think that in this case it would be better to separate the 2 option
> out
> > > > into separate overloads.
> > > > We could take a named parameter from upstream or add an extra naming
> > > option
> > > > however I don't really see the advantage that would give. It seems to
> > go
> > > in
> > > > the opposite direction from the cogroup configuration idea you
> > proposed.
> > > >
> > > > John, I think it could be both. It depends on when you aggregate and
> > what
> > > > kind of data you have. In the example it is aggregating before
> joining,
> > > > there are probably some cases where you could join before
> aggregating.
> > > IMHO
> > > > it would be easier to group all the streams together then perform the
> > one
> > > > operation that results in a single KTable.
> > > >
> > > >
> > > >
> > > > On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <
> > sop...@confluent.io
> > > >
> > > > wrote:
> > > >
> > > >>> I can personally not see any need to add other configuration
> > > >> Famous last words?
> > > >>
> > > >> Just kidding, 95% confidence is more than enough to  me (and better
> to
> > > >> optimize for current
> > > >> design than for hypothetical future changes).
> > > >>
> > > >> One last question I have then is about the
> operator/store/repartition
> > > >> naming -- seems like
> > > >> we can name the underlying store/changelog through the Materialized
> > > >> parameter, but do we
> > > >> also want to include an overload taking a Named parameter for the
> > > operator
> > > >> name (as in the
> > > >> KTable#join variations)?
> > > >>
> > > >> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <
> > matth...@confluent.io>
> > > >> wrote:
> > > >>
> > > >>> Interesting idea, Sophie.
> > > >>>
> > > >>> So far, we tried to reuse existing config objects and only add new
> > ones
> > > >>> when needed to avoid creating "redundant" classes. This is of
> course
> > a
> > > >>> reactive approach (with the drawback to deprecate stuff if we
> change
> > > it,
> > > >>> as you described).
> > > >>>
> > > >>> I can personally not see any need to add other configuration
> > parameters
> > > >>> atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has
> > only
> > > a
> > > >>> single state store that we need to configure, and reusing
> > > `Materialized`
> > > >>> seems to be appropriate.
> > > >>>
> > > >>> Also note, that the `Initializer` is a mandatory parameter and not
> a
> > > >>> configuration and should be passed directly, and not via a
> > > configuration
> > > >>> object.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
> > > >>>> Thanks for the explanation, makes sense to me! As for the API, one
> > > >> other
> > > >>>> thought I had is might we ever want or need to introduce any other
> > > >>> configs
> > > >>>> or parameters in the future? Obviously that's difficult to say now
> > (or
> > > >>>> maybe the
> > > >>>> answer seems obviously "no") but we seem to often end up needing
> to
> > > add
> > > >>> new
> > > >>>> overloads and/or deprecate old ones as new features or
> requirements
> > > >> come
> > > >>>> into
> > > >>>> play.
> > > >>>>
> > > >>>> What do you (and others?) think about wrapping the config
> parameters
> > > >> (ie
> > > >>>> everything
> > > >>>> except the actual grouped streams) in a new config object? For
> > > example,
> > > >>> the
> > > >>>> CogroupedStream#aggregate field could take a single Cogrouped
> > object,
> > > >>>> which itself would have an initializer and a materialized. If we
> > ever
> > > >>> need
> > > >>>> to add
> > > >>>> a new parameter, we can just add it to the Cogrouped class.
> > > >>>>
> > > >>>> Also, will the backing store be available for IQ if a Materialized
> > is
> > > >>>> passed in?
> > > >>>>
> > > >>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <
> > > wcarl...@confluent.io
> > > >>>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi Sophie,
> > > >>>>>
> > > >>>>> Thank you for your comments. As for the different methods
> > signatures
> > > I
> > > >>> have
> > > >>>>> not really considered any other options but  while I do agree it
> is
> > > >>>>> confusing, I don't see any obvious solutions. The problem is that
> > the
> > > >>>>> cogroup essentially pairs a group stream with an aggregator and
> > when
> > > >> it
> > > >>> is
> > > >>>>> first made the method is called on a groupedStream already.
> However
> > > >> each
> > > >>>>> subsequent stream-aggregator pair is added on to a cogroup stream
> > so
> > > >> it
> > > >>>>> needs both arguments.
> > > >>>>>
> > > >>>>> For the second question you should not need a joiner. The idea is
> > > that
> > > >>> you
> > > >>>>> can collect many grouped streams with overlapping key spaces and
> > any
> > > >>> kind
> > > >>>>> of value types. Once aggregated its value will be reduced into
> one
> > > >> type.
> > > >>>>> This is why you need only one initializer. Each aggregator will
> > need
> > > >> to
> > > >>>>> integrate the new value with the new object made in the
> > initializer.
> > > >>>>> Does that make sense?
> > > >>>>>
> > > >>>>> This is a good question and I will include this explanation in
> the
> > > kip
> > > >>> as
> > > >>>>> well.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Walker
> > > >>>>>
> > > >>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <
> > > >>> sop...@confluent.io>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hey Walker,
> > > >>>>>>
> > > >>>>>> Thanks for the KIP! I have just a couple of questions:
> > > >>>>>>
> > > >>>>>> 1) It seems a little awkward to me that with the current API, we
> > > >> have a
> > > >>>>>> nearly identical
> > > >>>>>> "add stream to cogroup" method, except for the first which has a
> > > >>>>> different
> > > >>>>>> signature
> > > >>>>>> (ie the first stream is joined as stream.cogroup(Aggregator)
> while
> > > >> the
> > > >>>>>> subsequent ones
> > > >>>>>> are joined as .cogroup(Stream, Aggregator) ). I'm not sure what
> it
> > > >>> would
> > > >>>>>> look like exactly,
> > > >>>>>> but I was just wondering if you'd considered and/or rejected any
> > > >>>>>> alternative APIs?
> > > >>>>>>
> > > >>>>>> 2) This might just be my lack of familiarity with "cogroup" as a
> > > >>> concept,
> > > >>>>>> but with the
> > > >>>>>> current (non-optimal) API the user seems to have some control
> over
> > > >> how
> > > >>>>>> exactly
> > > >>>>>> the different streams are joined through the ValueJoiners. Would
> > > this
> > > >>> new
> > > >>>>>> cogroup
> > > >>>>>> simply concatenate the values from the different cogroup
> streams,
> > or
> > > >>>>> could
> > > >>>>>> users
> > > >>>>>> potentially pass some kind of Joiner to the cogroup/aggregate
> > > >> methods?
> > > >>>>> Or,
> > > >>>>>> is the
> > > >>>>>> whole point of cogroups that you no longer ever need to specify
> a
> > > >>> Joiner?
> > > >>>>>> If so, you
> > > >>>>>> should add a short line to the KIP explaining that for those of
> us
> > > >> who
> > > >>>>>> aren't fluent
> > > >>>>>> in cogroup semantics :)
> > > >>>>>>
> > > >>>>>> Cheers,
> > > >>>>>> Sophie
> > > >>>>>>
> > > >>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <
> > > >> wcarl...@confluent.io>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Good catch I updated that.
> > > >>>>>>>
> > > >>>>>>> I have made a PR for this KIP
> > > >>>>>>>
> > > >>>>>>> I then am splitting it into 3 parts, first cogroup for a
> > key-value
> > > >>>>> store
> > > >>>>>> (
> > > >>>>>>> here <https://github.com/apache/kafka/pull/7538>), then for a
> > > >>>>>>> timeWindowedStore, and then a sessionWindowedStore + ensuring
> > > >>>>>> partitioning.
> > > >>>>>>>
> > > >>>>>>> Walker
> > > >>>>>>>
> > > >>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <
> > > >>>>> matth...@confluent.io>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Walker,
> > > >>>>>>>>
> > > >>>>>>>> thanks for picking up the KIP and reworking it for the changed
> > > API.
> > > >>>>>>>>
> > > >>>>>>>> Overall, the updated API suggestions make sense to me. The
> seem
> > to
> > > >>>>>> align
> > > >>>>>>>> quite nicely with our current API design.
> > > >>>>>>>>
> > > >>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the type
> parameter
> > > of
> > > >>>>>>>> `Materialized` should be `V`, not `VR`?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > > >>>>>>>>> here
> > > >>>>>>>>> is a link
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <
> > > >>>>>> wcarl...@confluent.io>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hello all,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I have picked up and updated KIP-150. Due to changes to the
> > > >>>>> project
> > > >>>>>>>> since
> > > >>>>>>>>>> KIP #150 was written there are a few items that need to be
> > > >>>>> updated.
> > > >>>>>>>>>>
> > > >>>>>>>>>> First item that changed is the adoption of the Materialized
> > > >>>>>> parameter.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The second item is the WindowedBy. How the old KIP handles
> > > >>>>> windowing
> > > >>>>>>> is
> > > >>>>>>>>>> that it overloads the aggregate function to take in a Window
> > > >>>>> object
> > > >>>>>> as
> > > >>>>>>>> well
> > > >>>>>>>>>> as the other parameters. The current practice to window
> > > >>>>>>> grouped-streams
> > > >>>>>>>> is
> > > >>>>>>>>>> to call windowedBy and receive a windowed stream object. The
> > > >>>>>> existing
> > > >>>>>>>>>> interface for a windowed stream made from a grouped stream
> > will
> > > >>>>> not
> > > >>>>>>> work
> > > >>>>>>>>>> for cogrouped streams. Hence, we have to make new interfaces
> > for
> > > >>>>>>>> cogrouped
> > > >>>>>>>>>> windowed streams.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Please take a look, I would like to hear your feedback,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Walker
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to