[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340779#comment-15340779 ]
Jan Filipiak edited comment on KAFKA-3705 at 6/21/16 12:36 AM: --------------------------------------------------------------- A few things I came across building the current implementation based on the Processor API.

1. Partitioning
I ended up needing to pass an additional ValueMapper<K,K1> into the method. I had to use it in the sink's partitioner to extract the _partition/join-key_ from the key that is used for the repartition topic. It had to be extracted from the key because I still need to be able to route null values (deletes) to the correct partition. This stems from the number of partitions being known only in the partitioner, not in the processor, which made the "API" somewhat complicated.

2. Range select
The ValueMapper mentioned above also had to be passed into the RocksDBIterator. Having KeyValueIterator<K,V> range(K from, K to) is not "natural" for prefix range queries. What is needed is KeyValueIterator<K,V> range(K1 prefix), where Serde<K1> produces prefix bytes of Serde<K>.

3. Key expansion
After a join in this fashion, the key is what I started referring to as "widened". Say you have a KTable<AK,AV> that is the table that needs to be repartitioned, and KP is the repartition key; then, independently of the other table, the new key of the table must include both KP and AK, which is an odd thing compared to the traditional relational-database way. Imagine a result table KTable<Pair<AK,KP>,Pair<AV,XV>>: the formerly unique key AK is not unique anymore, and the processor might see the insert in one partition before the delete in the other (e.g. when the row's KP was updated). I think this should be embraced, because that is how it is. It should just be made apparent to the user, as it needs to be dealt with in downstream processors.

Unrelated to the topic of joining: the Processor API is not necessarily comfortable. I appreciate the beauty of the threading model, but stitching graphs together based on processor names and strings is trickier than I thought.
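The three points above can be sketched together in plain Java. This is only an illustrative model, not the actual Streams API: the names (WidenedKey, rangeByPrefix, the `kp + "|" + ak` serialization, hashCode-based partitioning) are hypothetical. The idea is that serializing the repartition key KP first makes the same bytes usable both by the sink partitioner (so a record and its later null-value delete land in the same partition) and as a scan prefix in a sorted store.

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;

public class NonKeyJoinSketch {

    // Hypothetical "widened" key: repartition/join key KP first, original
    // table key AK second, so KP bytes form a prefix of the full key bytes.
    record WidenedKey(String kp, String ak) {
        String serialized() { return kp + "|" + ak; }
    }

    // Sink-side partitioning: extract KP from the widened key with a mapper,
    // so deletes (null values) route to the same partition as the insert.
    static int partition(Function<WidenedKey, String> kpExtractor,
                         WidenedKey key, int numPartitions) {
        return Math.floorMod(kpExtractor.apply(key).hashCode(), numPartitions);
    }

    // Prefix range scan: because KP leads the serialized key, a sorted store
    // can answer range(prefix) instead of range(from, to).
    static SortedMap<String, String> rangeByPrefix(TreeMap<String, String> store,
                                                   String kp) {
        return store.subMap(kp + "|", true, kp + "|\uffff", true);
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put(new WidenedKey("user-1", "order-a").serialized(), "v1");
        store.put(new WidenedKey("user-1", "order-b").serialized(), "v2");
        store.put(new WidenedKey("user-2", "order-c").serialized(), "v3");

        // All records sharing KP=user-1, found by one prefix scan:
        System.out.println(rangeByPrefix(store, "user-1").keySet());
        // prints [user-1|order-a, user-1|order-b]

        // An insert and its delete compute the same partition:
        WidenedKey k = new WidenedKey("user-1", "order-a");
        System.out.println(partition(WidenedKey::kp, k, 4)
                == partition(WidenedKey::kp, k, 4)); // prints true
    }
}
```

It also shows why AK alone stops being unique after widening: "user-1|order-a" and a hypothetical "user-2|order-a" are distinct entries, so a KP update produces an insert and a delete in different partitions, as described in point 3.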
Anyhow, a really nice stream-processing framework. It feels and looks so much better than what is out there, Spark or Storm; watching their desperate attempts to put state in is a joy. Nice work. As soon as our implementation is hardened in production, I'll probably be able to share it.
> Support non-key joining in KTable > --------------------------------- > > Key: KAFKA-3705 > URL: https://issues.apache.org/jira/browse/KAFKA-3705 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Guozhang Wang > Assignee: Liquan Pei > Labels: api > Fix For: 0.10.1.0 > > > Today in the Kafka Streams DSL, KTable joins are only based on keys. If users > want to join a KTable A with key {{a}} with another KTable B with key {{b}} > on a "foreign key" {{a}}, and assuming they are read from two topics which > are partitioned on {{a}} and {{b}} respectively, they need to use the > following pattern: > {code} > tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' > is partitioned on "a" > tableA.join(tableB', joiner); > {code} > Even if these two tables are read from two topics which are already > partitioned on {{a}}, users still need to do the pre-aggregation in order to > get the two joining streams onto the same key. This is a drawback for > programmability and we should fix it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)