If I understand correctly, I need to pass the key and make sure I handle it correctly in the POJO constructor. Let me give it a shot.

- Shekar

On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> I think your issue is twofold:
>
> 1) If the incoming record's key is null, then when it flows into the join
> processor inside the topology the record will be dropped, as it cannot be
> joined with any records from the other stream.
>
> 2) The NPE you are getting when giving it the non-null keyed record seems
> to be because you are using "SnowServerDeserialzer" (is it set as the
> default key deserializer?), which expects a SnowServerPOJOClass while the
> key "joe" is typed String. You need to override the key deserializer when
> constructing the "cache" KTable as well:
>
> ----------------
> KTable<String, CachePOJOClass> cache = builder.table(Serdes.String(),
>     rawSerde, "cache", "local-cache");
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Guozhang
> >
> > I am using version 0.10.2.1.
> >
> > - Shekar
> >
> > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hi Shekar,
> > >
> > > Could you show your input data? More specifically, what are the key
> > > types of your input streams, and are the keys non-null? It seems the
> > > root cause is similar to the other thread you started on the mailing
> > > list.
> > >
> > > Also, could you tell us which Kafka Streams version you are using?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ctip...@gmail.com>
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I am having trouble implementing a stream-to-table join.
> > > >
> > > > I have two POJOs, representing the stream and table data structures.
> > > > The raw topic contains the stream and the cache topic contains the
> > > > table structure. The join is not happening, since the print statement
> > > > is not being called.
> > > >
> > > > Appreciate any pointers.
> > > >
> > > > - Shekar
> > > >
> > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > >         CachePOJOClass, RawPOJOClass>() {
> > > >
> > > >     @Override
> > > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > >
> > > >         String src=r.getSource();
> > > >         String cSrc=c.getSnowHost();
> > > >         Custom custom=new Custom();
> > > >
> > > >         if (src.matches(snowSrc)){
> > > >             System.out.println("In apply code");
> > > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > > >             r.setCustom(custom);
> > > >         }
> > > >         return r;
> > > >     }
> > > > }).to("parser");
> > >
> > >
> > >
> > > --
> > > -- Guozhang
>
>
> --
> -- Guozhang
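
For anyone following the thread, here is a rough sketch of the two behaviours discussed above, written in plain Java so it runs without Kafka on the classpath. It is not the Kafka Streams API: LeftJoinSketch, RawEvent, CacheEntry, joinValues and leftJoin are hypothetical stand-ins, and the equality check between the source and the cached host is only an assumption about what the original matches() call was meant to do. The points it tries to show are that a null-keyed record never reaches the joiner, and that in a left join the table-side value can be null when no row matches the key, so the joiner should guard against it.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-ins for illustration only; these are NOT Kafka Streams classes.
final class LeftJoinSketch {

    // Hypothetical stand-in for the stream-side value (RawPOJOClass in the thread).
    static final class RawEvent {
        final String source;
        String custom; // enrichment copied from the table side; null until joined

        RawEvent(String source) {
            this.source = source;
        }
    }

    // Hypothetical stand-in for the table-side value (CachePOJOClass in the thread).
    static final class CacheEntry {
        final String snowHost;

        CacheEntry(String snowHost) {
            this.snowHost = snowHost;
        }
    }

    // Same shape as a value joiner's apply(raw, cache). In a left join the
    // table-side value is null when nothing matches, so check before using it.
    static RawEvent joinValues(RawEvent raw, CacheEntry cache) {
        if (raw == null) {
            return null;
        }
        if (cache == null || cache.snowHost == null) {
            return raw; // no match: pass the stream record through unchanged
        }
        // Assumption: enrich only when the cached host equals the record's source.
        if (cache.snowHost.equals(raw.source)) {
            raw.custom = cache.snowHost;
        }
        return raw;
    }

    // Simulates the lookup step of a stream-to-table left join. A null key
    // cannot be matched against the table, so the record is dropped (point 1).
    static RawEvent leftJoin(String key, RawEvent raw, Map<String, CacheEntry> table) {
        if (key == null) {
            return null; // dropped
        }
        return joinValues(raw, table.get(key));
    }

    public static void main(String[] args) {
        Map<String, CacheEntry> table = new HashMap<>();
        table.put("joe", new CacheEntry("host-a"));

        RawEvent matched = leftJoin("joe", new RawEvent("host-a"), table);
        RawEvent unmatched = leftJoin("ann", new RawEvent("host-a"), table);
        RawEvent nullKeyed = leftJoin(null, new RawEvent("host-a"), table);

        System.out.println("matched.custom   = " + matched.custom);
        System.out.println("unmatched.custom = " + unmatched.custom);
        System.out.println("null-keyed kept? = " + (nullKeyed != null));
    }
}
```

In the real topology the same null check would go at the top of the ValueJoiner's apply method, before calling any getter on the table-side value.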