I should have mentioned that I tried this. It worked in other case but will
not work for this one.
I'm pasting a sample from table1 that I gave in my first email.

Table1
  111 -> aaa
  222 -> bbb
  333 -> aaa

Here value is not unique(aaa). So, I can't just make it a key. 333 will
then override 111.

Srikanth

On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> You will need to set a new key before you do the re-partitioning. In
> your case, it seems you want to switch key and value. This can be done
> with a simple map
>
> > table1.toStream()
> >       .map(new KeyValueMapper<K, V, KeyValue<V, K>() {
> >              public KeyValue<V, K> apply(K key, V value) {
> >                return new KeyValue<V, K>(value, key);
> >              }
> >            })
> >       .to("my-repartioning-topic");
> > newTable1 = builder.table("my-repartioning-topic");
>
> With K and V being the actual types of key and value in table1.
>
> Of course, you can modify the table entries in map() in any other way
> that suits your use case. You only need to make sure, to set the (join)
> key before you do the re-partitioning.
>
> -Matthias
>
>
> On 07/14/2016 04:47 PM, Srikanth wrote:
> > Matthias,
> >
> > With option 2, how would we perform join after re-partition. Although we
> > re-partitioned with value, the key doesn't change.
> > KTable joins always use keys and ValueJoiner get values from both table
> > when keys match.
> >
> > Having data co-located will not be sufficient rt??
> >
> > Srikanth
> >
> > On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I would recommend re-partitioning as described in Option 2.
> >>
> >> -Matthias
> >>
> >> On 07/13/2016 10:53 PM, Srikanth wrote:
> >>> Hello,
> >>>
> >>> I'm trying the following join using KTable. There are two change log
> >> tables.
> >>> Table1
> >>>   111 -> aaa
> >>>   222 -> bbb
> >>>   333 -> aaa
> >>>
> >>> Table2
> >>>   aaa -> 999
> >>>   bbb -> 888
> >>>   ccc -> 777
> >>>
> >>> My result table should be
> >>>   111 -> 999
> >>>   222 -> 888
> >>>   333 -> 999
> >>>
> >>> Its not a case for join() as the keys don't match. Its more a lookup
> >> table.
> >>>
> >>> Option1 is to use a Table1.toStream().process(ProcessSupplier(),
> >>> "storeName")
> >>> punctuate() will use regular kafka consumer that reads updates from
> >> Table2
> >>> and updates a private map.
> >>> Process() will do a key-value lookup.
> >>> This has an advantage when Table1 is much larger than Table2.
> >>> Each instance of the processor will have to hold entire Table2.
> >>>
> >>> Option2 is to re-partition Table1 using through(StreamPartitioner) and
> >>> partition using value.
> >>> This will ensure co-location. Then join with Table2. This part might be
> >>> tricky??
> >>>
> >>> Your comments and suggestions are welcome!
> >>>
> >>> Srikanth
> >>>
> >>
> >>
> >
>
>

Reply via email to