Ah, I was hoping for a magic solution not a jira!

Another thought was to embed table2 as KTable in custom processor and use
that for lookup.

class CustomProcesser(kStreamBuilder: KStreamBuilder) extends
Processor[Int, Int] {
  private var kvStore: KeyValueStore[Int, Int] = _
  private val table2  = kStreamBuilder.table(IntSerde(), IntSerde(),
table2TopicName)

  override def init(context: ProcessorContext) {
      this.kvStore =
context.getStateStore("lookupMap").asInstanceOf[KeyValueStore[Int, Int]]
      table2.foreach((k,v) => kvStore.put(k, v))
  }
  override def process(key1:Int, val1:Int) {
    context.forward(key1, kvStore.get(val1))
  }

Then do
  table1.toStream().through(ValueBasedPartitioner).process(customProcessor)

I'm not sure if creating a KTable inside a Processor like this is
semantically correct.
But at least both tables are co-partitioned and we don't have to replicate.

Srikanth

On Thu, Jul 14, 2016 at 11:36 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> My bad... I should have considered this in the first place. You are
> absolutely right.
>
> Supporting this kind of a join is work in progress.
> https://issues.apache.org/jira/browse/KAFKA-3705
>
> Your custom solution (Option 1) might work... But as you mentioned, the
> problem will be that the first table gets replicated (and not
> partitioned) over all Processors, which might become a problem.
>
> -Matthias
>
>
> On 07/14/2016 05:22 PM, Srikanth wrote:
> > I should have mentioned that I tried this. It worked in other case but
> will
> > not work for this one.
> > I'm pasting a sample from table1 that I gave in my first email.
> >
> > Table1
> >   111 -> aaa
> >   222 -> bbb
> >   333 -> aaa
> >
> > Here value is not unique(aaa). So, I can't just make it a key. 333 will
> > then override 111.
> >
> > Srikanth
> >
> > On Thu, Jul 14, 2016 at 11:07 AM, Matthias J. Sax <matth...@confluent.io
> >
> > wrote:
> >
> >> You will need to set a new key before you do the re-partitioning. In
> >> your case, it seems you want to switch key and value. This can be done
> >> with a simple map
> >>
> >>> table1.toStream()
> >>>       .map(new KeyValueMapper<K, V, KeyValue<V, K>() {
> >>>              public KeyValue<V, K> apply(K key, V value) {
> >>>                return new KeyValue<V, K>(value, key);
> >>>              }
> >>>            })
> >>>       .to("my-repartioning-topic");
> >>> newTable1 = builder.table("my-repartioning-topic");
> >>
> >> With K and V being the actual types of key and value in table1.
> >>
> >> Of course, you can modify the table entries in map() in any other way
> >> that suits your use case. You only need to make sure, to set the (join)
> >> key before you do the re-partitioning.
> >>
> >> -Matthias
> >>
> >>
> >> On 07/14/2016 04:47 PM, Srikanth wrote:
> >>> Matthias,
> >>>
> >>> With option 2, how would we perform join after re-partition. Although
> we
> >>> re-partitioned with value, the key doesn't change.
> >>> KTable joins always use keys and ValueJoiner get values from both table
> >>> when keys match.
> >>>
> >>> Having data co-located will not be sufficient rt??
> >>>
> >>> Srikanth
> >>>
> >>> On Thu, Jul 14, 2016 at 4:12 AM, Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> I would recommend re-partitioning as described in Option 2.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 07/13/2016 10:53 PM, Srikanth wrote:
> >>>>> Hello,
> >>>>>
> >>>>> I'm trying the following join using KTable. There are two change log
> >>>> tables.
> >>>>> Table1
> >>>>>   111 -> aaa
> >>>>>   222 -> bbb
> >>>>>   333 -> aaa
> >>>>>
> >>>>> Table2
> >>>>>   aaa -> 999
> >>>>>   bbb -> 888
> >>>>>   ccc -> 777
> >>>>>
> >>>>> My result table should be
> >>>>>   111 -> 999
> >>>>>   222 -> 888
> >>>>>   333 -> 999
> >>>>>
> >>>>> Its not a case for join() as the keys don't match. Its more a lookup
> >>>> table.
> >>>>>
> >>>>> Option1 is to use a Table1.toStream().process(ProcessSupplier(),
> >>>>> "storeName")
> >>>>> punctuate() will use regular kafka consumer that reads updates from
> >>>> Table2
> >>>>> and updates a private map.
> >>>>> Process() will do a key-value lookup.
> >>>>> This has an advantage when Table1 is much larger than Table2.
> >>>>> Each instance of the processor will have to hold entire Table2.
> >>>>>
> >>>>> Option2 is to re-partition Table1 using through(StreamPartitioner)
> and
> >>>>> partition using value.
> >>>>> This will ensure co-location. Then join with Table2. This part might
> be
> >>>>> tricky??
> >>>>>
> >>>>> Your comments and suggestions are welcome!
> >>>>>
> >>>>> Srikanth
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Reply via email to