Hello Srikanth,

When involved in joins, a KTable needs to pass both the old value and the
new value as a pair to the join operator, since it is "an ever updating
table with the underlying changelog". For example, your topic1 stream has
key "128073" with its value updated from 542361 to 560608. The pair <old, new>
is printed as "(new <- old)" when you print the table directly; but if you
call "table.toStream.print", the library drops the old value in
"toStream" and passes only the new value to the "print" operator.

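To make the pair idea concrete, here is a minimal standalone sketch in plain
Scala. It does not use the Kafka Streams library: "Change" and "toStream"
below are hypothetical stand-ins that only model the behaviour described
above, using the two topic1 updates for key 128073.

```scala
// Hypothetical stand-in for the <old, new> pair a KTable forwards downstream.
// This is a model for illustration, not the library's own class.
final case class Change[V](newValue: V, oldValue: Option[V]) {
  // Mimics the "(new<-old)" format seen when a table is printed directly.
  override def toString: String =
    s"($newValue<-${oldValue.fold("null")(_.toString)})"
}

object ChangePairSketch {
  // Models what "toStream" does to each update: keep the new value, drop the old one.
  def toStream[K, V](updates: Seq[(K, Change[V])]): Seq[(K, V)] =
    updates.map { case (k, c) => (k, c.newValue) }

  // The two updates for key 128073 from topic1.
  val updates: Seq[(Int, Change[Int])] = Seq(
    128073 -> Change(542361, None),
    128073 -> Change(560608, Some(542361))
  )

  def main(args: Array[String]): Unit = {
    // Printing the pairs directly shows both values:
    //   128073 , (542361<-null)
    //   128073 , (560608<-542361)
    updates.foreach { case (k, c) => println(s"$k , $c") }

    // After the toStream step only the new value is left:
    //   128073 , 542361
    //   128073 , 560608
    toStream(updates).foreach { case (k, v) => println(s"$k , $v") }
  }
}
```

Running ChangePairSketch.main prints the two pair lines first and then the
two new-value-only lines, which is the difference between printing the table
and printing "table.toStream".
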
Now as for the duplicates: they actually come from the pair <old, new>, i.e.
if you remove "toStream" for your metadataKTable, I think you will see
something like:

128073 , null <- null

128073 , (542361,100710) <- (null, 100710)
...
128073 , (560608,100710) <- (542361, 100710)


Guozhang

On Thu, Jun 2, 2016 at 9:58 AM, Srikanth <srikanth...@gmail.com> wrote:

> I did try approach 3 yesterday with the following sample data.
>
> topic1:
> 127339  538433
> 131933  626026
> 128072  536012
> 128074  546262
> *123507  517631*
> 128073  542361
> 128073  560608
>
> topic2:
> 128074  100282
> 131933  100394
> 127339  100445
> 128073  100710
> *123507  100226*
>
> I joined these and printed the result
>     val KTable1 = kStreamBuilder.table(intSerde, intSerde, "topic1")
>     val KTable2 = kStreamBuilder.table(intSerde, intSerde, "topic2")
>     val metadataKTable = KTable1.join(KTable2, (user, loc) => (user, loc) )
>     metadataKTable.toStream().print()
>
> In the output I see each key being output twice. Didn't understand why?
>
> Started Streams Example.
> *123507 , (517631,100226)*
> 127339 , (538433,100445)
> 128073 , (542361,100710)
> 131933 , (626026,100394)
> 128073 , (560608,100710)
> 128072 , null
> 128074 , (546262,100282)
> 128074 , (546262,100282)
> 128073 , (560608,100710)
> *123507 , (517631,100226)*
> 131933 , (626026,100394)
> 127339 , (538433,100445)
> Finished Streams Example.
>
> If I store the joinedTable to an intermediate topic and read it back, I see
> duplicate records too.
>     val metadataKTable = kStreamBuilder.table(intSerde, intSerde, metadataTopicName)
>     metadataKTable.print()
>
> Started Streams Example.
> 538433 , (100445<-null)
> 546262 , (100282<-null)
> 546262 , (100282<-null)
> 538433 , (100445<-null)
> *517631 , (100226<-null)*
> *517631 , (100226<-null)*
> 542361 , (100710<-null)
> 560608 , (100710<-null)
> 560608 , (100710<-null)
> 626026 , (100394<-null)
> 626026 , (100394<-null)
> Finished Streams Example.
>
> BTW, what does the strange "*<-null*" in KTable.print mean?
>
> Srikanth
>
> On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi Srikanth,
> >
> > your third approach seems to be the best fit. It uses only one shuffle
> > of the data (which you cannot prevent in any case).
> >
> > If you want to put everything into a single application, you could use a
> > "dummy" custom aggregation to convert the KStream into a KTable instead
> > of writing into a topic and reading it from a second application.
> >
> > val kTable = metadataKTable
> >              .toStream()
> >              .map((k,v) => new KeyValue(v._1, v._2))
> >              .through("intermediate topic")
> >              .aggregateByKey(...);
> >
> > The aggregate function just replaces the old value with the new value
> > (ie, not really performing an aggregation).
> >
> > -Matthias
> >
> >
> > On 06/01/2016 08:03 PM, Srikanth wrote:
> > > Hello,
> > >
> > > > How do I build a KTable from two topics such that key is in one topic
> > > > and value in other?
> > >
> > > Ex,
> > > topic1 has a key called basekey and userId as value.
> > > topic2 has same basekey and locationId as value
> > >
> > > topic1 = {"basekey":1,"userId":111}
> > > topic1 = {"basekey":2,"userId":222}
> > >
> > > topic2 = {"basekey":1,"locId":888}
> > > topic2 = {"basekey":2,"locId":999}
> > >
> > > I want to build a KTable with userId as key and locationId as value.
> > > > This KTable will be used to enrich a KStream that only has userId and
> > > > needs to be updated with locationId.
> > >
> > > >     val KTable1: KTable[Integer, Integer] =
> > > >       kStreamBuilder.table(intSerde, intSerde, "topic1")  // basekey is used as key
> > > >     val KTable2: KTable[Integer, Integer] =
> > > >       kStreamBuilder.table(intSerde, intSerde, "topic2")  // basekey is used as key
> > >
> > > >     val metadataKTable: KTable[Integer, Integer] =
> > > >       KTable1.join(KTable2, (user:Integer, loc:Integer) => (user, loc))
> > > >                //.map((k,v) => (v._1, v._2) --> .map() is not supported on KTable
> > >
> > > > Problem is KTable doesn't have an API to update its key. It only has
> > > > mapValues().
> > > I guess since the key is used in underlying rocksDB, it isn't easy to
> > > change the key.
> > > I was exploring if I can pass it through() another topic using
> > > custom StreamPartitioner.
> > > > That will let me partition using a field in value but still can't
> > > > replace the key.
> > >
> > >
> > > > Alternate one, is to join the KStream with topic1 to get "basekey".
> > > > Then join it again with topic2 to get locationId.
> > > This will cause KStream to be shuffled twice.
> > >
> > >
> > > Alternate two, is to have this logic as a separate topology. That will
> > > write metadata to a topic.
> > >     val metadataKStream = metadataKTable.toStream()
> > > >                                             .map((k,v) => new KeyValue(v._1, v._2))
> > >                                             .to("intermediate topic")
> > >
> > > Another topology will read the stream topic and perform a join.
> > > >     val kTable =  kStreamBuilder.table(intSerde, intSerde, "intermediate topic")
> > >     val joinedKStream =  someKStream.join(kTable, ...)
> > >
> > > Any thoughts on what could be a good approach?
> > >
> > > Srikanth
> > >
> >
> >
>



-- 
-- Guozhang
