LGTM. Thanks!

Guozhang

On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Thanks for the comment, Matthias. After all the discussion (thanks to all
> participants), I think this (a single method that passes in a RecordContext
> object) is the best alternative.
> Just a side note: I think KAFKA-3907 [1] can also be integrated into the
> KIP by adding a related method to the RecordContext interface.
>
> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>
> Cheers,
> Jeyhun
>
> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi,
> >
> > I would like to push this discussion further. It seems we got nice
> > alternatives (thanks for the summary, Jeyhun!).
> >
> > With respect to RichFunctions and allowing them to be stateful, I have
> > my doubts, as expressed already. From my understanding, the idea was to
> > give access to record metadata information only. If you want to do a
> > stateful computation, you should rather use #transform().
> >
> > Furthermore, as pointed out, we would need to switch to a
> > supplier pattern, introducing many more overloads.
> >
> > For those reasons, I advocate for a simple interface with a single
> > method that passes in a RecordContext object.
> >
> > -Matthias
> >
> > On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > > Thanks for the comprehensive summary!
> > >
> > > Personally I'd prefer the option of passing RecordContext as an
> > > additional parameter into the overloaded function. But I'm also open
> > > to other arguments if there is something that I have overlooked.
> > >
> > > Guozhang
> > >
> > > On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> Thanks for your comments Matthias and Guozhang.
> > >>
> > >> Below is a quick summary of the main alternatives we looked at to
> > >> introduce the Rich functions (I will refer to them as Rich functions
> > >> until we find a better name). The initially proposed alternatives
> > >> were not backwards-compatible, so I will not mention them.
> > >> The related discussions are spread across the KIP-149 and KIP-159
> > >> (this KIP) discussion threads.
> > >>
> > >> 1. The idea of rich functions first came up in the KIP-149 discussion
> > >> thread. As a result, we extended KIP-149 to support Rich functions
> > >> as well.
> > >>
> > >> 2. As part of the Rich functions, we provided an init(ProcessorContext)
> > >> method. Afterwards, Dammian suggested that we should not provide
> > >> ProcessorContext to users. As a result, we separated the two problems
> > >> into two separate KIPs, as it seems they can be solved in parallel.
> > >>
> > >> - One approach we considered was:
> > >>
> > >> public interface ValueMapperWithKey<K, V, VR> {
> > >>     VR apply(final K key, final V value);
> > >> }
> > >>
> > >> public interface RichValueMapper<K, V, VR> extends RichFunction {
> > >> }
> > >>
> > >> public interface RichFunction {
> > >>     void init(RecordContext recordContext);
> > >>     void close();
> > >> }
> > >>
> > >> public interface RecordContext {
> > >>     String applicationId();
> > >>     TaskId taskId();
> > >>     StreamsMetrics metrics();
> > >>     String topic();
> > >>     int partition();
> > >>     long offset();
> > >>     long timestamp();
> > >>     Map<String, Object> appConfigs();
> > >>     Map<String, Object> appConfigsWithPrefix(String prefix);
> > >> }
> > >>
> > >> public interface ProcessorContext extends RecordContext {
> > >>     // all methods but the ones in RecordContext
> > >> }
> > >>
> > >> As a result:
> > >> * All "withKey" and "withoutKey" interfaces can be converted to their
> > >>   Rich counterparts (with empty init() and close() methods).
> > >> * All related Processors will accept Rich interfaces in their
> > >>   constructors.
> > >> * So, we convert the related "withKey" or "withoutKey" interfaces to
> > >>   Rich interfaces while building the topology and initialize the
> > >>   related processors with Rich interfaces only.
> > >> * We will not need overloaded methods for rich functions, as Rich
> > >>   interfaces extend withKey interfaces. We will just check the object
> > >>   type and act accordingly.
> > >>
> > >> 3. There were some thoughts that the above approach does not support
> > >> lambdas, so we should support only one method, init(RecordContext),
> > >> as part of Rich interfaces.
> > >> This is still in discussion. Personally I think Rich interfaces are by
> > >> definition lambda-free and we should not care much about it.
> > >>
> > >> 4. Thanks to Matthias's discussion, an alternative we considered was
> > >> to pass in the RecordContext as a method parameter. This might even
> > >> allow the use of lambdas, and we could keep the name RichFunction as
> > >> we preserve the nature of being a function.
> > >> "If you go with `init()` and `close()` we basically
> > >> allow users to have an in-memory state for a function. Thus, we cannot
> > >> share a single instance of RichValueMapper (etc) over multiple tasks
> > >> and we would need a supplier pattern similar to #transform(). And this
> > >> would "break the flow" of the API, as (Rich)ValueMapperSupplier would
> > >> not inherit from ValueMapper and thus we would need many new overloads
> > >> for KStream/KTable classes". (Copy paste from Matthias's email)
> > >>
> > >> Cheers,
> > >> Jeyhun
> > >>
> > >> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Yes, we did consider this, and there is no consensus yet on what the
> > >>> best alternative is.
> > >>>
> > >>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
> > >>> summary of the current state of the discussion?
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> > >>>> Thanks for the explanation Jeyhun and Matthias.
> > >>>>
> > >>>> I have just read through both KIP-149 and KIP-159 and am wondering
> > >>>> if you guys have considered a slightly different approach for rich
> > >>>> functions, that is, to add the `RecordContext` into the apply
> > >>>> functions as an additional parameter. For example:
> > >>>>
> > >>>> ---------------------------
> > >>>>
> > >>>> interface RichValueMapper<V, VR> {
> > >>>>
> > >>>>     VR apply(final V value, final RecordContext context);
> > >>>>
> > >>>> }
> > >>>>
> > >>>> ...
> > >>>>
> > >>>> // then in KStreams
> > >>>>
> > >>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > >>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper<? super V, ? extends VR> mapper);
> > >>>>
> > >>>> -------------------------------
> > >>>>
> > >>>> The caveat is that it will introduce more overloads; but I think the
> > >>>> number of overloads is mainly driven by 1) serde overrides and
> > >>>> 2) state-store-supplier overrides, both of which can be reduced in
> > >>>> the near future, and I felt this overloading is still worthwhile, as
> > >>>> it has the following benefits:
> > >>>>
> > >>>> 1) still allows lambda expressions.
> > >>>> 2) clearer code path (no need to "convert" from non-rich functions
> > >>>>    to rich functions)
> > >>>>
> > >>>> Maybe this approach has already been discussed and I may have
> > >>>> overlooked it in the email thread; anyways, lmk.
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matth...@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> I agree with Jeyhun. As already mentioned, the overall API
> > >>>>> improvement ideas are overlapping and/or contradicting each other.
> > >>>>> For this reason, not all ideas can be accomplished and some Jiras
> > >>>>> might just be closed as "won't fix".
> > >>>>>
> > >>>>> For this reason, we try to do those KIP discussions with a large
> > >>>>> scope to get an overall picture and converge to an overall
> > >>>>> consistent API.
> > >>>>>
> > >>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It
> > >>>>> might be sufficient, though, to do a single xxxWithContext()
> > >>>>> overload that will provide key+value+context. Otherwise, it might
> > >>>>> get too messy having ValueMapper, ValueMapperWithKey,
> > >>>>> ValueMapperWithContext, ValueMapperWithKeyWithContext.
> > >>>>>
> > >>>>> On the other hand, we also have the "builder pattern" idea as an
> > >>>>> API change and this might mitigate the overload problem. Not for
> > >>>>> simple functions like map/flatMap etc. but for joins and
> > >>>>> aggregations.
> > >>>>>
> > >>>>> On the other hand, as I mentioned in an older email, I am
> > >>>>> personally fine to break the pure functional interface, and add
> > >>>>>
> > >>>>>   - interface WithRecordContext with method `open(RecordContext)`
> > >>>>>     (or `init(...)`, or any better name), but not `close()`
> > >>>>>
> > >>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
> > >>>>>     WithRecordContext
> > >>>>>
> > >>>>> This would allow us to avoid any overload. Of course, we don't get
> > >>>>> a "pure function" interface and also sacrifice lambdas.
> > >>>>>
> > >>>>> I am personally a little bit undecided what the better option might
> > >>>>> be. Curious to hear what others think about this trade-off.
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> > >>>>>> Hi Guozhang,
> > >>>>>>
> > >>>>>> It subsumes it partially. Initially the idea was to support
> > >>>>>> RichFunctions as a separate interface. Throughout the discussion,
> > >>>>>> however, we considered that overloading the related methods (with
> > >>>>>> a RecordContext param) may be a better approach than providing a
> > >>>>>> separate RichFunction interface.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Jeyhun
> > >>>>>>
> > >>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Does this KIP subsume this ticket as well?
> > >>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> > >>>>>>>
> > >>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Dear community,
> > >>>>>>>>
> > >>>>>>>> As we discussed in the KIP-149 [DISCUSS] thread [1], I would
> > >>>>>>>> like to initiate a KIP for rich functions (interfaces) [2].
> > >>>>>>>> I would like to get your comments.
> > >>>>>>>>
> > >>>>>>>> [1]
> > >>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> > >>>>>>>> [2]
> > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Jeyhun
> > >>>>>>>> --
> > >>>>>>>> -Cheers
> > >>>>>>>>
> > >>>>>>>> Jeyhun
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>> -- Guozhang
> > >>>
> > >>> --
> > >> -Cheers
> > >>
> > >> Jeyhun
>
> --
> -Cheers
>
> Jeyhun

--
-- Guozhang
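
For reference, the option the thread converges on (a single-method interface that receives a RecordContext, so lambdas keep working) could be sketched roughly as below. This is only an illustrative, self-contained sketch and not the Kafka Streams API: the trimmed-down RecordContext, the SimpleRecordContext class, and the list-based mapValuesWithContext helper are hypothetical stand-ins made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class RichFunctionSketch {

    // Hypothetical, trimmed-down stand-in for the RecordContext from the thread.
    interface RecordContext {
        String topic();
        int partition();
        long offset();
    }

    // Single abstract method, so the interface can still be written as a lambda.
    interface RichValueMapper<V, VR> {
        VR apply(V value, RecordContext context);
    }

    // Hypothetical immutable context holding per-record metadata only (no state).
    static final class SimpleRecordContext implements RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;

        SimpleRecordContext(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
    }

    // Stand-in for a mapValues-with-context overload: a plain list replaces the
    // stream, and the offset is simply the element's index (an assumption).
    static <V, VR> List<VR> mapValuesWithContext(
            List<V> values,
            String topic,
            RichValueMapper<? super V, ? extends VR> mapper) {
        List<VR> out = new ArrayList<>();
        long offset = 0;
        for (V value : values) {
            out.add(mapper.apply(value, new SimpleRecordContext(topic, 0, offset++)));
        }
        return out;
    }

    // The mapper is passed as a lambda; no init()/close() and no supplier needed.
    static List<String> demo() {
        List<String> values = new ArrayList<>();
        values.add("a");
        values.add("b");
        return mapValuesWithContext(values, "orders",
                (value, ctx) -> value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a@orders-0:0, b@orders-0:1]
    }
}
```

Because the mapper holds no state of its own, one instance can be shared across tasks, which is the property Matthias's argument against init()/close() relies on.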