Damian, thanks for starting this discussion.
I am not a fan of the builder pattern. It's too clumsy to use IMHO an raised the entry level bar. I also think that mixing optional parameters with configs is a bad idea. Have not proposal for this atm but just wanted to mention it. Hope to find some time to come up with something. What I don't like in the current proposal is the .grouped().withKeyMapper() -- the current solution with .groupBy(...) and .groupByKey() seems better. For clarity, we could rename to .groupByNewKey(...) and .groupByCurrentKey() (even if we should find some better names). The proposed pattern "chains" grouping and aggregation too close together. I would rather separate both more than less, ie, do into the opposite direction. I am also wondering, if we could so something more "fluent". The initial proposal was like: >> groupedStream.count() >> .withStoreName("name") >> .withCachingEnabled(false) >> .withLoggingEnabled(config) >> .table() The .table() statement in the end was kinda alien. The current proposal put the count() into the end -- ie, the optional parameter for count() have to specified on the .grouped() call -- this does not seems to be the best way either. I did not think this through in detail, but can't we just do the initial proposal with the .table() ? groupedStream.count().withStoreName("name").mapValues(...) Each .withXXX(...) return the current KTable and all the .withXXX() are just added to the KTable interface. Or do I miss anything why this wont' work or any obvious disadvantage? -Matthias On 6/22/17 4:06 AM, Damian Guy wrote: > Thanks everyone. My latest attempt is below. It builds on the fluent > approach, but i think it is slightly nicer. > I agree with some of what Eno said about mixing configy stuff in the DSL, > but i think that enabling caching and enabling logging are things that > aren't actually config. I'd probably not add withLogConfig(...) (even > though it is below) as this is actually config and we already have a way of > doing that, via the StateStoreSupplier. Arguably we could use the > StateStoreSupplier for disabling caching etc, but as it stands that is a > bit of a tedious process for someone that just wants to use the default > storage engine, but not have caching enabled. > > There is also an orthogonal concern that Guozhang alluded to.... If you > want to plug in a custom storage engine and you want it to be logged etc, > you would currently need to implement that yourself. Ideally we can provide > a way where we will wrap the custom store with logging, metrics, etc. I > need to think about where this fits, it is probably more appropriate on the > Stores API. > > final KeyValueMapper<String, String, Long> keyMapper = null; > // count with mapped key > final KTable<Long, Long> count = stream.grouped() > .withKeyMapper(keyMapper) > .withKeySerde(Serdes.Long()) > .withValueSerde(Serdes.String()) > .withQueryableName("my-store") > .count(); > > // windowed count > final KTable<Windowed<String>, Long> windowedCount = stream.grouped() > .withQueryableName("my-window-store") > .windowed(TimeWindows.of(10L).until(10)) > .count(); > > // windowed reduce > final Reducer<String> windowedReducer = null; > final KTable<Windowed<String>, String> windowedReduce = stream.grouped() > .withQueryableName("my-window-store") > .windowed(TimeWindows.of(10L).until(10)) > .reduce(windowedReducer); > > final Aggregator<String, String, Long> aggregator = null; > final Initializer<Long> init = null; > > // aggregate > final KTable<String, Long> aggregate = stream.grouped() > .withQueryableName("my-aggregate-store") > .aggregate(aggregator, init, Serdes.Long()); > > final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = > null; > > // aggregate with custom store > final KTable<String, Long> aggWithCustomStore = stream.grouped() > .withStateStoreSupplier(stateStoreSupplier) > .aggregate(aggregator, init); > > // disable caching > stream.grouped() > .withQueryableName("name") > .withCachingEnabled(false) > .count(); > > // disable logging > stream.grouped() > .withQueryableName("q") > .withLoggingEnabled(false) > .count(); > > // override log config > final Reducer<String> reducer = null; > stream.grouped() > .withLogConfig(Collections.singletonMap("segment.size", "10")) > .reduce(reducer); > > > If anyone wants to play around with this you can find the code here: > https://github.com/dguy/kafka/tree/dsl-experiment > > Note: It won't actually work as most of the methods just return null. > > Thanks, > Damian > > > On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote: > >> Thanks Damian. I think both options have pros and cons. And both are better >> than overload abuse. >> >> The fluent API approach reads better, no mention of builder or build >> anywhere. The main downside is that the method signatures are a little less >> clear. By reading the method signature, one doesn't necessarily knows what >> it returns. Also, one needs to figure out the special method (`table()` in >> this case) that gives you what you actually care about (`KTable` in this >> case). Not major issues, but worth mentioning while doing the comparison. >> >> The builder approach avoids the issues mentioned above, but it doesn't read >> as well. >> >> Ismael >> >> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com> wrote: >> >>> Hi, >>> >>> I'd like to get a discussion going around some of the API choices we've >>> made in the DLS. In particular those that relate to stateful operations >>> (though this could expand). >>> As it stands we lean heavily on overloaded methods in the API, i.e, there >>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and i >>> feel it is only going to get worse as we add more optional params. In >>> particular we've had some requests to be able to turn caching off, or >>> change log configs, on a per operator basis (note this can be done now >> if >>> you pass in a StateStoreSupplier, but this can be a bit cumbersome). >>> >>> So this is a bit of an open question. How can we change the DSL overloads >>> so that it flows, is simple to use and understand, and is easily extended >>> in the future? >>> >>> One option would be to use a fluent API approach for providing the >> optional >>> params, so something like this: >>> >>> groupedStream.count() >>> .withStoreName("name") >>> .withCachingEnabled(false) >>> .withLoggingEnabled(config) >>> .table() >>> >>> >>> >>> Another option would be to provide a Builder to the count method, so it >>> would look something like this: >>> groupedStream.count(new >>> CountBuilder("storeName").withCachingEnabled(false).build()) >>> >>> Another option is to say: Hey we don't need this, what are you on about! >>> >>> The above has focussed on state store related overloads, but the same >> ideas >>> could be applied to joins etc, where we presently have many join methods >>> and many overloads. >>> >>> Anyway, i look forward to hearing your opinions. >>> >>> Thanks, >>> Damian >>> >> >
signature.asc
Description: OpenPGP digital signature