I have added code blocks and a note about the partial results.

Can I ask why you dont like KCogroupedStream? I just think that because it
is created from a KGroupedStream we should keep a similar name format.

On May 16, 2017 9:23 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

Can you put the code examples etc in {code} blocks to make it easier to
read?

I think this is probably a pretty common use-case and therefore a
worthwhile addition to the API.

I'd suggest dropping the K from KCogroupedStream and calling it
CogroupedStream or CogroupedKStream.
In your example, wouldn't the output potentially have more partial results?
I.e, for each input on any stream you'd potentially see a record produced?
I think it is worth mentioning this.

Thanks,
Damian

On Tue, 16 May 2017 at 12:26 Kyle Winkelman <winkelman.k...@gmail.com>
wrote:

> No problem, I just wanted to make sure people still had more to say. I
will
> wait another week.
>
> Thanks,
> Kyle
>
> On May 16, 2017 4:25 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>
> > Hi Kyle,
> >
> > Sorry for the delay in reviews, tomorrow is feature freeze deadline (
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0
> <
> > https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0
> >)
> > so people are busier than usual. Stay tuned.
> >
> > Eno
> > > On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com>
> > wrote:
> > >
> > > Damian Guy, could you let me know if you plan to review this further?
> > There
> > > is no rush, but if you dont have any additional comments I could start
> > the
> > > voting and finish my WIP PR.
> > >
> > > Thanks,
> > > Kyle
> > >
> > > On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com>
> > wrote:
> > >
> > >> Eno, is there anyone else that is an expert in the kafka streams
realm
> > >> that I should reach out to for input?
> > >>
> > >> I believe Damian Guy is still planning on reviewing this more in
depth
> > so
> > >> I will wait for his inputs before continuing.
> > >>
> > >> On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com>
> wrote:
> > >>
> > >>> Thanks Kyle, good arguments.
> > >>>
> > >>> Eno
> > >>>
> > >>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <
> winkelman.k...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>> *- minor: could you add an exact example (similar to what Jay’s
> > example
> > >>> is,
> > >>>> or like your Spark/Pig pointers had) to make this super concrete?*
> > >>>> I have added a more concrete example to the KIP.
> > >>>>
> > >>>> *- my main concern is that we’re exposing this optimization to the
> > DSL.
> > >>> In
> > >>>> an ideal world, an optimizer would take the existing DSL and do the
> > >>> right
> > >>>> thing under the covers (create just one state store, arrange the
> nodes
> > >>>> etc). The original DSL had a bunch of small, composable pieces
> (group,
> > >>>> aggregate, join) that this proposal groups together. I’d like to
> hear
> > >>> your
> > >>>> thoughts on whether it’s possible to do this optimization with the
> > >>> current
> > >>>> DSL, at the topology builder level.*
> > >>>> You would have to make a lot of checks to understand if it is even
> > >>> possible
> > >>>> to make this optimization:
> > >>>> 1. Make sure they are all KTableKTableOuterJoins
> > >>>> 2. None of the intermediate KTables are used for anything else.
> > >>>> 3. None of the intermediate stores are used. (This may be
impossible
> > >>>> especially if they use KafkaStreams#store after the topology has
> > already
> > >>>> been built.)
> > >>>> You would then need to make decisions during the optimization:
> > >>>> 1. Your new initializer would the composite of all the individual
> > >>>> initializers and the valueJoiners.
> > >>>> 2. I am having a hard time thinking about how you would turn the
> > >>>> aggregators and valueJoiners into an aggregator that would work on
> the
> > >>>> final object, but this may be possible.
> > >>>> 3. Which state store would you use? The ones declared would be for
> the
> > >>>> aggregate values. None of the declared ones would be guaranteed to
> > hold
> > >>> the
> > >>>> final object. This would mean you must created a new state store
and
> > not
> > >>>> created any of the declared ones.
> > >>>>
> > >>>> The main argument I have against it is even if it could be done I
> > don't
> > >>>> know that we would want to have this be an optimization in the
> > >>> background
> > >>>> because the user would still be required to think about all of the
> > >>>> intermediate values that they shouldn't need to worry about if they
> > only
> > >>>> care about the final object.
> > >>>>
> > >>>> In my opinion cogroup is a common enough case that it should be
part
> > of
> > >>> the
> > >>>> composable pieces (group, aggregate, join) because we want to allow
> > >>> people
> > >>>> to join more than 2 or more streams in an easy way. Right now I
> don't
> > >>> think
> > >>>> we give them ways of handling this use case easily.
> > >>>>
> > >>>> *-I think there will be scope for several such optimizations in the
> > >>> future
> > >>>> and perhaps at some point we need to think about decoupling the 1:1
> > >>> mapping
> > >>>> from the DSL into the physical topology.*
> > >>>> I would argue that cogroup is not just an optimization it is a new
> way
> > >>> for
> > >>>> the users to look at accomplishing a problem that requires multiple
> > >>>> streams. I may sound like a broken record but I don't think users
> > should
> > >>>> have to build the N-1 intermediate tables and deal with their
> > >>> initializers,
> > >>>> serdes and stores if all they care about is the final object.
> > >>>> Now if for example someone uses cogroup but doesn't supply
> additional
> > >>>> streams and aggregators this case is equivalent to a single grouped
> > >>> stream
> > >>>> making an aggregate call. This case is what I view an optimization
> as,
> > >>> we
> > >>>> could remove the KStreamCogroup and act as if there was just a call
> to
> > >>>> KGroupedStream#aggregate instead of calling KGroupedStream#cogroup.
> (I
> > >>>> would prefer to just write a warning saying that this is not how
> > >>> cogroup is
> > >>>> to be used.)
> > >>>>
> > >>>> Thanks,
> > >>>> Kyle
> > >>>>
> > >>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
> eno.there...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>>> Hi Kyle,
> > >>>>>
> > >>>>> Thanks for the KIP again. A couple of comments:
> > >>>>>
> > >>>>> - minor: could you add an exact example (similar to what Jay’s
> > example
> > >>> is,
> > >>>>> or like your Spark/Pig pointers had) to make this super concrete?
> > >>>>>
> > >>>>> - my main concern is that we’re exposing this optimization to the
> > DSL.
> > >>> In
> > >>>>> an ideal world, an optimizer would take the existing DSL and do
the
> > >>> right
> > >>>>> thing under the covers (create just one state store, arrange the
> > nodes
> > >>>>> etc). The original DSL had a bunch of small, composable pieces
> > (group,
> > >>>>> aggregate, join) that this proposal groups together. I’d like to
> hear
> > >>> your
> > >>>>> thoughts on whether it’s possible to do this optimization with the
> > >>> current
> > >>>>> DSL, at the topology builder level.
> > >>>>>
> > >>>>> I think there will be scope for several such optimizations in the
> > >>> future
> > >>>>> and perhaps at some point we need to think about decoupling the
1:1
> > >>> mapping
> > >>>>> from the DSL into the physical topology.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Eno
> > >>>>>
> > >>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
> > >>>>>>
> > >>>>>> I haven't digested the proposal but the use case is pretty
common.
> > An
> > >>>>>> example would be the "customer 360" or "unified customer profile"
> > use
> > >>>>> case
> > >>>>>> we often use. In that use case you have a dozen systems each of
> > which
> > >>> has
> > >>>>>> some information about your customer (account details, settings,
> > >>> billing
> > >>>>>> info, customer service contacts, purchase history, etc). Your
goal
> > is
> > >>> to
> > >>>>>> join/munge these into a single profile record for each customer
> that
> > >>> has
> > >>>>>> all the relevant info in a usable form and is up-to-date with all
> > the
> > >>>>>> source systems. If you implement that with kstreams as a sequence
> of
> > >>>>> joins
> > >>>>>> i think today we'd fully materialize N-1 intermediate tables. But
> > >>> clearly
> > >>>>>> you only need a single stage to group all these things that are
> > >>> already
> > >>>>>> co-partitioned. A distributed database would do this under the
> > covers
> > >>>>> which
> > >>>>>> is arguably better (at least when it does the right thing) and
> > >>> perhaps we
> > >>>>>> could do the same thing but I'm not sure we know the partitioning
> so
> > >>> we
> > >>>>> may
> > >>>>>> need an explicit cogroup command that impllies they are already
> > >>>>>> co-partitioned.
> > >>>>>>
> > >>>>>> -Jay
> > >>>>>>
> > >>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
> > >>> winkelman.k...@gmail.com
> > >>>>>>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Yea thats a good way to look at it.
> > >>>>>>> I have seen this type of functionality in a couple other
> platforms
> > >>> like
> > >>>>>>> spark and pig.
> > >>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/
> > >>>>> PairRDDFunctions.html
> > >>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_
> > >>>>> cogroup_operator.htm
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com>
> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Kyle,
> > >>>>>>>>
> > >>>>>>>> If i'm reading this correctly it is like an N way outer join?
So
> > an
> > >>>>> input
> > >>>>>>>> on any stream will always produce a new aggregated value - is
> that
> > >>>>>>> correct?
> > >>>>>>>> Effectively, each Aggregator just looks up the current value,
> > >>>>> aggregates
> > >>>>>>>> and forwards the result.
> > >>>>>>>> I need to look into it and think about it a bit more, but it
> seems
> > >>> like
> > >>>>>>> it
> > >>>>>>>> could be a useful optimization.
> > >>>>>>>>
> > >>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
> > >>> winkelman.k...@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> I sure can. I have added the following description to my KIP.
> If
> > >>> this
> > >>>>>>>>> doesn't help let me know and I will take some more time to
> build
> > a
> > >>>>>>>> diagram
> > >>>>>>>>> and make more of a step by step description:
> > >>>>>>>>>
> > >>>>>>>>> Example with Current API:
> > >>>>>>>>>
> > >>>>>>>>> KTable<K, V1> table1 =
> > >>>>>>>>> builder.stream("topic1").groupByKey().aggregate(initializer1,
> > >>>>>>>> aggregator1,
> > >>>>>>>>> aggValueSerde1, storeName1);
> > >>>>>>>>> KTable<K, V2> table2 =
> > >>>>>>>>> builder.stream("topic2").groupByKey().aggregate(initializer2,
> > >>>>>>>> aggregator2,
> > >>>>>>>>> aggValueSerde2, storeName2);
> > >>>>>>>>> KTable<K, V3> table3 =
> > >>>>>>>>> builder.stream("topic3").groupByKey().aggregate(initializer3,
> > >>>>>>>> aggregator3,
> > >>>>>>>>> aggValueSerde3, storeName3);
> > >>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
> > >>>>>>>>> joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
> > >>>>>>>>>
> > >>>>>>>>> As you can see this creates 3 StateStores, requires 3
> > initializers,
> > >>>>>>> and 3
> > >>>>>>>>> aggValueSerdes. This also adds the pressure to user to define
> > what
> > >>> the
> > >>>>>>>>> intermediate values are going to be (V1, V2, V3). They are
left
> > >>> with a
> > >>>>>>>>> couple choices, first to make V1, V2, and V3 all the same as
CG
> > and
> > >>>>> the
> > >>>>>>>> two
> > >>>>>>>>> joiners are more like mergers, or second make them
intermediate
> > >>> states
> > >>>>>>>> such
> > >>>>>>>>> as Topic1Map, Topic2Map, and Topic3Map and the joiners use
> those
> > to
> > >>>>>>> build
> > >>>>>>>>> the final aggregate CG value. This is something the user could
> > >>> avoid
> > >>>>>>>>> thinking about with this KIP.
> > >>>>>>>>>
> > >>>>>>>>> When a new input arrives lets say at "topic1" it will first go
> > >>> through
> > >>>>>>> a
> > >>>>>>>>> KStreamAggregate grabbing the current aggregate from
> storeName1.
> > It
> > >>>>>>> will
> > >>>>>>>>> produce this in the form of the first intermediate value and
> get
> > >>> sent
> > >>>>>>>>> through a KTableKTableOuterJoin where it will look up the
> current
> > >>>>> value
> > >>>>>>>> of
> > >>>>>>>>> the key in storeName2. It will use the first joiner to
> calculate
> > >>> the
> > >>>>>>>> second
> > >>>>>>>>> intermediate value, which will go through an additional
> > >>>>>>>>> KTableKTableOuterJoin. Here it will look up the current value
> of
> > >>> the
> > >>>>>>> key
> > >>>>>>>> in
> > >>>>>>>>> storeName3 and use the second joiner to build the final
> aggregate
> > >>>>>>> value.
> > >>>>>>>>>
> > >>>>>>>>> If you think through all possibilities for incoming topics you
> > will
> > >>>>> see
> > >>>>>>>>> that no matter which topic it comes in through all three
stores
> > are
> > >>>>>>>> queried
> > >>>>>>>>> and all of the joiners must get used.
> > >>>>>>>>>
> > >>>>>>>>> Topology wise for N incoming streams this creates N
> > >>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> > >>>>>>>>> KTableKTableJoinMergers.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Example with Proposed API:
> > >>>>>>>>>
> > >>>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").
> > >>>>>>> groupByKey();
> > >>>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").
> > >>>>>>> groupByKey();
> > >>>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
> > >>>>>>> groupByKey();
> > >>>>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1,
> > >>> aggregator1,
> > >>>>>>>>> aggValueSerde1, storeName1)
> > >>>>>>>>>      .cogroup(grouped2, aggregator2)
> > >>>>>>>>>      .cogroup(grouped3, aggregator3)
> > >>>>>>>>>      .aggregate();
> > >>>>>>>>>
> > >>>>>>>>> As you can see this creates 1 StateStore, requires 1
> initializer,
> > >>> and
> > >>>>> 1
> > >>>>>>>>> aggValueSerde. The user no longer has to worry about the
> > >>> intermediate
> > >>>>>>>>> values and the joiners. All they have to think about is how
> each
> > >>>>> stream
> > >>>>>>>>> impacts the creation of the final CG object.
> > >>>>>>>>>
> > >>>>>>>>> When a new input arrives lets say at "topic1" it will first go
> > >>> through
> > >>>>>>> a
> > >>>>>>>>> KStreamAggreagte and grab the current aggregate from
> storeName1.
> > It
> > >>>>>>> will
> > >>>>>>>>> add its incoming object to the aggregate, update the store and
> > pass
> > >>>>> the
> > >>>>>>>> new
> > >>>>>>>>> aggregate on. This new aggregate goes through the
> KStreamCogroup
> > >>> which
> > >>>>>>> is
> > >>>>>>>>> pretty much just a pass through processor and you are done.
> > >>>>>>>>>
> > >>>>>>>>> Topology wise for N incoming streams the new api will only
> every
> > >>>>>>> create N
> > >>>>>>>>> KStreamAggregates and 1 KStreamCogroup.
> > >>>>>>>>>
> > >>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
> > >>>>> matth...@confluent.io
> > >>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Kyle,
> > >>>>>>>>>>
> > >>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I
> could
> > >>> not
> > >>>>>>>>>> follow completely. Could you maybe add a more concrete
> example,
> > >>> like
> > >>>>>>> 3
> > >>>>>>>>>> streams with 3 records each (plus expected result), and show
> the
> > >>>>>>>>>> difference between current way to to implement it and the
> > proposed
> > >>>>>>> API?
> > >>>>>>>>>> This could also cover the internal processing to see what
> store
> > >>> calls
> > >>>>>>>>>> would be required for both approaches etc.
> > >>>>>>>>>>
> > >>>>>>>>>> I think, it's pretty advanced stuff you propose, and it would
> > >>> help to
> > >>>>>>>>>> understand it better.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks a lot!
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> > >>>>>>>>>>> I have made a pull request. It can be found here.
> > >>>>>>>>>>>
> > >>>>>>>>>>> https://github.com/apache/kafka/pull/2975
> > >>>>>>>>>>>
> > >>>>>>>>>>> I plan to write some more unit tests for my classes and get
> > >>> around
> > >>>>>>> to
> > >>>>>>>>>>> writing documentation for the public api additions.
> > >>>>>>>>>>>
> > >>>>>>>>>>> One thing I was curious about is during the
> > >>>>>>>>>> KCogroupedStreamImpl#aggregate
> > >>>>>>>>>>> method I pass null to the
> KGroupedStream#repartitionIfRequired
> > >>>>>>>> method.
> > >>>>>>>>> I
> > >>>>>>>>>>> can't supply the store name because if more than one grouped
> > >>> stream
> > >>>>>>>>>>> repartitions an error is thrown. Is there some name that
> > someone
> > >>>>>>> can
> > >>>>>>>>>>> recommend or should I leave the null and allow it to fall
> back
> > to
> > >>>>>>> the
> > >>>>>>>>>>> KGroupedStream.name?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Should this be expanded to handle grouped tables? This would
> be
> > >>>>>>>> pretty
> > >>>>>>>>>> easy
> > >>>>>>>>>>> for a normal aggregate but one allowing session stores and
> > >>> windowed
> > >>>>>>>>>> stores
> > >>>>>>>>>>> would required KTableSessionWindowAggregate and
> > >>>>>>> KTableWindowAggregate
> > >>>>>>>>>>> implementations.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> Kyle
> > >>>>>>>>>>>
> > >>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <
> eno.there...@gmail.com
> > >
> > >>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Eno
> > >>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <
> damian....@gmail.com
> > >
> > >>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Kyle,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the KIP. I apologize that i haven't had the
> chance
> > >>> to
> > >>>>>>>> look
> > >>>>>>>>>> at
> > >>>>>>>>>>>>> the KIP yet, but will schedule some time to look into it
> > >>>>>>> tomorrow.
> > >>>>>>>>> For
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>> implementation, can you raise a PR against kafka trunk and
> > mark
> > >>>>>>> it
> > >>>>>>>> as
> > >>>>>>>>>>>> WIP?
> > >>>>>>>>>>>>> It will be easier to review what you have done.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>> Damian
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
> > >>>>>>>> winkelman.k...@gmail.com
> > >>>>>>>>>>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I am replying to this in hopes it will draw some
attention
> > to
> > >>> my
> > >>>>>>>> KIP
> > >>>>>>>>>> as
> > >>>>>>>>>>>> I
> > >>>>>>>>>>>>>> haven't heard from anyone in a couple days. This is my
> first
> > >>> KIP
> > >>>>>>>> and
> > >>>>>>>>>> my
> > >>>>>>>>>>>>>> first large contribution to the project so I'm sure I did
> > >>>>>>>> something
> > >>>>>>>>>>>> wrong.
> > >>>>>>>>>>>>>> ;)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
> > >>>>>>>> winkelman.k...@gmail.com>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hello all,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about
> > adding
> > >>>>>>>>> cogroup
> > >>>>>>>>>> to
> > >>>>>>>>>>>>>>> the streams DSL.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Please find the KIP here:
> > >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Please find my initial implementation here:
> > >>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>> Kyle Winkelman
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>>
> > >>>
> > >>>
> >
> >
>

Reply via email to