> *What key should the join be on?*

The message key, in both cases, should contain the user ID as a String.

> *There seems to be no common key (e.g. user) between the 2 classes - PageView
and UserProfile*

The user ID is the common key, but the user ID is stored in the respective
message *keys*, whereas PageView and UserProfile values are stored in the
message *values* in Kafka.
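
To make the key/value split concrete, here is a small sketch in plain Java
(no Kafka dependency) that simulates what the stream-table left join does:
the table side is looked up by the *message key* of the page view record,
and the joiner only ever sees the two values. The class name
`KeyedLeftJoinSketch`, the helper `leftJoin`, the constructors, and the
sample users "alice" and "bob" are made up for illustration; this is not the
Kafka Streams API itself, just a model of the lookup it performs.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedLeftJoinSketch {

    // Value types, modeled on the classes from the question.
    static class PageView {
        String user;
        String page;
        Long timestamp;

        PageView(String user, String page, Long timestamp) {
            this.user = user;
            this.page = page;
            this.timestamp = timestamp;
        }
    }

    static class UserProfile {
        String region;
        Long timestamp;

        UserProfile(String region, Long timestamp) {
            this.region = region;
            this.timestamp = timestamp;
        }
    }

    static class PageViewByRegion {
        String user;
        String page;
        String region;
    }

    // Same logic as the ValueJoiner in the question: it receives values only,
    // and the profile is null when the table has no entry for the key.
    static PageViewByRegion join(PageView view, UserProfile profile) {
        PageViewByRegion viewByRegion = new PageViewByRegion();
        viewByRegion.user = view.user;
        viewByRegion.page = view.page;
        viewByRegion.region = (profile != null) ? profile.region : "UNKNOWN";
        return viewByRegion;
    }

    // Hypothetical stand-in for the left join: the table is a map from
    // message key (user ID) to the latest UserProfile value for that key.
    static PageViewByRegion leftJoin(String messageKey, PageView view,
                                     Map<String, UserProfile> usersTable) {
        return join(view, usersTable.get(messageKey));
    }

    public static void main(String[] args) {
        // "Users" topic: key = user ID, value = UserProfile.
        Map<String, UserProfile> users = new HashMap<>();
        users.put("alice", new UserProfile("europe", 1L));

        // "Page views" topic: key = user ID, value = PageView.
        PageViewByRegion a =
            leftJoin("alice", new PageView("alice", "index.html", 2L), users);
        PageViewByRegion b =
            leftJoin("bob", new PageView("bob", "news.html", 3L), users);

        System.out.println(a.user + " " + a.region); // alice europe
        System.out.println(b.user + " " + b.region); // bob UNKNOWN
    }
}
```

In other words, no surrogate ID field is needed inside the values, as long
as both topics are written with the user ID as the message key.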

Btw, you might also want to take a look at
https://github.com/confluentinc/kafka-streams-examples, which has many more
examples, including a PageView demo called `PageViewRegionLambdaExample`
that is similar to the one you shared above.  `PageViewRegionLambdaExample`
ships with a data generator for the example, and instructions for how to
run it.

It would be great if we had the same setup for the Apache Kafka page view
example, of course.  Pull requests are very welcome. :-)

Hope this helps,
Michael

On Sun, Oct 22, 2017 at 9:30 PM, karan alang <karan.al...@gmail.com> wrote:

> Hello all -
> I'm trying to run the sample PageView Kafka streams example,
> (ref -
> https://github.com/apache/kafka/tree/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
> )
> and have a question ..
>
> There is a leftJoin between PageView (KStream) and UserProfile (KTable) as
> shown below. The join should give PageViewByRegion.
>
> *What key should the join be on?*
>
> *There seems to be no common key (e.g. user) between the 2 classes -
> PageView and UserProfile*
>
> *Also, what should sample input data in the PageView & UserProfile
> specific topics look like?*
> Do we need to add a surrogate (id) key to the input for these, to enable the
> left join?
>
> Code :
>
> *------------- static Classes ------------*
>
> static public class PageView {
>     public String user;
>     public String page;
>     public Long timestamp;
> }
>
> static public class UserProfile {
>     public String region;
>     public Long timestamp;
> }
>
> *---------- Join between the views (i.e. KStream - PageView) & users (KTable
> - UserProfile)*
>
> KStream<String, PageViewByRegion> kstream1 =
>     views.leftJoin(users,
>         // PageView - first value type
>         // UserProfile - second value type
>         // PageViewByRegion - joined value
>         new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
>             @Override
>             public PageViewByRegion apply(PageView view, UserProfile profile) {
>                 PageViewByRegion viewByRegion = new PageViewByRegion();
>                 viewByRegion.user = view.user;
>                 viewByRegion.page = view.page;
>                 System.out.println(" viewByRegion.user " + viewByRegion.user);
>                 System.out.println(" viewByRegion.page " + viewByRegion.page);
>
>                 if (profile != null) {
>                     viewByRegion.region = profile.region;
>                 } else {
>                     viewByRegion.region = "UNKNOWN";
>                 }
>
>                 System.out.println(" viewByRegion.region " + viewByRegion.region);
>
>                 return viewByRegion;
>             }
>         });
>
