Hi, Thanks for your comments. I think we cannot extend the two interfaces if we want to keep lambdas. I updated the KIP [1]. Maybe I should change the title, because now we are not limiting the KIP to only ValueMapper, ValueTransformer and ValueJoiner. Please feel free to comment.
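A minimal sketch of the lambda constraint, using simplified stand-in interfaces. The `ValueMapper` and `ValueMapperWithKey` below and the `MapValuesSketch` helper are hypothetical; they are neither the actual Streams classes nor the PR code. If `ValueMapperWithKey` extended `ValueMapper`, it would have two abstract `apply` methods and would no longer be a functional interface, so lambdas could not target it. Keeping the two interfaces independent and overloading `mapValues` lets the compiler pick the overload by lambda arity. The key-less mapper can then be wrapped once, so that only the key-aware form is used afterwards.

```java
// Hypothetical stand-ins for the interfaces discussed in this thread;
// these are NOT the real Kafka Streams API.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

// Deliberately does not extend ValueMapper: it keeps a single abstract
// method, so a two-argument lambda can still target it.
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

final class MapValuesSketch {
    private MapValuesSketch() {}

    // Wrap a key-less mapper once (e.g. while building the topology);
    // the wrapper forwards the call and drops the key.
    static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(
            final ValueMapper<? super V, ? extends VR> mapper) {
        return (readOnlyKey, value) -> mapper.apply(value);
    }

    // Existing-style overload: delegates to the key-aware overload.
    static <K, V, VR> VR mapValues(final K key, final V value,
            final ValueMapper<? super V, ? extends VR> mapper) {
        return mapValues(key, value, MapValuesSketch.<K, V, VR>withKey(mapper));
    }

    // New overload: the only form that is actually invoked per record.
    static <K, V, VR> VR mapValues(final K key, final V value,
            final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        return mapper.apply(key, value);
    }

    public static void main(final String[] args) {
        // One-argument lambda resolves to the ValueMapper overload.
        Integer scaled = mapValues("user-1", 2, v -> v * 10);
        // Two-argument lambda resolves to the ValueMapperWithKey overload.
        String tagged = mapValues("user-1", 2, (k, v) -> k + ":" + v);
        System.out.println(scaled + " " + tagged); // prints "20 user-1:2"
    }
}
```

In this sketch the per-record path never checks which kind of mapper it holds; the distinction is resolved by the overloads at the DSL entry point.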
[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner

Cheers,
Jeyhun

On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matth...@confluent.io> wrote:

> If `ValueMapperWithKey` extends `ValueMapper` we don't need the new
> overload.
>
> And yes, we need to do one check -- but this happens when building the
> topology. At runtime (I mean after KafkaStreams#start()) we don't need any
> check as we will always use `ValueMapperWithKey`.
>
> -Matthias
>
> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for the feedback.
> > Then we need to overload the method
> > <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > with
> > <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super V, ? extends VR> mapper);
> >
> > and at runtime (inside the processor) we still have to check whether it is a
> > ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
> >
> > Please correct me if I am wrong.
> >
> > Cheers,
> > Jeyhun
> >
> > On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <michal.borowie...@openbet.com> wrote:
> >
> >> +1 :)
> >>
> >> On 08/05/17 23:52, Matthias J. Sax wrote:
> >>> Hi,
> >>>
> >>> I was reading the updated KIP and I am wondering if we should do the
> >>> design a little differently.
> >>>
> >>> Instead of distinguishing between a RichFunction and non-RichFunction at
> >>> runtime level, we would use RichFunctions all the time. Thus, on the DSL
> >>> entry level, if a user provides a non-RichFunction, we wrap it by a
> >>> RichFunction that is fully implemented by Streams.
> >>> This RichFunction would just forward the call omitting the key:
> >>>
> >>> Just to sketch the idea (incomplete code snippet):
> >>>
> >>>> public class StreamsRichValueMapper implements RichValueMapper {
> >>>>     private ValueMapper userProvidedMapper; // set by constructor
> >>>>
> >>>>     public VR apply(final K key, final V value) {
> >>>>         // forward to the user's mapper, dropping the key
> >>>>         return userProvidedMapper.apply(value);
> >>>>     }
> >>>> }
> >>>
> >>> From a performance point of view, I am not sure if the
> >>> "if(isRichFunction)" including casts etc would have more overhead than
> >>> this approach (we would do more nested method calls for non-RichFunctions,
> >>> which should be more common than RichFunctions).
> >>>
> >>> This approach should not affect lambdas (or do I miss something?) and
> >>> might be cleaner, as we could have one more top level interface
> >>> `RichFunction` with methods `init()` and `close()` and also interfaces
> >>> for `RichValueMapper` etc. (thus, no abstract classes required).
> >>>
> >>> Any thoughts?
> >>>
> >>> -Matthias
> >>>
> >>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >>>> Hi,
> >>>>
> >>>> Thanks for the comments. I extended the PR and KIP to include rich
> >>>> functions. I will still have to evaluate the cost of deep copying of keys.
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >>>>
> >>>>> Hey Matthias,
> >>>>>
> >>>>> My opinion would be that documenting the immutability of the key is the
> >>>>> best approach available. Better than requiring the key to be serializable
> >>>>> (as with Jeyhun's last pass at the PR), no performance risk.
> >>>>>
> >>>>> It'd be different if Java had immutable type constraints of some kind. :-)
> >>>>>
> >>>>> Mathieu
> >>>>>
> >>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> Agreed about RichFunction.
> >>>>>> If we follow this path, it should cover all(?) DSL interfaces.
> >>>>>>
> >>>>>> About guarding the key -- I am still not sure what to do about it...
> >>>>>> Maybe it might be enough to document it (and name the key parameter like
> >>>>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
> >>>>>> guard against any modification, but I have no good idea how to do this.
> >>>>>> Not sure what others think about the risk of corrupted partitioning
> >>>>>> (which would be a user error and we could say, well, bad luck, you got a
> >>>>>> bug in your code, that's not our fault), vs deep copy with a potential
> >>>>>> performance hit (that we can't quantify atm without any performance test).
> >>>>>> We do have a performance system test. Maybe it's worth for you to apply
> >>>>>> the deep copy strategy and run the test. It's a very basic performance
> >>>>>> test only, but might give some insight. If you want to do this, look
> >>>>>> into folder "tests" for general test setup, and into
> >>>>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> I think extending the KIP to include RichFunctions totally makes sense.
> >>>>>>> So, we don't want to guard the keys because it is costly.
> >>>>>>> If we introduce RichFunctions I think it should not be limited to only
> >>>>>>> the 3 interfaces proposed in the KIP but cover a wide range of interfaces.
> >>>>>>> Please correct me if I am wrong.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Jeyhun
> >>>>>>>
> >>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> One follow up.
> >>>>>>>> There was this email on the user list:
> >>>>>>>>
> >>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
> >>>>>>>>
> >>>>>>>> It might make sense to include the Initializer, Adder, and Subtractor
> >>>>>>>> interfaces, too.
> >>>>>>>>
> >>>>>>>> And we should double check if there are other interfaces we might miss atm.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> >>>>>>>>> Thanks for updating the KIP.
> >>>>>>>>>
> >>>>>>>>> Deep copying the key will work for sure, but I am actually a little bit
> >>>>>>>>> worried about performance impact... We might want to do some tests to
> >>>>>>>>> quantify this impact.
> >>>>>>>>>
> >>>>>>>>> Btw: this reminds me about the idea of a `RichFunction` interface that
> >>>>>>>>> would allow users to access record metadata (like timestamp, offset,
> >>>>>>>>> partition etc) within the DSL. This would be a similar concept. Thus, I am
> >>>>>>>>> wondering if it would make sense to enlarge the scope of this KIP by
> >>>>>>>>> that? WDYT?
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> >>>>>>>>>> Hi Mathieu,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the feedback. I followed a similar approach and updated the PR
> >>>>>>>>>> and KIP accordingly. I tried to guard the key in Processors by sending a
> >>>>>>>>>> copy of the actual key.
> >>>>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
> >>>>>>>>>> bottleneck in some use-cases.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Jeyhun
> >>>>>>>>>>
> >>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>
> >>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather than
> >>>>>>>>>>> interfaces, which is also a backwards incompatible change. An alternative
> >>>>>>>>>>> approach that would be backwards compatible would be to define new
> >>>>>>>>>>> interfaces, and provide overloads where those interfaces are used.
> >>>>>>>>>>>
> >>>>>>>>>>> Unfortunately, marking the key parameter as "final" doesn't change much
> >>>>>>>>>>> about guarding against key change. It only prevents the parameter variable
> >>>>>>>>>>> from being reassigned. If the key type is a mutable object (e.g. byte[]),
> >>>>>>>>>>> it can still be mutated (e.g. key[0] = 0). But I'm not really sure there's
> >>>>>>>>>>> much that can be done about that.
> >>>>>>>>>>>
> >>>>>>>>>>> Mathieu
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the comments.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The concerns make sense. Although we can guard for immutable keys in
> >>>>>>>>>>>> the current implementation (with few changes), I didn't consider backward
> >>>>>>>>>>>> compatibility.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In this case 2 solutions come to my mind. In both cases, the user accesses
> >>>>>>>>>>>> the key as Object type, as passing an extra type parameter will break
> >>>>>>>>>>>> backwards-compatibility. So the user has to cast to the actual key type.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1.
> >>>>>>>>>>>> Firstly, we can overload the apply method with 2 arguments (key and value)
> >>>>>>>>>>>> and force the key to be *final*. By doing this, I think we can address both
> >>>>>>>>>>>> backward-compatibility and guarding against key change.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. Secondly, we can create a class KeyAccess like:
> >>>>>>>>>>>>
> >>>>>>>>>>>> public class KeyAccess {
> >>>>>>>>>>>>     Object key;
> >>>>>>>>>>>>     public void beforeApply(final Object key) {
> >>>>>>>>>>>>         this.key = key;
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>     public Object getKey() {
> >>>>>>>>>>>>         return key;
> >>>>>>>>>>>>     }
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
> >>>>>>>>>>>> *KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*)
> >>>>>>>>>>>> before calling *mapper.apply(value)* we can set the *key* by
> >>>>>>>>>>>> *mapper.beforeApply(key)*. As a result, the user can use *getKey()* to
> >>>>>>>>>>>> access the key inside the *apply(value)* method.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Jeyhun,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> thanks a lot for the KIP!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think there are two issues we need to address:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did complain
> >>>>>>>>>>>>> about this in past releases already, and as the user base grows, we
> >>>>>>>>>>>>> should not break backward compatibility in future releases anymore.
> >>>>>>>>>>>>> Thus, we should think of a better way to allow key access.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Mathieu's comment goes in the same direction:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
> >>>>>>>>>>>>>>> fixed from this change is unfortunate. :-)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
> >>>>>>>>>>>>> modifying the key. This might corrupt partitioning if users do alter the
> >>>>>>>>>>>>> key (accidentally -- or users are just not aware that they are not
> >>>>>>>>>>>>> allowed to modify the provided key object) and thus break the
> >>>>>>>>>>>>> application. (This was the original motivation to not provide the key in
> >>>>>>>>>>>>> the first place -- it guards against modification.)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> >>>>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I just want to add my voice that I, too, have wished for access to the
> >>>>>>>>>>>>>> record key during a mapValues or similar operation.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
> >>>>>>>>>>>>>> fixed from this change is unfortunate. :-) But at least it would all be a
> >>>>>>>>>>>>>> pretty clear and easy change.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Mathieu
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Dear community,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2],
> >>>>>>>>>>>>>>> KAFKA-4726 [3], KAFKA-3745 [4].
> >>>>>>>>>>>>>>> The related PR can be found at [5].
> >>>>>>>>>>>>>>> I would like to get your comments.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
> >>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
> >>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
> >>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>
> >>>>>>>>>>>> Jeyhun
> >>>>>>>
> >>>>>>> --
> >>>>>>> -Cheers
> >>>>>>>
> >>>>>>> Jeyhun
> >
> > --
> > -Cheers
> >
> > Jeyhun

--
-Cheers

Jeyhun