Interesting.

I thought that https://issues.apache.org/jira/browse/KAFKA-4125 is the
main motivation for this KIP :)

I also think that we should not expose the full ProcessorContext at the DSL
level.
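
A minimal sketch of that scoping concern, not the KIP's actual code: the
interfaces below are simplified stand-ins that only borrow names from this
thread (the real Kafka Streams types have more methods), and `ContextImpl`
and `ScopeDemo` are hypothetical names. One implementation class backs both
views, but the rich function only ever sees the narrow `RecordContext`:

```java
// Simplified stand-ins for illustration only; NOT the real Kafka Streams types.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

interface ProcessorContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
    void commit();
}

// Hypothetical implementation class that backs both views.
final class ContextImpl implements ProcessorContext, RecordContext {
    boolean commitRequested = false;

    @Override public String topic() { return "input-topic"; }
    @Override public int partition() { return 0; }
    @Override public long offset() { return 42L; }
    @Override public long timestamp() { return 1508300000000L; }
    @Override public void commit() { commitRequested = true; }
}

// Rich function in the shape discussed in this thread: it receives RecordContext only.
interface RichValueMapper<V, VR, K> {
    VR apply(V value, K key, RecordContext recordContext);
}

final class ScopeDemo {
    static String describe(ContextImpl impl) {
        RichValueMapper<String, String, String> mapper =
            (value, key, ctx) ->
                key + "=" + value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();
        // Passing the implementation typed as RecordContext keeps commit() out of the mapper's view.
        return mapper.apply("v", "k", impl);
    }
}
```

A downcast inside the lambda would still get around this, so it narrows the
visible API surface rather than enforcing anything.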

Thus, overall I am not even sure if we should fix KAFKA-3907 at all.
Manual commits are something DSL users should not worry about -- and if
one really needs this, an advanced user can still insert a dummy
`transform` to request a commit from there.
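
As a rough illustration of that workaround, here is a minimal, self-contained
sketch. The `ProcessorContext` and `Transformer` interfaces below are
simplified stand-ins defined only for this example (the real Kafka Streams
interfaces have more methods and a different `transform` return type), and
`CommitRequestingTransformer`, `CountingContext`, and the `commitEvery`
parameter are hypothetical names:

```java
// Simplified stand-ins for illustration only; NOT the real Kafka Streams types.
interface ProcessorContext {
    void commit(); // request a commit of the task's progress
}

interface Transformer<K, V> {
    void init(ProcessorContext context);
    V transform(K key, V value);
}

// Hypothetical pass-through transformer: values are returned unchanged,
// and a commit is requested after every `commitEvery` records.
final class CommitRequestingTransformer<K, V> implements Transformer<K, V> {
    private final int commitEvery;
    private ProcessorContext context;
    private int seen = 0;

    CommitRequestingTransformer(int commitEvery) {
        if (commitEvery <= 0) {
            throw new IllegalArgumentException("commitEvery must be positive");
        }
        this.commitEvery = commitEvery;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public V transform(K key, V value) {
        seen++;
        if (seen % commitEvery == 0) {
            context.commit(); // the only side effect of this "dummy" step
        }
        return value; // pass the record through untouched
    }
}

// Test double that counts how often a commit was requested.
final class CountingContext implements ProcessorContext {
    int commits = 0;

    @Override
    public void commit() {
        commits++;
    }
}
```

With the real API, the same idea would presumably be a pass-through
transformer plugged in via `KStream#transform` that calls
`ProcessorContext#commit()` from inside `transform`.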

-Matthias


On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
> Hi,
> 
> The main intuition is to solve [1], which is part of this KIP.
> I agree with you that this might not seem semantically correct as we are
> not committing record state.
> Alternatively, we can remove commit() from RecordContext and add
> ProcessorContext (which has commit() method) as an extra argument to Rich
> methods:
> 
> instead of
> public interface RichValueMapper<V, VR, K> {
>     VR apply(final V value,
>              final K key,
>              final RecordContext recordContext);
> }
> 
> we can adopt
> 
> public interface RichValueMapper<V, VR, K> {
>     VR apply(final V value,
>              final K key,
>              final RecordContext recordContext,
>              final ProcessorContext processorContext);
> }
> 
> 
> However, in this case, a user can get confused as ProcessorContext and
> RecordContext share some methods with the same name.
> 
> 
> Cheers,
> Jeyhun
> 
> 
> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> 
> 
> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Regarding #6 above, I'm still not clear why we would need `commit()` in
>> both ProcessorContext and RecordContext, could you elaborate a bit more?
>>
>> To me, `commit()` logically belongs to the processor context, not the
>> record context: when you call that function, it means we would commit the
>> state of the whole task up to this processed record, not only that single
>> record itself.
>>
>>
>> Guozhang
>>
>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <je.kari...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the feedback.
>>>
>>>
>>> 0. RichInitializer definition seems missing.
>>>
>>>
>>>
>>> - Fixed.
>>>
>>>
>>>> 1. I'd suggest moving the key parameter in the RichValueXX and RichReducer
>>>> after the value parameters, as well as in the templates; e.g.
>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>     VR apply(final V1 value1, final V2 value2, final K key,
>>>>              final RecordContext recordContext);
>>>> }
>>>
>>>
>>>
>>> - Fixed.
>>>
>>>
>>>> 2. Some of the listed functions are not necessary since their pairing APIs
>>>> are being deprecated in 1.0 already:
>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
>>>>                                    final Serde<KR> keySerde,
>>>>                                    final Serde<V> valSerde);
>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
>>>>                                  final Serde<K> keySerde,
>>>>                                  final Serde<V> valSerde);
>>>
>>>
>>> -Fixed
>>>
>>>> 3. For a few functions where we are adding three APIs for a combo of both
>>>> mapper / joiner, or both initializer / aggregator, or adder / subtractor,
>>>> I'm wondering if we can just keep one that uses "rich" functions for both,
>>>> so that we have fewer overloads and let users who only want to access
>>>> one of them just use dummy parameter declarations. For example:
>>>>
>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
>>>>                                  final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);
>>>
>>>
>>>
>>> -Agreed. Fixed.
>>>
>>>
>>> 4. For TimeWindowedKStream, I'm wondering why we do not make its
>>>> Initializer also "rich" functions? I.e.
>>>
>>>
>>> - It was a typo. Fixed.
>>>
>>>
>>>> 5. We need to move "RecordContext" from o.a.k.processor.internals to
>>>> o.a.k.processor.
>>>>
>>>> 6. I'm not clear why we want to move `commit()` from ProcessorContext to
>>>> RecordContext?
>>>>
>>>
>>> - Because it makes sense logically, and to reduce code maintenance (both
>>> interfaces have offset(), timestamp(), topic() and partition() methods), I
>>> made ProcessorContext inherit from RecordContext.
>>> Since we need the commit() method in both ProcessorContext and
>>> RecordContext, I moved commit() to the parent interface (RecordContext).
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>>
>>>
>>> On Wed, Oct 11, 2017 at 12:59 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Jeyhun,
>>>>
>>>> Thanks for the updated KIP, here are my comments.
>>>>
>>>> 0. RichInitializer definition seems missing.
>>>>
>>>> 1. I'd suggest moving the key parameter in the RichValueXX and RichReducer
>>>> after the value parameters, as well as in the templates; e.g.
>>>>
>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>     VR apply(final V1 value1, final V2 value2, final K key,
>>>>              final RecordContext recordContext);
>>>> }
>>>>
>>>> My motivation is that for lambda expressions in J8, users who do not
>>>> care about the key but only the context, or vice versa, are likely to write
>>>> it as (value1, value2, dummy, context) -> ... rather than putting the dummy
>>>> at the beginning of the parameter list. Generally speaking, we'd like to
>>>> place all the "necessary" parameters before the optional ones.
>>>>
>>>>
>>>> 2. Some of the listed functions are not necessary since their pairing APIs
>>>> are being deprecated in 1.0 already:
>>>>
>>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
>>>>                                    final Serde<KR> keySerde,
>>>>                                    final Serde<V> valSerde);
>>>>
>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
>>>>                                  final Serde<K> keySerde,
>>>>                                  final Serde<V> valSerde);
>>>>
>>>>
>>>>
>>>> 3. For a few functions where we are adding three APIs for a combo of both
>>>> mapper / joiner, or both initializer / aggregator, or adder / subtractor,
>>>> I'm wondering if we can just keep one that uses "rich" functions for both,
>>>> so that we have fewer overloads and let users who only want to access
>>>> one of them just use dummy parameter declarations. For example:
>>>>
>>>>
>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
>>>>                                  final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
>>>>                                  final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);
>>>>
>>>> <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer,
>>>>                              final RichAggregator<? super K, ? super V, VR> aggregator,
>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>
>>>> Similarly for KGroupedTable, a bunch of aggregate() are deprecated, so we
>>>> do not need to add its rich functions any more.
>>>>
>>>>
>>>> 4. For TimeWindowedKStream, I'm wondering why we do not make its
>>>> Initializer also "rich" functions? I.e.
>>>>
>>>> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
>>>>                                        final RichAggregator<? super K, ? super V, VR> aggregator);
>>>> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
>>>>                                        final RichAggregator<? super K, ? super V, VR> aggregator,
>>>>                                        final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
>>>>
>>>>
>>>> 5. We need to move "RecordContext" from o.a.k.processor.internals to
>>>> o.a.k.processor.
>>>>
>>>> 6. I'm not clear why we want to move `commit()` from ProcessorContext to
>>>> RecordContext? Conceptually, I think it would be better for it to stay in
>>>> the ProcessorContext. Do you find this not doable in the internal
>>>> implementations?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>>
>>>> On Fri, Sep 22, 2017 at 1:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>>    recordContext = new RecordContext() {   // recordContext initialization is added in this KIP
>>>>>
>>>>> This code snippet seems to be standard - would it make sense to pull it
>>>>> into a (sample) RecordContext implementation?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Sep 22, 2017 at 12:14 PM, Jeyhun Karimov <
>> je.kari...@gmail.com
>>>>
>>>>> wrote:
>>>>>
>>>>>> Hi Ted,
>>>>>>
>>>>>> Thanks for your comments. I added a couple of comments in the KIP to
>>>>>> clarify some points.
>>>>>>
>>>>>>
>>>>>> bq. provides a hybrd solution
>>>>>>> Typo in hybrid.
>>>>>>
>>>>>>
>>>>>> - My bad. Thanks for the correction.
>>>>>>
>>>>>> It would be nice if you can name some Value operator as examples.
>>>>>>
>>>>>>
>>>>>>>
>>>>>> - I added the corresponding interface names to KIP.
>>>>>>
>>>>>>
>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>                              final Aggregator<? super K, ? super V, VR> adder,
>>>>>>> The adder doesn't need to be RichAggregator ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Exactly. However, there are 2 Aggregator-type arguments in the related
>>>>>> method. So, I had to add overloads for all possible combinations of their
>>>>>> Rich counterparts:
>>>>>>
>>>>>> // adder is non-rich, subtractor is rich
>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>                              final Aggregator<? super K, ? super V, VR> adder,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> subtractor,
>>>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>>>
>>>>>> // adder is rich, subtractor is non-rich
>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> adder,
>>>>>>                              final Aggregator<? super K, ? super V, VR> subtractor,
>>>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>>>
>>>>>> // both adder and subtractor are rich
>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> adder,
>>>>>>                              final RichAggregator<? super K, ? super V, VR> subtractor,
>>>>>>                              final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>>>>>
>>>>>>
>>>>>>> Can you explain a bit about the above implementation ?
>>>>>>>    void commit () {
>>>>>>>      throw new UnsupportedOperationException("commit() is not supported in this context");
>>>>>>> Is the exception going to be replaced with real code in the PR ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> - I added some comments both inside and outside the code snippets in the
>>>>>> KIP. Specifically, for the code snippet above, we add the *commit()*
>>>>>> method to the *RecordContext* interface.
>>>>>> However, we want the *commit()* method to be used only for *RecordContext*
>>>>>> instances (at least for now), so we throw UnsupportedOperationException in
>>>>>> all classes/interfaces that extend/implement *RecordContext*.
>>>>>> In general, we 1) make RecordContext publicly available within
>>>>>> ProcessorContext, 2) initialize its instance within all required
>>>>>> Processors, and 3) pass it as an argument to the related Rich interfaces
>>>>>> inside Processors.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Jeyhun
>>>>>>
>>>>>> On Fri, Sep 22, 2017 at 6:44 PM Ted Yu <yuzhih...@gmail.com>
>> wrote:
>>>>>>
>>>>>>> bq. provides a hybrd solution
>>>>>>>
>>>>>>> Typo in hybrid.
>>>>>>>
>>>>>>> bq. accessing read-only keys within XXXValues operators
>>>>>>>
>>>>>>> It would be nice if you can name some Value operator as examples.
>>>>>>>
>>>>>>> <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>>>>>>>                              final Aggregator<? super K, ? super V, VR> adder,
>>>>>>>
>>>>>>> The adder doesn't need to be RichAggregator ?
>>>>>>>
>>>>>>>   public RecordContext recordContext() {
>>>>>>>     return this.recordContext();
>>>>>>>
>>>>>>> Can you explain a bit about the above implementation ?
>>>>>>>
>>>>>>>    void commit () {
>>>>>>>      throw new UnsupportedOperationException("commit() is not supported in this context");
>>>>>>>
>>>>>>> Is the exception going to be replaced with real code in the PR ?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 22, 2017 at 9:28 AM, Jeyhun Karimov <
>>>> je.kari...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Dear community,
>>>>>>>>
>>>>>>>> I updated the related KIP [1]. Please feel free to comment.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Jeyhun
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 22, 2017 at 12:20 AM Jeyhun Karimov <
>>>>> je.kari...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Damian,
>>>>>>>>>
>>>>>>>>> Thanks for the update. I am working on it and will provide an update
>>>>>>>>> soon.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>> On Thu, Sep 21, 2017 at 4:50 PM Damian Guy <
>>> damian....@gmail.com
>>>>>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>
>>>>>>>>>> All KIP-182 API PRs have now been merged. So you can consider it as
>>>>>>>>>> stable.
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>> On Thu, 21 Sep 2017 at 15:23 Jeyhun Karimov <
>>>> je.kari...@gmail.com
>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot for your comments. For the single interface (RichXXX and
>>>>>>>>>>> XXXWithKey) solution, I have already submitted a PR, but it is probably
>>>>>>>>>>> outdated (it dates from when the KIP was first proposed), so I need to
>>>>>>>>>>> revisit that one.
>>>>>>>>>>>
>>>>>>>>>>> @Guozhang, from our (offline) discussion, I understood that we may not
>>>>>>>>>>> manage to merge this KIP into the upcoming release, as KIP-159 is not
>>>>>>>>>>> voted yet (because we want both KIP-149 and KIP-159 to be an "atomic"
>>>>>>>>>>> merge). So I decided to wait until KIP-182 gets stable (there are some
>>>>>>>>>>> minor updates AFAIK) and update the KIP accordingly. Please correct me if
>>>>>>>>>>> I am wrong or I misunderstood.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jeyhun
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 21, 2017 at 4:11 PM Damian Guy <
>>>>> damian....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> +1
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, 21 Sep 2017 at 13:46 Guozhang Wang <
>>>>> wangg...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for me as well for collapsing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jeyhun, could you update the wiki accordingly to show what the final
>>>>>>>>>>>>> updates post KIP-182 are that need to be done in KIP-159, including
>>>>>>>>>>>>> KIP-149? The child page I made is just a suggestion, but you would still
>>>>>>>>>>>>> need to update your proposal for people to comment and vote on.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Sep 14, 2017 at 10:37 PM, Ted Yu <
>>>>> yuzhih...@gmail.com
>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> +1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One interface is cleaner.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Sep 14, 2017 at 7:26 AM, Bill Bejeck <
>>>>>>> bbej...@gmail.com
>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1 for me on collapsing the RichXXXX and ValueXXXXWithKey interfaces
>>>>>>>>>>>>>>> into 1 interface.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Sep 13, 2017 at 11:31 AM, Jeyhun Karimov <
>>>>>>>>>>>> je.kari...@gmail.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Damian,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your feedback. Actually, this (what you propose) was the
>>>>>>>>>>>>>>>> first idea of KIP-149. Then we decided to divide it into two KIPs. I also
>>>>>>>>>>>>>>>> expressed my opinion that keeping the two interfaces (Rich and withKey)
>>>>>>>>>>>>>>>> separate would add more overloads. The email discussion concluded that
>>>>>>>>>>>>>>>> this would not be a problem.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Our initial idea was similar to:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public abstract class RichValueMapper<K, V, VR> implements
>>>>>>>>>>>>>>>>         ValueMapperWithKey<K, V, VR>, RichFunction {
>>>>>>>>>>>>>>>> ......
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So, we check the type of the object, whether it is RichXXX or XXXWithKey,
>>>>>>>>>>>>>>>> inside the called method and continue accordingly.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If this is ok with the community, I would like to revert the current
>>>>>>>>>>>>>>>> design to this again.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Sep 13, 2017 at 3:02 PM Damian Guy <
>>>>>>>>>> damian....@gmail.com
>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for sending out the update. I guess I was thinking more along the
>>>>>>>>>>>>>>>>> lines of option 2, where we collapse the RichXXXX and ValueXXXXWithKey etc.
>>>>>>>>>>>>>>>>> interfaces into 1 interface that has all of the arguments. I think we
>>>>>>>>>>>>>>>>> then only need to add one additional overload for each operator?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, 13 Sep 2017 at 10:59 Jeyhun Karimov <
>>>>>>>>>>>> je.kari...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Dear all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I would like to resume the discussion on KIP-159. I (and Guozhang) think
>>>>>>>>>>>>>>>>>> that releasing KIP-149 and KIP-159 in the same release would make sense to
>>>>>>>>>>>>>>>>>> avoid a release with "partial" public APIs. There is a KIP [1] proposed by
>>>>>>>>>>>>>>>>>> Guozhang (and approved by me) to unify both KIPs.
>>>>>>>>>>>>>>>>>> Please feel free to comment on this.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73637757
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Jul 21, 2017 at 2:00 AM Jeyhun
>>> Karimov <
>>>>>>>>>>>>>> je.kari...@gmail.com
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Matthias, Damian, all,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your comments and sorry for the super-late update.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Sure, the DSL refactoring is not blocking for this KIP.
>>>>>>>>>>>>>>>>>>> I made some changes to the KIP document based on my prototype.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Please feel free to comment.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Jul 7, 2017 at 9:35 PM Matthias J.
>>>> Sax <
>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I would not block this KIP with regard to DSL refactoring. IMHO, we can
>>>>>>>>>>>>>>>>>>>> just finish this one and the DSL refactoring will help later on to
>>>>>>>>>>>>>>>>>>>> reduce the number of overloads.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 7/7/17 5:28 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>> I am following the related thread in the mailing list and looking forward
>>>>>>>>>>>>>>>>>>>>> to a one-shot solution for the overloads issue.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Jul 7, 2017 at 10:32 AM Damian
>>> Guy
>>>> <
>>>>>>>>>>>>>>> damian....@gmail.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> About overrides, what other alternatives do we have? For
>>>>>>>>>>>>>>>>>>>>>>> backwards-compatibility we have to add extra methods to the existing
>>>>>>>>>>>>>>>>>>>>>>> ones.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> It wasn't clear to me in the KIP if these are new methods or replacing
>>>>>>>>>>>>>>>>>>>>>> existing ones.
>>>>>>>>>>>>>>>>>>>>>> Also, we are currently discussing options for replacing the overrides.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> About ProcessorContext vs RecordContext, you are right. I think I need to
>>>>>>>>>>>>>>>>>>>>>>> implement a prototype to understand the full picture, as some parts of the
>>>>>>>>>>>>>>>>>>>>>>> KIP might not be as straightforward as I thought.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 5, 2017 at 10:40 AM
>> Damian
>>>> Guy
>>>>> <
>>>>>>>>>>>>>>>> damian....@gmail.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> HI Jeyhun,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Is the intention that these methods are new overloads on the KStream,
>>>>>>>>>>>>>>>>>>>>>>>> KTable, etc?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> It is worth noting that a ProcessorContext is not a RecordContext. A
>>>>>>>>>>>>>>>>>>>>>>>> RecordContext, as it stands, only exists during the processing of a single
>>>>>>>>>>>>>>>>>>>>>>>> record, whereas the ProcessorContext exists for the lifetime of the
>>>>>>>>>>>>>>>>>>>>>>>> Processor. So it doesn't make sense to cast a ProcessorContext to a
>>>>>>>>>>>>>>>>>>>>>>>> RecordContext.
>>>>>>>>>>>>>>>>>>>>>>>> You mentioned above passing the InternalProcessorContext to the init()
>>>>>>>>>>>>>>>>>>>>>>>> calls. It is internal for a reason and I think it should remain that way.
>>>>>>>>>>>>>>>>>>>>>>>> It might be better to move the recordContext() method from
>>>>>>>>>>>>>>>>>>>>>>>> InternalProcessorContext to ProcessorContext.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> In the KIP you have an example showing:
>>>>>>>>>>>>>>>>>>>>>>>> richMapper.init((RecordContext) processorContext);
>>>>>>>>>>>>>>>>>>>>>>>> But the interface is:
>>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR> {
>>>>>>>>>>>>>>>>>>>>>>>>     VR apply(final V value, final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>> i.e., there is no init(...); besides, as above, this wouldn't make sense.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 4 Jul 2017 at 23:30 Jeyhun
>>>>> Karimov <
>>>>>>>>>>>>>>>> je.kari...@gmail.com
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Actually, my intent was to provide RichInitializer, and later on we could
>>>>>>>>>>>>>>>>>>>>>>>>> provide the context of the record, as you also mentioned.
>>>>>>>>>>>>>>>>>>>>>>>>> I removed it so as not to confuse the users.
>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the RecordContext and ProcessorContext interfaces, I just
>>>>>>>>>>>>>>>>>>>>>>>>> noticed the InternalProcessorContext class. Can't we pass this as a
>>>>>>>>>>>>>>>>>>>>>>>>> parameter to the init() method of processors? Then we would be able to get
>>>>>>>>>>>>>>>>>>>>>>>>> the RecordContext easily with just a method call.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jun 29, 2017 at 10:14 PM
>>>> Matthias
>>>>>> J.
>>>>>>>> Sax
>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> One more thing:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I don't think `RichInitializer` makes sense. As we don't have any input
>>>>>>>>>>>>>>>>>>>>>>>>>> record, there is also no context. We could of course provide the context
>>>>>>>>>>>>>>>>>>>>>>>>>> of the record that triggers the init call, but this seems to be
>>>>>>>>>>>>>>>>>>>>>>>>>> semantically questionable. Also, the context for this first record will be
>>>>>>>>>>>>>>>>>>>>>>>>>> provided by the consecutive call to aggregate anyway.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/29/17 1:11 PM, Matthias J.
>> Sax
>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> I have one concern with regard to backward compatibility. You suggest
>>>>>>>>>>>>>>>>>>>>>>>>>>> using RecordContext as the base interface for ProcessorContext. This will
>>>>>>>>>>>>>>>>>>>>>>>>>>> break compatibility.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> I think we should just have two independent interfaces. Our own
>>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContextImpl class would implement both. This allows us to cast
>>>>>>>>>>>>>>>>>>>>>>>>>>> it to `RecordContext` and thus limit the visible scope.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/27/17 1:35 PM, Jeyhun
>> Karimov
>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I updated the KIP w.r.t. the discussion and comments.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Basically, I eliminated overloads for a particular method if there are
>>>>>>>>>>>>>>>>>>>>>>>>>>>> more than 3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> As we can see, there are a lot of overloads (and more will come with
>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP-149 :) ).
>>>>>>>>>>>>>>>>>>>>>>>>>>>> So, is it wise to
>>>>>>>>>>>>>>>>>>>>>>>>>>>> wait for the result of the constructive DSL thread, or
>>>>>>>>>>>>>>>>>>>>>>>>>>>> extend the KIP to address this issue as well, or
>>>>>>>>>>>>>>>>>>>>>>>>>>>> continue as it is?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 14, 2017 at 11:29 PM
>>>>>> Guozhang
>>>>>>>>>> Wang <
>>>>>>>>>>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LGTM. Thanks!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 13, 2017 at 2:20
>> PM,
>>>>> Jeyhun
>>>>>>>>>> Karimov
>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>>>>> je.kari...@gmail.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comment, Matthias. After all the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion (thanks to all participants), I think this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (a single method that passes in a RecordContext
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> object) is the best alternative.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just a side note: I think KAFKA-3907 [1] can also be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> integrated into the KIP by adding a related method to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the RecordContext interface.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to push this discussion further. It seems
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we got nice alternatives (thanks for the summary
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun!).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> With respect to RichFunctions and allowing them to be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stateful, I have my doubts, as expressed already. From
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> my understanding, the idea was to give access to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> record metadata information only. If you want to do a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stateful computation, you should rather use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> #transform().
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Furthermore, as pointed out, we would need to switch
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to a supplier-pattern introducing many more overloads.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For those reasons, I advocate for a simple interface
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with a single method that passes in a RecordContext
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> object.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the comprehensive summary!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally I'd prefer the option of passing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> RecordContext as an additional parameter into the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> overloaded function. But I'm also open to other
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> arguments if there is something that I have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> overlooked.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <je.kari...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments Matthias and Guozhang.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Below I give a quick summary of the main
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternatives we looked at to introduce the Rich
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> functions (I will refer to them as Rich functions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> until we find a better/another name). Initially the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposed alternatives were not backwards-compatible,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so I will not mention them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The related discussions are spread across the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP-149 and this KIP's (KIP-159) discussion threads.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. The idea of rich functions came onto the stage
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with KIP-149, in its discussion thread. As a result
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we extended KIP-149 to support Rich functions as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. As part of the Rich functions, we provided an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> init(ProcessorContext) method. Afterwards, Damian
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggested that we should not provide
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext to users. As a result, we separated
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the two problems into two separate KIPs, as it seems
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> they can be solved in parallel.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - One approach we considered was:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface ValueMapperWithKey<K, V, VR> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     VR apply(final K key, final V value);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface RichValueMapper<K, V, VR> extends RichFunction {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface RichFunction {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     void init(RecordContext recordContext);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     void close();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public interface RecordContext {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     String applicationId();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     TaskId taskId();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     StreamsMetrics metrics();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     String topic();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     int partition();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     long offset();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     long timestamp();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     Map<String, Object> appConfigs();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>
> 
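
For reference, the single-method alternative favoured in the quoted
discussion (one apply() that receives a RecordContext on every call,
instead of init()/close()) could look roughly like the sketch below.
It is only an illustration, not the KIP's final API: the class name
RichMapperSketch, the helper contextOf(), the TAGGER mapper and the
trimmed-down RecordContext (four of the methods listed in the quoted
interface) are assumptions made so that the example compiles without
Kafka on the classpath.

```java
public class RichMapperSketch {

    /** Hypothetical subset of the RecordContext proposed in the thread. */
    public interface RecordContext {
        String topic();
        int partition();
        long offset();
        long timestamp();
    }

    /** Single-method variant: record metadata arrives with every call. */
    public interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    /** Hypothetical helper: a fixed context standing in for the runtime's. */
    public static RecordContext contextOf(final String topic, final int partition,
                                          final long offset, final long timestamp) {
        return new RecordContext() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
            @Override public long timestamp() { return timestamp; }
        };
    }

    /** Stateless mapper that tags a value with its key and source position. */
    public static final RichValueMapper<String, String, String> TAGGER =
        (value, key, ctx) ->
            key + "=" + value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();

    public static void main(String[] args) {
        RecordContext ctx = contextOf("orders", 0, 42L, 1_500_000_000_000L);
        System.out.println(TAGGER.apply("v1", "k1", ctx)); // prints k1=v1@orders-0:42
    }
}
```

Because the context is an argument rather than state set up in init(),
the mapper stays a plain lambda and no supplier is needed to create one
instance per task.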
