Best Jan
On 12.07.2017 21:27, Guozhang Wang wrote:
Hello Jan,
Thanks for your feedback. Let me try to clarify a few things about the
problems we are trying to resolve and the motivations behind the
current proposals.
As Matthias mentioned, one issue we are trying to tackle is reducing
the number of overloaded functions in the DSL caused by the serde /
state store supplier overrides that are needed for repartitioning or
for state store materialization. Another related issue is that the
current way of overriding the state store supplier is not very natural
to use, for example:
1) If a user just wants to disable caching / logging etc. but does not
want to change the underlying store engine at all, she still needs to
learn, for example, whether a windowed store or a key-value store is
needed for this specific operator in the DSL and which serdes are
needed to materialize the store, in order to create a
StateStoreSupplier with caching / logging disabled and pass it into the DSL.
2) Similarly, if a user just wants to set different topic configs for
the changelog topic, she still needs to pass the whole
StateStoreSupplier into the operator.
3) If a user wants to use a different store engine (e.g. MyStore
instead of RocksDBStore) underneath but does not care about the default
settings for logging, caching, etc., he STILL needs to pass the whole
StateStoreSupplier into the operator.
Note that all the above scenarios are for advanced users who do want
to override these settings; users who are fine with the default
settings should not be exposed to such APIs at all, or, as you put it,
should not "be exposed with any of such implementation details" if they
do not care.
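To make scenario 1) concrete, here is a rough sketch of the boilerplate
involved today, assuming the 0.11-era `Stores` builder (method names
quoted from memory, so treat them as approximate):

```
// Just to disable changelogging, the user must already know the store
// type (key-value, for count()) AND the serdes, and then build a full
// StateStoreSupplier to pass into the operator.
StateStoreSupplier countStore = Stores.create("my-count-store")
    .withKeys(Serdes.String())   // must know the key serde
    .withValues(Serdes.Long())   // must know count() materializes Longs
    .persistent()
    .disableLogging()            // the one setting she actually cared about
    .build();

KTable<String, Long> counts = groupedStream.count(countStore);
```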
-----------------
We have been talking about configs vs. code for such settings, since
we already use configs for "global" default configs; but the argument
against also using configs for per-operator / per-store settings is
that it would simply make the configs hard to manage and hard to wire
up with tools. Personally speaking, I'm not a big fan of using configs
for per-entity overrides, mainly because of my experience with Samza,
which takes exactly the same approach for per-stream / per-source
configs:
http://samza.apache.org/learn/documentation/0.13/jobs/configuration-table.html
([system-name], [stream-id] etc. are all placeholders)
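For illustration, the per-stream overrides in that table look roughly
like the following (keys quoted from memory of the linked Samza docs,
so treat them as approximate):

```
systems.[system-name].samza.key.serde = string
streams.[stream-id].samza.msg.serde = json
streams.[stream-id].samza.reset.offset = true
```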
The main issues were: 1) users making config changes need to deploy
them to all the instances; I think for Streams it would be even worse,
as we would need a config file on each running instance, and whenever
there is a change we would need to make sure it is propagated to all of
them; 2) whenever users make code changes, e.g. to add a new stream /
system, they need to remember to make the corresponding changes in the
config files as well, and they kept forgetting to do so. The lesson
learned there was that it is always better to change one place (code)
than two (code + config file).
Again, this is not to say we have vetoed this option; if people have
good reasons for it, let's discuss them here.
-----------------
So the current proposals are mainly about keeping configs for the
global default settings while still allowing users to override
per-operator / per-store settings in code, and at the same time not
forcing users to think about such implementation details if they are
fine with the default settings. For example:
As a normal user it is sufficient to specify, for example, a join as
```
table4.join(table5, joiner).table();
```
in which she can still just focus on the computational logic with all
implementation details abstracted away; only if the user is familiar
enough with the implementation details (e.g. how the joined tables are
materialized into state stores, etc.) and wants to specify her own
settings (e.g. "I want to swap in my own state store engine", "I want
to disable caching for dedup", or "I want to use a different serde")
does she "explore" them in the DSL again:
```
table4.join(table5, joiner).table(Materialized.as("store1"));
// use a custom store name for interactive queries

table4.join(table5, joiner).table(Materialized.as(MyStoreSupplier));
// use a custom store engine

table4.join(table5, joiner).table(Materialized.as("store1").withLoggingEnabled(configs));
// use custom changelog topic configs

// ... more
```
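For reference, a minimal sketch of what such a Materialized helper
could look like, as I read the examples above (names and shape are
illustrative only, not the final API):

```
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of the Materialized helper inferred from the
// examples above.
public final class Materialized {
    private final String storeName;
    private final Map<String, String> logConfig = new HashMap<>();
    private boolean loggingEnabled = true;

    private Materialized(final String storeName) {
        this.storeName = storeName;
    }

    public static Materialized as(final String storeName) {
        return new Materialized(storeName);
    }

    public Materialized withLoggingEnabled(final Map<String, String> topicConfig) {
        this.loggingEnabled = true;
        this.logConfig.putAll(topicConfig);
        return this;
    }

    public Materialized withLoggingDisabled() {
        this.loggingEnabled = false;
        return this;
    }
}
```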
Hope it helps.
Guozhang
On Fri, Jul 7, 2017 at 3:42 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
It makes me want to cry.
Why on earth is the DSL going to expose all its implementation details
now? Especially whether something is materialized or not.
If we want to take useful steps in that direction, maybe we are
looking for a way to let the user switch back and forth between the
PAPI and the DSL?
A change like the one proposed would not eliminate any of my pain
points while still being a heck of a lot of work to migrate to.
Since I am only following this from the point where Eno CC'ed it into
the users list: can someone please rephrase for me what problem this is
trying to solve? I don't mean to be rude, but it uses a problematic
feature ("StateStoreSuppliers in the DSL") to justify making it even
worse. This gets us nowhere in making the configs more flexible; it's
just syntactic sugar.
A low-effort shot would be: let's add a Properties argument to
operations that would otherwise become too heavily overloaded, or pull
the configs by some naming schema from the overall properties. On top
of that, we get rid of StateStoreSuppliers in the DSL and have them
also configured by said properties.
=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require
code changes to configure)
Line 184 especially makes no sense to me. What is a KTable-KTable
non-materialized join anyway?
Hope we can discuss more on this.
On 07.07.2017 17:23, Guozhang Wang wrote:
I messed up the indentation on the github code repo; this version is
easier to read: https://codeshare.io/GLWW8K
Guozhang
On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangg...@gmail.com> wrote:
Hi Damian / Kyle,
I think I agree with you guys about the pros / cons of using the
builder pattern vs. using some "secondary classes", and I'm wondering
if we can take a "middle" path between the two. I spent some time on a
slightly different approach from Damian's current proposal:
https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
The key idea is to let the final "table()" or "stream()" call
"upgrade" from the secondary classes to the first-class citizens, while
having all the specs inside this function. This proposal also includes
some other refactoring that people have discussed for the builders, to
reduce the number of overloaded functions as well. WDYT?
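If I read the idea right, a call chain would look something like the
sketch below. This is purely illustrative and not taken from the linked
file; the Materialized name is borrowed from the examples earlier in
this thread:

```
// The intermediate count() result is a secondary class; the terminal
// table() call carries the specs and "upgrades" it to a first-class KTable.
KTable<String, Long> counts = stream
    .groupByKey()
    .count()
    .table(Materialized.as("counts-store"));
```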
Guozhang
On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian....@gmail.com> wrote:
Hi Jan,
Thanks very much for the input.
On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com> wrote:
Hi Damian,
I do see your point that something needs to change, but I fully agree
with Guozhang when he says:
---
But since this is an incompatible change, and we are going to remove
the compatibility annotations soon, it means we only have one chance
and we really have to make it right.
----
I think we all agree on this one! Hence the discussion.
I fear all the suggestions do not go far enough to become something
that will carry on for very much longer.
I am currently working on KAFKA-3705 and am trying to find the easiest
way for the user to give me all the required functionality. The easiest
interface I could come up with so far can be looked at here:
https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
And it's already horribly complicated. I am currently unable to find
the right abstraction level to have everything fall into place
naturally. To be honest, I already think introducing
https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
was not ideal and makes everything a mess.

To be fair, that is not a particularly easy problem to solve! And I'm
not sure I agree that it makes everything a mess, but it could have
been done differently.
The JoinType:Whatever approach is also not really flexible. Two things
come to my mind:

1. I don't think we should rule out config-based decisions, say configs
like:
streams.$applicationID.joins.$joinname.conf = value
Is this just for config? Or are you suggesting that we could somehow
"code" the join in a config file?
This can allow for tremendous changes without a single API change, and
IMO it has not been considered enough yet.
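To illustrate how I read Jan's suggestion (my own sketch, not something
from his branch; the key layout and names are hypothetical): the
operator would resolve its tuning from the overall properties by name,
so behavior changes need no API change:

```
// Hypothetical: a named join resolves its settings from the global
// properties via the naming schema above.
final String applicationId = "myapp";  // illustrative values
final String joinName = "myjoin";

final Properties props = new Properties();
props.put("streams.myapp.joins.myjoin.type", "left");

final String joinType = props.getProperty(
    "streams." + applicationId + ".joins." + joinName + ".type", "inner");
```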
2. Push logic from the DSL to the callback classes. A ValueJoiner, for
example, can be used to implement different join types as the user
wishes.
Do you have an example of how this might look?
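Not from the thread, but one way I can imagine it looking: give the
user an outer-join-shaped callback and let the ValueJoiner itself
decide how to handle missing sides. (Order, Customer, and EnrichedOrder
are hypothetical types for illustration.)

```
// Sketch: the callback, not the DSL, chooses the join semantics.
ValueJoiner<Order, Customer, EnrichedOrder> leftJoinLike =
    (order, customer) -> {
        if (customer == null) {
            // left-join-like behavior: keep the left side with defaults
            return EnrichedOrder.withUnknownCustomer(order);
        }
        return EnrichedOrder.of(order, customer);
    };
```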
As Guozhang said: not breaking users is very important.
Of course. We want to make it as easy as possible for people to use
streams.
Especially with these changes plus all the plans I sadly only have in
my head; but hopefully the first link can give a glimpse.
Thanks for preparing the examples; they made it way clearer to me what
exactly we are talking about. I would argue to go a bit slower and more
carefully on this one. At some point we need to get it right. Peeking
over to the Hadoop guys with their huge user base: config files really
work well for them.
Best Jan
On 30.06.2017 09:31, Damian Guy wrote:
Thanks Matthias
On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io> wrote:
I am just catching up on this thread, so sorry for the long email in
advance... Also, it's to some extent a dump of thoughts and not always
a clear proposal. I still need to think about this in more detail, but
maybe it helps others to get new ideas :)
However, I don't understand your argument about putting aggregate()
after the withXX() -- all the calls to withXX() set optional parameters
for aggregate() and not for groupBy() -- but a groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this might
be quite confusing for developers.
I see what you are saying, but the grouped stream is effectively a
no-op until you call one of the aggregate/count/reduce etc. functions.
So the optional params are ones that are applicable to any of the
operations you can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.
I understand your argument, but I don't share the conclusion. If we
need a "final/terminal" call, the better way might be
.groupBy().count().withXX().build()
(with a better name for build() though).
The point is that all the other calls, i.e., withBlah, windowed, etc.,
apply to all the aggregate functions, the terminal call being the
actual type of aggregation you want to do. I personally find this more
natural than groupBy().count().withBlah().build():
```
groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
```
I like this. However, I don't see a reason to have windowed() and
sessionWindowed(). We should have one top-level `Windows` interface
that both `TimeWindows` and `SessionWindows` implement, and just have a
single windowed() method that accepts all `Windows`. (I did not like
the separation of `SessionWindows` in the first place, and this seems
to be an opportunity to clean this up. It was hard to change when we
introduced session windows.)
Yes - true, we should look into that.
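A rough sketch of what that cleanup could look like. This is
hypothetical (today `SessionWindows` does not share a top-level
interface with `TimeWindows`), and the names are invented to avoid
clashing with the real classes:

```
// Hypothetical unified hierarchy: one top-level interface so a single
// windowed(...) overload accepts both window kinds.
interface WindowSpec { }

final class TimeWindowSpec implements WindowSpec {
    final long sizeMs;
    TimeWindowSpec(final long sizeMs) { this.sizeMs = sizeMs; }
}

final class SessionWindowSpec implements WindowSpec {
    final long inactivityGapMs;
    SessionWindowSpec(final long gapMs) { this.inactivityGapMs = gapMs; }
}

// groupedStream.windowed(WindowSpec) would then dispatch on the
// concrete type instead of needing windowed() and sessionWindowed().
```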
Btw: we do use the imperative groupBy() and groupByKey(), and thus we
might also want to use windowBy() (instead of windowed()). Not sure how
important this is, but it seems inconsistent otherwise.
Makes sense
About joins: I don't like .withJoinType(JoinType.LEFT) at all. I think
defining an inner/left/outer join is not an optional argument but a
first-class concept and should have a proper representation in the API
(like the current methods join(), leftJoin(), outerJoin()).
Yep, I did originally have it as a required param and maybe that is
what we go with. It could have a default, but maybe that is confusing.
About the two join API proposals, the second one has too much
boilerplate code for my taste. Also, the actual join() operator has
only one argument, which is weird to me: in my thinking process, the
main operator call should have one parameter per mandatory argument,
but your proposal puts the mandatory arguments into the
Joins.streamStreamJoin() call. This is far from intuitive IMHO.
This is the builder pattern; you only need one param as the builder
has captured all of the required and optional arguments.
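For readers without the branch open, the shape under discussion is
roughly the following. This is reconstructed from the conversation;
only Joins.streamStreamJoin() is named in the thread, so the withXX()
method and types are my guesses:

```
// Sketch of the "builder" join proposal: join() takes a single builder
// that carries both mandatory and optional arguments.
KStream<String, EnrichedOrder> joined = orders.join(
    Joins.streamStreamJoin(customers, joiner, JoinWindows.of(60_000L))
         .withKeySerde(Serdes.String()));
```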
The first join proposal also seems to align better with the pattern
suggested for aggregations, and having the same pattern for all
operators is important (as you stated already).
This is why I offered the two alternatives I started out with: one is
the builder pattern, the other is the more fluent pattern.
Coming back to config vs. optional parameters: what about having a
method withConfig[s](...) that allows putting in the configuration?
Sure, it is currently called withLogConfig() as that is the only thing
that is really config.
This also raises the question whether until() is a windows property.
Actually, until() seems to be a configuration parameter and thus should
not have its own method.
Hmmm, I don't agree. until() is a property of the window. It is going
to be potentially different for every window operation you do in a
streams app.
Browsing through your example DSL branch, I also saw this one:

```
final KTable<Windowed<String>, Long> windowed =
    groupedStream.counting()
                 .windowed(TimeWindows.of(10L).until(10))
                 .table();
```
This is an interesting idea, and it reminds me of some feedback along
the lines of: "I wanted to count a stream, but there was no count()
method -- I first needed to figure out that I have to group the stream
before I can count it. It does make sense in hindsight but was not
obvious in the beginning." Thus, carrying this thought further, we
could also do the following:
```
stream.count().groupedBy().windowedBy().table();
```

-> Note, I use "grouped" and "windowed" instead of the imperative form
here, as they come after the count().
This would be more consistent than your proposal (that has grouping
before but windowing after count()). It might even allow us to enrich
the API with some syntactic sugar like `stream.count().table()` to get
the overall count of all records (this would obviously not scale, but
we could support it -- if not now, maybe later).
I guess I'd prefer:

```
stream.groupBy().windowBy().count()
stream.groupBy().windowBy().reduce()
stream.groupBy().count()
```
As I said above, everything that happens before the final aggregate
call can be applied to any of them. So it makes sense to me to do those
things ahead of the final aggregate call.
Last, about the builder pattern: I am convinced that we need some
"terminal" operator/method that tells us when to add the processor to
the topology. But I don't see the need for a plain builder pattern,
which feels alien to me (see my argument about the second join
proposal). Using .stream() / .table() as used in many examples might
work. But maybe a more generic name that we can use in all places, like
build() or apply(), might also be an option.
Sure, a generic name might be ok.
-Matthias
On 6/29/17 7:37 AM, Damian Guy wrote:
Thanks Kyle.
On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
Hi Damian,
When trying to program in the fluent API that has been discussed most,
it feels difficult to know when you will actually get an object you can
reuse. What if I make one KGroupedStream that I want to reuse, is it
legal to reuse it or does this approach expect you to call grouped each
time?
I'd anticipate that once you have a KGroupedStream you can re-use it
as you can today.
You said it yourself in another post that the grouped stream is
effectively a no-op until a count, reduce, or aggregate. The way I see
it, you wouldn't be able to reuse anything except KStreams and KTables,
because most of this fluent API would continue returning this (this
being the builder object currently being manipulated).
So, if you ever store a reference to anything but KStreams and
KTables, and you use it in two different ways, then it's possible you
make conflicting withXXX() calls on the same builder.
Not necessarily true. It could return a new instance of the builder,
i.e., the builders being immutable. So if you held a reference to the
builder it would always be the same as it was when it was created.
```
GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
    groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
```
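A minimal sketch of the immutability Damian describes (illustrative
only, not a proposed class): each withXX() returns a fresh copy, so the
reference held above is never mutated:

```
import org.apache.kafka.common.serialization.Serde;

// Sketch: an immutable GroupedStream config holder. withKeySerde()
// copies instead of mutating, so groupedStreamWithDefaultSerdes is
// unaffected by the second declaration above.
public final class GroupedStream<K, V> {
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    GroupedStream(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    public GroupedStream<K, V> withKeySerde(final Serde<K> keySerde) {
        return new GroupedStream<>(keySerde, this.valueSerde);
    }

    public GroupedStream<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new GroupedStream<>(this.keySerde, valueSerde);
    }
}
```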
I'll admit that this shouldn't happen, but some user is going to do it
eventually…
Depending on the implementation, uses of groupedStreamWithDefaultSerdes
would most likely be equivalent to the version withDeclaredSerdes. One
workaround would be to always make copies of the config objects you are
building, but this approach has its own problem, because now we have to
identify which configs are equivalent so we don't create repeated
processors.
The point of this long-winded example is that we always have to be
thinking about all of the possible ways it could be misused by a user
(causing them to see hard-to-diagnose problems).
Exactly! That is the point of the
discussion really.
In my attempt at a couple of methods with builders, I feel that I
could confidently say the user couldn't really mess it up:
```
// Count
KTable<String, Long> count =
    kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
```
The kGroupedStream is reusable, and if they attempted to reuse the
Count for some reason it would throw an error message saying that a
store named "my-store" already exists.
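The duplicate-store guard Kyle mentions could be as simple as the
sketch below; where the bookkeeping would actually live (e.g. the
topology builder) is hypothetical:

```
import java.util.HashSet;
import java.util.Set;

// Sketch of the guard: reusing a Count, and hence its store name,
// fails fast instead of silently creating a conflicting store.
private final Set<String> registeredStoreNames = new HashSet<>();

void registerStore(final String storeName) {
    if (!registeredStoreNames.add(storeName)) {
        throw new IllegalStateException(
            "A store named '" + storeName + "' already exists");
    }
}
```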
Yes, I agree, and I think using builders is my preferred pattern.
Cheers,
Damian
Thanks,
Kyle
From: Damian Guy
Sent: Thursday, June 29, 2017 3:59 AM
To: d...@kafka.apache.org
Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
Hi Kyle,
Thanks for your input. Really appreciated.
On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
I like more of a builder pattern even though others have voiced
against it. The reason I like it is because it makes it clear to the
user that a call to KGroupedStream#count will return a KTable, not some
intermediate class that I need to understand.
Yes, that makes sense.
When trying to program in the fluent API that has been discussed most,
it feels difficult to know when you will actually get an object you can
reuse.
What if I make one KGroupedStream that I want to reuse, is it legal to
reuse it or does this approach expect you to call grouped each time?
I'd anticipate that once you have a KGroupedStream you can re-use it
as you can today.
This question doesn't pop into my head at all with the builder
pattern; I assume I can reuse everything.
Finally, I like .groupByKey and .groupBy(KeyValueMapper); not a big
fan of the grouped.
Yes, grouped() was more for demonstration and because groupBy() and
groupByKey() were taken! So I'd imagine the API would actually want to
be groupByKey(/** no required args***/).withOptionalArg() and
groupBy(KeyValueMapper m).withOptionalArg(...); of course this all
depends on maintaining backward compatibility.
Unfortunately, the below approach would require at least 2 (probably 3)
overloads of each of count, reduce, and aggregate (one for returning a
KTable and one for returning a KTable with a Windowed key; we would
probably want to split windowed and session-windowed for ease of
implementation). Obviously not exhaustive, but enough for you to get
the picture. Count, Reduce, and Aggregate supply 3 static methods to
initialize the builder:
```
// Count
KTable<String, Long> count =
    groupedStream.count(Count.count().withQueryableStoreName("my-store"));

// Windowed Count
KTable<Windowed<String>, Long> windowedCount =
    groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10))
        .withQueryableStoreName("my-windowed-store"));

// Session Count
KTable<Windowed<String>, Long> sessionCount =
    groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L))
        .withQueryableStoreName("my-session-windowed-store"));
```
Above and below, I think I'd prefer it to be:

```
groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
```
```
// Reduce
Reducer<Long> reducer;
KTable<String, Long> reduce =
    groupedStream.reduce(reducer,
        Reduce.reduce().withQueryableStoreName("my-store"));

// Aggregate Windowed with Custom Store
Initializer<String> initializer;
Aggregator<String, Long, String> aggregator;
KTable<Windowed<String>, String> aggregate =
    groupedStream.aggregate(initializer, aggregator,
        Aggregate.windowed(TimeWindows.of(10L).until(10))
            .withStateStoreSupplier(stateStoreSupplier));
```
```
// Cogroup SessionWindowed
KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
    .cogroup(groupedStream2, aggregator2)
```