I would not expect a performance difference. -Matthias
On 06/02/2016 06:15 PM, Srikanth wrote:
> In terms of performance there is not going to be much difference between
> to()+table() and through()+aggregateByKey(), right?
>
> Srikanth
>
> On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Hi Srikanth,
>>
>> your third approach seems to be the best fit. It uses only one shuffle
>> of the data (which you cannot prevent in any case).
>>
>> If you want to put everything into a single application, you could use a
>> "dummy" custom aggregation to convert the KStream into a KTable instead
>> of writing into a topic and reading it from a second application.
>>
>>   val kTable = metadataKTable
>>     .toStream()
>>     .map((k,v) => new KeyValue(v._1, v._2))
>>     .through("intermediate topic")
>>     .aggregateByKey(...);
>>
>> The aggregate function just replaces the old value with the new value
>> (i.e., it does not really perform an aggregation).
>>
>> -Matthias
>>
>> On 06/01/2016 08:03 PM, Srikanth wrote:
>>> Hello,
>>>
>>> How do I build a KTable from two topics such that the key is in one
>>> topic and the value in the other?
>>>
>>> For example, topic1 has a key called basekey and userId as value;
>>> topic2 has the same basekey and locationId as value:
>>>
>>>   topic1 = {"basekey":1,"userId":111}
>>>   topic1 = {"basekey":2,"userId":222}
>>>
>>>   topic2 = {"basekey":1,"locId":888}
>>>   topic2 = {"basekey":2,"locId":999}
>>>
>>> I want to build a KTable with userId as key and locationId as value.
>>> This KTable will be used to enrich a KStream that only has userId and
>>> needs to be updated with locationId.
>>>
>>>   val KTable1: KTable[Integer, Integer] =
>>>     kStreamBuilder.table(intSerde, intSerde, "topic1")  --> basekey is used as key
>>>   val KTable2: KTable[Integer, Integer] =
>>>     kStreamBuilder.table(intSerde, intSerde, "topic2")  --> basekey is used as key
>>>
>>>   val metadataKTable: KTable[Integer, (Integer, Integer)] =
>>>     KTable1.join(KTable2, (user: Integer, loc: Integer) => (user, loc))
>>>     //.map((k,v) => (v._1, v._2))  --> .map() is not supported on KTable
>>>
>>> The problem is that KTable doesn't have an API to update its key; it
>>> only has mapValues().
>>> I guess since the key is used in the underlying RocksDB store, it isn't
>>> easy to change the key.
>>> I was exploring whether I can pass it through() another topic using a
>>> custom StreamPartitioner.
>>> That would let me partition on a field in the value, but it still can't
>>> replace the key.
>>>
>>> Alternative one is to join the KStream with topic1 to get "basekey",
>>> then join it again with topic2 to get locationId.
>>> This causes the KStream to be shuffled twice.
>>>
>>> Alternative two is to put this logic in a separate topology that writes
>>> the metadata to a topic:
>>>
>>>   val metadataKStream = metadataKTable.toStream()
>>>     .map((k,v) => new KeyValue(v._1, v._2))
>>>     .to("intermediate topic")
>>>
>>> Another topology will then read that topic and perform the join:
>>>
>>>   val kTable = kStreamBuilder.table(intSerde, intSerde, "intermediate topic")
>>>   val joinedKStream = someKStream.join(kTable, ...)
>>>
>>> Any thoughts on what could be a good approach?
>>>
>>> Srikanth
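For concreteness, below is a rough sketch of the single-application ("dummy
aggregation") variant suggested above, written against the 0.10.0-era API used
in this thread (KStreamBuilder, through(), aggregateByKey()). The topic name
"intermediate-topic", the store name "user-to-loc-store", and all variable
names are illustrative placeholders; the intermediate topic is assumed to be
created up front with the same partition count, and the stream to be enriched
is assumed to be keyed by userId.

  import org.apache.kafka.common.serialization.Serdes
  import org.apache.kafka.streams.KeyValue
  import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStreamBuilder, KTable, KeyValueMapper, ValueJoiner}

  val intSerde = Serdes.Integer()
  val builder  = new KStreamBuilder()

  // Both source topics are keyed by basekey.
  val userByBase: KTable[Integer, Integer] = builder.table(intSerde, intSerde, "topic1") // basekey -> userId
  val locByBase:  KTable[Integer, Integer] = builder.table(intSerde, intSerde, "topic2") // basekey -> locId

  // KTable-KTable join on basekey; the joined value is the (userId, locId) pair.
  val joined: KTable[Integer, (Integer, Integer)] =
    userByBase.join(locByBase, new ValueJoiner[Integer, Integer, (Integer, Integer)] {
      override def apply(userId: Integer, locId: Integer): (Integer, Integer) = (userId, locId)
    })

  // Re-key from basekey to userId, shuffle through an intermediate topic, and
  // rebuild a KTable with a "dummy" aggregation that just keeps the latest value.
  val locByUser: KTable[Integer, Integer] = joined
    .toStream()
    .map(new KeyValueMapper[Integer, (Integer, Integer), KeyValue[Integer, Integer]] {
      override def apply(baseKey: Integer, v: (Integer, Integer)): KeyValue[Integer, Integer] =
        new KeyValue(v._1, v._2) // userId becomes the key, locId the value
    })
    .through(intSerde, intSerde, "intermediate-topic")
    .aggregateByKey(
      new Initializer[Integer] { override def apply(): Integer = null },
      new Aggregator[Integer, Integer, Integer] {
        // Ignore the previous aggregate and keep only the newest locId.
        override def apply(userId: Integer, locId: Integer, oldLoc: Integer): Integer = locId
      },
      intSerde, intSerde, "user-to-loc-store")

  // A stream keyed by userId can then be enriched against locByUser, e.g.
  //   someKStream.join(locByUser, someValueJoiner)

The aggregation exists only to turn the re-keyed KStream back into a KTable,
so, as described above, the aggregator simply discards the previous value.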