Thanks for clarifying that!

On Mon, Oct 23, 2017 at 3:31 AM, Michael Noll <mich...@confluent.io> wrote:

> > *What key should the join be on?*
>
> The message key, in both cases, should contain the user ID in String
> format.
>
> > *There seems to be no common key (e.g. user) between the 2 classes -
> > PageView and UserProfile*
>
> The user ID is the common key, but the user ID is stored in the respective
> message *keys*, whereas PageView and UserProfile values are stored in the
> message *values* in Kafka.
>
> Btw, you might also want to take a look at
> https://github.com/confluentinc/kafka-streams-examples, which has many more
> examples, including a PageView demo called `PageViewRegionLambdaExample`
> that is similar to the one you shared above. `PageViewRegionLambdaExample`
> ships with a data generator for the example, and instructions for how to
> run it.
>
> It would be great if we had the same setup for the Apache Kafka page view
> example, of course.  Pull requests are very welcome. :-)
>
> Hope this helps,
> Michael
>
> On Sun, Oct 22, 2017 at 9:30 PM, karan alang <karan.al...@gmail.com>
> wrote:
>
> > Hello all -
> > I'm trying to run the sample PageView Kafka Streams example
> > (ref -
> > https://github.com/apache/kafka/tree/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
> > )
> > and have a question.
> >
> > There is a leftJoin between PageView (KStream) and UserProfile (KTable) as
> > shown below. The join should produce PageViewByRegion.
> >
> > *What key should the join be on?*
> >
> > *There seems to be no common key (e.g. user) between the 2 classes -
> > PageView and UserProfile*
> >
> > *Also, what should sample input data in the PageView & UserProfile
> > topics look like?*
> > Do we need to add a surrogate (id) key to the input for these, to enable
> > the left join?
> >
> > Code :
> >
> > *------------- static Classes ------------ *
> >
> > >
> > > static public class PageView {
> > >         public String user;
> > >         public String page;
> > >         public Long timestamp;
> > >     }
> > >
> >
> >
> > >     static public class UserProfile {
> > >         public String region;
> > >         public Long timestamp;
> > >     }
> >
> >
> >
> > *---------- Join between the views (i.e. KStream - PageView) & users
> > (KTable - UserProfile) *
> >
> >
> > KStream<String, PageViewByRegion> kstream1 =
> > > *views        .leftJoin(users, *
> > >         //PageView - first value type
> > >         //UserProfile - 2nd value type
> > >         //PageViewByRegion - Joined Value
> > >         new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
> > >             @Override
> > >             public PageViewByRegion apply(PageView view, UserProfile
> > > profile) {
> > >                 PageViewByRegion viewByRegion = new PageViewByRegion();
> > >                 viewByRegion.user = view.user;
> > >                 viewByRegion.page = view.page;
> > >                 System.out.println(" viewByRegion.user " +
> > > viewByRegion.user);
> > >                 System.out.println(" viewByRegion.page " +
> > > viewByRegion.page);
> > >
> > >                 if (profile != null) {
> > >                     viewByRegion.region = profile.region;
> > >                 } else {
> > >                     viewByRegion.region = "UNKNOWN";
> > >                 }
> > >
> > >                 System.out.println(" viewByRegion.region " +
> > > viewByRegion.region);
> > >
> > >                 return viewByRegion;
> > >             }
> > >         }
> > >         )
> >
>
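
To make the key/value split described in the reply concrete, here is a
minimal plain-Java sketch of a left join on the message key. It does not use
the Kafka Streams API: a HashMap stands in for the UserProfile KTable and a
list of key/value entries stands in for the PageView KStream. It also ignores
timing (in a real stream-table join, the lookup sees whatever the table holds
when the page view arrives). The class name KeyedLeftJoinSketch, the demo()
helper, and the sample users, pages, and regions are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the join; NOT the Kafka Streams API.
class KeyedLeftJoinSketch {

    // Same fields as the value classes quoted in the thread.
    static class PageView {
        String user;
        String page;
        Long timestamp;

        PageView(String user, String page, Long timestamp) {
            this.user = user;
            this.page = page;
            this.timestamp = timestamp;
        }
    }

    static class UserProfile {
        String region;
        Long timestamp;

        UserProfile(String region, Long timestamp) {
            this.region = region;
            this.timestamp = timestamp;
        }
    }

    static class PageViewByRegion {
        String user;
        String page;
        String region;
    }

    // Left join: every page view yields one output; the profile is looked up
    // by the record KEY (the user ID), not by any field inside the value.
    static List<PageViewByRegion> leftJoin(
            List<Map.Entry<String, PageView>> views,
            Map<String, UserProfile> users) {
        List<PageViewByRegion> joined = new ArrayList<>();
        for (Map.Entry<String, PageView> record : views) {
            PageView view = record.getValue();
            UserProfile profile = users.get(record.getKey());

            PageViewByRegion out = new PageViewByRegion();
            out.user = view.user;
            out.page = view.page;
            // Same fallback as the ValueJoiner quoted in the thread.
            out.region = (profile != null) ? profile.region : "UNKNOWN";
            joined.add(out);
        }
        return joined;
    }

    // Made-up sample input: key = user ID, value = payload.
    static List<PageViewByRegion> demo() {
        Map<String, UserProfile> users = new HashMap<>();
        users.put("alice", new UserProfile("europe", 1L));

        List<Map.Entry<String, PageView>> views = new ArrayList<>();
        views.add(new SimpleEntry<>("alice", new PageView("alice", "index.html", 2L)));
        views.add(new SimpleEntry<>("bob", new PageView("bob", "news.html", 3L)));
        return leftJoin(views, users);
    }

    public static void main(String[] args) {
        for (PageViewByRegion r : demo()) {
            System.out.println(r.user + " " + r.page + " " + r.region);
        }
    }
}
```

In this sketch the view keyed "alice" picks up her profile's region, while
the view keyed "bob" has no matching profile and falls back to "UNKNOWN". No
surrogate id is needed in the values as long as both inputs are keyed by the
same user ID.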
