I think we do have a very good discussion and people openly share their
ideas. So I am not sure why your are frustrated (at least I get this
impression).

Maybe it might be best if you propose an API change by yourself similar
to what Damian and Guozhang did (to whatever extend your time constraint
permits). I personally don't know exactly what an ideal API from your
point of view is atm, but this discussion would benefit a lot if you
could share it.

> I don't understand why custom stores in DSL?

Why not? Maybe you can elaborate a little more?

> and I don't understand why
>> we are not concidering a more generic config based appraoch?

Not sure what you exactly mean. Sound interesting. I don't like the idea
to mix configuration into the DSL (even if I am still not sure, where to
draw the line, ie, what should we consider a config and what not).

About `through`: I think it does make sense to allow the specification
of a store-name to make the store queryable (it's optional in 0.11 btw).
It's the same as for `KStreamBuilder.table()` -- so not sure why this
should be wrong?

Note, that not all KTables are materialized in a store atm. So it's an
easy way to make a non-materialized KTable queryable.

>> also providing Serdes by config is neat. wouldn't even need to go into
>> the code then would also save a ton. (We have the defaults one in conf
>> why not override the specific ones?)

I am not sure, if Serdes are really a config? I mean, the data types are
hard coded into the code, so it does make sense to specify the Serdes
accordingly. I am also not sure how we would map Serdes from the config
to the corresponding operator?


-Matthias


On 7/8/17 2:23 AM, Jan Filipiak wrote:
> Hi Matthias thanks,
> 
> Exactly what I was guessing.
> 
> I don't understand why custom stores in DSL? and I don't understand why
> we are not concidering a more generic config based appraoch?
> 
> StateStores in DSL => what I really think we are looking for PAPA => DSL
> => PAPI  back and forth switcharoo capabilities.
> 
> Looking at the most overloaded that I can currently find "through()" 2
> of them come from the broken idea of "the user provides a name for the
> statestore for IQ" and custom statestores.
> From the beginning I said that's madness. That is the real disease we
> need to fix IMHO. To be honest I also don't understand why through with
> statestore is particularly usefull, second Unique Key maybe?
> 
> also providing Serdes by config is neat. wouldn't even need to go into
> the code then would also save a ton. (We have the defaults one in conf
> why not override the specific ones?)
> 
> Does this makes sense to people? what pieces should i outline with code
> (time is currently sparse :( but I can pull of some smaller examples i
> guess)
> 
> Best Jan
> 
> 
> 
> 
> 
> On 08.07.2017 01:23, Matthias J. Sax wrote:
>> It's too issues we want to tackle
>>
>>   - too many overload (for some method we have already more than 10(
>>   - improve custom store API
>>
>> -Matthias
>>
>>
>> On 7/7/17 3:42 PM, Jan Filipiak wrote:
>>> It makes me want to cry.
>>>
>>> why on earth is the DSL going to expose all its implementation
>>> details now?
>>> especially being materialized or not.
>>>
>>> If we want to take usefull steps in that direction maybe we are looking
>>> for a way to let the user switch back and forth between PAPI and DSL?
>>>
>>> A change as the proposed would not eliminate any of my pain points while
>>> still being a heck of work migrating towards to.
>>>
>>> Since I am only following this from the point where Eno CC'ed it into
>>> the users list:
>>>
>>> Can someone please rephrase for me what problem this is trying to solve?
>>> I don't mean to be rude but It uses a problematic feature
>>> "StateStoreSuppliers in DSL" to justify making it even worse. This helps
>>> us nowhere in making the configs more flexible, its just syntactic
>>> sugar.
>>>
>>> A low effort shoot like: lets add a properties to operations that would
>>> otherwise become overloaded to heavy? Or pull the configs by some naming
>>> schema
>>> form the overall properties. Additionally to that we get rid of
>>> StateStoreSuppliers in the DSL and have them also configured by said
>>> properties.
>>>
>>> => way easier to migrate to, way less risk, way more flexible in the
>>> future (different implementations of the same operation don't require
>>> code change to configure)
>>>
>>> Line 184 makes especially no sense to me. what is a KTableKTable non
>>> materialized join anyways?
>>>
>>> Hope we can discuss more on this.
>>>
>>>
>>>
>>> On 07.07.2017 17:23, Guozhang Wang wrote:
>>>> I messed the indentation on github code repos; this would be easier to
>>>> read:
>>>>
>>>> https://codeshare.io/GLWW8K
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Damian / Kyle,
>>>>>
>>>>> I think I agree with you guys about the pros / cons of using the
>>>>> builder
>>>>> pattern v.s. using some "secondary classes". And I'm thinking if we
>>>>> can
>>>>> take a "mid" manner between these two. I spent some time with a slight
>>>>> different approach from Damian's current proposal:
>>>>>
>>>>> https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/
>>>>>
>>>>>
>>>>> java/org/apache/kafka/streams/RefactoredAPIs.java
>>>>>
>>>>> The key idea is to tolerate the final "table()" or "stream()"
>>>>> function to
>>>>> "upgrade" from the secondary classes to the first citizen classes,
>>>>> while
>>>>> having all the specs inside this function. Also this proposal
>>>>> includes some
>>>>> other refactoring that people have been discussed about for the
>>>>> builder to
>>>>> reduce the overloaded functions as well. WDYT?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jan,
>>>>>>
>>>>>> Thanks very much for the input.
>>>>>>
>>>>>> On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Damian,
>>>>>>>
>>>>>>> I do see your point of something needs to change. But I fully agree
>>>>>>> with
>>>>>>> Gouzhang when he says.
>>>>>>> ---
>>>>>>>
>>>>>>> But since this is a incompatibility change, and we are going to
>>>>>>> remove
>>>>>> the
>>>>>>> compatibility annotations soon it means we only have one chance
>>>>>>> and we
>>>>>>> really have to make it right.
>>>>>>> ----
>>>>>>>
>>>>>>>
>>>>>> I think we all agree on this one! Hence the discussion.
>>>>>>
>>>>>>
>>>>>>> I fear all suggestions do not go far enough to become something that
>>>>>> will
>>>>>>> carry on for very much longer.
>>>>>>> I am currently working on KAFKA-3705 and try to find the most
>>>>>>> easy way
>>>>>> for
>>>>>>> the user to give me all the required functionality. The easiest
>>>>>> interface I
>>>>>>> could come up so far can be looked at here.
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2
>>>>>> de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
>>>>>> kafka/streams/kstream/internals/KTableImpl.java#L622
>>>>>> And its already horribly complicated. I am currently unable to
>>>>>> find the
>>>>>>> right abstraction level to have everything falling into place
>>>>>> naturally. To
>>>>>>> be honest I already think introducing
>>>>>>>
>>>>>>>
>>>>>> To be fair that is not a particularly easy problem to solve!
>>>>>>
>>>>>>
>>>>>>> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2
>>>>>> de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
>>>>>> kafka/streams/kstream/internals/KTableImpl.java#L493
>>>>>>> was unideal and makes everything a mess.
>>>>>> I'm not sure i agree that it makes everything a mess, but It could
>>>>>> have
>>>>>> been done differently.
>>>>>>
>>>>>> The JoinType:Whatever is also not really flexible. 2 things come
>>>>>> to my
>>>>>> mind:
>>>>>>> 1. I don't think we should rule out config based decisions say
>>>>>>> configs
>>>>>> like
>>>>>>>           streams.$applicationID.joins.$joinname.conf = value
>>>>>>>
>>>>>> Is this just for config? Or are you suggesting that we could somehow
>>>>>> "code"
>>>>>> the join in a config file?
>>>>>>
>>>>>>
>>>>>>> This can allow for tremendous changes without single API change and
>>>>>>> IMO
>>>>>> it
>>>>>>> was not considered enough yet.
>>>>>>>
>>>>>>> 2. Push logic from the DSL to the Callback classes. A ValueJoiner
>>>>>>> for
>>>>>>> example can be used to implement different join types as the user
>>>>>> wishes.
>>>>>> Do you have an example of how this might look?
>>>>>>
>>>>>>
>>>>>>> As Gouzhang said: stopping to break users is very important.
>>>>>> Of course. We want to make it as easy as possible for people to use
>>>>>> streams.
>>>>>>
>>>>>>
>>>>>> especially with this changes + All the plans I sadly only have in my
>>>>>> head
>>>>>>> but hopefully the first link can give a glimpse.
>>>>>>>
>>>>>>> Thanks for preparing the examples made it way clearer to me what
>>>>>>> exactly
>>>>>>> we are talking about. I would argue to go a bit slower and more
>>>>>> carefull on
>>>>>>> this one. At some point we need to get it right. Peeking over to the
>>>>>> hadoop
>>>>>>> guys with their hughe userbase. Config files really work well for
>>>>>>> them.
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 30.06.2017 09:31, Damian Guy wrote:
>>>>>>>> Thanks Matthias
>>>>>>>>
>>>>>>>> On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax
>>>>>>>> <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>>> I am just catching up on this thread, so sorry for the long
>>>>>>>>> email in
>>>>>>>>> advance... Also, it's to some extend a dump of thoughts and not
>>>>>> always a
>>>>>>>>> clear proposal. Still need to think about this in more detail. But
>>>>>> maybe
>>>>>>>>> it helps other to get new ideas :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>> However, I don't understand your argument about putting
>>>>>>>>>>> aggregate()
>>>>>>>>>>> after the withXX() -- all the calls to withXX() set optional
>>>>>>> parameters
>>>>>>>>>>> for aggregate() and not for groupBy() -- but a
>>>>>>>>>>> groupBy().withXX()
>>>>>>>>>>> indicates that the withXX() belongs to the groupBy(). IMHO, this
>>>>>> might
>>>>>>>>>>> be quite confusion for developers.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> I see what you are saying, but the grouped stream is
>>>>>>>>>> effectively a
>>>>>>> no-op
>>>>>>>>>> until you call one of the aggregate/count/reduce etc
>>>>>>>>>> functions. So
>>>>>> the
>>>>>>>>>> optional params are ones that are applicable to any of the
>>>>>> operations
>>>>>>> you
>>>>>>>>>> can perform on this grouped stream. Then the final
>>>>>>>>>> count()/reduce()/aggregate() call has any of the params that are
>>>>>>>>>> required/specific to that function.
>>>>>>>>>>
>>>>>>>>> I understand your argument, but you don't share the conclusion.
>>>>>>>>> If we
>>>>>>>>> need a "final/terminal" call, the better way might be
>>>>>>>>>
>>>>>>>>> .groupBy().count().withXX().build()
>>>>>>>>>
>>>>>>>>> (with a better name for build() though)
>>>>>>>>>
>>>>>>>>>
>>>>>>>> The point is that all the other calls, i.e,withBlah, windowed, etc
>>>>>> apply
>>>>>>>> too all the aggregate functions. The terminal call being the actual
>>>>>> type
>>>>>>> of
>>>>>>>> aggregation you want to do. I personally find this more natural
>>>>>>>> than
>>>>>>>> groupBy().count().withBlah().build()
>>>>>>>>
>>>>>>>>
>>>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>>>>>> I like this. However, I don't see a reason to have windowed() and
>>>>>>>>> sessionWindowed(). We should have one top-level `Windows`
>>>>>>>>> interface
>>>>>> that
>>>>>>>>> both `TimeWindows` and `SessionWindows` implement and just have a
>>>>>> single
>>>>>>>>> windowed() method that accepts all `Windows`. (I did not like the
>>>>>>>>> separation of `SessionWindows` in the first place, and this
>>>>>>>>> seems to
>>>>>> be
>>>>>>>>> an opportunity to clean this up. It was hard to change when we
>>>>>>>>> introduced session windows)
>>>>>>>>>
>>>>>>>> Yes - true we should look into that.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Btw: we do you the imperative groupBy() and groupByKey(), and
>>>>>>>>> thus we
>>>>>>>>> might also want to use windowBy() (instead of windowed()). Not
>>>>>>>>> sure
>>>>>> how
>>>>>>>>> important this is, but it seems to be inconsistent otherwise.
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Makes sense
>>>>>>>>
>>>>>>>>
>>>>>>>>> About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I
>>>>>> think,
>>>>>>>>> defining an inner/left/outer join is not an optional argument
>>>>>>>>> but a
>>>>>>>>> first class concept and should have a proper representation in the
>>>>>> API
>>>>>>>>> (like the current methods join(), leftJoin, outerJoin()).
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Yep, i did originally have it as a required param and maybe that is
>>>>>> what
>>>>>>> we
>>>>>>>> go with. It could have a default, but maybe that is confusing.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> About the two join API proposals, the second one has too much
>>>>>>>>> boiler
>>>>>>>>> plate code for my taste. Also, the actual join() operator has only
>>>>>> one
>>>>>>>>> argument what is weird to me, as in my thinking process, the main
>>>>>>>>> operator call, should have one parameter per mandatory argument
>>>>>>>>> but
>>>>>> your
>>>>>>>>> proposal put the mandatory arguments into Joins.streamStreamJoin()
>>>>>> call.
>>>>>>>>> This is far from intuitive IMHO.
>>>>>>>>>
>>>>>>>>>
>>>>>>>> This is the builder pattern, you only need one param as the builder
>>>>>> has
>>>>>>>> captured all of the required and optional arguments.
>>>>>>>>
>>>>>>>>
>>>>>>>>> The first join proposal also seems to align better with the
>>>>>>>>> pattern
>>>>>>>>> suggested for aggregations and having the same pattern for all
>>>>>> operators
>>>>>>>>> is important (as you stated already).
>>>>>>>>>
>>>>>>>>>
>>>>>>>> This is why i offered two alternatives as i started out with. 1 is
>>>>>>>> the
>>>>>>>> builder pattern, the other is the more fluent pattern.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Coming back to the config vs optional parameter. What about
>>>>>>>>> having a
>>>>>>>>> method withConfig[s](...) that allow to put in the configuration?
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Sure, it is currently called withLogConfig() as that is the only
>>>>>>>> thing
>>>>>>> that
>>>>>>>> is really config.
>>>>>>>>
>>>>>>>>
>>>>>>>>> This also raises the question if until() is a windows property?
>>>>>>>>> Actually, until() seems to be a configuration parameter and thus,
>>>>>> should
>>>>>>>>> not not have it's own method.
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Hmmm, i don't agree. Until is a property of the window. It is going
>>>>>> to be
>>>>>>>> potentially different for every window operation you do in a
>>>>>>>> streams
>>>>>> app.
>>>>>>>>> Browsing throw your example DSL branch, I also saw this one:
>>>>>>>>>
>>>>>>>>>> final KTable<Windowed<String>, Long> windowed>
>>>>>>>>>     groupedStream.counting()
>>>>>>>>>>                     .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>                     .table();
>>>>>>>>> This is an interesting idea, and it remind my on some feedback
>>>>>>>>> about
>>>>>> "I
>>>>>>>>> wanted to count a stream, but there was no count() method -- I
>>>>>>>>> first
>>>>>>>>> needed to figure out, that I need to group the stream first to be
>>>>>> able
>>>>>>>>> to count it. It does make sense in hindsight but was not
>>>>>>>>> obvious in
>>>>>> the
>>>>>>>>> beginning". Thus, carrying out this thought, we could also do the
>>>>>>>>> following:
>>>>>>>>>
>>>>>>>>> stream.count().groupedBy().windowedBy().table();
>>>>>>>>>
>>>>>>>>> -> Note, I use "grouped" and "windowed" instead of imperative
>>>>>>>>> here,
>>>>>> as
>>>>>>>>> it comes after the count()
>>>>>>>>>
>>>>>>>>> This would be more consistent than your proposal (that has
>>>>>>>>> grouping
>>>>>>>>> before but windowing after count()). It might even allow us to
>>>>>>>>> enrich
>>>>>>>>> the API with a some syntactic sugar like
>>>>>>>>> `stream.count().table()` to
>>>>>> get
>>>>>>>>> the overall count of all records (this would obviously not scale,
>>>>>> but we
>>>>>>>>> could support it -- if not now, maybe later).
>>>>>>>>>
>>>>>>>>>
>>>>>>>> I guess i'd prefer
>>>>>>>> stream.groupBy().windowBy().count()
>>>>>>>> stream.groupBy().windowBy().reduce()
>>>>>>>> stream.groupBy().count()
>>>>>>>>
>>>>>>>> As i said above, everything that happens before the final aggregate
>>>>>> call
>>>>>>>> can be applied to any of them. So it makes sense to me to do those
>>>>>> things
>>>>>>>> ahead of the final aggregate call.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Last about builder pattern. I am convinced that we need some
>>>>>> "terminal"
>>>>>>>>> operator/method that tells us when to add the processor to the
>>>>>> topology.
>>>>>>>>> But I don't see the need for a plain builder pattern that feels
>>>>>> alien to
>>>>>>>>> me (see my argument about the second join proposal). Using
>>>>>>>>> .stream()
>>>>>> /
>>>>>>>>> .table() as use in many examples might work. But maybe a more
>>>>>>>>> generic
>>>>>>>>> name that we can use in all places like build() or apply() might
>>>>>> also be
>>>>>>>>> an option.
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Sure, a generic name might be ok.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 6/29/17 7:37 AM, Damian Guy wrote:
>>>>>>>>>> Thanks Kyle.
>>>>>>>>>>
>>>>>>>>>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <
>>>>>> winkelman.k...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Damian,
>>>>>>>>>>>
>>>>>>>>>>>>>>> When trying to program in the fluent API that has been
>>>>>> discussed
>>>>>>>>> most
>>>>>>>>>>> it
>>>>>>>>>>>>>>> feels difficult to know when you will actually get an object
>>>>>> you
>>>>>>> can
>>>>>>>>>>> reuse.
>>>>>>>>>>>>>>> What if I make one KGroupedStream that I want to reuse,
>>>>>>>>>>>>>>> is it
>>>>>>> legal
>>>>>>>>> to
>>>>>>>>>>>>>>> reuse it or does this approach expect you to call grouped
>>>>>>>>>>>>>>> each
>>>>>>> time?
>>>>>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can
>>>>>> re-use it
>>>>>>>>> as
>>>>>>>>>>> you
>>>>>>>>>>>>> can today.
>>>>>>>>>>> You said it yourself in another post that the grouped stream is
>>>>>>>>>>> effectively a no-op until a count, reduce, or aggregate. The
>>>>>>>>>>> way I
>>>>>> see
>>>>>>>>> it
>>>>>>>>>>> you wouldn’t be able to reuse anything except KStreams and
>>>>>>>>>>> KTables,
>>>>>>>>> because
>>>>>>>>>>> most of this fluent api would continue returning this (this
>>>>>>>>>>> being
>>>>>> the
>>>>>>>>>>> builder object currently being manipulated).
>>>>>>>>>> So, if you ever store a reference to anything but KStreams and
>>>>>> KTables
>>>>>>>>> and
>>>>>>>>>>> you use it in two different ways then its possible you make
>>>>>>> conflicting
>>>>>>>>>>> withXXX() calls on the same builder.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> No necessarily true. It could return a new instance of the
>>>>>>>>>> builder,
>>>>>>> i.e.,
>>>>>>>>>> the builders being immutable. So if you held a reference to the
>>>>>> builder
>>>>>>>>> it
>>>>>>>>>> would always be the same as it was when it was created.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes =
>>>>>> kStream.grouped();
>>>>>>>>>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>>>>>>>>>>> groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I’ll admit that this shouldn’t happen but some user is going
>>>>>>>>>>> to do
>>>>>> it
>>>>>>>>>>> eventually…
>>>>>>>>>>> Depending on implementation uses of
>>>>>>>>>>> groupedStreamWithDefaultSerdes
>>>>>>> would
>>>>>>>>>>> most likely be equivalent to the version withDeclaredSerdes. One
>>>>>> work
>>>>>>>>>>> around would be to always make copies of the config objects you
>>>>>>>>>>> are
>>>>>>>>>>> building, but this approach has its own problem because now we
>>>>>> have to
>>>>>>>>>>> identify which configs are equivalent so we don’t create
>>>>>>>>>>> repeated
>>>>>>>>>>> processors.
>>>>>>>>>>>
>>>>>>>>>>> The point of this long winded example is that we always have
>>>>>>>>>>> to be
>>>>>>>>>>> thinking about all of the possible ways it could be misused by a
>>>>>> user
>>>>>>>>>>> (causing them to see hard to diagnose problems).
>>>>>>>>>>>
>>>>>>>>>> Exactly! That is the point of the discussion really.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> In my attempt at a couple methods with builders I feel that I
>>>>>>>>>>> could
>>>>>>>>>>> confidently say the user couldn’t really mess it up.
>>>>>>>>>>>> // Count
>>>>>>>>>>>> KTable<String, Long> count =
>>>>>>>>>>>>
>>>>>>> kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>
>>>>>>>
>>>>>>>>>>> The kGroupedStream is reusable and if they attempted to reuse
>>>>>>>>>>> the
>>>>>>> Count
>>>>>>>>>>> for some reason it would throw an error message saying that a
>>>>>>>>>>> store
>>>>>>>>> named
>>>>>>>>>>> “my-store” already exists.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Yes i agree and i think using builders is my preferred pattern.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kyle
>>>>>>>>>>>
>>>>>>>>>>> From: Damian Guy
>>>>>>>>>>> Sent: Thursday, June 29, 2017 3:59 AM
>>>>>>>>>>> To: d...@kafka.apache.org
>>>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>>>>
>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your input. Really appreciated.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <
>>>>>> winkelman.k...@gmail.com
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I like more of a builder pattern even though others have voiced
>>>>>>> against
>>>>>>>>>>>> it. The reason I like it is because it makes it clear to the
>>>>>>>>>>>> user
>>>>>>> that
>>>>>>>>> a
>>>>>>>>>>>> call to KGroupedStream#count will return a KTable not some
>>>>>>> intermediate
>>>>>>>>>>>> class that I need to undetstand.
>>>>>>>>>>>>
>>>>>>>>>>> Yes, that makes sense.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> When trying to program in the fluent API that has been
>>>>>>>>>>>> discussed
>>>>>> most
>>>>>>>>> it
>>>>>>>>>>>> feels difficult to know when you will actually get an object
>>>>>>>>>>>> you
>>>>>> can
>>>>>>>>>>> reuse.
>>>>>>>>>>>> What if I make one KGroupedStream that I want to reuse, is it
>>>>>> legal
>>>>>>> to
>>>>>>>>>>>> reuse it or does this approach expect you to call grouped each
>>>>>> time?
>>>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can
>>>>>>>>>>> re-use
>>>>>> it
>>>>>>> as
>>>>>>>>> you
>>>>>>>>>>> can today.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> This question doesn’t pop into my head at all in the builder
>>>>>> pattern
>>>>>>> I
>>>>>>>>>>>> assume I can reuse everything.
>>>>>>>>>>>> Finally, I like .groupByKey and .groupBy(KeyValueMapper) not a
>>>>>>>>>>>> big
>>>>>>> fan
>>>>>>>>> of
>>>>>>>>>>>> the grouped.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, grouped() was more for demonstration and because groupBy()
>>>>>> and
>>>>>>>>>>> groupByKey() were taken! So i'd imagine the api would actually
>>>>>> want to
>>>>>>>>> be
>>>>>>>>>>> groupByKey(/** no required args***/).withOptionalArg() and
>>>>>>>>>>> groupBy(KeyValueMapper m).withOpitionalArg(...)  of course this
>>>>>>>>>>> all
>>>>>>>>> depends
>>>>>>>>>>> on maintaining backward compatibility.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately, the below approach would require atleast 2
>>>>>> (probably
>>>>>>> 3)
>>>>>>>>>>>> overloads (one for returning a KTable and one for returning a
>>>>>> KTable
>>>>>>>>> with
>>>>>>>>>>>> Windowed Key, probably would want to split windowed and
>>>>>>> sessionwindowed
>>>>>>>>>>> for
>>>>>>>>>>>> ease of implementation) of each count, reduce, and aggregate.
>>>>>>>>>>>> Obviously not exhaustive but enough for you to get the picture.
>>>>>>> Count,
>>>>>>>>>>>> Reduce, and Aggregate supply 3 static methods to initialize the
>>>>>>>>> builder:
>>>>>>>>>>>> // Count
>>>>>>>>>>>> KTable<String, Long> count =
>>>>>>>>>>>>
>>>>>>> groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>
>>>>>>>>>>>> // Windowed Count
>>>>>>>>>>>> KTable<Windowed<String>, Long> windowedCount =
>>>>>>>>>>>>
>>>>>>> groupedStream.count(Count.windowed(TimeWindows.of(10L).until
>>>>>> (10)).withQueryableStoreName("my-windowed-store"));
>>>>>>>>>>>> // Session Count
>>>>>>>>>>>> KTable<Windowed<String>, Long> sessionCount =
>>>>>>>>>>>>
>>>>>>> groupedStream.count(Count.sessionWindowed(SessionWindows.
>>>>>> with(10L)).withQueryableStoreName("my-session-windowed-store"));
>>>>>>>>>>> Above and below, i think i'd prefer it to be:
>>>>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> // Reduce
>>>>>>>>>>>> Reducer<Long> reducer;
>>>>>>>>>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer,
>>>>>>>>>>>> Reduce.reduce().withQueryableStoreName("my-store"));
>>>>>>>>>>>>
>>>>>>>>>>>> // Aggregate Windowed with Custom Store
>>>>>>>>>>>> Initializer<String> initializer;
>>>>>>>>>>>> Aggregator<String, Long, String> aggregator;
>>>>>>>>>>>> KTable<Windowed<String>, String> aggregate =
>>>>>>>>>>>> groupedStream.aggregate(initializer, aggregator,
>>>>>>>>>>>>
>>>>>>> Aggregate.windowed(TimeWindows.of(10L).until(10)).
>>>>>> withStateStoreSupplier(stateStoreSupplier)));
>>>>>>>>>>>> // Cogroup SessionWindowed
>>>>>>>>>>>> KTable<String, String> cogrouped =
>>>>>>> groupedStream1.cogroup(aggregator1)
>>>>>>>>>>>>            .cogroup(groupedStream2, aggregator2)
>>>>>>>>>>>>            .aggregate(initializer, aggregator,
>>>>>>>>>>>> Aggregate.sessionWindowed(SessionWindows.with(10L),
>>>>>>>>>>>> sessionMerger).withQueryableStoreName("my-store"));
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> public class Count {
>>>>>>>>>>>>
>>>>>>>>>>>>        public static class Windowed extends Count {
>>>>>>>>>>>>            private Windows windows;
>>>>>>>>>>>>        }
>>>>>>>>>>>>        public static class SessionWindowed extends Count {
>>>>>>>>>>>>            private SessionWindows sessionWindows;
>>>>>>>>>>>>        }
>>>>>>>>>>>>
>>>>>>>>>>>>        public static Count count();
>>>>>>>>>>>>        public static Windowed windowed(Windows windows);
>>>>>>>>>>>>        public static SessionWindowed
>>>>>>>>>>>> sessionWindowed(SessionWindows
>>>>>>>>>>>> sessionWindows);
>>>>>>>>>>>>
>>>>>>>>>>>>        // All withXXX(...) methods.
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> public class KGroupedStream {
>>>>>>>>>>>>        public KTable<K, Long> count(Count count);
>>>>>>>>>>>>        public KTable<Windowed<K>, Long> count(Count.Windowed
>>>>>>>>>>>> count);
>>>>>>>>>>>>        public KTable<Windowed<K>, Long>
>>>>>>>>>>>> count(Count.SessionWindowed
>>>>>>>>> count);
>>>>>>>>>>>> …
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Kyle
>>>>>>>>>>>>
>>>>>>>>>>>> From: Guozhang Wang
>>>>>>>>>>>> Sent: Wednesday, June 28, 2017 7:45 PM
>>>>>>>>>>>> To: d...@kafka.apache.org
>>>>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>>>>>
>>>>>>>>>>>> I played the current proposal a bit with
>>>>>>>>> https://github.com/dguy/kafka/
>>>>>>>>>>>> tree/dsl-experiment <
>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment
>>>>>>>>>> ,
>>>>>>>>>>>> and here are my observations:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Personally I prefer
>>>>>>>>>>>>
>>>>>>>>>>>>        "stream.group(mapper) / stream.groupByKey()"
>>>>>>>>>>>>
>>>>>>>>>>>> than
>>>>>>>>>>>>
>>>>>>>>>>>>        "stream.group().withKeyMapper(mapper) / stream.group()"
>>>>>>>>>>>>
>>>>>>>>>>>> Since 1) withKeyMapper is not enforced programmatically
>>>>>>>>>>>> though it
>>>>>> is
>>>>>>>>> not
>>>>>>>>>>>> "really" optional like others, 2) syntax-wise it reads more
>>>>>> natural.
>>>>>>>>>>>> I think it is okay to add the APIs in (
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>> https://github.com/dguy/kafka/blob/dsl-experiment/streams/sr
>>>>>> c/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
>>>>>>>>>>>> )
>>>>>>>>>>>> in KGroupedStream.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2. For the "withStateStoreSupplier" API, are the user
>>>>>>>>>>>> supposed to
>>>>>>> pass
>>>>>>>>> in
>>>>>>>>>>>> the most-inner state store supplier (e.g. then one whose get()
>>>>>> return
>>>>>>>>>>>> RocksDBStore), or it is supposed to return the most-outer
>>>>>>>>>>>> supplier
>>>>>>> with
>>>>>>>>>>>> logging / metrics / etc? I think it would be more useful to
>>>>>>>>>>>> only
>>>>>>>>> require
>>>>>>>>>>>> users pass in the inner state store supplier while specifying
>>>>>>> caching /
>>>>>>>>>>>> logging through other APIs.
>>>>>>>>>>>>
>>>>>>>>>>>> In addition, the "GroupedWithCustomStore" is a bit
>>>>>>>>>>>> suspicious to
>>>>>> me:
>>>>>>> we
>>>>>>>>>>> are
>>>>>>>>>>>> allowing users to call other APIs like "withQueryableName"
>>>>>> multiple
>>>>>>>>> time,
>>>>>>>>>>>> but only call "withStateStoreSupplier" only once in the end.
>>>>>>>>>>>> Why
>>>>>> is
>>>>>>>>> that?
>>>>>>>>>>>> 3. The current DSL seems to be only for aggregations, what
>>>>>>>>>>>> about
>>>>>>> joins?
>>>>>>>>>>>> 4. I think it is okay to keep the "withLogConfig": for the
>>>>>>>>>>>> StateStoreSupplier it will still be user code specifying the
>>>>>> topology
>>>>>>>>> so
>>>>>>>>>>> I
>>>>>>>>>>>> do not see there is a big difference.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 5. "WindowedGroupedStream" 's withStateStoreSupplier should
>>>>>>>>>>>> take
>>>>>> the
>>>>>>>>>>>> windowed state store supplier to enforce typing?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Below are minor ones:
>>>>>>>>>>>>
>>>>>>>>>>>> 6. "withQueryableName": maybe better "withQueryableStateName"?
>>>>>>>>>>>>
>>>>>>>>>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <
>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I see your point about "when to add the processor to the
>>>>>> topology".
>>>>>>>>>>> That
>>>>>>>>>>>>> is indeed an issue. Not sure it we could allow "updates" to
>>>>>>>>>>>>> the
>>>>>>>>>>>> topology...
>>>>>>>>>>>>> I don't see any problem with having all the withXX() in KTable
>>>>>>>>>>> interface
>>>>>>>>>>>>> -- but this might be subjective.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, I don't understand your argument about putting
>>>>>> aggregate()
>>>>>>>>>>>>> after the withXX() -- all the calls to withXX() set optional
>>>>>>>>> parameters
>>>>>>>>>>>>> for aggregate() and not for groupBy() -- but a
>>>>>>>>>>>>> groupBy().withXX()
>>>>>>>>>>>>> indicates that the withXX() belongs to the groupBy(). IMHO,
>>>>>>>>>>>>> this
>>>>>>> might
>>>>>>>>>>>>> be quite confusion for developers.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 6/28/17 2:55 AM, Damian Guy wrote:
>>>>>>>>>>>>>>> I also think that mixing optional parameters with configs
>>>>>>>>>>>>>>> is a
>>>>>> bad
>>>>>>>>>>>> idea.
>>>>>>>>>>>>>>> Have not proposal for this atm but just wanted to mention
>>>>>>>>>>>>>>> it.
>>>>>> Hope
>>>>>>>>>>> to
>>>>>>>>>>>>>>> find some time to come up with something.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, i don't like the mix of config either. But the only real
>>>>>>> config
>>>>>>>>>>>> here
>>>>>>>>>>>>>> is the logging config - which we don't really need as it can
>>>>>>> already
>>>>>>>>>>> be
>>>>>>>>>>>>>> done via a custom StateStoreSupplier.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What I don't like in the current proposal is the
>>>>>>>>>>>>>>> .grouped().withKeyMapper() -- the current solution with
>>>>>>>>>>> .groupBy(...)
>>>>>>>>>>>>>>> and .groupByKey() seems better. For clarity, we could
>>>>>>>>>>>>>>> rename to
>>>>>>>>>>>>>>> .groupByNewKey(...) and .groupByCurrentKey() (even if we
>>>>>>>>>>>>>>> should
>>>>>>> find
>>>>>>>>>>>>>>> some better names).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> it could be groupByKey(), groupBy() or something different bt
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The proposed pattern "chains" grouping and aggregation too
>>>>>> close
>>>>>>>>>>>>>>> together. I would rather separate both more than less,
>>>>>>>>>>>>>>> ie, do
>>>>>> into
>>>>>>>>>>> the
>>>>>>>>>>>>>>> opposite direction.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am also wondering, if we could so something more "fluent".
>>>>>> The
>>>>>>>>>>>> initial
>>>>>>>>>>>>>>> proposal was like:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> groupedStream.count()
>>>>>>>>>>>>>>>>>       .withStoreName("name")
>>>>>>>>>>>>>>>>>       .withCachingEnabled(false)
>>>>>>>>>>>>>>>>>       .withLoggingEnabled(config)
>>>>>>>>>>>>>>>>>       .table()
>>>>>>>>>>>>>>> The .table() statement in the end was kinda alien.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree, but then all of the withXXX methods need to be on
>>>>>> KTable
>>>>>>>>>>> which
>>>>>>>>>>>>> is
>>>>>>>>>>>>>> worse in my opinion. You also need something that is going to
>>>>>>> "build"
>>>>>>>>>>>> the
>>>>>>>>>>>>>> internal processors and add them to the topology.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The current proposal put the count() into the end -- ie, the
>>>>>>>>>>> optional
>>>>>>>>>>>>>>> parameter for count() have to specified on the .grouped()
>>>>>>>>>>>>>>> call
>>>>>> -- 
>>>>>>>>>>> this
>>>>>>>>>>>>>>> does not seems to be the best way either.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I actually prefer this method as you are building a grouped
>>>>>> stream
>>>>>>>>>>> that
>>>>>>>>>>>>> you
>>>>>>>>>>>>>> will aggregate. So
>>>>>>>>>>> table.grouped(...).withOptionalStuff().aggregate(..)
>>>>>>>>>>>>> etc
>>>>>>>>>>>>>> seems natural to me.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I did not think this through in detail, but can't we just do
>>>>>> the
>>>>>>>>>>>> initial
>>>>>>>>>>>>>>> proposal with the .table() ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> groupedStream.count().withStoreName("name").mapValues(...)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Each .withXXX(...) return the current KTable and all the
>>>>>>> .withXXX()
>>>>>>>>>>>> are
>>>>>>>>>>>>>>> just added to the KTable interface. Or do I miss anything
>>>>>>>>>>>>>>> why
>>>>>> this
>>>>>>>>>>>> wont'
>>>>>>>>>>>>>>> work or any obvious disadvantage?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> See above.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote:
>>>>>>>>>>>>>>>> Thanks everyone. My latest attempt is below. It builds
>>>>>>>>>>>>>>>> on the
>>>>>>>>>>> fluent
>>>>>>>>>>>>>>>> approach, but i think it is slightly nicer.
>>>>>>>>>>>>>>>> I agree with some of what Eno said about mixing configy
>>>>>>>>>>>>>>>> stuff
>>>>>> in
>>>>>>>>>>> the
>>>>>>>>>>>>> DSL,
>>>>>>>>>>>>>>>> but i think that enabling caching and enabling logging are
>>>>>> things
>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> aren't actually config. I'd probably not add
>>>>>> withLogConfig(...)
>>>>>>>>>>> (even
>>>>>>>>>>>>>>>> though it is below) as this is actually config and we
>>>>>>>>>>>>>>>> already
>>>>>>> have
>>>>>>>>>>> a
>>>>>>>>>>>>> way
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> doing that, via the StateStoreSupplier. Arguably we
>>>>>>>>>>>>>>>> could use
>>>>>> the
>>>>>>>>>>>>>>>> StateStoreSupplier for disabling caching etc, but as it
>>>>>>>>>>>>>>>> stands
>>>>>>> that
>>>>>>>>>>>> is
>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> bit of a tedious process for someone that just wants to use
>>>>>> the
>>>>>>>>>>>> default
>>>>>>>>>>>>>>>> storage engine, but not have caching enabled.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> There is also an orthogonal concern that Guozhang alluded
>>>>>> to....
>>>>>>> If
>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> want to plug in a custom storage engine and you want it
>>>>>>>>>>>>>>>> to be
>>>>>>>>>>> logged
>>>>>>>>>>>>> etc,
>>>>>>>>>>>>>>>> you would currently need to implement that yourself.
>>>>>>>>>>>>>>>> Ideally
>>>>>> we
>>>>>>> can
>>>>>>>>>>>>>>> provide
>>>>>>>>>>>>>>>> a way where we will wrap the custom store with logging,
>>>>>> metrics,
>>>>>>>>>>>> etc. I
>>>>>>>>>>>>>>>> need to think about where this fits, it is probably more
>>>>>>>>>>> appropriate
>>>>>>>>>>>> on
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> Stores API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper =
>>>>>>>>>>>>>>>> null;
>>>>>>>>>>>>>>>> // count with mapped key
>>>>>>>>>>>>>>>> final KTable<Long, Long> count = stream.grouped()
>>>>>>>>>>>>>>>>            .withKeyMapper(keyMapper)
>>>>>>>>>>>>>>>>            .withKeySerde(Serdes.Long())
>>>>>>>>>>>>>>>>            .withValueSerde(Serdes.String())
>>>>>>>>>>>>>>>>            .withQueryableName("my-store")
>>>>>>>>>>>>>>>>            .count();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // windowed count
>>>>>>>>>>>>>>>> final KTable<Windowed<String>, Long> windowedCount =
>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>>>            .withQueryableName("my-window-store")
>>>>>>>>>>>>>>>>            .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>>>>>>>            .count();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // windowed reduce
>>>>>>>>>>>>>>>> final Reducer<String> windowedReducer = null;
>>>>>>>>>>>>>>>> final KTable<Windowed<String>, String> windowedReduce =
>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>>>            .withQueryableName("my-window-store")
>>>>>>>>>>>>>>>>            .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>>>>>>>            .reduce(windowedReducer);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> final Aggregator<String, String, Long> aggregator = null;
>>>>>>>>>>>>>>>> final Initializer<Long> init = null;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // aggregate
>>>>>>>>>>>>>>>> final KTable<String, Long> aggregate = stream.grouped()
>>>>>>>>>>>>>>>>            .withQueryableName("my-aggregate-store")
>>>>>>>>>>>>>>>>            .aggregate(aggregator, init, Serdes.Long());
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>>
>>>>>>>>>>>>> stateStoreSupplier
>>>>>>>>>>>>>>> = null;
>>>>>>>>>>>>>>>> // aggregate with custom store
>>>>>>>>>>>>>>>> final KTable<String, Long> aggWithCustomStore =
>>>>>> stream.grouped()
>>>>>>>>>>>>>>>>            .withStateStoreSupplier(stateStoreSupplier)
>>>>>>>>>>>>>>>>            .aggregate(aggregator, init);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // disable caching
>>>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>>>            .withQueryableName("name")
>>>>>>>>>>>>>>>>            .withCachingEnabled(false)
>>>>>>>>>>>>>>>>            .count();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // disable logging
>>>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>>>            .withQueryableName("q")
>>>>>>>>>>>>>>>>            .withLoggingEnabled(false)
>>>>>>>>>>>>>>>>            .count();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // override log config
>>>>>>>>>>>>>>>> final Reducer<String> reducer = null;
>>>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>>>            .withLogConfig(Collections.sin
>>>>>> gletonMap("segment.size",
>>>>>>>>>>>> "10"))
>>>>>>>>>>>>>>>>            .reduce(reducer);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If anyone wants to play around with this you can find the
>>>>>>>>>>>>>>>> code
>>>>>>>>>>> here:
>>>>>>>>>>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note: It won't actually work as most of the methods just
>>>>>> return
>>>>>>>>>>> null.
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma
>>>>>>>>>>>>>>>> <ism...@juma.me.uk>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> Thanks Damian. I think both options have pros and cons.
>>>>>>>>>>>>>>>>> And
>>>>>> both
>>>>>>>>>>> are
>>>>>>>>>>>>>>> better
>>>>>>>>>>>>>>>>> than overload abuse.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The fluent API approach reads better, no mention of
>>>>>>>>>>>>>>>>> builder
>>>>>> or
>>>>>>>>>>> build
>>>>>>>>>>>>>>>>> anywhere. The main downside is that the method signatures
>>>>>> are a
>>>>>>>>>>>> little
>>>>>>>>>>>>>>> less
>>>>>>>>>>>>>>>>> clear. By reading the method signature, one doesn't
>>>>>> necessarily
>>>>>>>>>>>> knows
>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>> it returns. Also, one needs to figure out the special
>>>>>>>>>>>>>>>>> method
>>>>>>>>>>>>> (`table()`
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> this case) that gives you what you actually care about
>>>>>> (`KTable`
>>>>>>>>>>> in
>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> case). Not major issues, but worth mentioning while doing
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> comparison.
>>>>>>>>>>>>>>>>> The builder approach avoids the issues mentioned above,
>>>>>>>>>>>>>>>>> but
>>>>>> it
>>>>>>>>>>>> doesn't
>>>>>>>>>>>>>>> read
>>>>>>>>>>>>>>>>> as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ismael
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <
>>>>>>> damian....@gmail.com
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'd like to get a discussion going around some of the API
>>>>>>> choices
>>>>>>>>>>>>> we've
>>>>>>>>>>>>>>>>>> made in the DLS. In particular those that relate to
>>>>>>>>>>>>>>>>>> stateful
>>>>>>>>>>>>> operations
>>>>>>>>>>>>>>>>>> (though this could expand).
>>>>>>>>>>>>>>>>>> As it stands we lean heavily on overloaded methods in the
>>>>>> API,
>>>>>>>>>>> i.e,
>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>> are 9 overloads for KGroupedStream.count(..)! It is
>>>>>>>>>>>>>>>>>> becoming
>>>>>>>>>>> noisy
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> i
>>>>>>>>>>>>>>>>>> feel it is only going to get worse as we add more
>>>>>>>>>>>>>>>>>> optional
>>>>>>>>>>> params.
>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>>> particular we've had some requests to be able to turn
>>>>>> caching
>>>>>>>>>>> off,
>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>> change log configs,  on a per operator basis (note
>>>>>>>>>>>>>>>>>> this can
>>>>>> be
>>>>>>>>>>> done
>>>>>>>>>>>>> now
>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>> you pass in a StateStoreSupplier, but this can be a bit
>>>>>>>>>>>> cumbersome).
>>>>>>>>>>>>>>>>>> So this is a bit of an open question. How can we
>>>>>>>>>>>>>>>>>> change the
>>>>>> DSL
>>>>>>>>>>>>>>> overloads
>>>>>>>>>>>>>>>>>> so that it flows, is simple to use and understand, and is
>>>>>>> easily
>>>>>>>>>>>>>>> extended
>>>>>>>>>>>>>>>>>> in the future?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> One option would be to use a fluent API approach for
>>>>>> providing
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> optional
>>>>>>>>>>>>>>>>>> params, so something like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> groupedStream.count()
>>>>>>>>>>>>>>>>>>       .withStoreName("name")
>>>>>>>>>>>>>>>>>>       .withCachingEnabled(false)
>>>>>>>>>>>>>>>>>>       .withLoggingEnabled(config)
>>>>>>>>>>>>>>>>>>       .table()
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Another option would be to provide a Builder to the count
>>>>>>> method,
>>>>>>>>>>>> so
>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>> would look something like this:
>>>>>>>>>>>>>>>>>> groupedStream.count(new
>>>>>>>>>>>>>>>>>> CountBuilder("storeName").with
>>>>>> CachingEnabled(false).build())
>>>>>>>>>>>>>>>>>> Another option is to say: Hey we don't need this, what
>>>>>>>>>>>>>>>>>> are
>>>>>> you
>>>>>>> on
>>>>>>>>>>>>>>> about!
>>>>>>>>>>>>>>>>>> The above has focussed on state store related overloads,
>>>>>>>>>>>>>>>>>> but
>>>>>>> the
>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>> ideas
>>>>>>>>>>>>>>>>>> could  be applied to joins etc, where we presently have
>>>>>>>>>>>>>>>>>> many
>>>>>>> join
>>>>>>>>>>>>>>> methods
>>>>>>>>>>>>>>>>>> and many overloads.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Anyway, i look forward to hearing your opinions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>> -- 
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>
>>>>> -- 
>>>>> -- Guozhang
>>>>>
>>>>
>>>
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to