[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340814#comment-15340814
 ] 

Guozhang Wang edited comment on KAFKA-3705 at 6/21/16 1:08 AM:
---------------------------------------------------------------

Thanks for the feedback!

Re 1: Not sure I fully understand this. I thought you could pass a 
{{StreamPartitioner}} when calling {{addSink}}, which should be sufficient?

Re 2: We are aware of this, and as discussed in the wiki, our current proposal 
is to use something similar to what you described as {{range(K1 prefix)}}: seek 
to the prefix and stop iterating as soon as {{key.startsWith(prefix)}} no 
longer holds. RocksDB has some prefix-seek optimizations, but we would need to 
contribute changes back to RocksDB's JNI to make use of them.

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly
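
To make the "seek with key directly" idea concrete, here is a minimal sketch that uses a sorted in-memory map as a stand-in for the RocksDB store. The class name, the method name and the composite key layout ("<a>|<b>") are assumptions for illustration only and are not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class PrefixScanSketch {

    // Returns, in key order, the values of all entries whose key starts with prefix.
    static List<String> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<String> matches = new ArrayList<>();
        // "Seek": start at the first key that is >= prefix in sorted order.
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break; // left the prefix range, so stop iterating
            }
            matches.add(entry.getValue());
        }
        return matches;
    }

    public static void main(String[] args) {
        // Hypothetical composite keys of the form "<a>|<b>".
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("a1|b1", "c1");
        store.put("a1|b2", "c2");
        store.put("a2|b3", "c3");
        System.out.println(prefixScan(store, "a1|")); // prints [c1, c2]
    }
}
```

Because the keys are sorted, all keys sharing a prefix are contiguous, so the scan can stop at the first key that does not match.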

Re 3: I do not think you need to keep both the old and new keys for 
repartitioning when the old values need to be sent as well. Instead, send them 
as two separate records, <null, old> and <new, null>: after the 
repartitioning they may go to two different partitions and hence be 
processed by two different joiners, which is the expected behavior. More 
precisely, we are going to send the {new, old} pair separately as two records, 
<PK-new, <AK, AV-new>> and <PK-old, <AK, AV-old>>, partitioned on <PK-new> 
and <PK-old> respectively. These two records may be sent to two different 
partitions and hence processed by two different processors.
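
As a rough sketch of this splitting step (not the actual Streams implementation), the code below turns one update into separate records keyed by the old and the new PK and routes each by its key. The {{Record}} class, the {{split}} and {{partitionFor}} helpers and the hash-modulo routing are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

public class RepartitionSketch {

    // One record bound for the repartition topic: key PK, value <AK, AV>.
    static final class Record {
        final String pk; // partition key (the foreign key)
        final String ak; // original primary key
        final String av; // original value
        Record(String pk, String ak, String av) {
            this.pk = pk;
            this.ak = ak;
            this.av = av;
        }
    }

    // Hypothetical stand-in for a partitioner that looks at the key only.
    static int partitionFor(String pk, int numPartitions) {
        return Math.floorMod(pk.hashCode(), numPartitions);
    }

    // Splits an update of row AK into one record per side (old, new).
    // A null PK means that side does not exist (first insert, or delete).
    static List<Record> split(String ak, String oldPk, String oldAv,
                              String newPk, String newAv) {
        List<Record> out = new ArrayList<>();
        if (oldPk != null) {
            out.add(new Record(oldPk, ak, oldAv)); // <PK-old, <AK, AV-old>>
        }
        if (newPk != null) {
            out.add(new Record(newPk, ak, newAv)); // <PK-new, <AK, AV-new>>
        }
        return out;
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed partition count
        for (Record r : split("b", "a1", "c1", "a2", "c2")) {
            System.out.println("<" + r.pk + ", <" + r.ak + ", " + r.av + ">> -> partition "
                    + partitionFor(r.pk, numPartitions));
        }
    }
}
```

Since each record carries its own PK as the key, the old and the new side are routed independently and may land on different partitions.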

For example, if you have two KTables {{A}} and {{B}}, with the following schema:

A: {key: a, value: a'}
B: {key: b, value: a, c}

And you want to join them by key {{a}}. Say table {{A}} has just two 
records: 

{a="a1", a'="a1-pre"}, 
{a="a2", a'="a2-pre"}, 

and a new record arrives for table {{B}}:

{b="b", a="a1", c="c1"}

Then a join result of {a="a1", joined = join("a1-pre", "c1")} should be output.

Later when table {{B}} gets an update on the existing key "b":

{b="b", a="a2", c="c2"}

Two join results should be output. The first negates the previous join result: 

{a="a1", joined = null}

Then a new join result on the new re-partitioned key:

{a="a2", joined = join("a2-pre", "c2")}
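
The expected outputs above can be reproduced with a small single-process simulation. This is only a sketch of the semantics: it ignores repartitioning entirely, assumes at most one row of {{B}} per value of {{a}}, and every class and method name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ForeignKeyJoinSketch {

    private final Map<String, String> tableA = new HashMap<>();   // a -> a'
    private final Map<String, String[]> tableB = new HashMap<>(); // b -> {a, c}

    // Placeholder joiner: just records which two values were joined.
    static String join(String aPrime, String c) {
        return "join(" + aPrime + ", " + c + ")";
    }

    void putA(String a, String aPrime) {
        tableA.put(a, aPrime);
    }

    // Applies an update to table B and returns the join results it emits.
    List<String> updateB(String b, String a, String c) {
        List<String> results = new ArrayList<>();
        String[] old = tableB.put(b, new String[] {a, c});
        // If the foreign key changed, first negate the result under the old key.
        if (old != null && !old[0].equals(a)) {
            results.add("{a=" + old[0] + ", joined=null}");
        }
        // Then emit the result under the new key, if table A has a match.
        String aPrime = tableA.get(a);
        if (aPrime != null) {
            results.add("{a=" + a + ", joined=" + join(aPrime, c) + "}");
        }
        return results;
    }

    public static void main(String[] args) {
        ForeignKeyJoinSketch sketch = new ForeignKeyJoinSketch();
        sketch.putA("a1", "a1-pre");
        sketch.putA("a2", "a2-pre");
        // prints [{a=a1, joined=join(a1-pre, c1)}]
        System.out.println(sketch.updateB("b", "a1", "c1"));
        // prints [{a=a1, joined=null}, {a=a2, joined=join(a2-pre, c2)}]
        System.out.println(sketch.updateB("b", "a2", "c2"));
    }
}
```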

Does that sound good to you?


was (Author: guozhang):
Re 3: The idea is that for the repartitioning, we are going to first transform 
the old key-value pair <AK, AV> into <PK, <AK, AV>>, but when sending the 
key-value pair to the re-partition topic, specify the {{StreamPartitioner}} to 
partition based on the combo <PK, AK> (remember its {{partition}} API takes both 
the key and value), and let the joiner after the repartitioning be applied on 
<AV> only.

When the old value needs to be sent as well, we are going to send the {new, old} 
pair separately as two records, <PK-new, <AK, AV-new>> and <PK-old, <AK, 
AV-old>>, and still partition on the combos <PK-new, AK> and <PK-old, AK>. These 
two records may be sent to two different partitions and hence processed by two 
different processors, which is the expected behavior. Does that look reasonable 
to you?

> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in the Kafka Streams DSL, KTable joins are based only on keys. If users 
> want to join a KTable A keyed by {{a}} with another KTable B keyed by {{b}} 
> that carries {{a}} as a "foreign key", and assuming the tables are read from 
> two topics partitioned on {{a}} and {{b}} respectively, they need to use the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...);
> // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> put the two joining streams on the same key. This is a drawback for 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
