Guozhang,

Thanks for responding. The raw and cache keys are null. Both the KStream and KTable entries are JSON.

Here is the input to the cache (KTable):

{"user_name": "Joe", "location": "US", "gender": "male"}
{"user_name": "Julie", "location": "US", "gender": "female"}
{"user_name": "Kawasaki", "location": "Japan", "gender": "male"}

The KStream gets an event:

{"user": "Joe", "custom": {"choice":"vegan"}}

I want the output of the join to be:

{"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location": "US", "gender": "male"}*} }

I want to take what's in the KTable and add it to the enriched section of the output stream.

//

KStream<String, RawPOJOClass> raw = builder.stream(Serdes.String(), rawSerde, "raw");
KTable<String, CachePOJOClass> cache = builder.table("cache", "local-cache");

raw.leftJoin(cache, new ValueJoiner<RawPOJOClass, CachePOJOClass, RawPOJOClass>() {

    @Override
    public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {

        String src=r.getSource();
        String cSrc=c.getSnowHost();
        Custom custom=new Custom();

        if (src.matches(snowSrc)){
            System.out.println("In apply code");
            custom.setAdditionalProperty("custom",cSrc.getAll());
            r.setCustom(custom);
        }
        return r;
    }
}).to("parser");

-------

I tried adding a key, but I get a deserialization error.

For example, cache entry:

"joe", {"user_name": "Joe", "location": "US", "gender": "male"}

Raw entry:

"joe", {"user": "Joe", "custom": {"choice":"vegan"}}

Here is the error:

com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of com.intuit.argos_streams.system.SnowServerPOJOClass: no String-argument constructor/factory method to deserialize from String value ('joe')
 at [Source: [B@5dfc53f4; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
    at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1456)
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1012)
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:370)
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:315)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1282)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:159)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:150)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
    at com.intuit.argos_streams.system.SnowServerDeserialzer.deserialize(SnowServerDeserialzer.java:40)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

- Shekar

On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Shekar,
>
> Could you demonstrate your input data. More specifically, what are the key
> types of your input streams, and are they not-null values? It seems the
> root cause is similar to the other thread you asked on the mailing list.
>
> Also, could you provide your used Kafka Streams version?
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Hello,
> >
> > I am having trouble implementing streams to table join.
> >
> > I have 2 POJO's each representing streams and table data structures. raw
> > topic contains streams and cache topic contains table structure. The join
> > is not happening since the print statement is not being called.
> >
> > Appreciate any pointers.
> >
> > - Shekar
> >
> > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > CachePOJOClass,RawPOJOClass>() {
> >
> >     @Override
> >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> >
> >         String src=r.getSource();
> >         String cSrc=c.getSnowHost();
> >         Custom custom=new Custom();
> >
> >         if (src.matches(snowSrc)){
> >             System.out.println("In apply code");
> >             custom.setAdditionalProperty("custom",cSrc.getAll());
> >             r.setCustom(custom);
> >         }
> >         return r;
> >     }
> > }).to("parser");
> >
>
>
>
> --
> -- Guozhang
>
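
P.S. To make the enrichment I am after concrete, here is a minimal plain-Java sketch of just the merge step, using Maps in place of my POJOs so it runs without Kafka. The class name EnrichSketch and the method enrich are hypothetical and not part of my application or of Kafka Streams. It assumes that a left join may hand the joiner a null table-side value when there is no match, so that case returns the event unchanged; it also assumes "user_name" is the only field to leave out of the enriched section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: sketches the merge a ValueJoiner would perform.
class EnrichSketch {

    // Returns a copy of the raw event with the cache fields placed under
    // custom.enriched. Neither input map is modified.
    static Map<String, Object> enrich(Map<String, Object> raw, Map<String, Object> cache) {
        Map<String, Object> out = new LinkedHashMap<>(raw);
        if (cache == null) {
            // No matching table entry: pass the event through unchanged.
            return out;
        }

        // Copy the existing "custom" section, if there is one.
        Map<String, Object> custom = new LinkedHashMap<>();
        Object existing = raw.get("custom");
        if (existing instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) existing).entrySet()) {
                custom.put(String.valueOf(e.getKey()), e.getValue());
            }
        }

        // Add every cache field except the join field itself.
        Map<String, Object> enriched = new LinkedHashMap<>(cache);
        enriched.remove("user_name");
        custom.put("enriched", enriched);

        out.put("custom", custom);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> custom = new LinkedHashMap<>();
        custom.put("choice", "vegan");
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("user", "Joe");
        raw.put("custom", custom);

        Map<String, Object> cache = new LinkedHashMap<>();
        cache.put("user_name", "Joe");
        cache.put("location", "US");
        cache.put("gender", "male");

        System.out.println(enrich(raw, cache)); // event with custom.enriched added
        System.out.println(enrich(raw, null));  // event unchanged
    }
}
```

In the real topology the same logic would sit inside apply(r, c), with a null check on c before any call on it.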