Hi, I would like to push this discussion further. It seems we got nice alternatives (thanks for the summary Jeyhun!).
With respect to RichFunctions and allowing them to be stateful, I have my doubt as expressed already. From my understanding, the idea was to give access to record metadata information only. If you want to do a stateful computation you should rather use #transform(). Furthermore, as pointed out, we would need to switch to a supplier-pattern introducing many more overloads. For those reason, I advocate for a simple interface with a single method that passes in a RecordContext object. -Matthias On 6/6/17 5:15 PM, Guozhang Wang wrote: > Thanks for the comprehensive summary! > > Personally I'd prefer the option of passing RecordContext as an additional > parameter into he overloaded function. But I'm also open to other arguments > if there are sth. that I have overlooked. > > Guozhang > > > On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote: > >> Hi, >> >> Thanks for your comments Matthias and Guozhang. >> >> Below I mention the quick summary of the main alternatives we looked at to >> introduce the Rich functions (I will refer to it as Rich functions until we >> find better/another name). Initially the proposed alternatives was not >> backwards-compatible, so I will not mention them. >> The related discussions are spread in KIP-149 and in this KIP (KIP-159) >> discussion threads. >> >> >> >> 1. The idea of rich functions came into the stage with KIP-149, in >> discussion thread. As a result we extended KIP-149 to support Rich >> functions as well. >> >> 2. To as part of the Rich functions, we provided init (ProcessorContext) >> method. Afterwards, Dammian suggested that we should not provide >> ProcessorContext to users. As a result, we separated the two problems into >> two separate KIPs, as it seems they can be solved in parallel. >> >> - One approach we considered was : >> >> public interface ValueMapperWithKey<K, V, VR> { >> VR apply(final K key, final V value); >> } >> >> public interface RichValueMapper<K, V, VR> extends RichFunction{ >> } >> >> public interface RichFunction { >> void init(RecordContext recordContext); >> void close(); >> } >> >> public interface RecordContext { >> String applicationId(); >> TaskId taskId(); >> StreamsMetrics metrics(); >> String topic(); >> int partition(); >> long offset(); >> long timestamp(); >> Map<String, Object> appConfigs(); >> Map<String, Object> appConfigsWithPrefix(String prefix); >> } >> >> >> public interface ProcessorContext extends RecordContext { >> // all methods but the ones in RecordContext >> } >> >> As a result: >> * . All "withKey" and "withoutKey" interfaces can be converted to their >> Rich counterparts (with empty init() and close() methods) >> *. All related Processors will accept Rich interfaces in their >> constructors. >> *. So, we convert the related "withKey" or "withoutKey" interfaces to Rich >> interface while building the topology and initialize the related processors >> with Rich interfaces only. >> *. We will not need to overloaded methods for rich functions as Rich >> interfaces extend withKey interfaces. We will just check the object type >> and act accordingly. >> >> >> >> >> 3. There was some thoughts that the above approach does not support lambdas >> so we should support only one method, only init(RecordContext), as part of >> Rich interfaces. >> This is still in discussion. Personally I think Rich interfaces are by >> definition lambda-free and we should not care much about it. >> >> >> 4. Thanks to Matthias's discussion, an alternative we considered was to >> pass in the RecordContext as method parameter. This might even allow to >> use Lambdas and we could keep the name RichFunction as we preserve the >> nature of being a function. >> "If you go with `init()` and `close()` we basically >> allow users to have an in-memory state for a function. Thus, we cannot >> share a single instance of RichValueMapper (etc) over multiple tasks and >> we would need a supplier pattern similar to #transform(). And this would >> "break the flow" of the API, as (Rich)ValueMapperSupplier would not >> inherit from ValueMapper and thus we would need many new overload for >> KStream/KTable classes". (Copy paste from Matthias's email) >> >> >> Cheers, >> Jeyhun >> >> >> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io> >> wrote: >> >>> Yes, we did consider this, and there is no consensus yet what the best >>> alternative is. >>> >>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick >>> summary of the current state of the discussion? >>> >>> >>> -Matthias >>> >>> On 6/4/17 6:04 PM, Guozhang Wang wrote: >>>> Thanks for the explanation Jeyhun and Matthias. >>>> >>>> I have just read through both KIP-149 and KIP-159 and am wondering if >> you >>>> guys have considered a slight different approach for rich function, >> that >>> is >>>> to add the `RecordContext` into the apply functions as an additional >>>> parameter. For example: >>>> >>>> --------------------------- >>>> >>>> interface RichValueMapper<V, VR> { >>>> >>>> VR apply(final V value, final RecordContext context); >>>> >>>> } >>>> >>>> ... >>>> >>>> // then in KStreams >>>> >>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> >>> mapper); >>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper <? super V, ? >>>> extends VR> mapper); >>>> >>>> ------------------------------- >>>> >>>> The caveat is that it will introduces more overloads; but I think the >>>> #.overloads are mainly introduced by 1) serde overrides and 2) >>>> state-store-supplier overides, both of which can be reduced in the near >>>> future, and I felt this overloading is still worthwhile, as it has the >>>> following benefits: >>>> >>>> 1) still allow lambda expressions. >>>> 2) clearer code path (do not need to "convert" from non-rich functions >> to >>>> rich functions) >>>> >>>> >>>> Maybe this approach has already been discussed and I may have >> overlooked >>> in >>>> the email thread; anyways, lmk. >>>> >>>> >>>> Guozhang >>>> >>>> >>>> >>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax < >> matth...@confluent.io> >>>> wrote: >>>> >>>>> I agree with Jeyhun. As already mention, the overall API improvement >>>>> ideas are overlapping and/or contradicting each other. For this >> reason, >>>>> not all ideas can be accomplished and some Jira might just be closed >> as >>>>> "won't fix". >>>>> >>>>> For this reason, we try to do those KIP discussion with are large >> scope >>>>> to get an overall picture to converge to an overall consisted API. >>>>> >>>>> >>>>> @Jeyhun: about the overloads. Yes, we might get more overload. It >> might >>>>> be sufficient though, to do a single xxxWithContext() overload that >> will >>>>> provide key+value+context. Otherwise, if might get too messy having >>>>> ValueMapper, ValueMapperWithKey, ValueMapperWithContext, >>>>> ValueMapperWithKeyWithContext. >>>>> >>>>> On the other hand, we also have the "builder pattern" idea as an API >>>>> change and this might mitigate the overload problem. Not for simple >>>>> function like map/flatMap etc but for joins and aggregations. >>>>> >>>>> >>>>> On the other hand, as I mentioned in an older email, I am personally >>>>> fine to break the pure functional interface, and add >>>>> >>>>> - interface WithRecordContext with method `open(RecordContext)` (or >>>>> `init(...)`, or any better name) -- but not `close()`) >>>>> >>>>> - interface ValueMapperWithRecordContext extends ValueMapper, >>>>> WithRecordContext >>>>> >>>>> This would allow us to avoid any overload. Of course, we don't get a >>>>> "pure function" interface and also sacrifices Lambdas. >>>>> >>>>> >>>>> >>>>> I am personally a little bit undecided what the better option might >> be. >>>>> Curious to hear what other think about this trade off. >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> >>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote: >>>>>> Hi Guozhang, >>>>>> >>>>>> It subsumes partially. Initially the idea was to support >> RichFunctions >>>>> as a >>>>>> separate interface. Throughout the discussion, however, we considered >>>>> maybe >>>>>> overloading the related methods (with RecodContext param) is better >>>>>> approach than providing a separate RichFunction interface. >>>>>> >>>>>> Cheers, >>>>>> Jeyhun >>>>>> >>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> >>> wrote: >>>>>> >>>>>>> Does this KIP subsume this ticket as well? >>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125 >>>>>>> >>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov < >> je.kari...@gmail.com >>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Dear community, >>>>>>>> >>>>>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to >>>>> initiate >>>>>>>> KIP for rich functions (interfaces) [2]. >>>>>>>> I would like to get your comments. >>>>>>>> >>>>>>>> >>>>>>>> [1] >>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj= >>>>>>>> Re+DISCUSS+KIP+149+Enabling+key+access+in+ >>>>> ValueTransformer+ValueMapper+ >>>>>>>> and+ValueJoiner >>>>>>>> [2] >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>>>> 159%3A+Introducing+Rich+functions+to+Streams >>>>>>>> >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Jeyhun >>>>>>>> -- >>>>>>>> -Cheers >>>>>>>> >>>>>>>> Jeyhun >>>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> -- Guozhang >>>>>>> >>>>> >>>>> >>>> >>>> >>> >>> -- >> -Cheers >> >> Jeyhun >> > > >
signature.asc
Description: OpenPGP digital signature