I should have mentioned that I tried this. It worked in another case but will not work for this one. I'm pasting a sample from table1 that I gave in my first email.
Table1
111 -> aaa
222 -> bbb
333 -> aaa

Here the value is not unique (aaa), so I can't just make it the key: 333 would then override 111. (A rough sketch of a possible workaround is at the end of this mail.)

Srikanth

On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> You will need to set a new key before you do the re-partitioning. In
> your case, it seems you want to switch key and value. This can be done
> with a simple map:
>
>     table1.toStream()
>           .map(new KeyValueMapper<K, V, KeyValue<V, K>>() {
>               public KeyValue<V, K> apply(K key, V value) {
>                   return new KeyValue<V, K>(value, key);
>               }
>           })
>           .to("my-repartioning-topic");
>
>     newTable1 = builder.table("my-repartioning-topic");
>
> with K and V being the actual types of key and value in table1.
>
> Of course, you can modify the table entries in map() in any other way
> that suits your use case. You only need to make sure to set the (join)
> key before you do the re-partitioning.
>
> -Matthias
>
>
> On 07/14/2016 04:47 PM, Srikanth wrote:
> > Matthias,
> >
> > With option 2, how would we perform the join after re-partitioning?
> > Although we re-partitioned by value, the key doesn't change.
> > KTable joins always use keys, and the ValueJoiner gets values from both
> > tables when the keys match.
> >
> > Having the data co-located will not be sufficient, right?
> >
> > Srikanth
> >
> > On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I would recommend re-partitioning as described in Option 2.
> >>
> >> -Matthias
> >>
> >> On 07/13/2016 10:53 PM, Srikanth wrote:
> >>> Hello,
> >>>
> >>> I'm trying the following join using KTables. There are two changelog
> >>> tables.
> >>>
> >>> Table1
> >>> 111 -> aaa
> >>> 222 -> bbb
> >>> 333 -> aaa
> >>>
> >>> Table2
> >>> aaa -> 999
> >>> bbb -> 888
> >>> ccc -> 777
> >>>
> >>> My result table should be
> >>> 111 -> 999
> >>> 222 -> 888
> >>> 333 -> 999
> >>>
> >>> It's not a case for join() as the keys don't match. It's more of a
> >>> lookup table.
> >>>
> >>> Option 1 is to use Table1.toStream().process(ProcessorSupplier(),
> >>> "storeName").
> >>> punctuate() will use a regular Kafka consumer that reads updates from
> >>> Table2 and updates a private map.
> >>> process() will do a key-value lookup.
> >>> This has an advantage when Table1 is much larger than Table2.
> >>> Each instance of the processor will have to hold the entire Table2.
> >>>
> >>> Option 2 is to re-partition Table1 using through(StreamPartitioner)
> >>> and partition by value.
> >>> This will ensure co-location. Then join with Table2. This part might
> >>> be tricky??
> >>>
> >>> Your comments and suggestions are welcome!
> >>>
> >>> Srikanth
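
For what it's worth, here is a rough, untested sketch of one way this lookup might be expressed without needing unique values as a KTable key: read Table1's changelog as a KStream, re-key each record by its value, and join that stream against Table2. Stream keys need not be unique, so 333 does not clobber 111. The topic names ("table1", "table2", "result"), the String serdes, and the class/application names are made up for illustration, and the DSL calls are from a newer Kafka Streams release than the 0.10.x API quoted above, so please treat this as a sketch rather than a drop-in solution.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Joined;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    import java.util.Properties;

    public class LookupJoinSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Table1 changelog (111 -> aaa, 222 -> bbb, 333 -> aaa), read as a stream.
            // Topic name is hypothetical.
            KStream<String, String> table1Stream =
                    builder.stream("table1", Consumed.with(Serdes.String(), Serdes.String()));

            // Table2 changelog (aaa -> 999, bbb -> 888, ccc -> 777), the lookup table.
            KTable<String, String> table2 =
                    builder.table("table2", Consumed.with(Serdes.String(), Serdes.String()));

            // Re-key each Table1 record by its value ("aaa", ...) and keep the original
            // key ("111", ...) as the stream value. Duplicate keys are fine on a KStream.
            KStream<String, String> rekeyed =
                    table1Stream.map((key, value) -> KeyValue.pair(value, key));

            // The join repartitions the re-keyed stream automatically, so each record
            // lands on the same partition as the matching Table2 entry. The joiner
            // pairs the original Table1 key with the Table2 value.
            KStream<String, KeyValue<String, String>> joined = rekeyed.join(
                    table2,
                    (originalKey, table2Value) -> KeyValue.pair(originalKey, table2Value),
                    Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

            // Map back to the desired shape (111 -> 999, 222 -> 888, 333 -> 999) and
            // write it out; reading "result" back with builder.table() would give the
            // result as a KTable again.
            KStream<String, String> result = joined.map((lookupKey, pair) -> pair);
            result.to("result", Produced.with(Serdes.String(), Serdes.String()));

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "table1-table2-lookup");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            new KafkaStreams(builder.build(), props).start();
        }
    }

One caveat: a KStream-KTable join only consults Table2's state at the moment a Table1 record arrives, so a later update to Table2 will not retroactively update results already emitted for earlier Table1 records. If that matters, the Processor-based Option 1 from my first mail (or a true table-table lookup, if the library ever supports it) would still be needed.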