Thanks Matthias
On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io> wrote:
I am just catching up on this thread, so sorry for the long email in
advance... Also, it's to some extent a dump of thoughts and not always a
clear proposal. I still need to think about this in more detail. But maybe
it helps others to get new ideas :)
However, I don't understand your argument about putting aggregate()
after the withXX() -- all the calls to withXX() set optional parameters
for aggregate() and not for groupBy() -- but a groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this might
be quite confusing for developers.
I see what you are saying, but the grouped stream is effectively a no-op
until you call one of the aggregate/count/reduce etc. functions. So the
optional params are ones that are applicable to any of the operations you
can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.
I understand your argument, but I don't share the conclusion. If we
need a "final/terminal" call, the better way might be
.groupBy().count().withXX().build()
(with a better name for build() though)
The point is that all the other calls, i.e., withBlah, windowed, etc., apply
to all the aggregate functions. The terminal call is the actual type of
aggregation you want to do. I personally find this more natural than
groupBy().count().withBlah().build()
groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
I like this. However, I don't see a reason to have windowed() and
sessionWindowed(). We should have one top-level `Windows` interface that
both `TimeWindows` and `SessionWindows` implement and just have a single
windowed() method that accepts all `Windows`. (I did not like the
separation of `SessionWindows` in the first place, and this seems to be
an opportunity to clean this up. It was hard to change when we
introduced session windows)
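A single top-level window abstraction with one windowedBy() entry point could look roughly like the following. This is only a sketch under stated assumptions: WindowSpec, TimeWindowSpec, SessionWindowSpec and GroupedSketch are hypothetical names, not Kafka Streams classes, and each spec also carries its own retention (the role until() plays in these proposals), with defaults chosen purely for illustration.

```java
// Hypothetical sketch: WindowSpec, TimeWindowSpec, SessionWindowSpec and
// GroupedSketch are illustrative names, not Kafka Streams classes.
interface WindowSpec {
    // Retention of window state; the role until() plays in the proposals.
    long retentionMs();

    // Returns a copy with a different retention, so shared specs are never mutated.
    WindowSpec until(long retentionMs);
}

final class TimeWindowSpec implements WindowSpec {
    final long sizeMs;
    private final long retentionMs;

    private TimeWindowSpec(long sizeMs, long retentionMs) {
        this.sizeMs = sizeMs;
        this.retentionMs = retentionMs;
    }

    // Assumption: retention defaults to the window size.
    static TimeWindowSpec of(long sizeMs) {
        return new TimeWindowSpec(sizeMs, sizeMs);
    }

    @Override
    public long retentionMs() {
        return retentionMs;
    }

    @Override
    public TimeWindowSpec until(long retentionMs) {
        return new TimeWindowSpec(sizeMs, retentionMs);
    }
}

final class SessionWindowSpec implements WindowSpec {
    final long inactivityGapMs;
    private final long retentionMs;

    private SessionWindowSpec(long inactivityGapMs, long retentionMs) {
        this.inactivityGapMs = inactivityGapMs;
        this.retentionMs = retentionMs;
    }

    // Assumption: retention defaults to the inactivity gap.
    static SessionWindowSpec with(long inactivityGapMs) {
        return new SessionWindowSpec(inactivityGapMs, inactivityGapMs);
    }

    @Override
    public long retentionMs() {
        return retentionMs;
    }

    @Override
    public SessionWindowSpec until(long retentionMs) {
        return new SessionWindowSpec(inactivityGapMs, retentionMs);
    }
}

final class GroupedSketch {
    // One entry point for every kind of window instead of windowed() plus sessionWindowed().
    String windowedBy(WindowSpec windows) {
        String kind = (windows instanceof SessionWindowSpec) ? "session" : "time";
        return kind + " windows, retention " + windows.retentionMs() + " ms";
    }
}
```

With this shape, `grouped.windowedBy(TimeWindowSpec.of(10L).until(100L))` and `grouped.windowedBy(SessionWindowSpec.with(10L))` go through the same method.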
Yes - true we should look into that.
Btw: we do use the imperative groupBy() and groupByKey(), and thus we
might also want to use windowBy() (instead of windowed()). Not sure how
important this is, but it seems to be inconsistent otherwise.
Makes sense
About joins: I don't like .withJoinType(JoinType.LEFT) at all. I think,
defining an inner/left/outer join is not an optional argument but a
first class concept and should have a proper representation in the API
(like the current methods join(), leftJoin(), outerJoin()).
Yep, I did originally have it as a required param and maybe that is what we
go with. It could have a default, but maybe that is confusing.
About the two join API proposals, the second one has too much boilerplate
code for my taste. Also, the actual join() operator has only one
argument, which is weird to me: in my way of thinking, the main
operator call should have one parameter per mandatory argument, but your
proposal puts the mandatory arguments into the Joins.streamStreamJoin() call.
This is far from intuitive IMHO.
This is the builder pattern: you only need one param as the builder has
captured all of the required and optional arguments.
The first join proposal also seems to align better with the pattern
suggested for aggregations and having the same pattern for all operators
is important (as you stated already).
This is why I offered two alternatives when I started out: one is the
builder pattern, the other is the more fluent pattern.
Coming back to the config vs optional parameter. What about having a
method withConfig[s](...) that allows putting in the configuration?
Sure, it is currently called withLogConfig() as that is the only thing that
is really config.
This also raises the question of whether until() is a windows property.
Actually, until() seems to be a configuration parameter and thus should
not have its own method.
Hmmm, I don't agree. Until is a property of the window. It is going to be
potentially different for every window operation you do in a streams app.
Browsing through your example DSL branch, I also saw this one:
final KTable<Windowed<String>, Long> windowed =
groupedStream.counting()
.windowed(TimeWindows.of(10L).until(10))
.table();
This is an interesting idea, and it reminds me of some feedback about "I
wanted to count a stream, but there was no count() method -- I first
needed to figure out, that I need to group the stream first to be able
to count it. It does make sense in hindsight but was not obvious in the
beginning". Thus, carrying out this thought, we could also do the
following:
stream.count().groupedBy().windowedBy().table();
-> Note, I use "grouped" and "windowed" instead of imperative here, as
it comes after the count()
This would be more consistent than your proposal (that has grouping
before but windowing after count()). It might even allow us to enrich
the API with some syntactic sugar like `stream.count().table()` to get
the overall count of all records (this would obviously not scale, but we
could support it -- if not now, maybe later).
I guess I'd prefer
stream.groupBy().windowBy().count()
stream.groupBy().windowBy().reduce()
stream.groupBy().count()
As I said above, everything that happens before the final aggregate call
can be applied to any of them. So it makes sense to me to do those things
ahead of the final aggregate call.
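To make the "shared intermediate steps, terminal aggregation" ordering concrete, here is a minimal in-memory sketch. It is not Kafka Streams: Event, EventStream and Grouped are hypothetical names, windows are assumed to be tumbling with non-negative timestamps, and results are plain maps rather than KTables. The point is only that groupBy() and windowBy() configure something every aggregation can use, while count() and reduce() are the calls that produce a result.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical in-memory sketch; Event, EventStream and Grouped are
// illustrative names, not Kafka Streams classes.
final class Event {
    final String key;
    final long value;
    final long timestampMs;

    Event(String key, long value, long timestampMs) {
        this.key = key;
        this.value = value;
        this.timestampMs = timestampMs;
    }
}

final class EventStream {
    private final List<Event> events;

    EventStream(List<Event> events) {
        this.events = events;
    }

    Grouped groupBy(Function<Event, String> keyMapper) {
        return new Grouped(events, keyMapper, 0L);
    }
}

final class Grouped {
    private final List<Event> events;
    private final Function<Event, String> keyMapper;
    private final long windowSizeMs; // 0 means "not windowed"

    Grouped(List<Event> events, Function<Event, String> keyMapper, long windowSizeMs) {
        this.events = events;
        this.keyMapper = keyMapper;
        this.windowSizeMs = windowSizeMs;
    }

    // Intermediate step: returns a new Grouped, so the unwindowed grouping stays reusable.
    Grouped windowBy(long windowSizeMs) {
        return new Grouped(events, keyMapper, windowSizeMs);
    }

    // Tumbling-window key "key@windowStart" when windowed, plain key otherwise.
    private String groupKey(Event e) {
        String key = keyMapper.apply(e);
        if (windowSizeMs <= 0) {
            return key;
        }
        long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
        return key + "@" + windowStart;
    }

    // Terminal call: number of events per group.
    Map<String, Long> count() {
        Map<String, Long> result = new TreeMap<>();
        for (Event e : events) {
            result.merge(groupKey(e), 1L, Long::sum);
        }
        return result;
    }

    // Terminal call: combine the values in each group with the reducer.
    Map<String, Long> reduce(BinaryOperator<Long> reducer) {
        Map<String, Long> result = new TreeMap<>();
        for (Event e : events) {
            result.merge(groupKey(e), e.value, reducer);
        }
        return result;
    }
}
```

Usage mirrors the preferred ordering: `stream.groupBy(e -> e.key).windowBy(10L).count()` or `stream.groupBy(e -> e.key).reduce(Long::sum)`.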
Last about builder pattern. I am convinced that we need some "terminal"
operator/method that tells us when to add the processor to the topology.
But I don't see the need for a plain builder pattern that feels alien to
me (see my argument about the second join proposal). Using .stream() /
.table() as used in many examples might work. But maybe a more generic
name that we can use in all places like build() or apply() might also be
an option.
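The "terminal call adds the processor" idea can be sketched as follows, assuming hypothetical TopologySketch and CountStep classes (not Kafka APIs) and a plain list of strings standing in for the topology. The withXXX() calls only record options; nothing reaches the topology until build() is invoked, which is the one point where every option is known.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; TopologySketch and CountStep are illustrative names.
final class TopologySketch {
    final List<String> processors = new ArrayList<>();
}

final class CountStep {
    private final TopologySketch topology;
    private String storeName; // null means "let the library generate a name"
    private boolean cachingEnabled = true; // assumed default

    CountStep(TopologySketch topology) {
        this.topology = topology;
    }

    CountStep withStoreName(String storeName) {
        this.storeName = storeName;
        return this;
    }

    CountStep withCachingEnabled(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
        return this;
    }

    // Terminal call: all options are known now, so this is the only place the
    // processor is added to the topology.
    String build() {
        String processor = "count(store=" + (storeName == null ? "<generated>" : storeName)
                + ", caching=" + cachingEnabled + ")";
        topology.processors.add(processor);
        return processor;
    }
}
```

Whether the terminal method is called build(), apply(), table() or stream() is only a naming question; the structural role is the same.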
Sure, a generic name might be ok.
-Matthias
On 6/29/17 7:37 AM, Damian Guy wrote:
Thanks Kyle.
On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
Hi Damian,
When trying to program in the fluent API that has been discussed most, it
feels difficult to know when you will actually get an object you can reuse.
What if I make one KGroupedStream that I want to reuse, is it legal to
reuse it or does this approach expect you to call grouped each time?
I'd anticipate that once you have a KGroupedStream you can re-use it as you
can today.
You said it yourself in another post that the grouped stream is
effectively a no-op until a count, reduce, or aggregate. The way I see it,
you wouldn’t be able to reuse anything except KStreams and KTables, because
most of this fluent API would continue returning this (this being the
builder object currently being manipulated).
So, if you ever store a reference to anything but KStreams and KTables and
you use it in two different ways, then it's possible you make conflicting
withXXX() calls on the same builder.
Not necessarily true. It could return a new instance of the builder, i.e.,
the builders being immutable. So if you held a reference to the builder it
would always be the same as it was when it was created.
GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
    groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
I’ll admit that this shouldn’t happen, but some user is going to do it
eventually…
Depending on the implementation, uses of groupedStreamWithDefaultSerdes would
most likely be equivalent to groupedStreamWithDeclaredSerdes. One workaround
would be to always make copies of the config objects you are
building, but this approach has its own problem because now we have to
identify which configs are equivalent so we don’t create repeated
processors.
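The immutable-builder alternative discussed here can be sketched in a few lines. GroupedSpec is a hypothetical name and serdes are represented as plain strings for illustration. Because each withXXX() returns a new instance, deriving groupedStreamWithDeclaredSerdes cannot change the earlier reference, and value-based equals()/hashCode() is one way a topology builder could recognise equivalent configs and avoid creating repeated processors.

```java
import java.util.Objects;

// Hypothetical sketch; GroupedSpec is an illustrative name and serdes are
// represented by plain strings.
final class GroupedSpec {
    final String keySerde;
    final String valueSerde;

    private GroupedSpec(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    static GroupedSpec withDefaults() {
        return new GroupedSpec("default", "default");
    }

    // Each withXXX() returns a new instance; the receiver is never modified.
    GroupedSpec withKeySerde(String keySerde) {
        return new GroupedSpec(keySerde, valueSerde);
    }

    GroupedSpec withValueSerde(String valueSerde) {
        return new GroupedSpec(keySerde, valueSerde);
    }

    // Value equality lets a topology builder detect equivalent specs and reuse
    // one processor instead of creating duplicates.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupedSpec)) return false;
        GroupedSpec other = (GroupedSpec) o;
        return keySerde.equals(other.keySerde) && valueSerde.equals(other.valueSerde);
    }

    @Override
    public int hashCode() {
        return Objects.hash(keySerde, valueSerde);
    }
}
```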
The point of this long-winded example is that we always have to be
thinking about all of the possible ways it could be misused by a user
(causing them to see hard to diagnose problems).
Exactly! That is the point of the discussion really.
In my attempt at a couple methods with builders I feel that I could
confidently say the user couldn’t really mess it up.
// Count
KTable<String, Long> count =
kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
The kGroupedStream is reusable, and if they attempted to reuse the Count
for some reason it would throw an error message saying that a store named
“my-store” already exists.
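The fail-fast behaviour Kyle describes could work roughly like this, assuming the topology keeps a set of registered store names. CountOptions, StoreRegistry and GroupedStreamSketch are hypothetical stand-ins for Count, the topology, and KGroupedStream; the exception type and message are illustrative choices, not Kafka's actual error.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; CountOptions, StoreRegistry and GroupedStreamSketch are
// illustrative names, not Kafka Streams classes.
final class CountOptions {
    final String storeName;

    private CountOptions(String storeName) {
        this.storeName = storeName;
    }

    static CountOptions count() {
        return new CountOptions(null);
    }

    CountOptions withQueryableStoreName(String storeName) {
        return new CountOptions(storeName);
    }
}

final class StoreRegistry {
    private final Set<String> storeNames = new HashSet<>();

    // Rejects a second store with the same name instead of silently sharing it.
    void register(String storeName) {
        if (!storeNames.add(storeName)) {
            throw new IllegalStateException("A store named \"" + storeName + "\" already exists");
        }
    }
}

final class GroupedStreamSketch {
    private final StoreRegistry registry;

    GroupedStreamSketch(StoreRegistry registry) {
        this.registry = registry;
    }

    // Stands in for KGroupedStream#count(Count): registers the named store.
    void count(CountOptions options) {
        if (options.storeName != null) {
            registry.register(options.storeName);
        }
    }
}
```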
Yes, I agree, and I think using builders is my preferred pattern.
Cheers,
Damian
Thanks,
Kyle
From: Damian Guy
Sent: Thursday, June 29, 2017 3:59 AM
To: d...@kafka.apache.org
Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
Hi Kyle,
Thanks for your input. Really appreciated.
On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
I like more of a builder pattern even though others have voiced against
it. The reason I like it is because it makes it clear to the user that a
call to KGroupedStream#count will return a KTable, not some intermediate
class that I need to understand.
Yes, that makes sense.
When trying to program in the fluent API that has been discussed most, it
feels difficult to know when you will actually get an object you can reuse.
What if I make one KGroupedStream that I want to reuse, is it legal to
reuse it or does this approach expect you to call grouped each time?
I'd anticipate that once you have a KGroupedStream you can re-use it as you
can today.
This question doesn’t pop into my head at all in the builder pattern; I
assume I can reuse everything.
Finally, I like .groupByKey and .groupBy(KeyValueMapper); I'm not a big fan
of the grouped.
Yes, grouped() was more for demonstration and because groupBy() and
groupByKey() were taken! So I'd imagine the API would actually want to be
groupByKey(/** no required args***/).withOptionalArg() and
groupBy(KeyValueMapper m).withOptionalArg(...). Of course this all depends
on maintaining backward compatibility.
Unfortunately, the below approach would require at least 2 (probably 3)
overloads of each count, reduce, and aggregate: one for returning a KTable
and one for returning a KTable with a Windowed key (we would probably want
to split windowed and session-windowed for ease of implementation).
Obviously not exhaustive, but enough for you to get the picture. Count,
Reduce, and Aggregate supply 3 static methods to initialize the builder:
// Count
KTable<String, Long> count =
groupedStream.count(Count.count().withQueryableStoreName("my-store"));
// Windowed Count
KTable<Windowed<String>, Long> windowedCount =
groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
// Session Count
KTable<Windowed<String>, Long> sessionCount =
groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
Above and below, I think I'd prefer it to be:
groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
// Reduce
Reducer<Long> reducer;
KTable<String, Long> reduce = groupedStream.reduce(reducer,
Reduce.reduce().withQueryableStoreName("my-store"));
// Aggregate Windowed with Custom Store
Initializer<String> initializer;
Aggregator<String, Long, String> aggregator;
KTable<Windowed<String>, String> aggregate =
groupedStream.aggregate(initializer, aggregator,
Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
// Cogroup SessionWindowed
KTable<Windowed<String>, String> cogrouped = groupedStream1.cogroup(aggregator1)
.cogroup(groupedStream2, aggregator2)
.aggregate(initializer, aggregator,
Aggregate.sessionWindowed(SessionWindows.with(10L),
sessionMerger).withQueryableStoreName("my-store"));
public class Count {
    public static class Windowed extends Count {
        private final Windows windows;
        Windowed(Windows windows) { this.windows = windows; }
    }
    public static class SessionWindowed extends Count {
        private final SessionWindows sessionWindows;
        SessionWindowed(SessionWindows sessionWindows) { this.sessionWindows = sessionWindows; }
    }
    public static Count count() { return new Count(); }
    public static Windowed windowed(Windows windows) { return new Windowed(windows); }
    public static SessionWindowed sessionWindowed(SessionWindows sessionWindows) {
        return new SessionWindowed(sessionWindows);
    }
    // All withXXX(...) methods.
}
public interface KGroupedStream<K, V> {
    KTable<K, Long> count(Count count);
    KTable<Windowed<K>, Long> count(Count.Windowed count);
    KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
    …
}
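One subtlety of overloading count() on subclasses of Count is that Java chooses the overload from the argument's static (compile-time) type, not its runtime type. The sketch below uses hypothetical stand-ins (CountArg for Count, WindowedArg for Count.Windowed, GroupedCounter for KGroupedStream) and strings in place of the KTable result types. It shows that a windowed spec held in a variable declared as the base type silently selects the non-windowed overload.

```java
// Hypothetical sketch; CountArg, WindowedArg and GroupedCounter are
// illustrative stand-ins for Count, Count.Windowed and KGroupedStream.
class CountArg {
    static CountArg count() {
        return new CountArg();
    }

    static WindowedArg windowed(long sizeMs) {
        return new WindowedArg(sizeMs);
    }
}

final class WindowedArg extends CountArg {
    final long sizeMs;

    WindowedArg(long sizeMs) {
        this.sizeMs = sizeMs;
    }
}

final class GroupedCounter {
    // The returned strings stand in for the result types of the two overloads.
    String count(CountArg count) {
        return "KTable<K, Long>";
    }

    String count(WindowedArg count) {
        return "KTable<Windowed<K>, Long>";
    }
}
```

With real KTable return types the mismatch would often surface as a compile error at the assignment, but code that passes the result along generically could still pick the wrong overload without complaint.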
Thanks,
Kyle
From: Guozhang Wang
Sent: Wednesday, June 28, 2017 7:45 PM
To: d...@kafka.apache.org
Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
I played with the current proposal a bit using
https://github.com/dguy/kafka/tree/dsl-experiment,
and here are my observations:
1. Personally I prefer
"stream.group(mapper) / stream.groupByKey()"
over
"stream.group().withKeyMapper(mapper) / stream.group()"
since 1) withKeyMapper is not enforced programmatically, though it is not
"really" optional like the others, and 2) syntax-wise it reads more naturally.
I think it is okay to add the APIs in
(https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java)
in KGroupedStream.
2. For the "withStateStoreSupplier" API, is the user supposed to pass in
the inner-most state store supplier (e.g. the one whose get() returns a
RocksDBStore), or is it supposed to be the outer-most supplier with
logging / metrics / etc.? I think it would be more useful to only require
users to pass in the inner state store supplier while specifying caching /
logging through other APIs.
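The "user passes only the inner store" idea amounts to a decorator chain that the DSL assembles. Below is a minimal sketch, assuming a hypothetical KvStore interface (not Kafka's KeyValueStore or StateStoreSupplier), a list standing in for the changelog topic, and a simplified write-through cache that never evicts. The user supplies InMemoryStore; caching and logging are switched on by flags, mirroring withCachingEnabled()/withLoggingEnabled().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch; KvStore and the classes below are illustrative, not
// Kafka's StateStoreSupplier/KeyValueStore API.
interface KvStore {
    void put(String key, long value);
    Long get(String key);
    String describe();
}

// What the user plugs in: just the storage engine.
final class InMemoryStore implements KvStore {
    private final Map<String, Long> data = new HashMap<>();
    public void put(String key, long value) { data.put(key, value); }
    public Long get(String key) { return data.get(key); }
    public String describe() { return "inner"; }
}

// Records every write; the list stands in for a changelog topic.
final class LoggingStore implements KvStore {
    private final KvStore inner;
    final List<String> changelog = new ArrayList<>();
    LoggingStore(KvStore inner) { this.inner = inner; }
    public void put(String key, long value) {
        changelog.add(key + "=" + value);
        inner.put(key, value);
    }
    public Long get(String key) { return inner.get(key); }
    public String describe() { return "logging(" + inner.describe() + ")"; }
}

// Write-through cache in front of the next layer (simplified: never evicts).
final class CachingStore implements KvStore {
    private final KvStore inner;
    private final Map<String, Long> cache = new HashMap<>();
    CachingStore(KvStore inner) { this.inner = inner; }
    public void put(String key, long value) {
        cache.put(key, value);
        inner.put(key, value);
    }
    public Long get(String key) {
        Long cached = cache.get(key);
        return cached != null ? cached : inner.get(key);
    }
    public String describe() { return "caching(" + inner.describe() + ")"; }
}

final class StoreWrapper {
    // The DSL, not the user, decides which layers to add around the inner store.
    static KvStore wrap(Supplier<KvStore> innerSupplier, boolean logging, boolean caching) {
        KvStore store = innerSupplier.get();
        if (logging) store = new LoggingStore(store);
        if (caching) store = new CachingStore(store);
        return store;
    }
}
```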
In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
allowing users to call other APIs like "withQueryableName" multiple times,
but "withStateStoreSupplier" only once at the end. Why is that?
3. The current DSL seems to be only for aggregations, what about joins?
4. I think it is okay to keep the "withLogConfig": for the
StateStoreSupplier it will still be user code specifying the topology, so I
do not see a big difference.
5. "WindowedGroupedStream" 's withStateStoreSupplier should take the
windowed state store supplier to enforce typing?
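Point 5 can be illustrated with a bounded generic parameter: if the windowed grouped stream only accepts suppliers of window stores, passing a key-value store supplier becomes a compile-time error rather than a runtime surprise. All names below (Store, KeyValueStoreLike, WindowStoreLike, StoreSupplierSketch, WindowedGroupedSketch) are hypothetical and only mirror the shape of the idea.

```java
// Hypothetical sketch; all names are illustrative, not Kafka interfaces.
interface Store {
    String name();
}

interface KeyValueStoreLike extends Store {}

interface WindowStoreLike extends Store {}

interface StoreSupplierSketch<S extends Store> {
    S get();
}

final class WindowedGroupedSketch {
    // The bound means a StoreSupplierSketch<KeyValueStoreLike> does not compile here.
    String withStateStoreSupplier(StoreSupplierSketch<? extends WindowStoreLike> supplier) {
        return supplier.get().name();
    }
}
```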
Below are minor ones:
6. "withQueryableName": maybe better "withQueryableStateName"?
7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
Guozhang
On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io> wrote:
I see your point about "when to add the processor to the topology". That
is indeed an issue. Not sure if we could allow "updates" to the
topology...
I don't see any problem with having all the withXX() in the KTable
interface -- but this might be subjective.
However, I don't understand your argument about putting aggregate()
after the withXX() -- all the calls to withXX() set optional parameters
for aggregate() and not for groupBy() -- but a groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this might
be quite confusing for developers.
-Matthias
On 6/28/17 2:55 AM, Damian Guy wrote:
I also think that mixing optional parameters with configs is a bad idea.
I have no proposal for this atm but just wanted to mention it. Hope to
find some time to come up with something.
Yes, I don't like the mix of config either. But the only real config here
is the logging config - which we don't really need as it can already be
done via a custom StateStoreSupplier.
What I don't like in the current proposal is the
.grouped().withKeyMapper() -- the current solution with .groupBy(...)
and .groupByKey() seems better. For clarity, we could rename to
.groupByNewKey(...) and .groupByCurrentKey() (even if we should find
some better names).
it could be groupByKey(), groupBy() or something different bt
The proposed pattern "chains" grouping and aggregation too closely
together. I would rather separate both more than less, i.e., go in the
opposite direction.
I am also wondering if we could do something more "fluent". The initial
proposal was like:
groupedStream.count()
.withStoreName("name")
.withCachingEnabled(false)
.withLoggingEnabled(config)
.table()
The .table() statement in the end was kinda alien.
I agree, but then all of the withXXX methods need to be on KTable, which is
worse in my opinion. You also need something that is going to "build" the
internal processors and add them to the topology.
The current proposal puts the count() at the end -- i.e., the optional
parameters for count() have to be specified on the .grouped() call -- this
does not seem to be the best way either.
I actually prefer this method as you are building a grouped stream that you
will aggregate. So
table.grouped(...).withOptionalStuff().aggregate(..) etc.
seems natural to me.
I did not think this through in detail, but can't we just do the initial
proposal without the .table()?
groupedStream.count().withStoreName("name").mapValues(...)
Each .withXXX(...) returns the current KTable and all the .withXXX() are
just added to the KTable interface. Or am I missing any reason why this won't
work, or any obvious disadvantage?
See above.
-Matthias
On 6/22/17 4:06 AM, Damian Guy wrote:
Thanks everyone. My latest attempt is below. It builds on the fluent
approach, but I think it is slightly nicer.
I agree with some of what Eno said about mixing config-y stuff in the DSL,
but I think that enabling caching and enabling logging are things that
aren't actually config. I'd probably not add withLogConfig(...) (even
though it is below) as this is actually config and we already have a way
of doing that, via the StateStoreSupplier. Arguably we could use the
StateStoreSupplier for disabling caching etc., but as it stands that is a
bit of a tedious process for someone that just wants to use the default
storage engine, but not have caching enabled.
There is also an orthogonal concern that Guozhang alluded to.... If you
want to plug in a custom storage engine and you want it to be logged etc.,
you would currently need to implement that yourself. Ideally we can provide
a way where we will wrap the custom store with logging, metrics, etc. I
need to think about where this fits; it is probably more appropriate on the
Stores API.
final KeyValueMapper<String, String, Long> keyMapper = null;
// count with mapped key
final KTable<Long, Long> count = stream.grouped()
.withKeyMapper(keyMapper)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
.withQueryableName("my-store")
.count();
// windowed count
final KTable<Windowed<String>, Long> windowedCount =
stream.grouped()
.withQueryableName("my-window-store")
.windowed(TimeWindows.of(10L).until(10))
.count();
// windowed reduce
final Reducer<String> windowedReducer = null;
final KTable<Windowed<String>, String> windowedReduce =
stream.grouped()
.withQueryableName("my-window-store")
.windowed(TimeWindows.of(10L).until(10))
.reduce(windowedReducer);
final Aggregator<String, String, Long> aggregator = null;
final Initializer<Long> init = null;
// aggregate
final KTable<String, Long> aggregate = stream.grouped()
.withQueryableName("my-aggregate-store")
.aggregate(aggregator, init, Serdes.Long());
final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
// aggregate with custom store
final KTable<String, Long> aggWithCustomStore = stream.grouped()
.withStateStoreSupplier(stateStoreSupplier)
.aggregate(aggregator, init);
// disable caching
stream.grouped()
.withQueryableName("name")
.withCachingEnabled(false)
.count();
// disable logging
stream.grouped()
.withQueryableName("q")
.withLoggingEnabled(false)
.count();
// override log config
final Reducer<String> reducer = null;
stream.grouped()
.withLogConfig(Collections.singletonMap("segment.size",
"10"))
.reduce(reducer);
If anyone wants to play around with this you can find the code here:
https://github.com/dguy/kafka/tree/dsl-experiment
Note: It won't actually work as most of the methods just return null.
Thanks,
Damian
On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
Thanks Damian. I think both options have pros and cons. And both are better
than overload abuse.
The fluent API approach reads better, no mention of builder or build
anywhere. The main downside is that the method signatures are a little less
clear. By reading the method signature, one doesn't necessarily know what
it returns. Also, one needs to figure out the special method (`table()` in
this case) that gives you what you actually care about (`KTable` in this
case). Not major issues, but worth mentioning while doing the comparison.
The builder approach avoids the issues mentioned above, but it doesn't read
as well.
Ismael
On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
Hi,
I'd like to get a discussion going around some of the API choices we've
made in the DSL. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs, on a per operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).
So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?
One option would be to use a fluent API approach for providing the optional
params, so something like this:
groupedStream.count()
.withStoreName("name")
.withCachingEnabled(false)
.withLoggingEnabled(config)
.table()
Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
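The builder option could be fleshed out roughly as below. This is a sketch under assumptions: CountBuilder follows the name used in the example call, but its fields, the CountParams result class, and the defaults (caching and logging enabled) are hypothetical choices for illustration. Required arguments go in the constructor and are validated there, optional ones are set with withXXX(), and build() yields the single parameter object that count(...) would receive.

```java
// Hypothetical sketch; CountParams is an illustrative name and the defaults
// (caching and logging on) are assumptions.
final class CountParams {
    final String storeName;
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    CountParams(String storeName, boolean cachingEnabled, boolean loggingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }
}

final class CountBuilder {
    private final String storeName;        // required: constructor argument
    private boolean cachingEnabled = true; // optional: set via withXXX()
    private boolean loggingEnabled = true;

    CountBuilder(String storeName) {
        if (storeName == null || storeName.isEmpty()) {
            throw new IllegalArgumentException("storeName is required");
        }
        this.storeName = storeName;
    }

    CountBuilder withCachingEnabled(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
        return this;
    }

    CountBuilder withLoggingEnabled(boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
        return this;
    }

    // Produces the single parameter object that count(...) would receive.
    CountParams build() {
        return new CountParams(storeName, cachingEnabled, loggingEnabled);
    }
}
```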
Another option is to say: Hey we don't need this, what are you on about!
The above has focussed on state store related overloads, but the same ideas
could be applied to joins etc., where we presently have many join methods
and many overloads.
Anyway, I look forward to hearing your opinions.
Thanks,
Damian
--
-- Guozhang