Hi Mathias,

thanks again for your reply. Ill try to take some time now and make some more exhaustive remarks.

On 09.07.2017 21:48, Matthias J. Sax wrote:
I think we do have a very good discussion and people openly share their
ideas. So I am not sure why your are frustrated (at least I get this
impression).
We do have good discussions. Its just that I am currently unhappy with the balance I make from: "How bad I want my Ideas to be understood" vs "How much time I can throw in". That number is currently just through the roof! sorry that this makes it into the tone, really just my circumstances right now.

Additional to that: We have many features 1:n join, a different group by implementation, lateral view. All these things developed in 0.10.0.1 and have most of our stream processing needs covered now. Mostly with PAPI <=> DSL Switcharoo. We finally start an attempt to help upstreaming these feautures and coming up with some sort of blog post. And 0.10.0.1 was like this little grass roots thing and sooo damn pretty. And I look at current trunk and there is quite some weeds growing and it really hurts me. More on this later I guess.

Maybe it might be best if you propose an API change by yourself similar
to what Damian and Guozhang did (to whatever extend your time constraint
permits). I personally don't know exactly what an ideal API from your
point of view is atm, but this discussion would benefit a lot if you
could share it.
not entirely sure what to line out. Ill try at the end to give it a
I don't understand why custom stores in DSL?
Why not? Maybe you can elaborate a little more?
A KTable is nothing by default: A KTable can _describe_ a mapping and a filtering from the join of 2 aggregates. Sources and Aggregates are the 2 things that are something. They are special KTables not like a join. When I check the newer codebase a join will now maintain a state store that is a copy of the 2 inputs (if its made queryiable with some nullchecks on strings). 2 reasons I can think we are there: 1. Less lookups on reads (wich kinda makes a point for High throughput IQ wich in my opionion becomes a task for Connect and materialize in some high throughput replicated DB) 2. It was to complicated to keep track of who has the serde for what and we didn't want to implement nested serialisation for all joined together things (wich is what a Ktable really is and I would have voted for that) On the other to make it usable for a wide audience (we hope to get to hadoop grade adoption?) It probably needs to be configurable if you do 1 or 2 as people can balance tradeoffs between more diskspace used / query latency / unnecessary transport (filter finds out its null after join)

When users put a Storename or a Supplier. Usually what they want to say is "I want to use it in IQ" (would love to hear counter arguments on that, honestly can't think of any). Back then when IQ was beeing investigated I lengthly described my Idea of just putting KTable:getQueryHandle(). It would return a query handle wich implements get(K):V. A very basic approach would be to get the tables name and just wrap that around. People could store the
QueryHandle and after starting streams use it to access the tables data.

and I don't understand why
we are not concidering a more generic config based appraoch?
Not sure what you exactly mean. Sound interesting. I don't like the idea
to mix configuration into the DSL (even if I am still not sure, where to
draw the line, ie, what should we consider a config and what not).
Very difficult line to draw indeed. In a case to case evaluation i feel like finding reasonable answers, but I am very config heavy to be honest. Having tons of Hive on my back, configs feel really neat. I can do all sorts of things by just setting options (run this in MR, this in TEZ, this on spark, no map side agg buffer please, yada yada yada yada) So I really do like the idea. Having the idea of "topology might change but semantics are preserved" sounds good for configs. Most of the Statestore stuff probably changes semantics, but usually in crash situations (persistent(), enablelogging()). I am 100% with you that the decision for that is not easy and may be the most problematic point.

About `through`: I think it does make sense to allow the specification
of a store-name to make the store queryable (it's optional in 0.11 btw).
It's the same as for `KStreamBuilder.table()` -- so not sure why this
should be wrong?
getQueryHandle() would easily cover this case aswell. Whyever a table on the other side of a topic is worth more I don't quite get, some edgy repartitioning thing I guess. Just opening into many partitions
I would call to the connect guys again.

Note, that not all KTables are materialized in a store atm. So it's an
easy way to make a non-materialized KTable queryable.
I think that is inline with my comment above.

also providing Serdes by config is neat. wouldn't even need to go into
the code then would also save a ton. (We have the defaults one in conf
why not override the specific ones?)
I am not sure, if Serdes are really a config? I mean, the data types are
hard coded into the code, so it does make sense to specify the Serdes
accordingly. I am also not sure how we would map Serdes from the config
to the corresponding operator?
true! maybe not an ideal case where configs help with overloading. I guess people are either using the global untyped one or a typed one for all steps. So statestore is probably a better case. Its going to be referenced by a name always anyways so one could use this name to provide additional configs to the Statestore.
Probably also defining a factory used to build it.

Similarly a join has some sort of name, currently its 3 names, wich would need unifying to some degree, but then also the joins could be addressed with configs. But Joins don't seem to have the to heaver overloading problem (Only store related :D). But to be honest I can't judge the usefulness of outer and left. Not a pattern that I came across yet for us its always inner. Maybe materialized but not sending old values is that what it does? Sorry can't wrap my head round that just now
heading towards 3am.

The example I provided was

streams.$applicationid.stores.$storename.inmemory = false
streams.$applicationid.stores.$storename.cachesize = 40k

for the configs. The Query Handle thing make sense hopefully.

Best Jan


-Matthias


On 7/8/17 2:23 AM, Jan Filipiak wrote:
Hi Matthias thanks,

Exactly what I was guessing.

I don't understand why custom stores in DSL? and I don't understand why
we are not concidering a more generic config based appraoch?

StateStores in DSL => what I really think we are looking for PAPA => DSL
=> PAPI  back and forth switcharoo capabilities.

Looking at the most overloaded that I can currently find "through()" 2
of them come from the broken idea of "the user provides a name for the
statestore for IQ" and custom statestores.
 From the beginning I said that's madness. That is the real disease we
need to fix IMHO. To be honest I also don't understand why through with
statestore is particularly usefull, second Unique Key maybe?

also providing Serdes by config is neat. wouldn't even need to go into
the code then would also save a ton. (We have the defaults one in conf
why not override the specific ones?)

Does this makes sense to people? what pieces should i outline with code
(time is currently sparse :( but I can pull of some smaller examples i
guess)

Best Jan





On 08.07.2017 01:23, Matthias J. Sax wrote:
It's too issues we want to tackle

   - too many overload (for some method we have already more than 10(
   - improve custom store API

-Matthias


On 7/7/17 3:42 PM, Jan Filipiak wrote:
It makes me want to cry.

why on earth is the DSL going to expose all its implementation
details now?
especially being materialized or not.

If we want to take usefull steps in that direction maybe we are looking
for a way to let the user switch back and forth between PAPI and DSL?

A change as the proposed would not eliminate any of my pain points while
still being a heck of work migrating towards to.

Since I am only following this from the point where Eno CC'ed it into
the users list:

Can someone please rephrase for me what problem this is trying to solve?
I don't mean to be rude but It uses a problematic feature
"StateStoreSuppliers in DSL" to justify making it even worse. This helps
us nowhere in making the configs more flexible, its just syntactic
sugar.

A low effort shoot like: lets add a properties to operations that would
otherwise become overloaded to heavy? Or pull the configs by some naming
schema
form the overall properties. Additionally to that we get rid of
StateStoreSuppliers in the DSL and have them also configured by said
properties.

=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require
code change to configure)

Line 184 makes especially no sense to me. what is a KTableKTable non
materialized join anyways?

Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:
I messed the indentation on github code repos; this would be easier to
read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangg...@gmail.com>
wrote:

Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the
builder
pattern v.s. using some "secondary classes". And I'm thinking if we
can
take a "mid" manner between these two. I spent some time with a slight
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/


java/org/apache/kafka/streams/RefactoredAPIs.java

The key idea is to tolerate the final "table()" or "stream()"
function to
"upgrade" from the secondary classes to the first citizen classes,
while
having all the specs inside this function. Also this proposal
includes some
other refactoring that people have been discussed about for the
builder to
reduce the overloaded functions as well. WDYT?


Guozhang


On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian....@gmail.com>
wrote:

Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Damian,

I do see your point of something needs to change. But I fully agree
with
Gouzhang when he says.
---

But since this is a incompatibility change, and we are going to
remove
the
compatibility annotations soon it means we only have one chance
and we
really have to make it right.
----


I think we all agree on this one! Hence the discussion.


I fear all suggestions do not go far enough to become something that
will
carry on for very much longer.
I am currently working on KAFKA-3705 and try to find the most
easy way
for
the user to give me all the required functionality. The easiest
interface I
could come up so far can be looked at here.


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2
de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
kafka/streams/kstream/internals/KTableImpl.java#L622
And its already horribly complicated. I am currently unable to
find the
right abstraction level to have everything falling into place
naturally. To
be honest I already think introducing


To be fair that is not a particularly easy problem to solve!


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2
de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
kafka/streams/kstream/internals/KTableImpl.java#L493
was unideal and makes everything a mess.
I'm not sure i agree that it makes everything a mess, but It could
have
been done differently.

The JoinType:Whatever is also not really flexible. 2 things come
to my
mind:
1. I don't think we should rule out config based decisions say
configs
like
           streams.$applicationID.joins.$joinname.conf = value

Is this just for config? Or are you suggesting that we could somehow
"code"
the join in a config file?


This can allow for tremendous changes without single API change and
IMO
it
was not considered enough yet.

2. Push logic from the DSL to the Callback classes. A ValueJoiner
for
example can be used to implement different join types as the user
wishes.
Do you have an example of how this might look?


As Gouzhang said: stopping to break users is very important.
Of course. We want to make it as easy as possible for people to use
streams.


especially with this changes + All the plans I sadly only have in my
head
but hopefully the first link can give a glimpse.

Thanks for preparing the examples made it way clearer to me what
exactly
we are talking about. I would argue to go a bit slower and more
carefull on
this one. At some point we need to get it right. Peeking over to the
hadoop
guys with their hughe userbase. Config files really work well for
them.

Best Jan





On 30.06.2017 09:31, Damian Guy wrote:
Thanks Matthias

On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax
<matth...@confluent.io>
wrote:
I am just catching up on this thread, so sorry for the long
email in
advance... Also, it's to some extend a dump of thoughts and not
always a
clear proposal. Still need to think about this in more detail. But
maybe
it helps other to get new ideas :)


However, I don't understand your argument about putting
aggregate()
after the withXX() -- all the calls to withXX() set optional
parameters
for aggregate() and not for groupBy() -- but a
groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this
might
be quite confusion for developers.


I see what you are saying, but the grouped stream is
effectively a
no-op
until you call one of the aggregate/count/reduce etc
functions. So
the
optional params are ones that are applicable to any of the
operations
you
can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.

I understand your argument, but you don't share the conclusion.
If we
need a "final/terminal" call, the better way might be

.groupBy().count().withXX().build()

(with a better name for build() though)


The point is that all the other calls, i.e,withBlah, windowed, etc
apply
too all the aggregate functions. The terminal call being the actual
type
of
aggregation you want to do. I personally find this more natural
than
groupBy().count().withBlah().build()


groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
I like this. However, I don't see a reason to have windowed() and
sessionWindowed(). We should have one top-level `Windows`
interface
that
both `TimeWindows` and `SessionWindows` implement and just have a
single
windowed() method that accepts all `Windows`. (I did not like the
separation of `SessionWindows` in the first place, and this
seems to
be
an opportunity to clean this up. It was hard to change when we
introduced session windows)

Yes - true we should look into that.


Btw: we do you the imperative groupBy() and groupByKey(), and
thus we
might also want to use windowBy() (instead of windowed()). Not
sure
how
important this is, but it seems to be inconsistent otherwise.


Makes sense


About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I
think,
defining an inner/left/outer join is not an optional argument
but a
first class concept and should have a proper representation in the
API
(like the current methods join(), leftJoin, outerJoin()).


Yep, i did originally have it as a required param and maybe that is
what
we
go with. It could have a default, but maybe that is confusing.



About the two join API proposals, the second one has too much
boiler
plate code for my taste. Also, the actual join() operator has only
one
argument what is weird to me, as in my thinking process, the main
operator call, should have one parameter per mandatory argument
but
your
proposal put the mandatory arguments into Joins.streamStreamJoin()
call.
This is far from intuitive IMHO.


This is the builder pattern, you only need one param as the builder
has
captured all of the required and optional arguments.


The first join proposal also seems to align better with the
pattern
suggested for aggregations and having the same pattern for all
operators
is important (as you stated already).


This is why i offered two alternatives as i started out with. 1 is
the
builder pattern, the other is the more fluent pattern.


Coming back to the config vs optional parameter. What about
having a
method withConfig[s](...) that allow to put in the configuration?


Sure, it is currently called withLogConfig() as that is the only
thing
that
is really config.


This also raises the question if until() is a windows property?
Actually, until() seems to be a configuration parameter and thus,
should
not not have it's own method.


Hmmm, i don't agree. Until is a property of the window. It is going
to be
potentially different for every window operation you do in a
streams
app.
Browsing throw your example DSL branch, I also saw this one:

final KTable<Windowed<String>, Long> windowed>
     groupedStream.counting()
                     .windowed(TimeWindows.of(10L).until(10))
                     .table();
This is an interesting idea, and it remind my on some feedback
about
"I
wanted to count a stream, but there was no count() method -- I
first
needed to figure out, that I need to group the stream first to be
able
to count it. It does make sense in hindsight but was not
obvious in
the
beginning". Thus, carrying out this thought, we could also do the
following:

stream.count().groupedBy().windowedBy().table();

-> Note, I use "grouped" and "windowed" instead of imperative
here,
as
it comes after the count()

This would be more consistent than your proposal (that has
grouping
before but windowing after count()). It might even allow us to
enrich
the API with a some syntactic sugar like
`stream.count().table()` to
get
the overall count of all records (this would obviously not scale,
but we
could support it -- if not now, maybe later).


I guess i'd prefer
stream.groupBy().windowBy().count()
stream.groupBy().windowBy().reduce()
stream.groupBy().count()

As i said above, everything that happens before the final aggregate
call
can be applied to any of them. So it makes sense to me to do those
things
ahead of the final aggregate call.


Last about builder pattern. I am convinced that we need some
"terminal"
operator/method that tells us when to add the processor to the
topology.
But I don't see the need for a plain builder pattern that feels
alien to
me (see my argument about the second join proposal). Using
.stream()
/
.table() as use in many examples might work. But maybe a more
generic
name that we can use in all places like build() or apply() might
also be
an option.


Sure, a generic name might be ok.




-Matthias



On 6/29/17 7:37 AM, Damian Guy wrote:
Thanks Kyle.

On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <
winkelman.k...@gmail.com>
wrote:

Hi Damian,

When trying to program in the fluent API that has been
discussed
most
it
feels difficult to know when you will actually get an object
you
can
reuse.
What if I make one KGroupedStream that I want to reuse,
is it
legal
to
reuse it or does this approach expect you to call grouped
each
time?
I'd anticipate that once you have a KGroupedStream you can
re-use it
as
you
can today.
You said it yourself in another post that the grouped stream is
effectively a no-op until a count, reduce, or aggregate. The
way I
see
it
you wouldn’t be able to reuse anything except KStreams and
KTables,
because
most of this fluent api would continue returning this (this
being
the
builder object currently being manipulated).
So, if you ever store a reference to anything but KStreams and
KTables
and
you use it in two different ways then its possible you make
conflicting
withXXX() calls on the same builder.


No necessarily true. It could return a new instance of the
builder,
i.e.,
the builders being immutable. So if you held a reference to the
builder
it
would always be the same as it was when it was created.


GroupedStream<K,V> groupedStreamWithDefaultSerdes =
kStream.grouped();
GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);


I’ll admit that this shouldn’t happen but some user is going
to do
it
eventually…
Depending on implementation uses of
groupedStreamWithDefaultSerdes
would
most likely be equivalent to the version withDeclaredSerdes. One
work
around would be to always make copies of the config objects you
are
building, but this approach has its own problem because now we
have to
identify which configs are equivalent so we don’t create
repeated
processors.

The point of this long winded example is that we always have
to be
thinking about all of the possible ways it could be misused by a
user
(causing them to see hard to diagnose problems).

Exactly! That is the point of the discussion really.


In my attempt at a couple methods with builders I feel that I
could
confidently say the user couldn’t really mess it up.
// Count
KTable<String, Long> count =

kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));


The kGroupedStream is reusable and if they attempted to reuse
the
Count
for some reason it would throw an error message saying that a
store
named
“my-store” already exists.


Yes i agree and i think using builders is my preferred pattern.

Cheers,
Damian


Thanks,
Kyle

From: Damian Guy
Sent: Thursday, June 29, 2017 3:59 AM
To: d...@kafka.apache.org
Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring

Hi Kyle,

Thanks for your input. Really appreciated.

On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <
winkelman.k...@gmail.com
wrote:

I like more of a builder pattern even though others have voiced
against
it. The reason I like it is because it makes it clear to the
user
that
a
call to KGroupedStream#count will return a KTable not some
intermediate
class that I need to undetstand.

Yes, that makes sense.


When trying to program in the fluent API that has been
discussed
most
it
feels difficult to know when you will actually get an object
you
can
reuse.
What if I make one KGroupedStream that I want to reuse, is it
legal
to
reuse it or does this approach expect you to call grouped each
time?
I'd anticipate that once you have a KGroupedStream you can
re-use
it
as
you
can today.


This question doesn’t pop into my head at all in the builder
pattern
I
assume I can reuse everything.
Finally, I like .groupByKey and .groupBy(KeyValueMapper) not a
big
fan
of
the grouped.

Yes, grouped() was more for demonstration and because groupBy()
and
groupByKey() were taken! So i'd imagine the api would actually
want to
be
groupByKey(/** no required args***/).withOptionalArg() and
groupBy(KeyValueMapper m).withOpitionalArg(...)  of course this
all
depends
on maintaining backward compatibility.


Unfortunately, the below approach would require atleast 2
(probably
3)
overloads (one for returning a KTable and one for returning a
KTable
with
Windowed Key, probably would want to split windowed and
sessionwindowed
for
ease of implementation) of each count, reduce, and aggregate.
Obviously not exhaustive but enough for you to get the picture.
Count,
Reduce, and Aggregate supply 3 static methods to initialize the
builder:
// Count
KTable<String, Long> count =

groupedStream.count(Count.count().withQueryableStoreName("my-store"));

// Windowed Count
KTable<Windowed<String>, Long> windowedCount =

groupedStream.count(Count.windowed(TimeWindows.of(10L).until
(10)).withQueryableStoreName("my-windowed-store"));
// Session Count
KTable<Windowed<String>, Long> sessionCount =

groupedStream.count(Count.sessionWindowed(SessionWindows.
with(10L)).withQueryableStoreName("my-session-windowed-store"));
Above and below, i think i'd prefer it to be:
groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)




// Reduce
Reducer<Long> reducer;
KTable<String, Long> reduce = groupedStream.reduce(reducer,
Reduce.reduce().withQueryableStoreName("my-store"));

// Aggregate Windowed with Custom Store
Initializer<String> initializer;
Aggregator<String, Long, String> aggregator;
KTable<Windowed<String>, String> aggregate =
groupedStream.aggregate(initializer, aggregator,

Aggregate.windowed(TimeWindows.of(10L).until(10)).
withStateStoreSupplier(stateStoreSupplier)));
// Cogroup SessionWindowed
KTable<String, String> cogrouped =
groupedStream1.cogroup(aggregator1)
            .cogroup(groupedStream2, aggregator2)
            .aggregate(initializer, aggregator,
Aggregate.sessionWindowed(SessionWindows.with(10L),
sessionMerger).withQueryableStoreName("my-store"));



public class Count {

        public static class Windowed extends Count {
            private Windows windows;
        }
        public static class SessionWindowed extends Count {
            private SessionWindows sessionWindows;
        }

        public static Count count();
        public static Windowed windowed(Windows windows);
        public static SessionWindowed
sessionWindowed(SessionWindows
sessionWindows);

        // All withXXX(...) methods.
}

public class KGroupedStream {
        public KTable<K, Long> count(Count count);
        public KTable<Windowed<K>, Long> count(Count.Windowed
count);
        public KTable<Windowed<K>, Long>
count(Count.SessionWindowed
count);
…
}


Thanks,
Kyle

From: Guozhang Wang
Sent: Wednesday, June 28, 2017 7:45 PM
To: d...@kafka.apache.org
Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring

I played the current proposal a bit with
https://github.com/dguy/kafka/
tree/dsl-experiment <
https://github.com/dguy/kafka/tree/dsl-experiment
,
and here are my observations:

1. Personally I prefer

        "stream.group(mapper) / stream.groupByKey()"

than

        "stream.group().withKeyMapper(mapper) / stream.group()"

Since 1) withKeyMapper is not enforced programmatically
though it
is
not
"really" optional like others, 2) syntax-wise it reads more
natural.
I think it is okay to add the APIs in (


https://github.com/dguy/kafka/blob/dsl-experiment/streams/sr
c/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
)
in KGroupedStream.


2. For the "withStateStoreSupplier" API, are the user
supposed to
pass
in
the most-inner state store supplier (e.g. then one whose get()
return
RocksDBStore), or it is supposed to return the most-outer
supplier
with
logging / metrics / etc? I think it would be more useful to
only
require
users pass in the inner state store supplier while specifying
caching /
logging through other APIs.

In addition, the "GroupedWithCustomStore" is a bit
suspicious to
me:
we
are
allowing users to call other APIs like "withQueryableName"
multiple
time,
but only call "withStateStoreSupplier" only once in the end.
Why
is
that?
3. The current DSL seems to be only for aggregations, what
about
joins?
4. I think it is okay to keep the "withLogConfig": for the
StateStoreSupplier it will still be user code specifying the
topology
so
I
do not see there is a big difference.


5. "WindowedGroupedStream" 's withStateStoreSupplier should
take
the
windowed state store supplier to enforce typing?


Below are minor ones:

6. "withQueryableName": maybe better "withQueryableStateName"?

7. "withLogConfig": maybe better "withLoggingTopicConfig()"?



Guozhang



On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <
matth...@confluent.io>
wrote:

I see your point about "when to add the processor to the
topology".
That
is indeed an issue. Not sure it we could allow "updates" to
the
topology...
I don't see any problem with having all the withXX() in KTable
interface
-- but this might be subjective.


However, I don't understand your argument about putting
aggregate()
after the withXX() -- all the calls to withXX() set optional
parameters
for aggregate() and not for groupBy() -- but a
groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO,
this
might
be quite confusion for developers.


-Matthias

On 6/28/17 2:55 AM, Damian Guy wrote:
I also think that mixing optional parameters with configs
is a
bad
idea.
Have not proposal for this atm but just wanted to mention
it.
Hope
to
find some time to come up with something.


Yes, i don't like the mix of config either. But the only real
config
here
is the logging config - which we don't really need as it can
already
be
done via a custom StateStoreSupplier.


What I don't like in the current proposal is the
.grouped().withKeyMapper() -- the current solution with
.groupBy(...)
and .groupByKey() seems better. For clarity, we could
rename to
.groupByNewKey(...) and .groupByCurrentKey() (even if we
should
find
some better names).


it could be groupByKey(), groupBy() or something different bt



The proposed pattern "chains" grouping and aggregation too
close
together. I would rather separate both more than less,
ie, do
into
the
opposite direction.

I am also wondering, if we could so something more "fluent".
The
initial
proposal was like:

groupedStream.count()
       .withStoreName("name")
       .withCachingEnabled(false)
       .withLoggingEnabled(config)
       .table()
The .table() statement in the end was kinda alien.

I agree, but then all of the withXXX methods need to be on
KTable
which
is
worse in my opinion. You also need something that is going to
"build"
the
internal processors and add them to the topology.


The current proposal put the count() into the end -- ie, the
optional
parameter for count() have to specified on the .grouped()
call
--
this
does not seems to be the best way either.


I actually prefer this method as you are building a grouped
stream
that
you
will aggregate. So
table.grouped(...).withOptionalStuff().aggregate(..)
etc
seems natural to me.


I did not think this through in detail, but can't we just do
the
initial
proposal with the .table() ?

groupedStream.count().withStoreName("name").mapValues(...)

Each .withXXX(...) return the current KTable and all the
.withXXX()
are
just added to the KTable interface. Or do I miss anything
why
this
wont'
work or any obvious disadvantage?



See above.


-Matthias

On 6/22/17 4:06 AM, Damian Guy wrote:
Thanks everyone. My latest attempt is below. It builds
on the
fluent
approach, but i think it is slightly nicer.
I agree with some of what Eno said about mixing configy
stuff
in
the
DSL,
but i think that enabling caching and enabling logging are
things
that
aren't actually config. I'd probably not add
withLogConfig(...)
(even
though it is below) as this is actually config and we
already
have
a
way
of
doing that, via the StateStoreSupplier. Arguably we
could use
the
StateStoreSupplier for disabling caching etc, but as it
stands
that
is
a
bit of a tedious process for someone that just wants to use
the
default
storage engine, but not have caching enabled.

There is also an orthogonal concern that Guozhang alluded
to....
If
you
want to plug in a custom storage engine and you want it
to be
logged
etc,
you would currently need to implement that yourself.
Ideally
we
can
provide
a way where we will wrap the custom store with logging,
metrics,
etc. I
need to think about where this fits, it is probably more
appropriate
on
the
Stores API.

final KeyValueMapper<String, String, Long> keyMapper =
null;
// count with mapped key
final KTable<Long, Long> count = stream.grouped()
            .withKeyMapper(keyMapper)
            .withKeySerde(Serdes.Long())
            .withValueSerde(Serdes.String())
            .withQueryableName("my-store")
            .count();

// windowed count
final KTable<Windowed<String>, Long> windowedCount =
stream.grouped()
            .withQueryableName("my-window-store")
            .windowed(TimeWindows.of(10L).until(10))
            .count();

// windowed reduce
final Reducer<String> windowedReducer = null;
final KTable<Windowed<String>, String> windowedReduce =
stream.grouped()
            .withQueryableName("my-window-store")
            .windowed(TimeWindows.of(10L).until(10))
            .reduce(windowedReducer);

final Aggregator<String, String, Long> aggregator = null;
final Initializer<Long> init = null;

// aggregate
final KTable<String, Long> aggregate = stream.grouped()
            .withQueryableName("my-aggregate-store")
            .aggregate(aggregator, init, Serdes.Long());

final StateStoreSupplier<KeyValueStore<String, Long>>
stateStoreSupplier
= null;
// aggregate with custom store
final KTable<String, Long> aggWithCustomStore =
stream.grouped()
            .withStateStoreSupplier(stateStoreSupplier)
            .aggregate(aggregator, init);

// disable caching
stream.grouped()
            .withQueryableName("name")
            .withCachingEnabled(false)
            .count();

// disable logging
stream.grouped()
            .withQueryableName("q")
            .withLoggingEnabled(false)
            .count();

// override log config
final Reducer<String> reducer = null;
stream.grouped()
            .withLogConfig(Collections.sin
gletonMap("segment.size",
"10"))
            .reduce(reducer);


If anyone wants to play around with this you can find the
code
here:
https://github.com/dguy/kafka/tree/dsl-experiment

Note: It won't actually work as most of the methods just
return
null.
Thanks,
Damian


On Thu, 22 Jun 2017 at 11:18 Ismael Juma
<ism...@juma.me.uk>
wrote:
Thanks Damian. I think both options have pros and cons.
And
both
are
better
than overload abuse.

The fluent API approach reads better, no mention of
builder
or
build
anywhere. The main downside is that the method signatures
are a
little
less
clear. By reading the method signature, one doesn't
necessarily
knows
what
it returns. Also, one needs to figure out the special
method
(`table()`
in
this case) that gives you what you actually care about
(`KTable`
in
this
case). Not major issues, but worth mentioning while doing
the
comparison.
The builder approach avoids the issues mentioned above,
but
it
doesn't
read
as well.

Ismael

On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <
damian....@gmail.com
wrote:
Hi,

I'd like to get a discussion going around some of the API
choices
we've
made in the DLS. In particular those that relate to
stateful
operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the
API,
i.e,
there
are 9 overloads for KGroupedStream.count(..)! It is
becoming
noisy
and
i
feel it is only going to get worse as we add more
optional
params.
In
particular we've had some requests to be able to turn
caching
off,
or
change log configs,  on a per operator basis (note
this can
be
done
now
if
you pass in a StateStoreSupplier, but this can be a bit
cumbersome).
So this is a bit of an open question. How can we
change the
DSL
overloads
so that it flows, is simple to use and understand, and is
easily
extended
in the future?

One option would be to use a fluent API approach for
providing
the
optional
params, so something like this:

groupedStream.count()
       .withStoreName("name")
       .withCachingEnabled(false)
       .withLoggingEnabled(config)
       .table()



Another option would be to provide a Builder to the count
method,
so
it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").with
CachingEnabled(false).build())
Another option is to say: Hey we don't need this, what
are
you
on
about!
The above has focussed on state store related overloads,
but
the
same
ideas
could  be applied to joins etc, where we presently have
many
join
methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian

--
-- Guozhang


--
-- Guozhang



Reply via email to