Yes, both topics have the same number of partitions and the same partition key, i.e. userId. If I just join the streams without applying the map functions (in this case userClickStream and userEventStream), it works.
Thanks,
Vivek

On Wed, Jul 13, 2016 at 4:53 PM, Philippe Derome <phder...@gmail.com> wrote:

> Did you specify the same number of partitions for the two input topics you
> are joining? I think that this is usually the first thing people ask to
> verify with errors similar to yours.
>
> If you are experimenting with learning some concepts, it is simpler to
> always use one partition for your topics.
>
> On 13 Jul 2016 7:40 p.m., "vivek thakre" <vivek.tha...@gmail.com> wrote:
>
> > Hello,
> >
> > I want to join 2 Topics (KStreams)
> >
> > Stream 1
> > Topic : userIdClicks
> > Key : userId
> > Value : JSON String with event details
> >
> > Stream 2
> > Topic : userIdChannel
> > Key : userId
> > Value : JSON String with event details and has channel value
> >
> > I could not find any examples with KStream-to-KStream Join.
> >
> > Here is my code
> >
> > > //build stream userIdClicks
> > > KStream<String, String> userClickStream =
> > >     builder.stream(stringSerde, stringSerde, "userClicks");
> > >
> > > //create stream -> < userId, 1 (count) >
> > > KStream<String, Long> userClickCountStream = userClickStream
> > >     .filter((userId, record) -> userId != null)
> > >     .map((userId, record) -> new KeyValue<>(userId, 1L));
> > >
> > > //build stream userChannelStream
> > > KStream<String, String> userEventStream =
> > >     builder.stream(stringSerde, stringSerde, "userEvents");
> > >
> > > //create stream <userId, channel> : extract channel value from json string
> > > KStream<String, String> userChannelStream = userEventStream
> > >     .filter((userId, record) -> userId != null)
> > >     .map((userId, record) -> new KeyValue<>(userId,
> > >         JsonPath.read(record, "$.event.page.channel").toString()));
> > >
> > > //join userClickCountStream with userChannelStream
> > > KTable<String, Long> clicksPerChannel = userClickCountStream
> > >     .join(userChannelStream,
> > >         new ValueJoiner<Long, String, ChannelWithClicks>() {
> > >             @Override
> > >             public ChannelWithClicks apply(Long clicks, String channel) {
> > >                 return new ChannelWithClicks(
> > >                     channel == null ? "UNKNOWN" : channel, clicks);
> > >             }
> > >         },
> > >         //30 secs before and after
> > >         JoinWindows.of("ClicksPerChannelwindowed").after(30000).before(30000))
> > >     .map((user, channelWithClicks) -> new KeyValue<>(
> > >         channelWithClicks.getChannel(), channelWithClicks.getClicks()))
> > >     .reduceByKey(
> > >         (firstClicks, secondClicks) -> firstClicks + secondClicks,
> > >         stringSerde, longSerde, "ClicksPerChannelUnwindowed");
> >
> > When I run this topology, I get an exception
> >
> > Invalid topology building: KSTREAM-MAP-0000000003 and
> > KSTREAM-MAP-0000000006 are not joinable
> >
> > I am looking for a way to join 2 KStreams.
> >
> > Thanks,
> >
> > Vivek