Hello Srikanth,

When involved in joins, a KTable needs to pass both the old value and the
new value as a pair to the join operator, since it is "an ever updating
table with the underlying changelog". For example, your topic1 stream has
key "128073" with its value updated from 542361 to 560608. The pair
<old, new> will be printed as "(new <- old)" when printing the table
directly; but if you call "table.toStream.print", "toStream" will drop the
old value and pass only the new value on to the "print" operator.
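To make the above concrete, here is a minimal plain-Java sketch of the idea (class and method names are illustrative, and this simulates the semantics rather than using the real Kafka Streams API): each update to the table produces a change record pairing the previous and the new value, which `KTable.print` renders as "(new<-old)".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of KTable update propagation (not the real
// Kafka Streams API): every update carries the pair <old, new>.
public class KTableChangeDemo {

    // Apply a sequence of key/value updates and render each resulting
    // change record the way KTable.print shows it: "key , (new<-old)".
    static List<String> applyUpdates(int[][] updates) {
        Map<Integer, Integer> table = new HashMap<>();
        List<String> changes = new ArrayList<>();
        for (int[] kv : updates) {
            Integer old = table.put(kv[0], kv[1]);  // previous value, or null
            changes.add(kv[0] + " , (" + kv[1] + "<-" + old + ")");
            // A toStream() step would drop 'old' here and forward only kv[1].
        }
        return changes;
    }

    public static void main(String[] args) {
        // Key 128073 is updated from 542361 to 560608, as in topic1.
        applyUpdates(new int[][] {{128073, 542361}, {128073, 560608}})
                .forEach(System.out::println);
        // prints:
        // 128073 , (542361<-null)
        // 128073 , (560608<-542361)
    }
}
```

This also explains the "<-null" output below: the first update for a key has no previous value, so the old side of the pair is null.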
Now as for the duplicates: they actually come from the pair <old, new>,
i.e. if you remove "toStream" for your metadataKTable, I think you will
see something like:

128073 , null <- null
128073 , (542361,100710) <- (null, 100710)
...
128073 , (560608,100710) <- (542361, 100710)

Guozhang

On Thu, Jun 2, 2016 at 9:58 AM, Srikanth <srikanth...@gmail.com> wrote:

> I did try approach 3 yesterday with the following sample data.
>
> topic1:
> 127339 538433
> 131933 626026
> 128072 536012
> 128074 546262
> *123507 517631*
> 128073 542361
> 128073 560608
>
> topic2:
> 128074 100282
> 131933 100394
> 127339 100445
> 128073 100710
> *123507 100226*
>
> I joined these and printed the result:
>
>     val KTable1 = kStreamBuilder.table(intSerde, intSerde, "topic1")
>     val KTable2 = kStreamBuilder.table(intSerde, intSerde, "topic2")
>     val metadataKTable = KTable1.join(KTable2, (user, loc) => (user, loc))
>     metadataKTable.toStream().print()
>
> In the output I see each key being output twice. I didn't understand why.
>
> Started Streams Example.
> *123507 , (517631,100226)*
> 127339 , (538433,100445)
> 128073 , (542361,100710)
> 131933 , (626026,100394)
> 128073 , (560608,100710)
> 128072 , null
> 128074 , (546262,100282)
> 128074 , (546262,100282)
> 128073 , (560608,100710)
> *123507 , (517631,100226)*
> 131933 , (626026,100394)
> 127339 , (538433,100445)
> Finished Streams Example.
>
> If I store the joined table to an intermediate topic and read it back, I
> see duplicate records too.
>
>     val metadataKTable = kStreamBuilder.table(intSerde, intSerde,
>       metadataTopicName)
>     metadataKTable.print()
>
> Started Streams Example.
> 538433 , (100445<-null)
> 546262 , (100282<-null)
> 546262 , (100282<-null)
> 538433 , (100445<-null)
> *517631 , (100226<-null)*
> *517631 , (100226<-null)*
> 542361 , (100710<-null)
> 560608 , (100710<-null)
> 560608 , (100710<-null)
> 626026 , (100394<-null)
> 626026 , (100394<-null)
> Finished Streams Example.
>
> BTW, what does the strange "*<-null*" in KTable.print mean?
>
> Srikanth
>
> On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi Srikanth,
> >
> > your third approach seems to be the best fit. It uses only one shuffle
> > of the data (which you cannot prevent in any case).
> >
> > If you want to put everything into a single application, you could use
> > a "dummy" custom aggregation to convert the KStream into a KTable,
> > instead of writing to a topic and reading it back from a second
> > application:
> >
> >     val kTable = metadataKTable
> >       .toStream()
> >       .map((k, v) => new KeyValue(v._1, v._2))
> >       .through("intermediate topic")
> >       .aggregateByKey(...)
> >
> > The aggregate function just replaces the old value with the new value
> > (i.e., it does not really perform an aggregation).
> >
> > -Matthias
> >
> > On 06/01/2016 08:03 PM, Srikanth wrote:
> > > Hello,
> > >
> > > How do I build a KTable from two topics such that the key is in one
> > > topic and the value in the other?
> > >
> > > For example:
> > > topic1 has a key called basekey and userId as value.
> > > topic2 has the same basekey and locationId as value.
> > >
> > > topic1 = {"basekey":1,"userId":111}
> > > topic1 = {"basekey":2,"userId":222}
> > >
> > > topic2 = {"basekey":1,"locId":888}
> > > topic2 = {"basekey":2,"locId":999}
> > >
> > > I want to build a KTable with userId as key and locationId as value.
> > > This KTable will be used to enrich a KStream that only has userId and
> > > needs to be updated with locationId.
> > >
> > >     val KTable1: KTable[Integer, Integer] =
> > >       kStreamBuilder.table(intSerde, intSerde, "topic1")  --> basekey is used as key
> > >     val KTable2: KTable[Integer, Integer] =
> > >       kStreamBuilder.table(intSerde, intSerde, "topic2")  --> basekey is used as key
> > >
> > >     val metadataKTable: KTable[Integer, Integer] =
> > >       KTable1.join(KTable2, (user: Integer, loc: Integer) => (user, loc))
> > >     //.map((k, v) => (v._1, v._2))  --> .map() is not supported on KTable
> > >
> > > The problem is that KTable doesn't have an API to update its key; it
> > > only has mapValues().
> > > I guess since the key is used in the underlying RocksDB store, it
> > > isn't easy to change the key.
> > > I was exploring whether I can pass it through() another topic using a
> > > custom StreamPartitioner.
> > > That would let me partition by a field in the value, but it still
> > > can't replace the key.
> > >
> > > Alternative one is to join the KStream with topic1 to get "basekey",
> > > then join it again with topic2 to get locationId.
> > > This will cause the KStream to be shuffled twice.
> > >
> > > Alternative two is to have this logic as a separate topology that
> > > writes the metadata to a topic:
> > >
> > >     val metadataKStream = metadataKTable.toStream()
> > >       .map((k, v) => new KeyValue(v._1, v._2))
> > >       .to("intermediate topic")
> > >
> > > Another topology will read that topic and perform the join:
> > >
> > >     val kTable = kStreamBuilder.table(intSerde, intSerde,
> > >       "intermediate topic")
> > >     val joinedKStream = someKStream.join(kTable, ...)
> > >
> > > Any thoughts on what could be a good approach?
> > >
> > > Srikanth

--
-- Guozhang
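The pipeline discussed in the thread can be sketched in plain Java, with no Kafka dependency (all names here are illustrative, and the maps stand in for the state stores): inner-join the two basekey-keyed tables, re-key the result by userId as in `.map((k, v) => (v._1, v._2))`, and keep only the latest value per key, which is exactly what Matthias's "dummy aggregation" does when materializing the re-keyed stream back into a table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Kafka-free sketch of the approach in the thread: join on
// basekey, re-key by userId, last write wins per key.
public class RekeyJoinDemo {

    // Inner-join users (basekey -> userId) with locations (basekey -> locId),
    // then re-key the joined record by userId.
    static Map<Integer, Integer> userToLocation(Map<Integer, Integer> users,
                                                Map<Integer, Integer> locations) {
        Map<Integer, Integer> result = new HashMap<>();
        for (Map.Entry<Integer, Integer> e : users.entrySet()) {
            Integer loc = locations.get(e.getKey());   // inner join on basekey
            if (loc != null) {
                // put() replaces any previous value for this userId, i.e.
                // the "dummy aggregation" that just keeps the newest value.
                result.put(e.getValue(), loc);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> users = Map.of(1, 111, 2, 222);      // topic1
        Map<Integer, Integer> locations = Map.of(1, 888, 2, 999);  // topic2
        System.out.println(userToLocation(users, locations).get(111));
        // prints: 888
    }
}
```

In the real topology the re-keyed records still have to pass through an intermediate topic so they land on the right partitions, which is the one unavoidable shuffle Matthias mentions.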