You're right, I hadn't thought of that.

Cheers,

Michał


On 13/06/17 13:00, Kyle Winkelman wrote:
First, I would prefer not calling it aggregate because there are already
plenty of aggregate methods.

Second, I don't think this would really work, because after each aggregate
you now have a unique KTable (someone may want a table with 4 streams and
reuse those 4 in another table with one more stream added), and unless
we completely duplicate everything every time, this isn't really possible.
Additionally, the cogroup way requires just one more call to create each
different kind of table (normal, windowed, or session windowed), whereas this
new way would require copying the whole aggregate chain.

Another way to think about it: with cogroup we know that when they call
aggregate they aren't going to add any more aggregators to that table,
but your way requires us to assume they are done adding aggregators after
each call, so we must return a KTable each time even though we may turn out
not to have needed to create it.
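
To make the reuse argument concrete, here is a minimal plain-Java sketch. It models the idea only and does not use the Kafka Streams API: `CogroupSketch`, `Agg`, `Cogrouped` and `empty()` are hypothetical names, streams are modelled as in-memory lists, and placing the initializer on the final `aggregate` call is an assumption made for the illustration. It shows that a cogrouped definition can be finalised several times, or extended with one more stream, without copying the chain.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of the cogroup idea; none of these types are Kafka Streams classes.
final class CogroupSketch {

    // Hypothetical stand-in for an aggregator: folds one value into the running aggregate.
    interface Agg<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    // One registered input: its records plus the aggregator that folds them in.
    static final class Input<K, V, T> {
        final List<Map.Entry<K, V>> records;
        final Agg<K, V, T> aggregator;

        Input(List<Map.Entry<K, V>> records, Agg<K, V, T> aggregator) {
            this.records = records;
            this.aggregator = aggregator;
        }

        void foldInto(Map<K, T> table, Supplier<T> initializer) {
            for (Map.Entry<K, V> r : records) {
                K key = r.getKey();
                T current = table.containsKey(key) ? table.get(key) : initializer.get();
                table.put(key, aggregator.apply(key, r.getValue(), current));
            }
        }
    }

    // Immutable description of the inputs; nothing is materialised until aggregate().
    static final class Cogrouped<K, T> {
        private final List<Input<K, ?, T>> inputs;

        private Cogrouped(List<Input<K, ?, T>> inputs) {
            this.inputs = inputs;
        }

        static <K, T> Cogrouped<K, T> empty() {
            return new Cogrouped<K, T>(new ArrayList<Input<K, ?, T>>());
        }

        // Returns a new definition, so the shorter chain stays reusable.
        <V> Cogrouped<K, T> cogroup(List<Map.Entry<K, V>> records, Agg<K, V, T> aggregator) {
            List<Input<K, ?, T>> extended = new ArrayList<Input<K, ?, T>>(inputs);
            extended.add(new Input<K, V, T>(records, aggregator));
            return new Cogrouped<K, T>(extended);
        }

        // Each call builds an independent table from the same definition.
        Map<K, T> aggregate(Supplier<T> initializer) {
            Map<K, T> table = new LinkedHashMap<K, T>();
            for (Input<K, ?, T> input : inputs) {
                input.foldInto(table, initializer);
            }
            return table;
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> clicks = List.of(Map.entry("alice", 2), Map.entry("bob", 1));
        List<Map.Entry<String, Integer>> views = List.of(Map.entry("alice", 5));
        List<Map.Entry<String, String>> tags = List.of(Map.entry("bob", "vip"));

        Cogrouped<String, Integer> two = Cogrouped.<String, Integer>empty()
                .cogroup(clicks, (k, v, agg) -> agg + v)
                .cogroup(views, (k, v, agg) -> agg + v);
        // Reuse the two-stream definition and add one more stream.
        Cogrouped<String, Integer> three = two.cogroup(tags, (k, v, agg) -> agg + v.length());

        System.out.println(two.aggregate(() -> 0));   // {alice=7, bob=1}
        System.out.println(three.aggregate(() -> 0)); // {alice=7, bob=4}
    }
}
```

In this model a second or third table costs one extra `aggregate` call, which is the property the cogroup approach is argued to have above.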

On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <michal.borowie...@openbet.com>
wrote:

Actually, just had a thought. It started with naming.

Are we actually co-grouping these streams or are we co-aggregating them?

After all, in each of the cogroup calls we are providing an Aggregator
implementation.


If they are really co-aggregated, why don't we turn this around:
KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();

KTable<K, CG> coagg = grouped1.aggregate(initializer1, aggregator1,
aggValueSerde1) // this is the unchanged aggregate method
         .aggregate(grouped2, aggregator2)  // this is a new method
         .aggregate(grouped3, aggregator3); // ditto

This means that instead of adding cogroup methods to the KGroupedStream
interface, we would add an aggregate method to the KTable interface.

Is that feasible?

Cheers,
Michał

On 13/06/17 10:56, Michal Borowiecki wrote:

Also, I still feel that putting initializer on the first cogroup can
mislead users into thinking the first stream is in some way special.
Just my 5c.
Michał

On 13/06/17 09:54, Michal Borowiecki wrote:

Agree completely with the argument for serdes belonging in the same place
as the state store name, which is in the aggregate method.

Cheers,

Michał

On 12/06/17 18:20, Xavier Léauté wrote:

I think we are discussing two separate things here, so it might be worth
clarifying:

1) the position of the initializer with respect to the aggregators. If I
understand correctly, Guozhang seems to think it is more natural to specify
the initializer first, despite it not bearing any relation to the first
aggregator. I can see the argument for specifying the initializer first,
but I think it is debatable whether mixing it into the first cogroup call
leads to a cleaner API or not.

2) where the serde should be defined (if necessary). Looking at our
existing APIs in KGroupedStream, we always offer two aggregate()
methods. The first one takes the name of the store and the associated
aggregate value serde, e.g.

     KGroupedStream.aggregate(Initializer<VR> initializer,
                              Aggregator<? super K, ? super V, VR> aggregator,
                              Serde<VR> aggValueSerde,
                              String queryableStoreName)

The second one only takes a state store supplier and does not specify any
serde, e.g.

     KGroupedStream.aggregate(Initializer<VR> initializer,
                              Aggregator<? super K, ? super V, VR> aggregator,
                              final StateStoreSupplier<KeyValueStore> storeSupplier)

Presumably, when specifying a state store supplier it shouldn't be
necessary to specify an aggregate value serde, since the provided
state store might not need to serialize the values (e.g. it may just keep
them as regular objects on the heap) or it may have its own
internal serialization format.

For consistency I think it would be valuable to preserve the same two
aggregate methods for cogroup as well. Since the serde is only required in
one of the two cases, I believe the serde has no place in the first
cogroup() call and should only have to be specified as part of the
aggregate() method that takes a state store name. In the case of a state
store supplier, no serde would be necessary.
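
The distinction between the two overloads can be sketched in plain Java. This is an illustrative model, not the Kafka Streams API: `Codec`, `Store`, `BytesStore`, `HeapStore` and both `aggregate` methods are hypothetical stand-ins, and the aggregation is reduced to counting keys. The first overload builds a byte-backed store from a name and therefore needs a codec; the second accepts a caller-supplied store that keeps objects on the heap and never touches one.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model; Codec and Store are hypothetical stand-ins, not Kafka Streams types.
final class SerdePlacementSketch {

    // Stand-in for a value serde.
    interface Codec<T> {
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    // Stand-in for a key-value state store.
    interface Store<K, T> {
        T get(K key);
        void put(K key, T value);
    }

    // Store created from a name: it keeps bytes, so it needs a codec.
    static final class BytesStore<K, T> implements Store<K, T> {
        final String name;
        private final Codec<T> codec;
        private final Map<K, byte[]> data = new HashMap<K, byte[]>();

        BytesStore(String name, Codec<T> codec) {
            this.name = name;
            this.codec = codec;
        }

        public T get(K key) {
            byte[] bytes = data.get(key);
            return bytes == null ? null : codec.deserialize(bytes);
        }

        public void put(K key, T value) {
            data.put(key, codec.serialize(value));
        }
    }

    // Caller-supplied store that keeps plain objects on the heap: no codec involved.
    static final class HeapStore<K, T> implements Store<K, T> {
        private final Map<K, T> data = new HashMap<K, T>();

        public T get(K key) {
            return data.get(key);
        }

        public void put(K key, T value) {
            data.put(key, value);
        }
    }

    static final Codec<Long> LONG_CODEC = new Codec<Long>() {
        public byte[] serialize(Long value) {
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        public Long deserialize(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getLong();
        }
    };

    // Overload 1: store name plus codec.
    static Store<String, Long> aggregate(List<String> keys, Supplier<Long> initializer,
                                         Codec<Long> codec, String storeName) {
        return count(keys, initializer, new BytesStore<String, Long>(storeName, codec));
    }

    // Overload 2: store supplier only, no codec parameter at all.
    static Store<String, Long> aggregate(List<String> keys, Supplier<Long> initializer,
                                         Supplier<Store<String, Long>> storeSupplier) {
        return count(keys, initializer, storeSupplier.get());
    }

    private static Store<String, Long> count(List<String> keys, Supplier<Long> initializer,
                                             Store<String, Long> store) {
        for (String key : keys) {
            Long current = store.get(key);
            store.put(key, (current == null ? initializer.get() : current) + 1);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "a");
        Store<String, Long> named = aggregate(keys, () -> 0L, LONG_CODEC, "counts");
        Store<String, Long> supplied = aggregate(keys, () -> 0L, HeapStore::new);
        System.out.println(named.get("a") + " " + supplied.get("a"));
    }
}
```

Because only the first overload ever needs the codec, the model keeps it as a parameter of that overload rather than of an earlier call in the chain.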


On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com> wrote:

I'd agree that the aggregate value serde and the initializer do not
bear a direct relationship to the first `cogroup` call, but after I tried
to write some example code with these two different sets of APIs I felt the
current APIs just program more naturally.

I know it is kinda subjective, but I do think that user experience may be
more important as a deciding factor than the logical argument for public
interfaces. So I'd recommend that people also try writing some example
lines, and then we can circle back and discuss which one feels more natural
to write code with.


Guozhang

On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

I feel it would make more sense to move the initializer and serde to the
final aggregate statement, since the serde only applies to the state store,
and the initializer doesn't bear any relation to the first group in
particular.

+1 for moving initializer and serde from cogroup() to the aggregate()
for the reasons mentioned above.

Cheers,

Michał

On 08/06/17 22:44, Guozhang Wang wrote:

Note that although the internal `AbstractStoreSupplier` does maintain the
key-value serdes, we do not enforce the interface of `StateStoreSupplier`
to always retain that information, and hence we cannot assume that
StateStoreSuppliers always retain key / value serdes.

On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io> wrote:


Another reason for the serde not to be in the first cogroup call is that
the serde should not be required if you pass a StateStoreSupplier to
aggregate().

Regarding the aggregated type <T>, I don't see why the initializer should be
favored over the aggregator to define the type. In my mind, separating the
initializer into the last aggregate call clearly indicates that the
initializer is independent of any of the aggregators or streams and that we
don't wait for grouped1 events to initialize the co-group.

On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:


On second thought... This is the currently proposed API:


```
<T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
                                   final Aggregator<? super K, ? super V, T> aggregator,
                                   final Serde<T> aggValueSerde)
```


If we do not have the initializer in the first co-group it might be a bit
awkward for users to specify the aggregator that returns a typed <T> value?
Maybe it is still better to put these two functions in the same API?



Guozhang

On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:

This suggestion LGTM. I would vote for the first alternative rather than
adding it to the `KStreamBuilder`, though.

On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:


I have a minor suggestion to make the API a little bit more symmetric.
I feel it would make more sense to move the initializer and serde to the
final aggregate statement, since the serde only applies to the state store,
and the initializer doesn't bear any relation to the first group in
particular. It would end up looking like this:

KTable<K, CG> cogrouped =
     grouped1.cogroup(aggregator1)
             .cogroup(grouped2, aggregator2)
             .cogroup(grouped3, aggregator3)
             .aggregate(initializer1, aggValueSerde, storeName1);

Alternatively, we could move the first cogroup() method to
KStreamBuilder, similar to how we have .merge(),
and end up with an API that would be even more symmetric.

KStreamBuilder.cogroup(grouped1, aggregator1)
               .cogroup(grouped2, aggregator2)
               .cogroup(grouped3, aggregator3)
               .aggregate(initializer1, aggValueSerde, storeName1);

This doesn't have to be a blocker, but I thought it would make the API
just a tad cleaner.

On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:

Kyle,

Thanks a lot for the updated KIP. It looks good to me.


Guozhang


On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:

This makes much more sense to me. +1


On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I have updated the KIP and my PR. Let me know what you think.
To create a cogrouped stream, just call cogroup on a KGroupedStream and
supply the initializer, aggValueSerde, and an aggregator. Then continue
adding KGroupedStreams and aggregators. Then call one of the many aggregate
calls to create a KTable.

Thanks,
Kyle

On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

Thanks for the update. I think just one initializer makes sense, as it
should only be called once per key and generally it is just going to create
a new instance of whatever the Aggregate class is.
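
The "once per key" behaviour can be illustrated with a small plain-Java sketch. It is a model built on `Map.computeIfAbsent`, not the Kafka Streams implementation; `InitializerOnceSketch` and its `aggregate` method are hypothetical names, and the aggregate is assumed to be a simple list of values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java model of "the initializer runs once per key".
final class InitializerOnceSketch {

    // Folds records into a table; the initializer runs only the first time a key is seen.
    static Map<String, List<String>> aggregate(List<Map.Entry<String, String>> records,
                                               Supplier<List<String>> initializer) {
        Map<String, List<String>> table = new HashMap<String, List<String>>();
        for (Map.Entry<String, String> rec : records) {
            table.computeIfAbsent(rec.getKey(), key -> initializer.get()).add(rec.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // The initializer just creates a fresh aggregate instance and counts its invocations.
        Supplier<List<String>> initializer = () -> {
            calls.incrementAndGet();
            return new ArrayList<String>();
        };
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("k1", "a"), Map.entry("k2", "b"), Map.entry("k1", "c"));

        Map<String, List<String>> table = aggregate(records, initializer);
        System.out.println(table.get("k1") + " initializer calls: " + calls.get());
    }
}
```

With three records over two distinct keys, the initializer in this model is invoked twice, regardless of which input a record came from.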

Cheers,
Damian

On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:


Hello all,

I have spent some more time on this and the best alternative I have come up
with is:
KGroupedStream has a single cogroup call that takes an initializer and an
aggregator.
CogroupedKStream has a cogroup call that takes additional groupedStream
aggregator pairs.
CogroupedKStream has multiple aggregate methods that create the different
stores.

I plan on updating the KIP, but I want people's input on whether we should
have the initializer be passed in once at the beginning or if we should
instead have the initializer be required for each call to one of the
aggregate calls. The first makes more sense to me but doesn't allow the user
to specify different initializers for different tables.

Thanks,
Kyle

On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:


Yea, I really like that idea. I'll see what I can do to update the KIP and
my PR when I have some time. I'm not sure how well creating the
kstreamaggregates will go though, because at that point I will have thrown
away the type of the values. It will be type safe, I just may need to do a
little forcing.

Thanks,
Kyle

On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

Kyle,

Thanks for the explanations, my previous read on the wiki examples was
wrong.

So I guess my motivation should be "reduced" to: can we move the window
specs param from "KGroupedStream#cogroup(..)" to
"CogroupedKStream#aggregate(..)", and my motivations are:

1. minor: we can reduce the number of generics in CogroupedKStream from 3 to 2.

2. major: this is for extensibility of the APIs, and since we are removing
the "Evolving" annotations on Streams it may be harder to change it again
in the future. The extended use cases are that people wanted to have
windowed running aggregates on different granularities, e.g. "give me the
counts per-minute, per-hour, per-day and per-week", and today in the DSL we
need to specify that case in multiple aggregate operators, each of which
gets a state store / changelog, etc. And it is possible to optimize it as
well to a single state store. Its implementation would be tricky as you need
to contain windows of different lengths within your window store, but just
from the public API point of view, it could be specified as:

CogroupedKStream stream = stream1.cogroup(stream2, ...
"state-store-name");

table1 = stream.aggregate(/*per-minute window*/)
table2 = stream.aggregate(/*per-hour window*/)
table3 = stream.aggregate(/*per-day window*/)

while underneath we are only using a single store "state-store-name" for it.
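
As a rough illustration of "the same input aggregated at several window granularities", here is a plain-Java sketch. It is not the Kafka Streams windowing API and it does not implement the single-store optimisation mentioned above; it simply buckets the same list of event timestamps into tumbling windows of different sizes. `WindowGranularitySketch` and `countPerWindow` are hypothetical names, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model: one input, several tumbling-window granularities.
final class WindowGranularitySketch {

    // Counts events per tumbling window; the map key is the window start in milliseconds.
    static Map<Long, Long> countPerWindow(List<Long> timestampsMs, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<Long, Long>();
        for (long ts : timestampsMs) {
            long windowStart = ts - (ts % windowSizeMs);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical event times: 5 s, 65 s, 70 s and 3700 s after time zero.
        List<Long> events = List.of(5_000L, 65_000L, 70_000L, 3_700_000L);

        Map<Long, Long> perMinute = countPerWindow(events, 60_000L);
        Map<Long, Long> perHour = countPerWindow(events, 3_600_000L);

        System.out.println(perMinute); // {0=1, 60000=2, 3660000=1}
        System.out.println(perHour);   // {0=3, 3600000=1}
    }
}
```

The input definition is written once and only the window size changes per table, which mirrors the shape of the `stream.aggregate(...)` calls in the example above.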


Although this feature is out of the scope of this KIP, I'd like to discuss
if we can "leave the door open" to make such changes without modifying the
public APIs.

Guozhang


On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:


I allow defining a single window/sessionwindow one time when you make the
cogroup call from a KGroupedStream. From then on you are using the cogroup
call from within CogroupedKStream, which doesn't accept any additional
windows/sessionwindows.

Is this what you meant by your question or did I misunderstand?

On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

Another question that came to me is on "window alignment": from the KIP it
seems you are allowing users to specify a (potentially different) window
spec in each co-grouped input stream. So if these window specs are
different how should we "align" them with different input streams? I think
it is more natural to only specify one window spec in the

KTable<RK, V> CogroupedKStream#aggregate(Windows);


and remove it from the cogroup() functions. WDYT?


Guozhang

On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks for the proposal Kyle, this is a quite common use case to support
such multi-way table join (i.e. N source tables with N aggregate func) with
a single store and N+1 serdes, I have seen lots of people using the
low-level PAPI to achieve this goal.


On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I like your point about not handling other cases such as count and reduce.

I think that reduce may not make sense because reduce assumes that the
input values are the same as the output values. With cogroup
...

