Hi Jan, Thanks very much for the input.
On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com> wrote: > Hi Damian, > > I do see your point that something needs to change. But I fully agree with > Guozhang when he says: > --- > > But since this is an incompatible change, and we are going to remove the > compatibility annotations soon, it means we only have one chance and we > really have to make it right. > ---- > > I think we all agree on this one! Hence the discussion. > I fear none of the suggestions go far enough to become something that will > carry on for much longer. > I am currently working on KAFKA-3705 and trying to find the easiest way for > the user to give me all the required functionality. The easiest interface I > could come up with so far can be looked at here. > > > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622 > > And it's already horribly complicated. I am currently unable to find the > right abstraction level to have everything falling into place naturally. To > be honest I already think introducing > > To be fair, that is not a particularly easy problem to solve! > > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493 > > was not ideal and makes everything a mess. I'm not sure I agree that it makes everything a mess, but it could have been done differently. The JoinType:Whatever is also not really flexible. Two things come to mind: > > 1. I don't think we should rule out config-based decisions, say configs like > streams.$applicationID.joins.$joinname.conf = value > Is this just for config? Or are you suggesting that we could somehow "code" the join in a config file? > This can allow for tremendous changes without a single API change and IMO it > was not considered enough yet. > > 2. Push logic from the DSL to the Callback classes.
A ValueJoiner, for > example, can be used to implement different join types as the user wishes. > Do you have an example of how this might look? > As Guozhang said: not breaking users is very important. Of course. We want to make it as easy as possible for people to use Streams, especially with these changes + all the plans I sadly only have in my head, > but hopefully the first link can give a glimpse. > > Thanks for preparing the examples; they made it way clearer to me what exactly > we are talking about. I would argue for going a bit slower and more carefully on > this one. At some point we need to get it right. Peeking over at the Hadoop > guys with their huge user base: config files really work well for them. > > Best Jan > > > > > > On 30.06.2017 09:31, Damian Guy wrote: > > Thanks Matthias > > > > On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io> > wrote: > > > >> I am just catching up on this thread, so sorry for the long email in > >> advance... Also, it's to some extent a dump of thoughts and not always a > >> clear proposal. Still need to think about this in more detail. But maybe > >> it helps others get new ideas :) > >> > >> > >>>> However, I don't understand your argument about putting aggregate() > >>>> after the withXX() -- all the calls to withXX() set optional > parameters > >>>> for aggregate() and not for groupBy() -- but a groupBy().withXX() > >>>> indicates that the withXX() belongs to the groupBy(). IMHO, this might > >>>> be quite confusing for developers. > >>>> > >>>> > >>> I see what you are saying, but the grouped stream is effectively a > no-op > >>> until you call one of the aggregate/count/reduce etc. functions. So the > >>> optional params are ones that are applicable to any of the operations > you > >>> can perform on this grouped stream. Then the final > >>> count()/reduce()/aggregate() call has any of the params that are > >>> required/specific to that function.
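Jan's second suggestion (push join semantics into the callback) and Damian's request for an example can be illustrated with a minimal, self-contained sketch. It assumes a hypothetical DSL that always matches keys as a full outer join and hands both sides, either of which may be null, to the user's function; returning null drops the pair. `CallbackJoin`, `outerJoin`, and the three joiner factories are illustrative names, not Kafka Streams API, and plain `Map`s stand in for tables.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical sketch: the "DSL" always does a full outer match on keys and the
// user-supplied joiner decides which pairs survive (returning null drops a pair).
final class CallbackJoin {

    static <K, V1, V2, R> Map<K, R> outerJoin(Map<K, V1> left,
                                               Map<K, V2> right,
                                               BiFunction<V1, V2, R> joiner) {
        // Union of the keys from both sides, in first-seen order.
        Set<K> keys = new LinkedHashSet<>(left.keySet());
        keys.addAll(right.keySet());
        Map<K, R> result = new LinkedHashMap<>();
        for (K key : keys) {
            // Either side is null when the key is missing on that side.
            R joined = joiner.apply(left.get(key), right.get(key));
            if (joined != null) {
                result.put(key, joined);
            }
        }
        return result;
    }

    // Inner-join semantics expressed purely in the callback: both sides required.
    static BiFunction<String, String, String> innerJoiner() {
        return (l, r) -> (l == null || r == null) ? null : l + "|" + r;
    }

    // Left-join semantics: emit whenever the left side is present.
    static BiFunction<String, String, String> leftJoiner() {
        return (l, r) -> l == null ? null : l + "|" + r;
    }

    // Outer-join semantics: emit every key (missing sides print as "null").
    static BiFunction<String, String, String> outerJoiner() {
        return (l, r) -> l + "|" + r;
    }
}
```

The trade-off is that the framework can no longer optimize for the join type (for example, skipping unmatched keys for an inner join), because it only learns the semantics by calling the callback.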
> >>> > >> I understand your argument, but I don't share the conclusion. If we > >> need a "final/terminal" call, the better way might be > >> > >> .groupBy().count().withXX().build() > >> > >> (with a better name for build() though) > >> > >> > > The point is that all the other calls, i.e., withBlah, windowed, etc., apply > > to all the aggregate functions. The terminal call is the actual type > of > > aggregation you want to do. I personally find this more natural than > > groupBy().count().withBlah().build() > > > > > >>> groupedStream.count(/** non-windowed count**/) > >>> groupedStream.windowed(TimeWindows.of(10L)).count(...) > >>> groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...) > >> > >> I like this. However, I don't see a reason to have windowed() and > >> sessionWindowed(). We should have one top-level `Windows` interface that > >> both `TimeWindows` and `SessionWindows` implement and just have a single > >> windowed() method that accepts all `Windows`. (I did not like the > >> separation of `SessionWindows` in the first place, and this seems to be > >> an opportunity to clean this up. It was hard to change when we > >> introduced session windows.) > >> > > Yes, true, we should look into that. > > > > > >> Btw: we do use the imperative groupBy() and groupByKey(), and thus we > >> might also want to use windowBy() (instead of windowed()). Not sure how > >> important this is, but it seems to be inconsistent otherwise. > >> > >> > > Makes sense. > > > > > >> About joins: I don't like .withJoinType(JoinType.LEFT) at all. I think > >> defining an inner/left/outer join is not an optional argument but a > >> first-class concept and should have a proper representation in the API > >> (like the current methods join(), leftJoin(), outerJoin()). > >> > >> > > Yep, I did originally have it as a required param and maybe that is what > we > > go with. It could have a default, but maybe that is confusing.
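The two ideas in this exchange (a single top-level window abstraction accepted by one `windowedBy(...)` method, and intermediate calls that are shared by every aggregation, with the aggregation chosen by the terminal call) can be sketched as follows. `WindowSpec`, `TimeWindowSpec`, `SessionWindowSpec`, and `GroupedSketch` are hypothetical stand-ins, not Kafka Streams classes, and the terminal methods return a plan string instead of a `KTable`.

```java
import java.util.Objects;

// Hypothetical single windowing abstraction that both window kinds implement.
interface WindowSpec {
    String describe();
}

final class TimeWindowSpec implements WindowSpec {
    private final long sizeMs;
    TimeWindowSpec(long sizeMs) { this.sizeMs = sizeMs; }
    @Override public String describe() { return "time(" + sizeMs + "ms)"; }
}

final class SessionWindowSpec implements WindowSpec {
    private final long inactivityGapMs;
    SessionWindowSpec(long inactivityGapMs) { this.inactivityGapMs = inactivityGapMs; }
    @Override public String describe() { return "session(" + inactivityGapMs + "ms)"; }
}

// Hypothetical grouped stream: everything before the terminal call applies to any
// aggregation; the terminal call (count/reduce) picks the aggregation.
final class GroupedSketch {
    private final String groupKey;
    private final WindowSpec window; // null means "not windowed"

    GroupedSketch(String groupKey) { this(groupKey, null); }

    private GroupedSketch(String groupKey, WindowSpec window) {
        this.groupKey = groupKey;
        this.window = window;
    }

    // One method for every kind of window, instead of windowed() plus sessionWindowed().
    GroupedSketch windowedBy(WindowSpec spec) {
        return new GroupedSketch(groupKey, Objects.requireNonNull(spec));
    }

    String count() { return plan("count"); }
    String reduce() { return plan("reduce"); }

    private String plan(String aggregation) {
        String windowPart = window == null ? "" : ".windowedBy(" + window.describe() + ")";
        return "groupBy(" + groupKey + ")" + windowPart + "." + aggregation + "()";
    }
}
```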
> > > > > > > >> About the two join API proposals, the second one has too much boilerplate > >> code for my taste. Also, the actual join() operator has only one > >> argument, which is weird to me, as in my thinking process the main > >> operator call should have one parameter per mandatory argument, but your > >> proposal puts the mandatory arguments into the Joins.streamStreamJoin() call. > >> This is far from intuitive IMHO. > >> > >> > > This is the builder pattern; you only need one param as the builder has > > captured all of the required and optional arguments. > > > > > >> The first join proposal also seems to align better with the pattern > >> suggested for aggregations, and having the same pattern for all operators > >> is important (as you stated already). > >> > >> > > This is why I offered two alternatives when I started out. One is the > > builder pattern, the other is the more fluent pattern. > > > > > >> > >> Coming back to the config vs. optional parameter question: what about having a > >> method withConfig[s](...) that allows you to put in the configuration? > >> > >> > > Sure, it is currently called withLogConfig() as that is the only thing > that > > is really config. > > > > > >> This also raises the question of whether until() is a window property. > >> Actually, until() seems to be a configuration parameter and thus should > >> not have its own method. > >> > >> > > Hmmm, I don't agree. Until is a property of the window. It is going to be > > potentially different for every window operation you do in a Streams app.
> > > > > >> > >> Browsing through your example DSL branch, I also saw this one: > >> > >>> final KTable<Windowed<String>, Long> windowed = > >> groupedStream.counting() > >>> .windowed(TimeWindows.of(10L).until(10)) > >>> .table(); > >> This is an interesting idea, and it reminds me of some feedback about "I > >> wanted to count a stream, but there was no count() method -- I first > >> needed to figure out that I need to group the stream first to be able > >> to count it. It does make sense in hindsight but was not obvious in the > >> beginning". Thus, carrying this thought further, we could also do the > >> following: > >> > >> stream.count().groupedBy().windowedBy().table(); > >> > >> -> Note: I use "grouped" and "windowed" instead of the imperative here, as > >> it comes after the count() > >> > >> This would be more consistent than your proposal (that has grouping > >> before but windowing after count()). It might even allow us to enrich > >> the API with some syntactic sugar like `stream.count().table()` to get > >> the overall count of all records (this would obviously not scale, but we > >> could support it -- if not now, maybe later). > >> > >> > > I guess I'd prefer > > stream.groupBy().windowBy().count() > > stream.groupBy().windowBy().reduce() > > stream.groupBy().count() > > > > As I said above, everything that happens before the final aggregate call > > can be applied to any of them. So it makes sense to me to do those things > > ahead of the final aggregate call. > > > > > >> Lastly, about the builder pattern: I am convinced that we need some "terminal" > >> operator/method that tells us when to add the processor to the topology. > >> But I don't see the need for a plain builder pattern that feels alien to > >> me (see my argument about the second join proposal). Using .stream() / > >> .table() as used in many examples might work. But maybe a more generic > >> name that we can use in all places, like build() or apply(), might also be > >> an option.
> >> > >> > > Sure, a generic name might be OK. > > > > > > > > > >> -Matthias > >> > >> > >> > >> On 6/29/17 7:37 AM, Damian Guy wrote: > >>> Thanks Kyle. > >>> > >>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com> > >>> wrote: > >>> > >>>> Hi Damian, > >>>> > >>>>>>>> When trying to program in the fluent API that has been discussed > >> most > >>>> it > >>>>>>>> feels difficult to know when you will actually get an object you > can > >>>> reuse. > >>>>>>>> What if I make one KGroupedStream that I want to reuse, is it > legal > >> to > >>>>>>>> reuse it or does this approach expect you to call grouped each > time? > >>>>>> I'd anticipate that once you have a KGroupedStream you can re-use it > >> as > >>>> you > >>>>>> can today. > >>>> You said it yourself in another post that the grouped stream is > >>>> effectively a no-op until a count, reduce, or aggregate. The way I see > >> it > >>>> you wouldn’t be able to reuse anything except KStreams and KTables, > >> because > >>>> most of this fluent API would continue returning this (this being the > >>>> builder object currently being manipulated). > >>> So, if you ever store a reference to anything but KStreams and KTables > >> and > >>>> you use it in two different ways, then it's possible you make > conflicting > >>>> withXXX() calls on the same builder. > >>>> > >>>> > >>> Not necessarily true. It could return a new instance of the builder, > i.e., > >>> the builders could be immutable. So if you held a reference to the builder > >> it > >>> would always be the same as it was when it was created.
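Damian's answer to the reuse concern, immutable builders, might look like the sketch below: every `withXXX()` returns a new instance, so a stored reference never changes under the caller. `GroupedSpec` is hypothetical, and serdes are represented by plain strings.

```java
// Hypothetical immutable "grouped" spec: every withXXX() returns a copy,
// so a reference held by the caller always keeps its original settings.
final class GroupedSpec {
    private final String keySerde;
    private final String valueSerde;

    static GroupedSpec withDefaults() {
        return new GroupedSpec("default", "default");
    }

    private GroupedSpec(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    GroupedSpec withKeySerde(String serde) {
        return new GroupedSpec(serde, valueSerde);
    }

    GroupedSpec withValueSerde(String serde) {
        return new GroupedSpec(keySerde, serde);
    }

    String keySerde() { return keySerde; }
    String valueSerde() { return valueSerde; }
}
```

With this design, the `groupedStreamWithDefaultSerdes` example below cannot silently pick up serdes declared later, because the original object is never modified.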
> >>> > >>> > >>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped(); > >>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes = > >>>> groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…); > >>>> > >>>> I’ll admit that this shouldn’t happen, but some user is going to do it > >>>> eventually… > >>>> Depending on the implementation, uses of groupedStreamWithDefaultSerdes > would > >>>> most likely be equivalent to the version withDeclaredSerdes. One > >>>> workaround would be to always make copies of the config objects you are > >>>> building, but this approach has its own problem because now we have to > >>>> identify which configs are equivalent so we don’t create repeated > >>>> processors. > >>>> > >>>> The point of this long-winded example is that we always have to be > >>>> thinking about all of the possible ways it could be misused by a user > >>>> (causing them to see hard-to-diagnose problems). > >>>> > >>> Exactly! That is the point of the discussion really. > >>> > >>> > >>>> In my attempt at a couple of methods with builders I feel that I could > >>>> confidently say the user couldn’t really mess it up. > >>>>> // Count > >>>>> KTable<String, Long> count = > >>>>> > kGroupedStream.count(Count.count().withQueryableStoreName("my-store")); > >>>> The kGroupedStream is reusable and if they attempted to reuse the > Count > >>>> for some reason it would throw an error message saying that a store > >> named > >>>> “my-store” already exists. > >>>> > >>>> > >>> Yes, I agree, and I think using builders is my preferred pattern. > >>> > >>> Cheers, > >>> Damian > >>> > >>> > >>>> Thanks, > >>>> Kyle > >>>> > >>>> From: Damian Guy > >>>> Sent: Thursday, June 29, 2017 3:59 AM > >>>> To: d...@kafka.apache.org > >>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring > >>>> > >>>> Hi Kyle, > >>>> > >>>> Thanks for your input. Really appreciated.
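Kyle's claim that reusing the same `Count` would fail relies on the topology rejecting a second store with the same name. A minimal sketch of such a check follows; `CountOptions` and `TopologySketch` are hypothetical and do not reproduce Kafka's actual topology validation.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical options object carrying only the queryable store name.
final class CountOptions {
    final String storeName;

    private CountOptions(String storeName) { this.storeName = storeName; }

    static CountOptions named(String storeName) { return new CountOptions(storeName); }
}

// Hypothetical topology that refuses to register two stores with the same name.
final class TopologySketch {
    private final Set<String> storeNames = new LinkedHashSet<>();

    void addCount(CountOptions options) {
        // Set.add returns false when the name is already present.
        if (!storeNames.add(options.storeName)) {
            throw new IllegalStateException(
                "A store named \"" + options.storeName + "\" already exists");
        }
    }

    int storeCount() { return storeNames.size(); }
}
```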
> >>>> > >>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com > > > >>>> wrote: > >>>> > >>>>> I like more of a builder pattern even though others have spoken out > against > >>>>> it. I like it because it makes it clear to the user > that > >> a > >>>>> call to KGroupedStream#count will return a KTable, not some > intermediate > >>>>> class that I need to understand. > >>>>> > >>>> Yes, that makes sense. > >>>> > >>>> > >>>>> When trying to program in the fluent API that has been discussed most, > >> it > >>>>> feels difficult to know when you will actually get an object you can > >>>> reuse. > >>>>> What if I make one KGroupedStream that I want to reuse: is it legal > to > >>>>> reuse it, or does this approach expect you to call grouped each time? > >>>> > >>>> I'd anticipate that once you have a KGroupedStream you can re-use it > as > >> you > >>>> can today. > >>>> > >>>> > >>>>> This question doesn’t pop into my head at all in the builder pattern; > I > >>>>> assume I can reuse everything. > >>>>> Finally, I like .groupByKey() and .groupBy(KeyValueMapper); I'm not a big > fan > >> of > >>>>> grouped(). > >>>>> > >>>>> Yes, grouped() was more for demonstration and because groupBy() and > >>>> groupByKey() were taken! So I'd imagine the API would actually want to > >> be > >>>> groupByKey(/** no required args***/).withOptionalArg() and > >>>> groupBy(KeyValueMapper m).withOptionalArg(...). Of course this all > >> depends > >>>> on maintaining backward compatibility. > >>>> > >>>> > >>>>> Unfortunately, the below approach would require at least 2 (probably > 3) > >>>>> overloads (one for returning a KTable and one for returning a KTable > >> with > >>>>> a windowed key; you would probably want to split windowed and > session-windowed > >>>> for > >>>>> ease of implementation) of each count, reduce, and aggregate. > >>>>> This is obviously not exhaustive, but enough for you to get the picture.
> Count, > >>>>> Reduce, and Aggregate supply three static methods to initialize the > >> builder: > >>>>> // Count > >>>>> KTable<String, Long> count = > >>>>> > groupedStream.count(Count.count().withQueryableStoreName("my-store")); > >>>>> > >>>>> // Windowed Count > >>>>> KTable<Windowed<String>, Long> windowedCount = > >>>>> > >> > groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store")); > >>>>> // Session Count > >>>>> KTable<Windowed<String>, Long> sessionCount = > >>>>> > >> > groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store")); > >>>>> > >>>> Above and below, I think I'd prefer it to be: > >>>> groupedStream.count(/** non-windowed count**/) > >>>> groupedStream.windowed(TimeWindows.of(10L)).count(...) > >>>> groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...) > >>>> > >>>> > >>>> > >>>> > >>>>> // Reduce > >>>>> Reducer<Long> reducer; > >>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer, > >>>>> Reduce.reduce().withQueryableStoreName("my-store")); > >>>>> > >>>>> // Aggregate Windowed with Custom Store > >>>>> Initializer<String> initializer; > >>>>> Aggregator<String, Long, String> aggregator; > >>>>> KTable<Windowed<String>, String> aggregate = > >>>>> groupedStream.aggregate(initializer, aggregator, > >>>>> > >> > Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier)); > >>>>> // Cogroup SessionWindowed > >>>>> KTable<Windowed<String>, String> cogrouped = > groupedStream1.cogroup(aggregator1) > >>>>> .cogroup(groupedStream2, aggregator2) > >>>>> .aggregate(initializer, aggregator, > >>>>> Aggregate.sessionWindowed(SessionWindows.with(10L), > >>>>> sessionMerger).withQueryableStoreName("my-store")); > >>>>> > >>>>> > >>>>> > >>>>> public class Count { > >>>>> > >>>>> public static class Windowed extends Count { > >>>>> private Windows windows; > >>>>> } > >>>>> public static class
SessionWindowed extends Count { > >>>>> private SessionWindows sessionWindows; > >>>>> } > >>>>> > >>>>> public static Count count(); > >>>>> public static Windowed windowed(Windows windows); > >>>>> public static SessionWindowed sessionWindowed(SessionWindows > >>>>> sessionWindows); > >>>>> > >>>>> // All withXXX(...) methods. > >>>>> } > >>>>> > >>>>> public class KGroupedStream { > >>>>> public KTable<K, Long> count(Count count); > >>>>> public KTable<Windowed<K>, Long> count(Count.Windowed count); > >>>>> public KTable<Windowed<K>, Long> count(Count.SessionWindowed > >> count); > >>>>> … > >>>>> } > >>>>> > >>>>> > >>>>> Thanks, > >>>>> Kyle > >>>>> > >>>>> From: Guozhang Wang > >>>>> Sent: Wednesday, June 28, 2017 7:45 PM > >>>>> To: d...@kafka.apache.org > >>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring > >>>>> > >>>>> I played with the current proposal a bit using > >> https://github.com/dguy/kafka/tree/dsl-experiment, > >>>>> and here are my observations: > >>>>> > >>>>> 1. Personally I prefer > >>>>> > >>>>> "stream.group(mapper) / stream.groupByKey()" > >>>>> > >>>>> to > >>>>> > >>>>> "stream.group().withKeyMapper(mapper) / stream.group()" > >>>>> > >>>>> because 1) withKeyMapper is not enforced programmatically, though it is > >> not > >>>>> "really" optional like the others, and 2) syntax-wise it reads more naturally. > >>>>> > >>>>> I think it is okay to add the APIs in ( > >>>>> > >>>>> > >> > https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java > >>>>> ) > >>>>> in KGroupedStream. > >>>>> > >>>>> > >>>>> 2. For the "withStateStoreSupplier" API, are users supposed to > pass > >> in > >>>>> the innermost state store supplier (e.g. the one whose get() returns > >>>>> RocksDBStore), or are they supposed to pass in the outermost supplier > with > >>>>> logging / metrics / etc.?
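Kyle's `Count` design relies on Java overload resolution: `Count.Windowed` and `Count.SessionWindowed` are subclasses of `Count`, so the compiler picks the most specific `count(...)` overload from the argument's static type, and each overload can declare its own result type. Below is a trimmed, runnable sketch of that mechanism. It assumes windows are reduced to plain `long` durations and that each overload returns a string naming the table type the real proposal would return; `GroupedStreamSketch` is a hypothetical name.

```java
// Trimmed sketch of the Count hierarchy from the proposal; windows are plain longs.
class Count {
    static final class Windowed extends Count {
        final long sizeMs;
        private Windowed(long sizeMs) { this.sizeMs = sizeMs; }
    }

    static final class SessionWindowed extends Count {
        final long gapMs;
        private SessionWindowed(long gapMs) { this.gapMs = gapMs; }
    }

    static Count count() { return new Count(); }
    static Windowed windowed(long sizeMs) { return new Windowed(sizeMs); }
    static SessionWindowed sessionWindowed(long gapMs) { return new SessionWindowed(gapMs); }
}

// Hypothetical grouped stream: each overload names the result type the
// real proposal would return, so the selected overload is visible.
final class GroupedStreamSketch {
    String count(Count c) { return "KTable<K, Long>"; }
    String count(Count.Windowed c) { return "KTable<Windowed<K>, Long> size=" + c.sizeMs; }
    String count(Count.SessionWindowed c) { return "KTable<Windowed<K>, Long> gap=" + c.gapMs; }
}
```

One consequence worth weighing: resolution happens at compile time, so a `Count.Windowed` held in a variable declared as `Count` silently selects the non-windowed overload.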
I think it would be more useful to only > >> require > >>>>> users to pass in the inner state store supplier while specifying > caching / > >>>>> logging through other APIs. > >>>>> > >>>>> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: > we > >>>> are > >>>>> allowing users to call other APIs like "withQueryableName" multiple > >> times, > >>>>> but to call "withStateStoreSupplier" only once at the end. Why is > >> that? > >>>>> > >>>>> 3. The current DSL seems to be only for aggregations; what about > joins? > >>>>> > >>>>> > >>>>> 4. I think it is okay to keep the "withLogConfig": for the > >>>>> StateStoreSupplier it will still be user code specifying the topology, > >> so > >>>> I > >>>>> do not see a big difference. > >>>>> > >>>>> > >>>>> 5. Should "WindowedGroupedStream"'s withStateStoreSupplier take the > >>>>> windowed state store supplier to enforce typing? > >>>>> > >>>>> > >>>>> Below are minor ones: > >>>>> > >>>>> 6. "withQueryableName": maybe better "withQueryableStateName"? > >>>>> > >>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"? > >>>>> > >>>>> > >>>>> > >>>>> Guozhang > >>>>> > >>>>> > >>>>> > >>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax < > >> matth...@confluent.io> > >>>>> wrote: > >>>>> > >>>>>> I see your point about "when to add the processor to the topology". > >>>> That > >>>>>> is indeed an issue. Not sure if we could allow "updates" to the > >>>>> topology... > >>>>>> I don't see any problem with having all the withXX() in the KTable > >>>> interface > >>>>>> -- but this might be subjective. > >>>>>> > >>>>>> > >>>>>> However, I don't understand your argument about putting aggregate() > >>>>>> after the withXX() -- all the calls to withXX() set optional > >> parameters > >>>>>> for aggregate() and not for groupBy() -- but a groupBy().withXX() > >>>>>> indicates that the withXX() belongs to the groupBy(). IMHO, this > might > >>>>>> be quite confusing for developers.
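Guozhang's suggestion (users supply only the innermost store; caching and logging are layered on through other calls) is essentially the decorator pattern. A minimal sketch follows, assuming a hypothetical `KVStore` interface rather than Kafka's actual store types, with a list standing in for the changelog topic; only a logging layer is shown.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal store interface (not Kafka's KeyValueStore).
interface KVStore {
    void put(String key, String value);
    String get(String key);
}

// The innermost store a user might supply.
final class InMemoryStore implements KVStore {
    private final Map<String, String> data = new HashMap<>();
    @Override public void put(String key, String value) { data.put(key, value); }
    @Override public String get(String key) { return data.get(key); }
}

// Decorator the DSL could add when logging is enabled; the list stands in for a changelog.
final class ChangeLoggingStore implements KVStore {
    private final KVStore inner;
    private final List<String> changelog;

    ChangeLoggingStore(KVStore inner, List<String> changelog) {
        this.inner = inner;
        this.changelog = changelog;
    }

    @Override public void put(String key, String value) {
        inner.put(key, value);            // delegate the write
        changelog.add(key + "=" + value); // then record the change
    }

    @Override public String get(String key) { return inner.get(key); }
}

// Hypothetical assembly step: wrap the user's store according to a simple flag.
final class StoreAssembler {
    static KVStore wrap(KVStore userStore, boolean loggingEnabled, List<String> changelog) {
        return loggingEnabled ? new ChangeLoggingStore(userStore, changelog) : userStore;
    }
}
```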
> >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> On 6/28/17 2:55 AM, Damian Guy wrote: > >>>>>>>> I also think that mixing optional parameters with configs is a bad > >>>>> idea. > >>>>>>>> I have no proposal for this at the moment but just wanted to mention it. Hope > >>>> to > >>>>>>>> find some time to come up with something. > >>>>>>>> > >>>>>>>> > >>>>>>> Yes, I don't like the mix of config either. But the only real > config > >>>>> here > >>>>>>> is the logging config, which we don't really need as it can > already > >>>> be > >>>>>>> done via a custom StateStoreSupplier. > >>>>>>> > >>>>>>> > >>>>>>>> What I don't like in the current proposal is the > >>>>>>>> .grouped().withKeyMapper() -- the current solution with > >>>> .groupBy(...) > >>>>>>>> and .groupByKey() seems better. For clarity, we could rename them to > >>>>>>>> .groupByNewKey(...) and .groupByCurrentKey() (even if we should > find > >>>>>>>> some better names). > >>>>>>>> > >>>>>>>> > >>>>>>> It could be groupByKey(), groupBy() or something different. > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>>> The proposed pattern "chains" grouping and aggregation too closely > >>>>>>>> together. I would rather separate them more than less, i.e., go in > >>>> the > >>>>>>>> opposite direction. > >>>>>>>> > >>>>>>>> I am also wondering if we could do something more "fluent". The > >>>>> initial > >>>>>>>> proposal was like: > >>>>>>>> > >>>>>>>>>> groupedStream.count() > >>>>>>>>>> .withStoreName("name") > >>>>>>>>>> .withCachingEnabled(false) > >>>>>>>>>> .withLoggingEnabled(config) > >>>>>>>>>> .table() > >>>>>>>> The .table() statement at the end was kinda alien. > >>>>>>>> > >>>>>>> I agree, but then all of the withXXX methods need to be on KTable, > >>>> which > >>>>>> is > >>>>>>> worse in my opinion. You also need something that is going to > "build" > >>>>> the > >>>>>>> internal processors and add them to the topology.
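Damian's point that something must "build" the internal processors can be illustrated with a fluent chain whose `withXXX()` calls only accumulate options, so nothing touches the topology until the terminal aggregation runs. `TopologyPlan` and `FluentGrouped` are hypothetical names; `count()` returns nothing here, whereas the proposal would return a `KTable`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical topology: just an ordered list of node descriptions.
final class TopologyPlan {
    final List<String> nodes = new ArrayList<>();
}

// Hypothetical fluent chain: withXXX() calls only collect options (as new
// immutable instances); the terminal count() is what adds a processor.
final class FluentGrouped {
    private final TopologyPlan plan;
    private final String storeName;
    private final boolean cachingEnabled;

    FluentGrouped(TopologyPlan plan) { this(plan, null, true); }

    private FluentGrouped(TopologyPlan plan, String storeName, boolean cachingEnabled) {
        this.plan = plan;
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
    }

    FluentGrouped withQueryableName(String name) {
        return new FluentGrouped(plan, name, cachingEnabled);
    }

    FluentGrouped withCachingEnabled(boolean enabled) {
        return new FluentGrouped(plan, storeName, enabled);
    }

    // Terminal call: only here is the processor registered in the topology.
    void count() {
        plan.nodes.add("count(store=" + storeName + ", caching=" + cachingEnabled + ")");
    }
}
```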
> >>>>>>> > >>>>>>> > >>>>>>>> The current proposal puts the count() at the end -- i.e., the > >>>> optional > >>>>>>>> parameters for count() have to be specified on the .grouped() call -- > >>>> this > >>>>>>>> does not seem to be the best way either. > >>>>>>>> > >>>>>>>> > >>>>>>> I actually prefer this method as you are building a grouped stream > >>>> that > >>>>>> you > >>>>>>> will aggregate. So > >>>> table.grouped(...).withOptionalStuff().aggregate(...) > >>>>>> etc. > >>>>>>> seems natural to me. > >>>>>>> > >>>>>>> > >>>>>>>> I did not think this through in detail, but can't we just do the > >>>>> initial > >>>>>>>> proposal without the .table()? > >>>>>>>> > >>>>>>>> groupedStream.count().withStoreName("name").mapValues(...) > >>>>>>>> > >>>>>>>> Each .withXXX(...) returns the current KTable and all the > .withXXX() > >>>>> are > >>>>>>>> just added to the KTable interface. Or am I missing a reason why this > >>>>> won't > >>>>>>>> work, or some obvious disadvantage? > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>> See above. > >>>>>>> > >>>>>>> > >>>>>>>> -Matthias > >>>>>>>> > >>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote: > >>>>>>>>> Thanks everyone. My latest attempt is below. It builds on the > >>>> fluent > >>>>>>>>> approach, but I think it is slightly nicer. > >>>>>>>>> I agree with some of what Eno said about mixing config-like stuff into > >>>> the > >>>>>> DSL, > >>>>>>>>> but I think that enabling caching and enabling logging are things > >>>>> that > >>>>>>>>> aren't actually config. I'd probably not add withLogConfig(...) > >>>> (even > >>>>>>>>> though it is below) as this is actually config and we already > have > >>>> a > >>>>>> way > >>>>>>>> of > >>>>>>>>> doing that, via the StateStoreSupplier.
Arguably we could use the > >>>>>>>>> StateStoreSupplier for disabling caching etc., but as it stands > that > >>>>> is > >>>>>> a > >>>>>>>>> bit tedious for someone who just wants to use the > >>>>> default > >>>>>>>>> storage engine but not have caching enabled. > >>>>>>>>> > >>>>>>>>> There is also an orthogonal concern that Guozhang alluded to. > If > >>>>> you > >>>>>>>>> want to plug in a custom storage engine and you want it to be > >>>> logged, > >>>>>> etc., > >>>>>>>>> you would currently need to implement that yourself. Ideally we > can > >>>>>>>> provide > >>>>>>>>> a way to wrap the custom store with logging, metrics, > >>>>> etc. I > >>>>>>>>> need to think about where this fits; it is probably more > >>>> appropriate > >>>>> on > >>>>>>>> the > >>>>>>>>> Stores API. > >>>>>>>>> > >>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper = null; > >>>>>>>>> // count with mapped key > >>>>>>>>> final KTable<Long, Long> count = stream.grouped() > >>>>>>>>> .withKeyMapper(keyMapper) > >>>>>>>>> .withKeySerde(Serdes.Long()) > >>>>>>>>> .withValueSerde(Serdes.String()) > >>>>>>>>> .withQueryableName("my-store") > >>>>>>>>> .count(); > >>>>>>>>> > >>>>>>>>> // windowed count > >>>>>>>>> final KTable<Windowed<String>, Long> windowedCount = > >>>> stream.grouped() > >>>>>>>>> .withQueryableName("my-window-store") > >>>>>>>>> .windowed(TimeWindows.of(10L).until(10)) > >>>>>>>>> .count(); > >>>>>>>>> > >>>>>>>>> // windowed reduce > >>>>>>>>> final Reducer<String> windowedReducer = null; > >>>>>>>>> final KTable<Windowed<String>, String> windowedReduce = > >>>>>> stream.grouped() > >>>>>>>>> .withQueryableName("my-window-store") > >>>>>>>>> .windowed(TimeWindows.of(10L).until(10)) > >>>>>>>>> .reduce(windowedReducer); > >>>>>>>>> > >>>>>>>>> final Aggregator<String, String, Long> aggregator = null; > >>>>>>>>> final Initializer<Long> init = null; > >>>>>>>>> > >>>>>>>>> // aggregate > >>>>>>>>> final KTable<String, Long>
aggregate = stream.grouped() > >>>>>>>>> .withQueryableName("my-aggregate-store") > >>>>>>>>> .aggregate(aggregator, init, Serdes.Long()); > >>>>>>>>> > >>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>> > >>>>>> stateStoreSupplier > >>>>>>>> = null; > >>>>>>>>> // aggregate with custom store > >>>>>>>>> final KTable<String, Long> aggWithCustomStore = stream.grouped() > >>>>>>>>> .withStateStoreSupplier(stateStoreSupplier) > >>>>>>>>> .aggregate(aggregator, init); > >>>>>>>>> > >>>>>>>>> // disable caching > >>>>>>>>> stream.grouped() > >>>>>>>>> .withQueryableName("name") > >>>>>>>>> .withCachingEnabled(false) > >>>>>>>>> .count(); > >>>>>>>>> > >>>>>>>>> // disable logging > >>>>>>>>> stream.grouped() > >>>>>>>>> .withQueryableName("q") > >>>>>>>>> .withLoggingEnabled(false) > >>>>>>>>> .count(); > >>>>>>>>> > >>>>>>>>> // override log config > >>>>>>>>> final Reducer<String> reducer = null; > >>>>>>>>> stream.grouped() > >>>>>>>>> .withLogConfig(Collections.singletonMap("segment.bytes", > >>>>> "10")) > >>>>>>>>> .reduce(reducer); > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> If anyone wants to play around with this, you can find the code > >>>> here: > >>>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment > >>>>>>>>> > >>>>>>>>> Note: it won't actually work as most of the methods just return > >>>> null. > >>>>>>>>> Thanks, > >>>>>>>>> Damian > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> > >>>> wrote: > >>>>>>>>>> Thanks Damian. I think both options have pros and cons. And both > >>>> are > >>>>>>>> better > >>>>>>>>>> than overload abuse. > >>>>>>>>>> > >>>>>>>>>> The fluent API approach reads better: no mention of builder or > >>>> build > >>>>>>>>>> anywhere. The main downside is that the method signatures are a > >>>>> little > >>>>>>>> less > >>>>>>>>>> clear. By reading the method signature, one doesn't necessarily > >>>>> know > >>>>>>>> what > >>>>>>>>>> it returns.
Also, one needs to figure out the special method > >>>>>> (`table()` > >>>>>>>> in > >>>>>>>>>> this case) that gives you what you actually care about (`KTable` > >>>> in > >>>>>> this > >>>>>>>>>> case). Not major issues, but worth mentioning while doing the > >>>>>>>> comparison. > >>>>>>>>>> The builder approach avoids the issues mentioned above, but it > >>>>> doesn't > >>>>>>>> read > >>>>>>>>>> as well. > >>>>>>>>>> > >>>>>>>>>> Ismael > >>>>>>>>>> > >>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy < > damian....@gmail.com > >>>>>>>> wrote: > >>>>>>>>>>> Hi, > >>>>>>>>>>> > >>>>>>>>>>> I'd like to get a discussion going around some of the API > choices > >>>>>> we've > >>>>>>>>>>> made in the DSL. In particular, those that relate to stateful > >>>>>> operations > >>>>>>>>>>> (though this could expand). > >>>>>>>>>>> As it stands we lean heavily on overloaded methods in the API, > >>>> i.e., > >>>>>>>> there > >>>>>>>>>>> are 9 overloads for KGroupedStream.count(...)! It is becoming > >>>> noisy > >>>>>> and > >>>>>>>> I > >>>>>>>>>>> feel it is only going to get worse as we add more optional > >>>> params. > >>>>> In > >>>>>>>>>>> particular, we've had some requests to be able to turn caching > >>>> off, > >>>>> or > >>>>>>>>>>> change log configs, on a per-operator basis (note this can be > >>>> done > >>>>>> now > >>>>>>>>>> if > >>>>>>>>>>> you pass in a StateStoreSupplier, but this can be a bit > >>>>> cumbersome). > >>>>>>>>>>> So this is a bit of an open question. How can we change the DSL > >>>>>>>> overloads > >>>>>>>>>>> so that it flows, is simple to use and understand, and is > easily > >>>>>>>> extended > >>>>>>>>>>> in the future?
> >>>>>>>>>>> > >>>>>>>>>>> One option would be to use a fluent API approach for providing > >>>> the > >>>>>>>>>> optional > >>>>>>>>>>> params, so something like this: > >>>>>>>>>>> > >>>>>>>>>>> groupedStream.count() > >>>>>>>>>>> .withStoreName("name") > >>>>>>>>>>> .withCachingEnabled(false) > >>>>>>>>>>> .withLoggingEnabled(config) > >>>>>>>>>>> .table() > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Another option would be to provide a Builder to the count > method, > >>>>> so > >>>>>> it > >>>>>>>>>>> would look something like this: > >>>>>>>>>>> groupedStream.count(new > >>>>>>>>>>> CountBuilder("storeName").withCachingEnabled(false).build()) > >>>>>>>>>>> > >>>>>>>>>>> Another option is to say: Hey, we don't need this, what are you > on > >>>>>>>> about! > >>>>>>>>>>> The above has focussed on state-store-related overloads, but > the > >>>>> same > >>>>>>>>>> ideas > >>>>>>>>>>> could be applied to joins, etc., where we presently have many > join > >>>>>>>> methods > >>>>>>>>>>> and many overloads. > >>>>>>>>>>> > >>>>>>>>>>> Anyway, I look forward to hearing your opinions. > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> Damian > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>>> -- > >>>>> -- Guozhang > >>>>> > >>>>> > >>>> > >> > >
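For comparison with the fluent chain, the builder option from the opening message (`new CountBuilder("storeName").withCachingEnabled(false).build()`) might be structured like this: the required argument goes in the constructor, optional ones have defaults, and `build()` yields an immutable options object that a single `count(...)` overload could accept. `CountBuilder` and `CountParams` are hypothetical, and the defaults (caching and logging on) are assumptions for the sketch.

```java
// Hypothetical immutable result of the builder.
final class CountParams {
    final String storeName;
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    CountParams(String storeName, boolean cachingEnabled, boolean loggingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }
}

// Hypothetical builder: required argument in the constructor, optional ones via withXXX().
final class CountBuilder {
    private final String storeName;
    private boolean cachingEnabled = true; // assumed default for this sketch
    private boolean loggingEnabled = true; // assumed default for this sketch

    CountBuilder(String storeName) {
        if (storeName == null || storeName.isEmpty()) {
            throw new IllegalArgumentException("storeName is required");
        }
        this.storeName = storeName;
    }

    CountBuilder withCachingEnabled(boolean enabled) {
        this.cachingEnabled = enabled;
        return this;
    }

    CountBuilder withLoggingEnabled(boolean enabled) {
        this.loggingEnabled = enabled;
        return this;
    }

    // Snapshot the current settings into an immutable object.
    CountParams build() {
        return new CountParams(storeName, cachingEnabled, loggingEnabled);
    }
}
```

Adding an option later then means adding one `withXXX()` method rather than another `count(...)` overload.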