Thanks for clarifying that!
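
For anyone finding this thread later, here is a minimal plain-Java sketch of the point about keys. It does not use the Kafka Streams API at all; it only simulates the semantics described in the reply quoted below, assuming the message key on both topics is the user ID as a String. The class name `KeyedLeftJoinSketch`, the methods `onUserProfile` / `onPageView`, the constructors, and the sample users and pages are all made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of a stream-table left join keyed on the message key.
// This is NOT the Kafka Streams API; all names here are hypothetical.
class KeyedLeftJoinSketch {

    static class PageView {
        String user;
        String page;
        Long timestamp;

        PageView(String user, String page, Long timestamp) {
            this.user = user;
            this.page = page;
            this.timestamp = timestamp;
        }
    }

    static class UserProfile {
        String region;
        Long timestamp;

        UserProfile(String region, Long timestamp) {
            this.region = region;
            this.timestamp = timestamp;
        }
    }

    static class PageViewByRegion {
        String user;
        String page;
        String region;
    }

    // Stand-in for the table side: the latest UserProfile seen per message key.
    private final Map<String, UserProfile> usersTable = new HashMap<>();

    // A record arrives on the user-profile topic: key = user ID, value = profile.
    void onUserProfile(String key, UserProfile profile) {
        usersTable.put(key, profile);
    }

    // A record arrives on the page-view topic: key = user ID, value = page view.
    // The lookup uses the message key, not any field inside the values.
    PageViewByRegion onPageView(String key, PageView view) {
        UserProfile profile = usersTable.get(key);
        PageViewByRegion joined = new PageViewByRegion();
        joined.user = view.user;
        joined.page = view.page;
        // Left join: the page view is kept even when no profile matches the key.
        joined.region = (profile != null) ? profile.region : "UNKNOWN";
        return joined;
    }

    public static void main(String[] args) {
        KeyedLeftJoinSketch join = new KeyedLeftJoinSketch();

        // Hypothetical sample input: both records carry the key "alice".
        join.onUserProfile("alice", new UserProfile("europe", 1L));
        PageViewByRegion matched =
            join.onPageView("alice", new PageView("alice", "index.html", 2L));
        System.out.println(matched.user + " -> " + matched.region);

        // No profile was ever stored under the key "bob".
        PageViewByRegion unmatched =
            join.onPageView("bob", new PageView("bob", "about.html", 3L));
        System.out.println(unmatched.user + " -> " + unmatched.region);
    }
}
```

So no surrogate id is needed inside the values in this model: UserProfile has no user field, and the join still works because both records are keyed by the same user ID.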

On Mon, Oct 23, 2017 at 3:31 AM, Michael Noll <mich...@confluent.io> wrote:

> > *What key should the join be on?*
>
> The message key, in both cases, should contain the user ID in String
> format.
>
> > *There seems to be no common key (e.g. user) between the 2 classes -
> > PageView and UserProfile*
>
> The user ID is the common key, but the user ID is stored in the respective
> message *keys*, whereas PageView and UserProfile values are stored in the
> message *values* in Kafka.
>
> Btw, you might also want to take a look at
> https://github.com/confluentinc/kafka-streams-examples, which has many
> more examples, including a PageView demo called
> `PageViewRegionLambdaExample` that is similar to the one you shared above.
> `PageViewRegionLambdaExample` ships with a data generator for the example,
> and instructions for how to run it.
>
> It would be great if we had the same setup for the Apache Kafka page view
> example, of course. Pull requests are very welcome. :-)
>
> Hope this helps,
> Michael
>
> On Sun, Oct 22, 2017 at 9:30 PM, karan alang <karan.al...@gmail.com>
> wrote:
>
> > Hello all -
> > I'm trying to run the sample PageView Kafka Streams example
> > (ref -
> > https://github.com/apache/kafka/tree/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
> > )
> > and have a question.
> >
> > There is a leftJoin between PageView (KStream) and UserProfile (KTable),
> > as shown below. The join should give PageViewByRegion.
> >
> > *What key should the join be on?*
> >
> > *There seems to be no common key (e.g. user) between the 2 classes -
> > PageView and UserProfile*
> >
> > *Also, what should sample input data in the PageView & UserProfile
> > specific topics look like?*
> > Do we need to add a surrogate (id) key to the input for these, to enable
> > the left join?
> >
> > Code:
> >
> > *------------- static classes -------------*
> >
> > > public static class PageView {
> > >     public String user;
> > >     public String page;
> > >     public Long timestamp;
> > > }
> > >
> > > public static class UserProfile {
> > >     public String region;
> > >     public Long timestamp;
> > > }
> >
> > *---------- Join between the views (i.e. KStream - PageView) & users
> > (KTable - UserProfile) ----------*
> >
> > > KStream<String, PageViewByRegion> kstream1 =
> > >     views.leftJoin(users,
> > >         // PageView - first value type
> > >         // UserProfile - second value type
> > >         // PageViewByRegion - joined value
> > >         new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
> > >             @Override
> > >             public PageViewByRegion apply(PageView view, UserProfile profile) {
> > >                 PageViewByRegion viewByRegion = new PageViewByRegion();
> > >                 viewByRegion.user = view.user;
> > >                 viewByRegion.page = view.page;
> > >                 System.out.println(" viewByRegion.user " + viewByRegion.user);
> > >                 System.out.println(" viewByRegion.page " + viewByRegion.page);
> > >
> > >                 // Left join: profile is null when no UserProfile exists for this key.
> > >                 if (profile != null) {
> > >                     viewByRegion.region = profile.region;
> > >                 } else {
> > >                     viewByRegion.region = "UNKNOWN";
> > >                 }
> > >
> > >                 System.out.println(" viewByRegion.region " + viewByRegion.region);
> > >
> > >                 return viewByRegion;
> > >             }
> > >         }
> > >     );