+1 :)

On 08/05/17 23:52, Matthias J. Sax wrote:
Hi,

I was reading the updated KIP and I am wondering if we should do the
design a little differently.

Instead of distinguishing between a RichFunction and a non-RichFunction at
the runtime level, we would use RichFunctions all the time. Thus, on the DSL
entry level, if a user provides a non-RichFunction, we wrap it in a
RichFunction that is fully implemented by Streams. This RichFunction
would just forward the call, omitting the key:

Just to sketch the idea (incomplete code snippet):

public class StreamsRichValueMapper implements RichValueMapper {
   private ValueMapper userProvidedMapper; // set by constructor

   // forward to the user's function, dropping the key
   public VR apply(final K key, final V1 value1, final V2 value2) {
       return userProvidedMapper.apply(value1, value2);
   }
}

From a performance point of view, I am not sure if the
"if(isRichFunction)" including casts etc would have more overhead than
this approach (we would do one more nested method call for non-RichFunctions,
which should be more common than RichFunctions).

This approach should not affect lambdas (or am I missing something?) and
might be cleaner, as we could have one more top level interface
`RichFunction` with methods `init()` and `close()` and also interfaces
for `RichValueMapper` etc. (thus, no abstract classes required).
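
To make the wrapping idea concrete, here is a minimal self-contained sketch. It assumes a single-value mapper for brevity (the snippet above uses two values), and every type in it is a hypothetical stand-in declared locally, not the actual Streams API. The `init()` signature in particular is a guess, since the thread does not say what it would receive.

```java
// Hypothetical stand-ins declared locally; not the real Kafka Streams types.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface RichFunction {
    void init();   // assumption: the real proposal may pass a context object
    void close();
}

interface RichValueMapper<K, V, VR> extends RichFunction {
    VR apply(K readOnlyKey, V value);
}

// Adapter supplied by the library for plain (non-rich) mappers.
final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
    private final ValueMapper<V, VR> userProvidedMapper;

    StreamsRichValueMapper(ValueMapper<V, VR> userProvidedMapper) {
        this.userProvidedMapper = userProvidedMapper;
    }

    @Override public void init() { }    // nothing to set up for a plain mapper
    @Override public void close() { }   // nothing to release either

    @Override
    public VR apply(K readOnlyKey, V value) {
        return userProvidedMapper.apply(value); // key is deliberately dropped
    }
}

final class WrapperDemo {
    // DSL entry point: wrap once, so the runtime only ever sees rich functions.
    static <K, V, VR> RichValueMapper<K, V, VR> wrap(ValueMapper<V, VR> mapper) {
        return new StreamsRichValueMapper<>(mapper);
    }

    public static void main(String[] args) {
        // A lambda still works, because ValueMapper stays a functional interface.
        RichValueMapper<String, Integer, Integer> rich = wrap(v -> v * 2);
        rich.init();
        System.out.println(rich.apply("ignored-key", 21)); // prints 42
        rich.close();
    }
}
```

With this shape the runtime never needs an `instanceof` check; the cost is one extra method call per record for wrapped mappers.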


Any thoughts?


-Matthias


On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
Hi,

Thanks for the comments. I extended the PR and the KIP to include rich
functions. I will still have to evaluate the cost of deep copying of keys.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
wrote:

Hey Matthias,

My opinion would be that documenting the immutability of the key is the
best approach available.  Better than requiring the key to be serializable
(as with Jeyhun's last pass at the PR), no performance risk.

It'd be different if Java had immutable type constraints of some kind. :-)

Mathieu


On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

Agreed about RichFunction. If we follow this path, it should cover
all(?) DSL interfaces.

About guarding the key -- I am still not sure what to do about it...
Maybe it might be enough to document it (and name the key parameter like
`readOnlyKey` to make it very clear). Ultimately, I would prefer to
guard against any modification, but I have no good idea how to do this.
Not sure what others think about the risk of corrupted partitioning
(which would be a user error and we could say, well, bad luck, you got a
bug in your code, that's not our fault), vs deep copy with a potential
performance hit (that we can't quantify atm without any performance
test).

We do have a performance system test. Maybe it's worth it for you to apply
the deep copy strategy and run the test. It's a very basic performance
test only, but it might give some insight. If you want to do this, look
into the folder "tests" for the general test setup, and into
"tests/kafkatest/benchmarks/streams" to find the perf test.


-Matthias

On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
Hi Matthias,

I think extending the KIP to include RichFunctions totally makes sense.
So, we don't want to guard the keys because it is costly.
If we introduce RichFunctions, I think it should not be limited to only
the 3 interfaces proposed in the KIP but should cover a wide range of
interfaces.
Please correct me if I am wrong.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
wrote:

One follow up. There was this email on the user list:


http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+

It might make sense to include the Initializer, Adder, and Subtractor
interfaces, too.

And we should double check if there are other interfaces we might miss
atm.

-Matthias


On 5/4/17 1:31 PM, Matthias J. Sax wrote:
Thanks for updating the KIP.

Deep copying the key will work for sure, but I am actually a little bit
worried about the performance impact... We might want to do some tests to
quantify this impact.


Btw: this reminds me of the idea of a `RichFunction` interface that
would allow users to access record metadata (like timestamp, offset,
partition etc) within the DSL. This would be a similar concept. Thus, I am
wondering if it would make sense to enlarge the scope of this KIP by
that? WDYT?



-Matthias


On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
Hi Mathieu,

Thanks for the feedback. I followed a similar approach and updated the PR
and KIP accordingly. I tried to guard the key in Processors by sending a
copy of the actual key.
Because I am doing a deep copy of an object, I think memory can be a
bottleneck in some use-cases.
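
One way such a guard could work is a serialization round trip, sketched below. This is only an illustration and not the code from the PR: it assumes keys implement `java.io.Serializable`, and `DeepCopy` and `copyOf` are hypothetical names. The timing loop gives a very rough impression of the per-record cost and is no substitute for the performance system test.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

final class DeepCopy {
    // Copies a key by serializing it and reading it back (hypothetical helper).
    @SuppressWarnings("unchecked")
    static <K extends Serializable> K copyOf(K key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (K) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("key could not be copied", e);
        }
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        byte[] handedToUser = copyOf(key);
        handedToUser[0] = 0;                       // user code mutates its copy
        System.out.println(Arrays.toString(key));  // the original is untouched

        // Very rough cost estimate; a real measurement needs a proper benchmark.
        int rounds = 100_000;
        long start = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            copyOf(key);
        }
        System.out.printf("avg copy: %d ns%n", (System.nanoTime() - start) / rounds);
    }
}
```

Every record pays for two stream objects and a fresh byte buffer, which is where the memory and CPU concern comes from.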

Cheers,
Jeyhun

On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
wrote:

Hi Jeyhun,

This approach would change ValueMapper (...etc) to be classes, rather than
interfaces, which is also a backwards incompatible change.  An alternative
approach that would be backwards compatible would be to define new
interfaces, and provide overloads where those interfaces are used.
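
A rough sketch of that backwards-compatible alternative follows. `MiniStream` is a toy stand-in for a stream, and `ValueMapperWithKey` is an illustrative name for the new interface; both are assumptions made for this example. The existing one-argument interface stays untouched, and the key-aware variant is added as an overload.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Existing interface, left unchanged.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

// New key-aware interface (illustrative name).
interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// Toy stand-in for a stream of key/value records.
final class MiniStream<K, V> {
    private final List<Map.Entry<K, V>> records = new ArrayList<>();

    MiniStream<K, V> add(K key, V value) {
        records.add(new SimpleImmutableEntry<>(key, value));
        return this;
    }

    // Old signature keeps compiling; it delegates to the key-aware overload.
    <VR> MiniStream<K, VR> mapValues(ValueMapper<V, VR> mapper) {
        ValueMapperWithKey<K, V, VR> withKey = (readOnlyKey, value) -> mapper.apply(value);
        return mapValues(withKey);
    }

    // New overload that exposes the key.
    <VR> MiniStream<K, VR> mapValues(ValueMapperWithKey<K, V, VR> mapper) {
        MiniStream<K, VR> out = new MiniStream<>();
        for (Map.Entry<K, V> r : records) {
            out.add(r.getKey(), mapper.apply(r.getKey(), r.getValue()));
        }
        return out;
    }

    List<V> values() {
        List<V> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            out.add(r.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        MiniStream<String, Integer> s =
            new MiniStream<String, Integer>().add("a", 1).add("b", 2);
        System.out.println(s.mapValues(v -> v * 10).values());          // [10, 20]
        System.out.println(s.mapValues((k, v) -> k + "=" + v).values()); // [a=1, b=2]
    }
}
```

Because the two lambdas differ in parameter count, the compiler picks the right overload without casts, and existing callers are not affected.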

Unfortunately, marking the key parameter as "final" doesn't change much
about guarding against key change.  It only prevents the parameter variable
from being reassigned.  If the key type is a mutable object (e.g. byte[]),
it can still be mutated (e.g. key[0] = 0).  But I'm not really sure there's
much that can be done about that.
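
The point about "final" can be checked with a few lines of plain Java. `FinalKeyDemo` is a made-up name for this illustration and has nothing to do with the Streams code itself.

```java
import java.util.Arrays;

final class FinalKeyDemo {
    // `final` only stops the parameter variable from being reassigned.
    static String apply(final byte[] key, final String value) {
        // key = new byte[0];  // would not compile: final parameter cannot be assigned
        key[0] = 0;            // compiles fine: the array contents are still mutable
        return value;
    }

    public static void main(String[] args) {
        byte[] key = {1, 2, 3};
        apply(key, "v");
        System.out.println(Arrays.toString(key)); // prints [0, 2, 3]
    }
}
```

The caller's array has changed after the call, so anything derived from the key bytes afterwards would see the modified content.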

Mathieu


On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
wrote:

Thanks for the comments.

The concerns make sense. Although we can guard for immutable keys in the
current implementation (with a few changes), I didn't consider backward
compatibility.

In this case 2 solutions come to my mind. In both cases, the user accesses
the key as type Object, as passing an extra type parameter will break
backwards-compatibility.  So the user has to cast to the actual key type.

1. Firstly, we can overload the apply method with 2 arguments (key and
value) and force the key to be *final*. By doing this, I think we can
address both backward-compatibility and guarding against key change.

2. Secondly, we can create class KeyAccess like:

public class KeyAccess {
     Object key;
     public void beforeApply(final Object key) {
         this.key = key;
     }
     public Object getKey() {
         return key;
     }
}

We can have *ValueMapper, ValueJoiner* and *ValueTransformer* extend
*KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*),
before calling *mapper.apply(value)* we can set the *key* by
*mapper.beforeApply(key)*. As a result, the user can use *getKey()* to
access the key inside the *apply(value)* method.
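
For illustration, the second option could be exercised roughly as below. The `process` method is a simplified, hypothetical stand-in for what a processor such as *KTableMapValuesProcessor* would do, and `ValueMapper` is redeclared here as an abstract class purely for this sketch.

```java
// Base class from the proposal, with the field made private.
class KeyAccess {
    private Object key;

    public void beforeApply(final Object key) {
        this.key = key;
    }

    public Object getKey() {
        return key;
    }
}

// Under this option ValueMapper has to become a class so it can extend KeyAccess.
abstract class ValueMapper<V, VR> extends KeyAccess {
    public abstract VR apply(V value);
}

final class KeyAccessDemo {
    // Simplified stand-in for the processor: set the key, then call apply(value).
    static <V, VR> VR process(Object key, V value, ValueMapper<V, VR> mapper) {
        mapper.beforeApply(key);
        return mapper.apply(value);
    }

    public static void main(String[] args) {
        ValueMapper<Integer, String> mapper = new ValueMapper<Integer, String>() {
            @Override
            public String apply(Integer value) {
                String key = (String) getKey(); // cast needed: key is exposed as Object
                return key + ":" + value;
            }
        };
        System.out.println(process("user-1", 7, mapper)); // prints user-1:7
    }
}
```

Note that the mapper can no longer be written as a lambda once `ValueMapper` is an abstract class, and the mapper instance now carries per-record mutable state.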


Cheers,
Jeyhun




On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
wrote:

Jeyhun,

thanks a lot for the KIP!

I think there are two issues we need to address:

(1) The KIP does not consider backward compatibility. Users did complain
about this in past releases already, and as the user base grows, we
should not break backward compatibility in future releases anymore.
Thus, we should think of a better way to allow key access.

Mathieu's comment goes in the same direction:

"On the other hand, the number of compile failures that would need to be
fixed from this change is unfortunate. :-)"

(2) Another concern is that there is no guard to prevent user code from
modifying the key. This might corrupt partitioning if users do alter the
key (accidentally -- or users are just not aware that they are not
allowed to modify the provided key object) and thus break the
application. (This was the original motivation to not provide the key in
the first place -- it guards against modification.)


-Matthias



On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
Hi Jeyhun,

I just want to add my voice that, I too, have wished for access to the
record key during a mapValues or similar operation.

On the other hand, the number of compile failures that would need to be
fixed from this change is unfortunate. :-)  But at least it would all be a
pretty clear and easy change.

Mathieu


On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
wrote:
Dear community,

I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
KAFKA-3745 [4]. The related PR can be found at [5].
I would like to get your comments.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
[2] https://issues.apache.org/jira/browse/KAFKA-4218
[3] https://issues.apache.org/jira/browse/KAFKA-4726
[4] https://issues.apache.org/jira/browse/KAFKA-3745
[5] https://github.com/apache/kafka/pull/2946


Cheers,
Jeyhun



--
-Cheers

Jeyhun
