[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340814#comment-15340814 ]
Guozhang Wang edited comment on KAFKA-3705 at 6/21/16 1:08 AM:
---------------------------------------------------------------

Thanks for the feedback!

Re 1: Not sure I fully understand this. I thought you can pass a {{StreamPartitioner}} when calling {{addSink}}, which should be sufficient?

Re 2: We are aware of this, and as discussed in the wiki, our current proposal is to use something similar to what you mentioned as {{range(K1 prefix)}} and check {{key.startsWith(prefix)}} to stop iterating. There are some optimizations with prefix seeking in RocksDB, but we need to contribute back to RocksDB's JNI to make use of them.

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly

Re 3: I think you do not need to keep both the old and new keys for repartitioning if the old values need to be sent as well, but can rather send them as two separate records, <null, old> and <new, null>, since after the repartitioning they may go to two different partitions and hence be processed by two different joiners, which is the expected behavior.

More precisely, we are going to send the {new, old} pair separately as two records, <PK-new, <AK, AV-new>> and <PK-new, <AK, AV-old>>, and partition on <PK-new> and <PK-old>. These two records may be sent to two different partitions and hence processed by two different processors.

For example, if you have two KTables {{A}} and {{B}}, with the following schema:

A: {key: a, value: a'}
B: {key: b, value: a, c}

And you want to join them by key {{a}}. Now let's say table {{A}} has just two records:

{a="a1", a'="a1-pre"}, {a="a2", a'="a2-pre"}

and an incoming record for table {{B}} comes as:

{b="b", a="a1", c="c1"}

Then a join result of {a="a1", joined = join("a1-pre", "c1")} should be output.
Later, when table {{B}} gets an update on the existing key "b":

{b="b", a="a2", c="c2"}

Two join results should be output: first, one negating the previous join result:

{a="a1", joined = null}

Then a new join result on the new re-partitioned key:

{a="a2", joined = join("a2-pre", "c2")}

Does that sound good to you?

was (Author: guozhang):
Thanks for the feedback!

Re 1: Not sure I fully understand this. I thought you can pass a {{StreamPartitioner}} when calling {{addSink}}, which should be sufficient?

Re 2: We are aware of this, and as discussed in the wiki, our current proposal is to use something similar to what you mentioned as {{range(K1 prefix)}} and check {{key.startsWith(prefix)}} to stop iterating. There are some optimizations with prefix seeking in RocksDB, but we need to contribute back to RocksDB's JNI to make use of them.

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly

Re 3: The idea is that for the repartitioning, we are going to first transform the old key-value pair <AK, AV> into <PK, <AK, AV>>, but when sending the key-value pair to the re-partition topic, specify the {{StreamPartitioner}} to partition based on the combo <PK, AK> (remember its assign API takes both the key and value), and let the joiner after the repartitioning be applied on <AV> only.

When the old value needs to be sent as well, we are going to send the {new, old} pair separately as two records, <PK-new, <AK, AV-new>> and <PK-new, <AK, AV-old>>, and still partition on the combos <PK-new, AK> and <PK-old, AK>. These two records may be sent to two different partitions and hence processed by two different processors, which is the expected behavior.

Does that look reasonable to you?
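
A minimal in-memory sketch of the update example in the edited comment above, assuming plain Java maps stand in for the two KTables. The class and method names ({{NonKeyJoinSketch}}, {{putA}}, {{putB}}) are hypothetical and are not Kafka Streams APIs; partitioning is not modeled, and emitting the negation only when the foreign key actually changes is an assumption of this sketch rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; none of these names exist in Kafka Streams. */
class NonKeyJoinSketch {
    // Table A: key a -> value a'
    private final Map<String, String> tableA = new HashMap<>();
    // Table B: key b -> {foreign key a, value c}
    private final Map<String, String[]> tableB = new HashMap<>();
    // Emitted join results, formatted as "a=joined"
    private final List<String> output = new ArrayList<>();

    void putA(String a, String aPrime) {
        tableA.put(a, aPrime);
    }

    void putB(String b, String a, String c) {
        String[] old = tableB.put(b, new String[] {a, c});
        // Assumption: negate the previous result only if the foreign key changed
        // and table A had a matching row, i.e. a result was emitted before.
        if (old != null && !old[0].equals(a) && tableA.containsKey(old[0])) {
            output.add(old[0] + "=null");
        }
        // Emit the new join result under the new foreign key, if A has a match.
        String aPrime = tableA.get(a);
        if (aPrime != null) {
            output.add(a + "=join(" + aPrime + "," + c + ")");
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        NonKeyJoinSketch sketch = new NonKeyJoinSketch();
        sketch.putA("a1", "a1-pre");
        sketch.putA("a2", "a2-pre");
        sketch.putB("b", "a1", "c1"); // first record for key "b"
        sketch.putB("b", "a2", "c2"); // update moves "b" from a1 to a2
        sketch.output().forEach(System.out::println);
    }
}
```

Running {{main}} emits the first join result for "a1", then the negation for "a1", then the new join result for "a2", matching the order described in the comment. In the actual proposal the negation and the new result would travel as two separate records and could be handled by different processors.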

> Support non-key joining in KTable
> ---------------------------------
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Liquan Pei
> Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but
> with a "foreign key" {{a}}, and assuming they are read from two topics which
> are partitioned on {{a}} and {{b}} respectively, they need to use the
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already
> partitioned on {{a}}, users still need to do the pre-aggregation in order to
> put the two joining streams on the same key. This is a drawback for
> programmability and we should fix it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)