I agree that the `ProcessorContext` interface is too broad in general -- this is even true for transform/process, and it is also reflected in the list of API improvements we want to make.
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions

So I am wondering: do you question the `RichFunction` approach in general? Or do you suggest either extending the scope of this KIP to include this -- or, maybe better, doing another KIP for it and delaying this KIP until the other one is done?

-Matthias

On 5/15/17 2:35 AM, Damian Guy wrote:
> Thanks for the KIP.
>
> I'm not convinced by the `RichFunction` approach. Do we really want to give
> every DSL method access to the `ProcessorContext`? It has a bunch of
> methods on it that seem inappropriate for some of the DSL methods, e.g.,
> `register`, `getStateStore`, `forward`, `schedule`, etc. It is far too
> broad. I think it would be better to have a narrower interface like the
> `RecordContext`, remembering that it is easier to add methods/interfaces
> later than to remove them.
>
> On Sat, 13 May 2017 at 22:26 Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Jeyhun,
>>
>> I am not an expert on lambdas. Can you elaborate a little bit? I cannot
>> follow the explanation in the KIP well enough to see what the problem is.
>>
>> As for updating the KIP title, I don't know -- I guess it's up to you.
>> Maybe a committer can comment on this?
>>
>>
>> Further comments:
>>
>> - The KIP gets a little hard to read -- can you maybe reformat the wiki
>> page a little bit? I think using `CodeBlock` would help.
>>
>> - What about KStream-KTable joins? You don't have overloads added for
>> them. Why? (Even if I still hope that we don't need to add any new
>> overloads.)
>>
>> - Why do we need `AbstractRichFunction`?
>>
>> - What about the interfaces Initializer, ForeachAction, Merger, Predicate,
>> and Reducer? I don't want to say we should/need to add to all of them, but
>> we should discuss all of them and add where it makes sense (e.g.,
>> RichForeachAction does make sense IMHO).
>>
>>
>> Btw: I like the hierarchy `ValueXX` -- `ValueXXWithKey` -- `RichValueXX`
>> in general -- but why can't we do all this with interfaces only?
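Damian's narrower-interface suggestion and the "interfaces only" question can be illustrated with a small, self-contained sketch. Every type below (`RecordContext` with these four getters, `ValueMapper`, `ValueMapperWithKey`, `RichValueMapper`, `OffsetTaggingMapper`) is a hypothetical stand-in defined locally, not the Kafka Streams API, and the sketch assumes Java 8 default methods. It shows one way the rich variant could stay an interface (no abstract class), plain and key-aware mappers remain lambda-friendly, and a rich function sees only read-only record metadata rather than `register`, `getStateStore`, `forward`, or `schedule`.

```java
// Hypothetical, self-contained sketch: these are NOT the real Kafka Streams interfaces.

// Narrow, read-only view of record metadata; no register/getStateStore/forward/schedule.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// Value-only mapper: a functional interface, so lambdas keep working.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

// Key-aware mapper: still a single abstract method, so still lambda-friendly.
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// Rich variant built from interfaces only: lifecycle hooks are default methods
// that receive the narrow RecordContext instead of a full ProcessorContext.
interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR> {
    default void init(RecordContext context) { }
    default void close() { }
}

// A rich mapper that keeps the context to read metadata of the current record.
class OffsetTaggingMapper implements RichValueMapper<String, String, String> {
    private RecordContext context;

    @Override
    public void init(RecordContext context) {
        this.context = context;
    }

    @Override
    public String apply(String readOnlyKey, String value) {
        return readOnlyKey + "=" + value + "@" + context.offset();
    }
}

class RichFunctionSketch {
    static String demo() {
        ValueMapper<String, Integer> length = v -> v.length();
        ValueMapperWithKey<String, String, String> tagged = (k, v) -> k + ":" + v;

        // Fixed metadata for the demo; a real runtime would expose the current record's values.
        RecordContext ctx = new RecordContext() {
            public String topic() { return "input-topic"; }
            public int partition() { return 0; }
            public long offset() { return 42L; }
            public long timestamp() { return 0L; }
        };
        RichValueMapper<String, String, String> rich = new OffsetTaggingMapper();
        rich.init(ctx);
        String result = length.apply("abc") + " " + tagged.apply("k", "v") + " " + rich.apply("k", "v");
        rich.close();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "3 k:v k=v@42"
    }
}
```

Because `RichValueMapper` in this sketch still has exactly one abstract method, a user who needs neither `init()` nor `close()` could also pass a lambda to it.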
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 5/11/17 7:00 AM, Jeyhun Karimov wrote:
>>> Hi,
>>>
>>> Thanks for your comments. I think we cannot extend the two interfaces if
>>> we want to keep lambdas. I updated the KIP [1]. Maybe I should change the
>>> title, because now we are not limiting the KIP to only ValueMapper,
>>> ValueTransformer and ValueJoiner.
>>> Please feel free to comment.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> If `ValueMapperWithKey` extends `ValueMapper`, we don't need the new
>>>> overload.
>>>>
>>>> And yes, we need to do one check -- but this happens when building the
>>>> topology. At runtime (I mean after KafkaStreams#start()) we don't need
>>>> any check, as we will always use `ValueMapperWithKey`.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
>>>>> Hi,
>>>>>
>>>>> Thanks for the feedback.
>>>>> Then we need to overload the method
>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
>>>>> with
>>>>> <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
>>>>>
>>>>> and at runtime (inside the processor) we would still have to check
>>>>> whether it is a ValueMapper or a ValueMapperWithKey before wrapping it
>>>>> into the rich function.
>>>>>
>>>>>
>>>>> Please correct me if I am wrong.
>>>>>
>>>>> Cheers,
>>>>> Jeyhun
>>>>>
>>>>>
>>>>> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
>>>>> michal.borowie...@openbet.com> wrote:
>>>>>
>>>>>> +1 :)
>>>>>>
>>>>>>
>>>>>> On 08/05/17 23:52, Matthias J. Sax wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I was reading the updated KIP and I am wondering if we should do the
>>>>>>> design a little differently.
>>>>>>>
>>>>>>> Instead of distinguishing between a RichFunction and a non-RichFunction
>>>>>>> at the runtime level, we would use RichFunctions all the time. Thus, at
>>>>>>> the DSL entry level, if a user provides a non-RichFunction, we wrap it in
>>>>>>> a RichFunction that is fully implemented by Streams. This RichFunction
>>>>>>> would just forward the call, omitting the key.
>>>>>>>
>>>>>>> Just to sketch the idea (incomplete code snippet):
>>>>>>>
>>>>>>>> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
>>>>>>>>     private final ValueMapper<V, VR> userProvidedMapper;
>>>>>>>>     public StreamsRichValueMapper(final ValueMapper<V, VR> userProvidedMapper) {
>>>>>>>>         this.userProvidedMapper = userProvidedMapper;
>>>>>>>>     }
>>>>>>>>     @Override
>>>>>>>>     public VR apply(final K key, final V value) {
>>>>>>>>         return userProvidedMapper.apply(value); // forward the call, omitting the key
>>>>>>>>     }
>>>>>>>> }
>>>>>>>
>>>>>>> From a performance point of view, I am not sure whether the
>>>>>>> "if (isRichFunction)" check, including casts etc., would have more
>>>>>>> overhead than this approach (we would do more nested method calls for
>>>>>>> non-RichFunctions, which should be more common than RichFunctions).
>>>>>>>
>>>>>>> This approach should not affect lambdas (or am I missing something?)
>>>>>>> and might be cleaner, as we could have one more top-level interface
>>>>>>> `RichFunction` with methods `init()` and `close()`, and also interfaces
>>>>>>> for `RichValueMapper` etc. (thus, no abstract classes required).
>>>>>>>
>>>>>>>
>>>>>>> Any thoughts?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the comments. I extended the PR and the KIP to include rich
>>>>>>>> functions. I will still have to evaluate the cost of deep copying keys.
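The wrapping idea in the 5/8 message, combined with the 5/9 point that the only check happens when the topology is built, could look roughly like this. It is a hypothetical, self-contained sketch: `ValueMapper`, `ValueMapperWithKey`, `MapValuesProcessor`, `of`, and `ofWithKey` are illustrative names, not Kafka Streams classes or methods. A value-only mapper is adapted once, at build time, into a key-aware one that ignores the key, so the per-record path needs no `instanceof` check or cast.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not the Kafka Streams API.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

final class MapValuesProcessor<K, V, VR> {
    // At runtime the processor only ever sees the key-aware interface.
    private final ValueMapperWithKey<K, V, VR> mapper;

    private MapValuesProcessor(ValueMapperWithKey<K, V, VR> mapper) {
        this.mapper = mapper;
    }

    // Build-time entry point for value-only mappers: wrap once, forward the call, drop the key.
    static <K, V, VR> MapValuesProcessor<K, V, VR> of(ValueMapper<V, VR> valueOnly) {
        ValueMapperWithKey<K, V, VR> adapter = (key, value) -> valueOnly.apply(value);
        return new MapValuesProcessor<>(adapter);
    }

    // Build-time entry point for key-aware mappers: used as-is.
    static <K, V, VR> MapValuesProcessor<K, V, VR> ofWithKey(ValueMapperWithKey<K, V, VR> keyAware) {
        return new MapValuesProcessor<>(keyAware);
    }

    // Per-record path: a single interface call, no instanceof check or cast.
    VR process(K key, V value) {
        return mapper.apply(key, value);
    }
}

class WrapDemo {
    static List<String> run() {
        MapValuesProcessor<String, String, Integer> lengths =
                MapValuesProcessor.of((String v) -> v.length());
        MapValuesProcessor<String, String, String> tagged =
                MapValuesProcessor.ofWithKey((String k, String v) -> k + "/" + v);

        List<String> out = new ArrayList<>();
        out.add(String.valueOf(lengths.process("user-1", "hello")));
        out.add(tagged.process("user-1", "hello"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[5, user-1/hello]"
    }
}
```

In this sketch the two interfaces are deliberately unrelated. If `ValueMapperWithKey` extended `ValueMapper`, it would inherit a second abstract method and would no longer be a functional interface, which appears to be the lambda problem mentioned in the 5/11 message.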
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Jeyhun
>>>>>>>>
>>>>>>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Matthias,
>>>>>>>>>
>>>>>>>>> My opinion would be that documenting the immutability of the key is
>>>>>>>>> the best approach available. It is better than requiring the key to be
>>>>>>>>> serializable (as with Jeyhun's last pass at the PR), and carries no
>>>>>>>>> performance risk.
>>>>>>>>>
>>>>>>>>> It'd be different if Java had immutable type constraints of some kind. :-)
>>>>>>>>>
>>>>>>>>> Mathieu
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Agreed about RichFunction. If we follow this path, it should cover
>>>>>>>>>> all(?) DSL interfaces.
>>>>>>>>>>
>>>>>>>>>> About guarding the key -- I am still not sure what to do about it...
>>>>>>>>>> Maybe it is enough to document it (and name the key parameter
>>>>>>>>>> something like `readOnlyKey` to make it very clear). Ultimately, I
>>>>>>>>>> would prefer to guard against any modification, but I have no good
>>>>>>>>>> idea how to do this.
>>>>>>>>>> I'm not sure what others think about the risk of corrupted
>>>>>>>>>> partitioning (which would be a user error, and we could say: well, bad
>>>>>>>>>> luck, you've got a bug in your code, that's not our fault) vs. a deep
>>>>>>>>>> copy with a potential performance hit (that we can't quantify atm
>>>>>>>>>> without any performance test).
>>>>>>>>>> We do have a performance system test. Maybe it's worth it for you to
>>>>>>>>>> apply the deep-copy strategy and run the test. It's only a very basic
>>>>>>>>>> performance test, but it might give some insight. If you want to do
>>>>>>>>>> this, look into the folder "tests" for the general test setup, and
>>>>>>>>>> into "tests/kafkatest/benchmarks/streams" to find the perf test.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>
>>>>>>>>>>> I think extending the KIP to include RichFunctions totally makes
>>>>>>>>>>> sense. So, we don't want to guard the keys, because it is costly.
>>>>>>>>>>> If we introduce RichFunctions, I think they should not be limited to
>>>>>>>>>>> the 3 interfaces proposed in the KIP but should cover a wide range of
>>>>>>>>>>> interfaces.
>>>>>>>>>>> Please correct me if I am wrong.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jeyhun
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> One follow-up. There was this email on the user list:
>>>>>>>>>>>>
>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>>>>>>>>>>>>
>>>>>>>>>>>> It might make sense to include the Initializer, Adder, and
>>>>>>>>>>>> Subtractor interfaces, too.
>>>>>>>>>>>>
>>>>>>>>>>>> And we should double-check whether there are other interfaces we
>>>>>>>>>>>> might be missing atm.
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>>>>>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Deep copying the key will work for sure, but I am actually a
>>>>>>>>>>>>> little bit worried about the performance impact... We might want to
>>>>>>>>>>>>> do some tests to quantify this impact.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
>>>>>>>>>>>>> would allow users to access record metadata (like timestamp,
>>>>>>>>>>>>> offset, partition, etc.) within the DSL. This would be a similar
>>>>>>>>>>>>> concept.
>>>>>>>>>>>>> Thus, I am wondering whether it would make sense to enlarge the
>>>>>>>>>>>>> scope of this KIP to include it. WDYT?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>> Hi Mathieu,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the feedback. I followed a similar approach and updated
>>>>>>>>>>>>>> the PR and the KIP accordingly. I tried to guard the key in the
>>>>>>>>>>>>>> Processors by sending a copy of the actual key.
>>>>>>>>>>>>>> Because I am doing a deep copy of an object, I think memory can be
>>>>>>>>>>>>>> a bottleneck in some use cases.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes
>>>>>>>>>>>>>>> rather than interfaces, which is also a backwards-incompatible
>>>>>>>>>>>>>>> change. An alternative approach that would be backwards compatible
>>>>>>>>>>>>>>> would be to define new interfaces and provide overloads where
>>>>>>>>>>>>>>> those interfaces are used.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Unfortunately, marking the key parameter as "final" doesn't change
>>>>>>>>>>>>>>> much about guarding against key changes. It only prevents the
>>>>>>>>>>>>>>> parameter variable from being reassigned. If the key type is a
>>>>>>>>>>>>>>> mutable object (e.g., byte[]), it can still be mutated (e.g.,
>>>>>>>>>>>>>>> key[0] = 0). But I'm not really sure there's much that can be done
>>>>>>>>>>>>>>> about that.
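Mathieu's point about `final` can be checked directly in Java. The following hypothetical sketch (`carelessMapper`, `callUnguarded`, and `callGuarded` are illustrative names) shows a `final` byte[] key being mutated anyway, and the defensive-copy guard Jeyhun describes. For a `byte[]` key, `Arrays.copyOf` is already a deep copy; arbitrary key types would need something heavier (for example a serialize/deserialize round trip), which is where the memory and performance concerns in this thread come from.

```java
import java.util.Arrays;

class KeyGuardSketch {
    // Hypothetical user code: `final` forbids reassigning the parameter,
    // but it does not stop the array the parameter points to from being mutated.
    static String carelessMapper(final byte[] readOnlyKey, final String value) {
        readOnlyKey[0] = 0;              // compiles and changes the caller's key
        // readOnlyKey = new byte[0];    // this line, by contrast, would not compile
        return value.toUpperCase();
    }

    // Unguarded: the runtime hands its own key object to user code.
    static byte[] callUnguarded(byte[] key) {
        carelessMapper(key, "v");
        return key;
    }

    // Guarded: user code gets a copy, at the cost of one allocation per record.
    static byte[] callGuarded(byte[] key) {
        carelessMapper(Arrays.copyOf(key, key.length), "v");
        return key;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(callUnguarded(new byte[] {1, 2, 3}))); // [0, 2, 3]
        System.out.println(Arrays.toString(callGuarded(new byte[] {1, 2, 3})));   // [1, 2, 3]
    }
}
```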
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Mathieu
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the comments.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The concerns make sense. Although we can guard against key
>>>>>>>>>>>>>>>> modification in the current implementation (with a few changes),
>>>>>>>>>>>>>>>> I didn't consider backward compatibility.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In this case, two solutions come to mind. In both cases, the user
>>>>>>>>>>>>>>>> accesses the key as type Object, as passing an extra type
>>>>>>>>>>>>>>>> parameter would break backwards compatibility. So the user has to
>>>>>>>>>>>>>>>> cast it to the actual key type.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments
>>>>>>>>>>>>>>>> (key and value) and force the key to be *final*. By doing this, I
>>>>>>>>>>>>>>>> think we can address both backward compatibility and guarding
>>>>>>>>>>>>>>>> against key changes.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2. Secondly, we can create a class KeyAccess like:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public class KeyAccess {
>>>>>>>>>>>>>>>>     Object key;
>>>>>>>>>>>>>>>>     public void beforeApply(final Object key) {
>>>>>>>>>>>>>>>>         this.key = key;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>     public Object getKey() {
>>>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer*
>>>>>>>>>>>>>>>> from *KeyAccess*. Inside the processor (for example
>>>>>>>>>>>>>>>> *KTableMapValuesProcessor*), before calling *mapper.apply(value)*,
>>>>>>>>>>>>>>>> we can set the *key* by calling *mapper.beforeApply(key)*. As a
>>>>>>>>>>>>>>>> result, the user can call *getKey()* to access the key inside the
>>>>>>>>>>>>>>>> *apply(value)* method.
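Option 2 implies a specific call protocol: the processor stores the key on the function object and only then calls `apply(value)`. The sketch below is hypothetical (`KeyAccessValueMapper`, `process`, and `run` are illustrative names; `KeyAccess` mirrors the class in Jeyhun's message). It also makes Mathieu's objection visible: once the mapper extends a class, it must be written as an (anonymous) subclass rather than a lambda, and the key arrives as `Object` and has to be cast.

```java
// KeyAccess as proposed in the message above.
class KeyAccess {
    Object key;

    public void beforeApply(final Object key) {
        this.key = key;
    }

    public Object getKey() {
        return key;
    }
}

// Hypothetical: in this proposal the mapper becomes an abstract class,
// so a lambda can no longer implement it.
abstract class KeyAccessValueMapper<V, VR> extends KeyAccess {
    public abstract VR apply(V value);
}

class KeyAccessDemo {
    // Stand-in for a processor such as KTableMapValuesProcessor: set the key, then call apply.
    static <V, VR> VR process(KeyAccessValueMapper<V, VR> mapper, Object key, V value) {
        mapper.beforeApply(key);
        return mapper.apply(value);
    }

    static String run() {
        KeyAccessValueMapper<String, String> mapper = new KeyAccessValueMapper<String, String>() {
            @Override
            public String apply(String value) {
                String key = (String) getKey(); // key arrives as Object; the user must cast
                return key + "->" + value;
            }
        };
        return process(mapper, "user-1", "hello");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "user-1->hello"
    }
}
```

Because the key lives in a mutable field of the function object, this protocol could also race if the same mapper instance were used by several stream threads at once.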
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> thanks a lot for the KIP!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think there are two issues we need to address:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did
>>>>>>>>>>>>>>>>> complain about this in past releases already, and as the user
>>>>>>>>>>>>>>>>> base grows, we should not break backward compatibility in future
>>>>>>>>>>>>>>>>> releases anymore. Thus, we should think of a better way to allow
>>>>>>>>>>>>>>>>> key access.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Mathieu's comment goes in the same direction:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would
>>>>>>>>>>>>>>>>>>> need to be fixed from this change is unfortunate. :-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (2) Another concern is that there is no guard to prevent user
>>>>>>>>>>>>>>>>> code from modifying the key. This might corrupt partitioning if
>>>>>>>>>>>>>>>>> users do alter the key (accidentally -- or because users are just
>>>>>>>>>>>>>>>>> not aware that they are not allowed to modify the provided key
>>>>>>>>>>>>>>>>> object) and thus break the application. (This was the original
>>>>>>>>>>>>>>>>> motivation for not providing the key in the first place -- it
>>>>>>>>>>>>>>>>> guards against modification.)
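To see why a modified key can corrupt partitioning, consider a record that was routed by its key before user code mutated that key in place. The sketch uses a deliberately simplified, hypothetical partitioner (`Arrays.hashCode` modulo the partition count); Kafka's default partitioner hashes the serialized key with murmur2 instead, so the concrete partition numbers here are only illustrative.

```java
import java.util.Arrays;

class PartitionCorruptionSketch {
    // Simplified, hypothetical partitioner for illustration only; Kafka's default
    // partitioner uses a murmur2 hash of the serialized key instead.
    static int partitionFor(byte[] key, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        int before = partitionFor(key, 4); // partition the record was routed to
        key[0] = 0;                        // user code mutates the "read-only" key in place
        int after = partitionFor(key, 4);  // partition the mutated key would belong to
        // The record now sits in a partition that does not match its key, so later
        // key-based operations (joins, aggregations, store lookups) may misbehave.
        System.out.println(before + " -> " + after); // prints "1 -> 0"
    }
}
```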
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>>>>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I just want to add my voice: I, too, have wished for access to
>>>>>>>>>>>>>>>>>> the record key during a mapValues or similar operation.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would
>>>>>>>>>>>>>>>>>> need to be fixed from this change is unfortunate. :-) But at
>>>>>>>>>>>>>>>>>> least it would all be a pretty clear and easy change.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Mathieu
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I want to share KIP-149 [1], based on issues KAFKA-4218 [2],
>>>>>>>>>>>>>>>>>>> KAFKA-4726 [3], and KAFKA-3745 [4]. The related PR can be found
>>>>>>>>>>>>>>>>>>> at [5].
>>>>>>>>>>>>>>>>>>> I would like to get your comments.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>>>>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>>>>>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>>>>>>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>>>>>>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Jeyhun