I messed up the indentation on the GitHub code repos; this would be easier to read:
https://codeshare.io/GLWW8K

Guozhang

On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Damian / Kyle,
>
> I think I agree with you guys about the pros / cons of using the builder
> pattern vs. using some "secondary classes". And I'm wondering if we can
> take a "middle" path between these two. I spent some time on a slightly
> different approach from Damian's current proposal:
>
> https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
>
> The key idea is to allow the final "table()" or "stream()" function to
> "upgrade" from the secondary classes to the first-citizen classes, while
> having all the specs inside this function. This proposal also includes some
> other refactoring that people have discussed for the builder, to
> reduce the overloaded functions as well. WDYT?
>
> Guozhang
>
> On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Jan,
>>
>> Thanks very much for the input.
>>
>> On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>> > Hi Damian,
>> >
>> > I do see your point that something needs to change. But I fully agree with
>> > Guozhang when he says:
>> > ---
>> > But since this is an incompatible change, and we are going to remove the
>> > compatibility annotations soon, it means we only have one chance and we
>> > really have to make it right.
>> > ---
>> >
>> I think we all agree on this one! Hence the discussion.
>>
>> > I fear all suggestions do not go far enough to become something that will
>> > carry on for very much longer.
>> > I am currently working on KAFKA-3705 and am trying to find the easiest way
>> > for the user to give me all the required functionality. The easiest
>> > interface I could come up with so far can be looked at here.
>> > >> > >> > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2 >> de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/ >> kafka/streams/kstream/internals/KTableImpl.java#L622 >> > >> > >> And its already horribly complicated. I am currently unable to find the >> > right abstraction level to have everything falling into place >> naturally. To >> > be honest I already think introducing >> > >> > >> To be fair that is not a particularly easy problem to solve! >> >> >> > >> > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2 >> de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/ >> kafka/streams/kstream/internals/KTableImpl.java#L493 >> > >> > was unideal and makes everything a mess. >> >> >> I'm not sure i agree that it makes everything a mess, but It could have >> been done differently. >> >> The JoinType:Whatever is also not really flexible. 2 things come to my >> mind: >> > >> > 1. I don't think we should rule out config based decisions say configs >> like >> > streams.$applicationID.joins.$joinname.conf = value >> > >> >> Is this just for config? Or are you suggesting that we could somehow >> "code" >> the join in a config file? >> >> >> > This can allow for tremendous changes without single API change and IMO >> it >> > was not considered enough yet. >> > >> > 2. Push logic from the DSL to the Callback classes. A ValueJoiner for >> > example can be used to implement different join types as the user >> wishes. >> > >> >> Do you have an example of how this might look? >> >> >> > As Gouzhang said: stopping to break users is very important. >> >> >> Of course. We want to make it as easy as possible for people to use >> streams. >> >> >> especially with this changes + All the plans I sadly only have in my head >> > but hopefully the first link can give a glimpse. >> > >> > Thanks for preparing the examples made it way clearer to me what exactly >> > we are talking about. 
I would argue to go a bit slower and more carefully on
>> > this one. At some point we need to get it right. Peeking over to the
>> > Hadoop guys with their huge user base: config files really work well
>> > for them.
>> >
>> > Best Jan
>> >
>> > On 30.06.2017 09:31, Damian Guy wrote:
>> > > Thanks Matthias
>> > >
>> > > On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io>
>> > > wrote:
>> > >
>> > >> I am just catching up on this thread, so sorry for the long email in
>> > >> advance... Also, it's to some extent a dump of thoughts and not always a
>> > >> clear proposal. I still need to think about this in more detail, but maybe
>> > >> it helps others to get new ideas :)
>> > >>
>> > >>>> However, I don't understand your argument about putting aggregate()
>> > >>>> after the withXX() -- all the calls to withXX() set optional parameters
>> > >>>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
>> > >>>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
>> > >>>> be quite confusing for developers.
>> > >>>>
>> > >>> I see what you are saying, but the grouped stream is effectively a no-op
>> > >>> until you call one of the aggregate/count/reduce etc. functions. So the
>> > >>> optional params are ones that are applicable to any of the operations
>> > >>> you can perform on this grouped stream. Then the final
>> > >>> count()/reduce()/aggregate() call has any of the params that are
>> > >>> required/specific to that function.
>> > >>>
>> > >> I understand your argument, but I don't share the conclusion. If we
>> > >> need a "final/terminal" call, the better way might be
>> > >>
>> > >> .groupBy().count().withXX().build()
>> > >>
>> > >> (with a better name for build() though)
>> > >>
>> > > The point is that all the other calls, i.e., withBlah, windowed, etc. apply
>> > > to all the aggregate functions.
The terminal call being the actual type of
>> > > aggregation you want to do. I personally find this more natural than
>> > > groupBy().count().withBlah().build()
>> > >
>> > >>> groupedStream.count(/** non windowed count**/)
>> > >>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>> > >>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>> > >>
>> > >> I like this. However, I don't see a reason to have windowed() and
>> > >> sessionWindowed(). We should have one top-level `Windows` interface that
>> > >> both `TimeWindows` and `SessionWindows` implement and just have a single
>> > >> windowed() method that accepts all `Windows`. (I did not like the
>> > >> separation of `SessionWindows` in the first place, and this seems to be
>> > >> an opportunity to clean this up. It was hard to change when we
>> > >> introduced session windows.)
>> > >>
>> > > Yes - true, we should look into that.
>> > >
>> > >> Btw: we do use the imperative groupBy() and groupByKey(), and thus we
>> > >> might also want to use windowBy() (instead of windowed()). Not sure how
>> > >> important this is, but it seems to be inconsistent otherwise.
>> > >>
>> > > Makes sense
>> > >
>> > >> About joins: I don't like .withJoinType(JoinType.LEFT) at all. I think
>> > >> defining an inner/left/outer join is not an optional argument but a
>> > >> first-class concept and should have a proper representation in the API
>> > >> (like the current methods join(), leftJoin(), outerJoin()).
>> > >>
>> > > Yep, I did originally have it as a required param, and maybe that is what
>> > > we go with. It could have a default, but maybe that is confusing.
>> > >
>> > >> About the two join API proposals, the second one has too much boilerplate
>> > >> code for my taste.
Also, the actual join() operator has only >> one >> > >> argument what is weird to me, as in my thinking process, the main >> > >> operator call, should have one parameter per mandatory argument but >> your >> > >> proposal put the mandatory arguments into Joins.streamStreamJoin() >> call. >> > >> This is far from intuitive IMHO. >> > >> >> > >> >> > > This is the builder pattern, you only need one param as the builder >> has >> > > captured all of the required and optional arguments. >> > > >> > > >> > >> The first join proposal also seems to align better with the pattern >> > >> suggested for aggregations and having the same pattern for all >> operators >> > >> is important (as you stated already). >> > >> >> > >> >> > > This is why i offered two alternatives as i started out with. 1 is the >> > > builder pattern, the other is the more fluent pattern. >> > > >> > > >> > >> >> > >> Coming back to the config vs optional parameter. What about having a >> > >> method withConfig[s](...) that allow to put in the configuration? >> > >> >> > >> >> > > Sure, it is currently called withLogConfig() as that is the only thing >> > that >> > > is really config. >> > > >> > > >> > >> This also raises the question if until() is a windows property? >> > >> Actually, until() seems to be a configuration parameter and thus, >> should >> > >> not not have it's own method. >> > >> >> > >> >> > > Hmmm, i don't agree. Until is a property of the window. It is going >> to be >> > > potentially different for every window operation you do in a streams >> app. 
>> > >
>> > >> Browsing through your example DSL branch, I also saw this one:
>> > >>
>> > >>> final KTable<Windowed<String>, Long> windowed =
>> > >>> groupedStream.counting()
>> > >>>     .windowed(TimeWindows.of(10L).until(10))
>> > >>>     .table();
>> > >>
>> > >> This is an interesting idea, and it reminds me of some feedback about "I
>> > >> wanted to count a stream, but there was no count() method -- I first
>> > >> needed to figure out that I need to group the stream first to be able
>> > >> to count it. It does make sense in hindsight but was not obvious in the
>> > >> beginning". Thus, carrying this thought further, we could also do the
>> > >> following:
>> > >>
>> > >> stream.count().groupedBy().windowedBy().table();
>> > >>
>> > >> -> Note, I use "grouped" and "windowed" instead of the imperative here, as
>> > >> it comes after the count().
>> > >>
>> > >> This would be more consistent than your proposal (that has grouping
>> > >> before but windowing after count()). It might even allow us to enrich
>> > >> the API with some syntactic sugar like `stream.count().table()` to get
>> > >> the overall count of all records (this would obviously not scale, but we
>> > >> could support it -- if not now, maybe later).
>> > >>
>> > > I guess I'd prefer
>> > >
>> > > stream.groupBy().windowBy().count()
>> > > stream.groupBy().windowBy().reduce()
>> > > stream.groupBy().count()
>> > >
>> > > As I said above, everything that happens before the final aggregate call
>> > > can be applied to any of them. So it makes sense to me to do those things
>> > > ahead of the final aggregate call.
>> > >
>> > >> Last, about the builder pattern. I am convinced that we need some "terminal"
>> > >> operator/method that tells us when to add the processor to the topology.
>> > >> But I don't see the need for a plain builder pattern that feels alien to
>> > >> me (see my argument about the second join proposal).
Using .stream() >> / >> > >> .table() as use in many examples might work. But maybe a more generic >> > >> name that we can use in all places like build() or apply() might >> also be >> > >> an option. >> > >> >> > >> >> > > Sure, a generic name might be ok. >> > > >> > > >> > > >> > > >> > >> -Matthias >> > >> >> > >> >> > >> >> > >> On 6/29/17 7:37 AM, Damian Guy wrote: >> > >>> Thanks Kyle. >> > >>> >> > >>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman < >> winkelman.k...@gmail.com> >> > >>> wrote: >> > >>> >> > >>>> Hi Damian, >> > >>>> >> > >>>>>>>> When trying to program in the fluent API that has been >> discussed >> > >> most >> > >>>> it >> > >>>>>>>> feels difficult to know when you will actually get an object >> you >> > can >> > >>>> reuse. >> > >>>>>>>> What if I make one KGroupedStream that I want to reuse, is it >> > legal >> > >> to >> > >>>>>>>> reuse it or does this approach expect you to call grouped each >> > time? >> > >>>>>> I'd anticipate that once you have a KGroupedStream you can >> re-use it >> > >> as >> > >>>> you >> > >>>>>> can today. >> > >>>> You said it yourself in another post that the grouped stream is >> > >>>> effectively a no-op until a count, reduce, or aggregate. The way I >> see >> > >> it >> > >>>> you wouldn’t be able to reuse anything except KStreams and KTables, >> > >> because >> > >>>> most of this fluent api would continue returning this (this being >> the >> > >>>> builder object currently being manipulated). >> > >>> So, if you ever store a reference to anything but KStreams and >> KTables >> > >> and >> > >>>> you use it in two different ways then its possible you make >> > conflicting >> > >>>> withXXX() calls on the same builder. >> > >>>> >> > >>>> >> > >>> No necessarily true. It could return a new instance of the builder, >> > i.e., >> > >>> the builders being immutable. So if you held a reference to the >> builder >> > >> it >> > >>> would always be the same as it was when it was created. 
>> > >>>
>> > >>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>> > >>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>> > >>>> groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>> > >>>>
>> > >>>> I’ll admit that this shouldn’t happen, but some user is going to do it
>> > >>>> eventually…
>> > >>>> Depending on the implementation, uses of groupedStreamWithDefaultSerdes
>> > >>>> would most likely be equivalent to the version with declared serdes. One
>> > >>>> workaround would be to always make copies of the config objects you are
>> > >>>> building, but this approach has its own problem, because now we have to
>> > >>>> identify which configs are equivalent so we don’t create repeated
>> > >>>> processors.
>> > >>>>
>> > >>>> The point of this long-winded example is that we always have to be
>> > >>>> thinking about all of the possible ways it could be misused by a user
>> > >>>> (causing them to see hard-to-diagnose problems).
>> > >>>>
>> > >>> Exactly! That is the point of the discussion really.
>> > >>>
>> > >>>> In my attempt at a couple of methods with builders I feel that I could
>> > >>>> confidently say the user couldn’t really mess it up.
>> > >>>>> // Count
>> > >>>>> KTable<String, Long> count =
>> > >>>>> kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>> > >>>> The kGroupedStream is reusable, and if they attempted to reuse the Count
>> > >>>> for some reason it would throw an error message saying that a store named
>> > >>>> “my-store” already exists.
>> > >>>>
>> > >>> Yes, I agree, and I think using builders is my preferred pattern.
>> > >>> >> > >>> Cheers, >> > >>> Damian >> > >>> >> > >>> >> > >>>> Thanks, >> > >>>> Kyle >> > >>>> >> > >>>> From: Damian Guy >> > >>>> Sent: Thursday, June 29, 2017 3:59 AM >> > >>>> To: d...@kafka.apache.org >> > >>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring >> > >>>> >> > >>>> Hi Kyle, >> > >>>> >> > >>>> Thanks for your input. Really appreciated. >> > >>>> >> > >>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman < >> winkelman.k...@gmail.com >> > > >> > >>>> wrote: >> > >>>> >> > >>>>> I like more of a builder pattern even though others have voiced >> > against >> > >>>>> it. The reason I like it is because it makes it clear to the user >> > that >> > >> a >> > >>>>> call to KGroupedStream#count will return a KTable not some >> > intermediate >> > >>>>> class that I need to undetstand. >> > >>>>> >> > >>>> Yes, that makes sense. >> > >>>> >> > >>>> >> > >>>>> When trying to program in the fluent API that has been discussed >> most >> > >> it >> > >>>>> feels difficult to know when you will actually get an object you >> can >> > >>>> reuse. >> > >>>>> What if I make one KGroupedStream that I want to reuse, is it >> legal >> > to >> > >>>>> reuse it or does this approach expect you to call grouped each >> time? >> > >>>> >> > >>>> I'd anticipate that once you have a KGroupedStream you can re-use >> it >> > as >> > >> you >> > >>>> can today. >> > >>>> >> > >>>> >> > >>>>> This question doesn’t pop into my head at all in the builder >> pattern >> > I >> > >>>>> assume I can reuse everything. >> > >>>>> Finally, I like .groupByKey and .groupBy(KeyValueMapper) not a big >> > fan >> > >> of >> > >>>>> the grouped. >> > >>>>> >> > >>>>> Yes, grouped() was more for demonstration and because groupBy() >> and >> > >>>> groupByKey() were taken! So i'd imagine the api would actually >> want to >> > >> be >> > >>>> groupByKey(/** no required args***/).withOptionalArg() and >> > >>>> groupBy(KeyValueMapper m).withOpitionalArg(...) 
of course this all >> > >> depends >> > >>>> on maintaining backward compatibility. >> > >>>> >> > >>>> >> > >>>>> Unfortunately, the below approach would require atleast 2 >> (probably >> > 3) >> > >>>>> overloads (one for returning a KTable and one for returning a >> KTable >> > >> with >> > >>>>> Windowed Key, probably would want to split windowed and >> > sessionwindowed >> > >>>> for >> > >>>>> ease of implementation) of each count, reduce, and aggregate. >> > >>>>> Obviously not exhaustive but enough for you to get the picture. >> > Count, >> > >>>>> Reduce, and Aggregate supply 3 static methods to initialize the >> > >> builder: >> > >>>>> // Count >> > >>>>> KTable<String, Long> count = >> > >>>>> >> > groupedStream.count(Count.count().withQueryableStoreName("my-store")); >> > >>>>> >> > >>>>> // Windowed Count >> > >>>>> KTable<Windowed<String>, Long> windowedCount = >> > >>>>> >> > >> >> > groupedStream.count(Count.windowed(TimeWindows.of(10L).until >> (10)).withQueryableStoreName("my-windowed-store")); >> > >>>>> // Session Count >> > >>>>> KTable<Windowed<String>, Long> sessionCount = >> > >>>>> >> > >> >> > groupedStream.count(Count.sessionWindowed(SessionWindows. >> with(10L)).withQueryableStoreName("my-session-windowed-store")); >> > >>>>> >> > >>>> Above and below, i think i'd prefer it to be: >> > >>>> groupedStream.count(/** non windowed count**/) >> > >>>> groupedStream.windowed(TimeWindows.of(10L)).count(...) >> > >>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...) 
>> > >>>> >> > >>>> >> > >>>> >> > >>>> >> > >>>>> // Reduce >> > >>>>> Reducer<Long> reducer; >> > >>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer, >> > >>>>> Reduce.reduce().withQueryableStoreName("my-store")); >> > >>>>> >> > >>>>> // Aggregate Windowed with Custom Store >> > >>>>> Initializer<String> initializer; >> > >>>>> Aggregator<String, Long, String> aggregator; >> > >>>>> KTable<Windowed<String>, String> aggregate = >> > >>>>> groupedStream.aggregate(initializer, aggregator, >> > >>>>> >> > >> >> > Aggregate.windowed(TimeWindows.of(10L).until(10)). >> withStateStoreSupplier(stateStoreSupplier))); >> > >>>>> // Cogroup SessionWindowed >> > >>>>> KTable<String, String> cogrouped = >> > groupedStream1.cogroup(aggregator1) >> > >>>>> .cogroup(groupedStream2, aggregator2) >> > >>>>> .aggregate(initializer, aggregator, >> > >>>>> Aggregate.sessionWindowed(SessionWindows.with(10L), >> > >>>>> sessionMerger).withQueryableStoreName("my-store")); >> > >>>>> >> > >>>>> >> > >>>>> >> > >>>>> public class Count { >> > >>>>> >> > >>>>> public static class Windowed extends Count { >> > >>>>> private Windows windows; >> > >>>>> } >> > >>>>> public static class SessionWindowed extends Count { >> > >>>>> private SessionWindows sessionWindows; >> > >>>>> } >> > >>>>> >> > >>>>> public static Count count(); >> > >>>>> public static Windowed windowed(Windows windows); >> > >>>>> public static SessionWindowed sessionWindowed(SessionWindows >> > >>>>> sessionWindows); >> > >>>>> >> > >>>>> // All withXXX(...) methods. 
>> > >>>>> } >> > >>>>> >> > >>>>> public class KGroupedStream { >> > >>>>> public KTable<K, Long> count(Count count); >> > >>>>> public KTable<Windowed<K>, Long> count(Count.Windowed count); >> > >>>>> public KTable<Windowed<K>, Long> count(Count.SessionWindowed >> > >> count); >> > >>>>> … >> > >>>>> } >> > >>>>> >> > >>>>> >> > >>>>> Thanks, >> > >>>>> Kyle >> > >>>>> >> > >>>>> From: Guozhang Wang >> > >>>>> Sent: Wednesday, June 28, 2017 7:45 PM >> > >>>>> To: d...@kafka.apache.org >> > >>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring >> > >>>>> >> > >>>>> I played the current proposal a bit with >> > >> https://github.com/dguy/kafka/ >> > >>>>> tree/dsl-experiment < >> > https://github.com/dguy/kafka/tree/dsl-experiment >> > >>> , >> > >>>>> and here are my observations: >> > >>>>> >> > >>>>> 1. Personally I prefer >> > >>>>> >> > >>>>> "stream.group(mapper) / stream.groupByKey()" >> > >>>>> >> > >>>>> than >> > >>>>> >> > >>>>> "stream.group().withKeyMapper(mapper) / stream.group()" >> > >>>>> >> > >>>>> Since 1) withKeyMapper is not enforced programmatically though it >> is >> > >> not >> > >>>>> "really" optional like others, 2) syntax-wise it reads more >> natural. >> > >>>>> >> > >>>>> I think it is okay to add the APIs in ( >> > >>>>> >> > >>>>> >> > >> >> > https://github.com/dguy/kafka/blob/dsl-experiment/streams/sr >> c/main/java/org/apache/kafka/streams/kstream/GroupedStream.java >> > >>>>> ) >> > >>>>> in KGroupedStream. >> > >>>>> >> > >>>>> >> > >>>>> 2. For the "withStateStoreSupplier" API, are the user supposed to >> > pass >> > >> in >> > >>>>> the most-inner state store supplier (e.g. then one whose get() >> return >> > >>>>> RocksDBStore), or it is supposed to return the most-outer supplier >> > with >> > >>>>> logging / metrics / etc? I think it would be more useful to only >> > >> require >> > >>>>> users pass in the inner state store supplier while specifying >> > caching / >> > >>>>> logging through other APIs. 
>> > >>>>> >> > >>>>> In addition, the "GroupedWithCustomStore" is a bit suspicious to >> me: >> > we >> > >>>> are >> > >>>>> allowing users to call other APIs like "withQueryableName" >> multiple >> > >> time, >> > >>>>> but only call "withStateStoreSupplier" only once in the end. Why >> is >> > >> that? >> > >>>>> >> > >>>>> 3. The current DSL seems to be only for aggregations, what about >> > joins? >> > >>>>> >> > >>>>> >> > >>>>> 4. I think it is okay to keep the "withLogConfig": for the >> > >>>>> StateStoreSupplier it will still be user code specifying the >> topology >> > >> so >> > >>>> I >> > >>>>> do not see there is a big difference. >> > >>>>> >> > >>>>> >> > >>>>> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take >> the >> > >>>>> windowed state store supplier to enforce typing? >> > >>>>> >> > >>>>> >> > >>>>> Below are minor ones: >> > >>>>> >> > >>>>> 6. "withQueryableName": maybe better "withQueryableStateName"? >> > >>>>> >> > >>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"? >> > >>>>> >> > >>>>> >> > >>>>> >> > >>>>> Guozhang >> > >>>>> >> > >>>>> >> > >>>>> >> > >>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax < >> > >> matth...@confluent.io> >> > >>>>> wrote: >> > >>>>> >> > >>>>>> I see your point about "when to add the processor to the >> topology". >> > >>>> That >> > >>>>>> is indeed an issue. Not sure it we could allow "updates" to the >> > >>>>> topology... >> > >>>>>> I don't see any problem with having all the withXX() in KTable >> > >>>> interface >> > >>>>>> -- but this might be subjective. >> > >>>>>> >> > >>>>>> >> > >>>>>> However, I don't understand your argument about putting >> aggregate() >> > >>>>>> after the withXX() -- all the calls to withXX() set optional >> > >> parameters >> > >>>>>> for aggregate() and not for groupBy() -- but a groupBy().withXX() >> > >>>>>> indicates that the withXX() belongs to the groupBy(). IMHO, this >> > might >> > >>>>>> be quite confusion for developers. 
>> > >>>>>> >> > >>>>>> >> > >>>>>> -Matthias >> > >>>>>> >> > >>>>>> On 6/28/17 2:55 AM, Damian Guy wrote: >> > >>>>>>>> I also think that mixing optional parameters with configs is a >> bad >> > >>>>> idea. >> > >>>>>>>> Have not proposal for this atm but just wanted to mention it. >> Hope >> > >>>> to >> > >>>>>>>> find some time to come up with something. >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>> Yes, i don't like the mix of config either. But the only real >> > config >> > >>>>> here >> > >>>>>>> is the logging config - which we don't really need as it can >> > already >> > >>>> be >> > >>>>>>> done via a custom StateStoreSupplier. >> > >>>>>>> >> > >>>>>>> >> > >>>>>>>> What I don't like in the current proposal is the >> > >>>>>>>> .grouped().withKeyMapper() -- the current solution with >> > >>>> .groupBy(...) >> > >>>>>>>> and .groupByKey() seems better. For clarity, we could rename to >> > >>>>>>>> .groupByNewKey(...) and .groupByCurrentKey() (even if we should >> > find >> > >>>>>>>> some better names). >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>> it could be groupByKey(), groupBy() or something different bt >> > >>>>>>> >> > >>>>>>> >> > >>>>>>> >> > >>>>>>>> The proposed pattern "chains" grouping and aggregation too >> close >> > >>>>>>>> together. I would rather separate both more than less, ie, do >> into >> > >>>> the >> > >>>>>>>> opposite direction. >> > >>>>>>>> >> > >>>>>>>> I am also wondering, if we could so something more "fluent". >> The >> > >>>>> initial >> > >>>>>>>> proposal was like: >> > >>>>>>>> >> > >>>>>>>>>> groupedStream.count() >> > >>>>>>>>>> .withStoreName("name") >> > >>>>>>>>>> .withCachingEnabled(false) >> > >>>>>>>>>> .withLoggingEnabled(config) >> > >>>>>>>>>> .table() >> > >>>>>>>> The .table() statement in the end was kinda alien. >> > >>>>>>>> >> > >>>>>>> I agree, but then all of the withXXX methods need to be on >> KTable >> > >>>> which >> > >>>>>> is >> > >>>>>>> worse in my opinion. 
You also need something that is going to >> > "build" >> > >>>>> the >> > >>>>>>> internal processors and add them to the topology. >> > >>>>>>> >> > >>>>>>> >> > >>>>>>>> The current proposal put the count() into the end -- ie, the >> > >>>> optional >> > >>>>>>>> parameter for count() have to specified on the .grouped() call >> -- >> > >>>> this >> > >>>>>>>> does not seems to be the best way either. >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>> I actually prefer this method as you are building a grouped >> stream >> > >>>> that >> > >>>>>> you >> > >>>>>>> will aggregate. So >> > >>>> table.grouped(...).withOptionalStuff().aggregate(..) >> > >>>>>> etc >> > >>>>>>> seems natural to me. >> > >>>>>>> >> > >>>>>>> >> > >>>>>>>> I did not think this through in detail, but can't we just do >> the >> > >>>>> initial >> > >>>>>>>> proposal with the .table() ? >> > >>>>>>>> >> > >>>>>>>> groupedStream.count().withStoreName("name").mapValues(...) >> > >>>>>>>> >> > >>>>>>>> Each .withXXX(...) return the current KTable and all the >> > .withXXX() >> > >>>>> are >> > >>>>>>>> just added to the KTable interface. Or do I miss anything why >> this >> > >>>>> wont' >> > >>>>>>>> work or any obvious disadvantage? >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>>> >> > >>>>>>> See above. >> > >>>>>>> >> > >>>>>>> >> > >>>>>>>> -Matthias >> > >>>>>>>> >> > >>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote: >> > >>>>>>>>> Thanks everyone. My latest attempt is below. It builds on the >> > >>>> fluent >> > >>>>>>>>> approach, but i think it is slightly nicer. >> > >>>>>>>>> I agree with some of what Eno said about mixing configy stuff >> in >> > >>>> the >> > >>>>>> DSL, >> > >>>>>>>>> but i think that enabling caching and enabling logging are >> things >> > >>>>> that >> > >>>>>>>>> aren't actually config. I'd probably not add >> withLogConfig(...) 
>> > >>>> (even >> > >>>>>>>>> though it is below) as this is actually config and we already >> > have >> > >>>> a >> > >>>>>> way >> > >>>>>>>> of >> > >>>>>>>>> doing that, via the StateStoreSupplier. Arguably we could use >> the >> > >>>>>>>>> StateStoreSupplier for disabling caching etc, but as it stands >> > that >> > >>>>> is >> > >>>>>> a >> > >>>>>>>>> bit of a tedious process for someone that just wants to use >> the >> > >>>>> default >> > >>>>>>>>> storage engine, but not have caching enabled. >> > >>>>>>>>> >> > >>>>>>>>> There is also an orthogonal concern that Guozhang alluded >> to.... >> > If >> > >>>>> you >> > >>>>>>>>> want to plug in a custom storage engine and you want it to be >> > >>>> logged >> > >>>>>> etc, >> > >>>>>>>>> you would currently need to implement that yourself. Ideally >> we >> > can >> > >>>>>>>> provide >> > >>>>>>>>> a way where we will wrap the custom store with logging, >> metrics, >> > >>>>> etc. I >> > >>>>>>>>> need to think about where this fits, it is probably more >> > >>>> appropriate >> > >>>>> on >> > >>>>>>>> the >> > >>>>>>>>> Stores API. 
>> > >>>>>>>>> >> > >>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper = null; >> > >>>>>>>>> // count with mapped key >> > >>>>>>>>> final KTable<Long, Long> count = stream.grouped() >> > >>>>>>>>> .withKeyMapper(keyMapper) >> > >>>>>>>>> .withKeySerde(Serdes.Long()) >> > >>>>>>>>> .withValueSerde(Serdes.String()) >> > >>>>>>>>> .withQueryableName("my-store") >> > >>>>>>>>> .count(); >> > >>>>>>>>> >> > >>>>>>>>> // windowed count >> > >>>>>>>>> final KTable<Windowed<String>, Long> windowedCount = >> > >>>> stream.grouped() >> > >>>>>>>>> .withQueryableName("my-window-store") >> > >>>>>>>>> .windowed(TimeWindows.of(10L).until(10)) >> > >>>>>>>>> .count(); >> > >>>>>>>>> >> > >>>>>>>>> // windowed reduce >> > >>>>>>>>> final Reducer<String> windowedReducer = null; >> > >>>>>>>>> final KTable<Windowed<String>, String> windowedReduce = >> > >>>>>> stream.grouped() >> > >>>>>>>>> .withQueryableName("my-window-store") >> > >>>>>>>>> .windowed(TimeWindows.of(10L).until(10)) >> > >>>>>>>>> .reduce(windowedReducer); >> > >>>>>>>>> >> > >>>>>>>>> final Aggregator<String, String, Long> aggregator = null; >> > >>>>>>>>> final Initializer<Long> init = null; >> > >>>>>>>>> >> > >>>>>>>>> // aggregate >> > >>>>>>>>> final KTable<String, Long> aggregate = stream.grouped() >> > >>>>>>>>> .withQueryableName("my-aggregate-store") >> > >>>>>>>>> .aggregate(aggregator, init, Serdes.Long()); >> > >>>>>>>>> >> > >>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>> >> > >>>>>> stateStoreSupplier >> > >>>>>>>> = null; >> > >>>>>>>>> // aggregate with custom store >> > >>>>>>>>> final KTable<String, Long> aggWithCustomStore = >> stream.grouped() >> > >>>>>>>>> .withStateStoreSupplier(stateStoreSupplier) >> > >>>>>>>>> .aggregate(aggregator, init); >> > >>>>>>>>> >> > >>>>>>>>> // disable caching >> > >>>>>>>>> stream.grouped() >> > >>>>>>>>> .withQueryableName("name") >> > >>>>>>>>> .withCachingEnabled(false) >> > >>>>>>>>> .count(); >> > >>>>>>>>> >> > >>>>>>>>> // 
disable logging >> > >>>>>>>>> stream.grouped() >> > >>>>>>>>> .withQueryableName("q") >> > >>>>>>>>> .withLoggingEnabled(false) >> > >>>>>>>>> .count(); >> > >>>>>>>>> >> > >>>>>>>>> // override log config >> > >>>>>>>>> final Reducer<String> reducer = null; >> > >>>>>>>>> stream.grouped() >> > >>>>>>>>> .withLogConfig(Collections.sin >> gletonMap("segment.size", >> > >>>>> "10")) >> > >>>>>>>>> .reduce(reducer); >> > >>>>>>>>> >> > >>>>>>>>> >> > >>>>>>>>> If anyone wants to play around with this you can find the code >> > >>>> here: >> > >>>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment >> > >>>>>>>>> >> > >>>>>>>>> Note: It won't actually work as most of the methods just >> return >> > >>>> null. >> > >>>>>>>>> Thanks, >> > >>>>>>>>> Damian >> > >>>>>>>>> >> > >>>>>>>>> >> > >>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> >> > >>>> wrote: >> > >>>>>>>>>> Thanks Damian. I think both options have pros and cons. And >> both >> > >>>> are >> > >>>>>>>> better >> > >>>>>>>>>> than overload abuse. >> > >>>>>>>>>> >> > >>>>>>>>>> The fluent API approach reads better, no mention of builder >> or >> > >>>> build >> > >>>>>>>>>> anywhere. The main downside is that the method signatures >> are a >> > >>>>> little >> > >>>>>>>> less >> > >>>>>>>>>> clear. By reading the method signature, one doesn't >> necessarily >> > >>>>> knows >> > >>>>>>>> what >> > >>>>>>>>>> it returns. Also, one needs to figure out the special method >> > >>>>>> (`table()` >> > >>>>>>>> in >> > >>>>>>>>>> this case) that gives you what you actually care about >> (`KTable` >> > >>>> in >> > >>>>>> this >> > >>>>>>>>>> case). Not major issues, but worth mentioning while doing the >> > >>>>>>>> comparison. >> > >>>>>>>>>> The builder approach avoids the issues mentioned above, but >> it >> > >>>>> doesn't >> > >>>>>>>> read >> > >>>>>>>>>> as well. 
>> > >>>>>>>>>> >> > >>>>>>>>>> Ismael >> > >>>>>>>>>> >> > >>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy < >> > damian....@gmail.com >> > >>>>>>>> wrote: >> > >>>>>>>>>>> Hi, >> > >>>>>>>>>>> >> > >>>>>>>>>>> I'd like to get a discussion going around some of the API >> > choices >> > >>>>>> we've >> > >>>>>>>>>>> made in the DLS. In particular those that relate to stateful >> > >>>>>> operations >> > >>>>>>>>>>> (though this could expand). >> > >>>>>>>>>>> As it stands we lean heavily on overloaded methods in the >> API, >> > >>>> i.e, >> > >>>>>>>> there >> > >>>>>>>>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming >> > >>>> noisy >> > >>>>>> and >> > >>>>>>>> i >> > >>>>>>>>>>> feel it is only going to get worse as we add more optional >> > >>>> params. >> > >>>>> In >> > >>>>>>>>>>> particular we've had some requests to be able to turn >> caching >> > >>>> off, >> > >>>>> or >> > >>>>>>>>>>> change log configs, on a per operator basis (note this can >> be >> > >>>> done >> > >>>>>> now >> > >>>>>>>>>> if >> > >>>>>>>>>>> you pass in a StateStoreSupplier, but this can be a bit >> > >>>>> cumbersome). >> > >>>>>>>>>>> So this is a bit of an open question. How can we change the >> DSL >> > >>>>>>>> overloads >> > >>>>>>>>>>> so that it flows, is simple to use and understand, and is >> > easily >> > >>>>>>>> extended >> > >>>>>>>>>>> in the future? 
>> > >>>>>>>>>>> >> > >>>>>>>>>>> One option would be to use a fluent API approach for >> providing >> > >>>> the >> > >>>>>>>>>> optional >> > >>>>>>>>>>> params, so something like this: >> > >>>>>>>>>>> >> > >>>>>>>>>>> groupedStream.count() >> > >>>>>>>>>>> .withStoreName("name") >> > >>>>>>>>>>> .withCachingEnabled(false) >> > >>>>>>>>>>> .withLoggingEnabled(config) >> > >>>>>>>>>>> .table() >> > >>>>>>>>>>> >> > >>>>>>>>>>> >> > >>>>>>>>>>> >> > >>>>>>>>>>> Another option would be to provide a Builder to the count >> > method, >> > >>>>> so >> > >>>>>> it >> > >>>>>>>>>>> would look something like this: >> > >>>>>>>>>>> groupedStream.count(new >> > >>>>>>>>>>> CountBuilder("storeName").with >> CachingEnabled(false).build()) >> > >>>>>>>>>>> >> > >>>>>>>>>>> Another option is to say: Hey we don't need this, what are >> you >> > on >> > >>>>>>>> about! >> > >>>>>>>>>>> The above has focussed on state store related overloads, but >> > the >> > >>>>> same >> > >>>>>>>>>> ideas >> > >>>>>>>>>>> could be applied to joins etc, where we presently have many >> > join >> > >>>>>>>> methods >> > >>>>>>>>>>> and many overloads. >> > >>>>>>>>>>> >> > >>>>>>>>>>> Anyway, i look forward to hearing your opinions. >> > >>>>>>>>>>> >> > >>>>>>>>>>> Thanks, >> > >>>>>>>>>>> Damian >> > >>>>>>>>>>> >> > >>>>>>>> >> > >>>>>> >> > >>>>> >> > >>>>> -- >> > >>>>> -- Guozhang >> > >>>>> >> > >>>>> >> > >>>> >> > >> >> > >> > >> > > > > -- > -- Guozhang > -- -- Guozhang
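Stepping back from the quoting above: the core trade-off the thread keeps circling -- a builder object passed as one argument to the terminal operation, versus a fluent chain of withXXX() calls ending in a terminal table()/build() -- can be condensed into a small self-contained sketch. The names below (DslShapes, Count, CountChain, withQueryableStoreName, withStoreName, withCachingEnabled) mimic the quoted snippets but are stubs invented purely for illustration; none of this is the real Kafka Streams API.

```java
import java.util.Objects;

// Self-contained stand-ins for the two API shapes debated in this thread.
// Not the Kafka Streams API; names are invented to mirror the quoted snippets.
public class DslShapes {

    // Option 1: a builder object carrying the optional parameters, passed as
    // a single argument to the terminal operation, e.g.
    // kGroupedStream.count(Count.count().withQueryableStoreName("my-store"))
    static final class Count {
        private String queryableStoreName;

        static Count count() {
            return new Count();
        }

        Count withQueryableStoreName(String name) {
            this.queryableStoreName = Objects.requireNonNull(name);
            return this;
        }

        String storeName() {
            return queryableStoreName;
        }
    }

    // Option 2: a fluent chain of withXXX() calls ending in a terminal
    // table() call; only the terminal call "adds the processor to the
    // topology" (here it just renders a description string).
    static final class CountChain {
        private String storeName;
        private boolean cachingEnabled = true;

        CountChain withStoreName(String name) {
            this.storeName = name;
            return this;
        }

        CountChain withCachingEnabled(boolean enabled) {
            this.cachingEnabled = enabled;
            return this;
        }

        String table() {
            return String.format("KTable[store=%s, caching=%s]", storeName, cachingEnabled);
        }
    }

    public static void main(String[] args) {
        // Builder style: count() would take one argument that captures
        // every optional parameter up front.
        Count spec = Count.count().withQueryableStoreName("my-store");
        System.out.println("builder captured store: " + spec.storeName());

        // Fluent style: the options read left to right, but the reader must
        // know that table() is the special call that produces the result.
        String table = new CountChain()
                .withStoreName("my-store")
                .withCachingEnabled(false)
                .table();
        System.out.println(table);
    }
}
```

The builder style makes the return type of count() obvious at the call site; the fluent style reads more naturally but hides which call is terminal -- exactly the tension Ismael summarizes in the thread.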
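Damian's reply to Kyle's serdes example suggests that immutable builders would sidestep the conflicting-withXXX() problem: a held reference "would always be the same as it was when it was created". A minimal sketch of that idea, using a hypothetical GroupedSpec type invented for this illustration (not part of Kafka Streams): each withXXX() returns a fresh copy instead of mutating the receiver.

```java
// Sketch of the "immutable builder" idea from the thread: each withXXX()
// returns a new instance, so reusing an earlier reference cannot pick up
// options set later. GroupedSpec is a made-up stand-in, not a Kafka class.
public final class GroupedSpec {
    private final String keySerde;
    private final String valueSerde;

    public GroupedSpec() {
        this(null, null);
    }

    private GroupedSpec(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Returns a copy with the key serde set; the receiver is left untouched.
    public GroupedSpec withKeySerde(String serde) {
        return new GroupedSpec(serde, valueSerde);
    }

    // Returns a copy with the value serde set; the receiver is left untouched.
    public GroupedSpec withValueSerde(String serde) {
        return new GroupedSpec(keySerde, serde);
    }

    public String describe() {
        return "GroupedSpec[key=" + keySerde + ", value=" + valueSerde + "]";
    }

    public static void main(String[] args) {
        GroupedSpec defaults = new GroupedSpec();
        GroupedSpec declared = defaults.withKeySerde("Long").withValueSerde("String");
        // 'defaults' is unchanged, so reusing it cannot silently inherit the
        // serdes set on 'declared' -- the conflicting-withXXX() hazard Kyle
        // describes goes away.
        System.out.println(defaults.describe());
        System.out.println(declared.describe());
    }
}
```

The cost Kyle points out still applies: with copies, the library must decide which specs are equivalent to avoid building duplicate processors.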