Hi,

Thanks for the feedback. Then we need to overload the method

    <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);

with

    <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
and at runtime (inside the processor) we would still have to check whether it is a ValueMapper or a ValueMapperWithKey before wrapping it into the rich function. Please correct me if I am wrong.

Cheers,
Jeyhun

On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <michal.borowie...@openbet.com> wrote:

> +1 :)
>
> On 08/05/17 23:52, Matthias J. Sax wrote:
> > Hi,
> >
> > I was reading the updated KIP and I am wondering if we should do the design a little differently.
> >
> > Instead of distinguishing between a RichFunction and a non-RichFunction at the runtime level, we would use RichFunctions all the time. Thus, at the DSL entry level, if a user provides a non-RichFunction, we wrap it in a RichFunction that is fully implemented by Streams. This RichFunction would just forward the call, omitting the key:
> >
> > Just to sketch the idea (incomplete code snippet):
> >
> >> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
> >>     private final ValueMapper<V, VR> userProvidedMapper; // set by constructor
> >>
> >>     public VR apply(final K key, final V value) {
> >>         return userProvidedMapper.apply(value); // forward the call, omitting the key
> >>     }
> >> }
> >
> > From a performance point of view, I am not sure whether the "if (isRichFunction)" check, including casts etc., would have more overhead than this approach (we would do more nested method calls for non-RichFunctions, which should be more common than RichFunctions).
> >
> > This approach should not affect lambdas (or am I missing something?) and might be cleaner, as we could have one more top-level interface `RichFunction` with methods `init()` and `close()` and also interfaces for `RichValueMapper` etc. (thus, no abstract classes required).
> >
> > Any thoughts?
> >
> > -Matthias
> >
> > On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >> Hi,
> >>
> >> Thanks for the comments. I extended the PR and KIP to include rich functions. I will still have to evaluate the cost of deep copying the keys.
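A minimal, self-contained sketch of Matthias's suggestion combined with the two `mapValues` overloads proposed above: every user function is converted into one key-aware "rich" type at the DSL entry point, so the processing loop never has to inspect the function type. `ValueMapper`, `ValueMapperWithKey`, `RichValueMapper` and `StreamsRichValueMapper` are local stand-ins for the interfaces under discussion, and `MiniStream` is a hypothetical in-memory substitute for `KStream`; none of this is the actual Kafka Streams code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Local stand-ins for the DSL interfaces discussed in the thread (not the Kafka classes).
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// Hypothetical "rich" interface: key access plus the init()/close() hooks Matthias mentions.
interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR> {
    default void init() {}
    default void close() {}
}

// Streams-provided adapter: wraps a plain ValueMapper and forwards the call, omitting the key.
final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
    private final ValueMapper<? super V, ? extends VR> userProvidedMapper;

    StreamsRichValueMapper(ValueMapper<? super V, ? extends VR> userProvidedMapper) {
        this.userProvidedMapper = userProvidedMapper;
    }

    @Override
    public VR apply(K readOnlyKey, V value) {
        return userProvidedMapper.apply(value);
    }
}

// Hypothetical, in-memory stand-in for KStream<K, V>.
final class MiniStream<K, V> {
    private final List<Map.Entry<K, V>> records;

    MiniStream(List<Map.Entry<K, V>> records) {
        this.records = records;
    }

    static <K, V> Map.Entry<K, V> kv(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    List<Map.Entry<K, V>> records() {
        return records;
    }

    // Overload 1: plain mapper, wrapped once at the DSL entry point.
    <VR> MiniStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        return process(new StreamsRichValueMapper<K, V, VR>(mapper));
    }

    // Overload 2: key-aware mapper. A user RichValueMapper passed here is re-wrapped,
    // so its own init()/close() would NOT be called in this sketch.
    <VR> MiniStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        RichValueMapper<K, V, VR> rich = (key, value) -> mapper.apply(key, value);
        return process(rich);
    }

    // The "processor" only ever sees RichValueMapper: no instanceof checks or casts.
    private <VR> MiniStream<K, VR> process(RichValueMapper<K, V, VR> rich) {
        rich.init();
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            out.add(kv(record.getKey(), rich.apply(record.getKey(), record.getValue())));
        }
        rich.close();
        return new MiniStream<>(out);
    }

    public static void main(String[] args) {
        MiniStream<String, String> words =
                new MiniStream<>(Arrays.asList(kv("k1", "kafka"), kv("k2", "streams")));
        // Lambda arity picks the overload: one argument -> ValueMapper, two -> ValueMapperWithKey.
        System.out.println(words.mapValues(v -> v.length()).records());
        System.out.println(words.mapValues((k, v) -> k + ":" + v).records());
    }
}
```

In this sketch the compiler selects the overload from the lambda's arity, so each overload already knows which adapter to build. One caveat it leaves open: a user-supplied `RichValueMapper` reaching the `ValueMapperWithKey` overload is wrapped again and loses its own `init()`/`close()`, so a dedicated overload or a runtime check might still be needed there.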
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >>
> >>> Hey Matthias,
> >>>
> >>> My opinion would be that documenting the immutability of the key is the best approach available. It is better than requiring the key to be serializable (as with Jeyhun's last pass at the PR), and it carries no performance risk.
> >>>
> >>> It'd be different if Java had immutable type constraints of some kind. :-)
> >>>
> >>> Mathieu
> >>>
> >>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>> Agreed about RichFunction. If we follow this path, it should cover all(?) DSL interfaces.
> >>>>
> >>>> About guarding the key -- I am still not sure what to do about it... Maybe it would be enough to document it (and name the key parameter something like `readOnlyKey` to make it very clear). Ultimately, I would prefer to guard against any modification, but I have no good idea how to do this. Not sure what others think about the risk of corrupted partitioning (which would be a user error, and we could say: well, bad luck, you have a bug in your code, that's not our fault) vs. deep copy with a potential performance hit (that we can't quantify atm without any performance test).
> >>>>
> >>>> We do have a performance system test. Maybe it's worth it for you to apply the deep-copy strategy and run the test. It's only a very basic performance test, but it might give some insight. If you want to do this, look into the folder "tests" for the general test setup, and into "tests/kafkatest/benchmarks/streams" to find the perf test.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> I think extending the KIP to include RichFunctions totally makes sense. So, we don't want to guard the keys because it is costly.
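The partitioning risk weighed here, together with Mathieu's point elsewhere in the thread that marking the key parameter `final` only prevents reassignment, can be reproduced in plain Java. A minimal sketch, assuming a `byte[]` key; `partitionFor` is a hypothetical hash-based stand-in for a partitioner (Kafka's own default partitioner hashes the serialized key differently), and `mapValue` is a hypothetical user function, not a Kafka API.

```java
import java.util.Arrays;

final class KeyMutationDemo {
    // Hypothetical stand-in for a partitioner (NOT Kafka's): hash the key bytes into a partition.
    static int partitionFor(byte[] key, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    // Hypothetical user mapper that is handed the key. `final` only forbids reassignment.
    static String mapValue(final byte[] readOnlyKey, final String value) {
        // readOnlyKey = new byte[0];  // would not compile: the parameter is final
        readOnlyKey[0] = 0;            // compiles, and mutates the caller's key array in place
        return value.toUpperCase();
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        int partitionBefore = partitionFor(key, 8); // partition chosen from the original key
        String mapped = mapValue(key, "value");
        int partitionAfter = partitionFor(key, 8);  // partition the mutated key now maps to
        System.out.println(mapped + " " + partitionBefore + " -> " + partitionAfter); // prints "VALUE 1 -> 0"
    }
}
```

Downstream operators that assume the data is still partitioned by the original key could then find the record in the "wrong" partition. Naming the parameter `readOnlyKey`, as suggested, documents the contract but does not change what the compiler allows.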
> >>>>>
> >>>>> If we introduce RichFunctions, I think they should not be limited to the 3 interfaces proposed in the KIP but should cover a wide range of interfaces. Please correct me if I am wrong.
> >>>>>
> >>>>> Cheers,
> >>>>> Jeyhun
> >>>>>
> >>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> One follow-up. There was this email on the user list:
> >>>>>>
> >>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
> >>>>>>
> >>>>>> It might make sense to include the Initializer, Adder, and Subtractor interfaces, too.
> >>>>>>
> >>>>>> And we should double-check whether there are other interfaces we might have missed atm.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> >>>>>>> Thanks for updating the KIP.
> >>>>>>>
> >>>>>>> Deep copying the key will work for sure, but I am actually a little bit worried about the performance impact... We might want to do some tests to quantify this impact.
> >>>>>>>
> >>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that would allow users to access record metadata (like timestamp, offset, partition etc.) within the DSL. This would be a similar concept. Thus, I am wondering if it would make sense to enlarge the scope of this KIP by that? WDYT?
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> >>>>>>>> Hi Mathieu,
> >>>>>>>>
> >>>>>>>> Thanks for the feedback. I followed a similar approach and updated the PR and KIP accordingly. I tried to guard the key in the Processors by sending a copy of the actual key. Because I am doing a deep copy of an object, I think memory can be a bottleneck in some use cases.
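The deep-copy guard described here, and the performance concern Matthias raises about it, might look roughly like the following. This is a minimal sketch assuming a `java.io.Serializable` key (the thread notes that an earlier pass at the PR required serializable keys); `DeepCopy.deepCopy` is a hypothetical helper, and the PR may well have implemented the copy differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class DeepCopy {
    // Hypothetical helper: copy `key` by serializing it and reading it back.
    // Each call allocates a byte buffer and a fresh object graph, so the cost
    // grows with key size and is paid once per record.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T key) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        byte[] copyHandedToUser = deepCopy(key);
        copyHandedToUser[0] = 0;        // the user function mutates its copy...
        System.out.println(key[0]);     // ...but the original key is untouched: prints 1
    }
}
```

The copy shields the original key from the user function at the price of a serialization round trip per record, which is the cost the thread proposes measuring with the streams benchmark.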
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Jeyhun
> >>>>>>>>
> >>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Jeyhun,
> >>>>>>>>>
> >>>>>>>>> This approach would change ValueMapper (...etc.) to be classes rather than interfaces, which is also a backwards-incompatible change. An alternative approach that would be backwards compatible would be to define new interfaces and provide overloads where those interfaces are used.
> >>>>>>>>>
> >>>>>>>>> Unfortunately, marking the key parameter as "final" doesn't change much about guarding against key changes. It only prevents the parameter variable from being reassigned. If the key type is a mutable object (e.g. byte[]), it can still be mutated (e.g. key[0] = 0). But I'm not really sure there's much that can be done about that.
> >>>>>>>>>
> >>>>>>>>> Mathieu
> >>>>>>>>>
> >>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the comments.
> >>>>>>>>>>
> >>>>>>>>>> The concerns make sense. Although we could guard the keys against modification in the current implementation (with a few changes), I didn't consider backward compatibility.
> >>>>>>>>>>
> >>>>>>>>>> In this case, 2 solutions come to my mind. In both cases, the user accesses the key as type Object, as passing an extra type parameter would break backwards compatibility. So the user has to cast to the actual key type.
> >>>>>>>>>>
> >>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments (key and value) and force the key to be *final*.
> >>>>>>>>>>    By doing this, I think we can address both backward compatibility and guarding against key changes.
> >>>>>>>>>>
> >>>>>>>>>> 2. Secondly, we can create a class KeyAccess like:
> >>>>>>>>>>
> >>>>>>>>>>    public class KeyAccess {
> >>>>>>>>>>        Object key;
> >>>>>>>>>>        public void beforeApply(final Object key) {
> >>>>>>>>>>            this.key = key;
> >>>>>>>>>>        }
> >>>>>>>>>>        public Object getKey() {
> >>>>>>>>>>            return key;
> >>>>>>>>>>        }
> >>>>>>>>>>    }
> >>>>>>>>>>
> >>>>>>>>>>    We can extend *ValueMapper*, *ValueJoiner* and *ValueTransformer* from *KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*), before calling *mapper.apply(value)* we can set the *key* via *mapper.beforeApply(key)*. As a result, the user can call *getKey()* to access the key inside the *apply(value)* method.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Jeyhun
> >>>>>>>>>>
> >>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Jeyhun,
> >>>>>>>>>>>
> >>>>>>>>>>> thanks a lot for the KIP!
> >>>>>>>>>>>
> >>>>>>>>>>> I think there are two issues we need to address:
> >>>>>>>>>>>
> >>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did complain about this in past releases already, and as the user base grows, we should not break backward compatibility in future releases anymore. Thus, we should think of a better way to allow key access.
> >>>>>>>>>>>
> >>>>>>>>>>> Mathieu's comment goes in the same direction:
> >>>>>>>>>>>
> >>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be fixed from this change is unfortunate.
> >>>>>>>>>>>>> :-)
> >>>>>>>>>>>
> >>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from modifying the key. This might corrupt partitioning if users do alter the key (accidentally -- or because users are just not aware that they are not allowed to modify the provided key object) and thus break the application. (This was the original motivation for not providing the key in the first place -- it guards against modification.)
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> >>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I just want to add my voice: I, too, have wished for access to the record key during a mapValues or similar operation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On the other hand, the number of compile failures that would need to be fixed from this change is unfortunate. :-) But at least it would all be a pretty clear and easy change.
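For comparison with the overload-based designs, option 2 from Jeyhun's May 1 reply (the `KeyAccess` holder) can be sketched as below. `KeyAccess` follows the thread (with its field made private); `KeyAwareValueMapper` and `KeyAccessProcessor` are hypothetical names, and the loop only mimics what a processor such as `KTableMapValuesProcessor` might do under this proposal.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// KeyAccess as proposed in the thread (field made private).
class KeyAccess {
    private Object key;

    public void beforeApply(final Object key) {
        this.key = key;
    }

    public Object getKey() {
        return key;
    }
}

// Hypothetical: a Java interface cannot extend a class, so the mapper has to become an
// abstract class to inherit KeyAccess (the incompatibility Mathieu points out).
abstract class KeyAwareValueMapper<V, VR> extends KeyAccess {
    public abstract VR apply(V value);
}

// Hypothetical processor loop: set the key, then call apply(value).
final class KeyAccessProcessor {
    static <K, V, VR> List<VR> process(List<? extends Map.Entry<K, V>> records,
                                       KeyAwareValueMapper<V, VR> mapper) {
        List<VR> results = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            mapper.beforeApply(record.getKey());
            results.add(mapper.apply(record.getValue()));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleImmutableEntry<>("k1", "a"));
        records.add(new AbstractMap.SimpleImmutableEntry<>("k2", "b"));

        // No lambda possible: an abstract class is not a functional interface.
        KeyAwareValueMapper<String, String> mapper = new KeyAwareValueMapper<String, String>() {
            @Override
            public String apply(String value) {
                String key = (String) getKey(); // the key arrives as Object; the user must cast
                return key + "=" + value;
            }
        };
        System.out.println(process(records, mapper)); // prints [k1=a, k2=b]
    }
}
```

The key is stashed in a mutable field between `beforeApply` and `apply`, so one mapper instance could not safely be shared across threads, and existing lambda-based user code would stop compiling if `ValueMapper` became an abstract class.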
> >>>>>>>>>>>>> > >>>>>>>>>>>>> [1] > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>>>>>>>>> 149%3A+Enabling+key+access+in+ValueTransformer%2C+ > >>>>>>>>>>>>> ValueMapper%2C+and+ValueJoiner > >>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218 > >>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726 > >>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745 > >>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946 > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>> Jeyhun > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -- > >>>>>>>>>>>>> -Cheers > >>>>>>>>>>>>> > >>>>>>>>>>>>> Jeyhun > >>>>>>>>>>>>> > >>>>>>>>>>> -- > >>>>>>>>>> -Cheers > >>>>>>>>>> > >>>>>>>>>> Jeyhun > >>>>>>>>>> > >>>>>> -- > >>>>> -Cheers > >>>>> > >>>>> Jeyhun > >>>>> > >>>> > > -- -Cheers Jeyhun