I think your issue is in two folds:

1) if the coming record's key is null, then when it flows into the join
processor inside the topology this record will be dropped as it cannot be
joined with any records from the other stream.

2) the NPE you are getting when giving it the non-null keyed record seems
because, you are using "SnowServerDeserialzer" (is it set as the default
key deserializer) which expects a SnowServerPOJOClass while the key "joe"
is typed String. You need to override the key deserialize when constructing
the "cache" KTable as well:

----------------
KTable <String, CachePOJOClass > cache = builder.table(Serdes.String(),
rawSerde, "cache", "local-cache");


Guozhang


On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ctip...@gmail.com> wrote:

> Guozhang
>
> I am using 0.10.2.1 version
>
> - Shekar
>
> On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hi Shekar,
> >
> > Could you demonstrate your input data. More specifically, what are the
> key
> > types of your input streams, and are they not-null values? It seems the
> > root cause is similar to the other thread you asked on the mailing list.
> >
> > Also, could you provide your used Kafka Streams version?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ctip...@gmail.com>
> wrote:
> >
> > > Hello,
> > >
> > > I am having trouble implementing streams to table join.
> > >
> > > I have 2 POJO's each representing streams and table data structures.
> raw
> > > topic contains streams and cache topic contains table structure. The
> join
> > > is not happening since the print statement is not being called.
> > >
> > > Appreciate any pointers.
> > >
> > > - Shekar
> > >
> > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > CachePOJOClass,RawPOJOClass>() {
> > >
> > >     @Override
> > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > >
> > >         String src=r.getSource();
> > >         String cSrc=c.getSnowHost();
> > >         Custom custom=new Custom();
> > >
> > >         if (src.matches(snowSrc)){
> > >             System.out.println("In apply code");
> > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > >             r.setCustom(custom);
> > >         }
> > >         return r;
> > >     }
> > > }).to("parser");
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to