Matthias,

On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid>
wrote:

> Hi,
>
> I think extending min/max to non-numeric types makes sense. Wondering
> why we should require a `Comparator` or if we should require that the
> types implement `Comparable` instead?
>
> Good question. This is what it would look like:

KTable<K, V> min_comparable()
KTable<K, V> min_comparator(Comparator<V> comp)

For min_comparable to work, you still need the bounds "V extends Comparable<
V>". AFAICT, to avoid the "type parameter V hiding the type V" warning, it
has to be at the interface level like this:

 KStream<K,  V extends Comparable<V>>

which is a little messy unless there is a different way to do the same. The
comparator gives a simple way to define an anonymous function.

What do you think ?


> I also think, that min/max should not change the value type. Using
> `Long` for sum() make sense though, and also to require a `<V extends
> Number>`.
>
> I guess these are the two possibilities:

<E extends Number> Long sum(Function<V, E> func)
Long sum(Function<V, Number> func)

Both should work. "func" can return any subtypes of Number and I don't see
any advantages with the first version. Can you clarify ?

Thanks
Mohan


>
> -Matthias
>
> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> > Hi Alex,
> >
> > On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
> alexandre.bra...@gmail.com>
> > wrote:
> >
> >>
> >> My point here is that, when we're only interested in a max/min numeric
> >> value, it doesn't
> >> matter when we have repeated values, since we'd be only forwarding the
> >> number downstream,
> >> so I could disregard when the Comparator returns a zero value (meaning
> >> equals) and min/max
> >> would still be semantically correct. But when we're forwarding the
> original
> >> object downstream
> >> instead of its numeric property, it could mean different things
> >> semantically depending on how
> >> we handle the repeated values.
> >>
> >> As an example, if I were using max() on a stream of Biddings for
> products
> >> in an auction, the
> >> order of the biddings would probably influence the winner if two clients
> >> send Biddings with the
> >> same value. If we're only forwarding the Bidding value downstream (a
> double
> >> value of 100, for
> >> example), it doesn't matter how repeated values are handled, since the
> max
> >> price for this
> >> auction would still be 100.00, no matter what Bidding got selected in
> the
> >> end. But if we're
> >> forwarding the Biddings downstream instead, then it matters whether the
> >> winning Bidding sent
> >> downstream was originally posted by Client A or Client B.
> >>
> >> I'm not saying that an overloaded method to handle different options for
> >> how repeated values
> >> should be handled by min/max is mandatory, but it should be clear on the
> >> methods' docs
> >> what would happen when Comparator.compare() == 0. My preferred option
> for
> >> the default
> >> behaviour is to only forward a new value is smaller/bigger than the
> >> previous min/max value
> >> (ignoring compare() == 0), since it would emit less values downstream
> and
> >> would be easier
> >> to read ("I only send a value downstream if it's bigger/smaller than the
> >> previously selected
> >> value").
> >>
> > Thanks for the clarification. I like your suggestion unless someone feels
> > that they want an option to control this (i.e., when compare() == 0,
> return
> > the old value vs new value).
> >
> >
> >>
> >>> Not knowing the schema of the value (V) has its own set of problems.
> As I
> >> have alluded to
> >>> in the proposal, this is a little bit messy. We already have "reduce"
> >> which can be used to
> >>> implement sum (mapValues().reduce()).
> >>> Thinking about it more, do you think "sum" would be useful ? One hacky
> >> way to implement
> >>> this is to inspect the type of the return when the "func" is called the
> >> first time OR infer from
> >>> the materialized or have an explicit initializer.
> >>
> >> I think it might be useful for some use cases, yes, but it would be
> tricky
> >> to implement this in a
> >> way that handles generic Numbers and keeps their original implementation
> >> class. One
> >> simplification you could take is fixating VR to be a Double, and then
> use
> >> Number.doubleValue()
> >> to compute the sum.
> >>
> >
> > Yeah, that would simplify quite a bit. I think you are suggesting this:
> >
> > KTable<K,Double> sum(Function<V, Number> func)
> >
> >
> >> What you said about using reduce() to compute a sum() is also true for
> >> min() and max(). =) All
> >> three methods in this KIP would be a syntactic sugar for what could
> >> otherwise be implemented
> >> using reduce/aggregate, but I see value in implementing them and
> >> simplifying the adoption of
> >> those use cases.
> >>
> >> Agreed. I seem to have forgotten the reason as to why I started this KIP
> > :-). There is a long way to go.
> >
> > -thanks
> > Mohan
> >
> > Best regards,
> >> Alexandre
> >>
> >> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <
> mposde...@gmail.com>
> >> wrote:
> >>
> >>> Hi Alex,
> >>>
> >>> Responses below.
> >>>
> >>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <
> >>> alexandre.bra...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Mohan,
> >>>>
> >>>> I like the idea of adding those methods to the API, but I'd like to
> >> make
> >>> a
> >>>> suggestion:
> >>>>
> >>>> Although the most used scenario for min() / max() might possibly be
> for
> >>>> numeric values, I think they could also be
> >>>> useful on other objects like Dates, LocalDates or Strings. Why limit
> >> the
> >>>> API to Numbers only?
> >>>>
> >>>>
> >>> There was no specific reason. Just addressing the common scenario. But
> I
> >>> don't see why this can't be supported given your suggestion below.
> >>>
> >>> Extending on the above, couldn't we change the API to provide a
> >>>> Comparator<V> instead of the Function<V, VR>
> >>>> for those methods, and make them return a KTable<K, V> instead? Not
> >> only
> >>>> would this approach not limit the
> >>>> usage of those methods to Numbers, but they'd also preserve the origin
> >>> from
> >>>> the min/max value [1]. The extraction of
> >>>> a single (numeric?) value could be achieved by a subsequent
> >> .mapValues()
> >>>> operator, and this strategy would also
> >>>> allow us to reuse the stream's current value serde on min / max,
> making
> >>> the
> >>>> Materialized an optional parameter.
> >>>>
> >>>> I like your idea though it is odd that min/max returns  KTable<K, V>
> >>> instead of the KTable<K, VR> (like in count), but mapValues should do
> the
> >>> trick.
> >>>
> >>> One extra complication of this approach is that now we'd have to handle
> >>>> repeated min/max values from different
> >>>> origins (two semantically different objects for which the comparator
> >>>> returns 0), but we could solve that by adding
> >>>> a parameter to specify whether to use the older or newer value (or
> >>> assuming
> >>>> one of these options as default for a
> >>>> simpler API?).
> >>>>
> >>>> I am not sure whether this complexity is warranted. Why can't we just
> >>> stick to the way a regular Comparator works ? Can you give me a real
> >> world
> >>> example ?
> >>>
> >>>>
> >>>> I know it's an implementation issue, but I'm curious on how you'd
> solve
> >>>> handling the <VR extends Number> on
> >>>> the sum(). Since the multiple implementations of this interface don't
> >>> have
> >>>> a common constructor nor an interface
> >>>> method to add two Numbers, would it be possible to implement sum() and
> >>>> retain the original VR type on the
> >>>> returned KTable?
> >>>>
> >>>
> >>> Not knowing the schema of the value (V) has its own set of problems.
> As I
> >>> have alluded to in the proposal, this is a little bit messy. We already
> >>> have "reduce" which can be used to implement sum
> (mapValues().reduce()).
> >>> Thinking about it more, do you think "sum" would be useful ? One hacky
> >> way
> >>> to implement this is to inspect the type of the return when the "func"
> is
> >>> called the first time OR infer from the materialized or have an
> explicit
> >>> initializer.
> >>>
> >>> Thanks
> >>> Mohan
> >>>
> >>>
> >>>> [1]: An example scenario for this would be to find the min / max
> >> Bidding
> >>>> for a product where, at the end of the
> >>>> auction, I need not only the min / max value of said Bidding, but also
> >>> the
> >>>> bidder's contact information.
> >>>>
> >>>> Best,
> >>>> Alexandre
> >>>>
> >>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <
> >> mposde...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I have created a proposal for adding some additional aggregation APIs
> >>>> like
> >>>>> count.
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> >>>>>
> >>>>> I have noted down some of the issues that need discussion. Thanks to
> >>>>> Matthias for helping me with the scope of the proposal.
> >>>>>
> >>>>> Thanks
> >>>>> Mohan
> >>>>>
> >>>>
> >>>
> >>
> >
>

Reply via email to