Hi Mohan,

> I like your idea though it is odd that min/max returns KTable<K, V> instead
> of the KTable<K, VR> (like in count), but mapValues should do the trick.

My line of thought is that min/max could forward downstream the min/max V
associated with each K. That would differ from count, which always forwards a
positive integer representing the number of occurrences.

> I am not sure whether this complexity is warranted. Why can't we just stick
> to the way a regular Comparator works ? Can you give me a real world example ?

My point here is that, when we're only interested in a min/max numeric value,
repeated values don't matter: we'd only be forwarding the number downstream, so
I could disregard the case where the Comparator returns zero (meaning the values
are equal) and min/max would still be semantically correct. But when we forward
the original object downstream instead of its numeric property, how we handle
repeated values can change the meaning of the result.

As an example, if I were using max() on a stream of Biddings for products in an
auction, the order of the Biddings would probably decide the winner if two
clients sent Biddings with the same value. If we're only forwarding the Bidding
value downstream (a double value of 100.00, for example), it doesn't matter how
repeated values are handled, since the max price for this auction would still be
100.00 no matter which Bidding got selected in the end. But if we're forwarding
the Biddings themselves downstream, then it matters whether the winning Bidding
was originally posted by Client A or Client B.
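
A minimal, framework-free sketch of the tie problem, assuming a hypothetical
`Bidding` class with `client` and `amount` fields (not part of any existing
API). Both helpers pick the max Bidding by amount and differ only in what they
do when `compare()` returns 0:

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical value type standing in for the Biddings in the auction example.
final class Bidding {
    final String client;
    final double amount;

    Bidding(String client, double amount) {
        this.client = client;
        this.amount = amount;
    }
}

final class MaxBidding {
    // Replace the current max only if the new Bidding is strictly bigger:
    // on a tie (compare() == 0) the OLDER Bidding wins.
    static Bidding maxKeepOlder(List<Bidding> bids, Comparator<Bidding> cmp) {
        Bidding max = null;
        for (Bidding b : bids) {
            if (max == null || cmp.compare(b, max) > 0) {
                max = b;
            }
        }
        return max;
    }

    // Replace the current max if the new Bidding is bigger OR equal:
    // on a tie the NEWER Bidding wins.
    static Bidding maxKeepNewer(List<Bidding> bids, Comparator<Bidding> cmp) {
        Bidding max = null;
        for (Bidding b : bids) {
            if (max == null || cmp.compare(b, max) >= 0) {
                max = b;
            }
        }
        return max;
    }
}
```

With two 100.00 Biddings posted by Client A and then Client B, `maxKeepOlder`
returns Client A's Bidding and `maxKeepNewer` returns Client B's. The max amount
is 100.00 either way, which is why the tie policy only matters when the whole
object is forwarded.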

I'm not saying that an overloaded method with different options for handling
repeated values in min/max is mandatory, but the methods' docs should make clear
what happens when Comparator.compare() == 0. My preferred default behaviour is
to only forward a new value if it is smaller/bigger than the previous min/max
value (ignoring compare() == 0), since it would emit fewer values downstream and
would be easier to explain ("I only send a value downstream if it's
bigger/smaller than the previously selected value").
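
The preferred default can be sketched for a single key with a hypothetical
`MaxEmissions.forwarded` helper. It models only the emission rule itself, not
how Kafka Streams actually forwards records (record caching, for instance, also
affects what reaches downstream):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MaxEmissions {
    // Returns the values that would be forwarded downstream for one key, in order.
    // emitOnTie == false: a value equal to the current max (compare() == 0) is ignored.
    // emitOnTie == true:  an equal value replaces the current max and is forwarded.
    static <V> List<V> forwarded(List<V> values, Comparator<? super V> cmp, boolean emitOnTie) {
        List<V> out = new ArrayList<>();
        V max = null;
        for (V v : values) {
            // The first value for the key is always forwarded.
            int c = (max == null) ? 1 : cmp.compare(v, max);
            if (c > 0 || (c == 0 && emitOnTie)) {
                max = v;
                out.add(v);
            }
        }
        return out;
    }
}
```

For the values 5, 7, 7, 3, 9, ignoring ties forwards 5, 7, 9, while also
forwarding ties gives 5, 7, 7, 9: one extra downstream record for a result with
the same numeric max.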

> Not knowing the schema of the value (V) has its own set of problems. As I
> have alluded to in the proposal, this is a little bit messy. We already have
> "reduce" which can be used to implement sum (mapValues().reduce()).
> Thinking about it more, do you think "sum" would be useful ? One hacky way to
> implement this is to inspect the type of the return when the "func" is called
> the first time OR infer from the materialized or have an explicit initializer.

I think it might be useful for some use cases, yes, but it would be tricky to
implement in a way that handles generic Numbers and keeps their original
implementation class. One simplification you could make is to fix VR to Double
and use Number.doubleValue() to compute the sum.
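
A minimal sketch of that simplification, using a hypothetical `DoubleSum`
helper: every `Number` is widened with `doubleValue()`, so the original
implementation class (Integer, Long, BigDecimal, ...) is not kept.

```java
import java.util.List;

final class DoubleSum {
    // Sums any mix of Number subtypes by widening each one to double.
    // The result is always a double, and large longs or BigDecimals
    // may lose precision in the conversion.
    static double sum(List<? extends Number> values) {
        double total = 0.0;
        for (Number n : values) {
            total += n.doubleValue();
        }
        return total;
    }
}
```

The trade-off is precision: a `Long` above 2^53 or a `BigDecimal` can't always
be represented exactly as a `double`. Per key, this would roughly amount to
mapping values to Double and reducing with `Double::sum`, which would also need
a Double serde.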

What you said about using reduce() to compute sum() is also true for min() and
max(). =) All three methods in this KIP would be syntactic sugar for what could
otherwise be implemented with reduce/aggregate, but I see value in implementing
them to simplify the adoption of those use cases.
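
To illustrate the syntactic-sugar point, here is a sketch of min/max as plain
reducer functions built from a `Comparator<V>`; `MinMaxReducers` is a
hypothetical name. Kafka Streams' `Reducer<V>` is a single-method interface,
and to my understanding it receives (current aggregate, new value), so
something like `groupByKey().reduce(MinMaxReducers.maxBy(cmp)::apply)` would
yield a `KTable<K, V>`, with a following `mapValues()` extracting a numeric
field into a `KTable<K, VR>`. Treat that wiring as an untested assumption.

```java
import java.util.Comparator;
import java.util.function.BinaryOperator;

final class MinMaxReducers {
    // (aggregate, newValue) -> new aggregate.
    // Ties (compare() == 0) keep the existing aggregate, i.e. the older value,
    // matching the preferred default of ignoring compare() == 0.
    static <V> BinaryOperator<V> maxBy(Comparator<? super V> cmp) {
        return (agg, value) -> cmp.compare(value, agg) > 0 ? value : agg;
    }

    static <V> BinaryOperator<V> minBy(Comparator<? super V> cmp) {
        return (agg, value) -> cmp.compare(value, agg) < 0 ? value : agg;
    }
}
```

The helpers are written out instead of reusing the JDK's
`BinaryOperator.maxBy`/`minBy` so that the tie rule is explicit in the code.
Because they only need a Comparator, they work just as well for Strings or
LocalDates as for Numbers.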

Best regards,
Alexandre

On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:

> Hi Alex,
>
> Responses below.
>
> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com>
> wrote:
>
> > Hi Mohan,
> >
> > I like the idea of adding those methods to the API, but I'd like to make a
> > suggestion:
> >
> > Although the most used scenario for min() / max() might possibly be for
> > numeric values, I think they could also be useful on other objects like
> > Dates, LocalDates or Strings. Why limit the API to Numbers only?
> >
> >
> There was no specific reason. Just addressing the common scenario. But I
> don't see why this can't be supported given your suggestion below.
>
> > Extending on the above, couldn't we change the API to provide a
> > Comparator<V> instead of the Function<V, VR> for those methods, and make
> > them return a KTable<K, V> instead? Not only would this approach not limit
> > the usage of those methods to Numbers, but they'd also preserve the origin
> > from the min/max value [1]. The extraction of a single (numeric?) value
> > could be achieved by a subsequent .mapValues() operator, and this strategy
> > would also allow us to reuse the stream's current value serde on min / max,
> > making the Materialized an optional parameter.
>
> I like your idea though it is odd that min/max returns KTable<K, V> instead
> of the KTable<K, VR> (like in count), but mapValues should do the trick.
>
> > One extra complication of this approach is that now we'd have to handle
> > repeated min/max values from different origins (two semantically different
> > objects for which the comparator returns 0), but we could solve that by
> > adding a parameter to specify whether to use the older or newer value (or
> > assuming one of these options as default for a simpler API?).
>
> I am not sure whether this complexity is warranted. Why can't we just stick
> to the way a regular Comparator works ? Can you give me a real world
> example ?
>
> > I know it's an implementation issue, but I'm curious on how you'd solve
> > handling the <VR extends Number> on the sum(). Since the multiple
> > implementations of this interface don't have a common constructor nor an
> > interface method to add two Numbers, would it be possible to implement sum()
> > and retain the original VR type on the returned KTable?
> >
>
> Not knowing the schema of the value (V) has its own set of problems. As I
> have alluded to in the proposal, this is a little bit messy. We already
> have "reduce" which can be used to implement sum (mapValues().reduce()).
> Thinking about it more, do you think "sum" would be useful ? One hacky way
> to implement this is to inspect the type of the return when the "func" is
> called the first time OR infer from the materialized or have an explicit
> initializer.
>
> Thanks
> Mohan
>
>
> > [1]: An example scenario for this would be to find the min / max Bidding
> > for a product where, at the end of the auction, I need not only the min /
> > max value of said Bidding, but also the bidder's contact information.
> >
> > Best,
> > Alexandre
> >
> > On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have created a proposal for adding some additional aggregation APIs
> > > like count.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> > >
> > > I have noted down some of the issues that need discussion. Thanks to
> > > Matthias for helping me with the scope of the proposal.
> > >
> > > Thanks
> > > Mohan
> > >
> >
>
