key-value serdes, we do not enforce the interface of `StateStoreSupplier`
to always retain that information, and hence we cannot assume that
StateStoreSuppliers always retain key / value serdes.
On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io> wrote:
Another reason for the serde not to be in the first cogroup call is that
the serde should not be required if you pass a StateStoreSupplier to
aggregate().
Regarding the aggregated type <T>, I don't see why the initializer should be
favored over the aggregator to define the type. In my mind, separating the
initializer into the last aggregate call clearly indicates that the
initializer is independent of any of the aggregators or streams, and that we
don't wait for grouped1 events to initialize the co-group.
On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:
On second thought... this is the currently proposed API:
```
<T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
                                   final Aggregator<? super K, ? super V, T> aggregator,
                                   final Serde<T> aggValueSerde);
```
If we do not have the initializer in the first cogroup call, it might be a
bit awkward for users to specify an aggregator that returns a typed <T>
value. Maybe it is still better to put these two functions in the same API?
Guozhang
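A minimal sketch of the type-inference concern, assuming hypothetical stand-ins `Initializer`, `Aggregator`, `Grouped` and `Cogrouped` shaped like the Kafka Streams interfaces (this is not the actual KIP or library API). When the initializer is passed alongside the first aggregator, Java can infer `T` from the initializer lambda; when only the aggregator is passed, an implicitly typed aggregator lambda gives the compiler nothing to infer `T` from, so the caller has to supply it explicitly (here with a `<Long>` type witness).

```java
// Hypothetical stand-ins shaped like Kafka Streams' Initializer and Aggregator.
interface Initializer<T> { T apply(); }
interface Aggregator<K, V, T> { T apply(K key, V value, T aggregate); }

// Result of the first cogroup call: T is fixed from here on.
final class Cogrouped<K, T> {
    final Initializer<T> initializer; // null in the "initializer last" variant
    final Aggregator<? super K, ?, T> firstAggregator;

    Cogrouped(Initializer<T> initializer, Aggregator<? super K, ?, T> firstAggregator) {
        this.initializer = initializer;
        this.firstAggregator = firstAggregator;
    }
}

// Hypothetical stand-in for KGroupedStream<K, V>.
final class Grouped<K, V> {
    // Current proposal: T can be inferred from the initializer lambda.
    <T> Cogrouped<K, T> cogroup(Initializer<T> initializer,
                                Aggregator<? super K, ? super V, T> aggregator) {
        return new Cogrouped<K, T>(initializer, aggregator);
    }

    // "Initializer last" variant: T has to come from the aggregator alone.
    <T> Cogrouped<K, T> cogroup(Aggregator<? super K, ? super V, T> aggregator) {
        return new Cogrouped<K, T>(null, aggregator);
    }
}
```

With the two-argument form, `new Grouped<String, Long>().cogroup(() -> 0L, (k, v, agg) -> agg + v)` compiles as written; the one-argument form needs `.<Long>cogroup((k, v, agg) -> agg + v)` or explicitly typed lambda parameters.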
On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:
This suggestion LGTM. I would vote for the first alternative rather than
adding it to `KStreamBuilder`, though.
On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:
I have a minor suggestion to make the API a little bit more symmetric.
I feel it would make more sense to move the initializer and serde to the
final aggregate statement, since the serde only applies to the state store,
and the initializer doesn't bear any relation to the first group in
particular. It would end up looking like this:
```
KTable<K, CG> cogrouped =
    grouped1.cogroup(aggregator1)
            .cogroup(grouped2, aggregator2)
            .cogroup(grouped3, aggregator3)
            .aggregate(initializer1, aggValueSerde, storeName1);
```
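To illustrate the symmetric variant, here is an in-memory sketch in which the initializer is supplied only in the final `aggregate` call. `GroupedSource` and `CogroupBuilder` are hypothetical stand-ins for KGroupedStream and CogroupedKStream; serdes and store names are omitted, and records are folded source by source rather than in arrival order, so treat it as a model of the API shape, not of the real processor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins shaped like Kafka Streams' Initializer and Aggregator.
interface Initializer<T> { T apply(); }
interface Aggregator<K, V, T> { T apply(K key, V value, T aggregate); }

// Hypothetical stand-in for a KGroupedStream: an in-memory list of keyed records.
final class GroupedSource<K, V> {
    final List<Map.Entry<K, V>> records = new ArrayList<>();

    GroupedSource<K, V> add(K key, V value) {
        records.add(Map.entry(key, value));
        return this;
    }

    // First cogroup call takes only an aggregator, as in the symmetric proposal.
    <T> CogroupBuilder<K, T> cogroup(Aggregator<? super K, ? super V, T> aggregator) {
        return new CogroupBuilder<K, T>().cogroup(this, aggregator);
    }
}

// Hypothetical stand-in for CogroupedKStream: collects (source, aggregator) pairs.
final class CogroupBuilder<K, T> {
    private interface Step<K, T> { void run(Map<K, T> store, Initializer<T> initializer); }

    private final List<Step<K, T>> steps = new ArrayList<>();

    <V> CogroupBuilder<K, T> cogroup(GroupedSource<K, V> source,
                                     Aggregator<? super K, ? super V, T> aggregator) {
        steps.add((store, initializer) -> {
            for (Map.Entry<K, V> record : source.records) {
                K key = record.getKey();
                // The initializer only runs the first time any source sees this key.
                T current = store.computeIfAbsent(key, k -> initializer.apply());
                store.put(key, aggregator.apply(key, record.getValue(), current));
            }
        });
        return this;
    }

    // The initializer is supplied only here; the proposal also passes a serde and store name.
    Map<K, T> aggregate(Initializer<T> initializer) {
        Map<K, T> store = new HashMap<>(); // one shared store for all inputs
        for (Step<K, T> step : steps) {
            step.run(store, initializer);
        }
        return store;
    }
}
```

For example, with `clicks` carrying (a, 1), (b, 2), (a, 3) and `views` carrying (a, "x"), (c, "yy"), summing click values and view lengths into one table yields a=5, b=2, c=2.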
Alternatively, we could move the first cogroup() method to KStreamBuilder,
similar to how we have .merge(), and end up with an API that would be even
more symmetric.
```
KStreamBuilder.cogroup(grouped1, aggregator1)
              .cogroup(grouped2, aggregator2)
              .cogroup(grouped3, aggregator3)
              .aggregate(initializer1, aggValueSerde, storeName1);
```
This doesn't have to be a blocker, but I thought it would make the API just
a tad cleaner.
On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:
Kyle,
Thanks a lot for the updated KIP. It looks good to me.
Guozhang
On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:
This makes much more sense to me. +1
On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
I have updated the KIP and my PR. Let me know what you think.
To create a cogrouped stream, just call cogroup on a KGroupedStream and
supply the initializer, aggValueSerde, and an aggregator. Then continue
adding KGroupedStreams and aggregators. Then call one of the many aggregate
calls to create a KTable.
Thanks,
Kyle
On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
Hi Kyle,
Thanks for the update. I think just one initializer makes sense, as it
should only be called once per key and generally it is just going to create
a new instance of whatever the Aggregate class is.
Cheers,
Damian
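The "called once per key" behavior can be sketched with `Map.computeIfAbsent`: the first record for a key, from whichever cogrouped input, creates the aggregate, and later records reuse it. `PerKeyStore` is a hypothetical helper, and `java.util.function.Supplier` stands in for Kafka's `Initializer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical per-key aggregate store; Supplier stands in for Kafka's Initializer.
final class PerKeyStore<K, A> {
    private final Map<K, A> store = new HashMap<>();
    private final Supplier<A> initializer;
    private int initializerCalls = 0; // how many fresh aggregates were created

    PerKeyStore(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    // Returns the aggregate for the key, creating it via the initializer only on first use.
    A getOrInit(K key) {
        return store.computeIfAbsent(key, k -> {
            initializerCalls++;
            return initializer.get();
        });
    }

    int initializerCalls() {
        return initializerCalls;
    }
}
```

With `StringBuilder::new` as the initializer, two records for "user-1" (say one from each input stream) followed by one for "user-2" trigger exactly two initializer calls.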
On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
Hello all,
I have spent some more time on this and the best alternative I have come up
with is:
KGroupedStream has a single cogroup call that takes an initializer and an
aggregator.
CogroupedKStream has a cogroup call that takes additional groupedStream /
aggregator pairs.
CogroupedKStream has multiple aggregate methods that create the different
stores.
I plan on updating the KIP, but I want people's input on whether we should
have the initializer passed in once at the beginning, or instead have the
initializer be required for each of the aggregate calls. The first makes
more sense to me but doesn't allow the user to specify different
initializers for different tables.
Thanks,
Kyle
On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
Yeah, I really like that idea. I'll see what I can do to update the KIP and
my PR when I have some time. I'm not sure how well creating the
KStreamAggregates will go, though, because at that point I will have thrown
away the type of the values. It will be type-safe; I just may need to do a
little forcing.
Thanks,
Kyle
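A sketch of the kind of "forcing" described above, under the assumption that aggregators are kept in one collection keyed by source: each aggregator's value type `V` is checked when it is registered, but once stored alongside the others it is only known as `?`, so processing needs an unchecked cast. `Agg` and `HeterogeneousAggregators` are hypothetical names, not Kafka Streams classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical aggregator shape, mirroring Kafka Streams' Aggregator<K, V, VA>.
interface Agg<K, V, T> { T apply(K key, V value, T aggregate); }

final class HeterogeneousAggregators<K, T> {
    // Value types differ per source, so the map can only record them as "?".
    private final Map<String, Agg<? super K, ?, T>> bySource = new HashMap<>();

    // V is type-checked here, when the source/aggregator pair is registered.
    <V> void register(String source, Agg<? super K, ? super V, T> aggregator) {
        bySource.put(source, aggregator);
    }

    // At processing time V is no longer known: the value arrives as Object
    // and the aggregator has to be "forced" back with an unchecked cast.
    @SuppressWarnings("unchecked")
    T process(String source, K key, Object value, T aggregate) {
        Agg<K, Object, T> aggregator = (Agg<K, Object, T>) bySource.get(source);
        return aggregator.apply(key, value, aggregate);
    }
}
```

The cast stays safe only as long as each value is routed to the aggregator registered for its source; a mismatched value surfaces as a ClassCastException when the aggregator runs rather than at compile time.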
On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
Kyle,
Thanks for the explanations; my previous reading of the wiki examples was
wrong.
So I guess my motivation should be "reduced" to: can we move the window
specs param from "KGroupedStream#cogroup(..)" to
"CogroupedKStream#aggregate(..)"? My motivations are:
1. minor: we can reduce the number of generics in CogroupedKStream from 3
to 2.
2. major: this is for extensibility of the APIs, and since we are removing
the "Evolving" annotations on Streams, it may be harder to change it again
in the future. The extended use case is that people want windowed running
aggregates at different granularities, e.g. "give me the counts per-minute,
per-hour, per-day and per-week". Today in the DSL we need to specify that
case with multiple aggregate operators, each of which gets its own state
store / changelog, etc. It is also possible to optimize this into a single
state store. Its implementation would be tricky, as you need to contain
windows of different lengths within your window store, but just from the
public API point of view, it could be specified as:
```
CogroupedKStream stream = stream1.cogroup(stream2, ... "state-store-name");
table1 = stream.aggregate(/*per-minute window*/);
table2 = stream.aggregate(/*per-hour window*/);
table3 = stream.aggregate(/*per-day window*/);
```
while underneath we are only using a single store, "state-store-name", for
all of them.
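As a rough model of the single-store idea, assuming tumbling windows and ignoring changelogs and retention, one record can update one entry per granularity in a single map keyed by (key, window size, window start). `MultiGranularityCounts` is a hypothetical class; how a real window store would lay out windows of different lengths is exactly the tricky part noted above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-store model for counts at several window granularities.
final class MultiGranularityCounts {
    static final long MINUTE = 60_000L, HOUR = 60 * MINUTE, DAY = 24 * HOUR;

    // One shared "store": composite key = key | windowSizeMs | windowStartMs.
    private final Map<String, Long> store = new HashMap<>();
    private final List<Long> windowSizes;

    MultiGranularityCounts(List<Long> windowSizes) {
        this.windowSizes = windowSizes;
    }

    // Tumbling-window start for a timestamp (assumes non-negative epoch millis).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    private static String storeKey(String key, long sizeMs, long startMs) {
        return key + "|" + sizeMs + "|" + startMs;
    }

    // A single record updates one entry per granularity in the same store.
    void add(String key, long timestampMs) {
        for (long size : windowSizes) {
            store.merge(storeKey(key, size, windowStart(timestampMs, size)), 1L, Long::sum);
        }
    }

    // Count for the window of the given size that contains timestampMs.
    long count(String key, long sizeMs, long timestampMs) {
        return store.getOrDefault(storeKey(key, sizeMs, windowStart(timestampMs, sizeMs)), 0L);
    }
}
```

With per-minute, per-hour and per-day sizes, a record at t = 90 s updates the minute window starting at 60 s and the hour and day windows starting at 0.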
Although this feature is out of the scope of this KIP, I'd like to discuss
whether we can "leave the door open" to make such changes without modifying
the public APIs.
Guozhang
On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
I allow defining a single Windows/SessionWindows one time, when you make the
cogroup call from a KGroupedStream. From then on you are using the cogroup
call from within CogroupedKStream, which doesn't accept any additional
Windows/SessionWindows.
Is this what you meant by your question, or did I misunderstand?
On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
Another question that came to me is on "window alignment": from the KIP it
seems you are allowing users to specify a (potentially different) window
spec in each co-grouped input stream. So if these window specs are
different, how should we "align" them across the different input streams?
I think it is more natural to only specify one window spec in
`KTable<RK, V> CogroupedKStream#aggregate(Windows);`
and remove it from the cogroup() functions. WDYT?
Guozhang
On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
Thanks for the proposal, Kyle. This is quite a common use case: supporting
such a multi-way table join (i.e. N source tables with N aggregate
functions) with a single store and N+1 serdes. I have seen lots of people
using the low-level PAPI to achieve this goal.
On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
I like your point about not handling other cases such as count and reduce.
I think that reduce may not make sense because reduce assumes that the
input values are the same as the output values. With cogroup ...