It's two issues we want to tackle:
 - too many overloads (for some methods we already have more than 10)
 - improve the custom store API
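For context, the count() case alone currently looks roughly like the sketch below; the signatures are abridged from memory, so treat them as illustrative rather than exact:

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;

// Each optional dimension (store name, custom store supplier, windowing
// flavour) multiplies the overloads of one logical operation.
interface CountOverloads<K> {
    KTable<K, Long> count();
    KTable<K, Long> count(String queryableStoreName);
    KTable<K, Long> count(StateStoreSupplier storeSupplier);
    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows);
    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, String queryableStoreName);
    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, StateStoreSupplier storeSupplier);
    KTable<Windowed<K>, Long> count(SessionWindows sessionWindows);
    KTable<Windowed<K>, Long> count(SessionWindows sessionWindows, String queryableStoreName);
    KTable<Windowed<K>, Long> count(SessionWindows sessionWindows, StateStoreSupplier storeSupplier);
}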
-Matthias

On 7/7/17 3:42 PM, Jan Filipiak wrote:
> It makes me want to cry.
>
> Why on earth is the DSL going to expose all its implementation details now? Especially being materialized or not.
>
> If we want to take useful steps in that direction, maybe we are looking for a way to let the user switch back and forth between PAPI and DSL?
>
> A change as proposed would not eliminate any of my pain points while still being a heck of a lot of work to migrate towards.
>
> Since I am only following this from the point where Eno CC'ed it into the users list: can someone please rephrase for me what problem this is trying to solve? I don't mean to be rude, but it uses a problematic feature ("StateStoreSuppliers in DSL") to justify making it even worse. This helps us nowhere in making the configs more flexible; it's just syntactic sugar.
>
> A low-effort shot like: let's add properties to operations that would otherwise become too heavily overloaded? Or pull the configs by some naming scheme from the overall properties. Additionally to that, we get rid of StateStoreSuppliers in the DSL and have them also configured by said properties.
>
> => way easier to migrate to, way less risk, way more flexible in the future (different implementations of the same operation don't require code changes to configure)
>
> Line 184 makes especially no sense to me. What is a KTable-KTable non-materialized join anyway?
>
> Hope we can discuss more on this.
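To make the properties idea concrete: a minimal sketch, assuming a naming scheme like the streams.$applicationID.joins.$joinname one Jan sketches further down this thread; the helper class and method names here are invented for illustration:

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper: resolve per-operation overrides from the app-wide
// properties via a naming scheme, instead of growing a new DSL overload
// for every knob.
public final class OperationConfigs {

    // e.g. streams.my-app.joins.clicks-impressions.type = left
    public static Map<String, String> forJoin(final Properties appProps,
                                              final String applicationId,
                                              final String joinName) {
        final String prefix = "streams." + applicationId + ".joins." + joinName + ".";
        return appProps.stringPropertyNames().stream()
                .filter(name -> name.startsWith(prefix))
                .collect(Collectors.toMap(
                        name -> name.substring(prefix.length()),
                        appProps::getProperty));
    }
}

Operators would then consult the resulting map at build time rather than taking the settings as method arguments.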
>
> On 07.07.2017 17:23, Guozhang Wang wrote:
>> I messed up the indentation on the github code repos; this would be easier to read:
>>
>> https://codeshare.io/GLWW8K
>>
>> Guozhang
>>
>> On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi Damian / Kyle,
>>>
>>> I think I agree with you guys about the pros / cons of using the builder pattern vs. using some "secondary classes". And I'm thinking if we can take a "mid" manner between these two. I spent some time on a slightly different approach from Damian's current proposal:
>>>
>>> https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
>>>
>>> The key idea is to tolerate the final "table()" or "stream()" function to "upgrade" from the secondary classes to the first-class citizens, while having all the specs inside this function. Also this proposal includes some other refactoring that people have discussed for the builder, to reduce the overloaded functions as well. WDYT?
>>>
>>> Guozhang
>>>
>>> On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> Thanks very much for the input.
>>>>
>>>> On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>
>>>>> Hi Damian,
>>>>>
>>>>> I do see your point that something needs to change. But I fully agree with Guozhang when he says:
>>>>> ---
>>>>> But since this is an incompatible change, and we are going to remove the compatibility annotations soon, it means we only have one chance and we really have to make it right.
>>>>> ----
>>>>
>>>> I think we all agree on this one! Hence the discussion.
>>>>
>>>>> I fear all suggestions do not go far enough to become something that will carry on for very much longer. I am currently working on KAFKA-3705 and trying to find the easiest way for the user to give me all the required functionality. The easiest interface I could come up with so far can be looked at here:
>>>>>
>>>>> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
>>>>>
>>>>> And it's already horribly complicated. I am currently unable to find the right abstraction level to have everything falling into place naturally. To be honest, I already think introducing
>>>>
>>>> To be fair, that is not a particularly easy problem to solve!
>>>>
>>>>> https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
>>>>> was unideal and makes everything a mess.
>>>>
>>>> I'm not sure I agree that it makes everything a mess, but it could have been done differently.
>>>>
>>>>> The JoinType:Whatever is also not really flexible. Two things come to my mind:
>>>>>
>>>>> 1. I don't think we should rule out config-based decisions, say configs like
>>>>> streams.$applicationID.joins.$joinname.conf = value
>>>>
>>>> Is this just for config? Or are you suggesting that we could somehow "code" the join in a config file?
>>>>
>>>>> This can allow for tremendous changes without a single API change, and IMO it was not considered enough yet.
>>>>>
>>>>> 2. Push logic from the DSL to the callback classes. A ValueJoiner, for example, can be used to implement different join types as the user wishes.
>>>>
>>>> Do you have an example of how this might look?
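Presumably something along these lines -- my guess at the intent, not Jan's code: a single generic outer-join primitive where the "join type" collapses into how the supplied ValueJoiner treats a missing side:

import org.apache.kafka.streams.kstream.ValueJoiner;

// Left-join semantics expressed purely in the callback: the joiner
// tolerates a null right-hand value instead of needing a dedicated
// leftJoin() DSL method.
final ValueJoiner<String, Long, String> leftJoinStyle = (left, right) ->
        right == null
                ? left + "/unmatched"   // right side missing: keep the left record
                : left + "/" + right;   // both sides present: combine them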
>>>>> As Guozhang said: not breaking users is very important.
>>>>
>>>> Of course. We want to make it as easy as possible for people to use streams.
>>>>
>>>>> ...especially with these changes plus all the plans I sadly only have in my head, but hopefully the first link can give a glimpse.
>>>>>
>>>>> Thanks for preparing the examples; it made it way clearer to me what exactly we are talking about. I would argue to go a bit slower and more careful on this one. At some point we need to get it right. Peeking over at the Hadoop guys with their huge userbase: config files really work well for them.
>>>>>
>>>>> Best Jan
>>>>>
>>>>> On 30.06.2017 09:31, Damian Guy wrote:
>>>>>> Thanks Matthias
>>>>>>
>>>>>> On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>> I am just catching up on this thread, so sorry for the long email in advance... Also, it's to some extent a dump of thoughts and not always a clear proposal. Still need to think about this in more detail. But maybe it helps others to get new ideas :)
>>>>>>>
>>>>>>>>> However, I don't understand your argument about putting aggregate() after the withXX() -- all the calls to withXX() set optional parameters for aggregate() and not for groupBy() -- but a groupBy().withXX() indicates that the withXX() belongs to the groupBy(). IMHO, this might be quite confusing for developers.
>>>>>>>>
>>>>>>>> I see what you are saying, but the grouped stream is effectively a no-op until you call one of the aggregate/count/reduce etc. functions. So the optional params are ones that are applicable to any of the operations you can perform on this grouped stream. Then the final count()/reduce()/aggregate() call has any of the params that are required/specific to that function.
>>>>>>>
>>>>>>> I understand your argument, but I don't share the conclusion. If we need a "final/terminal" call, the better way might be
>>>>>>>
>>>>>>> .groupBy().count().withXX().build()
>>>>>>>
>>>>>>> (with a better name for build() though)
>>>>>>
>>>>>> The point is that all the other calls, i.e., withBlah, windowed, etc., apply to all the aggregate functions, the terminal call being the actual type of aggregation you want to do. I personally find this more natural than groupBy().count().withBlah().build()
>>>>>>
>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>>>>
>>>>>>> I like this. However, I don't see a reason to have windowed() and sessionWindowed(). We should have one top-level `Windows` interface that both `TimeWindows` and `SessionWindows` implement and just have a single windowed() method that accepts all `Windows`. (I did not like the separation of `SessionWindows` in the first place, and this seems to be an opportunity to clean this up. It was hard to change when we introduced session windows.)
>>>>>>
>>>>>> Yes - true, we should look into that.
>>>>>>
>>>>>>> Btw: we do use the imperative groupBy() and groupByKey(), and thus we might also want to use windowBy() (instead of windowed()). Not sure how important this is, but it seems to be inconsistent otherwise.
>>>>>>
>>>>>> Makes sense
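If `SessionWindows` did share a top-level `Windows` abstraction, a single entry point could look roughly like the following sketch -- all interface names here are hypothetical:

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;

// Hypothetical: one windowBy() entry point, assuming SessionWindows could
// be made to implement the same top-level Windows abstraction.
interface GroupedSketch<K, V> {
    <W extends Window> WindowedSketch<K> windowBy(Windows<W> windows);
}

interface WindowedSketch<K> {
    KTable<Windowed<K>, Long> count();
}

// usage would then be uniform:
//   groupedStream.windowBy(TimeWindows.of(10L)).count();
//   groupedStream.windowBy(SessionWindows.with(10L)).count();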
>>>>>>
>>>>>>> About joins: I don't like .withJoinType(JoinType.LEFT) at all. I think defining an inner/left/outer join is not an optional argument but a first-class concept and should have a proper representation in the API (like the current methods join(), leftJoin(), outerJoin()).
>>>>>>
>>>>>> Yep, I did originally have it as a required param and maybe that is what we go with. It could have a default, but maybe that is confusing.
>>>>>>
>>>>>>> About the two join API proposals, the second one has too much boilerplate code for my taste. Also, the actual join() operator has only one argument, which is weird to me: in my thinking process, the main operator call should have one parameter per mandatory argument, but your proposal puts the mandatory arguments into the Joins.streamStreamJoin() call. This is far from intuitive IMHO.
>>>>>>
>>>>>> This is the builder pattern; you only need one param as the builder has captured all of the required and optional arguments.
>>>>>>
>>>>>>> The first join proposal also seems to align better with the pattern suggested for aggregations, and having the same pattern for all operators is important (as you stated already).
>>>>>>
>>>>>> This is why I offered the two alternatives I started out with: one is the builder pattern, the other is the more fluent pattern.
>>>>>>
>>>>>>> Coming back to config vs. optional parameters: what about having a method withConfig[s](...) that allows putting in the configuration?
>>>>>>
>>>>>> Sure, it is currently called withLogConfig() as that is the only thing that is really config.
>>>>>>
>>>>>>> This also raises the question if until() is a windows property? Actually, until() seems to be a configuration parameter and thus should not have its own method.
>>>>>>
>>>>>> Hmmm, I don't agree. Until is a property of the window. It is going to be potentially different for every window operation you do in a streams app.
>>>>>>
>>>>>>> Browsing through your example DSL branch, I also saw this one:
>>>>>>>
>>>>>>>> final KTable<Windowed<String>, Long> windowed = groupedStream.counting()
>>>>>>>>     .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>     .table();
>>>>>>>
>>>>>>> This is an interesting idea, and it reminds me of some feedback along the lines of "I wanted to count a stream, but there was no count() method -- I first needed to figure out that I need to group the stream first to be able to count it. It does make sense in hindsight but was not obvious in the beginning." Thus, carrying out this thought, we could also do the following:
>>>>>>>
>>>>>>> stream.count().groupedBy().windowedBy().table();
>>>>>>>
>>>>>>> -> Note, I use "grouped" and "windowed" instead of imperative here, as it comes after the count()
>>>>>>>
>>>>>>> This would be more consistent than your proposal (that has grouping before but windowing after count()). It might even allow us to enrich the API with some syntactic sugar like `stream.count().table()` to get the overall count of all records (this would obviously not scale, but we could support it -- if not now, maybe later).
>>>>>>
>>>>>> I guess I'd prefer
>>>>>> stream.groupBy().windowBy().count()
>>>>>> stream.groupBy().windowBy().reduce()
>>>>>> stream.groupBy().count()
>>>>>>
>>>>>> As I said above, everything that happens before the final aggregate call can be applied to any of them. So it makes sense to me to do those things ahead of the final aggregate call.
>>>>>>
>>>>>>> Last, about the builder pattern: I am convinced that we need some "terminal" operator/method that tells us when to add the processor to the topology. But I don't see the need for a plain builder pattern that feels alien to me (see my argument about the second join proposal). Using .stream() / .table() as used in many examples might work. But maybe a more generic name that we can use in all places, like build() or apply(), might also be an option.
>>>>>>
>>>>>> Sure, a generic name might be ok.
>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 6/29/17 7:37 AM, Damian Guy wrote:
>>>>>>>> Thanks Kyle.
>>>>>>>>
>>>>>>>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Damian,
>>>>>>>>>
>>>>>>>>>>>>> When trying to program in the fluent API that has been discussed most, it feels difficult to know when you will actually get an object you can reuse. What if I make one KGroupedStream that I want to reuse: is it legal to reuse it, or does this approach expect you to call grouped each time?
>>>>>>>>>>>
>>>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can re-use it as you can today.
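I.e., something like the following should stay legal, with both aggregations sharing the one grouped stream -- topic and store names are made up, using the current overloads:

import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// One grouped stream feeding two independent aggregations, as today.
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> stream = builder.stream("input-topic");
final KGroupedStream<String, String> grouped = stream.groupByKey();

final KTable<String, Long> counts = grouped.count("counts-store");
final KTable<String, String> longest = grouped.reduce(
        (v1, v2) -> v1.length() >= v2.length() ? v1 : v2, "longest-store");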
>>>>>>>>>
>>>>>>>>> You said it yourself in another post that the grouped stream is effectively a no-op until a count, reduce, or aggregate. The way I see it, you wouldn't be able to reuse anything except KStreams and KTables, because most of this fluent API would continue returning this (this being the builder object currently being manipulated). So, if you ever store a reference to anything but KStreams and KTables and you use it in two different ways, then it's possible you make conflicting withXXX() calls on the same builder.
>>>>>>>>
>>>>>>>> Not necessarily true. It could return a new instance of the builder, i.e., the builders being immutable. So if you held a reference to the builder, it would always be the same as it was when it was created.
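A minimal sketch of that copy-on-write idea -- the class name is invented; each withXXX() leaves the receiver untouched and returns a fresh instance, so two references can never interfere:

import org.apache.kafka.common.serialization.Serde;

// Hypothetical immutable builder: withKeySerde()/withValueSerde() never
// mutate `this`, so a held reference cannot be changed out from under
// the caller.
final class GroupedSketch<K, V> {
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    GroupedSketch(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    GroupedSketch<K, V> withKeySerde(final Serde<K> keySerde) {
        return new GroupedSketch<>(keySerde, valueSerde);   // copy, don't mutate
    }

    GroupedSketch<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new GroupedSketch<>(keySerde, valueSerde);   // copy, don't mutate
    }
}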
>>>>>>>>
>>>>>>>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>>>>>>>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes = groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>>>>>>>>>
>>>>>>>>> I'll admit that this shouldn't happen, but some user is going to do it eventually... Depending on implementation, uses of groupedStreamWithDefaultSerdes would most likely be equivalent to the version withDeclaredSerdes. One workaround would be to always make copies of the config objects you are building, but this approach has its own problem, because now we have to identify which configs are equivalent so we don't create repeated processors.
>>>>>>>>>
>>>>>>>>> The point of this long-winded example is that we always have to be thinking about all of the possible ways it could be misused by a user (causing them to see hard-to-diagnose problems).
>>>>>>>>
>>>>>>>> Exactly! That is the point of the discussion really.
>>>>>>>>
>>>>>>>>> In my attempt at a couple of methods with builders, I feel that I could confidently say the user couldn't really mess it up.
>>>>>>>>>> // Count
>>>>>>>>>> KTable<String, Long> count = kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>>> The kGroupedStream is reusable, and if they attempted to reuse the Count for some reason it would throw an error message saying that a store named "my-store" already exists.
>>>>>>>>
>>>>>>>> Yes, I agree, and I think using builders is my preferred pattern.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Damian
>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Kyle
>>>>>>>>>
>>>>>>>>> From: Damian Guy
>>>>>>>>> Sent: Thursday, June 29, 2017 3:59 AM
>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>>
>>>>>>>>> Hi Kyle,
>>>>>>>>>
>>>>>>>>> Thanks for your input. Really appreciated.
>>>>>>>>>
>>>>>>>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I like more of a builder pattern even though others have voiced against it. The reason I like it is because it makes it clear to the user that a call to KGroupedStream#count will return a KTable, not some intermediate class that I need to understand.
>>>>>>>>>
>>>>>>>>> Yes, that makes sense.
>>>>>>>>>
>>>>>>>>>> When trying to program in the fluent API that has been discussed most, it feels difficult to know when you will actually get an object you can reuse. What if I make one KGroupedStream that I want to reuse: is it legal to reuse it, or does this approach expect you to call grouped each time?
>>>>>>>>>
>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can re-use it as you can today.
>>>>>>>>>
>>>>>>>>>> This question doesn't pop into my head at all with the builder pattern; I assume I can reuse everything. Finally, I like .groupByKey and .groupBy(KeyValueMapper); not a big fan of the grouped.
>>>>>>>>>
>>>>>>>>> Yes, grouped() was more for demonstration and because groupBy() and groupByKey() were taken! So I'd imagine the API would actually want to be groupByKey(/** no required args **/).withOptionalArg() and groupBy(KeyValueMapper m).withOptionalArg(...) -- of course this all depends on maintaining backward compatibility.
>>>>>>>>>
>>>>>>>>>> Unfortunately, the below approach would require at least 2 (probably 3) overloads of each count, reduce, and aggregate (one for returning a KTable and one for returning a KTable with Windowed Key; we would probably want to split windowed and session-windowed for ease of implementation). Obviously not exhaustive, but enough for you to get the picture. Count, Reduce, and Aggregate supply 3 static methods to initialize the builder:
>>>>>>>>>>
>>>>>>>>>> // Count
>>>>>>>>>> KTable<String, Long> count = groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>>>>
>>>>>>>>>> // Windowed Count
>>>>>>>>>> KTable<Windowed<String>, Long> windowedCount = groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>>>>>>>>>>
>>>>>>>>>> // Session Count
>>>>>>>>>> KTable<Windowed<String>, Long> sessionCount = groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>>>>>>>>>
>>>>>>>>> Above and below, I think I'd prefer it to be:
>>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>>>>>>
>>>>>>>>>> // Reduce
>>>>>>>>>> Reducer<Long> reducer;
>>>>>>>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer, Reduce.reduce().withQueryableStoreName("my-store"));
>>>>>>>>>>
>>>>>>>>>> // Aggregate Windowed with Custom Store
>>>>>>>>>> Initializer<String> initializer;
>>>>>>>>>> Aggregator<String, Long, String> aggregator;
>>>>>>>>>> KTable<Windowed<String>, String> aggregate = groupedStream.aggregate(initializer, aggregator,
>>>>>>>>>>     Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
>>>>>>>>>>
>>>>>>>>>> // Cogroup SessionWindowed
>>>>>>>>>> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>>>>>>>>>>     .cogroup(groupedStream2, aggregator2)
>>>>>>>>>>     .aggregate(initializer, aggregator,
>>>>>>>>>>         Aggregate.sessionWindowed(SessionWindows.with(10L), sessionMerger).withQueryableStoreName("my-store"));
>>>>>>>>>>
>>>>>>>>>> public class Count {
>>>>>>>>>>
>>>>>>>>>>     public static class Windowed extends Count {
>>>>>>>>>>         private Windows windows;
>>>>>>>>>>     }
>>>>>>>>>>     public static class SessionWindowed extends Count {
>>>>>>>>>>         private SessionWindows sessionWindows;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public static Count count();
>>>>>>>>>>     public static Windowed windowed(Windows windows);
>>>>>>>>>>     public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);
>>>>>>>>>>
>>>>>>>>>>     // All withXXX(...) methods.
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> public class KGroupedStream {
>>>>>>>>>>     public KTable<K, Long> count(Count count);
>>>>>>>>>>     public KTable<Windowed<K>, Long> count(Count.Windowed count);
>>>>>>>>>>     public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
>>>>>>>>>>     …
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Kyle
>>>>>>>>>>
>>>>>>>>>> From: Guozhang Wang
>>>>>>>>>> Sent: Wednesday, June 28, 2017 7:45 PM
>>>>>>>>>> To: dev@kafka.apache.org
>>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>>>
>>>>>>>>>> I played with the current proposal a bit at https://github.com/dguy/kafka/tree/dsl-experiment, and here are my observations:
>>>>>>>>>>
>>>>>>>>>> 1. Personally I prefer
>>>>>>>>>>
>>>>>>>>>> "stream.group(mapper) / stream.groupByKey()"
>>>>>>>>>>
>>>>>>>>>> over
>>>>>>>>>>
>>>>>>>>>> "stream.group().withKeyMapper(mapper) / stream.group()"
>>>>>>>>>>
>>>>>>>>>> since 1) withKeyMapper is not enforced programmatically though it is not "really" optional like the others, and 2) syntax-wise it reads more naturally.
>>>>>>>>>>
>>>>>>>>>> I think it is okay to add the APIs in https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java to KGroupedStream.
>>>>>>>>>>
>>>>>>>>>> 2. For the "withStateStoreSupplier" API, are users supposed to pass in the innermost state store supplier (e.g. the one whose get() returns a RocksDBStore), or is it supposed to return the outermost supplier with logging / metrics / etc.? I think it would be more useful to only require users to pass in the inner state store supplier while specifying caching / logging through other APIs.
>>>>>>>>>>
>>>>>>>>>> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are allowing users to call other APIs like "withQueryableName" multiple times, but "withStateStoreSupplier" only once at the end. Why is that?
>>>>>>>>>>
>>>>>>>>>> 3. The current DSL seems to be only for aggregations; what about joins?
>>>>>>>>>>
>>>>>>>>>> 4. I think it is okay to keep the "withLogConfig": for the StateStoreSupplier it will still be user code specifying the topology, so I do not see a big difference there.
>>>>>>>>>>
>>>>>>>>>> 5. "WindowedGroupedStream"'s withStateStoreSupplier should take the windowed state store supplier to enforce typing?
>>>>>>>>>>
>>>>>>>>>> Below are minor ones:
>>>>>>>>>>
>>>>>>>>>> 6. "withQueryableName": maybe better "withQueryableStateName"?
>>>>>>>>>>
>>>>>>>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> I see your point about "when to add the processor to the topology". That is indeed an issue. Not sure if we could allow "updates" to the topology...
>>>>>>>>>>>
>>>>>>>>>>> I don't see any problem with having all the withXX() in the KTable interface -- but this might be subjective.
>>>>>>>>>>>
>>>>>>>>>>> However, I don't understand your argument about putting aggregate() after the withXX() -- all the calls to withXX() set optional parameters for aggregate() and not for groupBy() -- but a groupBy().withXX() indicates that the withXX() belongs to the groupBy(). IMHO, this might be quite confusing for developers.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 6/28/17 2:55 AM, Damian Guy wrote:
>>>>>>>>>>>>> I also think that mixing optional parameters with configs is a bad idea. Have no proposal for this atm but just wanted to mention it. Hope to find some time to come up with something.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, I don't like the mix of config either. But the only real config here is the logging config -- which we don't really need, as it can already be done via a custom StateStoreSupplier.
>>>>>>>>>>>>
>>>>>>>>>>>>> What I don't like in the current proposal is the .grouped().withKeyMapper() -- the current solution with .groupBy(...) and .groupByKey() seems better. For clarity, we could rename to .groupByNewKey(...) and .groupByCurrentKey() (even if we should find some better names).
>>>>>>>>>>>>
>>>>>>>>>>>> It could be groupByKey(), groupBy() or something different.
>>>>>>>>>>>>
>>>>>>>>>>>>> The proposed pattern "chains" grouping and aggregation too closely together. I would rather separate both more than less, i.e., go in the opposite direction.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am also wondering if we could do something more "fluent". The initial proposal was like:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> groupedStream.count()
>>>>>>>>>>>>>>>    .withStoreName("name")
>>>>>>>>>>>>>>>    .withCachingEnabled(false)
>>>>>>>>>>>>>>>    .withLoggingEnabled(config)
>>>>>>>>>>>>>>>    .table()
>>>>>>>>>>>>>
>>>>>>>>>>>>> The .table() statement in the end was kinda alien.
>>>>>>>>>>>>
>>>>>>>>>>>> I agree, but then all of the withXXX methods need to be on KTable, which is worse in my opinion. You also need something that is going to "build" the internal processors and add them to the topology.
>>>>>>>>>>>>
>>>>>>>>>>>>> The current proposal puts the count() at the end -- i.e., the optional parameters for count() have to be specified on the .grouped() call -- this does not seem to be the best way either.
>>>>>>>>>>>>
>>>>>>>>>>>> I actually prefer this method, as you are building a grouped stream that you will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..) etc. seems natural to me.
>>>>>>>>>>>>
>>>>>>>>>>>>> I did not think this through in detail, but can't we just do the initial proposal with the .table()?
>>>>>>>>>>>>>
>>>>>>>>>>>>> groupedStream.count().withStoreName("name").mapValues(...)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Each .withXXX(...) returns the current KTable and all the .withXXX() are just added to the KTable interface. Or do I miss anything why this won't work, or any obvious disadvantage?
>>>>>>>>>>>>
>>>>>>>>>>>> See above.
>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote:
>>>>>>>>>>>>>> Thanks everyone. My latest attempt is below. It builds on the fluent approach, but I think it is slightly nicer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree with some of what Eno said about mixing configy stuff in the DSL, but I think that enabling caching and enabling logging are things that aren't actually config. I'd probably not add withLogConfig(...) (even though it is below), as this is actually config and we already have a way of doing that, via the StateStoreSupplier. Arguably we could use the StateStoreSupplier for disabling caching etc., but as it stands that is a bit of a tedious process for someone that just wants to use the default storage engine but not have caching enabled.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There is also an orthogonal concern that Guozhang alluded to.... If you want to plug in a custom storage engine and you want it to be logged etc., you would currently need to implement that yourself. Ideally we can provide a way where we will wrap the custom store with logging, metrics, etc. I need to think about where this fits; it is probably more appropriate on the Stores API.
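The shape of that wrapping might be a simple decorator over the user's inner supplier -- this is only a sketch of the idea, with invented names, not a proposed API:

import org.apache.kafka.streams.processor.StateStore;

// Hypothetical: the DSL accepts the user's innermost supplier and layers
// change-logging (and similarly metrics) on top, so custom engines get
// those features without hand-rolling them.
interface SimpleStoreSupplier<S extends StateStore> {
    S get();
}

final class LoggingDecoratedSupplier<S extends StateStore> implements SimpleStoreSupplier<S> {
    private final SimpleStoreSupplier<S> inner;

    LoggingDecoratedSupplier(final SimpleStoreSupplier<S> inner) {
        this.inner = inner;
    }

    @Override
    public S get() {
        final S store = inner.get();
        // here the library would wrap `store` in its change-logging layer
        // before handing it to the processor context (elided)
        return store;
    }
}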
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper = null;
>>>>>>>>>>>>>> // count with mapped key
>>>>>>>>>>>>>> final KTable<Long, Long> count = stream.grouped()
>>>>>>>>>>>>>>         .withKeyMapper(keyMapper)
>>>>>>>>>>>>>>         .withKeySerde(Serdes.Long())
>>>>>>>>>>>>>>         .withValueSerde(Serdes.String())
>>>>>>>>>>>>>>         .withQueryableName("my-store")
>>>>>>>>>>>>>>         .count();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // windowed count
>>>>>>>>>>>>>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
>>>>>>>>>>>>>>         .withQueryableName("my-window-store")
>>>>>>>>>>>>>>         .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>>>>>         .count();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // windowed reduce
>>>>>>>>>>>>>> final Reducer<String> windowedReducer = null;
>>>>>>>>>>>>>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
>>>>>>>>>>>>>>         .withQueryableName("my-window-store")
>>>>>>>>>>>>>>         .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>>>>>         .reduce(windowedReducer);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final Aggregator<String, String, Long> aggregator = null;
>>>>>>>>>>>>>> final Initializer<Long> init = null;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // aggregate
>>>>>>>>>>>>>> final KTable<String, Long> aggregate = stream.grouped()
>>>>>>>>>>>>>>         .withQueryableName("my-aggregate-store")
>>>>>>>>>>>>>>         .aggregate(aggregator, init, Serdes.Long());
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
>>>>>>>>>>>>>> // aggregate with custom store
>>>>>>>>>>>>>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
>>>>>>>>>>>>>>         .withStateStoreSupplier(stateStoreSupplier)
>>>>>>>>>>>>>>         .aggregate(aggregator, init);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // disable caching
>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>         .withQueryableName("name")
>>>>>>>>>>>>>>         .withCachingEnabled(false)
>>>>>>>>>>>>>>         .count();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // disable logging
>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>         .withQueryableName("q")
>>>>>>>>>>>>>>         .withLoggingEnabled(false)
>>>>>>>>>>>>>>         .count();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // override log config
>>>>>>>>>>>>>> final Reducer<String> reducer = null;
>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
>>>>>>>>>>>>>>         .reduce(reducer);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If anyone wants to play around with this, you can find the code here: https://github.com/dguy/kafka/tree/dsl-experiment
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note: it won't actually work, as most of the methods just return null.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
>>>>>>>>>>>>>>> Thanks Damian. I think both options have pros and cons, and both are better than overload abuse.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The fluent API approach reads better: no mention of builder or build anywhere. The main downside is that the method signatures are a little less clear. By reading the method signature, one doesn't necessarily know what it returns. Also, one needs to figure out the special method (`table()` in this case) that gives you what you actually care about (`KTable` in this case). Not major issues, but worth mentioning while doing the comparison.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The builder approach avoids the issues mentioned above, but it doesn't read as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ismael
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'd like to get a discussion going around some of the API choices we've made in the DSL, in particular those that relate to stateful operations (though this could expand).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As it stands we lean heavily on overloaded methods in the API; i.e., there are 9 overloads for KGroupedStream.count(..)! It is becoming noisy, and I feel it is only going to get worse as we add more optional params. In particular we've had some requests to be able to turn caching off, or change log configs, on a per-operator basis (note this can be done now if you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So this is a bit of an open question. How can we change the DSL overloads so that it flows, is simple to use and understand, and is easily extended in the future?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One option would be to use a fluent API approach for providing the optional params, so something like this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> groupedStream.count()
>>>>>>>>>>>>>>>>    .withStoreName("name")
>>>>>>>>>>>>>>>>    .withCachingEnabled(false)
>>>>>>>>>>>>>>>>    .withLoggingEnabled(config)
>>>>>>>>>>>>>>>>    .table()
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Another option would be to provide a Builder to the count method, so it would look something like this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Another option is to say: hey, we don't need this, what are you on about!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The above has focussed on state store related overloads, but the same ideas could be applied to joins etc., where we presently have many join methods and many overloads.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Anyway, I look forward to hearing your opinions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>
>>> --
>>> -- Guozhang