Hi Matthias,

Actually, my intent was to provide RichInitializer, and later on we could provide the context of the record, as you also mentioned. I removed it so as not to confuse users.

Regarding the RecordContext and ProcessorContext interfaces, I just noticed the InternalProcessorContext class. Can't we pass this as a parameter to the init() method of processors? Then we would be able to get the RecordContext easily with just a method call.
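To make the idea concrete, here is a rough sketch of what I have in mind. It uses simplified stand-in interfaces, not the real Kafka Streams ones: `InternalContext`, `ContextImpl`, `LoggingProcessor`, `ContextSketch` and the `update(...)` method are hypothetical names made up for illustration, and I am assuming the internal context exposes a `recordContext()` accessor. The two public interfaces stay independent, one class implements both, and the processor only keeps the narrow RecordContext view after init().

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in, NOT the real Kafka Streams interface.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// Kept independent of RecordContext (no "extends"), so existing users are unaffected.
interface ProcessorContext {
    String applicationId();
}

// Hypothetical internal view that hands out the record-level context.
interface InternalContext extends ProcessorContext {
    RecordContext recordContext();
}

// One implementation class backs both independent interfaces.
final class ContextImpl implements InternalContext, RecordContext {
    private final String applicationId;
    private String topic = "";
    private int partition = -1;
    private long offset = -1L;
    private long timestamp = -1L;

    ContextImpl(String applicationId) {
        this.applicationId = applicationId;
    }

    // Called by the (imaginary) runtime before each record is processed.
    void update(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String applicationId() { return applicationId; }
    @Override public RecordContext recordContext() { return this; } // narrowed view
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
}

// A processor that obtains the RecordContext with a single call in init().
final class LoggingProcessor {
    private RecordContext recordContext;
    final List<String> seen = new ArrayList<>();

    void init(InternalContext context) {
        this.recordContext = context.recordContext();
    }

    void process(String value) {
        seen.add(recordContext.topic() + "-" + recordContext.partition()
                + "@" + recordContext.offset() + ": " + value);
    }
}

class ContextSketch {
    // Drives two records through the processor and returns what it recorded.
    static List<String> run() {
        ContextImpl context = new ContextImpl("demo-app");
        LoggingProcessor processor = new LoggingProcessor();
        processor.init(context);
        context.update("orders", 0, 42L, 1000L);
        processor.process("a");
        context.update("orders", 0, 43L, 1001L);
        processor.process("b");
        return processor.seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the processor holds a RecordContext reference only, it cannot reach the rest of the processor-level API, which is the "limit the visible scope" point from your earlier mail.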

Cheers,
Jeyhun

On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matth...@confluent.io> wrote:

> One more thing:
>
> I don't think `RichInitializer` makes sense. As we don't have any
> input record, there is also no context. We could of course provide the
> context of the record that triggers the init call, but this seems to be
> semantically questionable. Also, the context for this first record will
> be provided by the consecutive call to aggregate anyways.
>
> -Matthias
>
> On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > Thanks for updating the KIP.
> >
> > I have one concern with regard to backward compatibility. You suggest to
> > use RecordContext as base interface for ProcessorContext. This will
> > break compatibility.
> >
> > I think we should just have two independent interfaces. Our own
> > ProcessorContextImpl class would implement both. This allows us to cast
> > it to `RecordContext` and thus limit the visible scope.
> >
> > -Matthias
> >
> > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> >> Hi all,
> >>
> >> I updated the KIP w.r.t. discussion and comments.
> >> Basically I eliminated overloads for a particular method if there are more
> >> than 3.
> >> As we can see there are a lot of overloads (and more will come with KIP-149 :) )
> >> So, is it wise to
> >> wait for the result of the constructive DSL thread or
> >> extend the KIP to address this issue as well or
> >> continue as it is?
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> LGTM. Thanks!
> >>>
> >>> Guozhang
> >>>
> >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>>
> >>>> Thanks for the comment Matthias. After all the discussion (thanks to all
> >>>> participants), I think this (single method that passes in a RecordContext
> >>>> object) is the best alternative.
> >>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into the
> >>>> KIP by adding a related method inside the RecordContext interface.
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I would like to push this discussion further. It seems we got nice
> >>>>> alternatives (thanks for the summary Jeyhun!).
> >>>>>
> >>>>> With respect to RichFunctions and allowing them to be stateful, I have
> >>>>> my doubts, as expressed already. From my understanding, the idea was to
> >>>>> give access to record metadata information only. If you want to do a
> >>>>> stateful computation you should rather use #transform().
> >>>>>
> >>>>> Furthermore, as pointed out, we would need to switch to a
> >>>>> supplier-pattern introducing many more overloads.
> >>>>>
> >>>>> For those reasons, I advocate for a simple interface with a single method
> >>>>> that passes in a RecordContext object.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> >>>>>> Thanks for the comprehensive summary!
> >>>>>>
> >>>>>> Personally I'd prefer the option of passing RecordContext as an additional
> >>>>>> parameter into the overloaded function. But I'm also open to other arguments
> >>>>>> if there is something that I have overlooked.
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Thanks for your comments Matthias and Guozhang.
> >>>>>>>
> >>>>>>> Below I give a quick summary of the main alternatives we looked at to
> >>>>>>> introduce the Rich functions (I will refer to it as Rich functions until we
> >>>>>>> find better/another name).
Initially the proposed alternatives were not
> >>>>>>> backwards-compatible, so I will not mention them.
> >>>>>>> The related discussions are spread across the KIP-149 and this KIP's (KIP-159)
> >>>>>>> discussion threads.
> >>>>>>>
> >>>>>>> 1. The idea of rich functions came up with KIP-149, in its
> >>>>>>> discussion thread. As a result we extended KIP-149 to support Rich
> >>>>>>> functions as well.
> >>>>>>>
> >>>>>>> 2. As part of the Rich functions, we provided an init(ProcessorContext)
> >>>>>>> method. Afterwards, Damian suggested that we should not provide
> >>>>>>> ProcessorContext to users. As a result, we separated the two problems into
> >>>>>>> two separate KIPs, as it seems they can be solved in parallel.
> >>>>>>>
> >>>>>>> - One approach we considered was:
> >>>>>>>
> >>>>>>> public interface ValueMapperWithKey<K, V, VR> {
> >>>>>>>     VR apply(final K key, final V value);
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface RichValueMapper<K, V, VR> extends RichFunction {
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface RichFunction {
> >>>>>>>     void init(RecordContext recordContext);
> >>>>>>>     void close();
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface RecordContext {
> >>>>>>>     String applicationId();
> >>>>>>>     TaskId taskId();
> >>>>>>>     StreamsMetrics metrics();
> >>>>>>>     String topic();
> >>>>>>>     int partition();
> >>>>>>>     long offset();
> >>>>>>>     long timestamp();
> >>>>>>>     Map<String, Object> appConfigs();
> >>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface ProcessorContext extends RecordContext {
> >>>>>>>     // all methods but the ones in RecordContext
> >>>>>>> }
> >>>>>>>
> >>>>>>> As a result:
> >>>>>>> *. All "withKey" and "withoutKey" interfaces can be converted to their
> >>>>>>> Rich counterparts (with empty init() and close() methods).
> >>>>>>> *. All related Processors will accept Rich interfaces in their
> >>>>>>> constructors.
> >>>>>>> *. So, we convert the related "withKey" or "withoutKey" interfaces to Rich
> >>>>>>> interfaces while building the topology and initialize the related processors
> >>>>>>> with Rich interfaces only.
> >>>>>>> *. We will not need overloaded methods for rich functions, as Rich
> >>>>>>> interfaces extend withKey interfaces. We will just check the object type
> >>>>>>> and act accordingly.
> >>>>>>>
> >>>>>>> 3. There were some thoughts that the above approach does not support lambdas,
> >>>>>>> so we should support only one method, only init(RecordContext), as part of
> >>>>>>> Rich interfaces.
> >>>>>>> This is still in discussion. Personally I think Rich interfaces are by
> >>>>>>> definition lambda-free and we should not care much about it.
> >>>>>>>
> >>>>>>> 4. Thanks to Matthias's discussion, an alternative we considered was to
> >>>>>>> pass in the RecordContext as a method parameter. This might even allow us to
> >>>>>>> use lambdas, and we could keep the name RichFunction as we preserve the
> >>>>>>> nature of being a function.
> >>>>>>> "If you go with `init()` and `close()` we basically
> >>>>>>> allow users to have an in-memory state for a function. Thus, we cannot
> >>>>>>> share a single instance of RichValueMapper (etc) over multiple tasks and
> >>>>>>> we would need a supplier pattern similar to #transform(). And this would
> >>>>>>> "break the flow" of the API, as (Rich)ValueMapperSupplier would not
> >>>>>>> inherit from ValueMapper and thus we would need many new overloads for
> >>>>>>> KStream/KTable classes". (Copy-paste from Matthias's email)
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Jeyhun
> >>>>>>>
> >>>>>>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J.
Sax <matth...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Yes, we did consider this, and there is no consensus yet what the best
> >>>>>>>> alternative is.
> >>>>>>>>
> >>>>>>>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
> >>>>>>>> summary of the current state of the discussion?
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> >>>>>>>>> Thanks for the explanation Jeyhun and Matthias.
> >>>>>>>>>
> >>>>>>>>> I have just read through both KIP-149 and KIP-159 and am wondering if you
> >>>>>>>>> guys have considered a slightly different approach for rich functions, that is
> >>>>>>>>> to add the `RecordContext` into the apply functions as an additional
> >>>>>>>>> parameter. For example:
> >>>>>>>>>
> >>>>>>>>> ---------------------------
> >>>>>>>>>
> >>>>>>>>> interface RichValueMapper<V, VR> {
> >>>>>>>>>     VR apply(final V value, final RecordContext context);
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> ...
> >>>>>>>>>
> >>>>>>>>> // then in KStreams
> >>>>>>>>>
> >>>>>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> >>>>>>>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper<? super V, ? extends VR> mapper);
> >>>>>>>>>
> >>>>>>>>> -------------------------------
> >>>>>>>>>
> >>>>>>>>> The caveat is that it will introduce more overloads; but I think the
> >>>>>>>>> number of overloads is mainly driven by 1) serde overrides and 2)
> >>>>>>>>> state-store-supplier overrides, both of which can be reduced in the near
> >>>>>>>>> future, and I felt this overloading is still worthwhile, as it has the
> >>>>>>>>> following benefits:
> >>>>>>>>>
> >>>>>>>>> 1) still allows lambda expressions.
> >>>>>>>>> 2) clearer code path (no need to "convert" from non-rich functions to
> >>>>>>>>> rich functions)
> >>>>>>>>>
> >>>>>>>>> Maybe this approach has already been discussed and I may have overlooked it in
> >>>>>>>>> the email thread; anyways, lmk.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> I agree with Jeyhun. As already mentioned, the overall API improvement
> >>>>>>>>>> ideas are overlapping and/or contradicting each other. For this reason,
> >>>>>>>>>> not all ideas can be accomplished and some Jiras might just be closed as
> >>>>>>>>>> "won't fix".
> >>>>>>>>>>
> >>>>>>>>>> For this reason, we try to do those KIP discussions with a large scope
> >>>>>>>>>> to get an overall picture and converge to an overall consistent API.
> >>>>>>>>>>
> >>>>>>>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It might
> >>>>>>>>>> be sufficient though, to do a single xxxWithContext() overload that will
> >>>>>>>>>> provide key+value+context. Otherwise, it might get too messy having
> >>>>>>>>>> ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
> >>>>>>>>>> ValueMapperWithKeyWithContext.
> >>>>>>>>>>
> >>>>>>>>>> On the other hand, we also have the "builder pattern" idea as an API
> >>>>>>>>>> change and this might mitigate the overload problem. Not for simple
> >>>>>>>>>> functions like map/flatMap etc. but for joins and aggregations.
> >>>>>>>>>>
> >>>>>>>>>> On the other hand, as I mentioned in an older email, I am personally
> >>>>>>>>>> fine to break the pure functional interface, and add
> >>>>>>>>>>
> >>>>>>>>>> - interface WithRecordContext with method `open(RecordContext)` (or
> >>>>>>>>>> `init(...)`, or any better name), but not `close()`
> >>>>>>>>>>
> >>>>>>>>>> - interface ValueMapperWithRecordContext extends ValueMapper,
> >>>>>>>>>> WithRecordContext
> >>>>>>>>>>
> >>>>>>>>>> This would allow us to avoid any overload. Of course, we don't get a
> >>>>>>>>>> "pure function" interface and also sacrifice lambdas.
> >>>>>>>>>>
> >>>>>>>>>> I am personally a little bit undecided what the better option might be.
> >>>>>>>>>> Curious to hear what others think about this trade-off.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> >>>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>>
> >>>>>>>>>>> It subsumes partially. Initially the idea was to support RichFunctions as a
> >>>>>>>>>>> separate interface. Throughout the discussion, however, we considered that maybe
> >>>>>>>>>>> overloading the related methods (with a RecordContext param) is a better
> >>>>>>>>>>> approach than providing a separate RichFunction interface.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Jeyhun
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Does this KIP subsume this ticket as well?
> >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Dear community,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to initiate
> >>>>>>>>>>>>> KIP for rich functions (interfaces) [2].
> >>>>>>>>>>>>> I would like to get your comments.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> >>>>>>>>>>>>> [2]
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>> --
> >>>>>>> -Cheers
> >>>>>>>
> >>>>>>> Jeyhun
> >>>>>
> >>>>> --
> >>>> -Cheers
> >>>>
> >>>> Jeyhun
> >>>
> >>> --
> >>> -- Guozhang

--
-Cheers

Jeyhun