I have a couple of comments, but otherwise it LGTM:

1. For these two functions in StreamsBuilder, the topic String is the second parameter, sitting between two option objects. Would it be better to make it the first or the last parameter instead?

public synchronized <K, V> KTable<K, V> table(final Consumed<K, V> consumed,
                                              final String topic,
                                              final Materialized<K, V> materialized)

public synchronized <K, V> GlobalKTable<K, V> globalTable(final Consumed<K, V> consumed,
                                                          final String topic,
                                                          final Materialized<K, V> materialized)

I understand that we cannot make it the first parameter because of the vararg type. So I'd suggest either

a) set it as the last parameter, but then it is inconsistent with other functions like these:

void to(final String topic, final Produced<V, V> options);

KTable<K, V> through(final String topic, final Materialized<K, V> options);

b) only allow a single topic name parameter in StreamsBuilder.stream(), since in practice we do not see many usages of multiple topics, and it can be semi-supported with "merge" as we move it from StreamsBuilder to KStream (KAFKA-5765).

2. KGroupedStream's function:

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

The "aggValueSerde" seems not needed?

3. +1 on `KGroupedStream` vs. `GroupedKStream`. In hindsight, I think KGroupedStream was a bad name. I personally feel we should just correct it with a new class and deprecate / remove the old one before 1.0.0, but that could be in its own KIP.

Guozhang

On Wed, Aug 23, 2017 at 1:01 PM, Damian Guy <damian....@gmail.com> wrote:

> We already have GlobalKTable and I can't rename KGroupedStream, which
> really should be GroupedKStream. So I think we should name new things
> correctly, i.e., WindowedKStream etc., and fix the others when we can.
>
> On Wed, 23 Aug 2017 at 20:38 Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > About KGroupedStream vs GroupedKStream: shouldn't we keep the naming
> > convention consistent? And if we change the naming schema just change
> > all at once?
> > I personally don't care which naming scheme is better, but
> > I think consistency is super important!
> >
> > About Bill's comment: I agree, and had a similar thought.
> >
> > -Matthias
> >
> > On 8/23/17 12:24 PM, Bill Bejeck wrote:
> > > Thanks for all the work on this KIP Damian.
> > >
> > > Both `Produced` and `Joined` have a `with` method accepting all parameters,
> > > but `Consumed` doesn't. Should we add one for consistency?
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Aug 23, 2017 at 4:15 AM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > >> KIP has been updated. thanks
> > >>
> > >> On Wed, 23 Aug 2017 at 09:10 Damian Guy <damian....@gmail.com> wrote:
> > >>
> > >>> Hi Matthias,
> > >>>
> > >>>> KStream:
> > >>>> leftJoin and outerJoin for KStream/KTable join should not have
> > >>>> `JoinWindows` parameter
> > >>>>
> > >>> Thanks!
> > >>>
> > >>>> Nit: TopologyBuilder -> Topology
> > >>>>
> > >>> Ack
> > >>>
> > >>>> Nit: new class Serialized list static method #with twice
> > >>>>
> > >>> Ack
> > >>>
> > >>>> WindowedKStream -> for consistency we should either have GroupedKStream
> > >>>> or KWindowedStream... (similar argument for SessionWindowedKStream)
> > >>>>
> > >>> We can't rename KGroupedStream -> GroupedKStream without breaking
> > >>> compatibility. So we are stuck with it for now. Hopefully in the future we
> > >>> can rename KGroupedStream to GroupedKStream.
> > >>>
> > >>>> KGroupedStream
> > >>>> -> why do we use a different name for `sessionWindowedBy()` -- seems to
> > >>>> be cleaner to call both methods `windowedBy()`
> > >>>>
> > >>> I beg to differ that it is cleaner either way!
> > >>>
> > >>>> StreamsBuilder#stream -> parameter order is confusing... We should have
> > >>>> Pattern as second parameter to align both methods.
> > >>>>
> > >>> Ack
> > >>>
> > >>>> StreamsBuilder#table/globalTable -> move parameter `Consumed` as first
> > >>>> parameter to align with `#stream`
> > >>>>
> > >>> Ack
> > >>>
> > >>>> Produced#with(Serde, Serde)
> > >>>> Produced#with(StreamPartitioner, Serde, Serde)
> > >>>> -> should StreamPartitioner be the third argument instead of the first?
> > >>>>
> > >>> Sure
> > >>>
> > >>>> Consumed:
> > >>>> Why do we need 3 different names for the 3 static methods? I would just
> > >>>> call all of them `with()`. Current names sound clumsy to me. And a
> > >>>> plain `with()` also aligns with the naming of static methods of other
> > >>>> classes.
> > >>>>
> > >>> I disagree that the names sound clumsy! But yes, they should be aligned
> > >>> with the others.
> > >>>
> > >>>> I guess we are also deprecating a bunch of methods for
> > >>>> KStream/KTable/KGroupedStream/KGroupedTable and should mention which
> > >>>> ones? There is just one sentence "Deprecate the existing overloads.", but
> > >>>> we don't deprecate all existing ones. I personally don't care too much if
> > >>>> we spell deprecated methods out explicitly, but right now it's not
> > >>>> consistent as we only list methods we add.
> > >>>>
> > >>>> Should we deprecate `StateStoreSupplier`?
> > >>>>
> > >>> Yep
> > >>>
> > >>>> -Matthias
> > >>>>
> > >>>> On 8/22/17 6:55 AM, Damian Guy wrote:
> > >>>>> I've just updated the KIP with some additional changes targeted at
> > >>>>> StreamsBuilder
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Damian
> > >>>>>
> > >>>>> On Thu, 10 Aug 2017 at 12:59 Damian Guy <damian....@gmail.com> wrote:
> > >>>>>
> > >>>>>>> Got it, thanks.
> > >>>>>>>
> > >>>>>>> Does it still make sense to have one static constructor for each spec,
> > >>>>>>> with one constructor having only one parameter to make it more usable,
> > >>>>>>> i.e., as a user I do not need to give all parameters if I only want to
> > >>>>>>> override one of them? Maybe we can just name the constructors `with`,
> > >>>>>>> but I'm not sure if Java distinguishes:
> > >>>>>>>
> > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
> > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
> > >>>>>>>
> > >>>>>>> as two function signatures.
> > >>>>>>>
> > >>>>>> No, that won't work. That is why we have all options, i.e., on Produced:
> > >>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
> > >>>>>> public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
> > >>>>>> public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
> > >>>>>> public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
> > >>>>>> public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
> > >>>>>>
> > >>>>>> So if you only want to use one you can just use the function that takes
> > >>>>>> one argument.
> > >>>>>>
> > >>>>>>> Guozhang
> > >>>>>>>
> > >>>>>>> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian....@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Damian,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the proposal, I had a few comments on the APIs:
> > >>>>>>>>>
> > >>>>>>>>> 1. Printed#withFile seems not needed, as users should always specify
> > >>>>>>>>> at the beginning whether it goes to sysOut or to File. In addition, on
> > >>>>>>>>> second thought, I think serdes are not useful for prints anyway, since
> > >>>>>>>>> we assume `toString` is provided except for byte arrays, which we will
> > >>>>>>>>> handle specially.
> > >>>>>>>>>
> > >>>>>>>> +1
> > >>>>>>>>
> > >>>>>>>>> Another comment about Printed in general: it differs from the other
> > >>>>>>>>> options in that it is a required option rather than an optional one,
> > >>>>>>>>> since it includes the toSysOut / toFile specs. What are the pros and
> > >>>>>>>>> cons of including these two in the option, and hence making it
> > >>>>>>>>> required, versus leaving them at the API layer and making Printed
> > >>>>>>>>> optional, for mapper / label only?
> > >>>>>>>>>
> > >>>>>>>> It isn't required as we will still have the no-arg print() which will just
> > >>>>>>>> go to sysout as it does now.
> > >>>>>>>>
> > >>>>>>>>> 2.1 KStream#through / to
> > >>>>>>>>>
> > >>>>>>>>> We should have an overloaded function without Produced?
> > >>>>>>>>>
> > >>>>>>>> Yes - we already have those so they are not part of the KIP, i.e.,
> > >>>>>>>> through(topic)
> > >>>>>>>>
> > >>>>>>>>> 2.2 KStream#groupBy / groupByKey
> > >>>>>>>>>
> > >>>>>>>>> We should have an overloaded function without Serialized?
> > >>>>>>>>>
> > >>>>>>>> Yes, as above
> > >>>>>>>>
> > >>>>>>>>> 2.3 KGroupedStream#count / reduce / aggregate
> > >>>>>>>>>
> > >>>>>>>>> We should have an overloaded function without Materialized?
> > >>>>>>>>>
> > >>>>>>>> As above
> > >>>>>>>>
> > >>>>>>>>> 2.4 KStream#join
> > >>>>>>>>>
> > >>>>>>>>> We should have an overloaded function without Joined?
> > >>>>>>>>>
> > >>>>>>>> as above
> > >>>>>>>>
> > >>>>>>>>> 2.5 Each of KTable's operators:
> > >>>>>>>>>
> > >>>>>>>>> We should have an overloaded function without Produced / Serialized /
> > >>>>>>>>> Materialized?
> > >>>>>>>>>
> > >>>>>>>> as above
> > >>>>>>>>
> > >>>>>>>>> 3.1 Produced: the static functions have overlaps, which seems
> > >>>>>>>>> unnecessary. I'd suggest just having the following three static
> > >>>>>>>>> functions, with another three similar member functions:
> > >>>>>>>>>
> > >>>>>>>>> public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
> > >>>>>>>>>
> > >>>>>>>>> public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
> > >>>>>>>>>
> > >>>>>>>>> public static <K, V> Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
> > >>>>>>>>>
> > >>>>>>>>> The key idea is that by using the same function name for the static
> > >>>>>>>>> constructors and the member functions, users do not need to remember
> > >>>>>>>>> what the differences are but can call these functions in any order
> > >>>>>>>>> they want, and later calls on the same spec will win over earlier calls.
> > >>>>>>>>>
> > >>>>>>>> That would be great if Java supported it, but it doesn't. You can't have
> > >>>>>>>> static and member functions with the same signature.
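A note on the point just above: Java rejects a class that declares a static method and an instance method with the same name and parameter types, so a static factory and a chaining member cannot share a signature. A minimal sketch of the usual workaround follows. It assumes a hypothetical `Options` class (not the real `Produced`) that uses `String` placeholders instead of `Serde` types; the names `keySerde` / `withKeySerde` are illustrative assumptions, not taken from the KIP.

```java
/** Hypothetical stand-in for an options class such as Produced; not the Kafka API. */
final class Options {
    // String placeholders stand in for real Serde instances.
    private final String keySerde;
    private final String valueSerde;

    private Options(final String keySerde, final String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Static factories: entry points when only one option needs to be set.
    static Options keySerde(final String keySerde) {
        return new Options(keySerde, null);
    }

    static Options valueSerde(final String valueSerde) {
        return new Options(null, valueSerde);
    }

    // Instance methods must use different names from the static factories:
    // declaring an instance method keySerde(String) here would not compile.
    Options withKeySerde(final String keySerde) {
        return new Options(keySerde, this.valueSerde);
    }

    Options withValueSerde(final String valueSerde) {
        return new Options(this.keySerde, valueSerde);
    }

    // Renders the current settings so the effect of chaining is visible.
    String describe() {
        return "key=" + keySerde + ", value=" + valueSerde;
    }
}
```

With this shape, `Options.keySerde("string").withValueSerde("long")` sets both options, and a later `withKeySerde` call replaces a key serde set earlier in the chain.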
> > >>>>>>>>
> > >>>>>>>>> 3.2 Serialized: similarly
> > >>>>>>>>>
> > >>>>>>>>> public static <K, V> Serialized<K, V> withKeySerde(final Serde<K> keySerde)
> > >>>>>>>>>
> > >>>>>>>>> public static <K, V> Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
> > >>>>>>>>>
> > >>>>>>>>> public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
> > >>>>>>>>>
> > >>>>>>>>> public Serialized<K, V> withValueSerde(final Serde valueSerde)
> > >>>>>>>>>
> > >>>>>>>> as above
> > >>>>>>>>
> > >>>>>>>>> Also it has a final Serde<V> otherValueSerde in one of its static
> > >>>>>>>>> constructors, is that intentional?
> > >>>>>>>>>
> > >>>>>>>> Nope: thanks.
> > >>>>>>>>
> > >>>>>>>>> 3.3. Joined: similarly, keep the static constructor signatures the same
> > >>>>>>>>> as its corresponding member fields.
> > >>>>>>>>>
> > >>>>>>>> As above
> > >>>>>>>>
> > >>>>>>>>> 3.4 Materialized: it is a bit special, and I think we can keep its static
> > >>>>>>>>> constructors with only two `as` as they are today.
> > >>>>>>>>>
> > >>>>>>>>> 4. Are there any modifications to StateStoreSupplier? Is it replaced by
> > >>>>>>>>> BytesStoreSupplier? Seems some more description is lacking here. Also in
> > >>>>>>>>>
> > >>>>>>>> No modifications to StateStoreSupplier. It is superseded by
> > >>>>>>>> BytesStoreSupplier.
> > >>>>>>>>
> > >>>>>>>>> public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
> > >>>>>>>>>
> > >>>>>>>>> Is the parameter of type BytesStoreSupplier?
> > >>>>>>>>>
> > >>>>>>>> Yep - thanks
> > >>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>> On Thu, Jul 27, 2017 at 5:26 AM, Damian Guy <damian....@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Updated link:
> > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks,
> > >>>>>>>>>> Damian
> > >>>>>>>>>>
> > >>>>>>>>>> On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian....@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I've put together a KIP to make some changes to the KafkaStreams DSL that
> > >>>>>>>>>>> will hopefully allow us to:
> > >>>>>>>>>>> 1) reduce the explosion of overloads
> > >>>>>>>>>>> 2) add new features without having to continue adding more overloads
> > >>>>>>>>>>> 3) provide simpler ways for people to use custom storage engines and wrap
> > >>>>>>>>>>> them with logging, caching etc if desired
> > >>>>>>>>>>> 4) enable per-operator caching rather than global caching without having
> > >>>>>>>>>>> to resort to supplying a StateStoreSupplier when you just want to turn
> > >>>>>>>>>>> caching off.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The KIP is here:
> > >>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> Damian
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> -- Guozhang
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>> -- Guozhang

--
-- Guozhang