I played with the code a bit, and it does not seem easy to use generics
to enforce that V is `Comparable`...

We would need to introduce a new interface:

 interface ComparableStream<K, V extends Comparable<V>>
    extends KStream<K, V>
 {
    KTable<K, V> min();
 }

But it also requires a nasty cast to actually use it:

  KStream<String, String> stream =
    new StreamsBuilder().stream("");
  KTable<String, String> table =
    ((ComparableStream<String, String>) stream).min();

If the value type does not implement `Comparable`, the cast would not
compile... Or would there be a simpler way to ensure that min() can only
be called _if_ V is `Comparable`?
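
One option that might avoid both the new interface and the cast is a
static helper whose own type parameter carries the bound. A minimal
sketch follows; `ValueStream` and `Mins` are hypothetical stand-ins so
the snippet compiles without Kafka on the classpath, they are not
existing Kafka Streams API:

```java
import java.util.List;

// Hypothetical stand-in for KStream<K, V>; only here so the sketch compiles on its own.
interface ValueStream<K, V> {
    List<V> values();
}

final class Mins {
    private Mins() { }

    // The bound lives on the method, so a call compiles only if V is Comparable.
    static <K, V extends Comparable<? super V>> V min(ValueStream<K, V> stream) {
        V best = null;
        for (V v : stream.values()) {
            if (best == null || v.compareTo(best) < 0) {
                best = v; // strictly smaller wins; ties keep the earlier value
            }
        }
        return best; // null for an empty stream
    }
}
```

With this shape, `Mins.min(stream)` should compile for a
`ValueStream<String, String>` and be rejected at compile time for a value
type that is not `Comparable`. The price is that it is no longer a fluent
`stream.min()` call.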


So passing in a `Comparator<V>` might be the right way to go; it would
also be more flexible anyway. My original idea was just to avoid the
`Comparator` argument, as it would make the API nicer IMHO; fewer
parameters is usually better...
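
To make the comparison concrete, here is a rough sketch of how a
`Comparator`-based min could reduce to a plain two-argument function.
`MinReducers.minBy` is a name made up for this mail, not Kafka API. Ties
keep the current value, which is the default Alexandre prefers further
down in the thread:

```java
import java.util.Comparator;
import java.util.function.BinaryOperator;

final class MinReducers {
    private MinReducers() { }

    // Hypothetical helper: keeps the current minimum unless the candidate is strictly smaller.
    static <V> BinaryOperator<V> minBy(Comparator<? super V> comparator) {
        return (current, candidate) ->
            comparator.compare(candidate, current) < 0 ? candidate : current;
    }
}
```

Passing `Comparator.<String>naturalOrder()` would cover the `Comparable`
case without any bound on V, so a no-argument overload could presumably
be layered on top later if we still want it.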


I am not sure why we would want to pass `Function<V, Comparable<?>>
func` into `min()`?
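
Part of my hesitation is the wildcard: as far as I can tell, two
`Comparable<?>` results cannot be compared with each other without an
unchecked cast. If extracting a property is the goal, the JDK's
`Comparator.comparingDouble` (or `Comparator.comparing`) already turns a
typed extractor into a `Comparator`. A small sketch, where the `Bid` and
`BidOrder` classes are invented purely for illustration:

```java
import java.util.Comparator;

// Hypothetical value type, only for illustration.
final class Bid {
    final String bidder;
    final double amount;

    Bid(String bidder, double amount) {
        this.bidder = bidder;
        this.amount = amount;
    }
}

final class BidOrder {
    private BidOrder() { }

    // Builds a type-safe Comparator<Bid> from the extracted amount.
    static final Comparator<Bid> BY_AMOUNT = Comparator.comparingDouble(b -> b.amount);
}
```

So a `min(Comparator<V>)` overload would seem to cover the
property-extraction use case as well.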



-Matthias



On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> Alex,
> 
> 
> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <alexandre.bra...@gmail.com>
> wrote:
> 
>> Mohan / Matthias,
>>
>>>> I think extending min/max to non-numeric types makes sense. Wondering
>>>> why we should require a `Comparator` or if we should require that the
>>>> types implement `Comparable` instead?
>>>>
>>> Good question. This is what it would look like:
>>>
>>> KTable<K, V> min_comparable()
>>> KTable<K, V> min_comparator(Comparator<V> comp)
>>
>> Not sure if I understood Matthias' proposal correctly, but I think that
>> instead of going with
>> your original proposal (<VR extends Number> KTable<K, VR> min(Function<V,
>> VR> func...)
>> or mine (KTable<K, V> min(Comparator<V> comparator...), we could simplify
>> it a
>> bit by using a function to extract a Comparable property from the original
>> value:
>>
>> KTable<K, V> min(Function<V, Comparable<?>> func...)
>>
>> I will let Matthias clarify. I am not sure why it is simpler than the
> comparator one. Comparable is implemented by the type and not sure exposing
> it this way makes it any better.
> 
>> I also think, that min/max should not change the value type. Using
>>> `Long` for sum() makes sense though, and also to require a `<V extends
>>> Number>`.
>>
>> Are there any reasons to limit the sum() to integers? Why not use a Double
>> instead?
>>
>> Yeah, if the precision is important, then we should stick with Double.
> 
> -mohan
> 
> Best regards,
>> Alexandre
>>
>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com>
>> wrote:
>>
>>> Matthias,
>>>
>>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
>> <mj...@mailbox.org.invalid
>>>>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think extending min/max to non-numeric types makes sense. Wondering
>>>> why we should require a `Comparator` or if we should require that the
>>>> types implement `Comparable` instead?
>>>>
>>>> Good question. This is what it would look like:
>>>
>>> KTable<K, V> min_comparable()
>>> KTable<K, V> min_comparator(Comparator<V> comp)
>>>
>>> For min_comparable to work, you still need the bound "V extends
>>> Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V"
>>> warning, it has to be at the interface level like this:
>>>
>>>  KStream<K,  V extends Comparable<V>>
>>>
>>> which is a little messy unless there is a different way to do the same.
>> The
>>> comparator gives a simple way to define an anonymous function.
>>>
>>> What do you think ?
>>>
>>>
>>>> I also think, that min/max should not change the value type. Using
>>>> `Long` for sum() makes sense though, and also to require a `<V extends
>>>> Number>`.
>>>>
>>>> I guess these are the two possibilities:
>>>
>>> <E extends Number> Long sum(Function<V, E> func)
>>> Long sum(Function<V, Number> func)
>>>
>>> Both should work. "func" can return any subtypes of Number and I don't
>> see
>>> any advantages with the first version. Can you clarify ?
>>>
>>> Thanks
>>> Mohan
>>>
>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
>>>>> Hi Alex,
>>>>>
>>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
>>>> alexandre.bra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> My point here is that, when we're only interested in a max/min
>> numeric
>>>>>> value, it doesn't
>>>>>> matter when we have repeated values, since we'd be only forwarding
>> the
>>>>>> number downstream,
>>>>>> so I could disregard when the Comparator returns a zero value
>> (meaning
>>>>>> equals) and min/max
>>>>>> would still be semantically correct. But when we're forwarding the
>>>> original
>>>>>> object downstream
>>>>>> instead of its numeric property, it could mean different things
>>>>>> semantically depending on how
>>>>>> we handle the repeated values.
>>>>>>
>>>>>> As an example, if I were using max() on a stream of Biddings for
>>>> products
>>>>>> in an auction, the
>>>>>> order of the biddings would probably influence the winner if two
>>> clients
>>>>>> send Biddings with the
>>>>>> same value. If we're only forwarding the Bidding value downstream (a
>>>> double
>>>>>> value of 100, for
>>>>>> example), it doesn't matter how repeated values are handled, since
>> the
>>>> max
>>>>>> price for this
>>>>>> auction would still be 100.00, no matter what Bidding got selected
>> in
>>>> the
>>>>>> end. But if we're
>>>>>> forwarding the Biddings downstream instead, then it matters whether
>>> the
>>>>>> winning Bidding sent
>>>>>> downstream was originally posted by Client A or Client B.
>>>>>>
>>>>>> I'm not saying that an overloaded method to handle different options
>>> for
>>>>>> how repeated values
>>>>>> should be handled by min/max is mandatory, but it should be clear on
>>> the
>>>>>> methods' docs
>>>>>> what would happen when Comparator.compare() == 0. My preferred
>> option
>>>> for
>>>>>> the default
>>>>>> behaviour is to only forward a new value if it is smaller/bigger than the
>>>>>> previous min/max value
>>>>>> (ignoring compare() == 0), since it would emit fewer values
>> downstream
>>>> and
>>>>>> would be easier
>>>>>> to read ("I only send a value downstream if it's bigger/smaller than
>>> the
>>>>>> previously selected
>>>>>> value").
>>>>>>
>>>>> Thanks for the clarification. I like your suggestion unless someone
>>> feels
>>>>> that they want an option to control this (i.e., when compare() == 0,
>>>> return
>>>>> the old value vs new value).
>>>>>
>>>>>
>>>>>>
>>>>>>> Not knowing the schema of the value (V) has its own set of
>> problems.
>>>> As I
>>>>>> have alluded to
>>>>>>> in the proposal, this is a little bit messy. We already have
>> "reduce"
>>>>>> which can be used to
>>>>>>> implement sum (mapValues().reduce()).
>>>>>>> Thinking about it more, do you think "sum" would be useful ? One
>>> hacky
>>>>>> way to implement
>>>>>>> this is to inspect the type of the return when the "func" is called
>>> the
>>>>>> first time OR infer from
>>>>>>> the materialized or have an explicit initializer.
>>>>>>
>>>>>> I think it might be useful for some use cases, yes, but it would be
>>>> tricky
>>>>>> to implement this in a
>>>>>> way that handles generic Numbers and keeps their original
>>> implementation
>>>>>> class. One
>>>>>> simplification you could take is fixating VR to be a Double, and
>> then
>>>> use
>>>>>> Number.doubleValue()
>>>>>> to compute the sum.
>>>>>>
>>>>>
>>>>> Yeah, that would simplify quite a bit. I think you are suggesting
>> this:
>>>>>
>>>>> KTable<K,Double> sum(Function<V, Number> func)
>>>>>
>>>>>
>>>>>> What you said about using reduce() to compute a sum() is also true
>> for
>>>>>> min() and max(). =) All
>>>>>> three methods in this KIP would be a syntactic sugar for what could
>>>>>> otherwise be implemented
>>>>>> using reduce/aggregate, but I see value in implementing them and
>>>>>> simplifying the adoption of
>>>>>> those use cases.
>>>>>>
>>>>>> Agreed. I seem to have forgotten the reason as to why I started this
>>> KIP
>>>>> :-). There is a long way to go.
>>>>>
>>>>> -thanks
>>>>> Mohan
>>>>>
>>>>> Best regards,
>>>>>> Alexandre
>>>>>>
>>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <
>>>> mposde...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Alex,
>>>>>>>
>>>>>>> Responses below.
>>>>>>>
>>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <
>>>>>>> alexandre.bra...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Mohan,
>>>>>>>>
>>>>>>>> I like the idea of adding those methods to the API, but I'd like
>> to
>>>>>> make
>>>>>>> a
>>>>>>>> suggestion:
>>>>>>>>
>>>>>>>> Although the most used scenario for min() / max() might possibly
>> be
>>>> for
>>>>>>>> numeric values, I think they could also be
>>>>>>>> useful on other objects like Dates, LocalDates or Strings. Why
>> limit
>>>>>> the
>>>>>>>> API to Numbers only?
>>>>>>>>
>>>>>>>>
>>>>>>> There was no specific reason. Just addressing the common scenario.
>>> But
>>>> I
>>>>>>> don't see why this can't be supported given your suggestion below.
>>>>>>>
>>>>>>> Extending on the above, couldn't we change the API to provide a
>>>>>>>> Comparator<V> instead of the Function<V, VR>
>>>>>>>> for those methods, and make them return a KTable<K, V> instead?
>> Not
>>>>>> only
>>>>>>>> would this approach not limit the
>>>>>>>> usage of those methods to Numbers, but they'd also preserve the
>>> origin
>>>>>>> from
>>>>>>>> the min/max value [1]. The extraction of
>>>>>>>> a single (numeric?) value could be achieved by a subsequent
>>>>>> .mapValues()
>>>>>>>> operator, and this strategy would also
>>>>>>>> allow us to reuse the stream's current value serde on min / max,
>>>> making
>>>>>>> the
>>>>>>>> Materialized an optional parameter.
>>>>>>>>
>>>>>>>> I like your idea though it is odd that min/max returns  KTable<K,
>> V>
>>>>>>> instead of the KTable<K, VR> (like in count), but mapValues should
>> do
>>>> the
>>>>>>> trick.
>>>>>>>
>>>>>>> One extra complication of this approach is that now we'd have to
>>> handle
>>>>>>>> repeated min/max values from different
>>>>>>>> origins (two semantically different objects for which the
>> comparator
>>>>>>>> returns 0), but we could solve that by adding
>>>>>>>> a parameter to specify whether to use the older or newer value (or
>>>>>>> assuming
>>>>>>>> one of these options as default for a
>>>>>>>> simpler API?).
>>>>>>>>
>>>>>>>> I am not sure whether this complexity is warranted. Why can't we
>>> just
>>>>>>> stick to the way a regular Comparator works ? Can you give me a
>> real
>>>>>> world
>>>>>>> example ?
>>>>>>>
>>>>>>>>
>>>>>>>> I know it's an implementation issue, but I'm curious on how you'd
>>>> solve
>>>>>>>> handling the <VR extends Number> on
>>>>>>>> the sum(). Since the multiple implementations of this interface
>>> don't
>>>>>>> have
>>>>>>>> a common constructor nor an interface
>>>>>>>> method to add two Numbers, would it be possible to implement sum()
>>> and
>>>>>>>> retain the original VR type on the
>>>>>>>> returned KTable?
>>>>>>>>
>>>>>>>
>>>>>>> Not knowing the schema of the value (V) has its own set of
>> problems.
>>>> As I
>>>>>>> have alluded to in the proposal, this is a little bit messy. We
>>> already
>>>>>>> have "reduce" which can be used to implement sum
>>>> (mapValues().reduce()).
>>>>>>> Thinking about it more, do you think "sum" would be useful ? One
>>> hacky
>>>>>> way
>>>>>>> to implement this is to inspect the type of the return when the
>>> "func"
>>>> is
>>>>>>> called the first time OR infer from the materialized or have an
>>>> explicit
>>>>>>> initializer.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Mohan
>>>>>>>
>>>>>>>
>>>>>>>> [1]: An example scenario for this would be to find the min / max
>>>>>> Bidding
>>>>>>>> for a product where, at the end of the
>>>>>>>> auction, I need not only the min / max value of said Bidding, but
>>> also
>>>>>>> the
>>>>>>>> bidder's contact information.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Alexandre
>>>>>>>>
>>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <
>>>>>> mposde...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have created a proposal for adding some additional aggregation
>>> APIs
>>>>>>>> like
>>>>>>>>> count.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
>>>>>>>>>
>>>>>>>>> I have noted down some of the issues that need discussion. Thanks
>>> to
>>>>>>>>> Matthias for helping me with the scope of the proposal.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Mohan
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 
