Jeyhun, I am not an expert on lambdas. Can you elaborate a little? I cannot follow the explanation in the KIP well enough to see what the problem is.
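For reference, the lambda constraint behind Jeyhun's remark ("we cannot extend the two interfaces if we want to keep lambdas") can be sketched in plain Java. This is a minimal sketch under stated assumptions: `ValueMapper` and `ValueMapperWithKey` below are simplified local stand-ins, not the Kafka Streams classes, and the list-based `mapValues` helper is hypothetical. A lambda can only target an interface with exactly one abstract method, so if `ValueMapperWithKey` extended `ValueMapper` and added `apply(key, value)`, it would have two abstract methods (unless one got a default implementation) and `(k, v) -> ...` could no longer target it. Two separate single-method interfaces plus two overloads keep lambdas working, because Java only considers the overload whose function type has the same number of parameters as the lambda.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; NOT the Kafka Streams types.
public class ValueMapperOverloads {

    // One abstract method: `v -> ...` lambdas can target it.
    @FunctionalInterface
    public interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Kept separate on purpose: if it extended ValueMapper and added apply(K, V),
    // it would have two abstract methods and stop being a lambda target.
    @FunctionalInterface
    public interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Key-less overload: adapt once (think "topology build time") so the
    // per-record path below only ever calls ValueMapperWithKey.
    public static <K, V, VR> List<VR> mapValues(final List<Map.Entry<K, V>> records,
                                                final ValueMapper<? super V, ? extends VR> mapper) {
        final ValueMapperWithKey<K, V, VR> adapted = (key, value) -> mapper.apply(value);
        return mapValues(records, adapted);
    }

    // Key-aware overload: the only code path that runs per record.
    public static <K, V, VR> List<VR> mapValues(final List<Map.Entry<K, V>> records,
                                                final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        final List<VR> out = new ArrayList<>();
        for (final Map.Entry<K, V> entry : records) {
            out.add(mapper.apply(entry.getKey(), entry.getValue()));
        }
        return out;
    }
}
```

With this shape, `mapValues(records, v -> v.length())` resolves to the first overload and `mapValues(records, (k, v) -> k + ":" + v)` to the second, so existing one-argument lambdas keep compiling. Adapting in the first overload is one way to get the "one check when building the topology, none at runtime" behavior described in the quoted 5/9 message.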

As for updating the KIP title, I don't know; I guess it's up to you. Maybe a committer can comment on this?

Further comments:

- The KIP got a little hard to read. Can you maybe reformat the wiki page a bit? I think using `CodeBlock` would help.
- What about KStream-KTable joins? You haven't added overloads for them. Why? (Even though I still hope that we don't need to add any new overloads.)
- Why do we need `AbstractRichFunction`?
- What about the interfaces Initializer, ForeachAction, Merger, Predicate, and Reducer? I don't want to say we should (or need to) add all of them, but we should discuss each one and add it where it makes sense (e.g., RichForeachAction does make sense IMHO).

Btw: I like the hierarchy `ValueXX` -- `ValueXXWithKey` -- `RichValueXX` in general -- but why can't we do all this with interfaces only?

-Matthias

On 5/11/17 7:00 AM, Jeyhun Karimov wrote:
> Hi,
>
> Thanks for your comments. I think we cannot extend the two interfaces if we want to keep lambdas. I updated the KIP [1]. Maybe I should change the title, because now we are not limiting the KIP to only ValueMapper, ValueTransformer and ValueJoiner.
> Please feel free to comment.
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>
> Cheers,
> Jeyhun

On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matth...@confluent.io> wrote:
> If `ValueMapperWithKey` extends `ValueMapper`, we don't need the new overload.
>
> And yes, we need to do one check -- but this happens when building the topology. At runtime (I mean after KafkaStreams#start()) we don't need any check, as we will always use `ValueMapperWithKey`.
>
> -Matthias

On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> Hi,
>
> Thanks for the feedback.
> Then we need to overload the method
>
>     <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
>
> with
>
>     <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
>
> and at runtime (inside the processor) we still have to check whether it is a ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
>
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun

On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <michal.borowie...@openbet.com> wrote:
> +1 :)

On 08/05/17 23:52, Matthias J. Sax wrote:
> Hi,
>
> I was reading the updated KIP and I am wondering if we should do the design a little differently.
>
> Instead of distinguishing between a RichFunction and a non-RichFunction at the runtime level, we would use RichFunctions all the time. Thus, at the DSL entry level, if a user provides a non-RichFunction, we wrap it in a RichFunction that is fully implemented by Streams. This RichFunction would just forward the call, omitting the key.
>
> Just to sketch the idea (incomplete code snippet):
>
>     public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
>         private final ValueMapper<V, VR> userProvidedMapper; // set by constructor
>
>         public VR apply(final K key, final V value) {
>             return userProvidedMapper.apply(value); // forward, omitting the key
>         }
>     }
>
> From a performance point of view, I am not sure whether the "if (isRichFunction)" check, including casts etc., would have more overhead than this approach (we would do more nested method calls for non-RichFunctions, which should be more common than RichFunctions).
>
> This approach should not affect lambdas (or am I missing something?) and might be cleaner, as we could have one more top-level interface `RichFunction` with methods `init()` and `close()`, and also interfaces for `RichValueMapper` etc. (thus, no abstract classes required).
>
> Any thoughts?
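The wrapping proposal above can be fleshed out into a small runnable sketch. Everything here is hypothetical and only mirrors the names used in the thread: `RichFunction` with `init()`/`close()`, `RichValueMapper`, and `StreamsRichValueMapper` are local illustrations, not Kafka Streams API, and `wrap()` stands in for whatever the DSL entry point would do.

```java
import java.util.Objects;

// Hypothetical sketch of "always use rich functions; wrap plain ones at the DSL entry".
public class RichWrapperSketch {

    // Top-level interface carrying the lifecycle hooks mentioned in the thread.
    public interface RichFunction {
        void init();
        void close();
    }

    // Plain, key-less mapper as users write it today (lambda-friendly).
    @FunctionalInterface
    public interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Rich mapper: lifecycle hooks plus read-only key access.
    public interface RichValueMapper<K, V, VR> extends RichFunction {
        VR apply(K readOnlyKey, V value);
    }

    // Fully implemented by the library: forwards the call and drops the key.
    static final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
        private final ValueMapper<? super V, ? extends VR> userProvidedMapper;

        StreamsRichValueMapper(final ValueMapper<? super V, ? extends VR> userProvidedMapper) {
            this.userProvidedMapper = Objects.requireNonNull(userProvidedMapper);
        }

        @Override
        public void init() { }   // a plain mapper has nothing to set up

        @Override
        public void close() { }  // ...and nothing to clean up

        @Override
        public VR apply(final K readOnlyKey, final V value) {
            return userProvidedMapper.apply(value);
        }
    }

    // Called once while building the topology, so the processor only ever sees
    // RichValueMapper and needs no per-record instanceof check or cast.
    public static <K, V, VR> RichValueMapper<K, V, VR> wrap(final ValueMapper<? super V, ? extends VR> mapper) {
        return new StreamsRichValueMapper<>(mapper);
    }
}
```

Users can still pass a lambda for the plain `ValueMapper`. Note, though, that with `init()` and `close()` abstract, `RichValueMapper` itself has three abstract methods and cannot be written as a lambda; declaring them as `default` no-op methods would be one way to keep it lambda-friendly.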
>
> -Matthias

On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> Hi,
>
> Thanks for the comments. I extended the PR and the KIP to include rich functions. I still have to evaluate the cost of deep copying keys.
>
> Cheers,
> Jeyhun

On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> Hey Matthias,
>
> My opinion would be that documenting the immutability of the key is the best approach available. It is better than requiring the key to be serializable (as in Jeyhun's last pass at the PR), and it carries no performance risk.
>
> It'd be different if Java had immutable type constraints of some kind. :-)
>
> Mathieu

On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Agreed about RichFunction. If we follow this path, it should cover all(?) DSL interfaces.
>
> About guarding the key -- I am still not sure what to do about it... Maybe it is enough to document it (and name the key parameter something like `readOnlyKey` to make it very clear). Ultimately, I would prefer to guard against any modification, but I have no good idea how to do this. Not sure what others think about the risk of corrupted partitioning (which would be a user error, and we could say: well, bad luck, you have a bug in your code, that's not our fault) vs. a deep copy with a potential performance hit (which we can't quantify atm without a performance test).
>
> We do have a performance system test. Maybe it's worth it for you to apply the deep copy strategy and run the test. It's only a very basic performance test, but it might give some insight.
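The trade-off above (documenting an immutable key vs. deep-copying it), together with Mathieu's later point that `final` does not protect a mutable key, can be illustrated with a `byte[]` key. This is a minimal sketch: `ValueMapperWithKey`, `applyUnguarded`, and `applyWithCopiedKey` are hypothetical local names, not Streams code. A real guard would need a copy strategy for every key type, and the per-record allocation and copy is exactly the cost the performance test would have to measure.

```java
import java.util.Arrays;

// Hypothetical illustration of key guarding; NOT Kafka Streams code.
public class KeyGuardSketch {

    @FunctionalInterface
    public interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Hands the caller's key object straight to user code. Even if the user's
    // mapper declares its key parameter final, that only prevents reassigning
    // the variable; the array contents can still be changed (e.g. key[0] = 0).
    public static <V, VR> VR applyUnguarded(final byte[] key, final V value,
                                            final ValueMapperWithKey<byte[], V, VR> mapper) {
        return mapper.apply(key, value);
    }

    // "Deep copy" guard: user code receives a private copy, so the original key
    // (and thus partitioning) cannot be corrupted, at the cost of one allocation
    // and one array copy per record.
    public static <V, VR> VR applyWithCopiedKey(final byte[] key, final V value,
                                                final ValueMapperWithKey<byte[], V, VR> mapper) {
        return mapper.apply(Arrays.copyOf(key, key.length), value);
    }
}
```

A mapper such as `(final byte[] k, final String v) -> { k[0] = 0; return v; }` compiles despite the `final` modifiers; it leaves the caller's key untouched only when called through the copying variant.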
> If you want to do this, look into the folder "tests" for the general test setup, and into "tests/kafkatest/benchmarks/streams" to find the perf test.
>
> -Matthias

On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> Hi Matthias,
>
> I think extending the KIP to include RichFunctions totally makes sense. So, we don't want to guard the keys because it is costly.
> If we introduce RichFunctions, I think they should not be limited to the 3 interfaces proposed in the KIP but should cover a wide range of interfaces.
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun

On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io> wrote:
> One follow-up. There was this email on the user list:
>
> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>
> It might make sense to include the Initializer, Adder, and Subtractor interfaces, too.
>
> And we should double-check whether there are other interfaces we might be missing atm.
>
> -Matthias

On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> Thanks for updating the KIP.
>
> Deep copying the key will work for sure, but I am actually a little bit worried about the performance impact... We might want to do some tests to quantify this impact.
>
> Btw: this reminds me of the idea of a `RichFunction` interface that would allow users to access record metadata (like timestamp, offset, partition etc.) within the DSL. This would be a similar concept.
> Thus, I am wondering whether it would make sense to enlarge the scope of this KIP to include that. WDYT?
>
> -Matthias

On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> Hi Mathieu,
>
> Thanks for the feedback. I followed a similar approach and updated the PR and the KIP accordingly. I tried to guard the key in the Processors by sending a copy of the actual key.
> Because I am doing a deep copy of an object, I think memory can be a bottleneck in some use cases.
>
> Cheers,
> Jeyhun

On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> Hi Jeyhun,
>
> This approach would change ValueMapper (...etc.) to be classes rather than interfaces, which is also a backwards-incompatible change. An alternative approach that would be backwards compatible would be to define new interfaces, and provide overloads where those interfaces are used.
>
> Unfortunately, marking the key parameter as "final" doesn't change much about guarding against key changes. It only prevents the parameter variable from being reassigned. If the key type is a mutable object (e.g., byte[]), it can still be mutated (e.g., key[0] = 0). But I'm not really sure there's much that can be done about that.
>
> Mathieu

On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> Thanks for the comments.
>
> The concerns make sense. Although we can guard the keys against modification in the current implementation (with a few changes), I didn't consider backward compatibility.
>
> In this case, 2 solutions come to my mind. In both cases, the user accesses the key as type Object, as passing an extra type parameter would break backwards compatibility. So the user has to cast it to the actual key type.
>
> 1. Firstly, we can overload the apply method with 2 arguments (key and value) and force the key to be *final*. By doing this, I think we can address both backward compatibility and guarding against key changes.
>
> 2. Secondly, we can create a class KeyAccess like:
>
>     public class KeyAccess {
>         private Object key;
>
>         public void beforeApply(final Object key) {
>             this.key = key;
>         }
>
>         public Object getKey() {
>             return key;
>         }
>     }
>
> We can make *ValueMapper, ValueJoiner* and *ValueTransformer* extend *KeyAccess*. Inside a processor (for example *KTableMapValuesProcessor*), before calling *mapper.apply(value)* we can set the *key* via *mapper.beforeApply(key)*. As a result, the user can call *getKey()* to access the key inside the *apply(value)* method.
>
> Cheers,
> Jeyhun

On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io> wrote:
> Jeyhun,
>
> thanks a lot for the KIP!
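Jeyhun's second option (the `KeyAccess` class in his 5/1 message) can be made concrete with a small, self-contained sketch. `KeyAwareValueMapper` and `process` are hypothetical stand-ins for a DSL mapper and for a processor such as *KTableMapValuesProcessor*; they are not Streams code. The sketch also makes the trade-offs discussed in the thread visible: the key arrives as `Object` and must be cast, the mapper becomes an abstract class instead of an interface (so it can no longer be a lambda, and existing lambda code would break), and the key lives in mutable instance state between `beforeApply()` and `apply()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the KeyAccess approach; NOT Kafka Streams code.
public class KeyAccessSketch {

    // Base class the processor primes with the current key before each call.
    public static class KeyAccess {
        private Object key;

        public void beforeApply(final Object key) {
            this.key = key;
        }

        public Object getKey() {
            return key;
        }
    }

    // A mapper built on KeyAccess: now a class, so lambdas can no longer implement it.
    public abstract static class KeyAwareValueMapper<V, VR> extends KeyAccess {
        public abstract VR apply(V value);
    }

    // What a processor would do per record: set the key, then call apply(value).
    public static <K, V, VR> List<VR> process(final List<Map.Entry<K, V>> records,
                                              final KeyAwareValueMapper<V, VR> mapper) {
        final List<VR> out = new ArrayList<>();
        for (final Map.Entry<K, V> entry : records) {
            mapper.beforeApply(entry.getKey());
            out.add(mapper.apply(entry.getValue()));
        }
        return out;
    }
}
```

Because the key is stored on the mapper instance, one instance shared across threads would see each other's keys; this is one more reason the thread moved toward passing the key as an explicit `apply` argument.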
>
> I think there are two issues we need to address:
>
> (1) The KIP does not consider backward compatibility. Users have already complained about this in past releases, and as the user base grows, we should not break backward compatibility in future releases anymore. Thus, we should think of a better way to allow key access.
>
> Mathieu's comment goes in the same direction:
>
>> On the other hand, the number of compile failures that would need to be fixed from this change is unfortunate. :-)
>
> (2) Another concern is that there is no guard to prevent user code from modifying the key. This might corrupt partitioning if users do alter the key (accidentally -- or because users are just not aware that they are not allowed to modify the provided key object) and thus break the application. (This was the original motivation for not providing the key in the first place -- it guards against modification.)
>
> -Matthias

On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> Hi Jeyhun,
>
> I just want to add my voice that I, too, have wished for access to the record key during a mapValues or similar operation.
>
> On the other hand, the number of compile failures that would need to be fixed from this change is unfortunate.
> :-) But at least it would all be a pretty clear and easy change.
>
> Mathieu

On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> Dear community,
>
> I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3], KAFKA-3745 [4]. The related PR can be found at [5].
> I would like to get your comments.
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> [2] https://issues.apache.org/jira/browse/KAFKA-4218
> [3] https://issues.apache.org/jira/browse/KAFKA-4726
> [4] https://issues.apache.org/jira/browse/KAFKA-3745
> [5] https://github.com/apache/kafka/pull/2946
>
> Cheers,
> Jeyhun
>
> --
> -Cheers
>
> Jeyhun