I would not expect a performance difference.
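
For reference, the to()+table() variant you mention would look roughly like
this (same Scala-style pseudocode as in the thread below; serdes and topic
names are placeholders):

  // write out the re-keyed stream ...
  metadataKTable.toStream()
                .map((k, v) => new KeyValue(v._1, v._2))
                .to(intSerde, intSerde, "intermediate topic")

  // ... and read it back as a table (possibly in a second application)
  val kTable = kStreamBuilder.table(intSerde, intSerde, "intermediate topic")

Both variants write and re-read the intermediate topic exactly once, so the
shuffle cost is the same; the through()+aggregateByKey() variant (quoted
below) just keeps everything in a single topology.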

-Matthias

On 06/02/2016 06:15 PM, Srikanth wrote:
> In terms of performance, there is not going to be much difference between
> to()+table() and through()+aggregateByKey(), right?
> 
> Srikanth
> 
> 
> On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Hi Srikanth,
>>
>> Your third approach seems to be the best fit. It uses only one shuffle
>> of the data (which you cannot avoid in any case).
>>
>> If you want to put everything into a single application, you could use a
>> "dummy" custom aggregation to convert the KStream into a KTable instead
>> of writing into a topic and reading it from a second application.
>>
>> val kTable = metadataKTable
>>              .toStream()
>>              .map((k,v) => new KeyValue(v._1, v._2))
>>              .through("intermediate topic")
>>              .aggregateByKey(...);
>>
>> The aggregate function just replaces the old value with the new value
>> (i.e., it does not really perform an aggregation).
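>>
>> Spelled out, the "dummy" aggregation could look roughly like this (a sketch
>> assuming the 0.10.0 aggregateByKey(Initializer, Aggregator, keySerde,
>> aggValueSerde, name) overload; serdes and the store name are placeholders):
>>
>>   val kTable = metadataKTable
>>                .toStream()
>>                .map((k, v) => new KeyValue(v._1, v._2))  // userId becomes the key, locId the value
>>                .through(intSerde, intSerde, "intermediate topic")  // repartition on the new key
>>                .aggregateByKey(
>>                  () => null.asInstanceOf[Integer],  // initializer: no value seen yet
>>                  (userId: Integer, locId: Integer, oldLocId: Integer) => locId,  // keep the latest value per key
>>                  intSerde, intSerde,
>>                  "user-location-store")  // store name (placeholder)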
>>
>> -Matthias
>>
>>
>> On 06/01/2016 08:03 PM, Srikanth wrote:
>>> Hello,
>>>
>>> How do I build a KTable from two topics such that the key is in one topic
>>> and the value is in the other?
>>>
>>> For example,
>>> topic1 has a key called basekey and userId as the value.
>>> topic2 has the same basekey and locationId as the value.
>>>
>>> topic1 = {"basekey":1,"userId":111}
>>> topic1 = {"basekey":2,"userId":222}
>>>
>>> topic2 = {"basekey":1,"locId":888}
>>> topic2 = {"basekey":2,"locId":999}
>>>
>>> I want to build a KTable with userId as the key and locationId as the value.
>>> This KTable will be used to enrich a KStream that only has userId and needs
>>> to be updated with locationId.
>>>
>>>     val KTable1: KTable[Integer, Integer] =
>>>       kStreamBuilder.table(intSerde, intSerde, "topic1")  --> basekey is used as key
>>>     val KTable2: KTable[Integer, Integer] =
>>>       kStreamBuilder.table(intSerde, intSerde, "topic2")  --> basekey is used as key
>>>
>>>     val metadataKTable: KTable[Integer, (Integer, Integer)] =
>>>       KTable1.join(KTable2, (user: Integer, loc: Integer) => (user, loc))
>>>               //.map((k, v) => (v._1, v._2))  --> .map() is not supported on KTable
>>>
>>> The problem is that KTable doesn't have an API to update its key. It only
>>> has mapValues().
>>> I guess since the key is used in the underlying RocksDB store, it isn't easy
>>> to change the key.
>>> I was exploring whether I can pass it through() another topic using a
>>> custom StreamPartitioner.
>>> That would let me partition using a field in the value, but it still can't
>>> replace the key.
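>>>
>>> For illustration, the partitioner I had in mind would look something like
>>> this (a sketch against the 0.10.0 StreamPartitioner interface; the through()
>>> overload taking a partitioner, the topic name, and the tuple serde are
>>> assumptions on my side):
>>>
>>>   import org.apache.kafka.streams.processor.StreamPartitioner
>>>
>>>   val byUserId = new StreamPartitioner[Integer, (Integer, Integer)] {
>>>     // route each record by the userId stored in the value;
>>>     // the record key (basekey) itself stays unchanged
>>>     override def partition(key: Integer, value: (Integer, Integer), numPartitions: Int): Integer =
>>>       Integer.valueOf(math.abs(value._1.hashCode) % numPartitions)
>>>   }
>>>
>>>   // tupleSerde: a serde for the (userId, locId) pair (placeholder)
>>>   metadataKTable.through(intSerde, tupleSerde, byUserId, "partitioned topic")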
>>>
>>>
>>> Alternative one is to join the KStream with topic1 to get the basekey, then
>>> join it again with topic2 to get the locationId.
>>> This will cause the KStream to be shuffled twice.
>>>
>>>
>>> Alternative two is to have this logic in a separate topology that writes the
>>> metadata to a topic:
>>>     metadataKTable.toStream()
>>>                   .map((k, v) => new KeyValue(v._1, v._2))
>>>                   .to("intermediate topic")
>>>
>>> Another topology will then read that topic as a table and perform the join:
>>>     val kTable = kStreamBuilder.table(intSerde, intSerde, "intermediate topic")
>>>     val joinedKStream = someKStream.join(kTable, ...)
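>>>
>>> Here the value joiner (the "..." above) would attach the locationId to each
>>> stream record, e.g. something like this, where the Event type and its
>>> withLocation() helper are just placeholders for illustration:
>>>
>>>     (event: Event, locId: Integer) => event.withLocation(locId)  // Event / withLocation() are hypothetical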
>>>
>>> Any thoughts on what could be a good approach?
>>>
>>> Srikanth
>>>
>>
>>
> 
