Hello,

How do I build a KTable from two topics, where the key comes from one topic
and the value from the other?

For example:
topic1 has a key called basekey and userId as the value.
topic2 has the same basekey and locationId as the value.

topic1 = {"basekey":1,"userId":111}
topic1 = {"basekey":2,"userId":222}

topic2 = {"basekey":1,"locId":888}
topic2 = {"basekey":2,"locId":999}

I want to build a KTable with userId as key and locationId as value.
This KTable will be used to enrich a KStream that only has userId and needs
to be updated with locationId.
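
To make the goal concrete, here is a minimal sketch that models the two
topics as plain Scala Maps. It uses no Kafka Streams API at all, and the
object name DesiredMapping is only illustrative; it just shows the mapping
I am after for the sample records above.

```scala
// Plain-Scala model of the desired result; no Kafka APIs are involved.
object DesiredMapping {
  // basekey -> userId, as in topic1
  val topic1: Map[Int, Int] = Map(1 -> 111, 2 -> 222)
  // basekey -> locId, as in topic2
  val topic2: Map[Int, Int] = Map(1 -> 888, 2 -> 999)

  // Inner join on basekey, then drop basekey and re-key by userId.
  val userToLoc: Map[Int, Int] = topic1.flatMap { case (baseKey, userId) =>
    topic2.get(baseKey).map(locId => userId -> locId)
  }
}
```

With the sample data, userToLoc should map 111 to 888 and 222 to 999.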

    // basekey is used as the key in both tables
    val KTable1: KTable[Integer, Integer] = kStreamBuilder.table(intSerde, intSerde, "topic1")
    val KTable2: KTable[Integer, Integer] = kStreamBuilder.table(intSerde, intSerde, "topic2")

    val metadataKTable: KTable[Integer, Integer] =
      KTable1.join(KTable2, (user:Integer, loc:Integer) => (user, loc) )
               //.map((k,v) => (v._1, v._2)) --> .map() is not supported on KTable

The problem is that KTable doesn't have an API to change its key. It only
has mapValues().
I guess that since the key is used in the underlying RocksDB store, it isn't
easy to change the key.
I was exploring whether I can pass it through() another topic using a
custom StreamPartitioner.
That would let me partition on a field in the value, but it still can't
replace the key.


Alternative one is to join the KStream with topic1 to get "basekey", then
join it again with topic2 to get locationId.
This will cause the KStream to be shuffled twice.


Alternative two is to have this logic in a separate topology that writes
the metadata to a topic.
    val metadataKStream = metadataKTable.toStream()
                                        .map((k,v) => new KeyValue(v._1, v._2))
                                        .to("intermediate topic")

Another topology will read the intermediate topic as a KTable and join the
stream with it.
    val kTable =  kStreamBuilder.table(intSerde, intSerde, "intermediate topic")
    val joinedKStream =  someKStream.join(kTable, ...)
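
The data flow of alternative two can be modelled with plain Scala
collections, as a rough sketch only. Nothing here is Kafka Streams API: the
object name, the events sequence and its payload strings are hypothetical,
and inner-join semantics (events with no matching userId are dropped) are
an assumption.

```scala
// Plain-Scala model of the two-topology approach; no Kafka APIs are involved.
object TwoStageEnrichment {
  // Stage 1: result of joining topic1 and topic2, still keyed by basekey.
  val joined: Map[Int, (Int, Int)] = Map(1 -> (111, 888), 2 -> (222, 999))

  // Re-key to (userId, locId); this models what is written to the
  // intermediate topic.
  val intermediate: Seq[(Int, Int)] = joined.values.toSeq

  // Stage 2: read the intermediate records back as a table keyed by userId.
  val table: Map[Int, Int] = intermediate.toMap

  // Hypothetical stream records: (userId, payload).
  val events: Seq[(Int, String)] = Seq(111 -> "click", 222 -> "view", 333 -> "click")

  // Enrich each event with its locId; userId 333 has no match and is dropped.
  val enriched: Seq[(Int, (String, Int))] = events.flatMap { case (userId, payload) =>
    table.get(userId).map(locId => userId -> (payload, locId))
  }
}
```

The point of the sketch is that the re-keying happens once, when writing
the intermediate records, so the stream side only needs a single lookup by
userId.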

Any thoughts on what could be a good approach?

Srikanth
