I was looking for KTable-KTable semantics where both trigger updates. The result will be used to enrich a few KStreams.
I'll keep an eye on this Jira. Meanwhile, I'll use a custom processor or, like you said, convert it to a KStream-KTable join and continue with my test.

Srikanth

On Thu, Jul 14, 2016 at 12:08 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Sorry that I can only point to a Jira right now :/
>
> About your CustomProcessor. I doubt that this will work:
>
> CustomProcessor#init() will be executed after you call KafkaStreams.start();
>
> Thus, your table2.foreach will be executed after the topology was built
> and I guess, will just not have any effect. Hence, no data will be put
> into the KV-Store.
>
> However, your basic idea might work. The question is, what kind of
> semantics you want to get.
>
> In a KTable-KTable join the lookup is done both ways, i.e., each time
> there is an update to either of the two tables, you will get a join
> result. In your example, the lookup will only be done in one direction,
> i.e., for each update of table1, you would do a lookup into table2. Thus,
> if you have two consecutive updates to table2, you would miss one join
> result.
>
> Did you think about the exact semantics you want to get? As an
> alternative, you could convert your first table into a stream (thus,
> consecutive keys do not overwrite each other) and perform a
> KStream-KTable join. This has the same semantics as your custom solution
> from above. Updates to the KTable do not trigger a join result (same as
> an update to the KV store you want to maintain), and each tuple in
> KStream does a look-up in KTable.
>
> -Matthias
>
> On 07/14/2016 05:49 PM, Srikanth wrote:
> > Ah, I was hoping for a magic solution not a jira!
> >
> > Another thought was to embed table2 as KTable in custom processor and use
> > that for lookup.
> >
> > class CustomProcessor(kStreamBuilder: KStreamBuilder) extends
> > Processor[Int, Int] {
> >   private var kvStore: KeyValueStore[Int, Int] = _
> >   private val table2 = kStreamBuilder.table(IntSerde(), IntSerde(),
> > table2TopicName)
> >
> >   override def init(context: ProcessorContext) {
> >     this.kvStore =
> > context.getStateStore("lookupMap").asInstanceOf[KeyValueStore[Int, Int]]
> >     table2.foreach((k,v) => kvStore.put(k, v))
> >   }
> >   override def process(key1:Int, val1:Int) {
> >     context.forward(key1, kvStore.get(val1))
> >   }
> > }
> >
> > Then do
> > table1.toStream().through(ValueBasedPartitioner).process(customProcessor)
> >
> > I'm not sure if creating a KTable inside a Processor like this is
> > semantically correct.
> > But at least both tables are co-partitioned and we don't have to
> > replicate.
> >
> > Srikanth
> >
> > On Thu, Jul 14, 2016 at 11:36 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> My bad... I should have considered this in the first place. You are
> >> absolutely right.
> >>
> >> Supporting this kind of a join is work in progress.
> >> https://issues.apache.org/jira/browse/KAFKA-3705
> >>
> >> Your custom solution (Option 1) might work... But as you mentioned, the
> >> problem will be that the first table gets replicated (and not
> >> partitioned) over all Processors, which might become a problem.
> >>
> >> -Matthias
> >>
> >> On 07/14/2016 05:22 PM, Srikanth wrote:
> >>> I should have mentioned that I tried this. It worked in another case but
> >>> will not work for this one.
> >>> I'm pasting a sample from table1 that I gave in my first email.
> >>>
> >>> Table1
> >>> 111 -> aaa
> >>> 222 -> bbb
> >>> 333 -> aaa
> >>>
> >>> Here value is not unique (aaa). So, I can't just make it a key. 333 will
> >>> then override 111.
> >>>
> >>> Srikanth
> >>>
> >>> On Thu, Jul 14, 2016 at 11:07 AM, Matthias J.
Sax <matth...@confluent.io> wrote:
> >>>
> >>>> You will need to set a new key before you do the re-partitioning. In
> >>>> your case, it seems you want to switch key and value. This can be done
> >>>> with a simple map
> >>>>
> >>>>> table1.toStream()
> >>>>>       .map(new KeyValueMapper<K, V, KeyValue<V, K>>() {
> >>>>>           public KeyValue<V, K> apply(K key, V value) {
> >>>>>               return new KeyValue<V, K>(value, key);
> >>>>>           }
> >>>>>       })
> >>>>>       .to("my-repartioning-topic");
> >>>>> newTable1 = builder.table("my-repartioning-topic");
> >>>>
> >>>> With K and V being the actual types of key and value in table1.
> >>>>
> >>>> Of course, you can modify the table entries in map() in any other way
> >>>> that suits your use case. You only need to make sure to set the (join)
> >>>> key before you do the re-partitioning.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 07/14/2016 04:47 PM, Srikanth wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> With option 2, how would we perform the join after re-partitioning?
> >>>>> Although we re-partitioned with value, the key doesn't change.
> >>>>> KTable joins always use keys, and ValueJoiner gets values from both
> >>>>> tables when keys match.
> >>>>>
> >>>>> Having data co-located will not be sufficient, right?
> >>>>>
> >>>>> Srikanth
> >>>>>
> >>>>> On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax <matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> I would recommend re-partitioning as described in Option 2.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 07/13/2016 10:53 PM, Srikanth wrote:
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I'm trying the following join using KTable. There are two change log
> >>>>>>> tables.
> >>>>>>> Table1
> >>>>>>> 111 -> aaa
> >>>>>>> 222 -> bbb
> >>>>>>> 333 -> aaa
> >>>>>>>
> >>>>>>> Table2
> >>>>>>> aaa -> 999
> >>>>>>> bbb -> 888
> >>>>>>> ccc -> 777
> >>>>>>>
> >>>>>>> My result table should be
> >>>>>>> 111 -> 999
> >>>>>>> 222 -> 888
> >>>>>>> 333 -> 999
> >>>>>>>
> >>>>>>> It's not a case for join() as the keys don't match. It's more a lookup
> >>>>>>> table.
> >>>>>>>
> >>>>>>> Option1 is to use a Table1.toStream().process(ProcessSupplier(),
> >>>>>>> "storeName")
> >>>>>>> punctuate() will use a regular Kafka consumer that reads updates from
> >>>>>>> Table2 and updates a private map.
> >>>>>>> Process() will do a key-value lookup.
> >>>>>>> This has an advantage when Table1 is much larger than Table2.
> >>>>>>> Each instance of the processor will have to hold the entire Table2.
> >>>>>>>
> >>>>>>> Option2 is to re-partition Table1 using through(StreamPartitioner) and
> >>>>>>> partition using value.
> >>>>>>> This will ensure co-location. Then join with Table2. This part might be
> >>>>>>> tricky??
> >>>>>>>
> >>>>>>> Your comments and suggestions are welcome!
> >>>>>>>
> >>>>>>> Srikanth
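
For anyone following the thread, the difference between "both tables trigger updates" and a one-directional lookup can be sketched in plain Java. This is only a toy model under stated assumptions: it does not use the Kafka Streams API at all, and the names LookupJoinSketch, updateTable1, updateTable2, and bothSidesTrigger are hypothetical, made up for this illustration. The two maps stand in for Table1 (111 -> aaa) and Table2 (aaa -> 999) from the first email.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model, NOT the Kafka Streams API.
// It only mimics which table updates produce a lookup result.
final class LookupJoinSketch {
    private final Map<Integer, String> table1 = new TreeMap<>(); // e.g. 111 -> aaa
    private final Map<String, Integer> table2 = new TreeMap<>(); // e.g. aaa -> 999
    private final boolean bothSidesTrigger;
    private final List<String> emitted = new ArrayList<>();

    public LookupJoinSketch(boolean bothSidesTrigger) {
        this.bothSidesTrigger = bothSidesTrigger;
    }

    // An update to table1 always looks up its value in table2.
    public void updateTable1(int key, String value) {
        table1.put(key, value);
        Integer looked = table2.get(value);
        if (looked != null) {
            emitted.add(key + " -> " + looked);
        }
    }

    // An update to table2 re-emits results only when both sides trigger.
    public void updateTable2(String key, int value) {
        table2.put(key, value);
        if (!bothSidesTrigger) {
            return; // one-directional lookup: this change is not propagated
        }
        for (Map.Entry<Integer, String> row : table1.entrySet()) {
            if (row.getValue().equals(key)) {
                emitted.add(row.getKey() + " -> " + value);
            }
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        for (boolean both : new boolean[] {false, true}) {
            LookupJoinSketch join = new LookupJoinSketch(both);
            join.updateTable2("aaa", 999);
            join.updateTable1(111, "aaa");
            join.updateTable1(333, "aaa");
            join.updateTable2("aaa", 555); // late update to the lookup side
            System.out.println("bothSidesTrigger=" + both + ": " + join.emitted());
        }
    }
}
```

With bothSidesTrigger set to false, the late change to "aaa" never reaches the output, which roughly corresponds to the custom-processor and KStream-KTable variants discussed above. With it set to true, every Table1 row that points at "aaa" is re-emitted. Note that the scan over table1 in this toy version is linear; it says nothing about how a real implementation would index or partition the data.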