Re: KTable DSL join

2016-07-14 Thread Srikanth
I was looking for KTable-KTable semantics where both sides trigger updates. The result will be used to enrich a few KStreams. I'll keep an eye on this Jira. Meanwhile, I'll use a custom processor or, as you said, convert it to a KStream-KTable join and continue with my test. Srikanth On Thu, Jul 14, 201

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
Sorry that I can only point to a Jira right now :/ About your CustomProcessor: I doubt that this will work. CustomProcessor#init() will be executed after you call KafkaStreams#start(). Thus, your table2.foreach will be executed after the topology was built and, I guess, will just not have any effe

Re: KTable DSL join

2016-07-14 Thread Srikanth
Ah, I was hoping for a magic solution, not a Jira! Another thought was to embed table2 as a KTable in a custom processor and use that for lookup.
class CustomProcesser(kStreamBuilder: KStreamBuilder) extends Processor[Int, Int] {
  private var kvStore: KeyValueStore[Int, Int] = _
  private val table2

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
My bad... I should have considered this in the first place. You are absolutely right. Supporting this kind of a join is work in progress. https://issues.apache.org/jira/browse/KAFKA-3705 Your custom solution (Option 1) might work... But as you mentioned, the problem will be that the first table g

Re: KTable DSL join

2016-07-14 Thread Srikanth
I should have mentioned that I tried this. It worked in another case but will not work for this one. I'm pasting a sample from table1 that I gave in my first email.
Table1
111 -> aaa
222 -> bbb
333 -> aaa
Here the value is not unique (aaa). So, I can't just make it a key. 333 will then override 11
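The collision described above can be reproduced with plain Java collections. This is only a sketch of table (upsert) semantics, not Kafka Streams code; the class name `InvertTableSketch` and the method `invert` are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class InvertTableSketch {

    // Swap key and value with table (upsert) semantics:
    // a later record overwrites an earlier one that maps to the same new key.
    static Map<String, Integer> invert(Map<Integer, String> table) {
        Map<String, Integer> inverted = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : table.entrySet()) {
            inverted.put(e.getValue(), e.getKey());
        }
        return inverted;
    }

    public static void main(String[] args) {
        Map<Integer, String> table1 = new LinkedHashMap<>();
        table1.put(111, "aaa");
        table1.put(222, "bbb");
        table1.put(333, "aaa");

        // 111 and 333 both map to "aaa", so only the later one survives.
        System.out.println(invert(table1)); // {aaa=333, bbb=222}
    }
}
```

Three input rows become two, which is why the value cannot simply be promoted to the key of a table here.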

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
You will need to set a new key before you do the re-partitioning. In your case, it seems you want to switch key and value. This can be done with a simple map
> table1.toStream()
>     .map(new KeyValueMapper() {
>         public KeyValue apply(K key, V value) {
>             return new K
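To make the idea concrete without depending on the Kafka Streams API, here is a rough simulation in plain Java. It assumes the swap emits (value, key) for every record and that the re-keyed records are then looked up against table2, in the spirit of a KStream-KTable join. The names `RekeyThenJoinSketch`, `swap` and `joinWithTable` are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RekeyThenJoinSketch {

    // Step 1: swap key and value for every record.
    // As a stream (a list here), all records survive even if new keys repeat.
    static List<Map.Entry<String, Integer>> swap(List<Map.Entry<Integer, String>> records) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<Integer, String> r : records) {
            out.add(new SimpleImmutableEntry<>(r.getValue(), r.getKey()));
        }
        return out;
    }

    // Step 2: look up each re-keyed record in table2 and emit
    // (original key, table2 value). Records without a match are dropped.
    static List<Map.Entry<Integer, Integer>> joinWithTable(
            List<Map.Entry<String, Integer>> rekeyed, Map<String, Integer> table2) {
        List<Map.Entry<Integer, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> r : rekeyed) {
            Integer match = table2.get(r.getKey());
            if (match != null) {
                out.add(new SimpleImmutableEntry<>(r.getValue(), match));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> table1Records = new ArrayList<>();
        table1Records.add(new SimpleImmutableEntry<>(111, "aaa"));
        table1Records.add(new SimpleImmutableEntry<>(222, "bbb"));
        table1Records.add(new SimpleImmutableEntry<>(333, "aaa"));

        Map<String, Integer> table2 = new HashMap<>();
        table2.put("aaa", 999);
        table2.put("bbb", 888);
        table2.put("ccc", 777);

        System.out.println(joinWithTable(swap(table1Records), table2));
        // [111=999, 222=888, 333=999]
    }
}
```

Note that in this sketch only records arriving on the table1 side produce output; a later change to table2 would not re-emit earlier results, which is the gap between this workaround and the KTable-KTable semantics asked for elsewhere in the thread.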

Re: KTable DSL join

2016-07-14 Thread Srikanth
Michael, Thanks! Looking forward to the update. An interface like KTable is very conducive to joins. Hopefully, it will get more flexible. Srikanth On Thu, Jul 14, 2016 at 4:35 AM, Michael Noll wrote: > Srikant, > > > Its not a case for join() as the keys don't match. Its more a lookup > tabl

Re: KTable DSL join

2016-07-14 Thread Srikanth
Matthias, With option 2, how would we perform the join after re-partitioning? Although we re-partitioned by value, the key doesn't change. KTable joins always use keys, and the ValueJoiner gets values from both tables only when the keys match. Having the data co-located will not be sufficient, right? Srikanth On Thu, Jul

Re: KTable DSL join

2016-07-14 Thread Avi Flax
On 7/14/16, 04:35, "Michael Noll" wrote: > Also, a heads up: It turned out that user questions around joins in Kafka > Streams are pretty common. We are currently working on improving the > documentation for joins to make this more clear. Excellent!

Re: KTable DSL join

2016-07-14 Thread Michael Noll
Srikanth, > It's not a case for join() as the keys don't match. It's more a lookup table. Yes, the semantics of streaming joins in Kafka Streams are a bit different from joins in a traditional RDBMS. See http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams. Also, a heads up: It

Re: KTable DSL join

2016-07-14 Thread Matthias J. Sax
I would recommend re-partitioning as described in Option 2. -Matthias
On 07/13/2016 10:53 PM, Srikanth wrote:
> Hello,
>
> I'm trying the following join using KTable. There are two change log tables.
> Table1
> 111 -> aaa
> 222 -> bbb
> 333 -> aaa
>
> Table2
> aaa -> 999
> bbb -> 888

KTable DSL join

2016-07-13 Thread Srikanth
Hello, I'm trying the following join using KTable. There are two change log tables.
Table1
111 -> aaa
222 -> bbb
333 -> aaa
Table2
aaa -> 999
bbb -> 888
ccc -> 777
My result table should be
111 -> 999
222 -> 888
333 -> 999
It's not a case for join() as the keys don't match. It's
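The desired result can be stated precisely as a lookup of each Table1 value in Table2. The following is a minimal sketch using plain Java maps rather than the Kafka Streams DSL; `LookupJoinSketch` and `lookupJoin` are illustrative names, and dropping unmatched rows is an assumption (the sample data has none).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LookupJoinSketch {

    // For each (key, value) in table1, use the value as the lookup key into table2.
    // Rows with no match in table2 are dropped (assumed inner-join behaviour).
    static Map<Integer, Integer> lookupJoin(Map<Integer, String> table1,
                                            Map<String, Integer> table2) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : table1.entrySet()) {
            Integer looked = table2.get(e.getValue());
            if (looked != null) {
                result.put(e.getKey(), looked);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> table1 = new LinkedHashMap<>();
        table1.put(111, "aaa");
        table1.put(222, "bbb");
        table1.put(333, "aaa");

        Map<String, Integer> table2 = new LinkedHashMap<>();
        table2.put("aaa", 999);
        table2.put("bbb", 888);
        table2.put("ccc", 777);

        System.out.println(lookupJoin(table1, table2)); // {111=999, 222=888, 333=999}
    }
}
```

The result keeps Table1's keys, which is what distinguishes this from a key-equality join: the join condition is Table1 value = Table2 key.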