Hi Uirco,

The method that doesn’t take Consumed will fall back to the configured “default 
serdes”. If you don’t have that confit set, it will just keep them as byte 
arrays, which will probably give you an exception at runtime. You’ll probably 
want to use the Consumed argument to set your serdes. 

I’m assuming from your question that the serialized data in your two topics 
have different formats, but that your serdes for them produce the same result 
type. That is, the resulting stream would have just one type in it.

I think you have two options, you can either create a “wrapper deserializer” 
that checks the input topic for each record and then delegates to the 
appropriate deserializer, or you can create two different streams for the two 
topics and then use the ‘merge’ operator to combine them into one stream.

Offhand, I don’t think there would be much of a performance difference among 
any of the options, so I’d suggest going for the approach that looks the 
cleanest to you.

Regarding the last question, I’m not aware of any plans, but if you think that 
would be a better API, you are welcome to propose it in a KIP!

I hope this helps,
John

On Thu, Nov 12, 2020, at 22:00, Uirco Aoi wrote:
> have anyone used public synchronized <K, V> KStream<K, V> stream(final 
> Collection<String> topics)?
> my use case is I have one stream but with two source input using 
> different serde.
> looks like the best way is to use this stream API, but I have some 
> concerns
> 1. currently the only API I can use is 
> public synchronized <K, V> KStream<K, V> stream(final 
> Collection<String> topics)
> does it means after I got it, I have to deserialize the data myself?
> and will it slow down the performance, compare to using it with 
> consumed.with ?
> 2. will it in the future support public synchronized <K, V> KStream<K, 
> V> stream(final Collection<List<TopicsWithConsumed>> )
> class TopicsWithConsumed pair{
> final String topic
> final Consumed<K, V> consumed
> }
>

Reply via email to