One correction:

> and at runtime (inside the processor) we still have to check whether it is a
> ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.

This will be resolved at compile time, at the API level (by overload resolution), not at runtime.
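Below is a minimal, self-contained sketch of the overload approach. It uses simplified stand-ins for `ValueMapper` and `ValueMapperWithKey`, plus a hypothetical in-memory `ToyStream`; none of these are the real Kafka Streams types. The key-less overload wraps the user's mapper once, at the API level. As a result, the per-record loop only ever sees a `ValueMapperWithKey` and needs no instanceof check. Java selects the overload at compile time from the lambda's arity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT org.apache.kafka.streams.kstream types.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// Toy stream over an in-memory list of key/value pairs.
final class ToyStream<K, V> {
    final List<K> keys = new ArrayList<>();
    final List<V> values = new ArrayList<>();

    ToyStream<K, V> add(K key, V value) {
        keys.add(key);
        values.add(value);
        return this;
    }

    // Overload 1: wrap the key-less mapper once, at the API level.
    <VR> ToyStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
        ValueMapperWithKey<K, V, VR> withKey = (key, value) -> mapper.apply(value);
        return mapValues(withKey);
    }

    // Overload 2: the key-aware mapper is used directly by the "processor" loop.
    <VR> ToyStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
        ToyStream<K, VR> out = new ToyStream<>();
        for (int i = 0; i < keys.size(); i++) {
            // No runtime type check or cast is needed here.
            out.add(keys.get(i), mapper.apply(keys.get(i), values.get(i)));
        }
        return out;
    }
}
```

With this shape, `stream.mapValues(v -> v.length())` resolves to the `ValueMapper` overload and `stream.mapValues((k, v) -> k + ":" + v)` resolves to the `ValueMapperWithKey` one. The lambdas differ in arity, so the call is not ambiguous.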



Cheers,
Jeyhun


On Tue, May 9, 2017 at 11:55 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi,
>
> Thanks for the feedback.
> Then we need to overload the method
>   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> with
>   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
>
> and at runtime (inside the processor) we still have to check whether it is a
> ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
>
>
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun
>
>
>
>
> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <michal.borowie...@openbet.com> wrote:
>
>> +1 :)
>>
>>
>> On 08/05/17 23:52, Matthias J. Sax wrote:
>> > Hi,
>> >
>> > I was reading the updated KIP and I am wondering if we should do the
>> > design a little differently.
>> >
>> > Instead of distinguishing between a RichFunction and a non-RichFunction at
>> > the runtime level, we would use RichFunctions all the time. Thus, on the DSL
>> > entry level, if a user provides a non-RichFunction, we wrap it in a
>> > RichFunction that is fully implemented by Streams. This RichFunction
>> > would just forward the call, omitting the key:
>> >
>> > Just to sketch the idea (incomplete code snippet):
>> >
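>> >> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
>> >>    private final ValueMapper<V, VR> userProvidedMapper;
>> >>
>> >>    public StreamsRichValueMapper(final ValueMapper<V, VR> userProvidedMapper) {
>> >>        this.userProvidedMapper = userProvidedMapper;
>> >>    }
>> >>    @Override public void init() { }  // nothing to set up for a plain mapper
>> >>    @Override public void close() { }
>> >>    @Override
>> >>    public VR apply(final K key, final V value) {
>> >>        return userProvidedMapper.apply(value); // forward, omitting the key
>> >>    }
>> >> }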
>> >
>> > From a performance point of view, I am not sure whether the
>> > `if (isRichFunction)` check, including casts etc., would have more overhead
>> > than this approach (we would make one more nested method call for
>> > non-RichFunctions, which should be more common than RichFunctions).
>> >
>> > This approach should not affect lambdas (or am I missing something?) and
>> > might be cleaner, as we could have one more top-level interface
>> > `RichFunction` with methods `init()` and `close()` and also interfaces
>> > for `RichValueMapper` etc. (thus, no abstract classes required).
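Here is a sketch of the "always rich" design under stated assumptions. `RichFunction`, `RichValueMapper`, `StreamsRichValueMapper` and `ToyMapValuesProcessor` are hypothetical names that follow the email; they are not an existing Kafka API. The processor only ever talks to the rich interface. A plain user mapper therefore costs exactly one extra forwarding call per record, and the `init()`/`close()` lifecycle is driven uniformly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces sketching the proposal; not part of Kafka Streams.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface RichFunction {
    void init();
    void close();
}

interface RichValueMapper<K, V, VR> extends RichFunction {
    VR apply(K readOnlyKey, V value);
}

// Streams-provided adapter: forwards to the plain mapper, omitting the key.
final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
    private final ValueMapper<V, VR> userProvidedMapper;

    StreamsRichValueMapper(ValueMapper<V, VR> userProvidedMapper) {
        this.userProvidedMapper = userProvidedMapper;
    }

    @Override public void init() { }   // plain mappers have nothing to initialize
    @Override public void close() { }

    @Override
    public VR apply(K readOnlyKey, V value) {
        return userProvidedMapper.apply(value);
    }
}

// Toy processor: it only knows RichValueMapper, so no per-record instanceof/cast.
final class ToyMapValuesProcessor<K, V, VR> {
    private final RichValueMapper<K, V, VR> mapper;

    ToyMapValuesProcessor(RichValueMapper<K, V, VR> mapper) {
        this.mapper = mapper;
    }

    List<VR> run(List<K> keys, List<V> values) {
        mapper.init();
        try {
            List<VR> out = new ArrayList<>();
            for (int i = 0; i < keys.size(); i++) {
                out.add(mapper.apply(keys.get(i), values.get(i)));
            }
            return out;
        } finally {
            mapper.close();   // lifecycle is closed even if apply() throws
        }
    }
}
```

Users can keep passing lambdas as plain `ValueMapper`s, because only the DSL entry point needs to wrap them.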
>> >
>> >
>> > Any thoughts?
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
>> >> Hi,
>> >>
>> >> Thanks for the comments. I extended the PR and KIP to include rich
>> >> functions. I still have to evaluate the cost of deep copying the keys.
>> >>
>> >> Cheers,
>> >> Jeyhun
>> >>
>> >> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>> >>
>> >>> Hey Matthias,
>> >>>
>> >>> My opinion would be that documenting the immutability of the key is the
>> >>> best approach available. It is better than requiring the key to be
>> >>> serializable (as in Jeyhun's last pass at the PR), and it carries no
>> >>> performance risk.
>> >>>
>> >>> It'd be different if Java had immutable type constraints of some kind. :-)
>> >>>
>> >>> Mathieu
>> >>>
>> >>>
>> >>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>
>> >>>> Agreed about RichFunction. If we follow this path, it should cover
>> >>>> all(?) DSL interfaces.
>> >>>>
>> >>>> About guarding the key: I am still not sure what to do about it...
>> >>>> Maybe it is enough to document it (and name the key parameter something
>> >>>> like `readOnlyKey` to make it very clear). Ultimately, I would prefer to
>> >>>> guard against any modification, but I have no good idea how to do this.
>> >>>> Not sure what others think about the risk of corrupted partitioning
>> >>>> (which would be a user error, and we could say, well, bad luck, you have
>> >>>> a bug in your code, that's not our fault) vs. deep copy with a potential
>> >>>> performance hit (that we can't quantify atm without any performance test).
>> >>>> We do have a performance system test. Maybe it's worth applying the
>> >>>> deep copy strategy and running the test. It's only a very basic
>> >>>> performance test, but it might give some insight. If you want to do this,
>> >>>> look into the folder "tests" for the general test setup, and into
>> >>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>> >>>>> Hi Matthias,
>> >>>>>
>> >>>>> I think extending the KIP to include RichFunctions totally makes sense.
>> >>>>> So we don't want to guard the keys, because it is costly.
>> >>>>> If we introduce RichFunctions, I think they should not be limited to
>> >>>>> the 3 interfaces proposed in the KIP but should cover a wide range of
>> >>>>> interfaces.
>> >>>>> Please correct me if I am wrong.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Jeyhun
>> >>>>>
>> >>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>>>
>> >>>>>> One follow-up. There was this email on the user list:
>> >>>>>>
>> >>>>>>
>> >>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>> >>>>>> It might make sense to include the Initializer, Adder, and Subtractor
>> >>>>>> interfaces, too.
>> >>>>>>
>> >>>>>> And we should double check whether there are other interfaces we might
>> >>>>>> be missing atm.
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>> >>>>>>> Thanks for updating the KIP.
>> >>>>>>>
>> >>>>>>> Deep copying the key will work for sure, but I am actually a little bit
>> >>>>>>> worried about the performance impact... We might want to do some tests
>> >>>>>>> to quantify this impact.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
>> >>>>>>> would allow users to access record metadata (like timestamp, offset,
>> >>>>>>> partition, etc.) within the DSL. This would be a similar concept. Thus,
>> >>>>>>> I am wondering if it would make sense to enlarge the scope of this KIP
>> >>>>>>> accordingly. WDYT?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> -Matthias
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>> >>>>>>>> Hi Mathieu,
>> >>>>>>>>
>> >>>>>>>> Thanks for the feedback. I followed a similar approach and updated the
>> >>>>>>>> PR and KIP accordingly. I tried to guard the key in the Processors by
>> >>>>>>>> passing a copy of the actual key.
>> >>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
>> >>>>>>>> bottleneck in some use cases.
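One way to deep-copy a key is a Java-serialization round trip. The sketch below assumes that technique. The PR itself is not shown here, so this is not necessarily what it does, but it is consistent with Mathieu's remark that the last pass at the PR required keys to be serializable. `KeyCopier` is a hypothetical helper. Every copy allocates a serialized buffer plus a fresh object graph, and that allocation is where the memory and performance concern comes from.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of Kafka Streams.
final class KeyCopier {
    private KeyCopier() { }

    // Deep-copies a Serializable key by writing it to bytes and reading it back.
    @SuppressWarnings("unchecked")
    static <K extends Serializable> K deepCopy(K key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);   // full object graph is serialized
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (K) in.readObject();   // ...and a new graph is allocated
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A processor would hand `KeyCopier.deepCopy(key)` to the user function instead of `key`. Mutations then only affect the copy, at the price of one serialize/deserialize per record.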
>> >>>>>>>>
>> >>>>>>>> Cheers,
>> >>>>>>>> Jeyhun
>> >>>>>>>>
>> >>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Jeyhun,
>> >>>>>>>>>
>> >>>>>>>>> This approach would change ValueMapper (...etc) to be classes rather
>> >>>>>>>>> than interfaces, which is also a backwards-incompatible change. An
>> >>>>>>>>> alternative approach that would be backwards compatible would be to
>> >>>>>>>>> define new interfaces, and provide overloads where those interfaces
>> >>>>>>>>> are used.
>> >>>>>>>>>
>> >>>>>>>>> Unfortunately, making the key parameter "final" doesn't change much
>> >>>>>>>>> about guarding against key changes. It only prevents the parameter
>> >>>>>>>>> variable from being reassigned. If the key type is a mutable object
>> >>>>>>>>> (e.g. byte[]), it can still be mutated (e.g. key[0] = 0). But I'm not
>> >>>>>>>>> really sure there's much that can be done about that.
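The point about `final` can be checked directly. In this small example (the class `FinalParamDemo` is purely illustrative), reassigning the `final` parameter would not compile, but writing into the array it refers to does compile, and the change is visible to the caller.

```java
// Illustrative only: shows what `final` on a parameter does and does not prevent.
final class FinalParamDemo {
    private FinalParamDemo() { }

    static int mutateKey(final byte[] key) {
        // key = new byte[0];   // would NOT compile: the parameter variable is final
        key[0] = 0;             // compiles: mutates the caller's array in place
        return key.length;
    }
}
```

Calling `mutateKey(k)` with `k = {7, 8}` leaves the caller's `k` as `{0, 8}`.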
>> >>>>>>>>>
>> >>>>>>>>> Mathieu
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Thanks for the comments.
>> >>>>>>>>>>
>> >>>>>>>>>> The concerns make sense. Although we could guard the keys against
>> >>>>>>>>>> modification in the current implementation (with a few changes), I
>> >>>>>>>>>> didn't consider backward compatibility.
>> >>>>>>>>>>
>> >>>>>>>>>> In this case, 2 solutions come to my mind. In both cases, the user
>> >>>>>>>>>> accesses the key as type Object, as passing an extra type parameter
>> >>>>>>>>>> would break backward compatibility. So the user has to cast it to the
>> >>>>>>>>>> actual key type.
>> >>>>>>>>>>
>> >>>>>>>>>> 1. First, we can overload the apply method with 2 arguments (key and
>> >>>>>>>>>> value) and force the key to be *final*. By doing this, I think we can
>> >>>>>>>>>> address both backward compatibility and guarding against key changes.
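Option 1 can be sketched as follows, assuming a Java 8 default method is used to add the two-argument overload so that existing lambdas keep compiling (the email does not say how the overload would be added). The `ValueMapper` and `ToyProcessor` types below are simplified hypothetical stand-ins, and the key is typed `Object` as described above.

```java
// Hypothetical variant of ValueMapper, not the real Kafka Streams interface.
// apply(V) stays the single abstract method, so existing lambdas still compile.
interface ValueMapper<V, VR> {
    VR apply(V value);

    // Key-aware overload; the key is Object because adding a K type parameter
    // would break existing code. By default it ignores the key.
    default VR apply(final Object key, final V value) {
        return apply(value);
    }
}

final class ToyProcessor {
    private ToyProcessor() { }

    // The runtime always calls the two-argument form.
    static <V, VR> VR process(ValueMapper<V, VR> mapper, Object key, V value) {
        return mapper.apply(key, value);
    }
}
```

A user who needs the key has to implement both methods in a (possibly anonymous) class, because the single abstract method is still `apply(V)`. As Mathieu points out in his reply, `final` on the parameter does not stop the key object itself from being mutated.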
>> >>>>>>>>>>
>> >>>>>>>>>> 2. Second, we can create a class KeyAccess like this:
>> >>>>>>>>>>
>> >>>>>>>>>> public class KeyAccess {
>> >>>>>>>>>>      Object key;
>> >>>>>>>>>>      public void beforeApply(final Object key) {
>> >>>>>>>>>>          this.key = key;
>> >>>>>>>>>>      }
>> >>>>>>>>>>      public Object getKey() {
>> >>>>>>>>>>          return key;
>> >>>>>>>>>>      }
>> >>>>>>>>>> }
>> >>>>>>>>>>
>> >>>>>>>>>> We can make *ValueMapper*, *ValueJoiner* and *ValueTransformer* extend
>> >>>>>>>>>> *KeyAccess*. Inside the processor (for example
>> >>>>>>>>>> *KTableMapValuesProcessor*), before calling *mapper.apply(value)*, we
>> >>>>>>>>>> can set the *key* via *mapper.beforeApply(key)*. As a result, the user
>> >>>>>>>>>> can call *getKey()* to access the key inside the *apply(value)* method.
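Option 2 is sketched below in runnable form. A Java interface cannot extend a class, so `ValueMapper` is shown as an abstract class here (hypothetical and simplified). `ToyMapValuesProcessor` stands in for a processor such as *KTableMapValuesProcessor*. `KeyAccess` follows the email, with its field made private.

```java
import java.util.ArrayList;
import java.util.List;

// KeyAccess as proposed in the email (field made private).
class KeyAccess {
    private Object key;

    public void beforeApply(final Object key) {
        this.key = key;
    }

    public Object getKey() {
        return key;
    }
}

// Must be an abstract class (not an interface) to inherit from KeyAccess.
abstract class ValueMapper<V, VR> extends KeyAccess {
    public abstract VR apply(V value);
}

final class ToyMapValuesProcessor {
    private ToyMapValuesProcessor() { }

    static <V, VR> List<VR> run(ValueMapper<V, VR> mapper, List<?> keys, List<V> values) {
        List<VR> out = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            mapper.beforeApply(keys.get(i));       // set the key first...
            out.add(mapper.apply(values.get(i)));  // ...then call apply(value)
        }
        return out;
    }
}
```

The sketch makes Mathieu's objection concrete: `ValueMapper` is no longer a functional interface, so existing lambdas stop compiling. The mapper also becomes stateful (it stores the current key), so sharing one instance across threads would likely be unsafe.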
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Cheers,
>> >>>>>>>>>> Jeyhun
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Jeyhun,
>> >>>>>>>>>>>
>> >>>>>>>>>>> thanks a lot for the KIP!
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think there are two issues we need to address:
>> >>>>>>>>>>>
>> >>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did
>> >>>>>>>>>>> complain about this in past releases already, and as the user base
>> >>>>>>>>>>> grows, we should not break backward compatibility in future releases
>> >>>>>>>>>>> anymore. Thus, we should think of a better way to allow key access.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Mathieu's comment points in the same direction:
>> >>>>>>>>>>>
>> >>>>>>>>>>>>> On the other hand, the number of compile failures that would need
>> >>>>>>>>>>>>> to be fixed from this change is unfortunate. :-)
>> >>>>>>>>>>>
>> >>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code
>> >>>>>>>>>>> from modifying the key. This might corrupt partitioning if users do
>> >>>>>>>>>>> alter the key (accidentally, or because they are just not aware that
>> >>>>>>>>>>> they are not allowed to modify the provided key object) and thus break
>> >>>>>>>>>>> the application. (This was the original motivation for not providing
>> >>>>>>>>>>> the key in the first place: it guards against modification.)
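The toy example below shows how a mutated key can land on a different partition. `PartitionDemo` is hypothetical, and `partitionFor` uses `Arrays.hashCode` only as a simplified stand-in. Kafka's default partitioner hashes the serialized key with murmur2, so real partition numbers would differ, but the effect is the same.

```java
import java.util.Arrays;

// Illustrative only: not Kafka's partitioner.
final class PartitionDemo {
    private PartitionDemo() { }

    // Simplified hash partitioner: non-negative hash modulo partition count.
    static int partitionFor(byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    // A value mapper that (wrongly) mutates the key object it was handed.
    static String badMapper(byte[] key, String value) {
        key[0] = 0;
        return value.toUpperCase();
    }
}
```

With 4 partitions, the key `{1, 2, 3}` maps to partition 1. After `badMapper` sets `key[0] = 0`, the same key object maps to partition 0, so any downstream step that relies on the original placement (state store lookups, co-partitioned joins) would now look in the wrong place.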
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> -Matthias
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>> >>>>>>>>>>>> Hi Jeyhun,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I just want to add my voice: I, too, have wished for access to the
>> >>>>>>>>>>>> record key during a mapValues or similar operation.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On the other hand, the number of compile failures that would need to
>> >>>>>>>>>>>> be fixed from this change is unfortunate. :-)  But at least it would
>> >>>>>>>>>>>> all be a pretty clear and easy change.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Mathieu
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>> >>>>>>>>>>>>> Dear community,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I want to share KIP-149 [1], based on issues KAFKA-4218 [2],
>> >>>>>>>>>>>>> KAFKA-4726 [3], and KAFKA-3745 [4]. The related PR can be found at [5].
>> >>>>>>>>>>>>> I would like to get your comments.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>> >>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>> >>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>> >>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>> >>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Cheers,
>> >>>>>>>>>>>>> Jeyhun
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> --
>> >>>>>>>>>>>>> -Cheers
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Jeyhun
>> >>>>>>>>>>>>>
>>