Interesting. I thought that https://issues.apache.org/jira/browse/KAFKA-4125 is the main motivation for this KIP :)
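As a rough illustration of the workaround mentioned in this reply (a pass-through `transform` whose only job is to request a commit), here is a minimal, self-contained sketch. It deliberately does not use the Kafka Streams classes: `CommitContext`, `PassThroughTransformer`, `CountingContext`, and the "commit every N records" policy are hypothetical stand-ins. In a real topology the context would be the `ProcessorContext` handed to the transformer's `init`.

```java
// Demo entry point: pushes five values through the pass-through step.
final class CommitSketch {
    public static void main(final String[] args) {
        final CountingContext ctx = new CountingContext();
        final PassThroughTransformer<String> t = new PassThroughTransformer<>(2);
        t.init(ctx);
        for (final String v : new String[] {"a", "b", "c", "d", "e"}) {
            t.transform(v);
        }
        System.out.println("commit requests: " + ctx.commits);
    }
}

// Hypothetical stand-in for the one ProcessorContext method used here.
interface CommitContext {
    void commit(); // requests a commit of the task's progress
}

// Pass-through step: returns each value unchanged and requests a commit
// every `commitInterval` records (an assumed policy, for illustration only).
final class PassThroughTransformer<V> {
    private final int commitInterval;
    private CommitContext context;
    private int seen = 0;

    PassThroughTransformer(final int commitInterval) {
        if (commitInterval <= 0) {
            throw new IllegalArgumentException("commitInterval must be positive");
        }
        this.commitInterval = commitInterval;
    }

    // Mirrors the init(...) callback of a transformer.
    void init(final CommitContext context) {
        this.context = context;
    }

    // The value is untouched; the only side effect is the commit request.
    V transform(final V value) {
        if (context == null) {
            throw new IllegalStateException("init() must be called first");
        }
        seen++;
        if (seen % commitInterval == 0) {
            context.commit();
        }
        return value;
    }
}

// In-memory context that only counts commit requests.
final class CountingContext implements CommitContext {
    int commits = 0;

    @Override
    public void commit() {
        commits++;
    }
}
```

The point of the design is that the commit request stays in a Processor-API-style step, so the DSL functions themselves never see a commit() method.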
I also think that we should not expose the full ProcessorContext at DSL
level.

Thus, overall I am not even sure if we should fix KAFKA-3907 at all.
Manual commits are something DSL users should not worry about -- and if
one really needs this, an advanced user can still insert a dummy
`transform` to request a commit from there.

-Matthias

On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
> Hi,
>
> The main intuition is to solve [1], which is part of this KIP.
> I agree with you that this might not seem semantically correct as we are
> not committing record state.
> Alternatively, we can remove commit() from RecordContext and add
> ProcessorContext (which has commit() method) as an extra argument to Rich
> methods:
>
> instead of
> public interface RichValueMapper<V, VR, K> {
>     VR apply(final V value,
>              final K key,
>              final RecordContext recordContext);
> }
>
> we can adopt
>
> public interface RichValueMapper<V, VR, K> {
>     VR apply(final V value,
>              final K key,
>              final RecordContext recordContext,
>              final ProcessorContext processorContext);
> }
>
>
> However, in this case, a user can get confused as ProcessorContext and
> RecordContext share some methods with the same name.
>
>
> Cheers,
> Jeyhun
>
>
> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>
>
> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Regarding #6 above, I'm still not clear why we would need `commit()` in
>> both ProcessorContext and RecordContext, could you elaborate a bit more?
>>
>> To me `commit()` is really a processor context not a record context
>> logically: when you call that function, it means we would commit the state
>> of the whole task up to this processed record, not only that single record
>> itself.
>>
>>
>> Guozhang
>>
>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <je.kari...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the feedback.
>>>
>>>
>>> 0. RichInitializer definition seems missing.
>>>
>>>
>>>
>>> - Fixed.
>>>
>>>
>>> 1. I'd suggest moving the key parameter in the RichValueXX and RichReducer
>>>> after the value parameters, as well as in the templates; e.g.
>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>     VR apply(final V1 value1, final V2 value2, final K key,
>>>>              final RecordContext recordContext);
>>>> }
>>>
>>>
>>>
>>> - Fixed.
>>>
>>>
>>> 2. Some of the listed functions are not necessary since their pairing APIs
>>>> are being deprecated in 1.0 already:
>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
>>>>                                    final Serde<KR> keySerde,
>>>>                                    final Serde<V> valSerde);
>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
>>>>                                  final Serde<K> keySerde,
>>>>                                  final Serde<V> valSerde);
>>>
>>>
>>> - Fixed.
>>>
>>> 3. For a few functions where we are adding three APIs for a combo of both
>>>> mapper / joiner, or both initializer / aggregator, or adder / subtractor,
>>>> I'm wondering if we can just keep one that uses "rich" functions for both;
>>>> so that we can have fewer overloads and let users who only want to access
>>>> one of them just use dummy parameter declarations. For example:
>>>>
>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
>>>>                                  final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);
>>>
>>>
>>>
>>> - Agreed. Fixed.
>>>
>>>
>>> 4. For TimeWindowedKStream, I'm wondering why we do not make its
>>>> Initializer also "rich" functions? I.e.
>>>
>>>
>>> - It was a typo. Fixed.
>>>
>>>
>>> 5. We need to move "RecordContext" from o.a.k.processor.internals to
>>>> o.a.k.processor.
>>>>
>>>> 6. I'm not clear why we want to move `commit()` from ProcessorContext to
>>>> RecordContext?
>>>>
>>>
>>> -
>>> Because it makes sense logically and to reduce code maintenance (both
>>> interfaces have offset(), timestamp(), topic(), partition() methods), I
>>> have ProcessorContext inherit from RecordContext.
>>> Since we need the commit() method both in ProcessorContext and in
>>> RecordContext, I moved commit() to the parent class (RecordContext).
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>>
>>>
>>> On Wed, Oct 11, 2017 at 12:59 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Jeyhun,
>>>>
>>>> Thanks for the updated KIP, here are my comments.
>>>>
>>>> 0. RichInitializer definition seems missing.
>>>>
>>>> 1. I'd suggest moving the key parameter in the RichValueXX and RichReducer
>>>> after the value parameters, as well as in the templates; e.g.
>>>>
>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>     VR apply(final V1 value1, final V2 value2, final K key,
>>>>              final RecordContext recordContext);
>>>> }
>>>>
>>>> My motivation is that for lambda expressions in J8, users who do not
>>>> care about the key but only the context, or vice versa, are more likely to
>>>> write it as (value1, value2, dummy, context) -> ... than to put the dummy
>>>> at the beginning of the parameter list. Generally speaking we'd like to
>>>> put all the "necessary" parameters before the optional ones.
>>>>
>>>>
>>>> 2. Some of the listed functions are not necessary since their pairing APIs
>>>> are being deprecated in 1.0 already:
>>>>
>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
>>>>                                    final Serde<KR> keySerde,
>>>>                                    final Serde<V> valSerde);
>>>>
>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
>>>>                                  final Serde<K> keySerde,
>>>>                                  final Serde<V> valSerde);
>>>>
>>>>
>>>>
>>>> 3. For a few functions where we are adding three APIs for a combo of both
>>>> mapper / joiner, or both initializer / aggregator, or adder / subtractor,
>>>> I'm wondering if we can just keep one that uses "rich" functions for both;
>>>> so that we can have fewer overloads and let users who only want to access
>>>> one of them just use dummy parameter declarations. For example:
>>>>
>>>>
>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
>>>>                                  final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);
>>>>
>>>> <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer,
>>>>                              final RichAggregator<? super K, ? super V, VR> aggregator,
>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>
>>>> Similarly for KGroupedTable, a bunch of aggregate() overloads are
>>>> deprecated, so we do not need to add their rich functions any more.
>>>>
>>>>
>>>> 4. For TimeWindowedKStream, I'm wondering why we do not make its
>>>> Initializer also "rich" functions? I.e.
>>>>
>>>> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
>>>>                                        final RichAggregator<? super K, ? super V, VR> aggregator);
>>>> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
>>>>                                        final RichAggregator<? super K, ? super V, VR> aggregator,
>>>>                                        final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
>>>>
>>>>
>>>> 5. We need to move "RecordContext" from o.a.k.processor.internals to
>>>> o.a.k.processor.
>>>>
>>>> 6. I'm not clear why we want to move `commit()` from ProcessorContext to
>>>> RecordContext? Conceptually I think it would be better kept in the
>>>> ProcessorContext. Do you find this not doable in the internal
>>>> implementations?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>>
>>>> On Fri, Sep 22, 2017 at 1:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> recordContext = new RecordContext() { // recordContext initialization is added in this KIP
>>>>>
>>>>> This code snippet seems to be standard - would it make sense to pull it
>>>>> into a (sample) RecordContext implementation ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Sep 22, 2017 at 12:14 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ted,
>>>>>>
>>>>>> Thanks for your comments. I added a couple of comments in KIP to clarify
>>>>>> some points.
>>>>>>
>>>>>>
>>>>>> bq. provides a hybrd solution
>>>>>>> Typo in hybrid.
>>>>>>
>>>>>>
>>>>>> - My bad. Thanks for the correction.
>>>>>>
>>>>>> It would be nice if you can name some Value operator as examples.
>>>>>>
>>>>>>
>>>>>>>
>>>>>> - I added the corresponding interface names to KIP.
>>>>>>
>>>>>>
>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>                              final Aggregator<? super K, ? super V, VR> adder,
>>>>>>> The adder doesn't need to be RichAggregator ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Exactly. However, there are 2 Aggregator-type arguments in the related
>>>>>> method. So, I had to overload all their possible Rich counterparts:
>>>>>>
>>>>>> // adder is non-rich, subtractor is rich
>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>                              final Aggregator<? super K, ? super V, VR> adder,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> subtractor,
>>>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>>>
>>>>>> // adder is rich, subtractor is non-rich
>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> adder,
>>>>>>                              final Aggregator<? super K, ? super V, VR> subtractor,
>>>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>>>
>>>>>> // both adder and subtractor are rich
>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> adder,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> subtractor,
>>>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>>>
>>>>>>
>>>>>> Can you explain a bit about the above implementation ?
>>>>>>> void commit () {
>>>>>>>     throw new UnsupportedOperationException("commit() is not supported in this context");
>>>>>>> Is the exception going to be replaced with real code in the PR ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> - I added some comments both inside and outside the code snippets in KIP.
>>>>>> Specifically, for the code snippet above, we add *commit()* method to
>>>>>> *RecordContext* interface.
>>>>>> However, we want *commit()* method to be used only for *RecordContext*
>>>>>> instances (at least for now), so we add UnsupportedOperationException in
>>>>>> all classes/interfaces that extend/implement *RecordContext.*
>>>>>> In general, 1) we make RecordContext publicly available within
>>>>>> ProcessorContext, 2) initialize its instance within all required
>>>>>> Processors and 3) pass it as an argument to the related Rich interfaces
>>>>>> inside Processors.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Jeyhun
>>>>>>
>>>>>> On Fri, Sep 22, 2017 at 6:44 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> bq. provides a hybrd solution
>>>>>>>
>>>>>>> Typo in hybrid.
>>>>>>>
>>>>>>> bq. accessing read-only keys within XXXValues operators
>>>>>>>
>>>>>>> It would be nice if you can name some Value operator as examples.
>>>>>>>
>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>                              final Aggregator<? super K, ? super V, VR> adder,
>>>>>>>
>>>>>>> The adder doesn't need to be RichAggregator ?
>>>>>>>
>>>>>>> public RecordContext recordContext() {
>>>>>>>     return this.recordContext();
>>>>>>>
>>>>>>> Can you explain a bit about the above implementation ?
>>>>>>>
>>>>>>> void commit () {
>>>>>>>     throw new UnsupportedOperationException("commit() is not supported in this context");
>>>>>>>
>>>>>>> Is the exception going to be replaced with real code in the PR ?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 22, 2017 at 9:28 AM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Dear community,
>>>>>>>>
>>>>>>>> I updated the related KIP [1]. Please feel free to comment.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Jeyhun
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 22, 2017 at 12:20 AM Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Damian,
>>>>>>>>>
>>>>>>>>> Thanks for the update. I am working on it and will provide an update
>>>>>>>>> soon.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>> On Thu, Sep 21, 2017 at 4:50 PM Damian Guy <damian....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>
>>>>>>>>>> All KIP-182 API PRs have now been merged. So you can consider it as
>>>>>>>>>> stable.
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>> On Thu, 21 Sep 2017 at 15:23 Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot for your comments.
For the single interface >>>>> (RichXXX >>>>>>> and >>>>>>>>>>> XXXWithKey) solution, I have already submitted a PR but >>>> probably >>>>>> it >>>>>>> is >>>>>>>>>>> outdated (when the KIP first proposed), I need to revisit >>> that >>>>>> one. >>>>>>>>>>> >>>>>>>>>>> @Guozhang, from our (offline) discussion, I understood >> that >>> we >>>>> may >>>>>>> not >>>>>>>>>> make >>>>>>>>>>> it merge this KIP into the upcoming release, as KIP-159 is >>> not >>>>>> voted >>>>>>>> yet >>>>>>>>>>> (because we want both KIP-149 and KIP-159 to be as an >>> "atomic" >>>>>>> merge). >>>>>>>>>> So >>>>>>>>>>> I decided to wait until KIP-182 gets stable (there are >> some >>>>> minor >>>>>>>>>> updates >>>>>>>>>>> AFAIK) and update the KIP accordingly. Please correct me >> if >>> I >>>> am >>>>>>> wrong >>>>>>>>>> or I >>>>>>>>>>> misunderstood. >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Jeyhun >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Thu, Sep 21, 2017 at 4:11 PM Damian Guy < >>>>> damian....@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> +1 >>>>>>>>>>>> >>>>>>>>>>>> On Thu, 21 Sep 2017 at 13:46 Guozhang Wang < >>>>> wangg...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> +1 for me as well for collapsing. >>>>>>>>>>>>> >>>>>>>>>>>>> Jeyhun, could you update the wiki accordingly to show >>>> what's >>>>>> the >>>>>>>>>> final >>>>>>>>>>>>> updates post KIP-182 that needs to be done in KIP-159 >>>>>> including >>>>>>>>>>> KIP-149? >>>>>>>>>>>>> The child page I made is just a suggestion, but you >>> would >>>>>> still >>>>>>>>>> need to >>>>>>>>>>>>> update your proposal for people to comment and vote >> on. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Guozhang >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Sep 14, 2017 at 10:37 PM, Ted Yu < >>>>> yuzhih...@gmail.com >>>>>>> >>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> +1 >>>>>>>>>>>>>> >>>>>>>>>>>>>> One interface is cleaner. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Sep 14, 2017 at 7:26 AM, Bill Bejeck < >>>>>>> bbej...@gmail.com >>>>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> +1 for me on collapsing the RichXXXX and >>>>> ValueXXXXWithKey >>>>>>>>>>> interfaces >>>>>>>>>>>>>> into 1 >>>>>>>>>>>>>>> interface. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Sep 13, 2017 at 11:31 AM, Jeyhun Karimov < >>>>>>>>>>>> je.kari...@gmail.com >>>>>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Damian, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for your feedback. Actually, this (what >> you >>>>>>> propose) >>>>>>>>>> was >>>>>>>>>>> the >>>>>>>>>>>>>> first >>>>>>>>>>>>>>>> idea of KIP-149. Then we decided to divide it >> into >>>> two >>>>>>>> KIPs. I >>>>>>>>>>> also >>>>>>>>>>>>>>>> expressed my opinion that keeping the two >>> interfaces >>>>>> (Rich >>>>>>>> and >>>>>>>>>>>>> withKey) >>>>>>>>>>>>>>>> separate would add more overloads. So, email >>>>> discussion >>>>>>>>>> resulted >>>>>>>>>>>> that >>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>> would not be a problem. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Our initial idea was similar to : >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> public abstract class RichValueMapper<K, V, VR> >>>>>>> implements >>>>>>>>>>>>>>>> ValueMapperWithKey<K, V, VR>, RichFunction { >>>>>>>>>>>>>>>> ...... >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> So, we check the type of object, whether it is >>>> RichXXX >>>>>> or >>>>>>>>>>>> XXXWithKey >>>>>>>>>>>>>>> inside >>>>>>>>>>>>>>>> the called method and continue accordingly. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> If this is ok with the community, I would like >> to >>>>> revert >>>>>>> the >>>>>>>>>>>> current >>>>>>>>>>>>>>> design >>>>>>>>>>>>>>>> to this again. 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Wed, Sep 13, 2017 at 3:02 PM Damian Guy < >>>>>>>>>> damian....@gmail.com >>>>>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Jeyhun, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for sending out the update. I guess i >> was >>>>>>> thinking >>>>>>>>>> more >>>>>>>>>>>>> along >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> lines of option 2 where we collapse the >> RichXXXX >>>> and >>>>>>>>>>>>> ValueXXXXWithKey >>>>>>>>>>>>>>> etc >>>>>>>>>>>>>>>>> interfaces into 1 interface that has all of >> the >>>>>>>> arguments. I >>>>>>>>>>>> think >>>>>>>>>>>>> we >>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>>>> only need to add one additional overload for >>> each >>>>>>>> operator? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Damian >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Wed, 13 Sep 2017 at 10:59 Jeyhun Karimov < >>>>>>>>>>>> je.kari...@gmail.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Dear all, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I would like to resume the discussion on >>>> KIP-159. >>>>> I >>>>>>> (and >>>>>>>>>>>>> Guozhang) >>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>> that releasing KIP-149 and KIP-159 in the >> same >>>>>> release >>>>>>>>>> would >>>>>>>>>>>> make >>>>>>>>>>>>>>> sense >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> avoid a release with "partial" public APIs. >>>> There >>>>>> is a >>>>>>>> KIP >>>>>>>>>>> [1] >>>>>>>>>>>>>>> proposed >>>>>>>>>>>>>>>>> by >>>>>>>>>>>>>>>>>> Guozhang (and approved by me) to unify both >>>> KIPs. >>>>>>>>>>>>>>>>>> Please feel free to comment on this. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/ >>>> confluence/pages/viewpage. 
>>>>>>>>>>>>>>>> action?pageId=73637757 >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, Jul 21, 2017 at 2:00 AM Jeyhun >>> Karimov < >>>>>>>>>>>>>> je.kari...@gmail.com >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Matthias, Damian, all, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks for your comments and sorry for >>>>> super-late >>>>>>>>>> update. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Sure, the DSL refactoring is not blocking >>> for >>>>> this >>>>>>>> KIP. >>>>>>>>>>>>>>>>>>> I made some changes to KIP document based >> on >>>> my >>>>>>>>>> prototype. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Please feel free to comment. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Fri, Jul 7, 2017 at 9:35 PM Matthias J. >>>> Sax < >>>>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I would not block this KIP with regard to >>> DSL >>>>>>>>>> refactoring. >>>>>>>>>>>>> IMHO, >>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>> can >>>>>>>>>>>>>>>>>>>> just finish this one and the DSL >>> refactoring >>>>> will >>>>>>>> help >>>>>>>>>>> later >>>>>>>>>>>>> on >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>> reduce the number of overloads. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 7/7/17 5:28 AM, Jeyhun Karimov wrote: >>>>>>>>>>>>>>>>>>>>> I am following the related thread in >> the >>>>>> mailing >>>>>>>> list >>>>>>>>>>> and >>>>>>>>>>>>>>> looking >>>>>>>>>>>>>>>>>>>> forward >>>>>>>>>>>>>>>>>>>>> for one-shot solution for overloads >>> issue. 
>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Fri, Jul 7, 2017 at 10:32 AM Damian >>> Guy >>>> < >>>>>>>>>>>>>>> damian....@gmail.com> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hi Jeyhun, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> About overrides, what other >> alternatives >>>> do >>>>> we >>>>>>>> have? >>>>>>>>>>> For >>>>>>>>>>>>>>>>>>>>>>> backwards-compatibility we have to >> add >>>>> extra >>>>>>>>>> methods >>>>>>>>>>> to >>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> existing >>>>>>>>>>>>>>>>>>>>>> ones. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> It wasn't clear to me in the KIP if >>> these >>>>> are >>>>>>> new >>>>>>>>>>> methods >>>>>>>>>>>>> or >>>>>>>>>>>>>>>>>> replacing >>>>>>>>>>>>>>>>>>>>>> existing ones. >>>>>>>>>>>>>>>>>>>>>> Also, we are currently discussing >>> options >>>>> for >>>>>>>>>> replacing >>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> overrides. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>> Damian >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> About ProcessorContext vs >>> RecordContext, >>>>> you >>>>>>> are >>>>>>>>>>> right. >>>>>>>>>>>> I >>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>>>> need to >>>>>>>>>>>>>>>>>>>>>>> implement a prototype to understand >> the >>>>> full >>>>>>>>>> picture >>>>>>>>>>> as >>>>>>>>>>>>> some >>>>>>>>>>>>>>>> parts >>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>> KIP might not be as straightforward >> as >>> I >>>>>>> thought. 
>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 5, 2017 at 10:40 AM >> Damian >>>> Guy >>>>> < >>>>>>>>>>>>>>>> damian....@gmail.com> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> HI Jeyhun, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Is the intention that these methods >>> are >>>>> new >>>>>>>>>> overloads >>>>>>>>>>>> on >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> KStream, >>>>>>>>>>>>>>>>>>>>>>>> KTable, etc? >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> It is worth noting that a >>>> ProcessorContext >>>>>> is >>>>>>>> not >>>>>>>>>> a >>>>>>>>>>>>>>>>> RecordContext. >>>>>>>>>>>>>>>>>> A >>>>>>>>>>>>>>>>>>>>>>>> RecordContext, as it stands, only >>> exists >>>>>>> during >>>>>>>>>> the >>>>>>>>>>>>>>> processing >>>>>>>>>>>>>>>>> of a >>>>>>>>>>>>>>>>>>>>>>> single >>>>>>>>>>>>>>>>>>>>>>>> record. Whereas the ProcessorContext >>>>> exists >>>>>>> for >>>>>>>>>> the >>>>>>>>>>>>>> lifetime >>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>> Processor. Sot it doesn't make sense >>> to >>>>>> cast a >>>>>>>>>>>>>>> ProcessorContext >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>>>>>>>> RecordContext. >>>>>>>>>>>>>>>>>>>>>>>> You mentioned above passing the >>>>>>>>>>>> InternalProcessorContext >>>>>>>>>>>>> to >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> init() >>>>>>>>>>>>>>>>>>>>>>>> calls. It is internal for a reason >>> and i >>>>>> think >>>>>>>> it >>>>>>>>>>>> should >>>>>>>>>>>>>>> remain >>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>> way. >>>>>>>>>>>>>>>>>>>>>>>> It might be better to move the >>>>>> recordContext() >>>>>>>>>> method >>>>>>>>>>>>> from >>>>>>>>>>>>>>>>>>>>>>>> InternalProcessorContext to >>>>>> ProcessorContext. 
>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> In the KIP you have an example >>> showing: >>>>>>>>>>>>>>>>>>>>>>>> richMapper.init((RecordContext) >>>>>>>> processorContext); >>>>>>>>>>>>>>>>>>>>>>>> But the interface is: >>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, >>> VR> >>>> { >>>>>>>>>>>>>>>>>>>>>>>> VR apply(final V value, final >>>>>>> RecordContext >>>>>>>>>>>>>>> recordContext); >>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>> i.e., there is no init(...), besides >>> as >>>>>> above >>>>>>>> this >>>>>>>>>>>>> wouldn't >>>>>>>>>>>>>>>> make >>>>>>>>>>>>>>>>>>>> sense. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>>>>>> Damian >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Tue, 4 Jul 2017 at 23:30 Jeyhun >>>>> Karimov < >>>>>>>>>>>>>>>> je.kari...@gmail.com >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Actually my intend was to provide >> to >>>>>>>>>> RichInitializer >>>>>>>>>>>> and >>>>>>>>>>>>>>> later >>>>>>>>>>>>>>>>> on >>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>>>>>> could >>>>>>>>>>>>>>>>>>>>>>>>> provide the context of the record >> as >>>> you >>>>>> also >>>>>>>>>>>> mentioned. >>>>>>>>>>>>>>>>>>>>>>>>> I remove that not to confuse the >>> users. >>>>>>>>>>>>>>>>>>>>>>>>> Regarding the RecordContext and >>>>>>>> ProcessorContext >>>>>>>>>>>>>>> interfaces, I >>>>>>>>>>>>>>>>>> just >>>>>>>>>>>>>>>>>>>>>>>>> realized the >> InternalProcessorContext >>>>>> class. >>>>>>>>>> Can't >>>>>>>>>>> we >>>>>>>>>>>>> pass >>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>> as a >>>>>>>>>>>>>>>>>>>>>>>>> parameter to init() method of >>>> processors? >>>>>>> Then >>>>>>>> we >>>>>>>>>>>> would >>>>>>>>>>>>> be >>>>>>>>>>>>>>>> able >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>> get >>>>>>>>>>>>>>>>>>>>>>>>> RecordContext easily with just a >>> method >>>>>> call. 
>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 29, 2017 at 10:14 PM >>>> Matthias >>>>>> J. >>>>>>>> Sax >>>>>>>>>> < >>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> One more thing: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> I don't think `RichInitializer` >> does >>>>> make >>>>>>>>>> sense. As >>>>>>>>>>>> we >>>>>>>>>>>>>>> don't >>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>>>> any >>>>>>>>>>>>>>>>>>>>>>>>>> input record, there is also no >>>> context. >>>>> We >>>>>>>>>> could of >>>>>>>>>>>>>> course >>>>>>>>>>>>>>>>>> provide >>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>> context of the record that >> triggers >>>> the >>>>>> init >>>>>>>>>> call, >>>>>>>>>>>> but >>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>> seems >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>>>>>> semantically questionable. Also, >> the >>>>>> context >>>>>>>> for >>>>>>>>>>> this >>>>>>>>>>>>>> first >>>>>>>>>>>>>>>>>> record >>>>>>>>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>>>>>>>>>>> be provided by the consecutive >> call >>> to >>>>>>>> aggregate >>>>>>>>>>>>> anyways. >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> On 6/29/17 1:11 PM, Matthias J. >> Sax >>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for updating the KIP. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I have one concern with regard to >>>>>> backward >>>>>>>>>>>>>> compatibility. >>>>>>>>>>>>>>>> You >>>>>>>>>>>>>>>>>>>>>>> suggest >>>>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>>>>> use RecrodContext as base >> interface >>>> for >>>>>>>>>>>>>> ProcessorContext. >>>>>>>>>>>>>>>> This >>>>>>>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>>>>>>>>>>>> break compatibility. 
>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> I think, we should just have two >>>>>>> independent >>>>>>>>>>>>> interfaces. >>>>>>>>>>>>>>> Our >>>>>>>>>>>>>>>>> own >>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContextImpl class would >>>>>> implement >>>>>>>>>> both. >>>>>>>>>>>> This >>>>>>>>>>>>>>> allows >>>>>>>>>>>>>>>>> us >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>> cast >>>>>>>>>>>>>>>>>>>>>>>>>>> it to `RecordContext` and thus >>> limit >>>>> the >>>>>>>>>> visible >>>>>>>>>>>>> scope. >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/27/17 1:35 PM, Jeyhun >> Karimov >>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I updated the KIP w.r.t. >>> discussion >>>>> and >>>>>>>>>> comments. >>>>>>>>>>>>>>>>>>>>>>>>>>>> Basically I eliminated overloads >>> for >>>>>>>>>> particular >>>>>>>>>>>>> method >>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>> they >>>>>>>>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>>>>>>>>>>> than 3. >>>>>>>>>>>>>>>>>>>>>>>>>>>> As we can see there are a lot of >>>>>> overloads >>>>>>>>>> (and >>>>>>>>>>>> more >>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>> come >>>>>>>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>>>>>>>> KIP-149 >>>>>>>>>>>>>>>>>>>>>>>>>>>> :) ) >>>>>>>>>>>>>>>>>>>>>>>>>>>> So, is it wise to >>>>>>>>>>>>>>>>>>>>>>>>>>>> wait the result of constructive >>> DSL >>>>>> thread >>>>>>>> or >>>>>>>>>>>>>>>>>>>>>>>>>>>> extend KIP to address this issue >>> as >>>>> well >>>>>>> or >>>>>>>>>>>>>>>>>>>>>>>>>>>> continue as it is? 
>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 14, 2017 at 11:29 PM >>>>>> Guozhang >>>>>>>>>> Wang < >>>>>>>>>>>>>>>>>>>>>>> wangg...@gmail.com> >>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> LGTM. Thanks! >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 13, 2017 at 2:20 >> PM, >>>>> Jeyhun >>>>>>>>>> Karimov >>>>>>>>>>> < >>>>>>>>>>>>>>>>>>>>>>>>> je.kari...@gmail.com> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comment >> Matthias. >>>>> After >>>>>>> all >>>>>>>>>> the >>>>>>>>>>>>>>> discussion >>>>>>>>>>>>>>>>>>>>>>> (thanks >>>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> participants), I think this >>>> (single >>>>>>> method >>>>>>>>>> that >>>>>>>>>>>>>> passes >>>>>>>>>>>>>>>> in a >>>>>>>>>>>>>>>>>>>>>>>>>> RecordContext >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> object) is the best >> alternative. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just a side note: I think >>>> KAFKA-3907 >>>>>> [1] >>>>>>>> can >>>>>>>>>>> also >>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>> integrated >>>>>>>>>>>>>>>>>>>>>>>> into >>>>>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP by adding related method >>>> inside >>>>>>>>>>> RecordContext >>>>>>>>>>>>>>>>> interface. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-3907 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 13, 2017 at 7:50 >> PM >>>>>> Matthias >>>>>>>> J. 
>>>>>>>>>>> Sax < >>>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to push this >>>>> discussion >>>>>>>>>> further. >>>>>>>>>>> It >>>>>>>>>>>>>> seems >>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>> got >>>>>>>>>>>>>>>>>>>>>>>> nice >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternatives (thanks for the >>>>> summary >>>>>>>>>> Jeyhun!). >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> With respect to RichFunctions >>> and >>>>>>>> allowing >>>>>>>>>>> them >>>>>>>>>>>> to >>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>>> stateful, I >>>>>>>>>>>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> my doubt as expressed >> already. >>>> From >>>>>> my >>>>>>>>>>>>>> understanding, >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> idea >>>>>>>>>>>>>>>>>>>>>>>> was >>>>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> give access to record >> metadata >>>>>>>> information >>>>>>>>>>> only. >>>>>>>>>>>>> If >>>>>>>>>>>>>>> you >>>>>>>>>>>>>>>>> want >>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>> do >>>>>>>>>>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stateful computation you >> should >>>>>> rather >>>>>>>> use >>>>>>>>>>>>>>> #transform(). >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Furthermore, as pointed out, >> we >>>>> would >>>>>>>> need >>>>>>>>>> to >>>>>>>>>>>>> switch >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> supplier-pattern introducing >>> many >>>>>> more >>>>>>>>>>>> overloads. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For those reason, I advocate >>> for >>>> a >>>>>>> simple >>>>>>>>>>>>> interface >>>>>>>>>>>>>>>> with a >>>>>>>>>>>>>>>>>>>>>>> single >>>>>>>>>>>>>>>>>>>>>>>>>>>>> method >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that passes in a >> RecordContext >>>>>> object. 
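The single-method shape argued for above can be sketched as follows. This is only an illustration under assumptions: `RecordMetadataView` is a hypothetical stand-in for the proposed RecordContext, limited to the topic(), partition(), offset() and timestamp() accessors named elsewhere in this thread, and `RichValueMapperSketch` and `SimpleRecordMetadata` are made-up names, not the KIP's final API.

```java
// Demo entry point: a stateless "rich" mapper written as a lambda.
final class RichFunctionSketch {
    public static void main(final String[] args) {
        // A lambda works because the interface has a single abstract method,
        // so no supplier pattern or extra lifecycle callbacks are needed.
        final RichValueMapperSketch<String, String> tagWithOrigin =
            (value, rc) -> value + "@" + rc.topic() + "-" + rc.partition() + ":" + rc.offset();

        final RecordMetadataView metadata =
            new SimpleRecordMetadata("orders", 3, 42L, 1500000000000L);
        System.out.println(tagWithOrigin.apply("v1", metadata));
    }
}

// Hypothetical read-only view of per-record metadata.
interface RecordMetadataView {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// One method that receives the value plus the record's metadata.
@FunctionalInterface
interface RichValueMapperSketch<V, VR> {
    VR apply(V value, RecordMetadataView recordContext);
}

// Immutable holder used only to drive the example.
final class SimpleRecordMetadata implements RecordMetadataView {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    SimpleRecordMetadata(final String topic, final int partition,
                         final long offset, final long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
}
```

Because the function only reads metadata and keeps no state of its own, anything stateful would still go through `#transform()`, as suggested in the message above.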
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comprehensive summary!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally I'd prefer the option of passing RecordContext as an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> additional parameter into the overloaded function. But I'm also
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> open to other arguments if there is something that I have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> overlooked.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments, Matthias and Guozhang.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Below is a quick summary of the main alternatives we looked at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to introduce the Rich functions (I will refer to them as Rich
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> functions until we find a better/another name). The initially
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposed alternatives were not backwards-compatible, so I will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not mention them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The related discussions are spread across the KIP-149 and this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP's (KIP-159) discussion threads.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. The idea of rich functions came up in the KIP-149 discussion
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> thread. As a result we extended KIP-149 to support Rich
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> functions as well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. As part of the Rich functions, we provided an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> init(ProcessorContext) method. Afterwards, Damian suggested
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that we should not provide ProcessorContext to users. As a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> result, we separated the two problems into two separate KIPs,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as it seems they can be solved in parallel.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - One approach we considered was:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface ValueMapperWithKey<K, V, VR> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     VR apply(final K key, final V value);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<K, V, VR> extends RichFunction {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface RichFunction {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     void init(RecordContext recordContext);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     void close();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface RecordContext {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     String applicationId();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     TaskId taskId();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     StreamsMetrics metrics();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     String topic();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     int partition();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     long offset();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     long timestamp();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     Map<String, Object> appConfigs();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
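
For readers following the thread, the single-method shape favoured above (RecordContext passed in on each call, with no init/close lifecycle) can be sketched roughly as follows. This is only an illustration under stated assumptions: the RecordContext here is a trimmed-down, hypothetical stand-in with four of the accessors listed in the quoted interface, not the Kafka Streams type, and SimpleRecordContext and TAGGER are made-up names for the demo.

```java
public class RichMapperSketch {

    /** Hypothetical, trimmed-down stand-in for the RecordContext quoted in the thread. */
    public interface RecordContext {
        String topic();
        int partition();
        long offset();
        long timestamp();
    }

    /** Single-method shape: the record metadata arrives as an extra argument per call. */
    public interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    /** Illustrative immutable context, used here only to drive the example. */
    public static final class SimpleRecordContext implements RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;
        private final long timestamp;

        public SimpleRecordContext(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
        @Override public long timestamp() { return timestamp; }
    }

    /** A stateless mapper that tags each value with its key and source position. */
    public static final RichValueMapper<String, String, String> TAGGER =
        (value, key, ctx) ->
            key + "=" + value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();

    public static void main(String[] args) {
        RecordContext ctx = new SimpleRecordContext("orders", 2, 17L, 1000L);
        System.out.println(TAGGER.apply("v", "k", ctx));
    }
}
```

Because the mapper keeps no state of its own, it can be written as a plain lambda, which is one reason the thread prefers this shape over an init/close lifecycle that would require a supplier per operator.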