I think we do have a very good discussion and people openly share their ideas. So I am not sure why you are frustrated (at least that is the impression I get).
Maybe it would be best if you proposed an API change yourself, similar to what Damian and Guozhang did (to whatever extent your time constraints permit). I personally don't know exactly what an ideal API from your point of view is atm, but this discussion would benefit a lot if you could share it.

> I don't understand why custom stores in DSL?

Why not? Maybe you can elaborate a little more?

> and I don't understand why we are not considering a more generic
> config based approach?

Not sure what exactly you mean. Sounds interesting. I don't like the idea of mixing configuration into the DSL (even if I am still not sure where to draw the line, ie, what we should consider a config and what not).

About `through`: I think it does make sense to allow the specification of a store name to make the store queryable (it's optional in 0.11 btw). It's the same as for `KStreamBuilder.table()` -- so I'm not sure why this should be wrong? Note that not all KTables are materialized in a store atm, so this is an easy way to make a non-materialized KTable queryable.

> also providing Serdes by config is neat. wouldn't even need to go into
> the code then would also save a ton. (We have the defaults one in conf
> why not override the specific ones?)

I am not sure Serdes are really a config. The data types are hard coded into the code, so it does make sense to specify the Serdes accordingly. I am also not sure how we would map Serdes from the config to the corresponding operator.

-Matthias

On 7/8/17 2:23 AM, Jan Filipiak wrote:
> Hi Matthias, thanks,
>
> Exactly what I was guessing.
>
> I don't understand why custom stores in DSL? and I don't understand why
> we are not considering a more generic config based approach?
>
> StateStores in DSL => what I really think we are looking for is PAPI =>
> DSL => PAPI back and forth switcharoo capabilities.
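The open question above -- how settings from a config file would find their way to a specific operator -- can be made concrete with a small sketch. Everything here is hypothetical (the key schema, class and method names are illustrative, not actual Kafka Streams configs): a naming-schema lookup only works if the application code assigns each operator a stable name.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical sketch of a naming-schema based config lookup, e.g.
//   streams.myApp.joins.clickEnrich.type = LEFT
// The operator name ("clickEnrich") must exist in the code for the
// mapping from config to operator to be possible -- which is exactly
// the open question for Serdes raised above.
public class OperatorConfig {
    private final Properties props;
    private final String appId;

    public OperatorConfig(Properties props, String appId) {
        this.props = props;
        this.appId = appId;
    }

    // Look up streams.<appId>.<group>.<operatorName>.<key>
    public Optional<String> lookup(String group, String operatorName, String key) {
        String fullKey = String.format("streams.%s.%s.%s.%s", appId, group, operatorName, key);
        return Optional.ofNullable(props.getProperty(fullKey));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("streams.myApp.joins.clickEnrich.type", "LEFT");
        OperatorConfig cfg = new OperatorConfig(props, "myApp");
        System.out.println(cfg.lookup("joins", "clickEnrich", "type").orElse("INNER"));
        // An operator the code never named cannot be addressed from config:
        System.out.println(cfg.lookup("joins", "anonymousJoin", "type").orElse("INNER"));
    }
}
```

The same limitation would apply to Serdes: an unnamed anonymous operator falls back to the defaults, so per-operator overrides require operator names in the code.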
>
> Looking at the most overloaded that I can currently find, "through()", 2
> of them come from the broken idea of "the user provides a name for the
> statestore for IQ" and custom statestores.
> From the beginning I said that's madness. That is the real disease we
> need to fix IMHO. To be honest I also don't understand why through with
> statestore is particularly useful, second Unique Key maybe?
>
> also providing Serdes by config is neat. wouldn't even need to go into
> the code then would also save a ton. (We have the defaults one in conf
> why not override the specific ones?)
>
> Does this make sense to people? What pieces should I outline with code
> (time is currently sparse :( but I can pull off some smaller examples I
> guess)
>
> Best Jan
>
> On 08.07.2017 01:23, Matthias J. Sax wrote:
>> It's two issues we want to tackle:
>>
>> - too many overloads (for some methods we already have more than 10)
>> - improve custom store API
>>
>> -Matthias
>>
>> On 7/7/17 3:42 PM, Jan Filipiak wrote:
>>> It makes me want to cry.
>>>
>>> Why on earth is the DSL going to expose all its implementation
>>> details now? Especially being materialized or not.
>>>
>>> If we want to take useful steps in that direction maybe we are looking
>>> for a way to let the user switch back and forth between PAPI and DSL?
>>>
>>> A change as proposed would not eliminate any of my pain points while
>>> still being a heck of a lot of work to migrate to.
>>>
>>> Since I am only following this from the point where Eno CC'ed it into
>>> the users list:
>>>
>>> Can someone please rephrase for me what problem this is trying to solve?
>>> I don't mean to be rude but it uses a problematic feature
>>> "StateStoreSuppliers in DSL" to justify making it even worse. This helps
>>> us nowhere in making the configs more flexible, it's just syntactic
>>> sugar.
>>>
>>> A low-effort shot like: let's add properties to operations that would
>>> otherwise become too heavily overloaded?
>>> Or pull the configs by some naming schema from the overall properties.
>>> Additionally to that we get rid of StateStoreSuppliers in the DSL and
>>> have them also configured by said properties.
>>>
>>> => way easier to migrate to, way less risk, way more flexible in the
>>> future (different implementations of the same operation don't require
>>> code change to configure)
>>>
>>> Line 184 especially makes no sense to me. What is a KTable-KTable
>>> non-materialized join anyway?
>>>
>>> Hope we can discuss more on this.
>>>
>>> On 07.07.2017 17:23, Guozhang Wang wrote:
>>>> I messed the indentation on github code repos; this would be easier to
>>>> read:
>>>>
>>>> https://codeshare.io/GLWW8K
>>>>
>>>> Guozhang
>>>>
>>>> On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>> Hi Damian / Kyle,
>>>>>
>>>>> I think I agree with you guys about the pros / cons of using the
>>>>> builder pattern v.s. using some "secondary classes". And I'm thinking
>>>>> if we can take a "mid" manner between these two. I spent some time
>>>>> with a slightly different approach from Damian's current proposal:
>>>>>
>>>>> https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
>>>>>
>>>>> The key idea is to tolerate the final "table()" or "stream()" function
>>>>> to "upgrade" from the secondary classes to the first-class citizen
>>>>> classes, while having all the specs inside this function. Also this
>>>>> proposal includes some other refactoring that people have been
>>>>> discussing for the builder to reduce the overloaded functions as
>>>>> well. WDYT?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>
>>>>>> Hi Jan,
>>>>>>
>>>>>> Thanks very much for the input.
>>>>>> On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>
>>>>>>> Hi Damian,
>>>>>>>
>>>>>>> I do see your point that something needs to change. But I fully agree
>>>>>>> with Guozhang when he says:
>>>>>>> ---
>>>>>>> But since this is an incompatibility change, and we are going to
>>>>>>> remove the compatibility annotations soon it means we only have one
>>>>>>> chance and we really have to make it right.
>>>>>>> ----
>>>>>>
>>>>>> I think we all agree on this one! Hence the discussion.
>>>>>>
>>>>>>> I fear all suggestions do not go far enough to become something that
>>>>>>> will carry on for very much longer.
>>>>>>> I am currently working on KAFKA-3705 and am trying to find the
>>>>>>> easiest way for the user to give me all the required functionality.
>>>>>>> The easiest interface I could come up with so far can be looked at
>>>>>>> here:
>>>>>>>
>>>>>>> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
>>>>>>>
>>>>>>> And it's already horribly complicated. I am currently unable to find
>>>>>>> the right abstraction level to have everything falling into place
>>>>>>> naturally. To be honest I already think introducing
>>>>>>
>>>>>> To be fair that is not a particularly easy problem to solve!
>>>>>>
>>>>>>> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
>>>>>>> was not ideal and makes everything a mess.
>>>>>>
>>>>>> I'm not sure I agree that it makes everything a mess, but it could
>>>>>> have been done differently.
>>>>>>
>>>>>>> The JoinType:Whatever is also not really flexible. Two things come to
>>>>>>> my mind:
>>>>>>> 1.
>>>>>>> I don't think we should rule out config-based decisions, say configs
>>>>>>> like
>>>>>>> streams.$applicationID.joins.$joinname.conf = value
>>>>>>
>>>>>> Is this just for config? Or are you suggesting that we could somehow
>>>>>> "code" the join in a config file?
>>>>>>
>>>>>>> This can allow for tremendous changes without a single API change and
>>>>>>> IMO it was not considered enough yet.
>>>>>>>
>>>>>>> 2. Push logic from the DSL to the callback classes. A ValueJoiner for
>>>>>>> example can be used to implement different join types as the user
>>>>>>> wishes.
>>>>>>
>>>>>> Do you have an example of how this might look?
>>>>>>
>>>>>>> As Guozhang said: not breaking users is very important.
>>>>>>
>>>>>> Of course. We want to make it as easy as possible for people to use
>>>>>> streams.
>>>>>>
>>>>>>> Especially with these changes + all the plans I sadly only have in my
>>>>>>> head, but hopefully the first link can give a glimpse.
>>>>>>>
>>>>>>> Thanks for preparing the examples, it made it way clearer to me what
>>>>>>> exactly we are talking about. I would argue to go a bit slower and
>>>>>>> more careful on this one. At some point we need to get it right.
>>>>>>> Peeking over to the Hadoop guys with their huge user base: config
>>>>>>> files really work well for them.
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>> On 30.06.2017 09:31, Damian Guy wrote:
>>>>>>>> Thanks Matthias
>>>>>>>>
>>>>>>>> On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>> I am just catching up on this thread, so sorry for the long email
>>>>>>>>> in advance... Also, it's to some extent a dump of thoughts and not
>>>>>>>>> always a clear proposal. Still need to think about this in more
>>>>>>>>> detail.
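Jan's second point above -- that a ValueJoiner-style callback can encode the join type -- can be sketched in plain Java. This is an illustrative toy, not the actual Kafka Streams ValueJoiner interface or join implementation: the driver performs one generic join, and the callback decides how to treat a missing right-hand value, which is where left-join semantics can live.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch: the "framework" does one generic join over two
// keyed collections, and the user callback decides how to combine the
// values -- including what a missing right-hand side means.
public class CallbackJoin {
    public static <K, V1, V2, R> Map<K, R> join(Map<K, V1> left,
                                                Map<K, V2> right,
                                                BiFunction<V1, V2, R> joiner) {
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, V1> e : left.entrySet()) {
            // right.get() may be null; the joiner decides what that means
            result.put(e.getKey(), joiner.apply(e.getValue(), right.get(e.getKey())));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> clicks = new LinkedHashMap<>();
        clicks.put("a", 1);
        clicks.put("b", 2);
        Map<String, String> regions = new HashMap<>();
        regions.put("a", "EU");

        // "Left join" expressed purely in the callback:
        Map<String, String> joined = join(clicks, regions,
                (count, region) -> count + "@" + (region == null ? "unknown" : region));
        System.out.println(joined); // {a=1@EU, b=2@unknown}
    }
}
```

An inner join would simply be the same driver with a callback (or a driver-side filter) that drops entries whose right-hand side is null, so the join "type" never has to appear in the operator's signature.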
>>>>>>>>> But maybe it helps others to get new ideas :)
>>>>>>>>>
>>>>>>>>>>> However, I don't understand your argument about putting
>>>>>>>>>>> aggregate() after the withXX() -- all the calls to withXX() set
>>>>>>>>>>> optional parameters for aggregate() and not for groupBy() -- but
>>>>>>>>>>> a groupBy().withXX() indicates that the withXX() belongs to the
>>>>>>>>>>> groupBy(). IMHO, this might be quite confusing for developers.
>>>>>>>>>>
>>>>>>>>>> I see what you are saying, but the grouped stream is effectively a
>>>>>>>>>> no-op until you call one of the aggregate/count/reduce etc
>>>>>>>>>> functions. So the optional params are ones that are applicable to
>>>>>>>>>> any of the operations you can perform on this grouped stream. Then
>>>>>>>>>> the final count()/reduce()/aggregate() call has any of the params
>>>>>>>>>> that are required/specific to that function.
>>>>>>>>>
>>>>>>>>> I understand your argument, but I don't share the conclusion. If we
>>>>>>>>> need a "final/terminal" call, the better way might be
>>>>>>>>>
>>>>>>>>> .groupBy().count().withXX().build()
>>>>>>>>>
>>>>>>>>> (with a better name for build() though)
>>>>>>>>
>>>>>>>> The point is that all the other calls, i.e., withBlah, windowed,
>>>>>>>> etc. apply to all the aggregate functions. The terminal call being
>>>>>>>> the actual type of aggregation you want to do. I personally find
>>>>>>>> this more natural than groupBy().count().withBlah().build()
>>>>>>>>
>>>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>>>>>>
>>>>>>>>> I like this. However, I don't see a reason to have windowed() and
>>>>>>>>> sessionWindowed().
>>>>>>>>> We should have one top-level `Windows` interface that both
>>>>>>>>> `TimeWindows` and `SessionWindows` implement and just have a single
>>>>>>>>> windowed() method that accepts all `Windows`. (I did not like the
>>>>>>>>> separation of `SessionWindows` in the first place, and this seems
>>>>>>>>> to be an opportunity to clean this up. It was hard to change when
>>>>>>>>> we introduced session windows.)
>>>>>>>>
>>>>>>>> Yes - true, we should look into that.
>>>>>>>>
>>>>>>>>> Btw: we do use the imperative groupBy() and groupByKey(), and thus
>>>>>>>>> we might also want to use windowBy() (instead of windowed()). Not
>>>>>>>>> sure how important this is, but it seems to be inconsistent
>>>>>>>>> otherwise.
>>>>>>>>
>>>>>>>> Makes sense
>>>>>>>>
>>>>>>>>> About joins: I don't like .withJoinType(JoinType.LEFT) at all. I
>>>>>>>>> think, defining an inner/left/outer join is not an optional
>>>>>>>>> argument but a first-class concept and should have a proper
>>>>>>>>> representation in the API (like the current methods join(),
>>>>>>>>> leftJoin(), outerJoin()).
>>>>>>>>
>>>>>>>> Yep, I did originally have it as a required param and maybe that is
>>>>>>>> what we go with. It could have a default, but maybe that is
>>>>>>>> confusing.
>>>>>>>>
>>>>>>>>> About the two join API proposals, the second one has too much
>>>>>>>>> boilerplate code for my taste. Also, the actual join() operator has
>>>>>>>>> only one argument, which is weird to me, as in my thinking process
>>>>>>>>> the main operator call should have one parameter per mandatory
>>>>>>>>> argument, but your proposal puts the mandatory arguments into the
>>>>>>>>> Joins.streamStreamJoin() call. This is far from intuitive IMHO.
>>>>>>>> This is the builder pattern: you only need one param as the builder
>>>>>>>> has captured all of the required and optional arguments.
>>>>>>>>
>>>>>>>>> The first join proposal also seems to align better with the pattern
>>>>>>>>> suggested for aggregations, and having the same pattern for all
>>>>>>>>> operators is important (as you stated already).
>>>>>>>>
>>>>>>>> This is why I offered two alternatives as I started out with: 1 is
>>>>>>>> the builder pattern, the other is the more fluent pattern.
>>>>>>>>
>>>>>>>>> Coming back to the config vs optional parameter. What about having
>>>>>>>>> a method withConfig[s](...) that allows putting in the
>>>>>>>>> configuration?
>>>>>>>>
>>>>>>>> Sure, it is currently called withLogConfig() as that is the only
>>>>>>>> thing that is really config.
>>>>>>>>
>>>>>>>>> This also raises the question if until() is a windows property?
>>>>>>>>> Actually, until() seems to be a configuration parameter and thus
>>>>>>>>> should not have its own method.
>>>>>>>>
>>>>>>>> Hmmm, I don't agree. Until is a property of the window. It is going
>>>>>>>> to be potentially different for every window operation you do in a
>>>>>>>> streams app.
>>>>>>>>
>>>>>>>>> Browsing through your example DSL branch, I also saw this one:
>>>>>>>>>
>>>>>>>>>> final KTable<Windowed<String>, Long> windowed =
>>>>>>>>>>     groupedStream.counting()
>>>>>>>>>>                  .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>                  .table();
>>>>>>>>>
>>>>>>>>> This is an interesting idea, and it reminds me of some feedback
>>>>>>>>> about "I wanted to count a stream, but there was no count() method
>>>>>>>>> -- I first needed to figure out that I need to group the stream
>>>>>>>>> first to be able to count it. It does make sense in hindsight but
>>>>>>>>> was not obvious in the beginning".
>>>>>>>>> Thus, carrying out this thought, we could also do the following:
>>>>>>>>>
>>>>>>>>> stream.count().groupedBy().windowedBy().table();
>>>>>>>>>
>>>>>>>>> -> Note, I use "grouped" and "windowed" instead of imperative here,
>>>>>>>>> as it comes after the count()
>>>>>>>>>
>>>>>>>>> This would be more consistent than your proposal (that has grouping
>>>>>>>>> before but windowing after count()). It might even allow us to
>>>>>>>>> enrich the API with some syntactic sugar like
>>>>>>>>> `stream.count().table()` to get the overall count of all records
>>>>>>>>> (this would obviously not scale, but we could support it -- if not
>>>>>>>>> now, maybe later).
>>>>>>>>
>>>>>>>> I guess I'd prefer
>>>>>>>> stream.groupBy().windowBy().count()
>>>>>>>> stream.groupBy().windowBy().reduce()
>>>>>>>> stream.groupBy().count()
>>>>>>>>
>>>>>>>> As I said above, everything that happens before the final aggregate
>>>>>>>> call can be applied to any of them. So it makes sense to me to do
>>>>>>>> those things ahead of the final aggregate call.
>>>>>>>>
>>>>>>>>> Last about builder pattern. I am convinced that we need some
>>>>>>>>> "terminal" operator/method that tells us when to add the processor
>>>>>>>>> to the topology. But I don't see the need for a plain builder
>>>>>>>>> pattern that feels alien to me (see my argument about the second
>>>>>>>>> join proposal). Using .stream() / .table() as used in many examples
>>>>>>>>> might work. But maybe a more generic name that we can use in all
>>>>>>>>> places like build() or apply() might also be an option.
>>>>>>>>
>>>>>>>> Sure, a generic name might be ok.
>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 6/29/17 7:37 AM, Damian Guy wrote:
>>>>>>>>>> Thanks Kyle.
>>>>>>>>>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Damian,
>>>>>>>>>>>
>>>>>>>>>>>>>>> When trying to program in the fluent API that has been
>>>>>>>>>>>>>>> discussed most it feels difficult to know when you will
>>>>>>>>>>>>>>> actually get an object you can reuse. What if I make one
>>>>>>>>>>>>>>> KGroupedStream that I want to reuse, is it legal to reuse it
>>>>>>>>>>>>>>> or does this approach expect you to call grouped each time?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can
>>>>>>>>>>>>> re-use it as you can today.
>>>>>>>>>>>
>>>>>>>>>>> You said it yourself in another post that the grouped stream is
>>>>>>>>>>> effectively a no-op until a count, reduce, or aggregate. The way
>>>>>>>>>>> I see it you wouldn't be able to reuse anything except KStreams
>>>>>>>>>>> and KTables, because most of this fluent API would continue
>>>>>>>>>>> returning this (this being the builder object currently being
>>>>>>>>>>> manipulated).
>>>>>>>>>>> So, if you ever store a reference to anything but KStreams and
>>>>>>>>>>> KTables and you use it in two different ways then it's possible
>>>>>>>>>>> you make conflicting withXXX() calls on the same builder.
>>>>>>>>>>
>>>>>>>>>> Not necessarily true. It could return a new instance of the
>>>>>>>>>> builder, i.e., the builders being immutable. So if you held a
>>>>>>>>>> reference to the builder it would always be the same as it was
>>>>>>>>>> when it was created.
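The immutable-builder idea Damian describes above can be sketched in a few lines of plain Java (class and method names here are illustrative only, not the actual Kafka Streams API): each withXXX() returns a fresh builder, so a stored reference can never be changed behind the caller's back.

```java
// Hypothetical sketch of an immutable builder: every withXXX() returns
// a NEW builder instead of mutating this one, so later calls cannot
// affect earlier references.
public class ImmutableGroupedStream {
    private final String keySerde;
    private final String valueSerde;

    private ImmutableGroupedStream(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    public static ImmutableGroupedStream grouped() {
        return new ImmutableGroupedStream("defaultKeySerde", "defaultValueSerde");
    }

    public ImmutableGroupedStream withKeySerde(String keySerde) {
        return new ImmutableGroupedStream(keySerde, this.valueSerde); // copy, don't mutate
    }

    public ImmutableGroupedStream withValueSerde(String valueSerde) {
        return new ImmutableGroupedStream(this.keySerde, valueSerde);
    }

    public String describe() {
        return keySerde + "/" + valueSerde;
    }

    public static void main(String[] args) {
        ImmutableGroupedStream defaults = grouped();
        ImmutableGroupedStream declared =
                defaults.withKeySerde("String").withValueSerde("Long");
        // The original reference is untouched:
        System.out.println(defaults.describe()); // defaultKeySerde/defaultValueSerde
        System.out.println(declared.describe()); // String/Long
    }
}
```

The trade-off Kyle raises still stands, though: with copies, the framework would need to decide which equivalent configurations map to the same processor.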
>>>>>>>>>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>>>>>>>>>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>>>>>>>>>>>     groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>>>>>>>>>>>
>>>>>>>>>>> I'll admit that this shouldn't happen but some user is going to
>>>>>>>>>>> do it eventually…
>>>>>>>>>>> Depending on implementation, uses of
>>>>>>>>>>> groupedStreamWithDefaultSerdes would most likely be equivalent to
>>>>>>>>>>> the version withDeclaredSerdes. One workaround would be to always
>>>>>>>>>>> make copies of the config objects you are building, but this
>>>>>>>>>>> approach has its own problem because now we have to identify
>>>>>>>>>>> which configs are equivalent so we don't create repeated
>>>>>>>>>>> processors.
>>>>>>>>>>>
>>>>>>>>>>> The point of this long-winded example is that we always have to
>>>>>>>>>>> be thinking about all of the possible ways it could be misused by
>>>>>>>>>>> a user (causing them to see hard-to-diagnose problems).
>>>>>>>>>>
>>>>>>>>>> Exactly! That is the point of the discussion really.
>>>>>>>>>>
>>>>>>>>>>> In my attempt at a couple methods with builders I feel that I
>>>>>>>>>>> could confidently say the user couldn't really mess it up.
>>>>>>>>>>>> // Count
>>>>>>>>>>>> KTable<String, Long> count =
>>>>>>>>>>>>     kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>>>>>
>>>>>>>>>>> The kGroupedStream is reusable and if they attempted to reuse the
>>>>>>>>>>> Count for some reason it would throw an error message saying that
>>>>>>>>>>> a store named "my-store" already exists.
>>>>>>>>>>
>>>>>>>>>> Yes, I agree and I think using builders is my preferred pattern.
>>>>>>>>>> Cheers,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kyle
>>>>>>>>>>>
>>>>>>>>>>> From: Damian Guy
>>>>>>>>>>> Sent: Thursday, June 29, 2017 3:59 AM
>>>>>>>>>>> To: d...@kafka.apache.org
>>>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>>>>
>>>>>>>>>>> Hi Kyle,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your input. Really appreciated.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I like more of a builder pattern even though others have voiced
>>>>>>>>>>>> against it. The reason I like it is because it makes it clear to
>>>>>>>>>>>> the user that a call to KGroupedStream#count will return a
>>>>>>>>>>>> KTable, not some intermediate class that I need to understand.
>>>>>>>>>>>
>>>>>>>>>>> Yes, that makes sense.
>>>>>>>>>>>
>>>>>>>>>>>> When trying to program in the fluent API that has been discussed
>>>>>>>>>>>> most it feels difficult to know when you will actually get an
>>>>>>>>>>>> object you can reuse. What if I make one KGroupedStream that I
>>>>>>>>>>>> want to reuse, is it legal to reuse it or does this approach
>>>>>>>>>>>> expect you to call grouped each time?
>>>>>>>>>>>
>>>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can re-use
>>>>>>>>>>> it as you can today.
>>>>>>>>>>>
>>>>>>>>>>>> This question doesn't pop into my head at all in the builder
>>>>>>>>>>>> pattern: I assume I can reuse everything.
>>>>>>>>>>>> Finally, I like .groupByKey and .groupBy(KeyValueMapper), not a
>>>>>>>>>>>> big fan of the grouped.
>>>>>>>>>>>
>>>>>>>>>>> Yes, grouped() was more for demonstration and because groupBy()
>>>>>>>>>>> and groupByKey() were taken!
>>>>>>>>>>> So I'd imagine the api would actually want to be
>>>>>>>>>>> groupByKey(/** no required args***/).withOptionalArg() and
>>>>>>>>>>> groupBy(KeyValueMapper m).withOptionalArg(...) -- of course this
>>>>>>>>>>> all depends on maintaining backward compatibility.
>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately, the below approach would require at least 2
>>>>>>>>>>>> (probably 3) overloads (one for returning a KTable and one for
>>>>>>>>>>>> returning a KTable with Windowed Key, probably would want to
>>>>>>>>>>>> split windowed and session windowed for ease of implementation)
>>>>>>>>>>>> of each count, reduce, and aggregate.
>>>>>>>>>>>> Obviously not exhaustive but enough for you to get the picture.
>>>>>>>>>>>> Count, Reduce, and Aggregate supply 3 static methods to
>>>>>>>>>>>> initialize the builder:
>>>>>>>>>>>> // Count
>>>>>>>>>>>> KTable<String, Long> count =
>>>>>>>>>>>>     groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>>>>>>
>>>>>>>>>>>> // Windowed Count
>>>>>>>>>>>> KTable<Windowed<String>, Long> windowedCount =
>>>>>>>>>>>>     groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>>>>>>>>>>>>
>>>>>>>>>>>> // Session Count
>>>>>>>>>>>> KTable<Windowed<String>, Long> sessionCount =
>>>>>>>>>>>>     groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>>>>>>>>>>>
>>>>>>>>>>> Above and below, I think I'd prefer it to be:
>>>>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
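One reason the fluent windowed() shape avoids the overloads Kyle counts is that it moves windowing into the type system: each class needs only one count(), and the windowed variant can return a differently-keyed table. A hypothetical type sketch (class names are illustrative, not the real Kafka Streams API):

```java
// Hypothetical sketch of why grouped.windowed(...).count() removes
// overloads: windowing lives in the return TYPE, so one count() per
// type suffices and the windowed variant returns a windowed-keyed table.
public class WindowTyping {
    static class KTable<K, V> {
        final String description;
        KTable(String description) { this.description = description; }
    }

    static class Windowed<K> {}

    static class GroupedStream<K, V> {
        // Single non-windowed count().
        KTable<K, Long> count() {
            return new KTable<>("non-windowed count");
        }
        // Windowing changes the type, not the method list.
        WindowedGroupedStream<K, V> windowed(long sizeMs) {
            return new WindowedGroupedStream<>(sizeMs);
        }
    }

    static class WindowedGroupedStream<K, V> {
        final long sizeMs;
        WindowedGroupedStream(long sizeMs) { this.sizeMs = sizeMs; }
        // Note the different key type: Windowed<K>.
        KTable<Windowed<K>, Long> count() {
            return new KTable<>("windowed count, size=" + sizeMs);
        }
    }

    public static void main(String[] args) {
        GroupedStream<String, String> grouped = new GroupedStream<>();
        System.out.println(grouped.count().description);
        System.out.println(grouped.windowed(10L).count().description);
    }
}
```

A session-windowed variant would be a third class with its own typed count(), mirroring the three shapes in the snippet above.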
>>>>>>>>>>>> // Reduce
>>>>>>>>>>>> Reducer<Long> reducer;
>>>>>>>>>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer,
>>>>>>>>>>>>     Reduce.reduce().withQueryableStoreName("my-store"));
>>>>>>>>>>>>
>>>>>>>>>>>> // Aggregate Windowed with Custom Store
>>>>>>>>>>>> Initializer<String> initializer;
>>>>>>>>>>>> Aggregator<String, Long, String> aggregator;
>>>>>>>>>>>> KTable<Windowed<String>, String> aggregate =
>>>>>>>>>>>>     groupedStream.aggregate(initializer, aggregator,
>>>>>>>>>>>>         Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
>>>>>>>>>>>>
>>>>>>>>>>>> // Cogroup SessionWindowed
>>>>>>>>>>>> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>>>>>>>>>>>>     .cogroup(groupedStream2, aggregator2)
>>>>>>>>>>>>     .aggregate(initializer, aggregator,
>>>>>>>>>>>>         Aggregate.sessionWindowed(SessionWindows.with(10L),
>>>>>>>>>>>>             sessionMerger).withQueryableStoreName("my-store"));
>>>>>>>>>>>>
>>>>>>>>>>>> public class Count {
>>>>>>>>>>>>
>>>>>>>>>>>>     public static class Windowed extends Count {
>>>>>>>>>>>>         private Windows windows;
>>>>>>>>>>>>     }
>>>>>>>>>>>>     public static class SessionWindowed extends Count {
>>>>>>>>>>>>         private SessionWindows sessionWindows;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     public static Count count();
>>>>>>>>>>>>     public static Windowed windowed(Windows windows);
>>>>>>>>>>>>     public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);
>>>>>>>>>>>>
>>>>>>>>>>>>     // All withXXX(...) methods.
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> public class KGroupedStream {
>>>>>>>>>>>>     public KTable<K, Long> count(Count count);
>>>>>>>>>>>>     public KTable<Windowed<K>, Long> count(Count.Windowed count);
>>>>>>>>>>>>     public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
>>>>>>>>>>>>     …
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Kyle
>>>>>>>>>>>>
>>>>>>>>>>>> From: Guozhang Wang
>>>>>>>>>>>> Sent: Wednesday, June 28, 2017 7:45 PM
>>>>>>>>>>>> To: d...@kafka.apache.org
>>>>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>>>>>
>>>>>>>>>>>> I played with the current proposal a bit at
>>>>>>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment, and here are
>>>>>>>>>>>> my observations:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Personally I prefer
>>>>>>>>>>>>
>>>>>>>>>>>> "stream.group(mapper) / stream.groupByKey()"
>>>>>>>>>>>>
>>>>>>>>>>>> than
>>>>>>>>>>>>
>>>>>>>>>>>> "stream.group().withKeyMapper(mapper) / stream.group()"
>>>>>>>>>>>>
>>>>>>>>>>>> Since 1) withKeyMapper is not enforced programmatically though
>>>>>>>>>>>> it is not "really" optional like others, 2) syntax-wise it reads
>>>>>>>>>>>> more naturally.
>>>>>>>>>>>> I think it is okay to add the APIs in (
>>>>>>>>>>>> https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
>>>>>>>>>>>> ) in KGroupedStream.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. For the "withStateStoreSupplier" API, is the user supposed to
>>>>>>>>>>>> pass in the most-inner state store supplier (e.g. the one whose
>>>>>>>>>>>> get() returns a RocksDBStore), or is it supposed to return the
>>>>>>>>>>>> most-outer supplier with logging / metrics / etc? I think it
>>>>>>>>>>>> would be more useful to
I think it would be more useful to >>>>>>>>>>>> only >>>>>>>>> require >>>>>>>>>>>> users pass in the inner state store supplier while specifying >>>>>>> caching / >>>>>>>>>>>> logging through other APIs. >>>>>>>>>>>> >>>>>>>>>>>> In addition, the "GroupedWithCustomStore" is a bit >>>>>>>>>>>> suspicious to >>>>>> me: >>>>>>> we >>>>>>>>>>> are >>>>>>>>>>>> allowing users to call other APIs like "withQueryableName" >>>>>> multiple >>>>>>>>> time, >>>>>>>>>>>> but only call "withStateStoreSupplier" only once in the end. >>>>>>>>>>>> Why >>>>>> is >>>>>>>>> that? >>>>>>>>>>>> 3. The current DSL seems to be only for aggregations, what >>>>>>>>>>>> about >>>>>>> joins? >>>>>>>>>>>> 4. I think it is okay to keep the "withLogConfig": for the >>>>>>>>>>>> StateStoreSupplier it will still be user code specifying the >>>>>> topology >>>>>>>>> so >>>>>>>>>>> I >>>>>>>>>>>> do not see there is a big difference. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> 5. "WindowedGroupedStream" 's withStateStoreSupplier should >>>>>>>>>>>> take >>>>>> the >>>>>>>>>>>> windowed state store supplier to enforce typing? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Below are minor ones: >>>>>>>>>>>> >>>>>>>>>>>> 6. "withQueryableName": maybe better "withQueryableStateName"? >>>>>>>>>>>> >>>>>>>>>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Guozhang >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax < >>>>>>>>> matth...@confluent.io> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> I see your point about "when to add the processor to the >>>>>> topology". >>>>>>>>>>> That >>>>>>>>>>>>> is indeed an issue. Not sure it we could allow "updates" to >>>>>>>>>>>>> the >>>>>>>>>>>> topology... >>>>>>>>>>>>> I don't see any problem with having all the withXX() in KTable >>>>>>>>>>> interface >>>>>>>>>>>>> -- but this might be subjective. 
>>>>>>>>>>>>> However, I don't understand your argument about putting
>>>>>>>>>>>>> aggregate() after the withXX() -- all the calls to withXX() set
>>>>>>>>>>>>> optional parameters for aggregate() and not for groupBy() --
>>>>>>>>>>>>> but a groupBy().withXX() indicates that the withXX() belongs to
>>>>>>>>>>>>> the groupBy(). IMHO, this might be quite confusing for
>>>>>>>>>>>>> developers.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 6/28/17 2:55 AM, Damian Guy wrote:
>>>>>>>>>>>>>>> I also think that mixing optional parameters with configs is
>>>>>>>>>>>>>>> a bad idea. Have no proposal for this atm but just wanted to
>>>>>>>>>>>>>>> mention it. Hope to find some time to come up with something.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, I don't like the mix of config either. But the only real
>>>>>>>>>>>>>> config here is the logging config - which we don't really need
>>>>>>>>>>>>>> as it can already be done via a custom StateStoreSupplier.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What I don't like in the current proposal is the
>>>>>>>>>>>>>>> .grouped().withKeyMapper() -- the current solution with
>>>>>>>>>>>>>>> .groupBy(...) and .groupByKey() seems better. For clarity, we
>>>>>>>>>>>>>>> could rename to .groupByNewKey(...) and .groupByCurrentKey()
>>>>>>>>>>>>>>> (even if we should find some better names).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It could be groupByKey(), groupBy() or something different, but
>>>> I would rather separate both more than less, i.e., go into the
>>>> opposite direction.
>>>>
>>>> I am also wondering if we could do something more "fluent". The
>>>> initial proposal was like:
>>>>
>>>>> groupedStream.count()
>>>>>    .withStoreName("name")
>>>>>    .withCachingEnabled(false)
>>>>>    .withLoggingEnabled(config)
>>>>>    .table()
>>>>
>>>> The .table() statement in the end was kinda alien.
>>>
>>> I agree, but then all of the withXXX methods need to be on KTable,
>>> which is worse in my opinion. You also need something that is going to
>>> "build" the internal processors and add them to the topology.
>>>
>>>> The current proposal puts the count() at the end -- i.e., the
>>>> optional parameters for count() have to be specified on the
>>>> .grouped() call -- this does not seem to be the best way either.
>>>
>>> I actually prefer this method as you are building a grouped stream
>>> that you will aggregate. So
>>> table.grouped(...).withOptionalStuff().aggregate(..) etc seems
>>> natural to me.
>>>
>>>> I did not think this through in detail, but can't we just do the
>>>> initial proposal with the .table() ?
>>>>
>>>> groupedStream.count().withStoreName("name").mapValues(...)
>>>>
>>>> Each .withXXX(...) returns the current KTable and all the .withXXX()
>>>> are just added to the KTable interface.
>>>> Or do I miss anything why this won't work, or any obvious
>>>> disadvantage?
>>>
>>> See above.
>>>
>>>> -Matthias
>>>>
>>>> On 6/22/17 4:06 AM, Damian Guy wrote:
>>>>
>>>>> Thanks everyone. My latest attempt is below. It builds on the fluent
>>>>> approach, but i think it is slightly nicer.
>>>>>
>>>>> I agree with some of what Eno said about mixing configy stuff in the
>>>>> DSL, but i think that enabling caching and enabling logging are
>>>>> things that aren't actually config. I'd probably not add
>>>>> withLogConfig(...) (even though it is below) as this is actually
>>>>> config and we already have a way of doing that, via the
>>>>> StateStoreSupplier. Arguably we could use the StateStoreSupplier for
>>>>> disabling caching etc, but as it stands that is a bit of a tedious
>>>>> process for someone that just wants to use the default storage
>>>>> engine, but not have caching enabled.
>>>>>
>>>>> There is also an orthogonal concern that Guozhang alluded to.... If
>>>>> you want to plug in a custom storage engine and you want it to be
>>>>> logged etc, you would currently need to implement that yourself.
>>>>> Ideally we can provide a way where we will wrap the custom store
>>>>> with logging, metrics, etc.
>>>>> I need to think about where this fits; it is probably more
>>>>> appropriate on the Stores API.
>>>>>
>>>>> final KeyValueMapper<String, String, Long> keyMapper = null;
>>>>> // count with mapped key
>>>>> final KTable<Long, Long> count = stream.grouped()
>>>>>         .withKeyMapper(keyMapper)
>>>>>         .withKeySerde(Serdes.Long())
>>>>>         .withValueSerde(Serdes.String())
>>>>>         .withQueryableName("my-store")
>>>>>         .count();
>>>>>
>>>>> // windowed count
>>>>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
>>>>>         .withQueryableName("my-window-store")
>>>>>         .windowed(TimeWindows.of(10L).until(10))
>>>>>         .count();
>>>>>
>>>>> // windowed reduce
>>>>> final Reducer<String> windowedReducer = null;
>>>>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
>>>>>         .withQueryableName("my-window-store")
>>>>>         .windowed(TimeWindows.of(10L).until(10))
>>>>>         .reduce(windowedReducer);
>>>>>
>>>>> final Aggregator<String, String, Long> aggregator = null;
>>>>> final Initializer<Long> init = null;
>>>>>
>>>>> // aggregate
>>>>> final KTable<String, Long> aggregate = stream.grouped()
>>>>>         .withQueryableName("my-aggregate-store")
>>>>>         .aggregate(aggregator, init, Serdes.Long());
>>>>>
>>>>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
>>>>> // aggregate with custom store
>>>>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
>>>>>         .withStateStoreSupplier(stateStoreSupplier)
>>>>>         .aggregate(aggregator, init);
>>>>>
>>>>> // disable caching
>>>>> stream.grouped()
>>>>>         .withQueryableName("name")
>>>>>         .withCachingEnabled(false)
>>>>>         .count();
>>>>>
>>>>> // disable logging
>>>>> stream.grouped()
>>>>>         .withQueryableName("q")
>>>>>         .withLoggingEnabled(false)
>>>>>         .count();
>>>>>
>>>>> // override log config
>>>>> final Reducer<String> reducer = null;
>>>>> stream.grouped()
>>>>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
>>>>>         .reduce(reducer);
>>>>>
>>>>> If anyone wants to play around with this you can find the code here:
>>>>> https://github.com/dguy/kafka/tree/dsl-experiment
>>>>>
>>>>> Note: It won't actually work as most of the methods just return null.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
>>>>>
>>>>>> Thanks Damian. I think both options have pros and cons. And both
>>>>>> are better than overload abuse.
>>>>>>
>>>>>> The fluent API approach reads better, no mention of builder or
>>>>>> build anywhere. The main downside is that the method signatures are
>>>>>> a little less clear. By reading the method signature, one doesn't
>>>>>> necessarily know what it returns.
>>>>>> Also, one needs to figure out the special method (`table()` in
>>>>>> this case) that gives you what you actually care about (`KTable`
>>>>>> in this case). Not major issues, but worth mentioning while doing
>>>>>> the comparison.
>>>>>>
>>>>>> The builder approach avoids the issues mentioned above, but it
>>>>>> doesn't read as well.
>>>>>>
>>>>>> Ismael
>>>>>>
>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'd like to get a discussion going around some of the API choices
>>>>>>> we've made in the DSL. In particular those that relate to stateful
>>>>>>> operations (though this could expand).
>>>>>>>
>>>>>>> As it stands we lean heavily on overloaded methods in the API,
>>>>>>> i.e., there are 9 overloads for KGroupedStream.count(..)! It is
>>>>>>> becoming noisy and i feel it is only going to get worse as we add
>>>>>>> more optional params. In particular we've had some requests to be
>>>>>>> able to turn caching off, or change log configs, on a per operator
>>>>>>> basis (note this can be done now if you pass in a
>>>>>>> StateStoreSupplier, but this can be a bit cumbersome).
>>>>>>>
>>>>>>> So this is a bit of an open question.
>>>>>>> How can we change the DSL overloads so that it flows, is simple
>>>>>>> to use and understand, and is easily extended in the future?
>>>>>>>
>>>>>>> One option would be to use a fluent API approach for providing
>>>>>>> the optional params, so something like this:
>>>>>>>
>>>>>>> groupedStream.count()
>>>>>>>    .withStoreName("name")
>>>>>>>    .withCachingEnabled(false)
>>>>>>>    .withLoggingEnabled(config)
>>>>>>>    .table()
>>>>>>>
>>>>>>> Another option would be to provide a Builder to the count method,
>>>>>>> so it would look something like this:
>>>>>>>
>>>>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
>>>>>>>
>>>>>>> Another option is to say: Hey we don't need this, what are you on
>>>>>>> about!
>>>>>>>
>>>>>>> The above has focussed on state store related overloads, but the
>>>>>>> same ideas could be applied to joins etc, where we presently have
>>>>>>> many join methods and many overloads.
>>>>>>>
>>>>>>> Anyway, i look forward to hearing your opinions.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Damian
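[Editor's note: to make the builder option quoted above concrete, here is a minimal, self-contained Java sketch. `CountOptions`, `CountBuilder`, and the default of caching-enabled are hypothetical stand-ins for illustration only; they are not the real Kafka Streams classes from the proposal.]

```java
import java.util.Objects;

// Hypothetical holder for the optional parameters of count().
final class CountOptions {
    final String storeName;
    final boolean cachingEnabled;

    CountOptions(String storeName, boolean cachingEnabled) {
        this.storeName = Objects.requireNonNull(storeName);
        this.cachingEnabled = cachingEnabled;
    }
}

// Builder variant: optional params are collected up front, then the
// finished options object would be passed to count().
final class CountBuilder {
    private final String storeName;
    private boolean cachingEnabled = true; // assumed default: caching on

    CountBuilder(String storeName) {
        this.storeName = storeName;
    }

    CountBuilder withCachingEnabled(boolean enabled) {
        this.cachingEnabled = enabled;
        return this; // each setter returns the builder, enabling chaining
    }

    CountOptions build() {
        return new CountOptions(storeName, cachingEnabled);
    }
}

public class Main {
    public static void main(String[] args) {
        // Mirrors the snippet from the proposal:
        // groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
        CountOptions opts = new CountBuilder("storeName")
                .withCachingEnabled(false)
                .build();
        System.out.println(opts.storeName + " caching=" + opts.cachingEnabled);
    }
}
```

The trade-off Ismael describes shows up here: `build()` makes the end of the chain explicit (no hidden terminal method like `table()`), at the cost of an extra object in the method signature.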
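[Editor's note: Damian's point about wrapping a custom store with logging and metrics is essentially the decorator pattern. A minimal sketch follows; `SimpleStore`, `InMemoryStore`, and `LoggingStore` are made-up illustrative types, not the real Kafka Streams state store interfaces, and the changelog here is just an in-memory list standing in for a changelog topic.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal store interface (stand-in for a real state store).
interface SimpleStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// The user's "custom storage engine": a plain in-memory map.
class InMemoryStore<K, V> implements SimpleStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public void put(K key, V value) { map.put(key, value); }
    public V get(K key) { return map.get(key); }
}

// Decorator that records every write before delegating to the inner
// store, mimicking how a framework could add changelogging transparently.
class LoggingStore<K, V> implements SimpleStore<K, V> {
    private final SimpleStore<K, V> inner;
    final List<String> changelog = new ArrayList<>();

    LoggingStore(SimpleStore<K, V> inner) { this.inner = inner; }

    public void put(K key, V value) {
        changelog.add(key + "=" + value); // stand-in for a changelog topic write
        inner.put(key, value);
    }

    public V get(K key) { return inner.get(key); }
}

public class Main {
    public static void main(String[] args) {
        LoggingStore<String, Long> store = new LoggingStore<>(new InMemoryStore<>());
        store.put("a", 1L);
        store.put("a", 2L);
        // Latest value is served from the inner store; both writes were logged.
        System.out.println(store.get("a") + " " + store.changelog.size());
    }
}
```

Because the wrapper only depends on the store interface, the same decoration could be applied to any user-supplied engine, which is the "we will wrap the custom store with logging, metrics, etc." idea from the thread.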