I've updated the KIP to reflect Bill's comment and also to make
StreamsBuilder methods take the topic as the first param, i.e.,
StreamsBuilder#stream no longer accepts varargs.
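To illustrate why dropping varargs lets the topic move to the front, here is a minimal sketch with hypothetical stand-in classes (`ConsumedOptions`, `BuilderSketch`). It is not the real Kafka Streams `StreamsBuilder` or `Consumed`, and serdes are modeled as plain names. Java only permits a varargs parameter in the last position, so a multi-topic `stream(String... topics, Consumed consumed)` cannot be declared. With a single topic, the source can lead and the options object can trail, as in `to(topic, produced)`.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for an options object such as Consumed (not the Kafka class).
final class ConsumedOptions {
    final String keySerde;
    final String valueSerde;

    private ConsumedOptions(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    static ConsumedOptions with(String keySerde, String valueSerde) {
        return new ConsumedOptions(keySerde, valueSerde);
    }
}

// Hypothetical builder showing the "source first, options last" parameter order.
final class BuilderSketch {
    // This would not compile, because a varargs parameter must be the last one:
    //   String stream(String... topics, ConsumedOptions consumed)
    // With a single topic, the topic can lead, matching to(topic, produced).
    String stream(String topic, ConsumedOptions consumed) {
        return "stream(" + topic + ", " + describe(consumed) + ")";
    }

    // A Pattern source uses the same (source, options) order.
    String stream(Pattern topicPattern, ConsumedOptions consumed) {
        return "stream(/" + topicPattern.pattern() + "/, " + describe(consumed) + ")";
    }

    // table() follows suit instead of placing the topic between two options.
    String table(String topic, ConsumedOptions consumed) {
        return "table(" + topic + ", " + describe(consumed) + ")";
    }

    private static String describe(ConsumedOptions consumed) {
        return consumed.keySerde + "/" + consumed.valueSerde;
    }

    public static void main(String[] args) {
        BuilderSketch builder = new BuilderSketch();
        ConsumedOptions consumed = ConsumedOptions.with("String", "Long");
        System.out.println(builder.stream("orders", consumed));
        System.out.println(builder.stream(Pattern.compile("orders-.*"), consumed));
        System.out.println(builder.table("customers", consumed));
    }
}
```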

On Thu, 24 Aug 2017 at 09:12 Damian Guy <damian....@gmail.com> wrote:

> On Thu, 24 Aug 2017 at 02:49 Guozhang Wang <wangg...@gmail.com> wrote:
>
>> I have a couple of comments but otherwise it LGTM:
>>
>> 1. For these two functions in StreamsBuilder, the topic String is set as
>> the second parameter, between two options. Would it be better to set it
>> as the first or the last one instead?
>>
> It would be better as the first, but then it is different from the
> #stream() methods due to varargs.
>
>
>> public synchronized <K, V> KTable<K, V> table(final Consumed<K, V> consumed,
>>     final String topic, final Materialized<K, V> materialized)
>>
>> public synchronized <K, V> GlobalKTable<K, V> globalTable(final Consumed<K, V> consumed,
>>     final String topic, final Materialized<K, V> materialized)
>>
>> I understand that we cannot do it for the first parameter because of the
>> vararg type. So I'd suggest either
>>
>> a) set it as the last parameter, but then it is inconsistent with other
>> functions like these:
>>
>> void to(final String topic, final Produced<K, V> options);
>>
>> KTable<K, V> through(final String topic, final Materialized<K, V> options);
>>
>> b) only allow one single topic name parameter in StreamsBuilder.stream()
>> since in practice we do not see too many usages of multiple topics, plus it
>> can be semi-supported with "merge" as we move it from StreamsBuilder to
>> KStream (KAFKA-5765).
>
> Perhaps this is the better approach.
>
>
>> 2. KGroupedStream's function:
>>
>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>                              final Aggregator<? super K, ? super V, VR> aggregator,
>>                              final Serde<VR> aggValueSerde,
>>                              final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
>>
>> The "aggValueSerde" seems not needed?
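Guozhang's question can be made concrete: if the `Materialized` options can already carry the value serde, a separate `aggValueSerde` argument duplicates it. The sketch below uses hypothetical types (`StoreOptions`, `GroupedSketch`, `AggregatorFn`, `AggregateResult`) rather than the Kafka API, models serdes as plain names, and reads the serde from the options object; how the real `Materialized` exposes a value serde is assumed here, not taken from the KIP.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical aggregator shape: (key, value, current aggregate) -> new aggregate.
interface AggregatorFn<K, V, VR> {
    VR apply(K key, V value, VR aggregate);
}

// Hypothetical Materialized-like options that carry the value serde (null = default).
final class StoreOptions<VR> {
    final String storeName;
    final String valueSerde;

    private StoreOptions(String storeName, String valueSerde) {
        this.storeName = storeName;
        this.valueSerde = valueSerde;
    }

    static <VR> StoreOptions<VR> as(String storeName) {
        return new StoreOptions<>(storeName, null);
    }

    StoreOptions<VR> withValueSerde(String serdeName) {
        return new StoreOptions<>(storeName, serdeName);
    }
}

// Hypothetical result: the aggregated table plus the value serde the store would use.
final class AggregateResult<K, VR> {
    final Map<K, VR> table;
    final String valueSerdeUsed;

    AggregateResult(Map<K, VR> table, String valueSerdeUsed) {
        this.table = table;
        this.valueSerdeUsed = valueSerdeUsed;
    }
}

// Hypothetical in-memory grouped stream.
final class GroupedSketch<K, V> {
    private final List<Map.Entry<K, V>> records = new ArrayList<>();

    GroupedSketch<K, V> add(K key, V value) {
        records.add(new AbstractMap.SimpleEntry<>(key, value));
        return this;
    }

    // No aggValueSerde parameter: the value serde comes from the options object.
    <VR> AggregateResult<K, VR> aggregate(Supplier<VR> initializer,
                                          AggregatorFn<? super K, ? super V, VR> aggregator,
                                          StoreOptions<VR> materialized) {
        Map<K, VR> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : records) {
            VR current = table.containsKey(entry.getKey())
                    ? table.get(entry.getKey())
                    : initializer.get();
            table.put(entry.getKey(), aggregator.apply(entry.getKey(), entry.getValue(), current));
        }
        String serde = materialized.valueSerde != null ? materialized.valueSerde : "default";
        return new AggregateResult<>(table, serde);
    }

    public static void main(String[] args) {
        GroupedSketch<String, String> words = new GroupedSketch<String, String>()
                .add("a", "xx").add("b", "y").add("a", "zzz");
        AggregateResult<String, Integer> lengths = words.aggregate(
                () -> 0,
                (key, value, total) -> total + value.length(),
                StoreOptions.<Integer>as("lengths").withValueSerde("Integer"));
        System.out.println(lengths.table + " using " + lengths.valueSerdeUsed);
    }
}
```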
>>
>> 3. +1 on `KGroupedStream` vs. `GroupedKStream`. In hindsight, I think
>> KGroupedStream was a bad name. I personally feel we should just correct it
>> with a new class and deprecate / remove the old one before 1.0.0, but that
>> could be in its own KIP.
>>
>>
> The problem with this is that we'd need to add new `groupBy` and
> `groupByKey` methods that return `GroupedKStream`, we can't change the
> existing ones as that would break compatibility. So what would we name
> these methods?
>
>
>>
>> Guozhang
>>
>>
>>
>> On Wed, Aug 23, 2017 at 1:01 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > We already have GlobalKTable and I can't rename KGroupedStream, which
>> > really should be GroupedKStream. So I think we should name new things
>> > correctly, i.e., WindowedKStream, etc., and fix the others when we can.
>> >
>> > On Wed, 23 Aug 2017 at 20:38 Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> > > About KGroupedStream vs GroupedKStream: shouldn't we keep the naming
>> > > convention consistent? And if we change the naming scheme, just change
>> > > all at once? I personally don't care which naming scheme is better, but
>> > > I think consistency is super important!
>> > >
>> > > About Bill's comment: I agree, and had a similar thought.
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 8/23/17 12:24 PM, Bill Bejeck wrote:
>> > > > Thanks for all the work on this KIP Damian.
>> > > >
>> > > > Both `Produced` and `Joined` have a `with` method accepting all
>> > > > parameters, but `Consumed` doesn't. Should we add one for consistency?
>> > > >
>> > > > Thanks,
>> > > > Bill
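Bill's suggestion, sketched with hypothetical stand-ins (`ConsumedSketch`, `SerdeSketch`, `TimestampFn`, `ResetPolicy`) rather than the real Kafka classes: one `with` that takes every option, next to narrower overloads. Because the overloads differ in their erased parameter types, they can all share the name `with`, which is also what Matthias asks for later in the thread.

```java
// Hypothetical serde stand-in that only carries a name.
final class SerdeSketch<T> {
    final String name;

    SerdeSketch(String name) {
        this.name = name;
    }
}

// Hypothetical stand-ins for a timestamp extractor and an offset-reset policy.
interface TimestampFn {
    long extract(Object rec);
}

enum ResetPolicy { EARLIEST, LATEST }

// Hypothetical Consumed-like options; options that are not given stay null.
final class ConsumedSketch<K, V> {
    final SerdeSketch<K> keySerde;
    final SerdeSketch<V> valueSerde;
    final TimestampFn timestampFn;
    final ResetPolicy resetPolicy;

    private ConsumedSketch(SerdeSketch<K> keySerde, SerdeSketch<V> valueSerde,
                           TimestampFn timestampFn, ResetPolicy resetPolicy) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.timestampFn = timestampFn;
        this.resetPolicy = resetPolicy;
    }

    // Every option at once, in the spirit of the all-parameter factories on Produced and Joined.
    static <K, V> ConsumedSketch<K, V> with(SerdeSketch<K> keySerde, SerdeSketch<V> valueSerde,
                                            TimestampFn timestampFn, ResetPolicy resetPolicy) {
        return new ConsumedSketch<>(keySerde, valueSerde, timestampFn, resetPolicy);
    }

    static <K, V> ConsumedSketch<K, V> with(SerdeSketch<K> keySerde, SerdeSketch<V> valueSerde) {
        return new ConsumedSketch<>(keySerde, valueSerde, null, null);
    }

    static <K, V> ConsumedSketch<K, V> with(TimestampFn timestampFn) {
        return new ConsumedSketch<>(null, null, timestampFn, null);
    }

    static <K, V> ConsumedSketch<K, V> with(ResetPolicy resetPolicy) {
        return new ConsumedSketch<>(null, null, null, resetPolicy);
    }

    public static void main(String[] args) {
        ConsumedSketch<String, Long> all = ConsumedSketch.with(
                new SerdeSketch<String>("String"), new SerdeSketch<Long>("Long"),
                rec -> 0L, ResetPolicy.EARLIEST);
        ConsumedSketch<String, Long> resetOnly = ConsumedSketch.with(ResetPolicy.LATEST);
        System.out.println(all.keySerde.name + ", " + all.resetPolicy + "; " + resetOnly.resetPolicy);
    }
}
```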
>> > > >
>> > > > On Wed, Aug 23, 2017 at 4:15 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >
>> > > >> KIP has been updated. Thanks.
>> > > >>
>> > > >> On Wed, 23 Aug 2017 at 09:10 Damian Guy <damian....@gmail.com> wrote:
>> > > >>
>> > > >>> Hi Matthias,
>> > > >>>
>> > > >>>
>> > > >>>> KStream:
>> > > >>>> leftJoin and outerJoin for KStream/KTable join should not have
>> > > >>>> `JoinWindows` parameter
>> > > >>>>
>> > > >>> Thanks!
>> > > >>>
>> > > >>>
>> > > >>>>
>> > > >>>> Nit: TopologyBuilder -> Topology
>> > > >>>>
>> > > >>> Ack
>> > > >>>
>> > > >>>
>> > > >>>> Nit: new class Serialized lists static method #with twice
>> > > >>>>
>> > > >>> Ack
>> > > >>>
>> > > >>>
>> > > >>>> WindowedKStream -> for consistency we should either have
>> > > >>>> GroupedKStream or KWindowedStream... (similar argument for
>> > > >>>> SessionWindowedKStream)
>> > > >>>>
>> > > >>> We can't rename KGroupedStream -> GroupedKStream without breaking
>> > > >>> compatibility. So we are stuck with it for now. Hopefully in the
>> > > >>> future we can rename KGroupedStream to GroupedKStream.
>> > > >>>
>> > > >>>
>> > > >>>>
>> > > >>>> KGroupedStream
>> > > >>>> -> why do we use a different name for `sessionWindowedBy()` -- seems
>> > > >>>> to be cleaner to call both methods `windowedBy()`
>> > > >>>>
>> > > >>>>
>> > > >>> I beg to differ that it is cleaner either way!
>> > > >>>
>> > > >>>
>> > > >>>>
>> > > >>>> StreamsBuilder#stream -> parameter order is confusing... We should
>> > > >>>> have Pattern as second parameter to align both methods.
>> > > >>>>
>> > > >>> Ack
>> > > >>>
>> > > >>>
>> > > >>>> StreamsBuilder#table/globalTable -> move parameter `Consumed` as
>> > > >>>> first parameter to align with `#stream`
>> > > >>>>
>> > > >>>>
>> > > >>> Ack
>> > > >>>
>> > > >>>> Produced#with(Serde, Serde)
>> > > >>>> Produced#with(StreamPartitioner, Serde, Serde)
>> > > >>>> -> should StreamPartitioner be the third argument instead of the
>> > > >>>> first?
>> > > >>>>
>> > > >>> Sure
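The agreed ordering (serdes first, partitioner last) is sketched below with hypothetical types `ProducedSketch`, `NamedSerde`, and `PartitionerFn`. These are not Kafka's `Produced`, `Serde`, or `StreamPartitioner`, and the partitioner signature is simplified. With the optional argument at the end, the three-argument overload reads as the two-argument one plus one extra.

```java
// Hypothetical serde stand-in that only carries a name.
final class NamedSerde<T> {
    final String name;

    NamedSerde(String name) {
        this.name = name;
    }
}

// Hypothetical, simplified partitioner: chooses a partition for a record.
interface PartitionerFn<K, V> {
    int partition(K key, V value, int numPartitions);
}

// Hypothetical Produced-like options with the partitioner as the optional last argument.
final class ProducedSketch<K, V> {
    final NamedSerde<K> keySerde;
    final NamedSerde<V> valueSerde;
    final PartitionerFn<K, V> partitioner; // null means "use the default"

    private ProducedSketch(NamedSerde<K> keySerde, NamedSerde<V> valueSerde,
                           PartitionerFn<K, V> partitioner) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.partitioner = partitioner;
    }

    static <K, V> ProducedSketch<K, V> with(NamedSerde<K> keySerde, NamedSerde<V> valueSerde) {
        return new ProducedSketch<>(keySerde, valueSerde, null);
    }

    // Same leading arguments as the two-argument form, plus the partitioner at the end.
    static <K, V> ProducedSketch<K, V> with(NamedSerde<K> keySerde, NamedSerde<V> valueSerde,
                                            PartitionerFn<K, V> partitioner) {
        return new ProducedSketch<>(keySerde, valueSerde, partitioner);
    }

    // Uses the custom partitioner if present, else a key-hash fallback (illustrative only).
    int partitionFor(K key, V value, int numPartitions) {
        if (partitioner != null) {
            return partitioner.partition(key, value, numPartitions);
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        ProducedSketch<String, Integer> custom = ProducedSketch.with(
                new NamedSerde<String>("String"), new NamedSerde<Integer>("Integer"),
                (key, value, n) -> value % n);
        System.out.println(custom.partitionFor("k", 7, 3));
    }
}
```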
>> > > >>>
>> > > >>>>
>> > > >>>> Consumed:
>> > > >>>> Why do we need 3 different names for the 3 static methods? I would
>> > > >>>> just call all of them `with()`. Current names sound clumsy to me. And
>> > > >>>> a plain `with()` also aligns with the naming of static methods of
>> > > >>>> other classes.
>> > > >>>>
>> > > >>>
>> > > >>> I disagree that the names sound clumsy! But yes, they should be
>> > > >>> aligned with the others.
>> > > >>>
>> > > >>>
>> > > >>>>
>> > > >>>>
>> > > >>>> I guess we are also deprecating a bunch of methods for
>> > > >>>> KStream/KTable/KGroupedStream/KGroupedTable and should mention
>> > > >>>> which ones? There is just one sentence, "Deprecate the existing
>> > > >>>> overloads.", but we don't deprecate all existing ones. I personally
>> > > >>>> don't care too much if we spell out deprecated methods explicitly,
>> > > >>>> but right now it's not consistent, as we only list the methods we add.
>> > > >>>>
>> > > >>>>
>> > > >>>
>> > > >>>> Should we deprecate `StateStoreSupplier`?
>> > > >>>>
>> > > >>> Yep
>> > > >>>
>> > > >>>>
>> > > >>>>
>> > > >>>> -Matthias
>> > > >>>>
>> > > >>>>
>> > > >>>>
>> > > >>>> On 8/22/17 6:55 AM, Damian Guy wrote:
>> > > >>>>> I've just updated the KIP with some additional changes targeted at
>> > > >>>>> StreamsBuilder
>> > > >>>>>
>> > > >>>>> Thanks,
>> > > >>>>> Damian
>> > > >>>>>
>> > > >>>>> On Thu, 10 Aug 2017 at 12:59 Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>
>> > > >>>>>>
>> > > >>>>>>> Got it, thanks.
>> > > >>>>>>>
>> > > >>>>>>> Does it still make sense to have one static constructor per
>> > > >>>>>>> spec, each taking only one parameter, to make it more usable,
>> > > >>>>>>> i.e., as a user I do not need to give all parameters if I only
>> > > >>>>>>> want to override one of them? Maybe we can just name the
>> > > >>>>>>> constructors `with`, but I'm not sure if Java distinguishes:
>> > > >>>>>>>
>> > > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde)
>> > > >>>>>>> public static <K, V> Produced<K, V> with(final Serde<V> valueSerde)
>> > > >>>>>>>
>> > > >>>>>>> as two function signatures.
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>> No, that won't work. That is why we have all options, i.e., on Produced:
>> > > >>>>>> public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
>> > > >>>>>> public static <K, V> Produced<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
>> > > >>>>>> public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
>> > > >>>>>> public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
>> > > >>>>>> public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
>> > > >>>>>>
>> > > >>>>>> So if you only want to set one option, you can just use the function
>> > > >>>>>> that takes one argument.
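The reason the two single-argument `with` overloads cannot coexist is type erasure: `Serde<K>` and `Serde<V>` both erase to `Serde`, so the two methods would share an erased signature and javac rejects them as a name clash. The runnable sketch below uses a hypothetical `SerdeBox<T>` in place of `Serde<T>` and inspects two distinctly named factories via reflection to show that their runtime parameter types are identical, which is why distinct names such as `keySerde` and `valueSerde` are needed.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical generic stand-in for Serde<T>.
final class SerdeBox<T> {
    final String name;

    SerdeBox(String name) {
        this.name = name;
    }
}

// Hypothetical options class with one single-argument factory per option.
final class ErasureDemo<K, V> {
    // Declaring both of these would be rejected by javac, since both erase to with(SerdeBox):
    //   static <K, V> ErasureDemo<K, V> with(SerdeBox<K> keySerde)
    //   static <K, V> ErasureDemo<K, V> with(SerdeBox<V> valueSerde)
    // Giving each factory its own name sidesteps the clash.
    static <K, V> ErasureDemo<K, V> keySerde(SerdeBox<K> keySerde) {
        return new ErasureDemo<>();
    }

    static <K, V> ErasureDemo<K, V> valueSerde(SerdeBox<V> valueSerde) {
        return new ErasureDemo<>();
    }

    // Looks up a factory by name and returns its parameter types as seen at runtime.
    static Class<?>[] erasedParameterTypes(String factoryName) {
        try {
            Method method = ErasureDemo.class.getDeclaredMethod(factoryName, SerdeBox.class);
            return method.getParameterTypes();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("no factory named " + factoryName, e);
        }
    }

    public static void main(String[] args) {
        // Both arrays hold only SerdeBox.class: the <K> and <V> arguments are erased.
        System.out.println(Arrays.toString(erasedParameterTypes("keySerde")));
        System.out.println(Arrays.toString(erasedParameterTypes("valueSerde")));
    }
}
```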
>> > > >>>>>>
>> > > >>>>>>>
>> > > >>>>>>> Guozhang
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> On Wed, Aug 9, 2017 at 6:20 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>>>
>> > > >>>>>>>> On Tue, 8 Aug 2017 at 20:11 Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >>>>>>>>
>> > > >>>>>>>>> Damian,
>> > > >>>>>>>>>
>> > > >>>>>>>>> Thanks for the proposal, I had a few comments on the APIs:
>> > > >>>>>>>>>
>> > > >>>>>>>>> 1. Printed#withFile seems not needed, as users should always
>> > > >>>>>>>>> specify at the beginning whether output goes to sysOut or to a
>> > > >>>>>>>>> File. In addition, on second thought, I think serdes are not
>> > > >>>>>>>>> useful for prints anyway, since we assume `toString` is provided
>> > > >>>>>>>>> except for byte arrays, which we will handle specially.
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>> +1
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>> Another comment about Printed in general is that it differs from
>> > > >>>>>>>>> other options in that it is a required option rather than an
>> > > >>>>>>>>> optional one, since it includes the toSysOut / toFile specs. What
>> > > >>>>>>>>> are the pros and cons of including these two in the option, and
>> > > >>>>>>>>> hence making it a required option, rather than leaving them at the
>> > > >>>>>>>>> API layer and making Printed optional, for mapper / label only?
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>> It isn't required, as we will still have the no-arg print(), which
>> > > >>>>>>>> will just go to sysout as it does now.
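The arrangement Damian describes, a no-argument `print()` that keeps going to standard out plus a `Printed` argument whose static factory chooses the destination, might look roughly like this. `PrintedSketch` and `StreamSketch` are hypothetical stand-ins, not Kafka's `Printed` or `KStream`. The sketch only returns a description of where output would go, and its output format is an arbitrary choice.

```java
// Hypothetical Printed-like spec: the static factory fixes the destination up front.
final class PrintedSketch {
    final String destination; // "sysout" or a file path
    final String label;       // null means "use the stream's own name"

    private PrintedSketch(String destination, String label) {
        this.destination = destination;
        this.label = label;
    }

    static PrintedSketch toSysOut() {
        return new PrintedSketch("sysout", null);
    }

    static PrintedSketch toFile(String filePath) {
        return new PrintedSketch(filePath, null);
    }

    // Optional settings are layered on after the destination is chosen.
    PrintedSketch withLabel(String newLabel) {
        return new PrintedSketch(destination, newLabel);
    }
}

// Hypothetical stream type offering both print() overloads.
final class StreamSketch {
    private final String name;

    StreamSketch(String name) {
        this.name = name;
    }

    // No-argument form: same as print(PrintedSketch.toSysOut()), so Printed stays optional.
    String print() {
        return print(PrintedSketch.toSysOut());
    }

    // Describes where output would go; nothing is actually written in this sketch.
    String print(PrintedSketch printed) {
        String label = printed.label != null ? printed.label : name;
        return "[" + label + "] -> " + printed.destination;
    }

    public static void main(String[] args) {
        StreamSketch orders = new StreamSketch("orders");
        System.out.println(orders.print());
        System.out.println(orders.print(PrintedSketch.toFile("/tmp/orders.txt").withLabel("audit")));
    }
}
```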
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 2.1 KStream#through / to
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Produced?
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> Yes - we already have those, so they are not part of the KIP,
>> > > >>>>>>>> i.e., through(topic)
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 2.2 KStream#groupBy / groupByKey
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Serialized?
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> Yes, as above
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 2.3 KGroupedStream#count / reduce / aggregate
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Materialized?
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> As above
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 2.4 KStream#join
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Joined?
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> as above
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 2.5 Each of KTable's operators:
>> > > >>>>>>>>>
>> > > >>>>>>>>> We should have an overloaded function without Produced /
>> > > >>>>>>>>> Serialized / Materialized?
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>> as above
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 3.1 Produced: the static functions have overlaps, which seem
>> > > >>>>>>>>> unnecessary. I'd suggest just having the following three static
>> > > >>>>>>>>> functions, plus three similar member functions:
>> > > >>>>>>>>>
>> > > >>>>>>>>> public static <K, V> Produced<K, V> withKeySerde(final Serde<K> keySerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>> public static <K, V> Produced<K, V> withValueSerde(final Serde<V> valueSerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>> public static <K, V> Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
>> > > >>>>>>>>>
>> > > >>>>>>>>> The key idea is that by using the same function name for the
>> > > >>>>>>>>> static constructors and the member functions, users do not need
>> > > >>>>>>>>> to remember the differences, but can call these functions in any
>> > > >>>>>>>>> order they want, and later calls on the same spec will win over
>> > > >>>>>>>>> earlier calls.
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>> That would be great if Java supported it, but it doesn't. You can't
>> > > >>>>>>>> have static and member functions with the same signature.
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 3.2 Serialized: similarly
>> > > >>>>>>>>>
>> > > >>>>>>>>> public static <K, V> Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>> public static <K, V> Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>> public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>> public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> as above
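Java treats a static and an instance method with the same name and parameter types as a duplicate declaration, so the "same name for both" idea needs two names. One workable split, shown with a hypothetical `SerializedSketch` (not Kafka's `Serialized`), is static `keySerde`/`valueSerde` factories, as in Damian's `Produced` list earlier in the thread, plus instance `withKeySerde`/`withValueSerde` setters. A later setter call for the same option still wins, which keeps the any-order behavior.

```java
// Hypothetical serde stand-in that only carries a name.
final class SerdeName<T> {
    final String name;

    SerdeName(String name) {
        this.name = name;
    }
}

// Hypothetical Serialized-like options with separately named static and instance methods.
final class SerializedSketch<K, V> {
    private final SerdeName<K> keySerde;   // null means "use the default"
    private final SerdeName<V> valueSerde; // null means "use the default"

    private SerializedSketch(SerdeName<K> keySerde, SerdeName<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Static factories. A static withKeySerde(SerdeName<K>) could not sit next to the
    // instance withKeySerde below: same name and parameters is a compile-time error.
    static <K, V> SerializedSketch<K, V> keySerde(SerdeName<K> keySerde) {
        return new SerializedSketch<>(keySerde, null);
    }

    static <K, V> SerializedSketch<K, V> valueSerde(SerdeName<V> valueSerde) {
        return new SerializedSketch<>(null, valueSerde);
    }

    // Fluent setters return a copy; a later call for the same option replaces the earlier one.
    SerializedSketch<K, V> withKeySerde(SerdeName<K> newKeySerde) {
        return new SerializedSketch<>(newKeySerde, valueSerde);
    }

    SerializedSketch<K, V> withValueSerde(SerdeName<V> newValueSerde) {
        return new SerializedSketch<>(keySerde, newValueSerde);
    }

    String keySerdeName() {
        return keySerde == null ? "default" : keySerde.name;
    }

    String valueSerdeName() {
        return valueSerde == null ? "default" : valueSerde.name;
    }

    public static void main(String[] args) {
        SerializedSketch<String, Long> spec = SerializedSketch.<String, Long>keySerde(new SerdeName<>("Bytes"))
                .withValueSerde(new SerdeName<>("Long"))
                .withKeySerde(new SerdeName<>("String"));
        System.out.println(spec.keySerdeName() + " / " + spec.valueSerdeName());
    }
}
```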
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> Also it has a final Serde<V> otherValueSerde in one of its static
>> > > >>>>>>>>> constructors; is that intentional?
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> Nope: thanks.
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 3.3. Joined: similarly, keep the static constructor signatures the
>> > > >>>>>>>>> same as its corresponding member fields.
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>> As above
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>> 3.4 Materialized: it is a bit special, and I think we can keep its
>> > > >>>>>>>>> static constructors with only two `as` as they are today.
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> 4. Are there any modifications to StateStoreSupplier? Is it replaced
>> > > >>>>>>>>> by BytesStoreSupplier? It seems some more description is lacking
>> > > >>>>>>>>> here. Also in
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>> No modifications to StateStoreSupplier. It is superseded by
>> > > >>>>>>>> BytesStoreSupplier.
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>> public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
>> > > >>>>>>>>>
>> > > >>>>>>>>> Should the parameter be of type BytesStoreSupplier?
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>> Yep - thanks
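One way to picture `BytesStoreSupplier` superseding `StateStoreSupplier`: the user supplies a store that only deals in bytes, and the DSL layers the typed view on top using serdes. Everything below (`BytesStoreSupplierSketch`, `InMemoryBytesSupplier`, `MaterializedBytesSketch`, `TypedStore`) is a hypothetical sketch of that idea, with serdes modeled as plain `Function`s; it is not the Kafka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical user-provided supplier of a bytes-only key-value store.
interface BytesStoreSupplierSketch {
    String name();

    Map<ByteBuffer, byte[]> get(); // ByteBuffer keys compare by content, unlike byte[]
}

final class InMemoryBytesSupplier implements BytesStoreSupplierSketch {
    private final String name;

    InMemoryBytesSupplier(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public Map<ByteBuffer, byte[]> get() {
        return new HashMap<>();
    }
}

// Hypothetical Materialized-like spec: either just a store name or a bytes supplier.
final class MaterializedBytesSketch<K, V> {
    final String storeName;
    final BytesStoreSupplierSketch supplier; // null means "let the DSL pick a store"

    private MaterializedBytesSketch(String storeName, BytesStoreSupplierSketch supplier) {
        this.storeName = storeName;
        this.supplier = supplier;
    }

    static <K, V> MaterializedBytesSketch<K, V> as(String storeName) {
        return new MaterializedBytesSketch<>(storeName, null);
    }

    static <K, V> MaterializedBytesSketch<K, V> as(BytesStoreSupplierSketch supplier) {
        return new MaterializedBytesSketch<>(supplier.name(), supplier);
    }
}

// Typed view built on the DSL side: serdes translate between K/V and bytes.
final class TypedStore<K, V> {
    private final Map<ByteBuffer, byte[]> bytes;
    private final Function<K, byte[]> keySerializer;
    private final Function<V, byte[]> valueSerializer;
    private final Function<byte[], V> valueDeserializer;

    TypedStore(MaterializedBytesSketch<K, V> spec, Function<K, byte[]> keySerializer,
               Function<V, byte[]> valueSerializer, Function<byte[], V> valueDeserializer) {
        this.bytes = spec.supplier != null ? spec.supplier.get() : new HashMap<ByteBuffer, byte[]>();
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.valueDeserializer = valueDeserializer;
    }

    void put(K key, V value) {
        bytes.put(ByteBuffer.wrap(keySerializer.apply(key)), valueSerializer.apply(value));
    }

    V get(K key) {
        byte[] raw = bytes.get(ByteBuffer.wrap(keySerializer.apply(key)));
        return raw == null ? null : valueDeserializer.apply(raw);
    }

    public static void main(String[] args) {
        Function<String, byte[]> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        MaterializedBytesSketch<String, String> spec =
                MaterializedBytesSketch.as(new InMemoryBytesSupplier("custom-store"));
        TypedStore<String, String> store =
                new TypedStore<>(spec, utf8, utf8, raw -> new String(raw, StandardCharsets.UTF_8));
        store.put("k", "v");
        System.out.println(spec.storeName + ": k=" + store.get("k"));
    }
}
```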
>> > > >>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> Guozhang
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> On Thu, Jul 27, 2017 at 5:26 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>>>>>
>> > > >>>>>>>>>> Updated link:
>> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> Thanks,
>> > > >>>>>>>>>> Damian
>> > > >>>>>>>>>>
>> > > >>>>>>>>>> On Thu, 27 Jul 2017 at 13:09 Damian Guy <damian....@gmail.com> wrote:
>> > > >>>>>>>>>>
>> > > >>>>>>>>>>> Hi,
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> I've put together a KIP to make some changes to the Kafka Streams
>> > > >>>>>>>>>>> DSL that will hopefully allow us to:
>> > > >>>>>>>>>>> 1) reduce the explosion of overloads
>> > > >>>>>>>>>>> 2) add new features without having to continue adding more
>> > > >>>>>>>>>>> overloads
>> > > >>>>>>>>>>> 3) provide simpler ways for people to use custom storage engines
>> > > >>>>>>>>>>> and wrap them with logging, caching, etc. if desired
>> > > >>>>>>>>>>> 4) enable per-operator caching rather than global caching, without
>> > > >>>>>>>>>>> having to resort to supplying a StateStoreSupplier when you just
>> > > >>>>>>>>>>> want to turn caching off.
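Point 4 is about making caching a per-operator choice. The hypothetical sketch below (`CachingOptions`, `CountOperator`; not the Kafka API) counts records per key and either forwards every update immediately (caching disabled) or keeps only the latest value per key until `flush()` (caching enabled). Modeling the cache as "deduplicate per key until flush" is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-operator store options; caching is on unless disabled here.
final class CachingOptions {
    final String storeName;
    final boolean cachingEnabled;

    private CachingOptions(String storeName, boolean cachingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
    }

    static CachingOptions as(String storeName) {
        return new CachingOptions(storeName, true);
    }

    CachingOptions withCachingDisabled() {
        return new CachingOptions(storeName, false);
    }
}

// Hypothetical counting operator: what it forwards depends only on its own options.
final class CountOperator {
    private final CachingOptions options;
    private final Map<String, Long> counts = new LinkedHashMap<>();
    private final Map<String, Long> pending = new LinkedHashMap<>(); // cached, not yet forwarded
    final List<String> downstream = new ArrayList<>();

    CountOperator(CachingOptions options) {
        this.options = options;
    }

    void process(String key) {
        long count = counts.getOrDefault(key, 0L) + 1;
        counts.put(key, count);
        if (options.cachingEnabled) {
            pending.put(key, count); // keep only the latest count per key
        } else {
            downstream.add(key + "=" + count); // forward every update right away
        }
    }

    // Emits the latest cached value per key, e.g. on commit.
    void flush() {
        for (Map.Entry<String, Long> entry : pending.entrySet()) {
            downstream.add(entry.getKey() + "=" + entry.getValue());
        }
        pending.clear();
    }

    public static void main(String[] args) {
        CountOperator cached = new CountOperator(CachingOptions.as("counts"));
        CountOperator uncached = new CountOperator(CachingOptions.as("raw-counts").withCachingDisabled());
        for (String key : new String[] {"a", "a", "b"}) {
            cached.process(key);
            uncached.process(key);
        }
        cached.flush();
        System.out.println("cached:   " + cached.downstream);
        System.out.println("uncached: " + uncached.downstream);
    }
}
```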
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> The KIP is here:
>> > > >>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73631309
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>> Thanks,
>> > > >>>>>>>>>>> Damian
>> > > >>>>>>>>>>>
>> > > >>>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>>
>> > > >>>>>>>>> --
>> > > >>>>>>>>> -- Guozhang
>> > > >>>>>>>>>
>> > > >>>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> --
>> > > >>>>>>> -- Guozhang
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>>
>> > > >>
>> > > >
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
