Jeyhun,

I am not an expert on lambdas. Can you elaborate a little? I cannot
follow the explanation in the KIP well enough to see what the problem is.

As for updating the KIP title, I don't know; I guess it's up to you. Maybe a
committer can comment on this?


Further comments:

 - The KIP is getting a little hard to read. Can you maybe reformat the wiki
page a bit? I think using `CodeBlock` would help.

 - What about KStream-KTable joins? You did not add overloads for
them. Why? (Even though I still hope that we don't need to add any new
overloads.)

 - Why do we need `AbstractRichFunction`?

 - What about the interfaces Initializer, ForeachAction, Merger, Predicate,
Reducer? I don't want to say we should/need to add variants for all of them,
but we should discuss each of them and add them where it makes sense (e.g.,
RichForeachAction does make sense IMHO).


Btw: I like the hierarchy `ValueXX` -- `ValueXXWithKey` -- `RichValueXX`
in general -- but why can't we do all this with interfaces only?
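A minimal Java sketch of what an interfaces-only hierarchy could look like. All names here (`ValueMapper`, `ValueMapperWithKey`, `RichFunction`, `RichValueMapper`, `CountingMapper`) are hypothetical stand-ins modeled on this thread, not the actual Kafka Streams API; `init()`/`close()` follow the method names mentioned elsewhere in the thread:

```java
// Hypothetical stand-ins for the interfaces discussed in this thread,
// not the real Kafka Streams API.
final class InterfacesOnlySketch {

    // Value only: one abstract method, so lambdas work.
    @FunctionalInterface
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Value plus read-only key: a separate functional interface, lambdas work too.
    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Lifecycle hooks as a plain interface instead of an AbstractRichFunction class.
    interface RichFunction {
        void init();
        void close();
    }

    // The "rich" variant composes both interfaces; no abstract class involved.
    interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction { }

    // Example user implementation of the rich variant.
    static final class CountingMapper implements RichValueMapper<String, String, String> {
        int calls;
        boolean open;

        @Override public void init() { open = true; calls = 0; }
        @Override public void close() { open = false; }

        @Override public String apply(String readOnlyKey, String value) {
            calls++;
            return readOnlyKey + "=" + value;
        }
    }

    public static void main(String[] args) {
        ValueMapper<String, Integer> length = String::length;
        ValueMapperWithKey<String, String, String> concat = (k, v) -> k + v;
        CountingMapper rich = new CountingMapper();
        rich.init();
        System.out.println(length.apply("abc") + " " + concat.apply("k", "v") + " " + rich.apply("a", "b"));
        rich.close();
    }
}
```

Because `RichFunction` is just another interface, the rich variant is built by interface inheritance alone. Only `RichValueMapper` itself cannot be a lambda target, since it has three abstract methods.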



-Matthias



On 5/11/17 7:00 AM, Jeyhun Karimov wrote:
> Hi,
> 
> Thanks for your comments. I think we cannot make `ValueMapperWithKey` extend
> `ValueMapper` if we want to keep supporting lambdas. I updated the KIP [1].
> Maybe I should change the title, because the KIP is no longer limited to
> ValueMapper, ValueTransformer, and ValueJoiner.
> Please feel free to comment.
> 
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
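To spell out the lambda issue raised above: a Java lambda can only target an interface with exactly one abstract method. A minimal sketch, using hypothetical stand-in interfaces (not the real Kafka Streams API):

```java
// Hypothetical stand-ins, not the real Kafka Streams API.
final class LambdaConstraintSketch {

    @FunctionalInterface
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Declared on its own, this has exactly one abstract method,
    // so `(key, value) -> ...` lambdas can implement it.
    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // If it were instead declared as
    //
    //   interface ValueMapperWithKey<K, V, VR> extends ValueMapper<V, VR> {
    //       VR apply(K readOnlyKey, V value);
    //   }
    //
    // it would have two abstract methods (the inherited apply(V) plus apply(K, V)).
    // @FunctionalInterface would then be a compile error, and users could no longer
    // write lambdas; they would need a class implementing both methods.

    public static void main(String[] args) {
        ValueMapper<String, Integer> length = String::length;
        ValueMapperWithKey<String, Integer, String> tag = (key, value) -> key + ":" + (value + 1);
        System.out.println(length.apply("abc"));     // 3
        System.out.println(tag.apply("user-1", 41)); // user-1:42
    }
}
```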
> 
> 
> Cheers,
> Jeyhun
> 
> On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> If `ValueMapperWithKey` extends `ValueMapper`, we don't need the new
>> overload.
>>
>> And yes, we need to do one check, but it happens when building the
>> topology. At runtime (i.e., after `KafkaStreams#start()`) we don't need any
>> check, as we will always use `ValueMapperWithKey`.
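One possible reading of this suggestion, as a minimal sketch: the key-aware interface extends `ValueMapper`, and the single type check happens when the processor is built, so per-record processing never branches. This assumes Java 8 default methods (used so that `ValueMapperWithKey` keeps a single abstract method); all names, including `MapValuesProcessor`, are hypothetical:

```java
// Hypothetical sketch of one reading of the proposal (Java 8+), not the real Kafka Streams API.
final class BuildTimeCheckSketch {

    @FunctionalInterface
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Assumption: the inherited one-argument apply() gets a default body, so
    // apply(K, V) is the only abstract method and lambdas still work.
    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> extends ValueMapper<V, VR> {
        VR apply(K readOnlyKey, V value);

        @Override
        default VR apply(V value) {
            throw new UnsupportedOperationException("this mapper needs the key");
        }
    }

    // Stand-in for the processor that mapValues() would create.
    static final class MapValuesProcessor<K, V, VR> {
        private final ValueMapperWithKey<K, V, VR> mapper;

        // Topology build time: the only type check, done once.
        @SuppressWarnings("unchecked")
        MapValuesProcessor(ValueMapper<V, VR> userMapper) {
            if (userMapper instanceof ValueMapperWithKey) {
                this.mapper = (ValueMapperWithKey<K, V, VR>) userMapper;
            } else {
                this.mapper = (key, value) -> userMapper.apply(value); // drop the key
            }
        }

        // Runtime: no check, always the key-aware call.
        VR process(K key, V value) {
            return mapper.apply(key, value);
        }
    }

    public static void main(String[] args) {
        ValueMapper<String, Integer> plain = String::length;
        ValueMapperWithKey<String, String, String> withKey = (k, v) -> k + "/" + v;
        System.out.println(new MapValuesProcessor<String, String, Integer>(plain).process("k", "abc"));   // 3
        System.out.println(new MapValuesProcessor<String, String, String>(withKey).process("k", "abc")); // k/abc
    }
}
```

A caveat of this variant: a bare `(k, v) -> ...` lambda cannot be passed directly where a `ValueMapper` is expected; it first has to be typed as `ValueMapperWithKey`, as in `main`. Calling the one-argument `apply` on a key-aware mapper fails at runtime.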
>>
>>
>> -Matthias
>>
>>
>> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
>>> Hi,
>>>
>>> Thanks for the feedback.
>>> Then we need to overload the method
>>>
>>>   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
>>>
>>> with
>>>
>>>   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
>>>
>>> and at runtime (inside the processor) we still have to check whether it is a
>>> ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
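A minimal sketch of the two-overload approach, using a toy in-memory `ToyStream` in place of `KStream` (hypothetical, not the real API). For an implicitly typed lambda, Java selects the overload by the lambda's parameter count, and the plain mapper is wrapped once inside its overload, so the per-record loop only ever sees the key-aware form:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the real Kafka Streams API.
final class OverloadSketch {

    @FunctionalInterface
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Toy stand-in for KStream<K, V>: an in-memory list of key/value pairs.
    static final class ToyStream<K, V> {
        final List<K> keys = new ArrayList<>();
        final List<V> values = new ArrayList<>();

        ToyStream<K, V> add(K key, V value) {
            keys.add(key);
            values.add(value);
            return this;
        }

        // Existing signature: wrap once (build time), then delegate.
        <VR> ToyStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
            ValueMapperWithKey<K, V, VR> withKey = (key, value) -> mapper.apply(value);
            return mapValues(withKey);
        }

        // New overload: the only path that touches records, always with the key.
        <VR> ToyStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
            ToyStream<K, VR> out = new ToyStream<>();
            for (int i = 0; i < keys.size(); i++) {
                out.add(keys.get(i), mapper.apply(keys.get(i), values.get(i)));
            }
            return out;
        }
    }

    public static void main(String[] args) {
        ToyStream<String, String> stream = new ToyStream<String, String>().add("a", "x").add("bb", "yy");
        // One-parameter lambda: only the ValueMapper overload has a matching arity.
        ToyStream<String, Integer> lengths = stream.mapValues(v -> v.length());
        // Two-parameter lambda: only the ValueMapperWithKey overload matches.
        ToyStream<String, String> tagged = stream.mapValues((k, v) -> k + "-" + v);
        System.out.println(lengths.values + " " + tagged.values); // [1, 2] [a-x, bb-yy]
    }
}
```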
>>>
>>>
>>> Please correct me if I am wrong.
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>>
>>>
>>>
>>> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
>>> michal.borowie...@openbet.com> wrote:
>>>
>>>> +1 :)
>>>>
>>>>
>>>> On 08/05/17 23:52, Matthias J. Sax wrote:
>>>>> Hi,
>>>>>
>>>>> I was reading the updated KIP and I am wondering if we should do the
>>>>> design a little differently.
>>>>>
>>>>> Instead of distinguishing between RichFunctions and non-RichFunctions at
>>>>> the runtime level, we would use RichFunctions all the time. Thus, at the
>>>>> DSL entry level, if a user provides a non-RichFunction, we wrap it in a
>>>>> RichFunction that is fully implemented by Streams. This RichFunction
>>>>> would just forward the call, omitting the key:
>>>>>
>>>>> Just to sketch the idea (incomplete code snippet):
>>>>>
>>>>>> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
>>>>>>    private final ValueMapper<V, VR> userProvidedMapper; // set by constructor
>>>>>>
>>>>>>    public VR apply(final K key, final V value) {
>>>>>>        return userProvidedMapper.apply(value); // the key is dropped
>>>>>>    }
>>>>>> }
>>>>>
>>>>> From a performance point of view, I am not sure whether the
>>>>> `if (isRichFunction)` check, including casts etc., would have more
>>>>> overhead than this approach (we would do one more nested method call for
>>>>> non-RichFunctions, which should be more common than RichFunctions).
>>>>>
>>>>> This approach should not affect lambdas (or am I missing something?) and
>>>>> might be cleaner, as we could have one more top-level interface
>>>>> `RichFunction` with methods `init()` and `close()`, and also interfaces
>>>>> for `RichValueMapper` etc. (thus, no abstract classes required).
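A runnable version of the wrapping idea sketched above, with `RichFunction` as a plain interface carrying `init()` and `close()`. Names follow the snippet in this message; `run` is a hypothetical stand-in for the processor, and none of this is the actual Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not the real Kafka Streams API.
final class RichWrapperSketch {

    @FunctionalInterface
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Top-level lifecycle interface.
    interface RichFunction {
        void init();
        void close();
    }

    interface RichValueMapper<K, V, VR> extends RichFunction {
        VR apply(K readOnlyKey, V value);
    }

    // Provided by Streams: adapts a plain ValueMapper to RichValueMapper.
    static final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
        private final ValueMapper<V, VR> userProvidedMapper;

        StreamsRichValueMapper(ValueMapper<V, VR> userProvidedMapper) {
            this.userProvidedMapper = userProvidedMapper;
        }

        @Override public void init() { }  // nothing to set up
        @Override public void close() { } // nothing to release

        @Override
        public VR apply(K readOnlyKey, V value) {
            return userProvidedMapper.apply(value); // one extra nested call; key ignored
        }
    }

    // Stand-in for the runtime: one code path, no per-record instanceof checks or casts.
    static <K, V, VR> List<VR> run(RichValueMapper<K, V, VR> mapper, List<K> keys, List<V> values) {
        mapper.init();
        try {
            List<VR> out = new ArrayList<>();
            for (int i = 0; i < keys.size(); i++) {
                out.add(mapper.apply(keys.get(i), values.get(i)));
            }
            return out;
        } finally {
            mapper.close();
        }
    }

    public static void main(String[] args) {
        ValueMapper<String, String> upper = String::toUpperCase;
        List<String> result = run(new StreamsRichValueMapper<String, String, String>(upper),
                Arrays.asList("k1", "k2"), Arrays.asList("a", "b"));
        System.out.println(result); // [A, B]
    }
}
```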
>>>>>
>>>>>
>>>>> Any thoughts?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the comments. I extended the PR and KIP to include rich functions.
>>>>>> I still have to evaluate the cost of deep copying keys.
>>>>>>
>>>>>> Cheers,
>>>>>> Jeyhun
>>>>>>
>>>>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Matthias,
>>>>>>>
>>>>>>> My opinion is that documenting the immutability of the key is the best
>>>>>>> approach available. It is better than requiring the key to be
>>>>>>> serializable (as in Jeyhun's last pass at the PR), and it carries no
>>>>>>> performance risk.
>>>>>>>
>>>>>>> It'd be different if Java had immutable type constraints of some kind.
>>>>>>> :-)
>>>>>>>
>>>>>>> Mathieu
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Agreed about RichFunction. If we follow this path, it should cover
>>>>>>>> all(?) DSL interfaces.
>>>>>>>>
>>>>>>>> About guarding the key: I am still not sure what to do about it...
>>>>>>>> Maybe it is enough to document it (and name the key parameter something
>>>>>>>> like `readOnlyKey` to make it very clear). Ultimately, I would prefer to
>>>>>>>> guard against any modification, but I have no good idea how to do this.
>>>>>>>> I am not sure what others think about the risk of corrupted partitioning
>>>>>>>> (which would be a user error, and we could say: well, bad luck, you have
>>>>>>>> a bug in your code, that's not our fault) vs. a deep copy with a
>>>>>>>> potential performance hit (which we can't quantify atm without a
>>>>>>>> performance test).
>>>>>>>>
>>>>>>>> We do have a performance system test. Maybe it's worth it for you to
>>>>>>>> apply the deep copy strategy and run the test. It's only a very basic
>>>>>>>> performance test, but it might give some insight. If you want to do
>>>>>>>> this, look into the folder "tests" for the general test setup, and into
>>>>>>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> I think extending the KIP to include RichFunctions totally makes sense.
>>>>>>>>> So, we don't want to guard the keys, because it is costly.
>>>>>>>>> If we introduce RichFunctions, I think they should not be limited to the
>>>>>>>>> 3 interfaces proposed in the KIP but should cover a wide range of
>>>>>>>>> interfaces.
>>>>>>>>> Please correct me if I am wrong.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> One follow up. There was this email on the user list:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>>>>>>>>>>
>>>>>>>>>> It might make sense to include the Initializer, Adder, and Subtractor
>>>>>>>>>> interfaces, too.
>>>>>>>>>>
>>>>>>>>>> And we should double-check whether there are other interfaces we might
>>>>>>>>>> be missing atm.
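For the Initializer question from the user list, a minimal sketch of what a key-aware initializer could look like in a per-key aggregation. `InitializerWithKey`, the toy `aggregate` loop, and the `withKey` adapter are hypothetical; `Initializer` and `Aggregator` only mirror the shape of the existing interfaces:

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the real Kafka Streams API.
final class InitializerWithKeySketch {

    // Shape of the existing key-less initializer.
    @FunctionalInterface
    interface Initializer<VA> {
        VA apply();
    }

    // Key-aware variant, as requested on the user list.
    @FunctionalInterface
    interface InitializerWithKey<K, VA> {
        VA apply(K readOnlyKey);
    }

    @FunctionalInterface
    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    // Toy per-key aggregation over in-memory records (not how Streams implements it).
    static <K, V, VA> Map<K, VA> aggregate(Iterable<Map.Entry<K, V>> records,
                                           InitializerWithKey<K, VA> initializer,
                                           Aggregator<K, V, VA> aggregator) {
        Map<K, VA> store = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // The initial aggregate is created per key and may now depend on it.
            VA current = store.containsKey(key) ? store.get(key) : initializer.apply(key);
            store.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return store;
    }

    // Existing key-less initializers can be adapted by ignoring the key.
    static <K, VA> InitializerWithKey<K, VA> withKey(Initializer<VA> initializer) {
        return key -> initializer.apply();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = Arrays.<Map.Entry<String, Integer>>asList(
                new AbstractMap.SimpleEntry<String, Integer>("a", 1),
                new AbstractMap.SimpleEntry<String, Integer>("b", 2),
                new AbstractMap.SimpleEntry<String, Integer>("a", 3));
        Map<String, String> labeled = aggregate(records, key -> key + ":", (key, value, agg) -> agg + value);
        System.out.println(labeled); // {a=a:13, b=b:2}
    }
}
```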
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>>>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>>>
>>>>>>>>>>> Deep copying the key will work for sure, but I am actually a little
>>>>>>>>>>> worried about the performance impact... We might want to run some tests
>>>>>>>>>>> to quantify this impact.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
>>>>>>>>>>> would allow users to access record metadata (like timestamp, offset,
>>>>>>>>>>> partition etc.) within the DSL. This would be a similar concept. Thus,
>>>>>>>>>>> I am wondering whether it would make sense to enlarge the scope of this
>>>>>>>>>>> KIP accordingly. WDYT?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>> Hi Mathieu,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the feedback. I followed a similar approach and updated the
>>>>>>>>>>>> PR and KIP accordingly. I tried to guard the key in the Processors by
>>>>>>>>>>>> sending a copy of the actual key.
>>>>>>>>>>>> Because I am doing a deep copy of an object, I think memory can become
>>>>>>>>>>>> a bottleneck in some use cases.
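A minimal sketch of one way to deep-copy a key before handing it to user code: a Java serialization round trip. It assumes the key type is `Serializable` (the requirement Mathieu mentions elsewhere in the thread) and illustrates the memory concern, since every record allocates a serialized byte array plus a fresh object graph. `deepCopy` and `applyGuarded` are hypothetical helpers, not Kafka code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical helpers, not Kafka Streams code.
final class DeepCopySketch {

    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Deep copy via a Java serialization round trip. Requires a Serializable key and
    // allocates a byte[] plus a whole new object graph on every call.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not copy key", e);
        }
    }

    // Processor-side guard: user code only ever sees a copy of the key.
    static <K extends Serializable, V, VR> VR applyGuarded(ValueMapperWithKey<K, V, VR> mapper, K key, V value) {
        return mapper.apply(deepCopy(key), value);
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        ValueMapperWithKey<byte[], String, String> careless = (k, v) -> {
            k[0] = 0; // mutates only the copy
            return v;
        };
        applyGuarded(careless, key, "value");
        System.out.println(Arrays.toString(key)); // [1, 2, 3]
    }
}
```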
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>
>>>>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes rather
>>>>>>>>>>>>> than interfaces, which is also a backwards-incompatible change. An
>>>>>>>>>>>>> alternative approach that would be backwards compatible would be to
>>>>>>>>>>>>> define new interfaces and provide overloads where those interfaces are
>>>>>>>>>>>>> used.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately, marking the key parameter as "final" doesn't change much
>>>>>>>>>>>>> about guarding against key changes. It only prevents the parameter
>>>>>>>>>>>>> variable from being reassigned. If the key type is a mutable object
>>>>>>>>>>>>> (e.g., byte[]), it can still be mutated (e.g., key[0] = 0). But I'm not
>>>>>>>>>>>>> really sure there's much that can be done about that.
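A minimal Java illustration of this point: `final` on a parameter blocks reassignment, not mutation. The `ValueMapperWithKey` interface and the `callMapper` helper are hypothetical stand-ins:

```java
import java.util.Arrays;

// Hypothetical stand-ins, not the real Kafka Streams API.
final class FinalKeySketch {

    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // A user implementation that declares the key parameter final and still mutates it.
    static final ValueMapperWithKey<byte[], String, String> SNEAKY =
        new ValueMapperWithKey<byte[], String, String>() {
            @Override
            public String apply(final byte[] readOnlyKey, final String value) {
                // readOnlyKey = new byte[0]; // would not compile: the parameter is final
                readOnlyKey[0] = 0;            // compiles, and changes the caller's array
                return value;
            }
        };

    // Stand-in for the framework handing the record's key to user code.
    static String callMapper(byte[] key) {
        return SNEAKY.apply(key, "value");
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        byte[] before = key.clone();
        callMapper(key);
        System.out.println(Arrays.toString(before) + " -> " + Arrays.toString(key)); // [1, 2, 3] -> [0, 2, 3]
    }
}
```

After the call, `Arrays.hashCode(key)` also differs from its value before the call, so any later step that hashes this key (e.g., for partitioning) would see a different key than the one the record was originally routed by.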
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mathieu
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the comments.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The concerns make sense. Although we can guard against key
>>>>>>>>>>>>>> modification in the current implementation (with a few changes), I
>>>>>>>>>>>>>> didn't consider backward compatibility.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In this case, 2 solutions come to mind. In both cases, the user accesses
>>>>>>>>>>>>>> the key as an Object, since adding an extra type parameter would break
>>>>>>>>>>>>>> backward compatibility. So the user has to cast it to the actual key
>>>>>>>>>>>>>> type.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. First, we can overload the apply method with 2 arguments (key and
>>>>>>>>>>>>>> value) and force the key to be *final*. By doing this, I think we can
>>>>>>>>>>>>>> address both backward compatibility and guarding against key changes.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Second, we can create a class KeyAccess like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class KeyAccess {
>>>>>>>>>>>>>>      Object key;
>>>>>>>>>>>>>>      public void beforeApply(final Object key) {
>>>>>>>>>>>>>>          this.key = key;
>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>      public Object getKey() {
>>>>>>>>>>>>>>          return key;
>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We can make *ValueMapper, ValueJoiner* and *ValueTransformer* extend
>>>>>>>>>>>>>> *KeyAccess*. Inside the processor (for example
>>>>>>>>>>>>>> *KTableMapValuesProcessor*), before calling *mapper.apply(value)* we can
>>>>>>>>>>>>>> set the *key* via *mapper.beforeApply(key)*. As a result, the user can
>>>>>>>>>>>>>> call *getKey()* to access the key inside the *apply(value)* method.
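A minimal sketch of the KeyAccess idea (option 2). Since a Java interface cannot extend a class, `ValueMapper` has to become an abstract class here, which is the compatibility problem Mathieu points out and also rules out lambdas. `KeyAccessSketch`, the abstract `ValueMapper`, and `process` are hypothetical:

```java
// Hypothetical sketch of option 2, not the real Kafka Streams API.
final class KeyAccessSketch {

    // As proposed: holds the key of the record currently being processed.
    static class KeyAccess {
        private Object key;

        public void beforeApply(final Object key) {
            this.key = key;
        }

        public Object getKey() {
            return key;
        }
    }

    // An interface cannot extend a class, so ValueMapper must become an abstract class.
    // That breaks existing `implements ValueMapper` code and rules out lambdas.
    abstract static class ValueMapper<V, VR> extends KeyAccess {
        public abstract VR apply(V value);
    }

    // Processor side: set the key, then call the unchanged one-argument apply().
    static <K, V, VR> VR process(ValueMapper<V, VR> mapper, K key, V value) {
        mapper.beforeApply(key);
        return mapper.apply(value);
    }

    public static void main(String[] args) {
        ValueMapper<String, String> mapper = new ValueMapper<String, String>() {
            @Override
            public String apply(String value) {
                String key = (String) getKey(); // the key is untyped, so the user has to cast
                return key + "=" + value;
            }
        };
        System.out.println(process(mapper, "k1", "v1")); // k1=v1
        System.out.println(process(mapper, "k2", "v2")); // k2=v2
    }
}
```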
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> thanks a lot for the KIP!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think there are two issues we need to address:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did
>>>>>>>>>>>>>>> complain about this in past releases already, and as the user base
>>>>>>>>>>>>>>> grows, we should not break backward compatibility in future releases
>>>>>>>>>>>>>>> anymore. Thus, we should think of a better way to allow key access.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Mathieu's comment goes in the same direction:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to
>>>>>>>>>>>>>>>>> be fixed from this change is unfortunate. :-)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
>>>>>>>>>>>>>>> modifying the key. This might corrupt partitioning if users alter the
>>>>>>>>>>>>>>> key (accidentally, or because they are not aware that they are not
>>>>>>>>>>>>>>> allowed to modify the provided key object) and thus break the
>>>>>>>>>>>>>>> application. (This was the original motivation for not providing the
>>>>>>>>>>>>>>> key in the first place: it guards against modification.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I just want to add my voice: I, too, have wished for access to the
>>>>>>>>>>>>>>>> record key during a mapValues or similar operation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to
>>>>>>>>>>>>>>>> be fixed from this change is unfortunate. :-)  But at least it would
>>>>>>>>>>>>>>>> all be a pretty clear and easy change.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Mathieu
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I want to share KIP-149 [1], based on issues KAFKA-4218 [2],
>>>>>>>>>>>>>>>>> KAFKA-4726 [3], and KAFKA-3745 [4]. The related PR can be found at [5].
>>>>>>>>>>>>>>>>> I would like to get your comments.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>>>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>>>>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>>>>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> -Cheers
>>>>>>>>>
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>
>>>>
>>>> --
>>> -Cheers
>>>
>>> Jeyhun
>>>
>>
>> --
> -Cheers
> 
> Jeyhun
> 
