On Thu, 24 Aug 2017 at 02:49 Guozhang Wang <wangg...@gmail.com> wrote:

> I have a couple of comments but otherwise it LGTM:
>
> 1. For these two functions in StreamsBuilder, the topic String is set as
> the second parameter, between the two options. Would it be better to set
> it as the first or the last one instead?
>
It would be better as the first, but then it would differ from the
#stream() methods due to varargs.


> public synchronized <K, V> KTable<K, V> table(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>
> public synchronized <K, V> GlobalKTable<K, V> globalTable(final Consumed<K, V> consumed, final String topic, final Materialized<K, V> materialized)
>
> I understand that we cannot do it for the first parameter because of the
> vararg type. So I'd suggest either
>
> a) set it as the last parameter, but then it is inconsistent with other
> functions like these:
>
> void to(final String topic, final Produced<K, V> options);
>
> KTable<K, V> through(final String topic, final Materialized<K, V> options);
>
> b) only allow a single topic name parameter in StreamsBuilder.stream(),
> since in practice we do not see many usages of multiple topics; plus, it
> can be semi-supported with "merge" as we move it from StreamsBuilder to
> KStream (KAFKA-5765).
>
> Perhaps this is the better approach.
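The varargs constraint raised in point 1 can be seen in a minimal sketch. `Builder` and `Consumed` below are hypothetical stand-ins, not the real Kafka Streams classes: Java accepts a `String...` parameter only in the last position, so a multi-topic `stream` has to take its options first, while a single-topic overload (option b) leaves the topic free to come first.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the Consumed options object.
final class Consumed {
    final String label;

    Consumed(final String label) {
        this.label = label;
    }
}

// Hypothetical builder; NOT the real StreamsBuilder API.
// Each method returns the topics the source would subscribe to.
final class Builder {
    // A varargs parameter must be the last one in Java, so the
    // options object has to precede the topic names.
    List<String> stream(final Consumed consumed, final String... topics) {
        return Arrays.asList(topics);
    }

    // Option (b): exactly one topic. With no varargs involved,
    // the topic can come first.
    List<String> stream(final String topic, final Consumed consumed) {
        return Collections.singletonList(topic);
    }
}
```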


> 2. KGroupedStream's function:
>
> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>                              final Aggregator<? super K, ? super V, VR> aggregator,
>                              final Serde<VR> aggValueSerde,
>                              final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
>
> Is the "aggValueSerde" parameter needed?
>
> 3. +1 on `KGroupedStream` vs. `GroupedKStream`. In hindsight, I think
> KGroupedStream was a bad name. I personally feel we should just correct it
> with a new class and deprecate / remove the old one before 1.0.0, but that
> could be in its own KIP.
>
>
The problem with this is that we'd need to add new `groupBy` and
`groupByKey` methods that return `GroupedKStream`; we can't change the
existing ones, as that would break compatibility. So what would we name
these methods?


>
> Guozhang
>
>
>
> On Wed, Aug 23, 2017 at 1:01 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > We already have GlobalKTable, and I can't rename KGroupedStream, which
> > really should be GroupedKStream. So I think we should name new things
> > correctly, i.e., WindowedKStream etc., and fix the others when we can.
> >
> > On Wed, 23 Aug 2017 at 20:38 Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > > About KGroupedStream vs GroupedKStream: shouldn't we keep the naming
> > > convention consistent? And if we change the naming scheme, shouldn't we
> > > change everything at once? I personally don't care which naming scheme is
> > > better, but I think consistency is super important!
> > >
> > > About Bill's comment: I agree, and had a similar thought.
> > >
> > >
> > > -Matthias
> > >
> > > On 8/23/17 12:24 PM, Bill Bejeck wrote:
> > > > Thanks for all the work on this KIP Damian.
> > > >
> > > > Both `Produced` and `Joined` have a `with` method accepting all
> > > > parameters, but `Consumed` doesn't. Should we add one for consistency?
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Aug 23, 2017 at 4:15 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >
> > > >> KIP has been updated. thanks
> > > >>
> > > >> On Wed, 23 Aug 2017 at 09:10 Damian Guy <damian....@gmail.com> wrote:
> > > >>
> > > >>> Hi Matthias,
> > > >>>
> > > >>>
> > > >>>> KStream:
> > > >>>> leftJoin and outerJoin for KStream/KTable join should not have a
> > > >>>> `JoinWindows` parameter.
> > > >>>>
> > > >>> Thanks!
> > > >>>
> > > >>>
> > > >>>>
> > > >>>> Nit: TopologyBuilder -> Topology
> > > >>>>
> > > >>> Ack
> > > >>>
> > > >>>
> > > >>>> Nit: the new class Serialized lists the static method #with twice
> > > >>>>
> > > >>> Ack
> > > >>>
> > > >>>
> > > >>>> WindowedKStream -> for consistency we should either have GroupedKStream
> > > >>>> or KWindowedStream... (similar argument for SessionWindowedKStream)
> > > >>>>
> > > >>> We can't rename KGroupedStream -> GroupedKStream without breaking
> > > >>> compatibility, so we are stuck with it for now. Hopefully in the future
> > > >>> we can rename KGroupedStream to GroupedKStream.
> > > >>>
> > > >>>
> > > >>>>
> > > >>>> KGroupedStream
> > > >>>> -> why do we use a different name for `sessionWindowedBy()`? It seems
> > > >>>> cleaner to call both methods `windowedBy()`.
> > > >>>>
> > > >>>>
> > > >>> I beg to differ that it is cleaner either way!
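Whichever name is chosen, Java does not force the two methods apart: overloads are resolved by parameter type, and each may declare its own return type. A sketch with hypothetical classes (`GroupedStream`, `TimeWindows`, `SessionWindows` and the two result types are illustrations, not the KIP's API):

```java
// Hypothetical window specifications (illustration only).
final class TimeWindows {
    final long sizeMs;

    TimeWindows(final long sizeMs) {
        this.sizeMs = sizeMs;
    }
}

final class SessionWindows {
    final long inactivityGapMs;

    SessionWindows(final long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }
}

// Hypothetical result types for the two kinds of windowed stream.
final class WindowedKStream {
    final long sizeMs;

    WindowedKStream(final long sizeMs) {
        this.sizeMs = sizeMs;
    }
}

final class SessionWindowedKStream {
    final long inactivityGapMs;

    SessionWindowedKStream(final long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }
}

// Hypothetical grouped stream exposing a single method name.
final class GroupedStream {
    // Overloads are chosen by parameter type, so each one may
    // return a different windowed-stream type.
    WindowedKStream windowedBy(final TimeWindows windows) {
        return new WindowedKStream(windows.sizeMs);
    }

    SessionWindowedKStream windowedBy(final SessionWindows windows) {
        return new SessionWindowedKStream(windows.inactivityGapMs);
    }
}
```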
> > > >>>
> > > >>>
> > > >>>>
> > > >>>> StreamsBuilder#stream -> parameter order is confusing... We should have
> > > >>>> Pattern as the second parameter to align both methods.
> > > >>>>
> > > >>> Ack
> > > >>>
> > > >>>
> > > >>>> StreamsBuilder#table/globalTable -> move the `Consumed` parameter to the
> > > >>>> first position to align with `#stream`
> > > >>>>
> > > >>>>
> > > >>> Ack
> > > >>>
> > > >>>> Produced#with(Serde, Serde)
> > > >>>> Produced#with(StreamPartitioner, Serde, Serde)
> > > >>>> -> should StreamPartitioner be the third argument instead of the first?
> > > >>>>
> > > >>> Sure
> > > >>>
> > > >>>>
> > > >>>> Consumed:
> > > >>>> Why do we need 3 different names for the 3 static methods? I would just
> > > >>>> call all of them `with()`. The current names sound clumsy to me, and a
> > > >>>> plain `with()` also aligns with the naming of the static methods of other
> > > >>>> classes.
> > > >>>>
> > > >>>
> > > >>> I disagree that the names sound clumsy! But yes, they should be aligned
> > > >>> with the others.
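Naming every factory `with()` works because Java tells the overloads apart by arity and parameter type. The sketch below is hypothetical: the four options (key serde, value serde, timestamp extractor, offset-reset policy) are an assumption for illustration, and the four-argument overload is the kind of all-parameter `with` Bill asks for.

```java
// Hypothetical placeholder types; not the real Kafka Streams classes.
final class Serde<T> {
    final String name;

    Serde(final String name) {
        this.name = name;
    }
}

interface TimestampExtractor {
    long extract(Object record);
}

enum OffsetReset { EARLIEST, LATEST }

// Hypothetical Consumed: every static factory is called `with`, and the
// overloads differ by arity and parameter type. A null field stands for
// "fall back to the configured default".
final class Consumed<K, V> {
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    final TimestampExtractor extractor;
    final OffsetReset resetPolicy;

    private Consumed(final Serde<K> keySerde, final Serde<V> valueSerde,
                     final TimestampExtractor extractor, final OffsetReset resetPolicy) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.extractor = extractor;
        this.resetPolicy = resetPolicy;
    }

    // All-parameter variant.
    static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde,
                                      final TimestampExtractor extractor,
                                      final OffsetReset resetPolicy) {
        return new Consumed<>(keySerde, valueSerde, extractor, resetPolicy);
    }

    static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new Consumed<>(keySerde, valueSerde, null, null);
    }

    static <K, V> Consumed<K, V> with(final TimestampExtractor extractor) {
        return new Consumed<>(null, null, extractor, null);
    }

    static <K, V> Consumed<K, V> with(final OffsetReset resetPolicy) {
        return new Consumed<>(null, null, null, resetPolicy);
    }
}
```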
> > > >>>
> > > >>>
> > > >>>>
> > > >>>>
> > > >>>> I guess we are also deprecating a bunch of methods for
> > > >>>> KStream/KTable/KGroupedStream/KGroupedTable, and should mention which
> > > >>>> ones? There is just one sentence, "Deprecate the existing overloads.",
> > > >>>> but we don't deprecate all existing ones. I personally don't care too
> > > >>>> much whether we spell out the deprecated methods explicitly, but right now
> > > >>>> it's inconsistent, as we only list the methods we add.
> > > >>>>
> > > >>>>
> > > >>>
> > > >>>> Should we deprecate `StateStoreSupplier`?
> > > >>>>
> > > >>> Yep
> > > >>>
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On 8/22/17 6:55 AM, Damian Guy wrote:
> > > >>>>> I've just updated the KIP with some additional changes targeted at
> > > >>>>> StreamsBuilder.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Damian
> > > >>>>>
> > > >>>>> On Thu, 10 Aug 2017 at 12:59 Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>
> > > >>>>>>
> > > >>>>>>> Got it, thanks.
> > > >>>>>>>
> > > >>>>>>> Does it still make sense to have one static constructor for each spec,
> > > >>>>>>> with one constructor taking only one parameter to make it more usable,
> > > >>>>>>> i.e., as a user I do not need to give all parameters if I only want to
> > > >>>>>>> override one of them? Maybe we can just name the constructors `with`, but
> > > >>>>>>> I'm not sure whether Java distinguishes
> > > >>>>>>>
> > > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
> > > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
> > > >>>>>>>
> > > >>>>>>> as two function signatures.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>> No, that won't work. That is why we have all the options, i.e., on Produced:
> > > >>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
> > > >>>>>> public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
> > > >>>>>> public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
> > > >>>>>> public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
> > > >>>>>> public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
> > > >>>>>>
> > > >>>>>> So if you only want to set one option, you can just use the function that
> > > >>>>>> takes one argument.
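The reason the two one-argument `with` signatures cannot coexist is type erasure: both would compile down to `with(Serde)`. A minimal sketch of the distinct-name alternative, using hypothetical placeholder types rather than the real `Produced`, `Serde`, and `StreamPartitioner`:

```java
// Hypothetical placeholder types; not the real Kafka Streams classes.
final class Serde<T> {
    final String name;

    Serde(final String name) {
        this.name = name;
    }
}

interface StreamPartitioner<K, V> {
    int partition(K key, V value, int numPartitions);
}

final class Produced<K, V> {
    final Serde<K> keySerde;                     // null: use the default
    final Serde<V> valueSerde;                   // null: use the default
    final StreamPartitioner<K, V> partitioner;   // null: default partitioning

    private Produced(final Serde<K> keySerde, final Serde<V> valueSerde,
                     final StreamPartitioner<K, V> partitioner) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.partitioner = partitioner;
    }

    // with(Serde<K>) and with(Serde<V>) cannot both be declared: each
    // erases to with(Serde), so javac rejects the pair. The one-argument
    // factories therefore carry distinct names.
    static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde) {
        return new Produced<>(keySerde, null, null);
    }

    static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) {
        return new Produced<>(null, valueSerde, null);
    }

    static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner) {
        return new Produced<>(null, null, partitioner);
    }

    // Two-argument form for the common case of setting both serdes.
    static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new Produced<>(keySerde, valueSerde, null);
    }
}
```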
> > > >>>>>>
> > > >>>>>>>
> > > >>>>>>> Guozhang
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>>> On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Damian,
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks for the proposal, I had a few comments on the APIs:
> > > >>>>>>>>>
> > > >>>>>>>>> 1. Printed#withFile seems unnecessary, as users should always specify
> > > >>>>>>>>> up front whether to print to System.out or to a file. In addition, on
> > > >>>>>>>>> second thought, I think serdes are not useful for printing anyway, since
> > > >>>>>>>>> we assume `toString` is provided, except for byte arrays, which we will
> > > >>>>>>>>> handle specially.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>> +1
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> Another comment about Printed in general: it differs from the other
> > > >>>>>>>>> options in that it is required rather than optional, since it includes
> > > >>>>>>>>> the toSysOut / toFile specs. What are the pros and cons of including
> > > >>>>>>>>> these two in the option, hence making it required, rather than leaving
> > > >>>>>>>>> them at the API layer and making Printed optional, for mapper / label
> > > >>>>>>>>> only?
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>> It isn't required, as we will still have the no-arg print(), which will
> > > >>>>>>>> just go to sysout as it does now.
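A sketch of the split described above, where the destination is chosen by the factory (`toSysOut()` / `toFile(...)`) and the no-arg `print()` simply delegates to the System.out variant. `Printed` and `RecordStream` are hypothetical, and the `[label]: key, value` line format is an assumption; file output is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Printed options: the destination is fixed by the factory
// (toSysOut or toFile), so no separate withFile step is needed.
final class Printed {
    static final String SYSOUT = "<System.out>";

    final String destination;  // SYSOUT or a file path
    final String label;        // optional; null means no label

    private Printed(final String destination, final String label) {
        this.destination = destination;
        this.label = label;
    }

    static Printed toSysOut() {
        return new Printed(SYSOUT, null);
    }

    static Printed toFile(final String path) {
        return new Printed(path, null);
    }

    Printed withLabel(final String label) {
        return new Printed(destination, label);
    }
}

// Hypothetical stream of String key/value pairs.
final class RecordStream {
    private final List<String[]> records = new ArrayList<>();

    RecordStream add(final String key, final String value) {
        records.add(new String[] {key, value});
        return this;
    }

    // The no-arg print() keeps today's behaviour: print to System.out.
    List<String> print() {
        return print(Printed.toSysOut());
    }

    // Formats each record and returns the lines. Only the System.out
    // destination is wired up in this sketch; file writing is omitted.
    List<String> print(final Printed printed) {
        final List<String> lines = new ArrayList<>();
        for (final String[] record : records) {
            final String prefix = printed.label == null ? "" : "[" + printed.label + "]: ";
            final String line = prefix + record[0] + ", " + record[1];
            lines.add(line);
            if (Printed.SYSOUT.equals(printed.destination)) {
                System.out.println(line);
            }
        }
        return lines;
    }
}
```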
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 2.1 KStream#through / to
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Produced?
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Yes, we already have those, so they are not part of the KIP, e.g.,
> > > >>>>>>>> through(topic).
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 2.2 KStream#groupBy / groupByKey
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Serialized?
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Yes, as above
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 2.3 KGroupedStream#count / reduce / aggregate
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Materialized?
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> As above
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 2.4 KStream#join
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Joined?
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> as above
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 2.5 Each of KTable's operators:
> > > >>>>>>>>>
> > > >>>>>>>>> We should have an overloaded function without Produced / Serialized /
> > > >>>>>>>>> Materialized?
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>> as above
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 3.1 Produced: the static functions overlap, which seems unnecessary.
> > > >>>>>>>>> I'd suggest just having the following three static functions, along
> > > >>>>>>>>> with three similar member functions:
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
> > > >>>>>>>>>
> > > >>>>>>>>> The key idea is that by using the same function name for the static
> > > >>>>>>>>> constructor and the member function, users do not need to remember the
> > > >>>>>>>>> differences but can call these functions in any order they want, and
> > > >>>>>>>>> later calls on the same spec will override earlier ones.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>> That would be great if Java supported it, but it doesn't. You can't have
> > > >>>>>>>> static and member functions with the same signature.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 3.2 Serialized: similarly
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Serialized<K, V> withKeySerde(final Serde<K> keySerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public static <K, V> Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
> > > >>>>>>>>>
> > > >>>>>>>>> public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> as above
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Also, one of its static constructors has a final Serde<V> otherValueSerde
> > > >>>>>>>>> parameter; is that intentional?
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Nope: thanks.
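One way to keep Guozhang's "any order, later call wins" behaviour within Java's rules is to give the static factories and the instance methods different names. This is a hypothetical sketch, not the KIP's API; `Serialized` and `Serde` here are placeholders.

```java
// Hypothetical placeholder serde; not the real Kafka Streams class.
final class Serde<T> {
    final String name;

    Serde(final String name) {
        this.name = name;
    }
}

// Hypothetical immutable options object.
final class Serialized<K, V> {
    final Serde<K> keySerde;    // null: use the default
    final Serde<V> valueSerde;  // null: use the default

    private Serialized(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Static entry points. Declaring a static keySerde(Serde<K>) next to an
    // instance keySerde(Serde<K>) would not compile, so the instance
    // methods below use a "with" prefix instead.
    static <K, V> Serialized<K, V> keySerde(final Serde<K> keySerde) {
        return new Serialized<>(keySerde, null);
    }

    static <K, V> Serialized<K, V> valueSerde(final Serde<V> valueSerde) {
        return new Serialized<>(null, valueSerde);
    }

    // Instance methods return a modified copy, so calls can be chained in
    // any order and a later call overrides an earlier one for the same field.
    Serialized<K, V> withKeySerde(final Serde<K> keySerde) {
        return new Serialized<>(keySerde, valueSerde);
    }

    Serialized<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new Serialized<>(keySerde, valueSerde);
    }
}
```

Note the explicit type witness needed in a chain such as `Serialized.<String, Long>keySerde(a)`: at the start of a chain there is no target type, so Java would otherwise infer `V` as `Object`.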
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 3.3 Joined: similarly, keep the static constructor signatures the same as
> > > >>>>>>>>> their corresponding member fields.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>> As above
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> 3.4 Materialized: it is a bit special, and I think we can keep its static
> > > >>>>>>>>> constructors with only the two `as` methods, as they are today.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 4. Are there any modifications to StateStoreSupplier? Is it replaced by
> > > >>>>>>>>> BytesStoreSupplier? It seems some more description is lacking here. Also
> > > >>>>>>>>> in
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>> No modifications to StateStoreSupplier. It is superseded by
> > > >>>>>>>> BytesStoreSupplier.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
> > > >>>>>>>>>
> > > >>>>>>>>> Should the parameter be of type BytesStoreSupplier?
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Yep - thanks
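With the parameter typed as `BytesStoreSupplier`, the two `as` factories could look roughly like this. It is a hypothetical sketch: the one-method `BytesStoreSupplier` interface and the two-parameter `Materialized<K, V>` are simplifications, not the real types.

```java
// Hypothetical supplier of a custom byte-level store; only the name is
// modelled here.
interface BytesStoreSupplier {
    String name();
}

// Hypothetical Materialized with just the two `as` factories.
final class Materialized<K, V> {
    final String storeName;
    final BytesStoreSupplier supplier;  // null: use the default store type

    private Materialized(final String storeName, final BytesStoreSupplier supplier) {
        this.storeName = storeName;
        this.supplier = supplier;
    }

    // Name the store and let the library pick the default implementation.
    static <K, V> Materialized<K, V> as(final String storeName) {
        return new Materialized<>(storeName, null);
    }

    // Plug in a custom storage engine; the store name comes from it.
    static <K, V> Materialized<K, V> as(final BytesStoreSupplier supplier) {
        return new Materialized<>(supplier.name(), supplier);
    }
}
```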
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Guozhang
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Thu, Jul 27, 2017 at 5:26 AM, Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Updated link:
> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks,
> > > >>>>>>>>>> Damian
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian....@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I've put together a KIP to make some changes to the KafkaStreams DSL
> > > >>>>>>>>>>> that will hopefully allow us to:
> > > >>>>>>>>>>> 1) reduce the explosion of overloads
> > > >>>>>>>>>>> 2) add new features without having to continue adding more overloads
> > > >>>>>>>>>>> 3) provide simpler ways for people to use custom storage engines and
> > > >>>>>>>>>>> wrap them with logging, caching, etc., if desired
> > > >>>>>>>>>>> 4) enable per-operator caching rather than global caching, without
> > > >>>>>>>>>>> having to resort to supplying a StateStoreSupplier when you just want
> > > >>>>>>>>>>> to turn caching off.
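Goals 1 and 4 can be illustrated together: a single options parameter replaces a family of overloads, and caching becomes a per-operator flag rather than a reason to hand over a custom StateStoreSupplier. `StoreOptions` and `TopologySketch` are hypothetical, and defaulting caching to on is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-operator store options (not the real Materialized).
final class StoreOptions {
    final String storeName;
    final boolean cachingEnabled;

    private StoreOptions(final String storeName, final boolean cachingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
    }

    // Assumed default: caching stays on unless turned off for this operator.
    static StoreOptions as(final String storeName) {
        return new StoreOptions(storeName, true);
    }

    // Turns caching off for one operator only; no custom store supplier
    // is needed just to change this flag.
    StoreOptions withCachingDisabled() {
        return new StoreOptions(storeName, false);
    }
}

// Hypothetical topology builder that records each operator's store settings.
final class TopologySketch {
    final Map<String, Boolean> cachingByStore = new LinkedHashMap<>();

    // One options parameter instead of an overload per combination of
    // store name, supplier, caching flag, and so on.
    TopologySketch count(final StoreOptions options) {
        cachingByStore.put(options.storeName, options.cachingEnabled);
        return this;
    }
}
```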
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> The KIP is here:
> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks,
> > > >>>>>>>>>>> Damian
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> -- Guozhang
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> -- Guozhang
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
