I've updated the experimental code with a couple of ways of doing joins: one following the fluent approach and one following the builder approach. Both examples can be found here: https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L714
The code looks like:

@Test
public void shouldBeFluentIsh() throws Exception {
    final KStream<String, String> one = null;
    final KStream<String, String> two = null;
    final Serde<String> serde = null;
    final ValueJoiner<String, String, String> vj = null;

    // inner join
    one.join2(two, vj, JoinWindows.of(10))
            .withKeySerde(serde)
            .withThisValueSerde(serde)
            .withOtherValueSerde(serde)
            .stream();

    // left join
    one.join2(two, vj, JoinWindows.of(10))
            .withJoinType(JoinType.LEFT)
            .stream();
}

@Test
public void shouldUseBuilder() throws Exception {
    final KStream<String, String> one = null;
    final KStream<String, String> two = null;
    final Serde<String> serde = null;
    final ValueJoiner<String, String, String> vj = null;

    // inner
    one.join(Joins.streamStreamJoin(two, vj, JoinWindows.of(10)).build());

    // left
    one.join(Joins.streamStreamJoin(two, vj, JoinWindows.of(10))
            .withJoinType(JoinType.LEFT)
            .build());
}

I'm not going to say which way I'm leaning, yet!

Thanks,
Damian

On Thu, 29 Jun 2017 at 11:47 Damian Guy <damian....@gmail.com> wrote:

> >> However, I don't understand your argument about putting aggregate()
> >> after the withXX() -- all the calls to withXX() set optional
> >> parameters for aggregate() and not for groupBy() -- but a
> >> groupBy().withXX() indicates that the withXX() belongs to the
> >> groupBy(). IMHO, this might be quite confusing for developers.
> >>
> I see what you are saying, but the grouped stream is effectively a no-op
> until you call one of the aggregate/count/reduce etc functions. So the
> optional params are ones that are applicable to any of the operations
> you can perform on this grouped stream. Then the final
> count()/reduce()/aggregate() call has any of the params that are
> required/specific to that function.
>
>> -Matthias
>>
>> On 6/28/17 2:55 AM, Damian Guy wrote:
>> >> I also think that mixing optional parameters with configs is a bad
>> >> idea. Have no proposal for this atm but just wanted to mention it.
>> >> Hope to find some time to come up with something.
>> >
>> > Yes, I don't like the mix of config either. But the only real config
>> > here is the logging config - which we don't really need as it can
>> > already be done via a custom StateStoreSupplier.
>> >
>> >> What I don't like in the current proposal is the
>> >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
>> >> and .groupByKey() seems better. For clarity, we could rename to
>> >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
>> >> some better names).
>> >
>> > it could be groupByKey(), groupBy() or something different but
>> >
>> >> The proposed pattern "chains" grouping and aggregation too close
>> >> together. I would rather separate both more than less, i.e., go in
>> >> the opposite direction.
>> >>
>> >> I am also wondering if we could do something more "fluent". The
>> >> initial proposal was like:
>> >>
>> >>>> groupedStream.count()
>> >>>>     .withStoreName("name")
>> >>>>     .withCachingEnabled(false)
>> >>>>     .withLoggingEnabled(config)
>> >>>>     .table()
>> >>
>> >> The .table() statement in the end was kinda alien.
>> >
>> > I agree, but then all of the withXXX methods need to be on KTable,
>> > which is worse in my opinion. You also need something that is going to
>> > "build" the internal processors and add them to the topology.
>> >
>> >> The current proposal put the count() into the end -- i.e., the
>> >> optional parameters for count() have to be specified on the
>> >> .grouped() call -- this does not seem to be the best way either.
>> >
>> > I actually prefer this method as you are building a grouped stream
>> > that you will aggregate. So
>> > table.grouped(...).withOptionalStuff().aggregate(..) etc seems
>> > natural to me.
>> >
>> >> I did not think this through in detail, but can't we just do the
>> >> initial proposal with the .table() ?
>> >>
>> >> groupedStream.count().withStoreName("name").mapValues(...)
>> >>
>> >> Each .withXXX(...) returns the current KTable and all the .withXXX()
>> >> are just added to the KTable interface. Or do I miss anything why
>> >> this won't work, or any obvious disadvantage?
>> >
>> > See above.
>> >
>> >> -Matthias
>> >>
>> >> On 6/22/17 4:06 AM, Damian Guy wrote:
>> >>> Thanks everyone. My latest attempt is below. It builds on the fluent
>> >>> approach, but I think it is slightly nicer.
>> >>> I agree with some of what Eno said about mixing configy stuff in the
>> >>> DSL, but I think that enabling caching and enabling logging are
>> >>> things that aren't actually config. I'd probably not add
>> >>> withLogConfig(...) (even though it is below) as this is actually
>> >>> config and we already have a way of doing that, via the
>> >>> StateStoreSupplier. Arguably we could use the StateStoreSupplier for
>> >>> disabling caching etc, but as it stands that is a bit of a tedious
>> >>> process for someone that just wants to use the default storage
>> >>> engine, but not have caching enabled.
>> >>>
>> >>> There is also an orthogonal concern that Guozhang alluded to.... If
>> >>> you want to plug in a custom storage engine and you want it to be
>> >>> logged etc, you would currently need to implement that yourself.
>> >>> Ideally we can provide a way where we will wrap the custom store
>> >>> with logging, metrics, etc. I need to think about where this fits;
>> >>> it is probably more appropriate on the Stores API.
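The wrapping Damian describes (giving a custom store logging without the user implementing it) could work as a decorator around the user-supplied store. Below is a minimal, Kafka-free sketch of that idea; KeyValueStore, InMemoryStore, and LoggingStore are stand-in names invented for illustration, not the actual Streams API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only -- not the real Kafka Streams API.
interface KeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// A user-supplied "custom storage engine" with no logging of its own.
class InMemoryStore<K, V> implements KeyValueStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public void put(K key, V value) { map.put(key, value); }
    public V get(K key) { return map.get(key); }
}

// Decorator that records a changelog entry for every write, so the custom
// store gets logging without implementing it itself.
class LoggingStore<K, V> implements KeyValueStore<K, V> {
    private final KeyValueStore<K, V> inner;
    final List<String> changelog = new ArrayList<>();
    LoggingStore(KeyValueStore<K, V> inner) { this.inner = inner; }
    public void put(K key, V value) {
        changelog.add(key + "=" + value); // stand-in for a changelog record
        inner.put(key, value);
    }
    public V get(K key) { return inner.get(key); }
}

class StoreWrappingSketch {
    public static void main(String[] args) {
        KeyValueStore<String, Long> custom = new InMemoryStore<>();
        LoggingStore<String, Long> logged = new LoggingStore<>(custom);
        logged.put("a", 1L);
        logged.put("b", 2L);
        System.out.println(logged.get("a"));   // 1
        System.out.println(logged.changelog);  // [a=1, b=2]
    }
}
```

A real version would presumably wrap metrics and caching the same way, and could live behind the Stores API as suggested above.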
>> >>>
>> >>> final KeyValueMapper<String, String, Long> keyMapper = null;
>> >>> // count with mapped key
>> >>> final KTable<Long, Long> count = stream.grouped()
>> >>>         .withKeyMapper(keyMapper)
>> >>>         .withKeySerde(Serdes.Long())
>> >>>         .withValueSerde(Serdes.String())
>> >>>         .withQueryableName("my-store")
>> >>>         .count();
>> >>>
>> >>> // windowed count
>> >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
>> >>>         .withQueryableName("my-window-store")
>> >>>         .windowed(TimeWindows.of(10L).until(10))
>> >>>         .count();
>> >>>
>> >>> // windowed reduce
>> >>> final Reducer<String> windowedReducer = null;
>> >>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
>> >>>         .withQueryableName("my-window-store")
>> >>>         .windowed(TimeWindows.of(10L).until(10))
>> >>>         .reduce(windowedReducer);
>> >>>
>> >>> final Aggregator<String, String, Long> aggregator = null;
>> >>> final Initializer<Long> init = null;
>> >>>
>> >>> // aggregate
>> >>> final KTable<String, Long> aggregate = stream.grouped()
>> >>>         .withQueryableName("my-aggregate-store")
>> >>>         .aggregate(aggregator, init, Serdes.Long());
>> >>>
>> >>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
>> >>>
>> >>> // aggregate with custom store
>> >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
>> >>>         .withStateStoreSupplier(stateStoreSupplier)
>> >>>         .aggregate(aggregator, init);
>> >>>
>> >>> // disable caching
>> >>> stream.grouped()
>> >>>         .withQueryableName("name")
>> >>>         .withCachingEnabled(false)
>> >>>         .count();
>> >>>
>> >>> // disable logging
>> >>> stream.grouped()
>> >>>         .withQueryableName("q")
>> >>>         .withLoggingEnabled(false)
>> >>>         .count();
>> >>>
>> >>> // override log config
>> >>> final Reducer<String> reducer = null;
>> >>> stream.grouped()
>> >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
>> >>>         .reduce(reducer);
>> >>>
>> >>> If anyone wants to play around with
this you can find the code here:
>> >>> https://github.com/dguy/kafka/tree/dsl-experiment
>> >>>
>> >>> Note: It won't actually work as most of the methods just return null.
>> >>>
>> >>> Thanks,
>> >>> Damian
>> >>>
>> >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
>> >>>
>> >>>> Thanks Damian. I think both options have pros and cons. And both
>> >>>> are better than overload abuse.
>> >>>>
>> >>>> The fluent API approach reads better, with no mention of builder or
>> >>>> build anywhere. The main downside is that the method signatures are
>> >>>> a little less clear. By reading the method signature, one doesn't
>> >>>> necessarily know what it returns. Also, one needs to figure out the
>> >>>> special method (`table()` in this case) that gives you what you
>> >>>> actually care about (`KTable` in this case). Not major issues, but
>> >>>> worth mentioning while doing the comparison.
>> >>>>
>> >>>> The builder approach avoids the issues mentioned above, but it
>> >>>> doesn't read as well.
>> >>>>
>> >>>> Ismael
>> >>>>
>> >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> I'd like to get a discussion going around some of the API choices
>> >>>>> we've made in the DSL, in particular those that relate to stateful
>> >>>>> operations (though this could expand).
>> >>>>> As it stands we lean heavily on overloaded methods in the API,
>> >>>>> i.e., there are 9 overloads for KGroupedStream.count(..)! It is
>> >>>>> becoming noisy and I feel it is only going to get worse as we add
>> >>>>> more optional params. In particular we've had some requests to be
>> >>>>> able to turn caching off, or change log configs, on a per-operator
>> >>>>> basis (note this can be done now if you pass in a
>> >>>>> StateStoreSupplier, but this can be a bit cumbersome).
>> >>>>>
>> >>>>> So this is a bit of an open question.
How can we change the DSL overloads
>> >>>>> so that it flows, is simple to use and understand, and is easily
>> >>>>> extended in the future?
>> >>>>>
>> >>>>> One option would be to use a fluent API approach for providing the
>> >>>>> optional params, so something like this:
>> >>>>>
>> >>>>> groupedStream.count()
>> >>>>>     .withStoreName("name")
>> >>>>>     .withCachingEnabled(false)
>> >>>>>     .withLoggingEnabled(config)
>> >>>>>     .table()
>> >>>>>
>> >>>>> Another option would be to provide a Builder to the count method,
>> >>>>> so it would look something like this:
>> >>>>>
>> >>>>> groupedStream.count(new
>> >>>>> CountBuilder("storeName").withCachingEnabled(false).build())
>> >>>>>
>> >>>>> Another option is to say: Hey, we don't need this, what are you on
>> >>>>> about!
>> >>>>>
>> >>>>> The above has focussed on state-store-related overloads, but the
>> >>>>> same ideas could be applied to joins etc., where we presently have
>> >>>>> many join methods and many overloads.
>> >>>>>
>> >>>>> Anyway, I look forward to hearing your opinions.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Damian
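For anyone comparing the two shapes discussed in this thread, they can be played with in plain Java with no Kafka dependency. The CountOptions and FluentCount names below are invented for illustration and are not proposed API:

```java
// Illustrative only -- these names are not the proposed Kafka Streams API.

// Builder shape: optional params are collected into an immutable value
// object that is passed to the operation, e.g. count(options).
class CountOptions {
    final String storeName;
    final boolean cachingEnabled;
    private CountOptions(String storeName, boolean cachingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
    }
    static Builder builder(String storeName) { return new Builder(storeName); }
    static class Builder {
        private final String storeName;
        private boolean cachingEnabled = true;
        Builder(String storeName) { this.storeName = storeName; }
        Builder withCachingEnabled(boolean enabled) {
            cachingEnabled = enabled;
            return this;
        }
        CountOptions build() { return new CountOptions(storeName, cachingEnabled); }
    }
}

// Fluent shape: the operation returns an intermediate object whose withXX()
// calls set options, and a terminal call (table() here) produces the result.
class FluentCount {
    private String storeName = "default";
    private boolean cachingEnabled = true;
    FluentCount withStoreName(String name) { storeName = name; return this; }
    FluentCount withCachingEnabled(boolean enabled) { cachingEnabled = enabled; return this; }
    // Terminal operation; in the DSL this is where the processors would be built.
    String table() { return storeName + " caching=" + cachingEnabled; }
}

class ApiShapeSketch {
    public static void main(String[] args) {
        CountOptions opts =
                CountOptions.builder("name").withCachingEnabled(false).build();
        String result =
                new FluentCount().withStoreName("name").withCachingEnabled(false).table();
        System.out.println(opts.storeName + " caching=" + opts.cachingEnabled); // name caching=false
        System.out.println(result);                                             // name caching=false
    }
}
```

The trade-off Ismael describes falls out directly: the builder makes the terminal step explicit (build()), while the fluent shape requires the reader to know that table() is the call that finishes the chain.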