Hi Alex,

On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:

>
> My point here is that, when we're only interested in a max/min numeric
> value, it doesn't matter whether we have repeated values, since we'd only
> be forwarding the number downstream, so I could disregard the case where
> the Comparator returns zero (meaning equal) and min/max would still be
> semantically correct. But when we're forwarding the original object
> downstream instead of its numeric property, it could mean different
> things semantically depending on how we handle the repeated values.
>
> As an example, if I were using max() on a stream of Biddings for products
> in an auction, the order of the Biddings would probably influence the
> winner if two clients send Biddings with the same value. If we're only
> forwarding the Bidding value downstream (a double value of 100, for
> example), it doesn't matter how repeated values are handled, since the
> max price for this auction would still be 100.00, no matter which Bidding
> got selected in the end. But if we're forwarding the Biddings downstream
> instead, then it matters whether the winning Bidding sent downstream was
> originally posted by Client A or Client B.
>
> I'm not saying that an overloaded method to handle different options for
> how repeated values should be handled by min/max is mandatory, but the
> methods' docs should make clear what happens when
> Comparator.compare() == 0. My preferred default behaviour is to only
> forward a new value if it is smaller/bigger than the previous min/max
> value (ignoring compare() == 0), since it would emit fewer values
> downstream and would be easier to read ("I only send a value downstream
> if it's bigger/smaller than the previously selected value").
>
Thanks for the clarification. I like your suggestion unless someone feels
that they want an option to control this (i.e., when compare() == 0, return
the old value vs new value).
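A minimal plain-Java sketch of the default proposed above (replace the current min/max only on a strictly smaller/bigger value), assuming a hypothetical `Bidding` record that is not part of the KIP. It relies on `BinaryOperator.maxBy` from `java.util.function`, which keeps its first argument when the comparator returns 0, so a later Bidding with an equal amount never displaces the one already selected. Wiring this into Kafka Streams (e.g. as the `Reducer` of a `reduce()`, which receives the current aggregate first) is an assumption here, not something the KIP specifies.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

final class StrictMax {
    // Hypothetical value type: who bid and how much.
    record Bidding(String client, double amount) {}

    // maxBy returns its first argument (the current max) when compare() == 0,
    // so an equal bid never replaces the value already selected.
    static final BinaryOperator<Bidding> MAX_BY_AMOUNT =
            BinaryOperator.maxBy(Comparator.comparingDouble(Bidding::amount));

    // Folds bids in arrival order, like a running reduce() over a stream.
    static Bidding winner(List<Bidding> bidsInArrivalOrder) {
        Bidding current = null;
        for (Bidding bid : bidsInArrivalOrder) {
            current = (current == null) ? bid : MAX_BY_AMOUNT.apply(current, bid);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Bidding> bids = List.of(
                new Bidding("Client A", 100.00),
                new Bidding("Client B", 100.00),
                new Bidding("Client C", 90.00));
        // Client A bid 100.00 first, so Client B's equal bid does not win.
        System.out.println(winner(bids).client()); // prints "Client A"
    }
}
```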


>
> > Not knowing the schema of the value (V) has its own set of problems. As
> > I have alluded to in the proposal, this is a little bit messy. We already
> > have "reduce" which can be used to implement sum (mapValues().reduce()).
> > Thinking about it more, do you think "sum" would be useful? One hacky way
> > to implement this is to inspect the type of the return when the "func" is
> > called the first time OR infer from the materialized or have an explicit
> > initializer.
>
> I think it might be useful for some use cases, yes, but it would be tricky
> to implement this in a way that handles generic Numbers and keeps their
> original implementation class. One simplification you could make is to
> fix VR as Double and then use Number.doubleValue() to compute the sum.
>

Yeah, that would simplify quite a bit. I think you are suggesting this:

KTable<K,Double> sum(Function<V, Number> func)
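A minimal plain-Java sketch of that signature's semantics, folding over a list instead of a KTable; the `Item` record and the `sum` helper are hypothetical. Each value extracted by `func` is widened with `Number.doubleValue()`, so the running total is a `double` regardless of which `Number` subclass `func` returns.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.function.Function;

final class DoubleSum {
    // Hypothetical value type carrying a numeric property of any Number subclass.
    record Item(String name, Number quantity) {}

    // Mirrors the idea behind sum(Function<V, Number> func): every extracted
    // Number is widened with doubleValue(), so the result is always a double.
    static <V> double sum(List<V> values, Function<V, Number> func) {
        double total = 0.0;
        for (V v : values) {
            total += func.apply(v).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
                new Item("a", 1),                      // Integer
                new Item("b", 2L),                     // Long
                new Item("c", new BigDecimal("0.5"))); // BigDecimal
        System.out.println(sum(items, Item::quantity)); // prints 3.5
    }
}
```

The trade-off of fixing the result to `Double` is that `doubleValue()` can lose precision for large `Long` or `BigDecimal` values.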


> What you said about using reduce() to compute a sum() is also true for
> min() and max(). =) All three methods in this KIP would be syntactic sugar
> for what could otherwise be implemented using reduce/aggregate, but I see
> value in implementing them and simplifying the adoption of those use
> cases.
>

Agreed. I seem to have forgotten why I started this KIP :-). There is a
long way to go.
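To illustrate the "syntactic sugar" point, here is a plain-Java sketch in which min, max and sum are each just a choice of operator passed to one reduce. It uses `java.util.stream.Stream.reduce` as a stand-in for the Kafka Streams `reduce()`/`aggregate()` operators; the helper names are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

final class SugarOverReduce {
    // Stand-in for a reduce() operator: folds values pairwise in order.
    static <T> Optional<T> reduce(List<T> values, BinaryOperator<T> op) {
        return values.stream().reduce(op);
    }

    // min(), max() and sum() are thin wrappers that only pick the operator.
    static <T> Optional<T> min(List<T> values, Comparator<? super T> cmp) {
        return reduce(values, BinaryOperator.minBy(cmp));
    }

    static <T> Optional<T> max(List<T> values, Comparator<? super T> cmp) {
        return reduce(values, BinaryOperator.maxBy(cmp));
    }

    static Optional<Double> sum(List<Double> values) {
        return reduce(values, Double::sum);
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(100.0, 90.0, 120.0);
        System.out.println(min(prices, Comparator.naturalOrder()).get()); // 90.0
        System.out.println(max(prices, Comparator.naturalOrder()).get()); // 120.0
        System.out.println(sum(prices).get());                            // 310.0
    }
}
```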

-thanks
Mohan

> Best regards,
> Alexandre
>
> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>
> > Hi Alex,
> >
> > Responses below.
> >
> > On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
> >
> > > Hi Mohan,
> > >
> > > I like the idea of adding those methods to the API, but I'd like to
> > > make a suggestion:
> > >
> > > Although the most common use of min() / max() is probably on numeric
> > > values, I think they could also be useful on other objects like Dates,
> > > LocalDates or Strings. Why limit the API to Numbers only?
> > >
> > >
> > There was no specific reason. Just addressing the common scenario. But I
> > don't see why this can't be supported given your suggestion below.
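A small plain-Java illustration of the quoted point that min/max only need an ordering, not a `Number`: `Collections.max` works on any `Comparable`, such as `LocalDate` or `String`. The helper names are hypothetical and only stand in for the proposed KTable operators.

```java
import java.time.LocalDate;
import java.util.Collections;
import java.util.List;

final class NonNumericMax {
    // Latest date: LocalDate is Comparable, so no Number is involved.
    static LocalDate latest(List<LocalDate> dates) {
        return Collections.max(dates);
    }

    // Lexicographically greatest string, using String's natural ordering.
    static String lastAlphabetically(List<String> names) {
        return Collections.max(names);
    }

    public static void main(String[] args) {
        System.out.println(latest(List.of(
                LocalDate.of(2021, 6, 2), LocalDate.of(2021, 6, 8), LocalDate.of(2021, 6, 4)))); // 2021-06-08
        System.out.println(lastAlphabetically(List.of("pear", "apple", "zucchini")));      // zucchini
    }
}
```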
> >
> > > Extending on the above, couldn't we change the API to take a
> > > Comparator<V> instead of the Function<V, VR> for those methods, and
> > > make them return a KTable<K, V> instead? Not only would this approach
> > > not limit the usage of those methods to Numbers, but it would also
> > > preserve the origin of the min/max value [1]. The extraction of a
> > > single (numeric?) value could be achieved by a subsequent .mapValues()
> > > operator, and this strategy would also allow us to reuse the stream's
> > > current value serde on min / max, making the Materialized an optional
> > > parameter.
> > >
> > I like your idea, though it is odd that min/max returns KTable<K, V>
> > instead of KTable<K, VR> (like in count), but mapValues should do the
> > trick.
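A plain-Java sketch of the Comparator-based variant: `max` returns the whole value `V` (here a hypothetical `Bidding` record carrying the bidder's contact information, as in the scenario from footnote [1] of the quoted message), and a separate mapping function, standing in for `mapValues()`, extracts a single numeric field afterwards. None of these names are Kafka Streams APIs.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

final class MaxThenMap {
    // Hypothetical value type: amount plus the bidder's contact information.
    record Bidding(String bidder, String contact, double amount) {}

    // max() that returns the whole value V, as in the Comparator<V> suggestion.
    // Assumes a non-empty list; replaces the current max only on compare() > 0.
    static <V> V max(List<V> values, Comparator<? super V> cmp) {
        V current = values.get(0);
        for (V v : values.subList(1, values.size())) {
            if (cmp.compare(v, current) > 0) {
                current = v;
            }
        }
        return current;
    }

    // Stand-in for a subsequent mapValues(): project the winner onto one field.
    static final Function<Bidding, Double> TO_AMOUNT = Bidding::amount;

    public static void main(String[] args) {
        List<Bidding> bids = List.of(
                new Bidding("Client A", "a@example.com", 100.0),
                new Bidding("Client B", "b@example.com", 120.0));
        Bidding winner = max(bids, Comparator.comparingDouble(Bidding::amount));
        System.out.println(winner.contact());        // b@example.com
        System.out.println(TO_AMOUNT.apply(winner)); // 120.0
    }
}
```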
> >
> > > One extra complication of this approach is that now we'd have to
> > > handle repeated min/max values from different origins (two
> > > semantically different objects for which the comparator returns 0), but
> > > we could solve that by adding a parameter to specify whether to use the
> > > older or newer value (or assuming one of these options as default for a
> > > simpler API?).
> > >
> > I am not sure whether this complexity is warranted. Why can't we just
> > stick to the way a regular Comparator works? Can you give me a real-world
> > example?
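A plain-Java sketch of the older/newer parameter floated in the quoted message, assuming a hypothetical `OnTie` enum and `maxBy` helper (neither is in the KIP nor in Kafka Streams). With two equal Biddings, the option alone decides which client's Bidding is selected, which is the situation the auction example describes.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

final class TieBreakMax {
    // Hypothetical option (not in the KIP): which value wins when compare() == 0.
    enum OnTie { KEEP_OLDER, KEEP_NEWER }

    record Bidding(String client, double amount) {}

    // Builds a max operator; (current, incoming) mirrors a running aggregate.
    static <V> BinaryOperator<V> maxBy(Comparator<? super V> cmp, OnTie onTie) {
        return (current, incoming) -> {
            int c = cmp.compare(incoming, current);
            if (c > 0) return incoming;
            if (c < 0) return current;
            return onTie == OnTie.KEEP_NEWER ? incoming : current;
        };
    }

    // Folds values in arrival order; assumes at least one value.
    static <V> V fold(List<V> values, BinaryOperator<V> op) {
        V current = values.get(0);
        for (V v : values.subList(1, values.size())) {
            current = op.apply(current, v);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Bidding> bids = List.of(new Bidding("Client A", 100.0), new Bidding("Client B", 100.0));
        Comparator<Bidding> byAmount = Comparator.comparingDouble(Bidding::amount);
        System.out.println(fold(bids, maxBy(byAmount, OnTie.KEEP_OLDER)).client()); // Client A
        System.out.println(fold(bids, maxBy(byAmount, OnTie.KEEP_NEWER)).client()); // Client B
    }
}
```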
> >
> > >
> > > I know it's an implementation issue, but I'm curious how you'd handle
> > > the <VR extends Number> on sum(). Since the multiple implementations of
> > > this interface have neither a common constructor nor an interface
> > > method to add two Numbers, would it be possible to implement sum() and
> > > retain the original VR type on the returned KTable?
> > >
> >
> > Not knowing the schema of the value (V) has its own set of problems. As I
> > have alluded to in the proposal, this is a little bit messy. We already
> > have "reduce" which can be used to implement sum (mapValues().reduce()).
> > Thinking about it more, do you think "sum" would be useful? One hacky way
> > to implement this is to inspect the type of the return when the "func" is
> > called the first time OR infer from the materialized or have an explicit
> > initializer.
> >
> > Thanks
> > Mohan
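A plain-Java sketch of the "hacky" approach described above: because `Number` has neither a common constructor nor an add method, the adder is picked from the runtime class of the first value returned by `func`, which lets the sum keep the original `VR` type. The helper names and the set of supported types are assumptions for illustration; any other `Number` subclass is rejected, and `Integer`/`Long` sums can overflow.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

final class TypeInferringSum {
    // Number has no add() method, so an adder must be chosen per concrete class.
    // This sketch picks it from the class of the first value seen.
    @SuppressWarnings("unchecked")
    static <VR extends Number> BinaryOperator<VR> adderFor(VR sample) {
        if (sample instanceof Integer) {
            return (a, b) -> (VR) Integer.valueOf(a.intValue() + b.intValue());
        } else if (sample instanceof Long) {
            return (a, b) -> (VR) Long.valueOf(a.longValue() + b.longValue());
        } else if (sample instanceof Double) {
            return (a, b) -> (VR) Double.valueOf(a.doubleValue() + b.doubleValue());
        } else if (sample instanceof BigDecimal) {
            return (a, b) -> (VR) ((BigDecimal) a).add((BigDecimal) b);
        }
        throw new IllegalArgumentException("Unsupported Number type: " + sample.getClass());
    }

    // Sums func(v) over the values while keeping VR's concrete type.
    // Assumes at least one value.
    static <V, VR extends Number> VR sum(List<V> values, Function<V, VR> func) {
        VR total = func.apply(values.get(0));
        BinaryOperator<VR> add = adderFor(total);
        for (V v : values.subList(1, values.size())) {
            total = add.apply(total, func.apply(v));
        }
        return total;
    }

    public static void main(String[] args) {
        Long longs = sum(List.of("a", "bb", "ccc"), s -> (long) s.length());
        BigDecimal decimals = sum(List.of("0.1", "0.2"), s -> new BigDecimal(s));
        System.out.println(longs);    // 6
        System.out.println(decimals); // 0.3
    }
}
```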
> >
> >
> > > [1]: An example scenario for this would be to find the min / max
> > > Bidding for a product where, at the end of the auction, I need not only
> > > the min / max value of said Bidding, but also the bidder's contact
> > > information.
> > >
> > > Best,
> > > Alexandre
> > >
> > > On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I have created a proposal for adding some additional aggregation APIs
> > > > like count.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> > > >
> > > > I have noted down some of the issues that need discussion. Thanks to
> > > > Matthias for helping me with the scope of the proposal.
> > > >
> > > > Thanks
> > > > Mohan
> > > >
> > >
> >
>
