On Thu, 24 Aug 2017 at 02:49 Guozhang Wang <wangg...@gmail.com> wrote:
> I have a couple of comments but otherwise it LGTM:
>
> 1. For these two functions in StreamsBuilder, the topic String is set as the second parameter, in between two options. Would it be better to set it as the first or the last one instead?
>

It would be better as the first, but then it is different to the #stream() methods due to varargs.

> public synchronized <K, V> KTable<K, V> table(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>
> public synchronized <K, V> GlobalKTable<K, V> globalTable(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>
> I understand that we cannot do it for the first parameter because of the vararg type. So I'd suggest either
>
> a) set it as the last parameter, but then it is inconsistent with other functions like these:
>
> void to(final String topic, final Produced<K, V> options);
>
> KTable<K, V> through(final String topic, final Materialized<K, V> options);
>
> b) only allow one single topic name parameter in StreamsBuilder.stream(), since in practice we do not see too many usages of multiple topics, plus it can be semi-supported with "merge" as we move it from StreamsBuilder to KStream (KAFKA-5765).
>

Perhaps this is the better approach.

> 2. KGroupedStream's function:
>
> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>                              final Aggregator<? super K, ? super V, VR> aggregator,
>                              final Serde<VR> aggValueSerde,
>                              final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
>
> The "aggValueSerde" seems not needed?
>
> 3. +1 on `KGroupedStream` vs. `GroupedKStream`. I think KGroupedStream was a bad name in hindsight. I personally feel we should just correct it with a new class and deprecate / remove the old one before 1.0.0, but that could be in its own KIP.
>
>

The problem with this is that we'd need to add new `groupBy` and `groupByKey` methods that return `GroupedKStream`; we can't change the existing ones, as that would break compatibility. So what would we name these methods?

>
> Guozhang
>
> On Wed, Aug 23, 2017 at 1:01 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > We already have GlobalKTable and I can't rename KGroupedStream, which really should be GroupedKStream. So I think we should name new things correctly, e.g., WindowedKStream etc., and fix the others when we can.
> >
> > On Wed, 23 Aug 2017 at 20:38 Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > About KGroupedStream vs GroupedKStream: shouldn't we keep the naming convention consistent? And if we change the naming scheme, just change it all at once? I personally don't care which naming scheme is better, but I think consistency is super important!
> > >
> > > About Bill's comment: I agree, and had a similar thought.
> > >
> > > -Matthias
> > >
> > > On 8/23/17 12:24 PM, Bill Bejeck wrote:
> > > > Thanks for all the work on this KIP, Damian.
> > > >
> > > > Both `Produced` and `Joined` have a `with` method accepting all parameters, but `Consumed` doesn't. Should we add one for consistency?
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Aug 23, 2017 at 4:15 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >
> > > >> KIP has been updated. Thanks
> > > >>
> > > >> On Wed, 23 Aug 2017 at 09:10 Damian Guy <damian....@gmail.com> wrote:
> > > >>
> > > >>> Hi Matthias,
> > > >>>
> > > >>>> KStream:
> > > >>>> leftJoin and outerJoin for KStream/KTable join should not have `JoinWindows` parameter
> > > >>>
> > > >>> Thanks!
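Bill's question about an all-parameters `with` on `Consumed` (and Matthias's later suggestion to call all of its static factories `with()`) can both be satisfied, because Java lets overloads share a name as long as their parameter types still differ after erasure. The following is a minimal sketch under stated assumptions: `Serde`, `TimestampExtractor`, `AutoOffsetReset` and `Consumed` here are hypothetical stand-ins, not the real Kafka Streams classes, and the one-argument `extract(Object)` signature and field names are invented for illustration.

```java
// Hypothetical stand-ins for the types discussed in the thread; these are
// NOT the real org.apache.kafka.* classes, only enough to compile the sketch.
interface Serde<T> {
    String name();
}

interface TimestampExtractor {
    long extract(Object record); // assumed, simplified signature
}

enum AutoOffsetReset { EARLIEST, LATEST }

final class Consumed<K, V> {
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final TimestampExtractor timestampExtractor;
    final AutoOffsetReset resetPolicy;

    private Consumed(final Serde<K> keySerde, final Serde<V> valueSerde,
                     final TimestampExtractor timestampExtractor,
                     final AutoOffsetReset resetPolicy) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.timestampExtractor = timestampExtractor;
        this.resetPolicy = resetPolicy;
    }

    // "All parameters" factory, mirroring what Produced and Joined offer.
    static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde,
                                      final TimestampExtractor timestampExtractor,
                                      final AutoOffsetReset resetPolicy) {
        return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy);
    }

    // The remaining overloads can all be called `with` because their
    // parameter lists still differ after erasure: (Serde, Serde),
    // (TimestampExtractor) and (AutoOffsetReset). Unset options stay null.
    static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new Consumed<>(keySerde, valueSerde, null, null);
    }

    static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
        return new Consumed<>(null, null, timestampExtractor, null);
    }

    static <K, V> Consumed<K, V> with(final AutoOffsetReset resetPolicy) {
        return new Consumed<>(null, null, null, resetPolicy);
    }
}

class ConsumedDemo {
    public static void main(final String[] args) {
        final Serde<String> strings = () -> "String";
        final Serde<Long> longs = () -> "Long";
        final Consumed<String, Long> all =
            Consumed.with(strings, longs, record -> 0L, AutoOffsetReset.EARLIEST);
        final Consumed<String, Long> resetOnly = Consumed.with(AutoOffsetReset.LATEST);
        System.out.println(all.keySerde.name() + " " + all.resetPolicy); // String EARLIEST
        System.out.println(resetOnly.keySerde == null);                  // true
    }
}
```

This only works because each `with` overload takes a different parameter type; two overloads differing only in a type argument would not compile.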
> > > >>>
> > > >>>> Nit: TopologyBuilder -> Topology
> > > >>>
> > > >>> Ack
> > > >>>
> > > >>>> Nit: new class Serialized lists static method #with twice
> > > >>>
> > > >>> Ack
> > > >>>
> > > >>>> WindowedKStream -> for consistency we should either have GroupedKStream or KWindowedStream... (similar argument for SessionWindowedKStream)
> > > >>>
> > > >>> We can't rename KGroupedStream -> GroupedKStream without breaking compatibility. So we are stuck with it for now. Hopefully in the future we can rename KGroupedStream to GroupedKStream.
> > > >>>
> > > >>>> KGroupedStream
> > > >>>> -> why do we use a different name for `sessionWindowedBy()`? It seems cleaner to call both methods `windowedBy()`.
> > > >>>
> > > >>> I beg to differ that it is cleaner either way!
> > > >>>
> > > >>>> StreamsBuilder#stream -> parameter order is confusing... We should have Pattern as second parameter to align both methods.
> > > >>>
> > > >>> Ack
> > > >>>
> > > >>>> StreamsBuilder#table/globalTable -> move parameter `Consumed` to be the first parameter to align with `#stream`
> > > >>>
> > > >>> Ack
> > > >>>
> > > >>>> Produced#with(Serde, Serde)
> > > >>>> Produced#with(StreamPartitioner, Serde, Serde)
> > > >>>> -> should StreamPartitioner be the third argument instead of the first?
> > > >>>
> > > >>> Sure
> > > >>>
> > > >>>> Consumed:
> > > >>>> Why do we need 3 different names for the 3 static methods? I would just call all of them `with()`. Current names sound clumsy to me. And a plain `with()` also aligns with the naming of static methods of other classes.
> > > >>>
> > > >>> I disagree that the names sound clumsy! But yes, they should be aligned with the others.
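Guozhang's point 1 and Matthias's notes on `StreamsBuilder#stream` and `#table`/`#globalTable` all turn on one Java rule: a varargs parameter must be the last parameter of a method. Here is a minimal sketch of the resulting ordering, using a hypothetical `MiniStreamsBuilder` and a simplified, hypothetical `Consumed` holder with an invented `named(...)` factory. Neither is the real Kafka class. The sketch shows why `Consumed` comes first when `stream` accepts several topics, and how the `Pattern` and single-topic variants can keep the same order.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for the proposed Consumed options object.
final class Consumed {
    final String label;

    private Consumed(final String label) {
        this.label = label;
    }

    static Consumed named(final String label) { // hypothetical factory
        return new Consumed(label);
    }
}

// Hypothetical builder showing only the parameter ordering under discussion.
final class MiniStreamsBuilder {
    // A varargs parameter must be last, so `stream(String... topics, Consumed c)`
    // would not compile; the options object has to come first here.
    String stream(final Consumed consumed, final String... topics) {
        return "stream" + Arrays.toString(topics) + " with " + consumed.label;
    }

    // Pattern in second position, aligned with the varargs overload.
    String stream(final Consumed consumed, final Pattern topicPattern) {
        return "stream[" + topicPattern.pattern() + "] with " + consumed.label;
    }

    // table() keeps Consumed first too, so every entry point reads the same way.
    String table(final Consumed consumed, final String topic) {
        return "table[" + topic + "] with " + consumed.label;
    }
}

class BuilderDemo {
    public static void main(final String[] args) {
        final MiniStreamsBuilder builder = new MiniStreamsBuilder();
        final Consumed consumed = Consumed.named("string-serdes");
        // prints: stream[orders, payments] with string-serdes
        System.out.println(builder.stream(consumed, "orders", "payments"));
        // prints: stream[orders-.*] with string-serdes
        System.out.println(builder.stream(consumed, Pattern.compile("orders-.*")));
        // prints: table[customers] with string-serdes
        System.out.println(builder.table(consumed, "customers"));
    }
}
```

Guozhang's option (b), a single `String topic` parameter, would remove the varargs constraint entirely, at the cost of relying on `merge` for multiple topics.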
> > > >>>
> > > >>>> I guess we are also deprecating a bunch of methods for KStream/KTable/KGroupedStream/KGroupedTable and should mention which ones? There is just one sentence, "Deprecate the existing overloads.", but we don't deprecate all existing ones. I personally don't care too much if we spell the deprecated methods out explicitly, but right now it's not consistent, as we only list the methods we add.
> > > >>>>
> > > >>>> Should we deprecate `StateStoreSupplier`?
> > > >>>
> > > >>> Yep
> > > >>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 8/22/17 6:55 AM, Damian Guy wrote:
> > > >>>>> I've just updated the KIP with some additional changes targeted at StreamsBuilder
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Damian
> > > >>>>>
> > > >>>>> On Thu, 10 Aug 2017 at 12:59 Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>
> > > >>>>>>> Got it, thanks.
> > > >>>>>>>
> > > >>>>>>> Does it still make sense to have one static constructor for each spec, with one constructor having only one parameter to make it more usable, i.e., as a user I do not need to give all parameters if I only want to override one of them? Maybe we can just name the constructors `with`, but I'm not sure if Java distinguishes:
> > > >>>>>>>
> > > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
> > > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
> > > >>>>>>>
> > > >>>>>>> as two function signatures.
> > > >>>>>>
> > > >>>>>> No, that won't work.
> > > >>>>>> That is why we have all options, e.g., on Produced:
> > > >>>>>>
> > > >>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
> > > >>>>>> public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
> > > >>>>>> public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
> > > >>>>>> public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
> > > >>>>>> public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
> > > >>>>>>
> > > >>>>>> So if you only want to set one option, you can just use the function that takes one argument.
> > > >>>>>>
> > > >>>>>>> Guozhang
> > > >>>>>>>
> > > >>>>>>> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>>> On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Damian,
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks for the proposal, I had a few comments on the APIs:
> > > >>>>>>>>>
> > > >>>>>>>>> 1. Printed#withFile seems not needed, as users should always specify whether it is to sysOut or to File at the beginning. In addition, on second thought, I think serdes are not useful for prints anyway, since we assume `toString` is provided except for byte arrays, which we will handle specially.
> > > >>>>>>>>>
> > > >>>>>>>> +1
> > > >>>>>>>>
> > > >>>>>>>>> Another comment about Printed in general is that it differs from other options in that it is a required option rather than an optional one, since it includes the toSysOut / toFile specs; what are the pros and cons of including these two in the option, and hence making it a required option, rather than leaving them at the API layer and making Printed optional for mapper / label only?
> > > >>>>>>>>
> > > >>>>>>>> It isn't required, as we will still have the no-arg print(), which will just go to sysout as it does now.
> > > >>>>>>>>
> > > >>>>>>>>> 2.1 KStream#through / to
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Produced?
> > > >>>>>>>>
> > > >>>>>>>> Yes - we already have those, so they are not part of the KIP, e.g., through(topic)
> > > >>>>>>>>
> > > >>>>>>>>> 2.2 KStream#groupBy / groupByKey
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Serialized?
> > > >>>>>>>>
> > > >>>>>>>> Yes, as above
> > > >>>>>>>>
> > > >>>>>>>>> 2.3 KGroupedStream#count / reduce / aggregate
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Materialized?
> > > >>>>>>>>
> > > >>>>>>>> As above
> > > >>>>>>>>
> > > >>>>>>>>> 2.4 KStream#join
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Joined?
> > > >>>>>>>>
> > > >>>>>>>> as above
> > > >>>>>>>>
> > > >>>>>>>>> 2.5 Each of KTable's operators:
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Produced / Serialized / Materialized?
> > > >>>>>>>>
> > > >>>>>>>> as above
> > > >>>>>>>>
> > > >>>>>>>>> 3.1 Produced: the static functions have overlaps, which seem unnecessary. I'd suggest just having the following three static functions, with another three similar member functions:
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
> > > >>>>>>>>>
> > > >>>>>>>>> The key idea is that by using the same function name for the static constructor and member functions, users do not need to remember what the differences are, but can call these functions in any order they want, and later calls on the same spec will win over earlier calls.
> > > >>>>>>>>
> > > >>>>>>>> That would be great if Java supported it, but it doesn't. You can't have static and member functions with the same signature.
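The last two exchanges rest on two separate Java restrictions. First, `with(Serde<K>)` and `with(Serde<V>)` both erase to `with(Serde)`, so javac rejects the pair. Second, a class cannot declare a static and an instance method with the same name and the same erased parameter types, so reusing `withKeySerde` for both the static constructor and the member function will not compile either. The following is a minimal sketch of one common workaround: give the static factories plain option names and the instance methods a `with` prefix. It uses hypothetical stand-ins (`Serde`, `StreamPartitioner`, `Produced`), not the real Kafka classes. Because each instance call returns an updated copy, a later call for the same option still wins over an earlier one, which keeps the property Guozhang was after.

```java
// Hypothetical stand-ins for the Kafka types discussed in the thread; NOT the
// real org.apache.kafka.* classes.
interface Serde<T> {
    String name();
}

interface StreamPartitioner<K, V> {
    int partition(K key, V value, int numPartitions);
}

final class Produced<K, V> {
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final StreamPartitioner<K, V> partitioner;

    private Produced(final Serde<K> keySerde, final Serde<V> valueSerde,
                     final StreamPartitioner<K, V> partitioner) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.partitioner = partitioner;
    }

    // Static factories. Overloading `with(Serde<K>)` and `with(Serde<V>)` would
    // be rejected because both erase to `with(Serde)`, so single-option
    // factories get distinct names instead.
    static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde) {
        return new Produced<>(keySerde, null, null);
    }

    static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) {
        return new Produced<>(null, valueSerde, null);
    }

    static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new Produced<>(keySerde, valueSerde, null);
    }

    // Instance methods cannot reuse the static names with the same erased
    // parameter types, so they take a `with` prefix. Each call returns a new
    // copy with one option replaced, so the last call for an option wins.
    Produced<K, V> withKeySerde(final Serde<K> keySerde) {
        return new Produced<>(keySerde, valueSerde, partitioner);
    }

    Produced<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new Produced<>(keySerde, valueSerde, partitioner);
    }

    Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
        return new Produced<>(keySerde, valueSerde, partitioner);
    }
}

class ProducedDemo {
    public static void main(final String[] args) {
        final Serde<String> first = () -> "first-key-serde";
        final Serde<String> second = () -> "second-key-serde";
        final Serde<Long> values = () -> "long-serde";
        final Produced<String, Long> produced = Produced.<String, Long>keySerde(first)
            .withValueSerde(values)
            .withKeySerde(second); // later call overrides the earlier key serde
        System.out.println(produced.keySerde.name());   // second-key-serde
        System.out.println(produced.valueSerde.name()); // long-serde
    }
}
```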
> > > >>>>>>>>
> > > >>>>>>>>> 3.2 Serialized: similarly
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Serialized<K, V> withKeySerde(final Serde<K> keySerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
> > > >>>>>>>>
> > > >>>>>>>> as above
> > > >>>>>>>>
> > > >>>>>>>>> Also it has a final Serde<V> otherValueSerde in one of its static constructors; is that intentional?
> > > >>>>>>>>
> > > >>>>>>>> Nope: thanks.
> > > >>>>>>>>
> > > >>>>>>>>> 3.3 Joined: similarly, keep the static constructor signatures the same as its corresponding member fields.
> > > >>>>>>>>
> > > >>>>>>>> As above
> > > >>>>>>>>
> > > >>>>>>>>> 3.4 Materialized: it is a bit special, and I think we can keep its static constructors with only two `as` as they are today.
> > > >>>>>>>>>
> > > >>>>>>>>> 4. Are there any modifications to StateStoreSupplier? Is it replaced by BytesStoreSupplier? It seems some more description is lacking here. Also in
> > > >>>>>>>>
> > > >>>>>>>> No modifications to StateStoreSupplier. It is superseded by BytesStoreSupplier.
> > > >>>>>>>>
> > > >>>>>>>>> public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
> > > >>>>>>>>>
> > > >>>>>>>>> Is the parameter of type BytesStoreSupplier?
> > > >>>>>>>>
> > > >>>>>>>> Yep - thanks
> > > >>>>>>>>
> > > >>>>>>>>> Guozhang
> > > >>>>>>>>>
> > > >>>>>>>>> On Thu, Jul 27, 2017 at 5:26 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Updated link:
> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks,
> > > >>>>>>>>>> Damian
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I've put together a KIP to make some changes to the KafkaStreams DSL that will hopefully allow us to:
> > > >>>>>>>>>>> 1) reduce the explosion of overloads
> > > >>>>>>>>>>> 2) add new features without having to continue adding more overloads
> > > >>>>>>>>>>> 3) provide simpler ways for people to use custom storage engines and wrap them with logging, caching, etc. if desired
> > > >>>>>>>>>>> 4) enable per-operator caching rather than global caching, without having to resort to supplying a StateStoreSupplier when you just want to turn caching off.
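Goal 4 of the original proposal (per-operator caching without supplying a whole `StateStoreSupplier`) and the `Materialized#as` overloads from points 3.4 and 4 can be pictured with a small options object. This is a minimal sketch under stated assumptions. `Materialized` here is a hypothetical, non-generic simplification, whereas the proposal uses `Materialized<K, V, S>`. `BytesStoreSupplier` is a one-method stand-in, and the `withCachingDisabled` / `withLoggingDisabled` names are assumptions for this sketch. It only shows naming a store or plugging in a custom one via `as(...)`, then switching caching off for that single operator.

```java
// Hypothetical one-method stand-in; NOT the real Kafka BytesStoreSupplier.
interface BytesStoreSupplier {
    String name();
}

// Hypothetical, non-generic simplification of a Materialized options object.
final class Materialized {
    final String storeName;
    final BytesStoreSupplier supplier; // null means "use the default store"
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    private Materialized(final String storeName, final BytesStoreSupplier supplier,
                         final boolean cachingEnabled, final boolean loggingEnabled) {
        this.storeName = storeName;
        this.supplier = supplier;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }

    // Name the store but let the library pick the default implementation.
    static Materialized as(final String storeName) {
        return new Materialized(storeName, null, true, true);
    }

    // Plug in a custom storage engine; it can still be wrapped with caching/logging.
    static Materialized as(final BytesStoreSupplier supplier) {
        return new Materialized(supplier.name(), supplier, true, true);
    }

    // Per-operator switches: they only affect the operator this object is passed to.
    Materialized withCachingDisabled() {
        return new Materialized(storeName, supplier, false, loggingEnabled);
    }

    Materialized withLoggingDisabled() {
        return new Materialized(storeName, supplier, cachingEnabled, false);
    }

    @Override
    public String toString() {
        return storeName + " (custom=" + (supplier != null)
            + ", caching=" + cachingEnabled + ", logging=" + loggingEnabled + ")";
    }
}

class MaterializedDemo {
    public static void main(final String[] args) {
        // Only this store has caching turned off; other operators keep the default.
        final Materialized counts = Materialized.as("counts-store").withCachingDisabled();
        final BytesStoreSupplier custom = () -> "custom-store";
        final Materialized totals = Materialized.as(custom);
        System.out.println(counts); // counts-store (custom=false, caching=false, logging=true)
        System.out.println(totals); // custom-store (custom=true, caching=true, logging=true)
    }
}
```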
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> The KIP is here:
> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks,
> > > >>>>>>>>>>> Damian
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> -- Guozhang
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> -- Guozhang
>
> --
> -- Guozhang