Hi,

Thanks for the feedback.
Then we need to overload the method
  <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
with
  <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super V, ? extends VR> mapper);

and at runtime (inside the processor) we still have to check whether it is a
ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
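
To make the check concrete, here is a rough sketch of what I mean. It is
not the PR code: the interfaces below are simplified stand-ins with assumed
shapes (including a three-parameter ValueMapperWithKey<K, V, VR>), and
toRich() is a hypothetical helper name.

```java
public class MapperWrapSketch {
    // Simplified stand-ins for the interfaces discussed in this thread (assumed shapes).
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Assumed shape of a rich function: key-aware apply plus lifecycle hooks.
    interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR> {
        default void init() { }
        default void close() { }
    }

    // Hypothetical helper: inspect the user-provided mapper and wrap it
    // into the rich function, so later code only deals with one type.
    @SuppressWarnings("unchecked")
    static <K, V, VR> RichValueMapper<K, V, VR> toRich(final Object mapper) {
        if (mapper instanceof ValueMapperWithKey) {
            final ValueMapperWithKey<K, V, VR> withKey = (ValueMapperWithKey<K, V, VR>) mapper;
            return (key, value) -> withKey.apply(key, value);
        }
        if (mapper instanceof ValueMapper) {
            final ValueMapper<V, VR> plain = (ValueMapper<V, VR>) mapper;
            // Forward the call, omitting the key.
            return (key, value) -> plain.apply(value);
        }
        throw new IllegalArgumentException("Unsupported mapper type: " + mapper.getClass());
    }

    public static void main(final String[] args) {
        final ValueMapper<String, Integer> plain = value -> value.length();
        final ValueMapperWithKey<String, String, String> withKey = (key, value) -> key + "=" + value;

        final RichValueMapper<String, String, Integer> rich1 = toRich(plain);
        final RichValueMapper<String, String, String> rich2 = toRich(withKey);

        System.out.println(rich1.apply("k", "abc"));
        System.out.println(rich2.apply("k", "abc"));
    }
}
```

If the wrapping is done once at the DSL entry level (as Matthias suggests),
this check runs once per operator rather than once per record.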


Please correct me if I am wrong.

Cheers,
Jeyhun




On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> +1 :)
>
>
> On 08/05/17 23:52, Matthias J. Sax wrote:
> > Hi,
> >
> > I was reading the updated KIP and I am wondering if we should do the
> > design a little differently.
> >
> > Instead of distinguishing between a RichFunction and non-RichFunction at
> > runtime level, we would use RichFunctions all the time. Thus, on the DSL
> > entry level, if a user provides a non-RichFunction, we wrap it by a
> > RichFunction that is fully implemented by Streams. This RichFunction
> > would just forward the call omitting the key:
> >
> > Just to sketch the idea (incomplete code snippet):
> >
> >> public class StreamsRichValueMapper implements RichValueMapper {
> >>    private ValueMapper userProvidedMapper; // set by constructor
> >>
> >>    // forwards the call to the user's function, omitting the key
> >>    public VR apply(final K key, final V1 value1, final V2 value2) {
> >>        return userProvidedMapper.apply(value1, value2);
> >>    }
> >> }
> >
> >  From a performance point of view, I am not sure if the
> > "if(isRichFunction)" including casts etc would have more overhead than
> > this approach (we would do more nested method call for non-RichFunction
> > which should be more common than RichFunctions).
> >
> > This approach should not affect lambdas (or am I missing something?) and
> > might be cleaner, as we could have one more top level interface
> > `RichFunction` with methods `init()` and `close()` and also interfaces
> > for `RichValueMapper` etc. (thus, no abstract classes required).
> >
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> >
> > On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >> Hi,
> >>
> >> Thanks for comments. I extended PR and KIP to include rich functions. I
> >> will still have to evaluate the cost of deep copying of keys.
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <
> mathieu.fenn...@replicon.com>
> >> wrote:
> >>
> >>> Hey Matthias,
> >>>
> >>> My opinion would be that documenting the immutability of the key is the
> >>> best approach available.  Better than requiring the key to be
> >>> serializable (as with Jeyhun's last pass at the PR), no performance risk.
> >>>
> >>> It'd be different if Java had immutable type constraints of some kind. :-)
> >>>
> >>> Mathieu
> >>>
> >>>
> >>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Agreed about RichFunction. If we follow this path, it should cover
> >>>> all(?) DSL interfaces.
> >>>>
> >>>> About guarding the key -- I am still not sure what to do about it...
> >>>> Maybe it might be enough to document it (and name the key parameter
> >>>> like `readOnlyKey` to make it very clear). Ultimately, I would prefer
> >>>> to guard against any modification, but I have no good idea how to do
> >>>> this. Not sure what others think about the risk of corrupted
> >>>> partitioning (which would be a user error and we could say, well, bad
> >>>> luck, you got a bug in your code, that's not our fault), vs deep copy
> >>>> with a potential performance hit (that we can't quantify atm without
> >>>> any performance test).
> >>>> We do have a performance system test. Maybe it's worth it for you to
> >>>> apply the deep copy strategy and run the test. It's a very basic
> >>>> performance test only, but might give some insight. If you want to do
> >>>> this, look into folder "tests" for general test setup, and into
> >>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> I think extending the KIP to include RichFunctions totally makes sense.
> >>>>> So, we don't want to guard the keys because it is costly.
> >>>>> If we introduce RichFunctions, I think it should not be limited to only
> >>>>> the 3 interfaces proposed in the KIP but should cover a wide range of
> >>>>> interfaces.
> >>>>> Please correct me if I am wrong.
> >>>>>
> >>>>> Cheers,
> >>>>> Jeyhun
> >>>>>
> >>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <
> matth...@confluent.io
> >>>>> wrote:
> >>>>>
> >>>>>> One follow up. There was this email on the user list:
> >>>>>>
> >>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
> >>>>>>
> >>>>>> It might make sense to include the Initializer, Adder, and Subtractor
> >>>>>> interfaces, too.
> >>>>>>
> >>>>>> And we should double check if there are other interfaces we might miss
> >>>>>> atm.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> >>>>>>> Thanks for updating the KIP.
> >>>>>>>
> >>>>>>> Deep copying the key will work for sure, but I am actually a little
> >>>>>>> bit worried about performance impact... We might want to do some
> >>>>>>> tests to quantify this impact.
> >>>>>>>
> >>>>>>>
> >>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
> >>>>>>> would allow users to access record metadata (like timestamp, offset,
> >>>>>>> partition etc) within the DSL. This would be a similar concept. Thus,
> >>>>>>> I am wondering if it would make sense to enlarge the scope of this
> >>>>>>> KIP by that? WDYT?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> >>>>>>>> Hi Mathieu,
> >>>>>>>>
> >>>>>>>> Thanks for the feedback. I followed a similar approach and updated the
> >>>>>>>> PR and KIP accordingly. I tried to guard the key in Processors by
> >>>>>>>> sending a copy of the actual key.
> >>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
> >>>>>>>> bottleneck in some use-cases.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Jeyhun
> >>>>>>>>
> >>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <
> >>>>>> mathieu.fenn...@replicon.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Jeyhun,
> >>>>>>>>>
> >>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather
> >>>>>>>>> than interfaces, which is also a backwards incompatible change.  An
> >>>>>>>>> alternative approach that would be backwards compatible would be to
> >>>>>>>>> define new interfaces, and provide overloads where those interfaces
> >>>>>>>>> are used.
> >>>>>>>>>
> >>>>>>>>> Unfortunately, marking the key parameter as "final" doesn't change
> >>>>>>>>> much about guarding against key change.  It only prevents the
> >>>>>>>>> parameter variable from being reassigned.  If the key type is a
> >>>>>>>>> mutable object (e.g. byte[]), it can still be mutated (e.g.
> >>>>>>>>> key[0] = 0).  But I'm not really sure there's much that can be done
> >>>>>>>>> about that.
> >>>>>>>>>
> >>>>>>>>> Mathieu
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <
> >>> je.kari...@gmail.com
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for comments.
> >>>>>>>>>>
> >>>>>>>>>> The concerns make sense. Although we can guard for immutable keys in
> >>>>>>>>>> the current implementation (with a few changes), I didn't consider
> >>>>>>>>>> backward compatibility.
> >>>>>>>>>>
> >>>>>>>>>> In this case 2 solutions come to my mind. In both cases, the user
> >>>>>>>>>> accesses the key as an Object, as passing an extra type parameter
> >>>>>>>>>> will break backwards-compatibility.  So the user has to cast to the
> >>>>>>>>>> actual key type.
> >>>>>>>>>>
> >>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments (key
> >>>>>>>>>> and value) and force the key to be *final*. By doing this, I think
> >>>>>>>>>> we can address both backward-compatibility and guarding against key
> >>>>>>>>>> change.
> >>>>>>>>>>
> >>>>>>>>>> 2. Secondly, we can create a class KeyAccess like:
> >>>>>>>>>>
> >>>>>>>>>> public class KeyAccess {
> >>>>>>>>>>      Object key;
> >>>>>>>>>>      public void beforeApply(final Object key) {
> >>>>>>>>>>          this.key = key;
> >>>>>>>>>>      }
> >>>>>>>>>>      public Object getKey() {
> >>>>>>>>>>          return key;
> >>>>>>>>>>      }
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
> >>>>>>>>>> *KeyAccess*. Inside the processor (for example
> >>>>>>>>>> *KTableMapValuesProcessor*), before calling *mapper.apply(value)*, we
> >>>>>>>>>> can set the *key* by *mapper.beforeApply(key)*. As a result, the user
> >>>>>>>>>> can use *getKey()* to access the key inside the *apply(value)* method.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Jeyhun
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <
> >>>> matth...@confluent.io
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Jeyhun,
> >>>>>>>>>>>
> >>>>>>>>>>> thanks a lot for the KIP!
> >>>>>>>>>>>
> >>>>>>>>>>> I think there are two issues we need to address:
> >>>>>>>>>>>
> >>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did
> >>>>>>>>>>> complain about this in past releases already, and as the user base
> >>>>>>>>>>> grows, we should not break backward compatibility in future
> >>>>>>>>>>> releases anymore. Thus, we should think of a better way to allow
> >>>>>>>>>>> key access.
> >>>>>>>>>>>
> >>>>>>>>>>> Mathieu's comment goes in the same direction:
> >>>>>>>>>>>
> >>>>>>>>>>>>> On the other hand, the number of compile failures that would need
> >>>>>>>>>>>>> to be fixed from this change is unfortunate. :-)
> >>>>>>>>>>>
> >>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code
> >>>>>>>>>>> from modifying the key. This might corrupt partitioning if users do
> >>>>>>>>>>> alter the key (accidentally -- or users are just not aware that
> >>>>>>>>>>> they are not allowed to modify the provided key object) and thus
> >>>>>>>>>>> break the application. (This was the original motivation to not
> >>>>>>>>>>> provide the key in the first place -- it guards against
> >>>>>>>>>>> modification.)
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> >>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I just want to add my voice that I, too, have wished for access to
> >>>>>>>>>>>> the record key during a mapValues or similar operation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On the other hand, the number of compile failures that would need to
> >>>>>>>>>>>> be fixed from this change is unfortunate. :-)  But at least it would
> >>>>>>>>>>>> all be a pretty clear and easy change.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Mathieu
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <
> >>>>>> je.kari...@gmail.com
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>> Dear community,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2],
> >>>>>>>>>>>>> KAFKA-4726 [3], KAFKA-3745 [4]. The related PR can be found at [5].
> >>>>>>>>>>>>> I would like to get your comments.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
> >>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
> >>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
> >>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>> -Cheers
> >>>>>>>>>>
> >>>>>>>>>> Jeyhun
> >>>>>>>>>>
> >>>>>> --
> >>>>> -Cheers
> >>>>>
> >>>>> Jeyhun
> >>>>>
> >>>>
>
> --
-Cheers

Jeyhun
