Kyle,

Thanks a lot for the updated KIP. It looks good to me.


Guozhang


On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:

> This makes much more sense to me. +1
>
> > On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com>
> wrote:
> >
> > I have updated the KIP and my PR. Let me know what you think.
> > To create a cogrouped stream, just call cogroup on a KGroupedStream and
> > supply the initializer, aggValueSerde, and an aggregator. Then continue
> > adding KGroupedStreams and aggregators. Finally, call one of the
> > aggregate calls to create a KTable.
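
As a rough illustration of the call sequence described above, here is a toy model in plain Java. It is not the Kafka Streams API: `CogroupSketch`, `Agg`, `Cogrouped`, and `Stats` are hypothetical names, and a `HashMap` stands in for the single state store. It only aims to show that one initializer and one store can serve several inputs with different value types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Toy model only: none of these types are Kafka Streams classes.
final class CogroupSketch {

    // Hypothetical aggregator shape: (key, value, current aggregate) -> new aggregate.
    interface Agg<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // The shared aggregate type (the "CG" of the thread's examples).
    static final class Stats {
        long impressions;
        long clicks;
    }

    // One initializer and one map (standing in for one state store) for all inputs.
    static final class Cogrouped<K, A> {
        private final Supplier<A> initializer;
        private final Map<K, A> store = new HashMap<>();
        private int initializerCalls;

        Cogrouped(Supplier<A> initializer) {
            this.initializer = initializer;
        }

        // Registers one input with its own value type V and returns a sink for its records.
        <V> BiConsumer<K, V> cogroup(Agg<K, V, A> aggregator) {
            return (key, value) -> {
                A current = store.get(key);
                if (current == null) {
                    current = initializer.get(); // runs once per key, not once per input
                    initializerCalls++;
                }
                store.put(key, aggregator.apply(key, value, current));
            };
        }

        A get(K key) {
            return store.get(key);
        }

        int initializerCalls() {
            return initializerCalls;
        }
    }

    // Feeds two inputs with different value types into the same aggregate.
    static long[] demo() {
        Cogrouped<String, Stats> cogrouped = new Cogrouped<>(Stats::new);
        BiConsumer<String, String> impressions = cogrouped.cogroup(
            (String k, String v, Stats agg) -> { agg.impressions++; return agg; });
        BiConsumer<String, Integer> clicks = cogrouped.cogroup(
            (String k, Integer v, Stats agg) -> { agg.clicks += v; return agg; });

        impressions.accept("auction-1", "banner");
        impressions.accept("auction-1", "sidebar");
        clicks.accept("auction-1", 1);

        Stats stats = cogrouped.get("auction-1");
        return new long[] { stats.impressions, stats.clicks, cogrouped.initializerCalls() };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("impressions=" + r[0] + " clicks=" + r[1]
            + " initializerCalls=" + r[2]);
    }
}
```

In this sketch the initializer runs once for the key even though two different inputs feed it, which is the behaviour the single-initializer design relies on.
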
> >
> > Thanks,
> > Kyle
> >
> > On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
> >
> >> Hi Kyle,
> >>
> >> Thanks for the update. I think just one initializer makes sense as it
> >> should only be called once per key and generally it is just going to
> create
> >> a new instance of whatever the Aggregate class is.
> >>
> >> Cheers,
> >> Damian
> >>
> >> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com>
> >> wrote:
> >>
> >>> Hello all,
> >>>
> >>> I have spent some more time on this and the best alternative I have
> >>> come up with is:
> >>> KGroupedStream has a single cogroup call that takes an initializer
> >>> and an aggregator.
> >>> CogroupedKStream has a cogroup call that takes additional
> >>> groupedStream/aggregator pairs.
> >>> CogroupedKStream has multiple aggregate methods that create the
> >>> different stores.
> >>>
> >>> I plan on updating the KIP, but I want people's input on whether the
> >>> initializer should be passed in once at the beginning or instead be
> >>> required for each call to one of the aggregate calls. The first makes
> >>> more sense to me but doesn't allow the user to specify different
> >>> initializers for different tables.
> >>>
> >>> Thanks,
> >>> Kyle
> >>>
> >>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com>
> >>> wrote:
> >>>
> >>>> Yeah, I really like that idea. I'll see what I can do to update the KIP
> >>>> and my PR when I have some time. I'm not sure how well creating the
> >>>> KStreamAggregates will go, though, because at that point I will have
> >>>> thrown away the type of the values. It will be type safe; I just may
> >>>> need to do a little forcing.
> >>>>
> >>>> Thanks,
> >>>> Kyle
> >>>>
> >>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >>>>
> >>>>> Kyle,
> >>>>>
> >>>>> Thanks for the explanations, my previous read on the wiki examples
> was
> >>>>> wrong.
> >>>>>
> >>>>> So I guess my motivation should be "reduced" to: can we move the
> >> window
> >>>>> specs param from "KGroupedStream#cogroup(..)" to
> >>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
> >>>>>
> >>>>> 1. minor: we can reduce the #.generics in CogroupedKStream from 3 to
> >> 2.
> >>>>> 2. major: this is for extensibility of the APIs, and since we are
> >>> removing
> >>>>> the "Evolving" annotations on Streams it may be harder to change it
> >>> again
> >>>>> in the future. The extended use cases are that people wanted to have
> >>>>> windowed running aggregates on different granularities, e.g. "give me
> >>>>> the counts per-minute, per-hour, per-day and per-week", and today in
> >>>>> the DSL we need to specify that case in multiple aggregate operators,
> >>>>> each of which gets its own state store / changelog, etc. It is
> >>>>> possible to optimize this down to a single state store. Its
> >>>>> implementation would be tricky, as you need to hold windows of
> >>>>> different lengths within your window store, but just from the public
> >>>>> API point of view, it could be specified as:
> >>>>>
> >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");
> >>>>>
> >>>>> table1 = stream.aggregate(/*per-minute window*/)
> >>>>> table2 = stream.aggregate(/*per-hour window*/)
> >>>>> table3 = stream.aggregate(/*per-day window*/)
> >>>>>
> >>>>> while underlying we are only using a single store "state-store-name"
> >> for
> >>>>> it.
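
To make the "several granularities, one store" idea above concrete, here is a toy sketch in plain Java. It is not how a Kafka Streams window store works, and the thread itself notes that a real implementation would be tricky. The class name, the `sizeMs@windowStartMs` key format, and the assumption of non-negative timestamps with epoch-aligned tumbling windows are all illustrative choices.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model only: a TreeMap stands in for the single store.
final class MultiGranularitySketch {
    static final long MINUTE_MS = 60_000L;
    static final long HOUR_MS = 60 * MINUTE_MS;
    static final long DAY_MS = 24 * HOUR_MS;

    // Start of the tumbling window of the given size that contains the timestamp.
    // Assumes non-negative timestamps and windows aligned to epoch 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts each record once per window size, all in one map
    // keyed by "sizeMs@windowStartMs".
    static Map<String, Long> count(long[] timestampsMs, long... sizesMs) {
        Map<String, Long> store = new TreeMap<>();
        for (long ts : timestampsMs) {
            for (long size : sizesMs) {
                store.merge(size + "@" + windowStart(ts, size), 1L, Long::sum);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        long[] timestamps = { 0L, 30_000L, 61_000L, HOUR_MS };
        System.out.println(count(timestamps, MINUTE_MS, HOUR_MS, DAY_MS));
    }
}
```

Each record is written once per granularity, so the per-minute, per-hour, and per-day counts live side by side in the same map and are distinguished only by their keys.
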
> >>>>>
> >>>>>
> >>>>> Although this feature is out of the scope of this KIP, I'd like to
> >>>>> discuss if we can "leave the door open" to make such changes without
> >>>>> modifying the public APIs.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
> >>> winkelman.k...@gmail.com
> >>>>>>
> >>>>> wrote:
> >>>>>
> >>>>>> I allow defining a single window/sessionwindow one time when you make
> >>>>>> the cogroup call from a KGroupedStream. From then on you are using the
> >>>>>> cogroup call from within CogroupedKStream, which doesn't accept any
> >>>>>> additional windows/sessionwindows.
> >>>>>>
> >>>>>> Is this what you meant by your question or did I misunderstand?
> >>>>>>
> >>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>> Another question that came to me is on "window alignment": from the
> >>>>>> KIP it seems you are allowing users to specify a (potentially
> >>>>>> different) window spec in each co-grouped input stream. So if these
> >>>>>> window specs are different, how should we "align" them with different
> >>>>>> input streams? I think it is more natural to only specify one window
> >>>>>> spec in the
> >>>>>>
> >>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> >>>>>>
> >>>>>>
> >>>>>> And remove it from the cogroup() functions. WDYT?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Thanks for the proposal, Kyle. This is quite a common use case: to
> >>>>>>> support such a multi-way table join (i.e. N source tables with N
> >>>>>>> aggregate funcs) with a single store and N+1 serdes. I have seen lots
> >>>>>>> of people using the low-level PAPI to achieve this goal.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> >>>>>> winkelman.k...@gmail.com
> >>>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I like your point about not handling other cases such as count
> >> and
> >>>>>> reduce.
> >>>>>>>>
> >>>>>>>> I think that reduce may not make sense because reduce assumes that
> >>>>>>>> the input values have the same type as the output values. With
> >>>>>>>> cogroup there may be multiple different input types, and then your
> >>>>>>>> output type can't be multiple different things. In the case where
> >>>>>>>> you have all matching value types you can do KStreamBuilder#merge
> >>>>>>>> followed by the reduce.
> >>>>>>>>
> >>>>>>>> As for count, I think it is possible to call count on all the
> >>>>>>>> individual grouped streams and then do joins. Otherwise we could
> >>>>>>>> maybe make a special call in KGroupedStream for this case, because
> >>>>>>>> here we don't need to do type checking on the values. It could be
> >>>>>>>> similar to the current count methods but accept a varargs of
> >>>>>>>> additional grouped streams as well and make sure they have a key
> >>>>>>>> type of K.
> >>>>>>>>
> >>>>>>>> The way I have put the kip together is to ensure that we do type
> >>>>>> checking.
> >>>>>>>> I don't see a way we could group them all first and then make a
> >>> call
> >>>>> to
> >>>>>>>> count, reduce, or aggregate because with aggregate they would
> >> need
> >>> to
> >>>>>> pass
> >>>>>>>> a list of aggregators and we would have no way of type checking
> >>> that
> >>>>>> they
> >>>>>>>> match the grouped streams.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Kyle
> >>>>>>>>
> >>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Sorry to jump on this thread so late. I agree this is a very
> >>> useful
> >>>>>>>>> addition and wanted to provide an additional use-case and some
> >>> more
> >>>>>>>>> comments.
> >>>>>>>>>
> >>>>>>>>> This is actually a very common analytics use-case in the ad-tech
> >>>>>>>>> industry. The typical setup will have an auction stream, an
> >>>>>>>>> impression stream, and a click stream. Those three streams need to
> >>>>>>>>> be combined to compute aggregate statistics (e.g. impression
> >>>>>>>>> statistics and click-through rates), since most of the attributes
> >>>>>>>>> of interest are only present in the auction stream.
> >>>>>>>>>
> >>>>>>>>> A simple way to do this is to co-group all the streams by the
> >>>>> auction
> >>>>>>>> key,
> >>>>>>>>> and process updates to the co-group as events for each stream
> >>> come
> >>>>> in,
> >>>>>>>>> keeping only one value from each stream before sending
> >> downstream
> >>>>> for
> >>>>>>>>> further processing / aggregation.
> >>>>>>>>>
> >>>>>>>>> One could view the result of that co-group operation as a
> >>> "KTable"
> >>>>>> with
> >>>>>>>>> multiple values per key. The key being the grouping key, and
> >> the
> >>>>>> values
> >>>>>>>>> consisting of one value per stream.
> >>>>>>>>>
> >>>>>>>>> What I like about Kyle's approach is that it allows elegant
> >>>>>>>>> co-grouping of multiple streams without having to worry about the
> >>>>>>>>> number of streams, and avoids dealing with Tuple types or other
> >>>>>>>>> generic interfaces that could get messy if we wanted to preserve
> >>>>>>>>> all the value types in the resulting co-grouped stream.
> >>>>>>>>>
> >>>>>>>>> My only concern is that we only allow the cogroup + aggregate
> >>>>> combined
> >>>>>>>>> operation. This forces the user to build their own tuple
> >>>>> serialization
> >>>>>>>>> format if they want to preserve the individual input stream
> >>> values
> >>>>> as
> >>>>>> a
> >>>>>>>>> group. It also deviates quite a bit from our approach in
> >>>>>> KGroupedStream
> >>>>>>>>> which offers other operations, such as count and reduce, which
> >>>>> should
> >>>>>>>> also
> >>>>>>>>> be applicable to a co-grouped stream.
> >>>>>>>>>
> >>>>>>>>> Overall I still think this is a really useful addition, but I feel
> >>>>>>>>> we haven't spent much time trying to explore alternative DSLs that
> >>>>>>>>> could maybe generalize better or match our existing syntax more
> >>>>>>>>> closely.
> >>>>>>>>>
> >>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <
> >>>>>> winkelman.k...@gmail.com
> >>>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Eno, is there anyone else that is an expert in the kafka
> >>> streams
> >>>>>> realm
> >>>>>>>>> that
> >>>>>>>>>> I should reach out to for input?
> >>>>>>>>>>
> >>>>>>>>>> I believe Damian Guy is still planning on reviewing this more
> >>> in
> >>>>>> depth
> >>>>>>>>> so I
> >>>>>>>>>> will wait for his inputs before continuing.
> >>>>>>>>>>
> >>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <
> >> eno.there...@gmail.com
> >>>>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Kyle, good arguments.
> >>>>>>>>>>>
> >>>>>>>>>>> Eno
> >>>>>>>>>>>
> >>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <
> >>>>>>>> winkelman.k...@gmail.com
> >>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> *- minor: could you add an exact example (similar to what
> >>>>> Jay’s
> >>>>>>>>> example
> >>>>>>>>>>> is,
> >>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super
> >>>>>> concrete?*
> >>>>>>>>>>>> I have added a more concrete example to the KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *- my main concern is that we’re exposing this
> >> optimization
> >>>>> to
> >>>>>> the
> >>>>>>>>> DSL.
> >>>>>>>>>>> In
> >>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL
> >>> and
> >>>>> do
> >>>>>>>> the
> >>>>>>>>>> right
> >>>>>>>>>>>> thing under the covers (create just one state store,
> >>> arrange
> >>>>> the
> >>>>>>>>> nodes
> >>>>>>>>>>>> etc). The original DSL had a bunch of small, composable
> >>>>> pieces
> >>>>>>>>> (group,
> >>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d
> >>>>> like to
> >>>>>>>> hear
> >>>>>>>>>>> your
> >>>>>>>>>>>> thoughts on whether it’s possible to do this optimization
> >>>>> with
> >>>>>> the
> >>>>>>>>>>> current
> >>>>>>>>>>>> DSL, at the topology builder level.*
> >>>>>>>>>>>> You would have to make a lot of checks to understand if it is
> >>>>>>>>>>>> even possible to make this optimization:
> >>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins.
> >>>>>>>>>>>> 2. None of the intermediate KTables are used for anything else.
> >>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be
> >>>>>>>>>>>> impossible, especially if they use KafkaStreams#store after the
> >>>>>>>>>>>> topology has already been built.)
> >>>>>>>>>>>> You would then need to make decisions during the optimization:
> >>>>>>>>>>>> 1. Your new initializer would be the composite of all the
> >>>>>>>>>>>> individual initializers and the valueJoiners.
> >>>>>>>>>>>> 2. I am having a hard time thinking about how you would turn the
> >>>>>>>>>>>> aggregators and valueJoiners into an aggregator that would work
> >>>>>>>>>>>> on the final object, but this may be possible.
> >>>>>>>>>>>> 3. Which state store would you use? The ones declared would be
> >>>>>>>>>>>> for the aggregate values. None of the declared ones would be
> >>>>>>>>>>>> guaranteed to hold the final object. This would mean you must
> >>>>>>>>>>>> create a new state store and not create any of the declared
> >>>>>>>>>>>> ones.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The main argument I have against it is that, even if it could be
> >>>>>>>>>>>> done, I don't know that we would want to have this be an
> >>>>>>>>>>>> optimization in the background, because the user would still be
> >>>>>>>>>>>> required to think about all of the intermediate values that they
> >>>>>>>>>>>> shouldn't need to worry about if they only care about the final
> >>>>>>>>>>>> object.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In my opinion cogroup is a common enough case that it should be
> >>>>>>>>>>>> part of the composable pieces (group, aggregate, join) because we
> >>>>>>>>>>>> want to allow people to join 2 or more streams in an easy way.
> >>>>>>>>>>>> Right now I don't think we give them ways of handling this use
> >>>>>>>>>>>> case easily.
> >>>>>>>>>>>>
> >>>>>>>>>>>> *-I think there will be scope for several such
> >>> optimizations
> >>>>> in
> >>>>>>>> the
> >>>>>>>>>>> future
> >>>>>>>>>>>> and perhaps at some point we need to think about
> >> decoupling
> >>>>> the
> >>>>>>>> 1:1
> >>>>>>>>>>> mapping
> >>>>>>>>>>>> from the DSL into the physical topology.*
> >>>>>>>>>>>> I would argue that cogroup is not just an optimization; it is a
> >>>>>>>>>>>> new way for the users to look at accomplishing a problem that
> >>>>>>>>>>>> requires multiple streams. I may sound like a broken record, but
> >>>>>>>>>>>> I don't think users should have to build the N-1 intermediate
> >>>>>>>>>>>> tables and deal with their initializers, serdes and stores if
> >>>>>>>>>>>> all they care about is the final object.
> >>>>>>>>>>>> Now if, for example, someone uses cogroup but doesn't supply
> >>>>>>>>>>>> additional streams and aggregators, this case is equivalent to a
> >>>>>>>>>>>> single grouped stream making an aggregate call. This case is
> >>>>>>>>>>>> what I view as an optimization: we could remove the
> >>>>>>>>>>>> KStreamCogroup and act as if there was just a call to
> >>>>>>>>>>>> KGroupedStream#aggregate instead of calling
> >>>>>>>>>>>> KGroupedStream#cogroup. (I would prefer to just write a warning
> >>>>>>>>>>>> saying that this is not how cogroup is to be used.)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Kyle
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
> >>>>>>>> eno.there...@gmail.com
> >>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Kyle,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP again. A couple of comments:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - minor: could you add an exact example (similar to what
> >>>>> Jay’s
> >>>>>>>>> example
> >>>>>>>>>>> is,
> >>>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super
> >>>>>> concrete?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> - my main concern is that we’re exposing this
> >> optimization
> >>>>> to
> >>>>>> the
> >>>>>>>>> DSL.
> >>>>>>>>>>> In
> >>>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL
> >>>>> and do
> >>>>>>>> the
> >>>>>>>>>>> right
> >>>>>>>>>>>>> thing under the covers (create just one state store,
> >>> arrange
> >>>>>> the
> >>>>>>>>> nodes
> >>>>>>>>>>>>> etc). The original DSL had a bunch of small, composable
> >>>>> pieces
> >>>>>>>>> (group,
> >>>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d
> >>>>> like
> >>>>>> to
> >>>>>>>>> hear
> >>>>>>>>>>> your
> >>>>>>>>>>>>> thoughts on whether it’s possible to do this
> >> optimization
> >>>>> with
> >>>>>>>> the
> >>>>>>>>>>> current
> >>>>>>>>>>>>> DSL, at the topology builder level.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think there will be scope for several such
> >> optimizations
> >>>>> in
> >>>>>> the
> >>>>>>>>>> future
> >>>>>>>>>>>>> and perhaps at some point we need to think about
> >>> decoupling
> >>>>> the
> >>>>>>>> 1:1
> >>>>>>>>>>> mapping
> >>>>>>>>>>>>> from the DSL into the physical topology.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <
> >> j...@confluent.io>
> >>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I haven't digested the proposal but the use case is
> >>> pretty
> >>>>>>>> common.
> >>>>>>>>> An
> >>>>>>>>>>>>>> example would be the "customer 360" or "unified
> >> customer
> >>>>>>>> profile"
> >>>>>>>>> use
> >>>>>>>>>>>>> case
> >>>>>>>>>>>>>> we often use. In that use case you have a dozen systems
> >>>>> each
> >>>>>> of
> >>>>>>>>> which
> >>>>>>>>>>> has
> >>>>>>>>>>>>>> some information about your customer (account details,
> >>>>>> settings,
> >>>>>>>>>>> billing
> >>>>>>>>>>>>>> info, customer service contacts, purchase history,
> >> etc).
> >>>>> Your
> >>>>>>>> goal
> >>>>>>>>> is
> >>>>>>>>>>> to
> >>>>>>>>>>>>>> join/munge these into a single profile record for each
> >>>>>> customer
> >>>>>>>>> that
> >>>>>>>>>>> has
> >>>>>>>>>>>>>> all the relevant info in a usable form and is
> >> up-to-date
> >>>>> with
> >>>>>>>> all
> >>>>>>>>> the
> >>>>>>>>>>>>>> source systems. If you implement that with kstreams as a
> >>>>>>>>>>>>>> sequence of joins, I think today we'd fully materialize N-1
> >>>>>>>>>>>>>> intermediate tables. But clearly you only need a single stage
> >>>>>>>>>>>>>> to group all these things that are already co-partitioned. A
> >>>>>>>>>>>>>> distributed database would do this under the covers, which is
> >>>>>>>>>>>>>> arguably better (at least when it does the right thing), and
> >>>>>>>>>>>>>> perhaps we could do the same thing, but I'm not sure we know
> >>>>>>>>>>>>>> the partitioning, so we may need an explicit cogroup command
> >>>>>>>>>>>>>> that implies they are already co-partitioned.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Jay
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> >>>>>>>>>>> winkelman.k...@gmail.com
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yeah, that's a good way to look at it.
> >>>>>>>>>>>>>>> I have seen this type of functionality in a couple of other
> >>>>>>>>>>>>>>> platforms like Spark and Pig.
> >>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> >>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <
> >>>>> damian....@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Kyle,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If I'm reading this correctly, it is like an N-way outer
> >>>>>>>>>>>>>>>> join? So an input on any stream will always produce a new
> >>>>>>>>>>>>>>>> aggregated value - is that correct?
> >>>>>>>>>>>>>>>> Effectively, each Aggregator just looks up the current
> >>>>>>>>>>>>>>>> value, aggregates and forwards the result.
> >>>>>>>>>>>>>>>> I need to look into it and think about it a bit more, but it
> >>>>>>>>>>>>>>>> seems like it could be a useful optimization.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
> >>>>>>>>>> winkelman.k...@gmail.com
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I sure can. I have added the following description
> >> to
> >>> my
> >>>>>>>> KIP. If
> >>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> doesn't help let me know and I will take some more
> >>> time
> >>>>> to
> >>>>>>>>> build a
> >>>>>>>>>>>>>>>> diagram
> >>>>>>>>>>>>>>>>> and make more of a step by step description:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Example with Current API:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
> >>>>>>>>>>>>>>>>>     .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> >>>>>>>>>>>>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
> >>>>>>>>>>>>>>>>>     .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> >>>>>>>>>>>>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
> >>>>>>>>>>>>>>>>>     .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = table1
> >>>>>>>>>>>>>>>>>     .outerJoin(table2, joinerOneAndTwo)
> >>>>>>>>>>>>>>>>>     .outerJoin(table3, joinerOneTwoAndThree);
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As you can see, this creates 3 StateStores and requires 3
> >>>>>>>>>>>>>>>>> initializers and 3 aggValueSerdes. It also puts pressure on
> >>>>>>>>>>>>>>>>> the user to define what the intermediate values are going to
> >>>>>>>>>>>>>>>>> be (V1, V2, V3). They are left with a couple of choices:
> >>>>>>>>>>>>>>>>> first, make V1, V2, and V3 all the same as CG, so that the
> >>>>>>>>>>>>>>>>> two joiners are more like mergers; or second, make them
> >>>>>>>>>>>>>>>>> intermediate states such as Topic1Map, Topic2Map, and
> >>>>>>>>>>>>>>>>> Topic3Map, and have the joiners use those to build the final
> >>>>>>>>>>>>>>>>> aggregate CG value. This is something the user could avoid
> >>>>>>>>>>>>>>>>> thinking about with this KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will
> >>>>>>>>>>>>>>>>> first go through a KStreamAggregate, grabbing the current
> >>>>>>>>>>>>>>>>> aggregate from storeName1. It will produce this in the form
> >>>>>>>>>>>>>>>>> of the first intermediate value, which gets sent through a
> >>>>>>>>>>>>>>>>> KTableKTableOuterJoin where it will look up the current
> >>>>>>>>>>>>>>>>> value of the key in storeName2. It will use the first joiner
> >>>>>>>>>>>>>>>>> to calculate the second intermediate value, which will go
> >>>>>>>>>>>>>>>>> through an additional KTableKTableOuterJoin. Here it will
> >>>>>>>>>>>>>>>>> look up the current value of the key in storeName3 and use
> >>>>>>>>>>>>>>>>> the second joiner to build the final aggregate value.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If you think through all possibilities for incoming topics,
> >>>>>>>>>>>>>>>>> you will see that no matter which topic a record comes in
> >>>>>>>>>>>>>>>>> through, all three stores are queried and all of the joiners
> >>>>>>>>>>>>>>>>> must get used.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Topology-wise, for N incoming streams this creates N
> >>>>>>>>>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> >>>>>>>>>>>>>>>>> KTableKTableJoinMergers.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Example with Proposed API:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> >>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> >>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = grouped1
> >>>>>>>>>>>>>>>>>      .cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> >>>>>>>>>>>>>>>>>      .cogroup(grouped2, aggregator2)
> >>>>>>>>>>>>>>>>>      .cogroup(grouped3, aggregator3)
> >>>>>>>>>>>>>>>>>      .aggregate();
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As you can see, this creates 1 StateStore and requires 1
> >>>>>>>>>>>>>>>>> initializer and 1 aggValueSerde. The user no longer has to
> >>>>>>>>>>>>>>>>> worry about the intermediate values and the joiners. All
> >>>>>>>>>>>>>>>>> they have to think about is how each stream impacts the
> >>>>>>>>>>>>>>>>> creation of the final CG object.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> When a new input arrives, let's say at "topic1", it will
> >>>>>>>>>>>>>>>>> first go through a KStreamAggregate and grab the current
> >>>>>>>>>>>>>>>>> aggregate from storeName1. It will add its incoming object
> >>>>>>>>>>>>>>>>> to the aggregate, update the store and pass the new
> >>>>>>>>>>>>>>>>> aggregate on. This new aggregate goes through the
> >>>>>>>>>>>>>>>>> KStreamCogroup, which is pretty much just a pass-through
> >>>>>>>>>>>>>>>>> processor, and you are done.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Topology-wise, for N incoming streams the new API will only
> >>>>>>>>>>>>>>>>> ever create N KStreamAggregates and 1 KStreamCogroup.
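
The two topology counts given above can be compared with a small worked calculation. The class and method names below are illustrative assumptions; the formulas are the ones stated in the message, and they count only the processors and stores it lists.

```java
// Worked arithmetic for the processor and store counts quoted in the thread.
final class TopologyCount {

    // Current API: N KStreamAggregates + 2*(N-1) KTableKTableOuterJoins
    // + (N-1) KTableKTableJoinMergers.
    static int currentProcessors(int n) {
        return n + 2 * (n - 1) + (n - 1);
    }

    // Proposed API: N KStreamAggregates + 1 KStreamCogroup.
    static int proposedProcessors(int n) {
        return n + 1;
    }

    // Current API: one state store per aggregated input stream.
    static int currentStores(int n) {
        return n;
    }

    // Proposed API: a single shared state store.
    static int proposedStores(int n) {
        return 1;
    }

    public static void main(String[] args) {
        for (int n : new int[] { 2, 3, 12 }) {
            System.out.println("N=" + n
                + " current: " + currentProcessors(n) + " processors, "
                + currentStores(n) + " stores;"
                + " proposed: " + proposedProcessors(n) + " processors, "
                + proposedStores(n) + " store");
        }
    }
}
```

For the three-topic example this gives 9 processors and 3 stores with the current API versus 4 processors and 1 store with the proposed one; for a dozen inputs it is 45 versus 13 processors.
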
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
> >>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Kyle,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I
> >>>>>>>>>>>>>>>>>> could not follow completely. Could you maybe add a more
> >>>>>>>>>>>>>>>>>> concrete example, like 3 streams with 3 records each (plus
> >>>>>>>>>>>>>>>>>> expected result), and show the difference between the
> >>>>>>>>>>>>>>>>>> current way to implement it and the proposed API?
> >>>>>>>>>>>>>>>>>> This could also cover the internal processing to see what
> >>>>>>>>>>>>>>>>>> store calls would be required for both approaches etc.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I think, it's pretty advanced stuff you propose,
> >> and
> >>> it
> >>>>>>>> would
> >>>>>>>>>> help
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> understand it better.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks a lot!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> >>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my
> >> classes
> >>>>> and
> >>>>>> get
> >>>>>>>>>> around
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> writing documentation for the public api
> >> additions.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> One thing I was curious about: in the
> >>>>>>>>>>>>>>>>>>> KCogroupedStreamImpl#aggregate method I pass null to the
> >>>>>>>>>>>>>>>>>>> KGroupedStream#repartitionIfRequired method. I can't
> >>>>>>>>>>>>>>>>>>> supply the store name because if more than one grouped
> >>>>>>>>>>>>>>>>>>> stream repartitions, an error is thrown. Is there some
> >>>>>>>>>>>>>>>>>>> name that someone can recommend, or should I leave the
> >>>>>>>>>>>>>>>>>>> null and allow it to fall back to the
> >>>>>>>>>>>>>>>>>>> KGroupedStream.name?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables? This
> >>>>>>>>>>>>>>>>>>> would be pretty easy for a normal aggregate, but one
> >>>>>>>>>>>>>>>>>>> allowing session stores and windowed stores would require
> >>>>>>>>>>>>>>>>>>> KTableSessionWindowAggregate and KTableWindowAggregate
> >>>>>>>>>>>>>>>>>>> implementations.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Kyle
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <
> >>>>>>>>> eno.there...@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <
> >>>>>>>>> damian....@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Kyle,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that i haven't
> >> had
> >>>>> the
> >>>>>>>>> chance
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> look
> >>>>>>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>>>>>> the KIP yet, but will schedule some time to look
> >>>>> into
> >>>>>> it
> >>>>>>>>>>>>>>> tomorrow.
> >>>>>>>>>>>>>>>>> For
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> implementation, can you raise a PR against kafka
> >>>>> trunk
> >>>>>>>> and
> >>>>>>>>>> mark
> >>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>> WIP?
> >>>>>>>>>>>>>>>>>>>>> It will be easier to review what you have done.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
> >>>>>>>>>>>>>>>> winkelman.k...@gmail.com
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw
> >> some
> >>>>>>>> attention
> >>>>>>>>> to
> >>>>>>>>>>> my
> >>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>> haven't heard from anyone in a couple days.
> >> This
> >>>>> is my
> >>>>>>>>> first
> >>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>> first large contribution to the project so I'm
> >>>>> sure I
> >>>>>>>> did
> >>>>>>>>>>>>>>>> something
> >>>>>>>>>>>>>>>>>>>> wrong.
> >>>>>>>>>>>>>>>>>>>>>> ;)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
> >>>>>>>>>>>>>>>> winkelman.k...@gmail.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate
> >> discussion
> >>>>> about
> >>>>>>>>> adding
> >>>>>>>>>>>>>>>>> cogroup
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> the streams DSL.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here:
> >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation here:
> >>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>
>
>


-- 
-- Guozhang
