Hello Jan,

Thanks for your proposal. As Bill mentioned, the main difference is that we extract the user-customizable materialization logic out of the topology-building DSL workflow. The main motivations are twofold:

1) Efficiency-wise, it allows some KTables to not be materialized if unnecessary, saving one state store instance and changelog topic.
2) Programming-wise, it looks nicer to separate the topology construction code from the code that materializes KTables for IQ use.

Here are my thoughts regarding these two points:

Regarding 1), I think that with whichever public API we choose (either Damian's proposal or yours), we can always apply the internal optimization to not physically materialize the KTable. You can take a look at the internal interface "KTableValueGetterSupplier", which is used exactly for this purpose: a get call on a "logically" materialized KTable can be traced back to its parent KTables that are physically materialized in a state store. So with the proposed APIs, for example:

    stream.groupByKey(..)
          .aggregate(.., Materialized.as("store1"))   // this resulting KTable is materialized in order to complete the aggregation operation
          .filter(Materialized.as("store2"))          // this resulting KTable is not materialized, but its GetterSupplier is implemented to get values from "store1"

Or

    table1 = stream.groupByKey(..).aggregate(..);
    table2 = table1.filter();
    table1.queryHandle("store1");   // this resulting KTable is materialized in order to complete the aggregation operation
    table2.queryHandle("store2");   // this resulting KTable is not materialized, but its GetterSupplier is implemented to get values from "store1"

When a user queries a value from "store2", which is not actually materialized into a state store, the GetterSupplier will be triggered to in turn query the store for "store1", and then apply the filter operator on-the-fly to return the value.

So the bottom line is, we can achieve the same efficiency optimization with either of the public APIs.

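To make the delegation described under 1) a bit more concrete, here is a minimal, self-contained Java sketch. It does not use the real Kafka Streams internals: `ValueGetterSketch`, `ValueGetter`, `storeBacked` and `filtered` are hypothetical names invented for illustration, and a plain `Map` stands in for the state store behind "store1". It only shows the idea that a lookup against the non-materialized filter result is answered by reading the parent's store and applying the predicate on-the-fly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

public class ValueGetterSketch {

    /** Hypothetical stand-in for the value-getter idea: return the current value for a key, or null. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Getter for a physically materialized table: reads straight from its store (a Map here). */
    static <K, V> ValueGetter<K, V> storeBacked(Map<K, V> store) {
        return store::get;
    }

    /** Getter for a logically materialized filter result: asks the parent, then applies the predicate. */
    static <K, V> ValueGetter<K, V> filtered(ValueGetter<K, V> parent, BiPredicate<K, V> predicate) {
        return key -> {
            V value = parent.get(key);
            // No second store is kept; the filter is evaluated on every lookup.
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }

    public static void main(String[] args) {
        // Assumption: this map plays the role of the state store named "store1".
        Map<String, Long> store1 = new HashMap<>();
        store1.put("a", 5L);
        store1.put("b", 1L);

        ValueGetter<String, Long> aggregated = storeBacked(store1);
        // The view a query against "store2" would see, without a second store.
        ValueGetter<String, Long> store2View = filtered(aggregated, (k, v) -> v > 3L);

        System.out.println(store2View.get("a")); // prints 5
        System.out.println(store2View.get("b")); // prints null (filtered out)
    }
}
```

The trade-off in this sketch is the usual one: no extra store or changelog topic for the filtered table, at the cost of re-evaluating the predicate on each query.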
Regarding 2), I actually proposed an API similar to yours earlier in this discussion thread:

---------------------------------------

// specifying the topology, should be concise and conveniently concatenated, no specs of materialization at all

KStream stream1 = builder.stream();
KTable table1 = stream1.groupBy(...).aggregate(initializer, aggregator, sessionMerger, sessionWindows);  // do not allow to pass-in a state store supplier here any more

// additional code to the topology above, could be more prescriptive than descriptive
// only advanced users would want to code in both parts above; while other users would only code the topology as above.

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging(); // or..
table1.materialize(stateStoreSupplier); // we check type (key-value types, windowed or not etc) at starting time and add the metrics / logging / caching / windowing wrapper on top of the store, or..
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); // etc..

---------------------------------------

But one caveat of that, as illustrated above, is that you need a separate KTable object in order to call either "queryHandle" or "materialize" (whatever the function name is) to specify the materialization options. This can break the concatenation of the topology construction code, in that you cannot simply add one operator directly after another. So I think this is a trade-off we have to make, and the current approach looks better in this regard.

Guozhang

On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi Bill,
>
> totally! So in the original discussion it was mentioned that the overloads
> are nasty when implementing new features. So we wanted to get rid of them.
> But what I felt was that the
> copy & pasted code in the KTableProcessors for maintaining IQ stores was
> as big as a hurdle as the overloads.
>
> With this proposal I try to shift things into the direction of getting IQ
> for free if
> KTableValueGetterSupplier is properly implemented (like getting join for
> free then). Instead of having the code for maintaining IQ stores all the
> places. I realized I can do that while getting rid of the overloads, that
> makes me feel my proposal is superior.
>
> Further I try to optimize by using as few stores as possible to give the
> user what he needs. That should save all sorts of resources while allowing
> faster rebalances.
>
> The target ultimately is to only have KTableSource and the Aggregators
> maintain a Store and provide a ValueGetterSupplier.
>
> Does this makes sense to you?
>
> Best Jan
>
> On 02.08.2017 18:09, Bill Bejeck wrote:
>
>> Hi Jan,
>>
>> Thanks for the effort in putting your thoughts down on paper.
>>
>> Comparing what I see from your proposal and what is presented in KIP-182,
>> one of the main differences is the exclusion of an`Materialized` instance
>> in the `KTable` methods.
>>
>> Can you go into more detail why this is so and the specific problems is
>> avoids and or solves with this approach?
>>
>> Thanks!
>> Bill
>>
>> On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <damian....@gmail.com> wrote:
>>
>> Hi Jan,
>>
>> Thanks for taking the time to put this together, appreciated. For the
>> benefit of others would you mind explaining a bit about your
>> motivation?
>>
>> Cheers,
>> Damian
>>
>> On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>> > Hi all,
>> >
>> > after some further discussions, the best thing to show my Idea of how it
>> > should evolve would be a bigger mock/interface description.
>> > The goal is to reduce the store maintaining processors to only the
>> > Aggregators + and KTableSource. While having KTableSource optionally
>> > materialized.
>> >
>> > Introducing KTable:copy() will allow users to maintain state twice if
>> > they really want to. KStream::join*() wasn't touched. I never personally
>> > used that so I didn't feel
>> > comfortable enough touching it. Currently still making up my mind. None
>> > of the suggestions made it querieable so far. Gouzhangs 'Buffered' idea
>> > seems ideal here.
>> >
>> > please have a look. Looking forward for your opinions.
>> >
>> > Best Jan
>> >
>> >
>> >
>> > On 21.06.2017 17:24, Eno Thereska wrote:
>> > > (cc’ing user-list too)
>> > >
>> > > Given that we already have StateStoreSuppliers that are configurable
>> > > using the fluent-like API, probably it’s worth discussing the other
>> > > examples with joins and serdes first since those have many overloads and
>> > > are in need of some TLC.
>> > >
>> > > So following your example, I guess you’d have something like:
>> > > .join()
>> > > .withKeySerdes(…)
>> > > .withValueSerdes(…)
>> > > .withJoinType(“outer”)
>> > >
>> > > etc?
>> > >
>> > > I like the approach since it still remains declarative and it’d reduce
>> > > the number of overloads by quite a bit.
>> > >
>> > > Eno
>> > >
>> > >> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
>> > >>
>> > >> Hi,
>> > >>
>> > >> I'd like to get a discussion going around some of the API choices we've
>> > >> made in the DLS. In particular those that relate to stateful operations
>> > >> (though this could expand).
>> > >> As it stands we lean heavily on overloaded methods in the API, i.e,
>> > >> there are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and i
>> > >> feel it is only going to get worse as we add more optional params.
>> > >> In particular we've had some requests to be able to turn caching off, or
>> > >> change log configs, on a per operator basis (note this can be done now
>> > >> if you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>> > >>
>> > >> So this is a bit of an open question. How can we change the DSL overloads
>> > >> so that it flows, is simple to use and understand, and is easily extended
>> > >> in the future?
>> > >>
>> > >> One option would be to use a fluent API approach for providing the optional
>> > >> params, so something like this:
>> > >>
>> > >> groupedStream.count()
>> > >> .withStoreName("name")
>> > >> .withCachingEnabled(false)
>> > >> .withLoggingEnabled(config)
>> > >> .table()
>> > >>
>> > >>
>> > >>
>> > >> Another option would be to provide a Builder to the count method, so it
>> > >> would look something like this:
>> > >> groupedStream.count(new
>> > >> CountBuilder("storeName").withCachingEnabled(false).build())
>> > >>
>> > >> Another option is to say: Hey we don't need this, what are you on about!
>> > >>
>> > >> The above has focussed on state store related overloads, but the same ideas
>> > >> could be applied to joins etc, where we presently have many join methods
>> > >> and many overloads.
>> > >>
>> > >> Anyway, i look forward to hearing your opinions.
>> > >>
>> > >> Thanks,
>> > >> Damian
>> >
>> >
>>
>>
>

--
-- Guozhang