Hello, I want to join 2 Topics (KStreams)
Stream 1 Topic : userIdClicks Key : userId Value : JSON String with event details Stream 2 Topic : userIdChannel Key : userId Value : JSON String with event details and has channel value I could not find any examples with KStream-to-KStream Join. Here is my code //build stream userIdClicks > KStream<String, Long> userClickStream = builder.stream(stringSerde, > stringSerde, > "userClicks"); > > //create stream -> < userId, 1 (count) > > KStream<String, Long> *userClickCountStream* = userClickStream.filter(( > userId,record)-> userId != null) .map((userId,record) -> new KeyValue<>( > userId,1l)); > > //build stream userChannelStream > KStream<String, String> userEventStream = builder.stream(stringSerde, > stringSerde, "userEvents"); > > //create stream <userId, channel> : extract channel value from json string > KStream<String, String> *userChannelStream* = userEventStream > .filter((userId,record)-> userId != null) > .map((userId,record) -> new KeyValue<>(userId > ,JsonPath.read(record, "$.event.page.channel").toString())); > > //join *userClickCountStream* with > *userChannelStream*KTable<String, Long> clicksPerChannel = > userClickCountStream > .join(userChannelStream, new ValueJoiner<Long, String, > ChannelWithClicks>() { > @Override > public ChannelWithClicks apply(Long clicks, String channel) { > return new ChannelWithClicks(channel == null ? "UNKNOWN" > : channel, clicks); > } > }, > JoinWindows.of("ClicksPerChannelwindowed").after(30000).before(30000)) > //30 secs before and after > .map((user, channelWithClicks) -> new KeyValue<>(channelWithClicks > .getChannel(), channelWithClicks.getClicks())) > .reduceByKey( > (firstClicks, secondClicks) -> firstClicks + > secondClicks, > stringSerde, longSerde, "ClicksPerChannelUnwindowed" > ); When I run this topology, I get an exception Invalid topology building: KSTREAM-MAP-0000000003 and KSTREAM-MAP-0000000006 are not joinable I looking for a way to join 2 KStreams. Thanks, Vivek