Ah, I was hoping for a magic solution, not a JIRA! Another thought was to embed table2 as a KTable in a custom processor and use that for lookup.
class CustomProcesser(kStreamBuilder: KStreamBuilder) extends Processor[Int, Int] {
  private var context: ProcessorContext = _
  private var kvStore: KeyValueStore[Int, Int] = _
  private val table2 = kStreamBuilder.table(IntSerde(), IntSerde(), table2TopicName)

  override def init(context: ProcessorContext): Unit = {
    // keep the context: process() needs it to forward results
    this.context = context
    this.kvStore = context.getStateStore("lookupMap").asInstanceOf[KeyValueStore[Int, Int]]
    table2.foreach((k, v) => kvStore.put(k, v))
  }

  // look up table1's value (the join attribute) in the local copy of table2
  override def process(key1: Int, val1: Int): Unit = {
    context.forward(key1, kvStore.get(val1))
  }

  override def punctuate(timestamp: Long): Unit = {}

  override def close(): Unit = {}
}

Then do table1.toStream().through(ValueBasedPartitioner).process(customProcessor)

I'm not sure if creating a KTable inside a Processor like this is semantically correct, but at least both tables are co-partitioned, so we don't have to replicate table2.

Srikanth

On Thu, Jul 14, 2016 at 11:36 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> My bad... I should have considered this in the first place. You are
> absolutely right.
>
> Supporting this kind of a join is work in progress.
> https://issues.apache.org/jira/browse/KAFKA-3705
>
> Your custom solution (Option 1) might work... But as you mentioned, the
> problem will be that the first table gets replicated (and not
> partitioned) over all Processors, which might become a problem.
>
> -Matthias
>
>
> On 07/14/2016 05:22 PM, Srikanth wrote:
> > I should have mentioned that I tried this. It worked in another case but
> > will not work for this one.
> > I'm pasting a sample from table1 that I gave in my first email.
> >
> > Table1
> > 111 -> aaa
> > 222 -> bbb
> > 333 -> aaa
> >
> > Here the value is not unique (aaa), so I can't just make it a key: 333
> > would then override 111.
> >
> > Srikanth
> >
> > On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> You will need to set a new key before you do the re-partitioning. In
> >> your case, it seems you want to switch key and value.
> >> This can be done with a simple map():
> >>
> >>> table1.toStream()
> >>>     .map(new KeyValueMapper<K, V, KeyValue<V, K>>() {
> >>>         public KeyValue<V, K> apply(K key, V value) {
> >>>             return new KeyValue<V, K>(value, key);
> >>>         }
> >>>     })
> >>>     .to("my-repartitioning-topic");
> >>> newTable1 = builder.table("my-repartitioning-topic");
> >>
> >> With K and V being the actual types of key and value in table1.
> >>
> >> Of course, you can modify the table entries in map() in any other way
> >> that suits your use case. You only need to make sure to set the (join)
> >> key before you do the re-partitioning.
> >>
> >> -Matthias
> >>
> >>
> >> On 07/14/2016 04:47 PM, Srikanth wrote:
> >>> Matthias,
> >>>
> >>> With option 2, how would we perform the join after re-partitioning?
> >>> Although we re-partitioned by value, the key doesn't change.
> >>> KTable joins always use keys, and the ValueJoiner gets values from both
> >>> tables when the keys match.
> >>>
> >>> Having the data co-located will not be sufficient, right?
> >>>
> >>> Srikanth
> >>>
> >>> On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> I would recommend re-partitioning as described in Option 2.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 07/13/2016 10:53 PM, Srikanth wrote:
> >>>>> Hello,
> >>>>>
> >>>>> I'm trying the following join using KTable. There are two changelog
> >>>>> tables.
> >>>>>
> >>>>> Table1
> >>>>> 111 -> aaa
> >>>>> 222 -> bbb
> >>>>> 333 -> aaa
> >>>>>
> >>>>> Table2
> >>>>> aaa -> 999
> >>>>> bbb -> 888
> >>>>> ccc -> 777
> >>>>>
> >>>>> My result table should be
> >>>>> 111 -> 999
> >>>>> 222 -> 888
> >>>>> 333 -> 999
> >>>>>
> >>>>> It's not a case for join() as the keys don't match. It's more of a
> >>>>> lookup table.
> >>>>>
> >>>>> Option1 is to use Table1.toStream().process(ProcessorSupplier(),
> >>>>> "storeName").
> >>>>> punctuate() will use a regular Kafka consumer that reads updates from
> >>>>> Table2 and updates a private map.
> >>>>> process() will do a key-value lookup.
> >>>>> This is an advantage when Table1 is much larger than Table2, although
> >>>>> each instance of the processor will have to hold the entire Table2.
> >>>>>
> >>>>> Option2 is to re-partition Table1 using through(StreamPartitioner) and
> >>>>> partition by value.
> >>>>> This will ensure co-location. Then join with Table2. This part might be
> >>>>> tricky?
> >>>>>
> >>>>> Your comments and suggestions are welcome!
> >>>>>
> >>>>> Srikanth
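The lookup semantics asked for in the first email, and the reason a plain key/value swap falls short, can be sketched with in-memory maps standing in for the two changelog tables. This is a hypothetical illustration, not Kafka Streams code: the class and method names (LookupJoinSketch, lookupJoin, naiveSwap) are made up, only java.util is used, and partitioning, changelog updates and deletes are ignored. Dropping table1 rows that have no match in table2 is also an assumption; the custom processor above would forward a null instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: plain in-memory maps stand in for the two changelog tables.
public class LookupJoinSketch {

    // Option 1 semantics: each table1 record uses its value as the key into a full copy of table2.
    public static Map<Integer, Integer> lookupJoin(Map<Integer, String> table1,
                                                   Map<String, Integer> table2) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : table1.entrySet()) {
            Integer match = table2.get(e.getValue());
            if (match != null) { // assumption: no entry in table2 means no output for this key
                result.put(e.getKey(), match);
            }
        }
        return result;
    }

    // Plain key/value swap: rows sharing a value collide, and the later row overwrites the earlier one.
    public static Map<String, Integer> naiveSwap(Map<Integer, String> table1) {
        Map<String, Integer> swapped = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : table1.entrySet()) {
            swapped.put(e.getValue(), e.getKey());
        }
        return swapped;
    }

    public static void main(String[] args) {
        Map<Integer, String> table1 = new LinkedHashMap<>();
        table1.put(111, "aaa");
        table1.put(222, "bbb");
        table1.put(333, "aaa");

        Map<String, Integer> table2 = new LinkedHashMap<>();
        table2.put("aaa", 999);
        table2.put("bbb", 888);
        table2.put("ccc", 777);

        System.out.println(lookupJoin(table1, table2)); // {111=999, 222=888, 333=999}
        System.out.println(naiveSwap(table1));          // {aaa=333, bbb=222}
    }
}
```

The first line printed is the expected result table from the original question. The second shows the problem raised in the reply: after swapping, "aaa" maps only to 333, so 111 is lost.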