This makes much more sense to me. +1
> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>
> I have updated the KIP and my PR. Let me know what you think.
> To created a cogrouped stream just call cogroup on a KgroupedStream and
> supply the initializer, aggValueSerde, and an aggregator. Then continue
> adding kgroupedstreams and aggregators. Then call one of the many aggregate
> calls to create a KTable.
>
> Thanks,
> Kyle
>
> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
>
>> Hi Kyle,
>>
>> Thanks for the update. I think just one initializer makes sense as it
>> should only be called once per key and generally it is just going to create
>> a new instance of whatever the Aggregate class is.
>>
>> Cheers,
>> Damian
>>
>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com>
>> wrote:
>>
>>> Hello all,
>>>
>>> I have spent some more time on this and the best alternative I have come
>> up
>>> with is:
>>> KGroupedStream has a single cogroup call that takes an initializer and an
>>> aggregator.
>>> CogroupedKStream has a cogroup call that takes additional groupedStream
>>> aggregator pairs.
>>> CogroupedKStream has multiple aggregate methods that create the different
>>> stores.
>>>
>>> I plan on updating the kip but I want people's input on if we should have
>>> the initializer be passed in once at the beginning or if we should
>> instead
>>> have the initializer be required for each call to one of the aggregate
>>> calls. The first makes more sense to me but doesnt allow the user to
>>> specify different initializers for different tables.
>>>
>>> Thanks,
>>> Kyle
>>>
>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com>
>>> wrote:
>>>
>>>> Yea I really like that idea I'll see what I can do to update the kip
>> and
>>>> my pr when I have some time. I'm not sure how well creating the
>>>> kstreamaggregates will go though because at that point I will have
>> thrown
>>>> away the type of the values. It will be type safe I just may need to
>> do a
>>>> little forcing.
>>>>
>>>> Thanks,
>>>> Kyle
>>>>
>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>>>>
>>>>> Kyle,
>>>>>
>>>>> Thanks for the explanations, my previous read on the wiki examples was
>>>>> wrong.
>>>>>
>>>>> So I guess my motivation should be "reduced" to: can we move the
>> window
>>>>> specs param from "KGroupedStream#cogroup(..)" to
>>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
>>>>>
>>>>> 1. minor: we can reduce the #.generics in CogroupedKStream from 3 to
>> 2.
>>>>> 2. major: this is for extensibility of the APIs, and since we are
>>> removing
>>>>> the "Evolving" annotations on Streams it may be harder to change it
>>> again
>>>>> in the future. The extended use cases are that people wanted to have
>>>>> windowed running aggregates on different granularities, e.g. "give me
>>> the
>>>>> counts per-minute, per-hour, per-day and per-week", and today in DSL
>> we
>>>>> need to specify that case in multiple aggregate operators, which gets
>> a
>>>>> state store / changelog, etc. And it is possible to optimize it as
>> well
>>> to
>>>>> a single state store. Its implementation would be tricky as you need
>> to
>>>>> contain different lengthed windows within your window store but just
>>> from
>>>>> the public API point of view, it could be specified as:
>>>>>
>>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
>>>>> "state-store-name");
>>>>>
>>>>> table1 = stream.aggregate(/*per-minute window*/)
>>>>> table2 = stream.aggregate(/*per-hour window*/)
>>>>> table3 = stream.aggregate(/*per-day window*/)
>>>>>
>>>>> while underlying we are only using a single store "state-store-name"
>> for
>>>>> it.
>>>>>
>>>>>
>>>>> Although this feature is out of the scope of this KIP, I'd like to
>>> discuss
>>>>> if we can "leave the door open" to make such changes without modifying
>>> the
>>>>> public APIs .
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
>>> winkelman.k...@gmail.com
>>>>>>
>>>>> wrote:
>>>>>
>>>>>> I allow defining a single window/sessionwindow one time when you
>> make
>>>>> the
>>>>>> cogroup call from a KGroupedStream. From then on you are using the
>>>>> cogroup
>>>>>> call from with in CogroupedKStream which doesnt accept any
>> additional
>>>>>> windows/sessionwindows.
>>>>>>
>>>>>> Is this what you meant by your question or did I misunderstand?
>>>>>>
>>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com>
>> wrote:
>>>>>>
>>>>>> Another question that came to me is on "window alignment": from the
>>> KIP
>>>>> it
>>>>>> seems you are allowing users to specify a (potentially different)
>>> window
>>>>>> spec in each co-grouped input stream. So if these window specs are
>>>>>> different how should we "align" them with different input streams? I
>>>>> think
>>>>>> it is more natural to only specify on window spec in the
>>>>>>
>>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
>>>>>>
>>>>>>
>>>>>> And remove it from the cogroup() functions. WDYT?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the proposal Kyle, this is a quite common use case to
>>>>> support
>>>>>>> such multi-way table join (i.e. N source tables with N aggregate
>>> func)
>>>>>> with
>>>>>>> a single store and N+1 serdes, I have seen lots of people using
>> the
>>>>>>> low-level PAPI to achieve this goal.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
>>>>>> winkelman.k...@gmail.com
>>>>>>>> wrote:
>>>>>>>
>>>>>>>> I like your point about not handling other cases such as count
>> and
>>>>>> reduce.
>>>>>>>>
>>>>>>>> I think that reduce may not make sense because reduce assumes
>> that
>>>>> the
>>>>>>>> input values are the same as the output values. With cogroup
>> there
>>>>> may
>>>>>> be
>>>>>>>> multiple different input types and then your output type cant be
>>>>>> multiple
>>>>>>>> different things. In the case where you have all matching value
>>> types
>>>>>> you
>>>>>>>> can do KStreamBuilder#merge followed by the reduce.
>>>>>>>>
>>>>>>>> As for count I think it is possible to call count on all the
>>>>> individual
>>>>>>>> grouped streams and then do joins. Otherwise we could maybe make
>> a
>>>>>> special
>>>>>>>> call in groupedstream for this case. Because in this case we dont
>>>>> need
>>>>>> to
>>>>>>>> do type checking on the values. It could be similar to the
>> current
>>>>> count
>>>>>>>> methods but accept a var args of additonal grouped streams as
>> well
>>>>> and
>>>>>>>> make
>>>>>>>> sure they have a key type of K.
>>>>>>>>
>>>>>>>> The way I have put the kip together is to ensure that we do type
>>>>>> checking.
>>>>>>>> I don't see a way we could group them all first and then make a
>>> call
>>>>> to
>>>>>>>> count, reduce, or aggregate because with aggregate they would
>> need
>>> to
>>>>>> pass
>>>>>>>> a list of aggregators and we would have no way of type checking
>>> that
>>>>>> they
>>>>>>>> match the grouped streams.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kyle
>>>>>>>>
>>>>>>>> On May 19, 2017 11:42 AM, "Xavier Léauté" <xav...@confluent.io>
>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sorry to jump on this thread so late. I agree this is a very
>>> useful
>>>>>>>>> addition and wanted to provide an additional use-case and some
>>> more
>>>>>>>>> comments.
>>>>>>>>>
>>>>>>>>> This is actually a very common analytics use-case in the
>> ad-tech
>>>>>>>> industry.
>>>>>>>>> The typical setup will have an auction stream, an impression
>>>>> stream,
>>>>>>>> and a
>>>>>>>>> click stream. Those three streams need to be combined to
>> compute
>>>>>>>> aggregate
>>>>>>>>> statistics (e.g. impression statistics, and click-through
>> rates),
>>>>>> since
>>>>>>>>> most of the attributes of interest are only present the auction
>>>>>> stream.
>>>>>>>>>
>>>>>>>>> A simple way to do this is to co-group all the streams by the
>>>>> auction
>>>>>>>> key,
>>>>>>>>> and process updates to the co-group as events for each stream
>>> come
>>>>> in,
>>>>>>>>> keeping only one value from each stream before sending
>> downstream
>>>>> for
>>>>>>>>> further processing / aggregation.
>>>>>>>>>
>>>>>>>>> One could view the result of that co-group operation as a
>>> "KTable"
>>>>>> with
>>>>>>>>> multiple values per key. The key being the grouping key, and
>> the
>>>>>> values
>>>>>>>>> consisting of one value per stream.
>>>>>>>>>
>>>>>>>>> What I like about Kyle's approach is that allows elegant
>>>>> co-grouping
>>>>>> of
>>>>>>>>> multiple streams without having to worry about the number of
>>>>> streams,
>>>>>>>> and
>>>>>>>>> avoids dealing with Tuple types or other generic interfaces
>> that
>>>>> could
>>>>>>>> get
>>>>>>>>> messy if we wanted to preserve all the value types in the
>>> resulting
>>>>>>>>> co-grouped stream.
>>>>>>>>>
>>>>>>>>> My only concern is that we only allow the cogroup + aggregate
>>>>> combined
>>>>>>>>> operation. This forces the user to build their own tuple
>>>>> serialization
>>>>>>>>> format if they want to preserve the individual input stream
>>> values
>>>>> as
>>>>>> a
>>>>>>>>> group. It also deviates quite a bit from our approach in
>>>>>> KGroupedStream
>>>>>>>>> which offers other operations, such as count and reduce, which
>>>>> should
>>>>>>>> also
>>>>>>>>> be applicable to a co-grouped stream.
>>>>>>>>>
>>>>>>>>> Overall I still think this is a really useful addition, but I
>>> feel
>>>>> we
>>>>>>>>> haven't spend much time trying to explore alternative DSLs that
>>>>> could
>>>>>>>> maybe
>>>>>>>>> generalize better or match our existing syntax more closely.
>>>>>>>>>
>>>>>>>>> On Tue, May 9, 2017 at 8:08 AM Kyle Winkelman <
>>>>>> winkelman.k...@gmail.com
>>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Eno, is there anyone else that is an expert in the kafka
>>> streams
>>>>>> realm
>>>>>>>>> that
>>>>>>>>>> I should reach out to for input?
>>>>>>>>>>
>>>>>>>>>> I believe Damian Guy is still planning on reviewing this more
>>> in
>>>>>> depth
>>>>>>>>> so I
>>>>>>>>>> will wait for his inputs before continuing.
>>>>>>>>>>
>>>>>>>>>> On May 9, 2017 7:30 AM, "Eno Thereska" <
>> eno.there...@gmail.com
>>>>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Kyle, good arguments.
>>>>>>>>>>>
>>>>>>>>>>> Eno
>>>>>>>>>>>
>>>>>>>>>>>> On May 7, 2017, at 5:06 PM, Kyle Winkelman <
>>>>>>>> winkelman.k...@gmail.com
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> *- minor: could you add an exact example (similar to what
>>>>> Jay’s
>>>>>>>>> example
>>>>>>>>>>> is,
>>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super
>>>>>> concrete?*
>>>>>>>>>>>> I have added a more concrete example to the KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> *- my main concern is that we’re exposing this
>> optimization
>>>>> to
>>>>>> the
>>>>>>>>> DSL.
>>>>>>>>>>> In
>>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL
>>> and
>>>>> do
>>>>>>>> the
>>>>>>>>>> right
>>>>>>>>>>>> thing under the covers (create just one state store,
>>> arrange
>>>>> the
>>>>>>>>> nodes
>>>>>>>>>>>> etc). The original DSL had a bunch of small, composable
>>>>> pieces
>>>>>>>>> (group,
>>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d
>>>>> like to
>>>>>>>> hear
>>>>>>>>>>> your
>>>>>>>>>>>> thoughts on whether it’s possible to do this optimization
>>>>> with
>>>>>> the
>>>>>>>>>>> current
>>>>>>>>>>>> DSL, at the topology builder level.*
>>>>>>>>>>>> You would have to make a lot of checks to understand if
>> it
>>> is
>>>>>> even
>>>>>>>>>>> possible
>>>>>>>>>>>> to make this optimization:
>>>>>>>>>>>> 1. Make sure they are all KTableKTableOuterJoins
>>>>>>>>>>>> 2. None of the intermediate KTables are used for anything
>>>>> else.
>>>>>>>>>>>> 3. None of the intermediate stores are used. (This may be
>>>>>>>> impossible
>>>>>>>>>>>> especially if they use KafkaStreams#store after the
>>> topology
>>>>> has
>>>>>>>>>> already
>>>>>>>>>>>> been built.)
>>>>>>>>>>>> You would then need to make decisions during the
>>>>> optimization:
>>>>>>>>>>>> 1. Your new initializer would the composite of all the
>>>>>> individual
>>>>>>>>>>>> initializers and the valueJoiners.
>>>>>>>>>>>> 2. I am having a hard time thinking about how you would
>>> turn
>>>>> the
>>>>>>>>>>>> aggregators and valueJoiners into an aggregator that
>> would
>>>>> work
>>>>>> on
>>>>>>>>> the
>>>>>>>>>>>> final object, but this may be possible.
>>>>>>>>>>>> 3. Which state store would you use? The ones declared
>> would
>>>>> be
>>>>>> for
>>>>>>>>> the
>>>>>>>>>>>> aggregate values. None of the declared ones would be
>>>>> guaranteed
>>>>>> to
>>>>>>>>> hold
>>>>>>>>>>> the
>>>>>>>>>>>> final object. This would mean you must created a new
>> state
>>>>> store
>>>>>>>> and
>>>>>>>>>> not
>>>>>>>>>>>> created any of the declared ones.
>>>>>>>>>>>>
>>>>>>>>>>>> The main argument I have against it is even if it could
>> be
>>>>> done
>>>>>> I
>>>>>>>>> don't
>>>>>>>>>>>> know that we would want to have this be an optimization
>> in
>>>>> the
>>>>>>>>>> background
>>>>>>>>>>>> because the user would still be required to think about
>> all
>>>>> of
>>>>>> the
>>>>>>>>>>>> intermediate values that they shouldn't need to worry
>> about
>>>>> if
>>>>>>>> they
>>>>>>>>>> only
>>>>>>>>>>>> care about the final object.
>>>>>>>>>>>>
>>>>>>>>>>>> In my opinion cogroup is a common enough case that it
>>> should
>>>>> be
>>>>>>>> part
>>>>>>>>> of
>>>>>>>>>>> the
>>>>>>>>>>>> composable pieces (group, aggregate, join) because we
>> want
>>> to
>>>>>>>> allow
>>>>>>>>>>> people
>>>>>>>>>>>> to join more than 2 or more streams in an easy way. Right
>>>>> now I
>>>>>>>> don't
>>>>>>>>>>> think
>>>>>>>>>>>> we give them ways of handling this use case easily.
>>>>>>>>>>>>
>>>>>>>>>>>> *-I think there will be scope for several such
>>> optimizations
>>>>> in
>>>>>>>> the
>>>>>>>>>>> future
>>>>>>>>>>>> and perhaps at some point we need to think about
>> decoupling
>>>>> the
>>>>>>>> 1:1
>>>>>>>>>>> mapping
>>>>>>>>>>>> from the DSL into the physical topology.*
>>>>>>>>>>>> I would argue that cogroup is not just an optimization it
>>> is
>>>>> a
>>>>>> new
>>>>>>>>> way
>>>>>>>>>>> for
>>>>>>>>>>>> the users to look at accomplishing a problem that
>> requires
>>>>>>>> multiple
>>>>>>>>>>>> streams. I may sound like a broken record but I don't
>> think
>>>>>> users
>>>>>>>>>> should
>>>>>>>>>>>> have to build the N-1 intermediate tables and deal with
>>> their
>>>>>>>>>>> initializers,
>>>>>>>>>>>> serdes and stores if all they care about is the final
>>> object.
>>>>>>>>>>>> Now if for example someone uses cogroup but doesn't
>> supply
>>>>>>>> additional
>>>>>>>>>>>> streams and aggregators this case is equivalent to a
>> single
>>>>>>>> grouped
>>>>>>>>>>> stream
>>>>>>>>>>>> making an aggregate call. This case is what I view an
>>>>>> optimization
>>>>>>>>> as,
>>>>>>>>>> we
>>>>>>>>>>>> could remove the KStreamCogroup and act as if there was
>>> just
>>>>> a
>>>>>>>> call
>>>>>>>>> to
>>>>>>>>>>>> KGroupedStream#aggregate instead of calling
>>>>>>>> KGroupedStream#cogroup.
>>>>>>>>> (I
>>>>>>>>>>>> would prefer to just write a warning saying that this is
>>> not
>>>>> how
>>>>>>>>>> cogroup
>>>>>>>>>>> is
>>>>>>>>>>>> to be used.)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Kyle
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <
>>>>>>>> eno.there...@gmail.com
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the KIP again. A couple of comments:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - minor: could you add an exact example (similar to what
>>>>> Jay’s
>>>>>>>>> example
>>>>>>>>>>> is,
>>>>>>>>>>>>> or like your Spark/Pig pointers had) to make this super
>>>>>> concrete?
>>>>>>>>>>>>>
>>>>>>>>>>>>> - my main concern is that we’re exposing this
>> optimization
>>>>> to
>>>>>> the
>>>>>>>>> DSL.
>>>>>>>>>>> In
>>>>>>>>>>>>> an ideal world, an optimizer would take the existing DSL
>>>>> and do
>>>>>>>> the
>>>>>>>>>>> right
>>>>>>>>>>>>> thing under the covers (create just one state store,
>>> arrange
>>>>>> the
>>>>>>>>> nodes
>>>>>>>>>>>>> etc). The original DSL had a bunch of small, composable
>>>>> pieces
>>>>>>>>> (group,
>>>>>>>>>>>>> aggregate, join) that this proposal groups together. I’d
>>>>> like
>>>>>> to
>>>>>>>>> hear
>>>>>>>>>>> your
>>>>>>>>>>>>> thoughts on whether it’s possible to do this
>> optimization
>>>>> with
>>>>>>>> the
>>>>>>>>>>> current
>>>>>>>>>>>>> DSL, at the topology builder level.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think there will be scope for several such
>> optimizations
>>>>> in
>>>>>> the
>>>>>>>>>> future
>>>>>>>>>>>>> and perhaps at some point we need to think about
>>> decoupling
>>>>> the
>>>>>>>> 1:1
>>>>>>>>>>> mapping
>>>>>>>>>>>>> from the DSL into the physical topology.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On May 5, 2017, at 4:39 PM, Jay Kreps <
>> j...@confluent.io>
>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I haven't digested the proposal but the use case is
>>> pretty
>>>>>>>> common.
>>>>>>>>> An
>>>>>>>>>>>>>> example would be the "customer 360" or "unified
>> customer
>>>>>>>> profile"
>>>>>>>>> use
>>>>>>>>>>>>> case
>>>>>>>>>>>>>> we often use. In that use case you have a dozen systems
>>>>> each
>>>>>> of
>>>>>>>>> which
>>>>>>>>>>> has
>>>>>>>>>>>>>> some information about your customer (account details,
>>>>>> settings,
>>>>>>>>>>> billing
>>>>>>>>>>>>>> info, customer service contacts, purchase history,
>> etc).
>>>>> Your
>>>>>>>> goal
>>>>>>>>> is
>>>>>>>>>>> to
>>>>>>>>>>>>>> join/munge these into a single profile record for each
>>>>>> customer
>>>>>>>>> that
>>>>>>>>>>> has
>>>>>>>>>>>>>> all the relevant info in a usable form and is
>> up-to-date
>>>>> with
>>>>>>>> all
>>>>>>>>> the
>>>>>>>>>>>>>> source systems. If you implement that with kstreams as
>> a
>>>>>>>> sequence
>>>>>>>>> of
>>>>>>>>>>>>> joins
>>>>>>>>>>>>>> i think today we'd fully materialize N-1 intermediate
>>>>> tables.
>>>>>>>> But
>>>>>>>>>>> clearly
>>>>>>>>>>>>>> you only need a single stage to group all these things
>>> that
>>>>>> are
>>>>>>>>>> already
>>>>>>>>>>>>>> co-partitioned. A distributed database would do this
>>> under
>>>>> the
>>>>>>>>> covers
>>>>>>>>>>>>> which
>>>>>>>>>>>>>> is arguably better (at least when it does the right
>>> thing)
>>>>> and
>>>>>>>>>> perhaps
>>>>>>>>>>> we
>>>>>>>>>>>>>> could do the same thing but I'm not sure we know the
>>>>>>>> partitioning
>>>>>>>>> so
>>>>>>>>>> we
>>>>>>>>>>>>> may
>>>>>>>>>>>>>> need an explicit cogroup command that impllies they are
>>>>>> already
>>>>>>>>>>>>>> co-partitioned.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <
>>>>>>>>>>> winkelman.k...@gmail.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yea thats a good way to look at it.
>>>>>>>>>>>>>>> I have seen this type of functionality in a couple
>> other
>>>>>>>> platforms
>>>>>>>>>>> like
>>>>>>>>>>>>>>> spark and pig.
>>>>>>>>>>>>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/
>>>>>>>>>>>>> PairRDDFunctions.html
>>>>>>>>>>>>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_
>>>>>>>>>>>>> cogroup_operator.htm
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On May 5, 2017 7:43 AM, "Damian Guy" <
>>>>> damian....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If i'm reading this correctly it is like an N way
>> outer
>>>>>> join?
>>>>>>>> So
>>>>>>>>> an
>>>>>>>>>>>>> input
>>>>>>>>>>>>>>>> on any stream will always produce a new aggregated
>>> value
>>>>> -
>>>>>> is
>>>>>>>>> that
>>>>>>>>>>>>>>> correct?
>>>>>>>>>>>>>>>> Effectively, each Aggregator just looks up the
>> current
>>>>>> value,
>>>>>>>>>>>>> aggregates
>>>>>>>>>>>>>>>> and forwards the result.
>>>>>>>>>>>>>>>> I need to look into it and think about it a bit more,
>>>>> but it
>>>>>>>>> seems
>>>>>>>>>>> like
>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> could be a useful optimization.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <
>>>>>>>>>> winkelman.k...@gmail.com
>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I sure can. I have added the following description
>> to
>>> my
>>>>>>>> KIP. If
>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> doesn't help let me know and I will take some more
>>> time
>>>>> to
>>>>>>>>> build a
>>>>>>>>>>>>>>>> diagram
>>>>>>>>>>>>>>>>> and make more of a step by step description:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Example with Current API:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> KTable<K, V1> table1 =
>>>>>>>>>>>>>>>>> builder.stream("topic1").groupByKey().aggregate(
>>>>>> initializer1
>>>>>>>> ,
>>>>>>>>>>>>>>>> aggregator1,
>>>>>>>>>>>>>>>>> aggValueSerde1, storeName1);
>>>>>>>>>>>>>>>>> KTable<K, V2> table2 =
>>>>>>>>>>>>>>>>> builder.stream("topic2").groupByKey().aggregate(
>>>>>> initializer2
>>>>>>>> ,
>>>>>>>>>>>>>>>> aggregator2,
>>>>>>>>>>>>>>>>> aggValueSerde2, storeName2);
>>>>>>>>>>>>>>>>> KTable<K, V3> table3 =
>>>>>>>>>>>>>>>>> builder.stream("topic3").groupByKey().aggregate(
>>>>>> initializer3
>>>>>>>> ,
>>>>>>>>>>>>>>>> aggregator3,
>>>>>>>>>>>>>>>>> aggValueSerde3, storeName3);
>>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2,
>>>>>>>>>>>>>>>>> joinerOneAndTwo).outerJoin(table3,
>>>>> joinerOneTwoAndThree);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As you can see this creates 3 StateStores, requires
>> 3
>>>>>>>>>> initializers,
>>>>>>>>>>>>>>> and 3
>>>>>>>>>>>>>>>>> aggValueSerdes. This also adds the pressure to user
>> to
>>>>>> define
>>>>>>>>> what
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> intermediate values are going to be (V1, V2, V3).
>> They
>>>>> are
>>>>>>>> left
>>>>>>>>>>> with a
>>>>>>>>>>>>>>>>> couple choices, first to make V1, V2, and V3 all the
>>>>> same
>>>>>> as
>>>>>>>> CG
>>>>>>>>>> and
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> two
>>>>>>>>>>>>>>>>> joiners are more like mergers, or second make them
>>>>>>>> intermediate
>>>>>>>>>>> states
>>>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>>>> as Topic1Map, Topic2Map, and Topic3Map and the
>> joiners
>>>>> use
>>>>>>>> those
>>>>>>>>>> to
>>>>>>>>>>>>>>> build
>>>>>>>>>>>>>>>>> the final aggregate CG value. This is something the
>>> user
>>>>>>>> could
>>>>>>>>>> avoid
>>>>>>>>>>>>>>>>> thinking about with this KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When a new input arrives lets say at "topic1" it
>> will
>>>>> first
>>>>>>>> go
>>>>>>>>>>> through
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> KStreamAggregate grabbing the current aggregate from
>>>>>>>> storeName1.
>>>>>>>>>> It
>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> produce this in the form of the first intermediate
>>> value
>>>>>> and
>>>>>>>> get
>>>>>>>>>>> sent
>>>>>>>>>>>>>>>>> through a KTableKTableOuterJoin where it will look
>> up
>>>>> the
>>>>>>>>> current
>>>>>>>>>>>>> value
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>> the key in storeName2. It will use the first joiner
>> to
>>>>>>>> calculate
>>>>>>>>>> the
>>>>>>>>>>>>>>>> second
>>>>>>>>>>>>>>>>> intermediate value, which will go through an
>>> additional
>>>>>>>>>>>>>>>>> KTableKTableOuterJoin. Here it will look up the
>>> current
>>>>>>>> value of
>>>>>>>>>> the
>>>>>>>>>>>>>>> key
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> storeName3 and use the second joiner to build the
>>> final
>>>>>>>>> aggregate
>>>>>>>>>>>>>>> value.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you think through all possibilities for incoming
>>>>> topics
>>>>>>>> you
>>>>>>>>>> will
>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>> that no matter which topic it comes in through all
>>> three
>>>>>>>> stores
>>>>>>>>>> are
>>>>>>>>>>>>>>>> queried
>>>>>>>>>>>>>>>>> and all of the joiners must get used.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Topology wise for N incoming streams this creates N
>>>>>>>>>>>>>>>>> KStreamAggregates, 2*(N-1) KTableKTableOuterJoins,
>> and
>>>>> N-1
>>>>>>>>>>>>>>>>> KTableKTableJoinMergers.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Example with Proposed API:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> KGroupedStream<K, V1> grouped1 =
>>>>> builder.stream("topic1").
>>>>>>>>>>>>>>> groupByKey();
>>>>>>>>>>>>>>>>> KGroupedStream<K, V2> grouped2 =
>>>>> builder.stream("topic2").
>>>>>>>>>>>>>>> groupByKey();
>>>>>>>>>>>>>>>>> KGroupedStream<K, V3> grouped3 =
>>>>> builder.stream("topic3").
>>>>>>>>>>>>>>> groupByKey();
>>>>>>>>>>>>>>>>> KTable<K, CG> cogrouped =
>>> grouped1.cogroup(initializer1,
>>>>>>>>>>> aggregator1,
>>>>>>>>>>>>>>>>> aggValueSerde1, storeName1)
>>>>>>>>>>>>>>>>> .cogroup(grouped2, aggregator2)
>>>>>>>>>>>>>>>>> .cogroup(grouped3, aggregator3)
>>>>>>>>>>>>>>>>> .aggregate();
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As you can see this creates 1 StateStore, requires 1
>>>>>>>>> initializer,
>>>>>>>>>>> and
>>>>>>>>>>>>> 1
>>>>>>>>>>>>>>>>> aggValueSerde. The user no longer has to worry about
>>> the
>>>>>>>>>>> intermediate
>>>>>>>>>>>>>>>>> values and the joiners. All they have to think about
>>> is
>>>>> how
>>>>>>>> each
>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>> impacts the creation of the final CG object.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When a new input arrives lets say at "topic1" it
>> will
>>>>> first
>>>>>>>> go
>>>>>>>>>>> through
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> KStreamAggreagte and grab the current aggregate from
>>>>>>>> storeName1.
>>>>>>>>>> It
>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> add its incoming object to the aggregate, update the
>>>>> store
>>>>>>>> and
>>>>>>>>>> pass
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>> aggregate on. This new aggregate goes through the
>>>>>>>> KStreamCogroup
>>>>>>>>>>> which
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> pretty much just a pass through processor and you
>> are
>>>>> done.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Topology wise for N incoming streams the new api
>> will
>>>>> only
>>>>>>>> every
>>>>>>>>>>>>>>> create N
>>>>>>>>>>>>>>>>> KStreamAggregates and 1 KStreamCogroup.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <
>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kyle,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> thanks a lot for the KIP. Maybe I am a little slow,
>>>>> but I
>>>>>>>> could
>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> follow completely. Could you maybe add a more
>>> concrete
>>>>>>>> example,
>>>>>>>>>>> like
>>>>>>>>>>>>>>> 3
>>>>>>>>>>>>>>>>>> streams with 3 records each (plus expected result),
>>> and
>>>>>> show
>>>>>>>>> the
>>>>>>>>>>>>>>>>>> difference between current way to to implement it
>> and
>>>>> the
>>>>>>>>>> proposed
>>>>>>>>>>>>>>> API?
>>>>>>>>>>>>>>>>>> This could also cover the internal processing to
>> see
>>>>> what
>>>>>>>> store
>>>>>>>>>>> calls
>>>>>>>>>>>>>>>>>> would be required for both approaches etc.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think, it's pretty advanced stuff you propose,
>> and
>>> it
>>>>>>>> would
>>>>>>>>>> help
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> understand it better.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks a lot!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>>>>>>>>>>>>>>> I have made a pull request. It can be found here.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I plan to write some more unit tests for my
>> classes
>>>>> and
>>>>>> get
>>>>>>>>>> around
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> writing documentation for the public api
>> additions.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> One thing I was curious about is during the
>>>>>>>>>>>>>>>>>> KCogroupedStreamImpl#aggregate
>>>>>>>>>>>>>>>>>>> method I pass null to the KGroupedStream#
>>>>>>>>> repartitionIfRequired
>>>>>>>>>>>>>>>> method.
>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>> can't supply the store name because if more than
>> one
>>>>>>>> grouped
>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>>>> repartitions an error is thrown. Is there some
>> name
>>>>> that
>>>>>>>>> someone
>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>> recommend or should I leave the null and allow it
>> to
>>>>> fall
>>>>>>>> back
>>>>>>>>>> to
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> KGroupedStream.name?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Should this be expanded to handle grouped tables?
>>> This
>>>>>>>> would
>>>>>>>>> be
>>>>>>>>>>>>>>>> pretty
>>>>>>>>>>>>>>>>>> easy
>>>>>>>>>>>>>>>>>>> for a normal aggregate but one allowing session
>>> stores
>>>>>> and
>>>>>>>>>>> windowed
>>>>>>>>>>>>>>>>>> stores
>>>>>>>>>>>>>>>>>>> would required KTableSessionWindowAggregate and
>>>>>>>>>>>>>>> KTableWindowAggregate
>>>>>>>>>>>>>>>>>>> implementations.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Kyle
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <
>>>>>>>>> eno.there...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I’ll look as well asap, sorry, been swamped.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <
>>>>>>>>> damian....@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I apologize that i haven't
>> had
>>>>> the
>>>>>>>>> chance
>>>>>>>>>> to
>>>>>>>>>>>>>>>> look
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>> the KIP yet, but will schedule some time to look
>>>>> into
>>>>>> it
>>>>>>>>>>>>>>> tomorrow.
>>>>>>>>>>>>>>>>> For
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> implementation, can you raise a PR against kafka
>>>>> trunk
>>>>>>>> and
>>>>>>>>>> mark
>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>> WIP?
>>>>>>>>>>>>>>>>>>>>> It will be easier to review what you have done.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <
>>>>>>>>>>>>>>>> winkelman.k...@gmail.com
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I am replying to this in hopes it will draw
>> some
>>>>>>>> attention
>>>>>>>>> to
>>>>>>>>>>> my
>>>>>>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>>>>>>> haven't heard from anyone in a couple days.
>> This
>>>>> is my
>>>>>>>>> first
>>>>>>>>>>> KIP
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>> first large contribution to the project so I'm
>>>>> sure I
>>>>>>>> did
>>>>>>>>>>>>>>>> something
>>>>>>>>>>>>>>>>>>>> wrong.
>>>>>>>>>>>>>>>>>>>>>> ;)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <
>>>>>>>>>>>>>>>> winkelman.k...@gmail.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I have created KIP-150 to facilitate
>> discussion
>>>>> about
>>>>>>>>> adding
>>>>>>>>>>>>>>>>> cogroup
>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> the streams DSL.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Please find the KIP here:
>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/
>>>>>> confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>> Kyle Winkelman
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>