One correction:

> and at runtime (inside the processor) we still have to check whether it is a ValueMapper
> or a ValueMapperWithKey before wrapping it into the rich function.

This will be resolved at compile time, at the API level.

Cheers,
Jeyhun

On Tue, May 9, 2017 at 11:55 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi,
>
> Thanks for the feedback.
> Then we need to overload the method
> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> with
> <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
>
> and at runtime (inside the processor) we still have to check whether it is a ValueMapper
> or a ValueMapperWithKey before wrapping it into the rich function.
>
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun
>
> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <michal.borowie...@openbet.com> wrote:
>
>> +1 :)
>>
>> On 08/05/17 23:52, Matthias J. Sax wrote:
>> > Hi,
>> >
>> > I was reading the updated KIP and I am wondering if we should do the
>> > design a little differently.
>> >
>> > Instead of distinguishing between a RichFunction and a non-RichFunction at
>> > the runtime level, we would use RichFunctions all the time. Thus, at the DSL
>> > entry level, if a user provides a non-RichFunction, we wrap it in a
>> > RichFunction that is fully implemented by Streams. This RichFunction
>> > would just forward the call, omitting the key.
>> >
>> > Just to sketch the idea (incomplete code snippet):
>> >
>> >> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
>> >>     private final ValueMapper<V, VR> userProvidedMapper;
>> >>
>> >>     public StreamsRichValueMapper(final ValueMapper<V, VR> userProvidedMapper) {
>> >>         this.userProvidedMapper = userProvidedMapper;
>> >>     }
>> >>
>> >>     @Override
>> >>     public VR apply(final K key, final V value) {
>> >>         return userProvidedMapper.apply(value); // forward the call, omitting the key
>> >>     }
>> >> }
>> >
>> > From a performance point of view, I am not sure whether the
>> > "if (isRichFunction)" check, including casts etc., would have more overhead
>> > than this approach (we would do one more nested method call for
>> > non-RichFunctions, which should be more common than RichFunctions).
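Jeyhun's correction and Matthias's wrapping idea fit together because Java chooses between the two `mapValues` overloads at compile time, based on the lambda's arity. A minimal sketch, assuming simplified local stand-ins for `ValueMapper` and `ValueMapperWithKey` and a hypothetical list-backed `MiniStream` in place of `KStream` (none of this is the Kafka Streams API): the plain overload wraps the user's mapper once, so the processing loop only ever sees the key-aware form and needs no runtime type check.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class OverloadSketch {

    // Simplified stand-in for a value-only mapper (not the Kafka interface).
    @FunctionalInterface
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Simplified stand-in for a key-aware mapper (not the Kafka interface).
    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Hypothetical list-backed stream, used only to show overload dispatch.
    static final class MiniStream<K, V> {
        private final List<Map.Entry<K, V>> records;

        MiniStream(final List<Map.Entry<K, V>> records) {
            this.records = records;
        }

        // Plain mapper: wrapped once, here at the API level, into the key-aware
        // form. The wrapper forwards the call and ignores the key.
        <VR> MiniStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
            final ValueMapperWithKey<K, V, VR> wrapped = (key, value) -> mapper.apply(value);
            return mapValues(wrapped);
        }

        // Key-aware mapper: the "processor" loop only ever calls this form,
        // so it needs no instanceof check or cast.
        <VR> MiniStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
            final List<Map.Entry<K, VR>> out = new ArrayList<>();
            for (final Map.Entry<K, V> record : records) {
                final VR mapped = mapper.apply(record.getKey(), record.getValue());
                out.add(new AbstractMap.SimpleImmutableEntry<>(record.getKey(), mapped));
            }
            return new MiniStream<>(out);
        }

        List<Map.Entry<K, V>> records() {
            return records;
        }
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, String>> input = new ArrayList<>();
        input.add(new AbstractMap.SimpleImmutableEntry<>("user-1", "hello"));
        final MiniStream<String, String> stream = new MiniStream<>(input);

        // One-argument lambda: the compiler picks the ValueMapper overload.
        final MiniStream<String, Integer> lengths = stream.mapValues(v -> v.length());
        // Two-argument lambda: the compiler picks the ValueMapperWithKey overload.
        final MiniStream<String, String> tagged = stream.mapValues((k, v) -> k + ":" + v);

        System.out.println(lengths.records()); // prints [user-1=5]
        System.out.println(tagged.records());  // prints [user-1=user-1:hello]
    }
}
```

The price for plain mappers is the one extra nested call Matthias mentions, traded against removing the `instanceof` check and cast from the per-record path.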
>> >
>> > This approach should not affect lambdas (or am I missing something?) and
>> > might be cleaner, as we could have one more top-level interface
>> > `RichFunction` with methods `init()` and `close()`, and also interfaces
>> > for `RichValueMapper` etc. (thus, no abstract classes required).
>> >
>> > Any thoughts?
>> >
>> > -Matthias
>> >
>> > On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
>> >> Hi,
>> >>
>> >> Thanks for the comments. I extended the PR and KIP to include rich functions.
>> >> I still have to evaluate the cost of deep copying keys.
>> >>
>> >> Cheers,
>> >> Jeyhun
>> >>
>> >> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>> >>
>> >>> Hey Matthias,
>> >>>
>> >>> My opinion would be that documenting the immutability of the key is the
>> >>> best approach available. It is better than requiring the key to be
>> >>> serializable (as in Jeyhun's last pass at the PR), and carries no
>> >>> performance risk.
>> >>>
>> >>> It'd be different if Java had immutable type constraints of some kind. :-)
>> >>>
>> >>> Mathieu
>> >>>
>> >>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>
>> >>>> Agreed about RichFunction. If we follow this path, it should cover
>> >>>> all(?) DSL interfaces.
>> >>>>
>> >>>> About guarding the key -- I am still not sure what to do about it...
>> >>>> Maybe it would be enough to document it (and name the key parameter
>> >>>> something like `readOnlyKey` to make it very clear). Ultimately, I would
>> >>>> prefer to guard against any modification, but I have no good idea how to
>> >>>> do this. Not sure what others think about the risk of corrupted
>> >>>> partitioning (which would be a user error, and we could say, well, bad
>> >>>> luck, you have a bug in your code, that's not our fault) vs. a deep copy
>> >>>> with a potential performance hit (that we can't quantify atm without any
>> >>>> performance test).
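The trade-off Matthias describes (corrupted partitioning vs. the cost of a deep copy) can be seen with a `byte[]` key. A minimal sketch, assuming a toy partitioner based on `Arrays.hashCode` (Kafka's default partitioner uses murmur2 on the serialized key) and two hypothetical call paths, `callUnguarded` and `callWithCopy`, one handing user code the original key and one handing it a copy:

```java
import java.util.Arrays;
import java.util.function.Consumer;

class KeyGuardSketch {

    // Hypothetical partitioner for illustration only: Kafka's default
    // partitioner hashes the serialized key with murmur2 instead.
    static int partitionFor(final byte[] key, final int numPartitions) {
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    // Passes the record's own key object to user code: no protection.
    static void callUnguarded(final byte[] key, final Consumer<byte[]> userCode) {
        userCode.accept(key);
    }

    // Passes a copy instead, so a mutation cannot reach the original key.
    // The cost is one allocation plus one array copy per record.
    static void callWithCopy(final byte[] key, final Consumer<byte[]> userCode) {
        userCode.accept(Arrays.copyOf(key, key.length));
    }

    public static void main(final String[] args) {
        final Consumer<byte[]> buggyUserCode = k -> k[0] = 42; // accidental mutation

        final byte[] guarded = {1, 2, 3};
        callWithCopy(guarded, buggyUserCode);
        System.out.println(partitionFor(guarded, 8)); // prints 1: key unchanged

        final byte[] unguarded = {1, 2, 3};
        callUnguarded(unguarded, buggyUserCode);
        System.out.println(partitionFor(unguarded, 8)); // prints 2: key was mutated
    }
}
```

For a `byte[]`, an array copy is already a full deep copy; for arbitrary key types a deep copy presumably needs something like a serialize/deserialize round trip, which would explain the serializable-key requirement in the earlier PR and is why the per-record cost is the open question.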
>> >>>>
>> >>>> We do have a performance system test. Maybe it's worth it for you to apply
>> >>>> the deep copy strategy and run the test. It's only a very basic performance
>> >>>> test, but it might give some insight. If you want to do this, look into
>> >>>> the folder "tests" for the general test setup, and into
>> >>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>> >>>>> Hi Matthias,
>> >>>>>
>> >>>>> I think extending the KIP to include RichFunctions totally makes sense.
>> >>>>> So, we don't want to guard the keys because it is costly.
>> >>>>> If we introduce RichFunctions, I think they should not be limited to the 3
>> >>>>> interfaces proposed in the KIP, but should cover a wide range of interfaces.
>> >>>>> Please correct me if I am wrong.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Jeyhun
>> >>>>>
>> >>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>>>
>> >>>>>> One follow-up. There was this email on the user list:
>> >>>>>>
>> >>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>> >>>>>>
>> >>>>>> It might make sense to include the Initializer, Adder, and Subtractor
>> >>>>>> interfaces, too.
>> >>>>>>
>> >>>>>> And we should double-check whether there are other interfaces we might be
>> >>>>>> missing atm.
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>> >>>>>>> Thanks for updating the KIP.
>> >>>>>>>
>> >>>>>>> Deep copying the key will work for sure, but I am actually a little bit
>> >>>>>>> worried about the performance impact... We might want to do some tests to
>> >>>>>>> quantify this impact.
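Matthias's follow-up about the `Initializer` can be illustrated with a small per-key aggregation. A minimal sketch, assuming a hypothetical `InitializerWithKey` interface (not part of Kafka Streams) next to a local `Aggregator` stand-in that uses the same `(key, value, aggregate)` parameter order as Kafka's `Aggregator`; Kafka's own `Initializer` has a no-argument `apply()`, so it cannot build a key-dependent starting value like this:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedInitializerSketch {

    // Hypothetical key-aware initializer; not part of the Kafka Streams API.
    @FunctionalInterface
    interface InitializerWithKey<K, VA> {
        VA apply(K key);
    }

    // Local stand-in using the same (key, value, aggregate) order as Kafka's Aggregator.
    @FunctionalInterface
    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    // Aggregates records per key. The first time a key is seen, its starting
    // aggregate is built from the key itself.
    static <K, V, VA> Map<K, VA> aggregate(final List<Map.Entry<K, V>> records,
                                           final InitializerWithKey<K, VA> initializer,
                                           final Aggregator<K, V, VA> aggregator) {
        final Map<K, VA> state = new HashMap<>();
        for (final Map.Entry<K, V> record : records) {
            final K key = record.getKey();
            final VA current = state.containsKey(key) ? state.get(key) : initializer.apply(key);
            state.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return state;
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleImmutableEntry<>("a", 1));
        records.add(new AbstractMap.SimpleImmutableEntry<>("b", 2));
        records.add(new AbstractMap.SimpleImmutableEntry<>("a", 3));

        final Map<String, String> result =
            aggregate(records, key -> key + "=", (key, value, agg) -> agg + value);
        System.out.println(result.get("a")); // prints a=13
        System.out.println(result.get("b")); // prints b=2
    }
}
```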
>> >>>>>>>
>> >>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
>> >>>>>>> would allow users to access record metadata (like timestamp, offset,
>> >>>>>>> partition, etc.) within the DSL. This would be a similar concept. Thus, I
>> >>>>>>> am wondering if it would make sense to enlarge the scope of this KIP by
>> >>>>>>> that? WDYT?
>> >>>>>>>
>> >>>>>>> -Matthias
>> >>>>>>>
>> >>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>> >>>>>>>> Hi Mathieu,
>> >>>>>>>>
>> >>>>>>>> Thanks for the feedback. I followed a similar approach and updated the PR
>> >>>>>>>> and KIP accordingly. I tried to guard the key in Processors by sending a
>> >>>>>>>> copy of the actual key.
>> >>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
>> >>>>>>>> bottleneck in some use cases.
>> >>>>>>>>
>> >>>>>>>> Cheers,
>> >>>>>>>> Jeyhun
>> >>>>>>>>
>> >>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Jeyhun,
>> >>>>>>>>>
>> >>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather than
>> >>>>>>>>> interfaces, which is also a backwards-incompatible change. An alternative
>> >>>>>>>>> approach that would be backwards compatible would be to define new
>> >>>>>>>>> interfaces, and provide overloads where those interfaces are used.
>> >>>>>>>>>
>> >>>>>>>>> Unfortunately, making the key parameter "final" doesn't change much
>> >>>>>>>>> about guarding against key changes. It only prevents the parameter
>> >>>>>>>>> variable from being reassigned. If the key type is a mutable object
>> >>>>>>>>> (e.g., byte[]), it can still be mutated (e.g., key[0] = 0). But I'm not
>> >>>>>>>>> really sure there's much that can be done about that.
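Mathieu's point about `final` can be checked directly. A minimal sketch, where `mapWithKey` is a hypothetical user-supplied mapper that receives the key as a `final byte[]`:

```java
import java.util.Arrays;

class FinalKeySketch {

    // Hypothetical user mapper with a "final" key parameter. `final` only
    // forbids reassigning the parameter; it does not make the array immutable.
    static String mapWithKey(final byte[] key, final String value) {
        // key = new byte[0];   // would not compile: cannot assign a final parameter
        key[0] = 0;             // compiles, and changes the caller's array
        return value;
    }

    public static void main(final String[] args) {
        final byte[] recordKey = {7, 8, 9};
        mapWithKey(recordKey, "v");
        System.out.println(Arrays.toString(recordKey)); // prints [0, 8, 9]
    }
}
```

Keys of immutable types such as `String` or `Long` cannot be changed this way, which is why the thread leans towards documenting the read-only contract rather than enforcing it.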
>> >>>>>>>>>
>> >>>>>>>>> Mathieu
>> >>>>>>>>>
>> >>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Thanks for the comments.
>> >>>>>>>>>>
>> >>>>>>>>>> The concerns make sense. Although we can guard the keys against
>> >>>>>>>>>> modification in the current implementation (with a few changes), I didn't
>> >>>>>>>>>> consider backward compatibility.
>> >>>>>>>>>>
>> >>>>>>>>>> In this case, two solutions come to mind. In both cases, the user accesses
>> >>>>>>>>>> the key as type Object, as passing an extra type parameter would break
>> >>>>>>>>>> backward compatibility. So the user has to cast to the actual key type.
>> >>>>>>>>>>
>> >>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments (key and
>> >>>>>>>>>> value) and force the key to be *final*. By doing this, I think we can
>> >>>>>>>>>> address both backward compatibility and guarding against key changes.
>> >>>>>>>>>>
>> >>>>>>>>>> 2. Secondly, we can create a class KeyAccess like:
>> >>>>>>>>>>
>> >>>>>>>>>> public class KeyAccess {
>> >>>>>>>>>>     Object key;
>> >>>>>>>>>>
>> >>>>>>>>>>     public void beforeApply(final Object key) {
>> >>>>>>>>>>         this.key = key;
>> >>>>>>>>>>     }
>> >>>>>>>>>>
>> >>>>>>>>>>     public Object getKey() {
>> >>>>>>>>>>         return key;
>> >>>>>>>>>>     }
>> >>>>>>>>>> }
>> >>>>>>>>>>
>> >>>>>>>>>> We can make *ValueMapper*, *ValueJoiner*, and *ValueTransformer* extend
>> >>>>>>>>>> *KeyAccess*. Inside the processor (for example,
>> >>>>>>>>>> *KTableMapValuesProcessor*), before calling *mapper.apply(value)*, we can
>> >>>>>>>>>> set the *key* via *mapper.beforeApply(key)*. As a result, the user can
>> >>>>>>>>>> call *getKey()* to access the key inside the *apply(value)* method.
>> >>>>>>>>>>
>> >>>>>>>>>> Cheers,
>> >>>>>>>>>> Jeyhun
>> >>>>>>>>>>
>> >>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Jeyhun,
>> >>>>>>>>>>>
>> >>>>>>>>>>> thanks a lot for the KIP!
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think there are two issues we need to address:
>> >>>>>>>>>>>
>> >>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did complain
>> >>>>>>>>>>> about this in past releases already, and as the user base grows, we
>> >>>>>>>>>>> should not break backward compatibility in future releases anymore.
>> >>>>>>>>>>> Thus, we should think of a better way to allow key access.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Mathieu's comment goes in the same direction:
>> >>>>>>>>>>>
>> >>>>>>>>>>>>> On the other hand, the number of compile failures that would need to
>> >>>>>>>>>>>>> be fixed from this change is unfortunate. :-)
>> >>>>>>>>>>>
>> >>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
>> >>>>>>>>>>> modifying the key. This might corrupt partitioning if users do alter the
>> >>>>>>>>>>> key (accidentally -- or because users are just not aware that they are
>> >>>>>>>>>>> not allowed to modify the provided key object) and thus break the
>> >>>>>>>>>>> application. (This was the original motivation for not providing the key
>> >>>>>>>>>>> in the first place -- it guards against modification.)
>> >>>>>>>>>>>
>> >>>>>>>>>>> -Matthias
>> >>>>>>>>>>>
>> >>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>> >>>>>>>>>>>> Hi Jeyhun,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I just want to add my voice that I, too, have wished for access to the
>> >>>>>>>>>>>> record key during a mapValues or similar operation.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>> >>>>>>>>>>>> fixed from this change is unfortunate. :-) But at least it would all be
>> >>>>>>>>>>>> a pretty clear and easy change.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Mathieu
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>> >>>>>>>>>>>>> Dear community,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I want to share KIP-149 [1], based on issues KAFKA-4218 [2],
>> >>>>>>>>>>>>> KAFKA-4726 [3], and KAFKA-3745 [4]. The related PR can be found at [5].
>> >>>>>>>>>>>>> I would like to get your comments.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>> >>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>> >>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>> >>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>> >>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Cheers,
>> >>>>>>>>>>>>> Jeyhun
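The `KeyAccess` option from Jeyhun's May 1 message can be exercised end to end. A minimal sketch, assuming `KeyAccessValueMapper` as a hypothetical abstract-class version of `ValueMapper` and `process` as a hypothetical stand-in for a processor such as `KTableMapValuesProcessor`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeyAccessSketch {

    // The KeyAccess class from the May 1 message, with the field made private.
    static class KeyAccess {
        private Object key;

        public void beforeApply(final Object key) {
            this.key = key;
        }

        public Object getKey() {
            return key;
        }
    }

    // Hypothetical: ValueMapper as an abstract class extending KeyAccess.
    // Being a class rather than an interface, it cannot be written as a lambda.
    abstract static class KeyAccessValueMapper<V, VR> extends KeyAccess {
        public abstract VR apply(V value);
    }

    // Hypothetical processor loop: set the key first, then call apply(value).
    static <V, VR> List<VR> process(final List<?> keys, final List<V> values,
                                    final KeyAccessValueMapper<V, VR> mapper) {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException("keys and values must have the same size");
        }
        final List<VR> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            mapper.beforeApply(keys.get(i));
            out.add(mapper.apply(values.get(i)));
        }
        return out;
    }

    public static void main(final String[] args) {
        final KeyAccessValueMapper<String, String> mapper = new KeyAccessValueMapper<String, String>() {
            @Override
            public String apply(final String value) {
                final String key = (String) getKey(); // the user must cast to the real key type
                return key + "->" + value;
            }
        };
        System.out.println(process(Arrays.asList("a", "b"), Arrays.asList("x", "y"), mapper));
        // prints [a->x, b->y]
    }
}
```

Because the key lives in a mutable field of the mapper instance, it is only meaningful during the `apply` call, and because `KeyAccess` is a class, a mapper built this way can no longer be a lambda, which is the backward-compatibility problem Mathieu raises in his May 2 reply.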