Hello,

I want to join 2 Topics (KStreams)


Stream 1
Topic :  userIdClicks
Key : userId
Value : JSON String with event details

Stream 2
Topic :  userIdChannel
Key : userId
Value : JSON String  with event details and has channel value

I could not find any examples with KStream-to-KStream Join.

Here is my code

//build stream userIdClicks
> KStream<String, Long> userClickStream = builder.stream(stringSerde, 
> stringSerde,
> "userClicks");
>


> //create stream -> < userId, 1 (count) >
> KStream<String, Long> *userClickCountStream* = userClickStream.filter((
> userId,record)-> userId != null) .map((userId,record) -> new KeyValue<>(
> userId,1l));
>


> //build stream userChannelStream
> KStream<String, String> userEventStream = builder.stream(stringSerde,
> stringSerde, "userEvents");
>


> //create stream <userId, channel> : extract channel value from json string
> KStream<String, String> *userChannelStream* =  userEventStream
>                 .filter((userId,record)-> userId != null)
>                 .map((userId,record) -> new KeyValue<>(userId
> ,JsonPath.read(record, "$.event.page.channel").toString()));
>


> //join *userClickCountStream* with
> *userChannelStream*KTable<String, Long> clicksPerChannel =
> userClickCountStream
>         .join(userChannelStream, new ValueJoiner<Long, String,
> ChannelWithClicks>() {
>              @Override
>              public ChannelWithClicks apply(Long clicks, String channel) {
>                 return new ChannelWithClicks(channel == null ? "UNKNOWN"
> : channel, clicks);
>              }
>          }, 
> JoinWindows.of("ClicksPerChannelwindowed").after(30000).before(30000))
> //30 secs before and after
>         .map((user, channelWithClicks) -> new KeyValue<>(channelWithClicks
> .getChannel(), channelWithClicks.getClicks()))
>         .reduceByKey(
>                     (firstClicks, secondClicks) -> firstClicks +
> secondClicks,
>                      stringSerde, longSerde, "ClicksPerChannelUnwindowed"
> );

When I run this topology, I get an exception

Invalid topology building: KSTREAM-MAP-0000000003 and
KSTREAM-MAP-0000000006 are not joinable

I looking for a way to join 2 KStreams.

Thanks,

Vivek

Reply via email to