If I understand it right, I need to pass the key and ensure that I handle
it correctly in the pojo constructor.
Let me give it a shot.
- Shekar

On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> I think your issue is in two folds:
>
> 1) if the coming record's key is null, then when it flows into the join
> processor inside the topology this record will be dropped as it cannot be
> joined with any records from the other stream.
>
> 2) the NPE you are getting when giving it the non-null keyed record seems
> because, you are using "SnowServerDeserialzer" (is it set as the default
> key deserializer) which expects a SnowServerPOJOClass while the key "joe"
> is typed String. You need to override the key deserialize when constructing
> the "cache" KTable as well:
>
> ----------------
> KTable <String, CachePOJOClass > cache = builder.table(Serdes.String(),
> rawSerde, "cache", "local-cache");
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Guozhang
> >
> > I am using 0.10.2.1 version
> >
> > - Shekar
> >
> > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hi Shekar,
> > >
> > > Could you demonstrate your input data. More specifically, what are the
> > key
> > > types of your input streams, and are they not-null values? It seems the
> > > root cause is similar to the other thread you asked on the mailing
> list.
> > >
> > > Also, could you provide your used Kafka Streams version?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ctip...@gmail.com>
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > I am having trouble implementing streams to table join.
> > > >
> > > > I have 2 POJO's each representing streams and table data structures.
> > raw
> > > > topic contains streams and cache topic contains table structure. The
> > join
> > > > is not happening since the print statement is not being called.
> > > >
> > > > Appreciate any pointers.
> > > >
> > > > - Shekar
> > > >
> > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > > CachePOJOClass,RawPOJOClass>() {
> > > >
> > > >     @Override
> > > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > >
> > > >         String src=r.getSource();
> > > >         String cSrc=c.getSnowHost();
> > > >         Custom custom=new Custom();
> > > >
> > > >         if (src.matches(snowSrc)){
> > > >             System.out.println("In apply code");
> > > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > > >             r.setCustom(custom);
> > > >         }
> > > >         return r;
> > > >     }
> > > > }).to("parser");
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to