Hello,

How do I build a KTable from two topics such that the key is in one topic and the value is in the other?
For example, topic1 has a key called basekey and userId as its value. topic2 has the same basekey and locationId as its value.

  topic1 = {"basekey":1,"userId":111}
  topic1 = {"basekey":2,"userId":222}
  topic2 = {"basekey":1,"locId":888}
  topic2 = {"basekey":2,"locId":999}

I want to build a KTable with userId as key and locationId as value. This KTable will be used to enrich a KStream that only has userId and needs to be updated with locationId.

  val KTable1: KTable[Integer, Integer] = kStreamBuilder.table(intSerde, intSerde, "topic1")  // basekey is used as key
  val KTable2: KTable[Integer, Integer] = kStreamBuilder.table(intSerde, intSerde, "topic2")  // basekey is used as key

  // The joined value is a (userId, locId) pair, still keyed by basekey
  val metadataKTable: KTable[Integer, (Integer, Integer)] =
    KTable1.join(KTable2, (user: Integer, loc: Integer) => (user, loc))
    //.map((k, v) => (v._1, v._2))  // .map() is not supported on KTable

The problem is that KTable doesn't have an API to update its key. It only has mapValues(). I guess that since the key is used in the underlying RocksDB store, it isn't easy to change the key.

I was exploring whether I can pass it through() another topic using a custom StreamPartitioner. That would let me partition on a field in the value, but I still can't replace the key.

Alternative one is to join the KStream with topic1 to get basekey, then join it again with topic2 to get locationId. This will cause the KStream to be shuffled twice.

Alternative two is to have this logic as a separate topology that writes the metadata to a topic:

  val metadataKStream = metadataKTable.toStream()
    .map((k, v) => new KeyValue(v._1, v._2))
    .to("intermediate topic")

Another topology will then read that topic and perform the join:

  val kTable = kStreamBuilder.table(intSerde, intSerde, "intermediate topic")
  val joinedKStream = someKStream.join(kTable, ...)

Any thoughts on what could be a good approach?

Srikanth
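
P.S. To make the intended result concrete, here is a minimal sketch of the join-then-re-key logic using plain Scala maps. It does not use any Kafka Streams API; the names RekeySketch and userToLoc are hypothetical and exist only for this illustration, and the maps simply stand in for the latest value per basekey that each KTable would hold.

```scala
// Plain-Scala model of the desired result; no Kafka APIs are involved.
object RekeySketch {
  // Latest value per basekey, as a KTable over each topic would hold it.
  val topic1: Map[Int, Int] = Map(1 -> 111, 2 -> 222) // basekey -> userId
  val topic2: Map[Int, Int] = Map(1 -> 888, 2 -> 999) // basekey -> locId

  // Inner join on basekey, then re-key the joined rows by userId.
  // Basekeys that are missing from `locs` are dropped, as in an inner join.
  def userToLoc(users: Map[Int, Int], locs: Map[Int, Int]): Map[Int, Int] =
    users.collect {
      case (baseKey, userId) if locs.contains(baseKey) =>
        userId -> locs(baseKey)
    }

  def main(args: Array[String]): Unit =
    println(userToLoc(topic1, topic2))
}
```

Note that this sketch assumes each userId appears under only one basekey. If two basekeys carried the same userId, one entry would silently overwrite the other after re-keying.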