Sorry that I can only point to a Jira right now :/ About your CustomProcessor. I doubt that this will work:
CustomProcessor#init() will be executed after you call KafkaStream.start(); Thus, your table2.foreach will be executed after the topology was built and I guess, will just not have any effect. Hence, no data will be put into the KV-Store. However, your basic idea might work. The question is, what kind of semantics you want to get. In a KTable-KTable join the lookup is done both ways, ie, each time there is an update to either of the both tables, you will get a join result. In your example, the lookup will be only done in way direction, ie, for each update of table1, you would do a lookup into table2. Thus, if you have two consecutive updates to table2, you would miss one join result. Did you think about the exact semantics you want to get? As an alternative, you could convert your first table into a stream (thus, consecutive keys do not overwrite each other) and perform a KStream-KTable join. This has the same semantics as your custom solution from above. Updates to the KTable do not trigger a join result (same as an update to the KV store you want to maintain), and each tuple in KStream does a look-up in KTable. -Matthias On 07/14/2016 05:49 PM, Srikanth wrote: > Ah, I was hoping for a magic solution not a jira! > > Another thought was to embed table2 as KTable in custom processor and use > that for lookup. > > class CustomProcesser(kStreamBuilder: KStreamBuilder) extends > Processor[Int, Int] { > private var kvStore: KeyValueStore[Int, Int] = _ > private val table2 = kStreamBuilder.table(IntSerde(), IntSerde(), > table2TopicName) > > override def init(context: ProcessorContext) { > this.kvStore = > context.getStateStore("lookupMap").asInstanceOf[KeyValueStore[Int, Int]] > table2.foreach((k,v) => kvStore.put(k, v)) > } > override def process(key1:Int, val1:Int) { > context.forward(key1, kvStore.get(val1)) > } > > Then do > table1.toStream().through(ValueBasedPartitioner).process(customProcessor) > > I'm not sure if creating a KTable inside a Processor like this is > semantically correct. > But at least both tables are co-partitioned and we don't have to replicate. > > Srikanth > > On Thu, Jul 14, 2016 at 11:36 AM, Matthias J. Sax <matth...@confluent.io> > wrote: > >> My bad... I should have considered this in the first place. You are >> absolutely right. >> >> Supporting this kind of a join is work in progress. >> https://issues.apache.org/jira/browse/KAFKA-3705 >> >> Your custom solution (Option 1) might work... But as you mentioned, the >> problem will be that the first table gets replicated (and not >> partitioned) over all Processors, which might become a problem. >> >> -Matthias >> >> >> On 07/14/2016 05:22 PM, Srikanth wrote: >>> I should have mentioned that I tried this. It worked in other case but >> will >>> not work for this one. >>> I'm pasting a sample from table1 that I gave in my first email. >>> >>> Table1 >>> 111 -> aaa >>> 222 -> bbb >>> 333 -> aaa >>> >>> Here value is not unique(aaa). So, I can't just make it a key. 333 will >>> then override 111. >>> >>> Srikanth >>> >>> On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <matth...@confluent.io >>> >>> wrote: >>> >>>> You will need to set a new key before you do the re-partitioning. In >>>> your case, it seems you want to switch key and value. This can be done >>>> with a simple map >>>> >>>>> table1.toStream() >>>>> .map(new KeyValueMapper<K, V, KeyValue<V, K>() { >>>>> public KeyValue<V, K> apply(K key, V value) { >>>>> return new KeyValue<V, K>(value, key); >>>>> } >>>>> }) >>>>> .to("my-repartioning-topic"); >>>>> newTable1 = builder.table("my-repartioning-topic"); >>>> >>>> With K and V being the actual types of key and value in table1. >>>> >>>> Of course, you can modify the table entries in map() in any other way >>>> that suits your use case. You only need to make sure, to set the (join) >>>> key before you do the re-partitioning. >>>> >>>> -Matthias >>>> >>>> >>>> On 07/14/2016 04:47 PM, Srikanth wrote: >>>>> Matthias, >>>>> >>>>> With option 2, how would we perform join after re-partition. Although >> we >>>>> re-partitioned with value, the key doesn't change. >>>>> KTable joins always use keys and ValueJoiner get values from both table >>>>> when keys match. >>>>> >>>>> Having data co-located will not be sufficient rt?? >>>>> >>>>> Srikanth >>>>> >>>>> On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax < >> matth...@confluent.io> >>>>> wrote: >>>>> >>>>>> I would recommend re-partitioning as described in Option 2. >>>>>> >>>>>> -Matthias >>>>>> >>>>>> On 07/13/2016 10:53 PM, Srikanth wrote: >>>>>>> Hello, >>>>>>> >>>>>>> I'm trying the following join using KTable. There are two change log >>>>>> tables. >>>>>>> Table1 >>>>>>> 111 -> aaa >>>>>>> 222 -> bbb >>>>>>> 333 -> aaa >>>>>>> >>>>>>> Table2 >>>>>>> aaa -> 999 >>>>>>> bbb -> 888 >>>>>>> ccc -> 777 >>>>>>> >>>>>>> My result table should be >>>>>>> 111 -> 999 >>>>>>> 222 -> 888 >>>>>>> 333 -> 999 >>>>>>> >>>>>>> Its not a case for join() as the keys don't match. Its more a lookup >>>>>> table. >>>>>>> >>>>>>> Option1 is to use a Table1.toStream().process(ProcessSupplier(), >>>>>>> "storeName") >>>>>>> punctuate() will use regular kafka consumer that reads updates from >>>>>> Table2 >>>>>>> and updates a private map. >>>>>>> Process() will do a key-value lookup. >>>>>>> This has an advantage when Table1 is much larger than Table2. >>>>>>> Each instance of the processor will have to hold entire Table2. >>>>>>> >>>>>>> Option2 is to re-partition Table1 using through(StreamPartitioner) >> and >>>>>>> partition using value. >>>>>>> This will ensure co-location. Then join with Table2. This part might >> be >>>>>>> tricky?? >>>>>>> >>>>>>> Your comments and suggestions are welcome! >>>>>>> >>>>>>> Srikanth >>>>>>> >>>>>> >>>>>> >>>>> >>>> >>>> >>> >> >> >
signature.asc
Description: OpenPGP digital signature