Mohan / Mathias,

> > I think extending min/max to non-numeric types makes sense. Wondering
> > why we should require a `Comparator` or if we should require that the
> > types implement `Comparable` instead?
> >
> Good question. This is what it would look like:
>
> KTable<K, V> min_comparable()
> KTable<K, V> min_comparator(Comparator<V> comp)

Not sure if I understood Mathias' proposal correctly, but I think that
instead of going with
your original proposal (<VR extends Number> KTable<K, VR> min(Function<V,
VR> func...)
or mine (KTable<K, V> min(Comparator<V> comparator...), we could simplify
it a
bit by using a function to extract a Comparable property from the original
value:

KTable<K, V> min(Function<V, Comparable<?>> func...)

> I also think, that min/max should not change the value type. Using
> `Long` for sum() make sense though, and also to require a `<V extends
> Number>`.

Are there any reasons to limit the sum() to integers? Why not use a Double
instead?

Best regards,
Alexandre

On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com>
wrote:

> Matthias,
>
> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid
> >
> wrote:
>
> > Hi,
> >
> > I think extending min/max to non-numeric types makes sense. Wondering
> > why we should require a `Comparator` or if we should require that the
> > types implement `Comparable` instead?
> >
> > Good question. This is what it would look like:
>
> KTable<K, V> min_comparable()
> KTable<K, V> min_comparator(Comparator<V> comp)
>
> For min_comparable to work, you still need the bounds "V extends
> Comparable<
> V>". AFAICT, to avoid the "type parameter V hiding the type V" warning, it
> has to be at the interface level like this:
>
>  KStream<K,  V extends Comparable<V>>
>
> which is a little messy unless there is a different way to do the same. The
> comparator gives a simple way to define an anonymous function.
>
> What do you think ?
>
>
> > I also think, that min/max should not change the value type. Using
> > `Long` for sum() make sense though, and also to require a `<V extends
> > Number>`.
> >
> > I guess these are the two possibilities:
>
> <E extends Number> Long sum(Function<V, E> func)
> Long sum(Function<V, Number> func)
>
> Both should work. "func" can return any subtypes of Number and I don't see
> any advantages with the first version. Can you clarify ?
>
> Thanks
> Mohan
>
>
> >
> > -Matthias
> >
> > On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> > > Hi Alex,
> > >
> > > On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
> > alexandre.bra...@gmail.com>
> > > wrote:
> > >
> > >>
> > >> My point here is that, when we're only interested in a max/min numeric
> > >> value, it doesn't
> > >> matter when we have repeated values, since we'd be only forwarding the
> > >> number downstream,
> > >> so I could disregard when the Comparator returns a zero value (meaning
> > >> equals) and min/max
> > >> would still be semantically correct. But when we're forwarding the
> > original
> > >> object downstream
> > >> instead of its numeric property, it could mean different things
> > >> semantically depending on how
> > >> we handle the repeated values.
> > >>
> > >> As an example, if I were using max() on a stream of Biddings for
> > products
> > >> in an auction, the
> > >> order of the biddings would probably influence the winner if two
> clients
> > >> send Biddings with the
> > >> same value. If we're only forwarding the Bidding value downstream (a
> > double
> > >> value of 100, for
> > >> example), it doesn't matter how repeated values are handled, since the
> > max
> > >> price for this
> > >> auction would still be 100.00, no matter what Bidding got selected in
> > the
> > >> end. But if we're
> > >> forwarding the Biddings downstream instead, then it matters whether
> the
> > >> winning Bidding sent
> > >> downstream was originally posted by Client A or Client B.
> > >>
> > >> I'm not saying that an overloaded method to handle different options
> for
> > >> how repeated values
> > >> should be handled by min/max is mandatory, but it should be clear on
> the
> > >> methods' docs
> > >> what would happen when Comparator.compare() == 0. My preferred option
> > for
> > >> the default
> > >> behaviour is to only forward a new value is smaller/bigger than the
> > >> previous min/max value
> > >> (ignoring compare() == 0), since it would emit less values downstream
> > and
> > >> would be easier
> > >> to read ("I only send a value downstream if it's bigger/smaller than
> the
> > >> previously selected
> > >> value").
> > >>
> > > Thanks for the clarification. I like your suggestion unless someone
> feels
> > > that they want an option to control this (i.e., when compare() == 0,
> > return
> > > the old value vs new value).
> > >
> > >
> > >>
> > >>> Not knowing the schema of the value (V) has its own set of problems.
> > As I
> > >> have alluded to
> > >>> in the proposal, this is a little bit messy. We already have "reduce"
> > >> which can be used to
> > >>> implement sum (mapValues().reduce()).
> > >>> Thinking about it more, do you think "sum" would be useful ? One
> hacky
> > >> way to implement
> > >>> this is to inspect the type of the return when the "func" is called
> the
> > >> first time OR infer from
> > >>> the materialized or have an explicit initializer.
> > >>
> > >> I think it might be useful for some use cases, yes, but it would be
> > tricky
> > >> to implement this in a
> > >> way that handles generic Numbers and keeps their original
> implementation
> > >> class. One
> > >> simplification you could take is fixating VR to be a Double, and then
> > use
> > >> Number.doubleValue()
> > >> to compute the sum.
> > >>
> > >
> > > Yeah, that would simplify quite a bit. I think you are suggesting this:
> > >
> > > KTable<K,Double> sum(Function<V, Number> func)
> > >
> > >
> > >> What you said about using reduce() to compute a sum() is also true for
> > >> min() and max(). =) All
> > >> three methods in this KIP would be a syntactic sugar for what could
> > >> otherwise be implemented
> > >> using reduce/aggregate, but I see value in implementing them and
> > >> simplifying the adoption of
> > >> those use cases.
> > >>
> > >> Agreed. I seem to have forgotten the reason as to why I started this
> KIP
> > > :-). There is a long way to go.
> > >
> > > -thanks
> > > Mohan
> > >
> > > Best regards,
> > >> Alexandre
> > >>
> > >> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <
> > mposde...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi Alex,
> > >>>
> > >>> Responses below.
> > >>>
> > >>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <
> > >>> alexandre.bra...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Mohan,
> > >>>>
> > >>>> I like the idea of adding those methods to the API, but I'd like to
> > >> make
> > >>> a
> > >>>> suggestion:
> > >>>>
> > >>>> Although the most used scenario for min() / max() might possibly be
> > for
> > >>>> numeric values, I think they could also be
> > >>>> useful on other objects like Dates, LocalDates or Strings. Why limit
> > >> the
> > >>>> API to Numbers only?
> > >>>>
> > >>>>
> > >>> There was no specific reason. Just addressing the common scenario.
> But
> > I
> > >>> don't see why this can't be supported given your suggestion below.
> > >>>
> > >>> Extending on the above, couldn't we change the API to provide a
> > >>>> Comparator<V> instead of the Function<V, VR>
> > >>>> for those methods, and make them return a KTable<K, V> instead? Not
> > >> only
> > >>>> would this approach not limit the
> > >>>> usage of those methods to Numbers, but they'd also preserve the
> origin
> > >>> from
> > >>>> the min/max value [1]. The extraction of
> > >>>> a single (numeric?) value could be achieved by a subsequent
> > >> .mapValues()
> > >>>> operator, and this strategy would also
> > >>>> allow us to reuse the stream's current value serde on min / max,
> > making
> > >>> the
> > >>>> Materialized an optional parameter.
> > >>>>
> > >>>> I like your idea though it is odd that min/max returns  KTable<K, V>
> > >>> instead of the KTable<K, VR> (like in count), but mapValues should do
> > the
> > >>> trick.
> > >>>
> > >>> One extra complication of this approach is that now we'd have to
> handle
> > >>>> repeated min/max values from different
> > >>>> origins (two semantically different objects for which the comparator
> > >>>> returns 0), but we could solve that by adding
> > >>>> a parameter to specify whether to use the older or newer value (or
> > >>> assuming
> > >>>> one of these options as default for a
> > >>>> simpler API?).
> > >>>>
> > >>>> I am not sure whether this complexity is warranted. Why can't we
> just
> > >>> stick to the way a regular Comparator works ? Can you give me a real
> > >> world
> > >>> example ?
> > >>>
> > >>>>
> > >>>> I know it's an implementation issue, but I'm curious on how you'd
> > solve
> > >>>> handling the <VR extends Number> on
> > >>>> the sum(). Since the multiple implementations of this interface
> don't
> > >>> have
> > >>>> a common constructor nor an interface
> > >>>> method to add two Numbers, would it be possible to implement sum()
> and
> > >>>> retain the original VR type on the
> > >>>> returned KTable?
> > >>>>
> > >>>
> > >>> Not knowing the schema of the value (V) has its own set of problems.
> > As I
> > >>> have alluded to in the proposal, this is a little bit messy. We
> already
> > >>> have "reduce" which can be used to implement sum
> > (mapValues().reduce()).
> > >>> Thinking about it more, do you think "sum" would be useful ? One
> hacky
> > >> way
> > >>> to implement this is to inspect the type of the return when the
> "func"
> > is
> > >>> called the first time OR infer from the materialized or have an
> > explicit
> > >>> initializer.
> > >>>
> > >>> Thanks
> > >>> Mohan
> > >>>
> > >>>
> > >>>> [1]: An example scenario for this would be to find the min / max
> > >> Bidding
> > >>>> for a product where, at the end of the
> > >>>> auction, I need not only the min / max value of said Bidding, but
> also
> > >>> the
> > >>>> bidder's contact information.
> > >>>>
> > >>>> Best,
> > >>>> Alexandre
> > >>>>
> > >>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <
> > >> mposde...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> I have created a proposal for adding some additional aggregation
> APIs
> > >>>> like
> > >>>>> count.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> > >>>>>
> > >>>>> I have noted down some of the issues that need discussion. Thanks
> to
> > >>>>> Matthias for helping me with the scope of the proposal.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Mohan
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>

Reply via email to