Alex,

On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <alexandre.bra...@gmail.com>
wrote:

> Mohan / Matthias,
>
> > > I think extending min/max to non-numeric types makes sense. Wondering
> > > why we should require a `Comparator` or if we should require that the
> > > types implement `Comparable` instead?
> > >
> > Good question. This is what it would look like:
> >
> > KTable<K, V> min_comparable()
> > KTable<K, V> min_comparator(Comparator<V> comp)
>
> Not sure if I understood Matthias' proposal correctly, but I think that
> instead of going with your original proposal
> (<VR extends Number> KTable<K, VR> min(Function<V, VR> func...)
> or mine (KTable<K, V> min(Comparator<V> comparator...), we could simplify
> it a bit by using a function to extract a Comparable property from the
> original value:
>
> KTable<K, V> min(Function<V, Comparable<?>> func...)
>

I will let Matthias clarify. I am not sure why it is simpler than the
comparator one. Comparable is implemented by the type itself, and I am not
sure exposing it this way makes it any better.
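
To make the two shapes concrete, here is a minimal sketch in plain Java of the
reducer each variant would boil down to. The class and method names
(MinSketch, minByComparator, minByKey) are hypothetical and not part of the
KIP; a BinaryOperator stands in for the Reducer that would be passed to
reduce(). It assumes the "keep the older value on a tie" default discussed
further down the thread.

```java
import java.util.Comparator;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical helper class, for illustration only; not part of Kafka Streams.
final class MinSketch {
    private MinSketch() {}

    // Comparator variant: the candidate replaces the current minimum only when
    // it is strictly smaller, so on compare() == 0 the older value is kept.
    static <V> BinaryOperator<V> minByComparator(Comparator<? super V> comparator) {
        return (current, candidate) ->
            comparator.compare(candidate, current) < 0 ? candidate : current;
    }

    // Key-extractor variant: the Comparable bound sits on the method-level
    // type parameter U, so V itself does not have to implement Comparable.
    static <V, U extends Comparable<? super U>> BinaryOperator<V> minByKey(
            Function<? super V, ? extends U> keyExtractor) {
        return minByComparator(Comparator.<V, U>comparing(keyExtractor));
    }
}
```

In this sketch the key-extractor form is just a thin wrapper over the
comparator form (via Comparator.comparing), so the comparator overload looks
like the more general of the two.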

> > > I also think that min/max should not change the value type. Using
> > > `Long` for sum() makes sense though, and also to require a `<V extends
> > > Number>`.
>
> Are there any reasons to limit the sum() to integers? Why not use a Double
> instead?
>

Yeah, if precision is important, then we should stick with Double.
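
For reference, a minimal sketch of what a Double-based sum could look like in
plain Java. SumSketch and its sum() method are hypothetical names, and the
Iterable stands in for the stream of values; the wildcard signature is my
assumption, not something the KIP specifies.

```java
import java.util.function.Function;

// Hypothetical helper class, for illustration only; not part of Kafka Streams.
final class SumSketch {
    private SumSketch() {}

    // Accumulates in a double via Number.doubleValue(), so the extractor may
    // return any Number subtype (Integer, Long, Double, ...).
    static <V> double sum(Iterable<? extends V> values,
                          Function<? super V, ? extends Number> func) {
        double total = 0.0;
        for (V value : values) {
            total += func.apply(value).doubleValue();
        }
        return total;
    }
}
```

With "? extends Number" an already-typed Function<String, Integer> is
accepted as well as a lambda, which a plain Function<V, Number> parameter
would reject because generics are invariant. One caveat: a double cannot
represent every long above 2^53 exactly, so very large integer sums can lose
precision.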

-mohan

> Best regards,
> Alexandre
>
> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com>
> wrote:
>
> > Matthias,
> >
> > On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid>
> > wrote:
> >
> > > Hi,
> > >
> > > I think extending min/max to non-numeric types makes sense. Wondering
> > > why we should require a `Comparator` or if we should require that the
> > > types implement `Comparable` instead?
> > >
> > Good question. This is what it would look like:
> >
> > KTable<K, V> min_comparable()
> > KTable<K, V> min_comparator(Comparator<V> comp)
> >
> > For min_comparable to work, you still need the bound "V extends
> > Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V"
> > warning, it has to be at the interface level like this:
> >
> >  KStream<K,  V extends Comparable<V>>
> >
> > which is a little messy unless there is a different way to do the same.
> > The comparator gives a simple way to define an anonymous function.
> >
> > What do you think?
> >
> >
> > > I also think that min/max should not change the value type. Using
> > > `Long` for sum() makes sense though, and also to require a `<V extends
> > > Number>`.
> > >
> > I guess these are the two possibilities:
> >
> > <E extends Number> Long sum(Function<V, E> func)
> > Long sum(Function<V, Number> func)
> >
> > Both should work. "func" can return any subtype of Number, and I don't
> > see any advantage to the first version. Can you clarify?
> >
> > Thanks
> > Mohan
> >
> >
> > >
> > > -Matthias
> > >
> > > On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> > > > Hi Alex,
> > > >
> > > > On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
> > > alexandre.bra...@gmail.com>
> > > > wrote:
> > > >
> > > >>
> > > > >> My point here is that, when we're only interested in a max/min
> > > > >> numeric value, it doesn't matter when we have repeated values,
> > > > >> since we'd only be forwarding the number downstream, so I could
> > > > >> disregard when the Comparator returns a zero value (meaning
> > > > >> equal) and min/max would still be semantically correct. But when
> > > > >> we're forwarding the original object downstream instead of its
> > > > >> numeric property, it could mean different things semantically
> > > > >> depending on how we handle the repeated values.
> > > >>
> > > > >> As an example, if I were using max() on a stream of Biddings for
> > > > >> products in an auction, the order of the biddings would probably
> > > > >> influence the winner if two clients send Biddings with the same
> > > > >> value. If we're only forwarding the Bidding value downstream (a
> > > > >> double value of 100, for example), it doesn't matter how repeated
> > > > >> values are handled, since the max price for this auction would
> > > > >> still be 100.00, no matter what Bidding got selected in the end.
> > > > >> But if we're forwarding the Biddings downstream instead, then it
> > > > >> matters whether the winning Bidding sent downstream was originally
> > > > >> posted by Client A or Client B.
> > > >>
> > > > >> I'm not saying that an overloaded method to handle different
> > > > >> options for how repeated values should be handled by min/max is
> > > > >> mandatory, but it should be clear in the methods' docs what would
> > > > >> happen when Comparator.compare() == 0. My preferred option for the
> > > > >> default behaviour is to only forward a new value if it is
> > > > >> smaller/bigger than the previous min/max value (ignoring
> > > > >> compare() == 0), since it would emit fewer values downstream and
> > > > >> would be easier to read ("I only send a value downstream if it's
> > > > >> bigger/smaller than the previously selected value").
> > > >>
> > > > > Thanks for the clarification. I like your suggestion unless someone
> > > > > feels that they want an option to control this (i.e., when
> > > > > compare() == 0, return the old value vs. the new value).
> > > >
> > > >
> > > >>
> > > > >>> Not knowing the schema of the value (V) has its own set of
> > > > >>> problems. As I have alluded to in the proposal, this is a little
> > > > >>> bit messy. We already have "reduce" which can be used to
> > > > >>> implement sum (mapValues().reduce()).
> > > > >>> Thinking about it more, do you think "sum" would be useful? One
> > > > >>> hacky way to implement this is to inspect the type of the return
> > > > >>> when the "func" is called the first time OR infer from the
> > > > >>> materialized or have an explicit initializer.
> > > >>
> > > > >> I think it might be useful for some use cases, yes, but it would
> > > > >> be tricky to implement this in a way that handles generic Numbers
> > > > >> and keeps their original implementation class. One simplification
> > > > >> you could take is fixing VR to be a Double, and then using
> > > > >> Number.doubleValue() to compute the sum.
> > > >>
> > > >
> > > > > Yeah, that would simplify quite a bit. I think you are suggesting
> > > > > this:
> > > >
> > > > KTable<K,Double> sum(Function<V, Number> func)
> > > >
> > > >
> > > > >> What you said about using reduce() to compute a sum() is also true
> > > > >> for min() and max(). =) All three methods in this KIP would be
> > > > >> syntactic sugar for what could otherwise be implemented using
> > > > >> reduce/aggregate, but I see value in implementing them and
> > > > >> simplifying the adoption of those use cases.
> > > >>
> > > > > Agreed. I seem to have forgotten the reason as to why I started this
> > > > > KIP :-). There is a long way to go.
> > > >
> > > > -thanks
> > > > Mohan
> > > >
> > > > >> Best regards,
> > > > >> Alexandre
> > > >>
> > > >> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <
> > > mposde...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Alex,
> > > >>>
> > > >>> Responses below.
> > > >>>
> > > >>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <
> > > >>> alexandre.bra...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Mohan,
> > > >>>>
> > > > >>>> I like the idea of adding those methods to the API, but I'd like
> > > > >>>> to make a suggestion:
> > > > >>>>
> > > > >>>> Although the most used scenario for min() / max() might possibly
> > > > >>>> be for numeric values, I think they could also be useful on other
> > > > >>>> objects like Dates, LocalDates or Strings. Why limit the API to
> > > > >>>> Numbers only?
> > > >>>>
> > > >>>>
> > > > >>> There was no specific reason. Just addressing the common scenario.
> > > > >>> But I don't see why this can't be supported given your suggestion
> > > > >>> below.
> > > >>>
> > > > >>>> Extending on the above, couldn't we change the API to provide a
> > > > >>>> Comparator<V> instead of the Function<V, VR> for those methods,
> > > > >>>> and make them return a KTable<K, V> instead? Not only would this
> > > > >>>> approach not limit the usage of those methods to Numbers, but
> > > > >>>> they'd also preserve the origin of the min/max value [1]. The
> > > > >>>> extraction of a single (numeric?) value could be achieved by a
> > > > >>>> subsequent .mapValues() operator, and this strategy would also
> > > > >>>> allow us to reuse the stream's current value serde on min / max,
> > > > >>>> making the Materialized an optional parameter.
> > > >>>>
> > > > >>> I like your idea, though it is odd that min/max returns
> > > > >>> KTable<K, V> instead of KTable<K, VR> (like in count), but
> > > > >>> mapValues should do the trick.
> > > >>>
> > > > >>>> One extra complication of this approach is that now we'd have to
> > > > >>>> handle repeated min/max values from different origins (two
> > > > >>>> semantically different objects for which the comparator returns
> > > > >>>> 0), but we could solve that by adding a parameter to specify
> > > > >>>> whether to use the older or newer value (or assuming one of these
> > > > >>>> options as default for a simpler API?).
> > > >>>>
> > > > >>> I am not sure whether this complexity is warranted. Why can't we
> > > > >>> just stick to the way a regular Comparator works? Can you give me
> > > > >>> a real-world example?
> > > >>>
> > > >>>>
> > > > >>>> I know it's an implementation issue, but I'm curious how you'd
> > > > >>>> handle the <VR extends Number> on the sum(). Since the multiple
> > > > >>>> subclasses of Number don't have a common constructor nor a
> > > > >>>> method to add two Numbers, would it be possible to implement
> > > > >>>> sum() and retain the original VR type on the returned KTable?
> > > >>>>
> > > >>>
> > > > >>> Not knowing the schema of the value (V) has its own set of
> > > > >>> problems. As I have alluded to in the proposal, this is a little
> > > > >>> bit messy. We already have "reduce" which can be used to implement
> > > > >>> sum (mapValues().reduce()). Thinking about it more, do you think
> > > > >>> "sum" would be useful? One hacky way to implement this is to
> > > > >>> inspect the type of the return when the "func" is called the first
> > > > >>> time OR infer from the materialized or have an explicit
> > > > >>> initializer.
> > > >>>
> > > >>> Thanks
> > > >>> Mohan
> > > >>>
> > > >>>
> > > > >>>> [1]: An example scenario for this would be to find the min / max
> > > > >>>> Bidding for a product where, at the end of the auction, I need
> > > > >>>> not only the min / max value of said Bidding, but also the
> > > > >>>> bidder's contact information.
> > > >>>>
> > > >>>> Best,
> > > >>>> Alexandre
> > > >>>>
> > > >>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <
> > > >> mposde...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > > >>>>> I have created a proposal for adding some additional aggregation
> > > > >>>>> APIs like count.
> > > > >>>>>
> > > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> > > >>>>>
> > > > >>>>> I have noted down some of the issues that need discussion.
> > > > >>>>> Thanks to Matthias for helping me with the scope of the proposal.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Mohan
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
>
