First, a follow-up question, just in case (sorry if this is already obvious
to you):  Have you considered using the Kafka Streams DSL, which has much
more convenient join functionality built in?  I am asking because you
didn't mention whether you tried the DSL, or whether you need to use the
Processor API for other reasons.

-Michael


On Fri, Aug 26, 2016 at 1:51 AM, Caleb Welton <ca...@autonomic.ai> wrote:

> Hello,
>
> I'm trying to understand best practices related to joining streams using
> the Kafka Streams API.
>
> I can configure the topology such that two sources feed into a single
> processor:
>
> topologyBuilder
>     .addSource("A", stringDeserializer, itemDeserializer, "a-topic")
>     .addSource("B", stringDeserializer, itemDeserializer, "b-topic")
>     .addProcessor("hello-join", HelloJoin::new, "A", "B")...
>
> And within my processor I can determine which topic a given message came
> from:
>
> public void process(String key, String value) {
>     // context is the ProcessorContext handed to init()
>     if (context.topic().equals("a-topic")) {
>         ...
>     } else {
>         ...
>     }
> }
>
> This allows for a crude form of cross stream join with the following
> issues/limitations:
>
>   i.  It needs a string compare on the topic name to decide which stream a
> message came from.  Having actual access to the TopicPartition could lead
> to a more efficient check.  Priority low, as this is just a small
> performance hit, but it is a per-message hit, so it would be nice to
> eliminate.
>
>   ii. This requires "a-topic" and "b-topic" to have the same message
> format, which for general join handling is a pretty big limitation.  What
> would be the recommended way to handle the case of different message
> formats, e.g. needing different deserializers for different input topics?
>
> E.g. how would I define my Processor if the topology was:
>
> topologyBuilder
>     .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
>     .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
>     .addProcessor("hello-join", HelloJoin::new, "A", "B")...
>
> where itemADeserializer and itemBDeserializer return different classes?
>
> Thanks,
>   Caleb
>
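
On the last question in the quoted message (two sources whose deserializers
return different classes), one possible approach is to let the processor
accept a plain Object value and dispatch on its runtime type rather than on
the topic name. The sketch below is plain Java with no Kafka dependency, so
that it runs standalone; ItemA, ItemB and HelloJoinSketch are hypothetical
names, and the "latest value per key" buffering is an assumption made for
illustration, not how Kafka Streams implements joins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical value types, standing in for whatever itemADeserializer
// and itemBDeserializer would return.
final class ItemA {
    final String name;
    ItemA(String name) { this.name = name; }
}

final class ItemB {
    final int count;
    ItemB(int count) { this.count = count; }
}

// Hypothetical join logic. In a real processor, process(key, value)
// would delegate to accept(key, value).
final class HelloJoinSketch {
    private final Map<String, ItemA> latestA = new HashMap<>();
    private final Map<String, ItemB> latestB = new HashMap<>();
    private final List<String> joined = new ArrayList<>();

    // Dispatch on the runtime type of the value instead of the topic name.
    void accept(String key, Object value) {
        if (value instanceof ItemA) {
            latestA.put(key, (ItemA) value);
        } else if (value instanceof ItemB) {
            latestB.put(key, (ItemB) value);
        } else {
            throw new IllegalArgumentException("unexpected value type: " + value);
        }
        // Emit a joined record once both sides have been seen for this key.
        ItemA a = latestA.get(key);
        ItemB b = latestB.get(key);
        if (a != null && b != null) {
            joined.add(key + ":" + a.name + ":" + b.count);
        }
    }

    List<String> joined() { return joined; }
}
```

In this sketch the type check replaces the per-message topic-name compare,
and each side keeps only the most recent value per key; a real join would
also need to decide how long to retain unmatched records.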
