Just to add to Damian's points: I think the optimizations for KAFKA-5581 can be done as a second step after this API refactoring, since they should not require any further public API changes and hence are, to me, a bit orthogonal to this KIP. To illustrate my thinking:
```
someOps.table();
// The library may or may not materialize the resulting KTable in a new
// store, depending on the optimizations we have; but that is abstracted
// away from the user, and even if there is a materialized store, it is not
// exposed for querying.

someOps.table(Materialized.as("store1"));
// The library may or may not ACTUALLY materialize the resulting KTable in a
// new store, depending on the optimizations; if it ever decides to
// materialize, it will choose whatever the default storage engine is, but
// it will expose either the actual state store or just the "logical view"
// for users to query.

someOps.table(Materialized.as(MyStoreSupplier));
// The library is forced by the user to materialize the resulting KTable
// with the provided store supplier's storage engine; the user can always
// query it with the supplier's name reference.
```

So, the only case where we have to materialize the store is when the user enforces a store supplier, indicating that she wants to physically materialize it with the provided storage engine; in the other cases the library still has the freedom to choose whether to materialize or not.

Guozhang

On Wed, Jul 19, 2017 at 3:03 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Jan,
>
> Thanks for your input. Comments inline.
>
> On Tue, 18 Jul 2017 at 15:21 Jan Filipiak <jan.filip...@trivago.com> wrote:
>
>> Hi,
>>
>> 1. Too many overloads:
>> Currently, whenever a KTable is the result of an operation it gets an
>> override with stateStoreName, and StateStoreSupplier in case people want
>> to query that. This is what produces two-thirds of the overloaded
>> methods right now (not counting methods returning KStream).
>
> As you state further down, we are trying to address this.
>
>> 2. Code copy and pasting:
>> Almost all KTableProcessorSuppliers have the same block of
>> (if (name != null) store.put(k, v)).
>
> Yes, I agree. That is related to the KTable queryable store etc., and can
> easily be addressed, but isn't necessarily part of this, as it doesn't
> need to be a public interface change, i.e., we can clean that up in the
> background.
>
>> 3. Runtime inefficiencies:
>> Each queryable table almost instantly causes another store to be
>> required, storing equivalent data of upstream KTables.
>
> Agreed. Again, this is not a public interface change. We don't need
> another store, i.e., we can just use a "view" on the existing store, which
> is basically just using the KTableValueGetter to apply the map or filter
> operation to the original store. We also have the JIRA
> https://issues.apache.org/jira/browse/KAFKA-5581 to look into optimizing
> when we do and don't need to add additional changelogs.
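To make that "view" idea concrete: a minimal sketch, assuming a hypothetical `MappedTableView` class (KTableValueGetter itself is an internal interface, so this only approximates the mechanics). Reads go to the parent KTable's single physical store, and the `mapValues()` function is applied on the fly:

```java
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical read-only view over a mapped KTable: lookups hit the parent
// KTable's one real store and apply the mapValues() function lazily, so no
// second store or changelog is needed just to serve IQ.
class MappedTableView<K, V, VR> {
    private final ReadOnlyKeyValueStore<K, V> parentStore;
    private final ValueMapper<V, VR> mapper;

    MappedTableView(final ReadOnlyKeyValueStore<K, V> parentStore,
                    final ValueMapper<V, VR> mapper) {
        this.parentStore = parentStore;
        this.mapper = mapper;
    }

    VR get(final K key) {
        final V value = parentStore.get(key);
        return value == null ? null : mapper.apply(value);
    }
}
```

A filtered KTable would work the same way, returning null whenever the predicate rejects the stored value.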
>
>> So I really see us tackling only the first part currently, which in my
>> opinion is too short-sighted to settle on a public API.
>
> We are not settling on the public API. We do, however, need to do KIPs
> for public API discussions. For internal changes we don't necessarily
> need to have a public discussion.
>
>> This is why I want to tackle our approach to IQ first, as it seems to me
>> to be the most disruptive thing, and the cause of most problems.
>>
>> The plan:
>>
>> Table from topic, from kstream (don't even like this one, but probably
>> needed for some kind of enhanced flexibility), or from aggregations
>> would be the only KTables that would get associated with a state store
>> (their processors). For these operations one can have the
>> "StateStoreSupplier" overload but also not the "queryable state store"
>> overload. From this point on, the KTable abstraction would be considered
>> restored. All the overloads of join and through with respect to IQ would
>> go away; "through" would maybe go away completely, given how little
>> benefit it adds. The method I would add is for a table to get a
>> QueryHandle. This query handle would underneath remember its table's
>> processor name. To access the data from IQ we would not rely on the
>> "per processor state store" but go the usual path through
>> ValueGetterSupplier.
>>
>> *Note:* We do not necessarily have a Serde for V, especially after
>> mapValues, and also not for any intermediate data types. It would be
>> each KTableProcessor's job to provide a serialized version of its
>> upstream data types. KTableKTableJoin would need to bring a
>> JoinInputSerializer<V1, V2> that would serialize both upstream values
>> for transport across boxes.
>>
>> This first step would kill all the "store name" based overloads plus
>> many StateStoreSupplier overloads. It would also avoid the bloated
>> copy-pasting in each KTableProcessor for maintaining the store. And it
>> would make the runtime more efficient in that it does not store the same
>> data twice just for accessing it from IQ -- tackling problem 1, but also
>> the other problems mentioned above.
>>
>> From here, ~3 or 4 methods (from kstream, topic, or aggregate) would
>> still be stuck with the StateStoreSupplier overload. For me this is
>> quite an improvement already; to reduce further overloads I am thinking
>> of adding a nullable Properties argument to these operations. If people
>> want to use all defaults they could pass in null and it wouldn't be too
>> painful. That doesn't necessarily require them to have config files
>> lying around: they could, if they wanted, use property files to create
>> such Properties, plus we would offer to look for these configs in the
>> streams properties. So the complexity of distributing property files is
>> optional, and the user might choose to fill the configs by code or by
>> files.
>>
>> I think these steps can rescue the proper abstraction of a KTable. I
>> believe that with the current proposals we are only sugarcoating problem
>> 1 and end up with a broken idea of what a KTable is, and I think it will
>> be even harder to develop further from there. Interface-wise my proposal
>> looks like developing backwards, as I am very certain we took a wrong
>> turn with IQ that we shouldn't try to carry through.
>>
>> I hope I could explain how this refactoring can tackle the three
>> problems above, and especially why I don't think we can win by tackling
>> only point 1 in the long run. If anything would need an implementation
>> draft, please feel free to ask me to provide one. Initially the proposal
>> hopefully would get the job done of just removing clutter.
>
> I agree with some of what you have said in the above few paragraphs. I
> think you are correct in that KTable has become littered with a bunch of
> methods to make each stage queryable, i.e., adding the overloads for
> queryableStoreName and StateStoreSupplier. I think we can do away with
> both of them, as once you have a KTable you can always build a view of it
> by using the KTableValueGetter. So we don't ever need a
> StateStoreSupplier, as we have one already from when the original KTable
> was created. We can also possibly remove the overloads with queryableName
> and always use a generated name that can be retrieved from the method
> `String queryableStoreName()` -- this can then be used with IQ if needed.
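As a rough sketch of that generated-name idea (hedged: `queryableStoreName()` is the accessor proposed above, not an existing method; `counts` is assumed to be a `KTable<String, Long>` and `streams` a running `KafkaStreams` instance; the rest is the existing 0.11-era IQ API):

```java
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// given: KTable<String, Long> counts; KafkaStreams streams (running)

// The KTable reports the (possibly generated) name of its backing store...
final String storeName = counts.queryableStoreName(); // proposed accessor

// ...and Interactive Queries resolve that name against the running
// KafkaStreams instance, which knows partition and thread placement.
final ReadOnlyKeyValueStore<String, Long> view =
    streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
final Long value = view.get("some-key");
```

Note that querying goes through `KafkaStreams`, not the `KTable` itself, which is exactly the point Damian makes next.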
> The getQueryHandle idea you mention won't really work as things stand.
> The KTable knows nothing about its runtime context; it is purely for
> building a topology that can be executed. In order to successfully query
> a `KTable` (state store) we need to know how many partitions there are
> and on which threads the state stores are running. This is why we added
> the `stores` API to `KafkaStreams`, as this is the execution environment
> that has all of the information.
>
> Thanks,
> Damian
>
>> Looking forward to your comments.
>>
>> Best Jan
>>
>> On 12.07.2017 21:27, Guozhang Wang wrote:
>> > Hello Jan,
>> >
>> > Thanks for your feedback. Let me try to clarify a few things with the
>> > problems that we are trying to resolve and the motivations behind the
>> > current proposals.
>> >
>> > As Matthias mentioned, one issue that we are trying to tackle is to
>> > reduce the number of overloaded functions in the DSL due to overridden
>> > serdes / state store suppliers that are needed for repartitioning or
>> > for state store materialization. Another, related issue is that the
>> > current overridden state store supplier is not very natural to use.
>> > For example:
>> >
>> > 1) If a user just wants to disable caching / logging etc. but does not
>> > want to change the underlying store engine at all, she needs to learn,
>> > for example, whether a windowed store or a key-value store is needed
>> > for this specific operator in the DSL, and which serdes are needed to
>> > materialize the store, in order to create a StateStoreSupplier with
>> > caching / logging disabled and then pass it into the DSL.
>> >
>> > 2) Similarly, if a user just wants to set different topic configs for
>> > the changelog topic, she still needs to specify the whole
>> > StateStoreSupplier to the operator.
>> >
>> > 3) If a user wants to use a different store engine underneath (e.g.
>> > MyStore rather than RocksDBStore) but does not care about the default
>> > settings for logging, caching, etc., he STILL needs to pass the whole
>> > StateStoreSupplier to the operator.
>> >
>> > Note that all the above scenarios are for advanced users who do want
>> > to override these settings; users who are just OK with the default
>> > settings should not be exposed to such APIs at all -- like you said,
>> > "I do not want to be exposed to any such implementation details" if
>> > you do not care.
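For context, a hedged sketch of what scenario 1 forces on the user today, using roughly the 0.11-era `Stores` factory (exact signatures approximate; `groupedStream` is an assumed `KGroupedStream<String, String>`). Just to leave caching off, the whole store is rebuilt by hand:

```java
import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// given: KGroupedStream<String, String> groupedStream

// The user must know this operator needs a key-value store, pick the right
// serdes, and re-declare every default, only to control caching (which is
// simply never enabled here).
final StateStoreSupplier supplier = Stores.create("counts-store")
    .withKeys(Serdes.String())   // serde knowledge leaks to the user
    .withValues(Serdes.Long())
    .persistent()
    .enableLogging(Collections.<String, String>emptyMap())
    .build();

groupedStream.count(supplier); // pre-KIP overload taking a StateStoreSupplier
```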
>> > -----------------
>> >
>> > We have been talking about configs vs. code for such settings, since
>> > we have been using configs for "global" default configs; but the
>> > argument against using configs for per-operator / per-store settings
>> > as well is that it will simply make configs hard to manage and hard
>> > to wire up with tools. Personally speaking, I'm not a big fan of using
>> > configs for per-entity overrides, and that is mainly from my
>> > experience with Samza: Samza inherits exactly the same approach for
>> > per-stream / per-source configs:
>> >
>> > http://samza.apache.org/learn/documentation/0.13/jobs/configuration-table.html
>> > ([system-name], [stream-id] etc. are all placeholders)
>> >
>> > The main issues were: 1) users making config changes need to deploy
>> > them to all the instances -- I think for Streams it would be even
>> > worse, as we would need to make a config file on each of the running
>> > instances, and whenever there is a change we would need to make sure
>> > it is propagated to all of them; 2) whenever users make some code
>> > changes, e.g. to add a new stream / system, they need to remember to
>> > make the corresponding changes in the config files as well, and they
>> > kept forgetting about it. The lesson learned there was that it is
>> > always better to change one place (code change) than two (code change
>> > + config file change).
>> >
>> > Again, this is not saying we have vetoed this option, and if people
>> > have good reasons for it, let's discuss them here.
>> >
>> > -----------------
>> >
>> > So the current proposals are mainly about keeping configs for the
>> > global default settings, while still allowing users to override
>> > per-operator / per-store settings in the code, while also keeping in
>> > mind not to force users to think about such implementation details if
>> > they are fine with the default settings. For example:
>> >
>> > As a normal user it is sufficient to specify an aggregation as
>> >
>> > ```
>> > table4.join(table5, joiner).table();
>> > ```
>> >
>> > in which she can still just focus on the computational logic, with all
>> > implementation details abstracted away; only if the user is familiar
>> > enough with the implementation details (e.g. how the joined tables are
>> > materialized into state stores, etc.) and wants to specify her own
>> > settings (e.g. "I want to swap in my own state store engine", or "I
>> > want to disable caching for dedupping", or "use a different serde")
>> > can she "explore" them with the DSL again:
>> >
>> > ```
>> > table4.join(table5, joiner).table(Materialized.as("store1"));
>> > // use a custom store name for interactive query
>> >
>> > table4.join(table5, joiner).table(Materialized.as(MyStoreSupplier));
>> > // use a custom store engine
>> >
>> > table4.join(table5, joiner)
>> >       .table(Materialized.as("store1").withLoggingEnabled(configs));
>> > // use custom store changelog topic configs
>> >
>> > // ... more
>> > ```
>> >
>> > Hope it helps.
>> >
>> > Guozhang
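To flesh out that last variant: a hedged sketch of what the `configs` argument might contain, reusing the proposal's hypothetical `table(Materialized...)` call from above (`table4`, `table5`, and `joiner` are assumed from the example; the keys are standard Kafka topic configs):

```java
import java.util.HashMap;
import java.util.Map;

// Changelog topic overrides applied to this one store only; all other
// stores keep the global defaults.
final Map<String, String> configs = new HashMap<>();
configs.put("cleanup.policy", "compact,delete");
configs.put("retention.ms", String.valueOf(24 * 60 * 60 * 1000L)); // 1 day

table4.join(table5, joiner)
      .table(Materialized.as("store1").withLoggingEnabled(configs));
```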
>> > On Fri, Jul 7, 2017 at 3:42 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>> >
>> > It makes me want to cry.
>> >
>> > Why on earth is the DSL going to expose all its implementation details
>> > now? Especially being materialized or not.
>> >
>> > If we want to take useful steps in that direction, maybe we are
>> > looking for a way to let the user switch back and forth between PAPI
>> > and DSL?
>> >
>> > A change such as the proposed one would not eliminate any of my pain
>> > points while still being a heck of a lot of work to migrate to.
>> >
>> > Since I am only following this from the point where Eno CC'ed it into
>> > the users list: can someone please rephrase for me what problem this
>> > is trying to solve? I don't mean to be rude, but it uses a problematic
>> > feature, "StateStoreSuppliers in DSL", to justify making it even
>> > worse. This helps us nowhere in making the configs more flexible; it's
>> > just syntactic sugar.
>> >
>> > A low-effort shot would be: let's add a Properties argument to
>> > operations that would otherwise become overloaded too heavily, or pull
>> > the configs by some naming scheme from the overall properties.
>> > Additionally, we get rid of StateStoreSuppliers in the DSL and have
>> > them also configured by said properties.
>> >
>> > => way easier to migrate to, way less risk, way more flexible in the
>> > future (different implementations of the same operation don't require
>> > a code change to configure)
>> >
>> > Line 184 makes especially no sense to me. What is a KTable-KTable
>> > non-materialized join anyway?
>> >
>> > Hope we can discuss more on this.
>> >
>> > On 07.07.2017 17:23, Guozhang Wang wrote:
>> >
>> > I messed up the indentation on the github code repo; this would be
>> > easier to read:
>> >
>> > https://codeshare.io/GLWW8K
>> >
>> > Guozhang
>> >
>> > On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > Hi Damian / Kyle,
>> >
>> > I think I agree with you guys about the pros / cons of using the
>> > builder pattern vs. using some "secondary classes". And I'm thinking
>> > whether we can take a "mid" manner between these two. I spent some
>> > time on a slightly different approach from Damian's current proposal:
>> >
>> > https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java
>> >
>> > The key idea is to tolerate the final "table()" or "stream()" function
>> > to "upgrade" from the secondary classes to the first-citizen classes,
>> > while having all the specs inside this function. Also, this proposal
>> > includes some other refactoring that people have been discussing for
>> > the builder, to reduce the overloaded functions as well. WDYT?
>> >
>> > Guozhang
>> >
>> > On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <damian....@gmail.com> wrote:
>> >
>> > Hi Jan,
>> >
>> > Thanks very much for the input.
>> >
>> > On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <jan.filip...@trivago.com> wrote:
>> >
>> > Hi Damian,
>> >
>> > I do see your point that something needs to change. But I fully agree
>> > with Guozhang when he says:
>> > ---
>> > But since this is an incompatible change, and we are going to remove
>> > the compatibility annotations soon, it means we only have one chance
>> > and we really have to make it right.
>> > ----
>> >
>> > I think we all agree on this one! Hence the discussion.
>> >
>> > I fear all suggestions do not go far enough to become something that
>> > will carry on for very much longer. I am currently working on
>> > KAFKA-3705 and trying to find the easiest way for the user to give me
>> > all the required functionality. The easiest interface I could come up
>> > with so far can be looked at here:
>> >
>> > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622
>> >
>> > And it's already horribly complicated. I am currently unable to find
>> > the right abstraction level to have everything fall into place
>> > naturally. To be honest, I already think introducing
>> >
>> > To be fair, that is not a particularly easy problem to solve!
>> >
>> > https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493
>> >
>> > was unideal and makes everything a mess.
>> > I'm not sure I agree that it makes everything a mess, but it could
>> > have been done differently.
>> >
>> > The JoinType:Whatever is also not really flexible. Two things come to
>> > my mind:
>> >
>> > 1. I don't think we should rule out config-based decisions, say
>> > configs like:
>> >
>> >     streams.$applicationID.joins.$joinname.conf = value
>> >
>> > Is this just for config? Or are you suggesting that we could somehow
>> > "code" the join in a config file?
>> >
>> > This can allow for tremendous changes without a single API change,
>> > and IMO it has not been considered enough yet.
>> >
>> > 2. Push logic from the DSL to the callback classes. A ValueJoiner,
>> > for example, can be used to implement different join types as the
>> > user wishes.
>> >
>> > Do you have an example of how this might look?
>> >
>> > As Guozhang said: stopping to break users is very important.
>> >
>> > Of course. We want to make it as easy as possible for people to use
>> > Streams.
>> >
>> > Especially with these changes + all the plans I sadly only have in my
>> > head, but hopefully the first link can give a glimpse.
>> >
>> > Thanks for preparing the examples; it made it way clearer to me what
>> > exactly we are talking about. I would argue to go a bit slower and
>> > more careful on this one. At some point we need to get it right.
>> > Peeking over to the Hadoop guys with their huge user base: config
>> > files really work well for them.
>> >
>> > Best Jan
>> >
>> > On 30.06.2017 09:31, Damian Guy wrote:
>> >
>> > Thanks Matthias
>> >
>> > On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> > I am just catching up on this thread, so sorry for the long email in
>> > advance... Also, it's to some extent a dump of thoughts and not
>> > always a clear proposal. I still need to think about this in more
>> > detail, but maybe it helps others to get new ideas :)
>> >
>> > However, I don't understand your argument about putting aggregate()
>> > after the withXX() -- all the calls to withXX() set optional
>> > parameters for aggregate() and not for groupBy() -- but a
>> > groupBy().withXX() indicates that the withXX() belongs to the
>> > groupBy(). IMHO, this might be quite confusing for developers.
>> >
>> > I see what you are saying, but the grouped stream is effectively a
>> > no-op until you call one of the aggregate/count/reduce etc.
>> > functions. So the optional params are ones that are applicable to any
>> > of the operations you can perform on this grouped stream. Then the
>> > final count()/reduce()/aggregate() call has any of the params that
>> > are required/specific to that function.
>> >
>> > I understand your argument, but I don't share the conclusion. If we
>> > need a "final/terminal" call, the better way might be
>> >
>> >     .groupBy().count().withXX().build()
>> >
>> > (with a better name for build() though)
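Side by side, the two shapes under discussion look roughly like this (hedged pseudocode against a hypothetical API; neither chain compiles today, `groupedStream` is assumed, and `withQueryableStoreName` is borrowed from Kyle's examples further down):

```java
// (a) The fluent shape: grouping/windowing are configured first, and the
// terminal call names the aggregation itself.
final KTable<Windowed<String>, Long> fluentStyle =
    groupedStream.windowed(TimeWindows.of(10L))
                 .count();

// (b) The terminal-builder shape: name the aggregation first, hang the
// optional withXX() settings off it, and a build()-like call finally adds
// the processor to the topology.
final KTable<Windowed<String>, Long> builderStyle =
    groupedStream.count()
                 .withQueryableStoreName("counts")
                 .build();
```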
>> > The point is that all the other calls, i.e., withBlah, windowed,
>> > etc., apply to all the aggregate functions. The terminal call is the
>> > actual type of aggregation you want to do. I personally find this
>> > more natural than groupBy().count().withBlah().build():
>> >
>> >     groupedStream.count(/** non-windowed count **/)
>> >     groupedStream.windowed(TimeWindows.of(10L)).count(...)
>> >     groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>> >
>> > I like this. However, I don't see a reason to have windowed() and
>> > sessionWindowed(). We should have one top-level `Windows` interface
>> > that both `TimeWindows` and `SessionWindows` implement, and just have
>> > a single windowed() method that accepts all `Windows`. (I did not
>> > like the separation of `SessionWindows` in the first place, and this
>> > seems to be an opportunity to clean this up. It was hard to change
>> > when we introduced session windows.)
>> >
>> > Yes - true, we should look into that.
>> >
>> > Btw: we do use the imperative groupBy() and groupByKey(), and thus we
>> > might also want to use windowBy() (instead of windowed()). Not sure
>> > how important this is, but it seems to be inconsistent otherwise.
>> >
>> > Makes sense.
>> >
>> > About joins: I don't like .withJoinType(JoinType.LEFT) at all. I
>> > think defining an inner/left/outer join is not an optional argument
>> > but a first-class concept, and it should have a proper representation
>> > in the API (like the current methods join(), leftJoin(),
>> > outerJoin()).
>> >
>> > Yep, I did originally have it as a required param, and maybe that is
>> > what we go with. It could have a default, but maybe that is
>> > confusing.
>> >
>> > About the two join API proposals: the second one has too much
>> > boilerplate code for my taste. Also, the actual join() operator has
>> > only one argument, which is weird to me, as in my thinking process
>> > the main operator call should have one parameter per mandatory
>> > argument, but your proposal puts the mandatory arguments into the
>> > Joins.streamStreamJoin() call. This is far from intuitive, IMHO.
>> >
>> > This is the builder pattern: you only need one param, as the builder
>> > has captured all of the required and optional arguments.
>> >
>> > The first join proposal also seems to align better with the pattern
>> > suggested for aggregations, and having the same pattern for all
>> > operators is important (as you stated already).
>> >
>> > This is why I offered the two alternatives I started out with: one is
>> > the builder pattern, the other is the more fluent pattern.
>> >
>> > Coming back to config vs. optional parameter: what about having a
>> > method withConfig[s](...) that allows putting in the configuration?
>> >
>> > Sure, it is currently called withLogConfig(), as that is the only
>> > thing that is really config.
>> >
>> > This also raises the question of whether until() is a window
>> > property. Actually, until() seems to be a configuration parameter and
>> > thus should not have its own method.
>> >
>> > Hmmm, I don't agree. until() is a property of the window. It is going
>> > to be potentially different for every window operation you do in a
>> > streams app.
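For instance, two windowed operations in the same app can legitimately want different retentions; a short sketch against the 0.11-era `TimeWindows` API:

```java
import org.apache.kafka.streams.kstream.TimeWindows;

// until() rides along with each window spec rather than living in global
// config: hourly windows kept for one day, daily windows kept for one week.
final TimeWindows hourly = TimeWindows.of(60 * 60 * 1000L)
                                      .until(24 * 60 * 60 * 1000L);
final TimeWindows daily  = TimeWindows.of(24 * 60 * 60 * 1000L)
                                      .until(7 * 24 * 60 * 60 * 1000L);
```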
>> > Browsing through your example DSL branch, I also saw this one:
>> >
>> >     final KTable<Windowed<String>, Long> windowed =
>> >         groupedStream.counting()
>> >                      .windowed(TimeWindows.of(10L).until(10))
>> >                      .table();
>> >
>> > This is an interesting idea, and it reminds me of some feedback along
>> > the lines of: "I wanted to count a stream, but there was no count()
>> > method -- I first needed to figure out that I need to group the
>> > stream first to be able to count it. It does make sense in hindsight
>> > but was not obvious in the beginning." Thus, carrying out this
>> > thought, we could also do the following:
>> >
>> >     stream.count().groupedBy().windowedBy().table();
>> >
>> > -> Note, I use "grouped" and "windowed" instead of the imperative
>> > forms here, as it comes after the count().
>> >
>> > This would be more consistent than your proposal (that has grouping
>> > before but windowing after count()). It might even allow us to enrich
>> > the API with some syntactic sugar like `stream.count().table()` to
>> > get the overall count of all records (this would obviously not scale,
>> > but we could support it -- if not now, maybe later).
>> >
>> > I guess I'd prefer:
>> >
>> >     stream.groupBy().windowBy().count()
>> >     stream.groupBy().windowBy().reduce()
>> >     stream.groupBy().count()
>> >
>> > As I said above, everything that happens before the final aggregate
>> > call can be applied to any of them. So it makes sense to me to do
>> > those things ahead of the final aggregate call.
>> >
>> > Last, about the builder pattern: I am convinced that we need some
>> > "terminal" operator/method that tells us when to add the processor to
>> > the topology. But I don't see the need for a plain builder pattern,
>> > which feels alien to me (see my argument about the second join
>> > proposal). Using .stream() / .table() as used in many examples might
>> > work. But maybe a more generic name that we can use in all places,
>> > like build() or apply(), might also be an option.
>> >
>> > Sure, a generic name might be ok.
>> >
>> > -Matthias
>> >
>> > On 6/29/17 7:37 AM, Damian Guy wrote:
>> >
>> > Thanks Kyle.
>> >
>> > On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>> >
>> > Hi Damian,
>> >
>> > When trying to program in the fluent API that has been discussed
>> > most, it feels difficult to know when you will actually get an object
>> > you can reuse. What if I make one KGroupedStream that I want to reuse
>> > -- is it legal to reuse it, or does this approach expect you to call
>> > grouped() each time?
>> >
>> > I'd anticipate that once you have a KGroupedStream you can re-use it
>> > as you can today.
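A hedged sketch of that reuse against the 0.11-era API (`stream` is an assumed `KStream<String, String>`; the store names are made up): one grouping feeding two independent aggregations.

```java
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// given: KStream<String, String> stream

// Group once...
final KGroupedStream<String, String> grouped = stream.groupByKey();

// ...then reuse the grouping for two different aggregations.
final KTable<String, Long> totals = grouped.count("totals-store");
final KTable<Windowed<String>, Long> perMinute =
    grouped.count(TimeWindows.of(60_000L), "per-minute-store");
```

(The string-name overloads used here are, of course, exactly the kind of thing this KIP wants to fold into a single parameter object.)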
>> > You said it yourself in another post that the grouped stream is
>> > effectively a no-op until a count, reduce, or aggregate. The way I
>> > see it, you wouldn't be able to reuse anything except KStreams and
>> > KTables, because most of this fluent API would continue returning
>> > this (this being the builder object currently being manipulated).
>> >
>> > So, if you ever store a reference to anything but KStreams and
>> > KTables and you use it in two different ways, then it's possible you
>> > make conflicting withXXX() calls on the same builder.
>> >
>> > Not necessarily true. It could return a new instance of the builder,
>> > i.e., the builders being immutable. So if you held a reference to the
>> > builder, it would always be the same as it was when it was created.
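A minimal sketch of that immutability idea, with all names hypothetical: every withXX() returns a copy, so a held reference can never change behind your back.

```java
import org.apache.kafka.common.serialization.Serde;

// Hypothetical immutable grouping spec: withXX() copies instead of mutating.
final class GroupedSpec<K, V> {
    final Serde<K> keySerde;
    final Serde<V> valueSerde;

    private GroupedSpec(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    static <K, V> GroupedSpec<K, V> with() {
        return new GroupedSpec<>(null, null); // null = use configured defaults
    }

    GroupedSpec<K, V> withKeySerde(final Serde<K> keySerde) {
        return new GroupedSpec<>(keySerde, this.valueSerde);
    }

    GroupedSpec<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new GroupedSpec<>(this.keySerde, valueSerde);
    }
}
```

Under that scheme, Kyle's `groupedStreamWithDefaultSerdes` below would keep its defaults no matter what is derived from it.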
>> > GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>> > GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>> >     groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>> >
>> > I'll admit that this shouldn't happen, but some user is going to do
>> > it eventually… Depending on the implementation, uses of
>> > groupedStreamWithDefaultSerdes would most likely be equivalent to the
>> > version with declared serdes. One workaround would be to always make
>> > copies of the config objects you are building, but this approach has
>> > its own problem, because now we have to identify which configs are
>> > equivalent so we don't create repeated processors.
>> >
>> > The point of this long-winded example is that we always have to be
>> > thinking about all of the possible ways it could be misused by a user
>> > (causing them to see hard-to-diagnose problems).
>> >
>> > Exactly! That is the point of the discussion really.
>> >
>> > In my attempt at a couple of methods with builders, I feel that I
>> > could confidently say the user couldn't really mess it up:
>> >
>> >     // Count
>> >     KTable<String, Long> count =
>> >         kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>> >
>> > The kGroupedStream is reusable, and if they attempted to reuse the
>> > Count for some reason, it would throw an error message saying that a
>> > store named "my-store" already exists.
>> >
>> > Yes, I agree, and I think using builders is my preferred pattern.
>> >
>> > Cheers,
>> > Damian
>> >
>> > Thanks,
>> > Kyle
>> >
>> > From: Damian Guy
>> > Sent: Thursday, June 29, 2017 3:59 AM
>> > To: d...@kafka.apache.org
>> > Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>> >
>> > Hi Kyle,
>> >
>> > Thanks for your input. Really appreciated.
>> >
>> > On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>> >
>> > I like more of a builder pattern even though others have voiced
>> > against it. The reason I like it is because it makes it clear to the
>> > user that a call to KGroupedStream#count will return a KTable, not
>> > some intermediate class that I need to understand.
>> >
>> > Yes, that makes sense.
>> >
>> > When trying to program in the fluent API that has been discussed
>> > most, it feels difficult to know when you will actually get an object
>> > you can reuse. What if I make one KGroupedStream that I want to reuse
>> > -- is it legal to reuse it, or does this approach expect you to call
>> > grouped() each time?
>> >
>> > I'd anticipate that once you have a KGroupedStream you can re-use it
>> > as you can today.
>> >
>> > This question doesn't pop into my head at all with the builder
>> > pattern; I assume I can reuse everything. Finally, I like .groupByKey
>> > and .groupBy(KeyValueMapper); not a big fan of grouped().
>> >
>> > Yes, grouped() was more for demonstration and because groupBy() and
>> > groupByKey() were taken! So I'd imagine the API would actually want
>> > to be groupByKey(/** no required args ***/).withOptionalArg() and
>> > groupBy(KeyValueMapper m).withOptionalArg(...) -- of course this all
>> > depends on maintaining backward compatibility.
>> >
>> > Unfortunately, the below approach would require at least 2 (probably
>> > 3) overloads of each of count, reduce, and aggregate (one for
>> > returning a KTable and one for returning a KTable with a Windowed
>> > key; you would probably want to split windowed and session-windowed
>> > for ease of implementation). Obviously not exhaustive, but enough for
>> > you to get the picture. Count, Reduce, and Aggregate supply 3 static
>> > methods to initialize the builder:
>> >
>> >     // Count
>> >     KTable<String, Long> count =
>> >         groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>> >
>> >     // Windowed Count
>> >     KTable<Windowed<String>, Long> windowedCount =
>> >         groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10))
>> >                                  .withQueryableStoreName("my-windowed-store"));
>> >
>> >     // Session Count
>> >     KTable<Windowed<String>, Long> sessionCount =
>> >         groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L))
>> >                                  .withQueryableStoreName("my-session-windowed-store"));
>> >
>> > Above and below, I think I'd prefer it to be:
>> >
>> >     groupedStream.count(/** non-windowed count **/)
>> >     groupedStream.windowed(TimeWindows.of(10L)).count(...)
>> >     groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>> >
>> >     // Reduce
>> >     Reducer<Long> reducer;
>> >     KTable<String, Long> reduce =
>> >         groupedStream.reduce(reducer,
>> >             Reduce.reduce().withQueryableStoreName("my-store"));
>> >
>> >     // Aggregate Windowed with Custom Store
>> >     Initializer<String> initializer;
>> >     Aggregator<String, Long, String> aggregator;
>> >     KTable<Windowed<String>, String> aggregate =
>> >         groupedStream.aggregate(initializer, aggregator,
>> >             Aggregate.windowed(TimeWindows.of(10L).until(10))
>> >                      .withStateStoreSupplier(stateStoreSupplier));
>> >
>> >     // Cogroup SessionWindowed
>> >     KTable<String, String> cogrouped =
>> >         groupedStream1.cogroup(aggregator1)
>> >                       .cogroup(groupedStream2, aggregator2)

-- 
-- Guozhang