First, a follow-up question, just in case (sorry if this is already obvious to you): have you considered using the Kafka Streams DSL, which has much more convenient join functionality built in out of the box? I am asking because you didn't mention whether you tried the DSL, or whether you need the Processor API for other reasons as well.
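To make the comparison a bit more concrete: the DSL's KStream-KStream join is, roughly, a symmetric windowed join that buffers both sides per key and pairs up records whose timestamps fall within the join window. Here is a minimal plain-Java sketch of that idea. It deliberately uses no Kafka classes, and `WindowedJoinSketch`, `ItemA`, `ItemB` and the string output format are all hypothetical. It also shows one possible answer to point ii: if each source's deserializer returns a different class, a processor whose value type is `Object` can dispatch with `instanceof` instead of comparing topic names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (NOT the Kafka Streams API) of a symmetric, windowed
 * inner join between two keyed streams whose values have different types.
 */
public class WindowedJoinSketch {

    // Hypothetical value types standing in for what itemADeserializer / itemBDeserializer return.
    public static final class ItemA {
        final String name;
        public ItemA(String name) { this.name = name; }
    }

    public static final class ItemB {
        final int quantity;
        public ItemB(int quantity) { this.quantity = quantity; }
    }

    // A buffered record: the value plus the timestamp it arrived with.
    private static final class Timestamped<V> {
        final V value;
        final long ts;
        Timestamped(V value, long ts) { this.value = value; this.ts = ts; }
    }

    private final long windowMs;
    private final Map<String, List<Timestamped<ItemA>>> leftBuffer = new HashMap<>();
    private final Map<String, List<Timestamped<ItemB>>> rightBuffer = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    public WindowedJoinSketch(long windowMs) { this.windowMs = windowMs; }

    /**
     * Called for records from either source. Dispatching on the value's class
     * replaces the topic-name string compare, assuming each topic has its own type.
     */
    public void process(String key, Object value, long ts) {
        if (value instanceof ItemA) {
            ItemA a = (ItemA) value;
            // Remember this left record, then join it against buffered right records.
            leftBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(new Timestamped<>(a, ts));
            for (Timestamped<ItemB> b : rightBuffer.getOrDefault(key, Collections.emptyList())) {
                if (Math.abs(ts - b.ts) <= windowMs) emit(key, a, b.value);
            }
        } else if (value instanceof ItemB) {
            ItemB b = (ItemB) value;
            // Symmetric case: remember the right record, join against buffered left records.
            rightBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(new Timestamped<>(b, ts));
            for (Timestamped<ItemA> a : leftBuffer.getOrDefault(key, Collections.emptyList())) {
                if (Math.abs(ts - a.ts) <= windowMs) emit(key, a.value, b);
            }
        } else {
            throw new IllegalArgumentException("unexpected value type: " + value);
        }
    }

    // Hypothetical "joiner": combines one left and one right value into a result.
    private void emit(String key, ItemA a, ItemB b) {
        output.add(key + ":" + a.name + "/" + b.quantity);
    }

    public List<String> joined() { return output; }
}
```

For example, with a 1000 ms window, `ItemA("apple")` at t=0 and `ItemB(3)` at t=500 on key `k1` produce `k1:apple/3`, while an `ItemB` on `k1` at t=5000 produces nothing. The sketch never evicts old records, so its buffers grow without bound; the DSL instead keeps each side in a window state store and drops entries once they are older than the store's retention.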
-Michael

On Fri, Aug 26, 2016 at 1:51 AM, Caleb Welton <ca...@autonomic.ai> wrote:
> Hello,
>
> I'm trying to understand best practices related to joining streams using
> the Kafka Streams API.
>
> I can configure the topology such that two sources feed into a single
> processor:
>
> topologyBuilder
>     .addSource("A", stringDeserializer, itemDeserializer, "a-topic")
>     .addSource("B", stringDeserializer, itemDeserializer, "b-topic")
>     .addProcessor("hello-join", HelloJoin::new, "A", "B")...
>
> And within my processor I can determine which topic a given message came
> from:
>
> public void process(String key, String value) {
>     if (context.topic().equals("a-topic")) {
>         ...
>     } else {
>         ...
>     }
> }
>
> This allows for a crude form of cross-stream join with the following
> issues/limitations:
>
> i. A string compare on the topic name to decide which stream a message
> came from. Having actual access to the TopicPartition could lead to more
> efficient validation. Priority low, as this is just a small performance
> hit, but it is a per-message performance hit, so it would be nice to
> eliminate.
>
> ii. This requires "a-topic" and "b-topic" to have the same message
> format, which for general join handling is a pretty big limitation. What
> would be the recommended way to handle the case of different message
> formats, e.g. needing different deserializers for different input topics?
>
> E.g. how would I define my Processor if the topology was:
>
> topologyBuilder
>     .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
>     .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
>     .addProcessor("hello-join", HelloJoin::new, "A", "B")...
>
> where itemADeserializer and itemBDeserializer return different classes?
>
> Thanks,
> Caleb