I've updated the KIP to reflect Bill's comment and also to make the StreamsBuilder methods take the topic as the first parameter, i.e., StreamsBuilder#stream no longer accepts varargs.
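Java only allows a varargs parameter in the last position. That is why `stream(Consumed, String...)` could not take the topic first, and dropping varargs removes the constraint. Below is a minimal sketch of the resulting topic-first shape. It uses hypothetical stand-in classes (`SketchBuilder`, `SketchConsumed`, `SketchStream`), not the real Kafka Streams types, and serdes are reduced to plain names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for Consumed: records serde names only.
final class SketchConsumed {
    final String keySerde;
    final String valueSerde;

    private SketchConsumed(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    static SketchConsumed with(String keySerde, String valueSerde) {
        return new SketchConsumed(keySerde, valueSerde);
    }
}

// Hypothetical stand-in for a KStream: remembers where it reads from.
final class SketchStream {
    final String source;
    final SketchConsumed consumed;

    SketchStream(String source, SketchConsumed consumed) {
        this.source = source;
        this.consumed = consumed;
    }
}

// Hypothetical builder: the topic (or Pattern) is always the first parameter.
// A "String... topics" parameter would have to come last, which is what forced
// the earlier (Consumed, String...) ordering.
final class SketchBuilder {
    final List<SketchStream> sources = new ArrayList<>();

    SketchStream stream(String topic) {
        return stream(topic, SketchConsumed.with("default", "default"));
    }

    SketchStream stream(String topic, SketchConsumed consumed) {
        return register(new SketchStream(topic, consumed));
    }

    SketchStream stream(Pattern pattern, SketchConsumed consumed) {
        return register(new SketchStream(pattern.pattern(), consumed));
    }

    private SketchStream register(SketchStream stream) {
        sources.add(stream);
        return stream;
    }
}

class TopicFirstDemo {
    public static void main(String[] args) {
        SketchBuilder builder = new SketchBuilder();
        builder.stream("orders", SketchConsumed.with("String", "Long"));
        builder.stream(Pattern.compile("orders-.*"), SketchConsumed.with("String", "Long"));
        System.out.println(builder.sources.size()); // prints 2
    }
}
```

With one topic per call, several sources could still be combined downstream, for example with the `merge` approach mentioned in the thread (KAFKA-5765).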
On Thu, 24 Aug 2017 at 09:12 Damian Guy <damian....@gmail.com> wrote:
> On Thu, 24 Aug 2017 at 02:49 Guozhang Wang <wangg...@gmail.com> wrote:
>
>> I have a couple of comments, but otherwise it LGTM:
>>
>> 1. For these two functions in StreamsBuilder, the topic String is set as the second parameter, between the two options. Would it be better to set it as the first or the last one instead?
>
> It would be better as the first, but then it is different to the #stream() methods due to varargs.
>
>>     public synchronized <K, V> KTable<K, V> table(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>>
>>     public synchronized <K, V> GlobalKTable<K, V> globalTable(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>>
>> I understand that we cannot do it for the first parameter because of the varargs type. So I'd suggest either
>>
>> a) set it as the last parameter, but then it is inconsistent with other functions like these:
>>
>>     void to(final String topic, final Produced<K, V> options);
>>
>>     KTable<K, V> through(final String topic, final Materialized<K, V> options);
>>
>> b) only allow one single topic name parameter in StreamsBuilder.stream(), since in practice we do not see too many usages of multiple topics, plus it can be semi-supported with "merge" as we move it from StreamsBuilder to KStream (KAFKA-5765),
>
> Perhaps this is the better approach
>
>> 2. KGroupedStream's function:
>>
>>     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>                                  final Aggregator<? super K, ? super V, VR> aggregator,
>>                                  final Serde<VR> aggValueSerde,
>>                                  final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
>>
>> The "aggValueSerde" seems not needed?
>>
>> 3. +1 on `KGroupedStream` vs. `GroupedKStream`. In hindsight, I think KGroupedStream was a bad name.
>> I personally feel we should just correct it with a new class and deprecate / remove the old one before 1.0.0, but that could be in its own KIP.
>
> The problem with this is that we'd need to add new `groupBy` and `groupByKey` methods that return `GroupedKStream`; we can't change the existing ones, as that would break compatibility. So what would we name these methods?
>
>> Guozhang
>>
>> On Wed, Aug 23, 2017 at 1:01 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > We already have GlobalKTable and I can't rename KGroupedStream, which really should be GroupedKStream. So I think we should name new things correctly, i.e., WindowedKStream etc., and fix the others when we can.
>> >
>> > On Wed, 23 Aug 2017 at 20:38 Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> > > About KGroupedStream vs GroupedKStream: shouldn't we keep the naming convention consistent? And if we change the naming scheme, just change all at once? I personally don't care which naming scheme is better, but I think consistency is super important!
>> > >
>> > > About Bill's comment: I agree, and had a similar thought.
>> > >
>> > > -Matthias
>> > >
>> > > On 8/23/17 12:24 PM, Bill Bejeck wrote:
>> > > > Thanks for all the work on this KIP, Damian.
>> > > >
>> > > > Both `Produced` and `Joined` have a `with` method accepting all parameters, but `Consumed` doesn't. Should we add one for consistency?
>> > > >
>> > > > Thanks,
>> > > > Bill
>> > > >
>> > > > On Wed, Aug 23, 2017 at 4:15 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >
>> > > >> KIP has been updated. Thanks
>> > > >>
>> > > >> On Wed, 23 Aug 2017 at 09:10 Damian Guy <damian....@gmail.com> wrote:
>> > > >>
>> > > >>> Hi Matthias,
>> > > >>>
>> > > >>>> KStream:
>> > > >>>> leftJoin and outerJoin for KStream/KTable join should not have a `JoinWindows` parameter
>> > > >>>
>> > > >>> Thanks!
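Bill's consistency point can be sketched with a hypothetical `SketchConsumedOptions` class (not the real `Consumed`). Every static factory is named `with`, and one overload takes all parameters. The timestamp extractor is modelled as a `ToLongFunction<String>` and the reset policy as a made-up enum; both are assumptions for illustration only:

```java
import java.util.function.ToLongFunction;

// Hypothetical offset-reset policy (illustration only).
enum SketchResetPolicy { EARLIEST, LATEST }

// Hypothetical Consumed-like options: every static factory is called with(),
// and one overload accepts all parameters, as Produced and Joined do.
final class SketchConsumedOptions {
    final String keySerde;                           // null = use the default
    final String valueSerde;                         // null = use the default
    final ToLongFunction<String> timestampExtractor; // null = use the default
    final SketchResetPolicy resetPolicy;             // null = use the default

    private SketchConsumedOptions(String keySerde, String valueSerde,
                                  ToLongFunction<String> timestampExtractor,
                                  SketchResetPolicy resetPolicy) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.timestampExtractor = timestampExtractor;
        this.resetPolicy = resetPolicy;
    }

    // All parameters at once.
    static SketchConsumedOptions with(String keySerde, String valueSerde,
                                      ToLongFunction<String> timestampExtractor,
                                      SketchResetPolicy resetPolicy) {
        return new SketchConsumedOptions(keySerde, valueSerde, timestampExtractor, resetPolicy);
    }

    // Serdes only.
    static SketchConsumedOptions with(String keySerde, String valueSerde) {
        return new SketchConsumedOptions(keySerde, valueSerde, null, null);
    }

    // Single options: the parameter types differ, so the overloads resolve by type.
    static SketchConsumedOptions with(ToLongFunction<String> timestampExtractor) {
        return new SketchConsumedOptions(null, null, timestampExtractor, null);
    }

    static SketchConsumedOptions with(SketchResetPolicy resetPolicy) {
        return new SketchConsumedOptions(null, null, null, resetPolicy);
    }
}

class ConsumedDemo {
    public static void main(String[] args) {
        SketchConsumedOptions all =
            SketchConsumedOptions.with("String", "Long", value -> 0L, SketchResetPolicy.EARLIEST);
        SketchConsumedOptions resetOnly = SketchConsumedOptions.with(SketchResetPolicy.LATEST);
        System.out.println(all.resetPolicy + " " + resetOnly.keySerde); // prints "EARLIEST null"
    }
}
```

The single-argument overloads take different parameter types, so Java can tell them apart even though they share the name `with`.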
>> > > >>>
>> > > >>>> Nit: TopologyBuilder -> Topology
>> > > >>>
>> > > >>> Ack
>> > > >>>
>> > > >>>> Nit: new class Serialized lists static method #with twice
>> > > >>>
>> > > >>> Ack
>> > > >>>
>> > > >>>> WindowedKStream -> for consistency we should either have GroupedKStream or KWindowedStream... (similar argument for SessionWindowedKStream)
>> > > >>>
>> > > >>> We can't rename KGroupedStream -> GroupedKStream without breaking compatibility. So we are stuck with it for now. Hopefully in the future we can rename KGroupedStream to GroupedKStream.
>> > > >>>
>> > > >>>> KGroupedStream
>> > > >>>> -> why do we use a different name for `sessionWindowedBy()`? It seems cleaner to call both methods `windowedBy()`.
>> > > >>>
>> > > >>> I beg to differ that it is cleaner either way!
>> > > >>>
>> > > >>>> StreamsBuilder#stream -> parameter order is confusing... We should have Pattern as the second parameter to align both methods.
>> > > >>>
>> > > >>> Ack
>> > > >>>
>> > > >>>> StreamsBuilder#table/globalTable -> move parameter `Consumed` to be the first parameter to align with `#stream`
>> > > >>>
>> > > >>> Ack
>> > > >>>
>> > > >>>> Produced#with(Serde, Serde)
>> > > >>>> Produced#with(StreamPartitioner, Serde, Serde)
>> > > >>>> -> should StreamPartitioner be the third argument instead of the first?
>> > > >>>
>> > > >>> Sure
>> > > >>>
>> > > >>>> Consumed:
>> > > >>>> Why do we need 3 different names for the 3 static methods? I would just call all of them `with()`. Current names sound clumsy to me. And a plain `with()` also aligns with the naming of static methods of other classes.
>> > > >>>
>> > > >>> I disagree that the names sound clumsy! But yes, they should be aligned with the others.
>> > > >>>
>> > > >>>> I guess we are also deprecating a bunch of methods for KStream/KTable/KGroupedStream/KGroupedTable and should mention which ones? There is just one sentence, "Deprecate the existing overloads.", but we don't deprecate all existing ones. I personally don't care too much if we spell out the deprecated methods explicitly, but right now it's not consistent, as we only list the methods we add.
>> > > >>>>
>> > > >>>> Should we deprecate `StateStoreSupplier`?
>> > > >>>
>> > > >>> Yep
>> > > >>>
>> > > >>>> -Matthias
>> > > >>>>
>> > > >>>> On 8/22/17 6:55 AM, Damian Guy wrote:
>> > > >>>>> I've just updated the KIP with some additional changes targeted at StreamsBuilder
>> > > >>>>>
>> > > >>>>> Thanks,
>> > > >>>>> Damian
>> > > >>>>>
>> > > >>>>> On Thu, 10 Aug 2017 at 12:59 Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>
>> > > >>>>>>> Got it, thanks.
>> > > >>>>>>>
>> > > >>>>>>> Does it still make sense to have one static constructor for each spec, with one constructor having only one parameter to make it more usable, i.e., as a user I do not need to give all parameters if I only want to override one of them? Maybe we can just name the constructors `with`, but I'm not sure if Java distinguishes:
>> > > >>>>>>>
>> > > >>>>>>>     public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
>> > > >>>>>>>     public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
>> > > >>>>>>>
>> > > >>>>>>> as two function signatures.
>> > > >>>>>>>
>> > > >>>>>> No, that won't work. That is why we have all options, i.e., on Produced:
>> > > >>>>>>
>> > > >>>>>>     public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
>> > > >>>>>>     public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
>> > > >>>>>>     public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
>> > > >>>>>>     public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
>> > > >>>>>>     public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
>> > > >>>>>>
>> > > >>>>>> So if you only want to use one, you can just use the function that takes one argument.
>> > > >>>>>>
>> > > >>>>>>> Guozhang
>> > > >>>>>>>
>> > > >>>>>>> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>>>
>> > > >>>>>>>> On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>>> Damian,
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thanks for the proposal. I had a few comments on the APIs:
>> > > >>>>>>>>>
>> > > >>>>>>>>> 1. Printed#withFile seems not needed, as users should always specify at the beginning whether it goes to sysOut or to a File. In addition, on second thought, I think serdes are not useful for prints anyway, since we assume `toString` is provided, except for byte arrays, which we will handle specially.
>> > > >>>>>>>>>
>> > > >>>>>>>> +1
>> > > >>>>>>>>
>> > > >>>>>>>>> Another comment about Printed in general: it differs from the other options in that it is a required option rather than an optional one, since it includes the toSysOut / toFile specs. What are the pros and cons of including these two in the option, and hence making it a required option, rather than leaving them at the API layer and making Printed an optional one for mapper / label only?
>> > > >>>>>>>>
>> > > >>>>>>>> It isn't required, as we will still have the no-arg print(), which will just go to sysout as it does now.
>> > > >>>>>>>>
>> > > >>>>>>>>> 2.1 KStream#through / to
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Produced?
>> > > >>>>>>>>
>> > > >>>>>>>> Yes - we already have those, so they are not part of the KIP, i.e., through(topic)
>> > > >>>>>>>>
>> > > >>>>>>>>> 2.2 KStream#groupBy / groupByKey
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Serialized?
>> > > >>>>>>>>
>> > > >>>>>>>> Yes, as above
>> > > >>>>>>>>
>> > > >>>>>>>>> 2.3 KGroupedStream#count / reduce / aggregate
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Materialized?
>> > > >>>>>>>>
>> > > >>>>>>>> As above
>> > > >>>>>>>>
>> > > >>>>>>>>> 2.4 KStream#join
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Joined?
>> > > >>>>>>>>
>> > > >>>>>>>> as above
>> > > >>>>>>>>
>> > > >>>>>>>>> 2.5 Each of KTable's operators:
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Produced / Serialized / Materialized?
>> > > >>>>>>>>
>> > > >>>>>>>> as above
>> > > >>>>>>>>
>> > > >>>>>>>>> 3.1 Produced: the static functions overlap, which seems unnecessary. I'd suggest just having the following three static functions, with another three similar member functions:
>> > > >>>>>>>>>
>> > > >>>>>>>>>     public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>>     public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>>     public static <K, V> Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
>> > > >>>>>>>>>
>> > > >>>>>>>>> The key idea is that by using the same function name for the static constructors and the member functions, users do not need to remember the differences. They can call these functions in any order they want, and later calls on the same spec will win over earlier calls.
>> > > >>>>>>>>
>> > > >>>>>>>> That would be great if Java supported it, but it doesn't. You can't have static and member functions with the same signature.
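Damian's point follows from Java's rules. A class cannot declare two methods with override-equivalent signatures, whether they are static or instance methods. Overloads such as the `with(Serde<K>)` / `with(Serde<V>)` pair Guozhang asks about also clash, because both erase to the same signature. Below is a minimal sketch of the workaround, using hypothetical `SketchProduced`, `SketchSerde` and `SketchPartitioner` types rather than the real API. Static factories and instance setters get different names, and each setter returns `this`, so a later call overrides an earlier one:

```java
// Hypothetical stand-in for Serde<T>: carries only a name.
final class SketchSerde<T> {
    final String name;

    SketchSerde(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for StreamPartitioner<K, V>.
interface SketchPartitioner<K, V> {
    int partition(K key, V value, int numPartitions);
}

final class SketchProduced<K, V> {
    SketchSerde<K> keySerde;             // null = use the default
    SketchSerde<V> valueSerde;           // null = use the default
    SketchPartitioner<K, V> partitioner; // null = use the default

    private SketchProduced(SketchSerde<K> keySerde, SketchSerde<V> valueSerde,
                           SketchPartitioner<K, V> partitioner) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.partitioner = partitioner;
    }

    // Static factories. Overloads with(SketchSerde<K>) and with(SketchSerde<V>)
    // would both erase to with(SketchSerde) and fail to compile, hence the
    // distinct single-option names below.
    static <K, V> SketchProduced<K, V> with(SketchSerde<K> keySerde, SketchSerde<V> valueSerde) {
        return new SketchProduced<>(keySerde, valueSerde, null);
    }

    // Partitioner as the third argument.
    static <K, V> SketchProduced<K, V> with(SketchSerde<K> keySerde, SketchSerde<V> valueSerde,
                                            SketchPartitioner<K, V> partitioner) {
        return new SketchProduced<>(keySerde, valueSerde, partitioner);
    }

    static <K, V> SketchProduced<K, V> keySerde(SketchSerde<K> keySerde) {
        return new SketchProduced<>(keySerde, null, null);
    }

    static <K, V> SketchProduced<K, V> valueSerde(SketchSerde<V> valueSerde) {
        return new SketchProduced<>(null, valueSerde, null);
    }

    // Instance setters use a with- prefix: a static and an instance method with the
    // same signature cannot coexist in one class. Returning this lets the last call win.
    SketchProduced<K, V> withKeySerde(SketchSerde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }

    SketchProduced<K, V> withValueSerde(SketchSerde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }

    SketchProduced<K, V> withStreamPartitioner(SketchPartitioner<K, V> partitioner) {
        this.partitioner = partitioner;
        return this;
    }
}

class ProducedDemo {
    public static void main(String[] args) {
        SketchProduced<String, Long> produced =
            SketchProduced.<String, Long>keySerde(new SketchSerde<>("String"))
                .withValueSerde(new SketchSerde<>("Long"))
                .withKeySerde(new SketchSerde<>("ByteArray")); // overrides the first key serde
        System.out.println(produced.keySerde.name + " " + produced.valueSerde.name); // prints "ByteArray Long"
    }
}
```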
>> > > >>>>>>>>
>> > > >>>>>>>>> 3.2 Serialized: similarly
>> > > >>>>>>>>>
>> > > >>>>>>>>>     public static <K, V> Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>>     public static <K, V> Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>>     public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>>     public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>> > > >>>>>>>>
>> > > >>>>>>>> as above
>> > > >>>>>>>>
>> > > >>>>>>>>> Also, it has a final Serde<V> otherValueSerde in one of its static constructors. Is that intentional?
>> > > >>>>>>>>
>> > > >>>>>>>> Nope: thanks.
>> > > >>>>>>>>
>> > > >>>>>>>>> 3.3 Joined: similarly, keep the static constructor signatures the same as the corresponding member fields.
>> > > >>>>>>>>
>> > > >>>>>>>> As above
>> > > >>>>>>>>
>> > > >>>>>>>>> 3.4 Materialized: it is a bit special, and I think we can keep its static constructors with only the two `as` methods, as they are today.
>> > > >>>>>>>>>
>> > > >>>>>>>>> 4. Are there any modifications to StateStoreSupplier? Is it replaced by BytesStoreSupplier? It seems some more description is lacking here. Also in
>> > > >>>>>>>>
>> > > >>>>>>>> No modifications to StateStoreSupplier. It is superseded by BytesStoreSupplier.
>> > > >>>>>>>>
>> > > >>>>>>>>>     public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
>> > > >>>>>>>>>
>> > > >>>>>>>>> Is the parameter meant to be of type BytesStoreSupplier?
>> > > >>>>>>>>
>> > > >>>>>>>> Yep - thanks
>> > > >>>>>>>>
>> > > >>>>>>>>> Guozhang
>> > > >>>>>>>>>
>> > > >>>>>>>>> On Thu, Jul 27, 2017 at 5:26 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>>>>>
>> > > >>>>>>>>>> Updated link:
>> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Thanks,
>> > > >>>>>>>>>> Damian
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> Hi,
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> I've put together a KIP to make some changes to the KafkaStreams DSL that will hopefully allow us to:
>> > > >>>>>>>>>>> 1) reduce the explosion of overloads
>> > > >>>>>>>>>>> 2) add new features without having to continue adding more overloads
>> > > >>>>>>>>>>> 3) provide simpler ways for people to use custom storage engines and wrap them with logging, caching, etc., if desired
>> > > >>>>>>>>>>> 4) enable per-operator caching rather than global caching, without having to resort to supplying a StateStoreSupplier when you just want to turn caching off.
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> The KIP is here:
>> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Thanks,
>> > > >>>>>>>>>>> Damian
>> > > >>>>>>>>>
>> > > >>>>>>>>> --
>> > > >>>>>>>>> -- Guozhang
>> > > >>>>>>>
>> > > >>>>>>> --
>> > > >>>>>>> -- Guozhang
>>
>> --
>> -- Guozhang
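Goal 4 of the original proposal, and the later `Materialized.as(...)` / BytesStoreSupplier exchange, both point toward configuring each operator's store individually. Below is a minimal sketch of that idea. It uses a hypothetical `SketchMaterialized` class and a stand-in `SketchBytesStoreSupplier` interface, neither of which is the real Kafka Streams API, and the method names are illustrative assumptions. A store is either named or supplied, and caching or logging can be switched off for that one operator only:

```java
// Hypothetical stand-in for BytesStoreSupplier: supplies a named, byte-oriented store.
interface SketchBytesStoreSupplier {
    String name();
}

// Hypothetical per-operator store configuration; method names are assumptions.
final class SketchMaterialized {
    final String storeName;
    final SketchBytesStoreSupplier supplier; // null = use the default store type
    boolean cachingEnabled = true;
    boolean loggingEnabled = true;

    private SketchMaterialized(String storeName, SketchBytesStoreSupplier supplier) {
        this.storeName = storeName;
        this.supplier = supplier;
    }

    // Name the store but keep the default storage engine.
    static SketchMaterialized as(String storeName) {
        return new SketchMaterialized(storeName, null);
    }

    // Plug in a custom storage engine; caching and logging stay on unless disabled.
    static SketchMaterialized as(SketchBytesStoreSupplier supplier) {
        return new SketchMaterialized(supplier.name(), supplier);
    }

    // These flags describe only the operator this object is passed to.
    SketchMaterialized withCachingDisabled() {
        cachingEnabled = false;
        return this;
    }

    SketchMaterialized withLoggingDisabled() {
        loggingEnabled = false;
        return this;
    }
}

class MaterializedDemo {
    public static void main(String[] args) {
        SketchMaterialized counts = SketchMaterialized.as("counts-store").withCachingDisabled();
        SketchMaterialized custom = SketchMaterialized.as(() -> "custom-store");
        // prints "false custom-store true"
        System.out.println(counts.cachingEnabled + " " + custom.storeName + " " + custom.cachingEnabled);
    }
}
```

Because caching is a flag on the per-operator object, turning it off does not require supplying a whole store implementation. That is the pain point the proposal's goal 4 describes.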