I was looking for KTable-KTable semantics where both trigger updates.
The result will be used to enrich a few KStreams.

I'll keep an eye on this jira.
Meanwhile, I'll use custom processor or like you said convert it to
KStream-KTable join and continue with my test.

Srikanth



On Thu, Jul 14, 2016 at 12:08 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Sorry that I can only point to a Jira right now :/
>
> About your CustomProcessor. I doubt that this will work:
>
> CustomProcessor#init() will be executed after you call KafkaStream.start();
>
> Thus, your table2.foreach will be executed after the topology was built
> and I guess, will just not have any effect. Hence, no data will be put
> into the KV-Store.
>
> However, your basic idea might work. The question is, what kind of
> semantics you want to get.
>
> In a KTable-KTable join the lookup is done both ways, ie, each time
> there is an update to either of the both tables, you will get a join
> result. In your example, the lookup will be only done in way direction,
> ie, for each update of table1, you would do a lookup into table2. Thus,
> if you have two consecutive updates to table2, you would miss one join
> result.
>
> Did you think about the exact semantics you want to get? As an
> alternative, you could convert your first table into a stream (thus,
> consecutive keys do not overwrite each other) and perform a
> KStream-KTable join. This has the same semantics as your custom solution
> from above. Updates to the KTable do not trigger a join result (same as
> an update to the KV store you want to maintain), and each tuple in
> KStream does a look-up in KTable.
>
>
>
> -Matthias
>
>
>
> On 07/14/2016 05:49 PM, Srikanth wrote:
> > Ah, I was hoping for a magic solution not a jira!
> >
> > Another thought was to embed table2 as KTable in custom processor and use
> > that for lookup.
> >
> > class CustomProcesser(kStreamBuilder: KStreamBuilder) extends
> > Processor[Int, Int] {
> >   private var kvStore: KeyValueStore[Int, Int] = _
> >   private val table2  = kStreamBuilder.table(IntSerde(), IntSerde(),
> > table2TopicName)
> >
> >   override def init(context: ProcessorContext) {
> >       this.kvStore =
> > context.getStateStore("lookupMap").asInstanceOf[KeyValueStore[Int, Int]]
> >       table2.foreach((k,v) => kvStore.put(k, v))
> >   }
> >   override def process(key1:Int, val1:Int) {
> >     context.forward(key1, kvStore.get(val1))
> >   }
> >
> > Then do
> >
>  table1.toStream().through(ValueBasedPartitioner).process(customProcessor)
> >
> > I'm not sure if creating a KTable inside a Processor like this is
> > semantically correct.
> > But at least both tables are co-partitioned and we don't have to
> replicate.
> >
> > Srikanth
> >
> > On Thu, Jul 14, 2016 at 11:36 AM, Matthias J. Sax <matth...@confluent.io
> >
> > wrote:
> >
> >> My bad... I should have considered this in the first place. You are
> >> absolutely right.
> >>
> >> Supporting this kind of a join is work in progress.
> >> https://issues.apache.org/jira/browse/KAFKA-3705
> >>
> >> Your custom solution (Option 1) might work... But as you mentioned, the
> >> problem will be that the first table gets replicated (and not
> >> partitioned) over all Processors, which might become a problem.
> >>
> >> -Matthias
> >>
> >>
> >> On 07/14/2016 05:22 PM, Srikanth wrote:
> >>> I should have mentioned that I tried this. It worked in other case but
> >> will
> >>> not work for this one.
> >>> I'm pasting a sample from table1 that I gave in my first email.
> >>>
> >>> Table1
> >>>   111 -> aaa
> >>>   222 -> bbb
> >>>   333 -> aaa
> >>>
> >>> Here value is not unique(aaa). So, I can't just make it a key. 333 will
> >>> then override 111.
> >>>
> >>> Srikanth
> >>>
> >>> On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>> wrote:
> >>>
> >>>> You will need to set a new key before you do the re-partitioning. In
> >>>> your case, it seems you want to switch key and value. This can be done
> >>>> with a simple map
> >>>>
> >>>>> table1.toStream()
> >>>>>       .map(new KeyValueMapper<K, V, KeyValue<V, K>() {
> >>>>>              public KeyValue<V, K> apply(K key, V value) {
> >>>>>                return new KeyValue<V, K>(value, key);
> >>>>>              }
> >>>>>            })
> >>>>>       .to("my-repartioning-topic");
> >>>>> newTable1 = builder.table("my-repartioning-topic");
> >>>>
> >>>> With K and V being the actual types of key and value in table1.
> >>>>
> >>>> Of course, you can modify the table entries in map() in any other way
> >>>> that suits your use case. You only need to make sure, to set the
> (join)
> >>>> key before you do the re-partitioning.
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 07/14/2016 04:47 PM, Srikanth wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> With option 2, how would we perform join after re-partition. Although
> >> we
> >>>>> re-partitioned with value, the key doesn't change.
> >>>>> KTable joins always use keys and ValueJoiner get values from both
> table
> >>>>> when keys match.
> >>>>>
> >>>>> Having data co-located will not be sufficient rt??
> >>>>>
> >>>>> Srikanth
> >>>>>
> >>>>> On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> I would recommend re-partitioning as described in Option 2.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 07/13/2016 10:53 PM, Srikanth wrote:
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I'm trying the following join using KTable. There are two change
> log
> >>>>>> tables.
> >>>>>>> Table1
> >>>>>>>   111 -> aaa
> >>>>>>>   222 -> bbb
> >>>>>>>   333 -> aaa
> >>>>>>>
> >>>>>>> Table2
> >>>>>>>   aaa -> 999
> >>>>>>>   bbb -> 888
> >>>>>>>   ccc -> 777
> >>>>>>>
> >>>>>>> My result table should be
> >>>>>>>   111 -> 999
> >>>>>>>   222 -> 888
> >>>>>>>   333 -> 999
> >>>>>>>
> >>>>>>> Its not a case for join() as the keys don't match. Its more a
> lookup
> >>>>>> table.
> >>>>>>>
> >>>>>>> Option1 is to use a Table1.toStream().process(ProcessSupplier(),
> >>>>>>> "storeName")
> >>>>>>> punctuate() will use regular kafka consumer that reads updates from
> >>>>>> Table2
> >>>>>>> and updates a private map.
> >>>>>>> Process() will do a key-value lookup.
> >>>>>>> This has an advantage when Table1 is much larger than Table2.
> >>>>>>> Each instance of the processor will have to hold entire Table2.
> >>>>>>>
> >>>>>>> Option2 is to re-partition Table1 using through(StreamPartitioner)
> >> and
> >>>>>>> partition using value.
> >>>>>>> This will ensure co-location. Then join with Table2. This part
> might
> >> be
> >>>>>>> tricky??
> >>>>>>>
> >>>>>>> Your comments and suggestions are welcome!
> >>>>>>>
> >>>>>>> Srikanth
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to