Guozhang,

Thanks for responding.
The raw and cache keys are null. Both the KStream and KTable values are JSON.

Here is the input to the cache (KTable):

{"user_name": "Joe", "location": "US", "gender": "male"}
{"user_name": "Julie", "location": "US", "gender": "female"}

{"user_name": "Kawasaki", "location": "Japan", "gender": "male"}

The KStream gets an event:

{"user": "Joe", "custom": {"choice":"vegan"}}

I want the join to produce:

{"user": "Joe", "custom": {"choice":"vegan","enriched":{"location": "US",
"gender": "male"}}}

That is, I want to take what's in the KTable and add it to the "enriched"
section of the output stream.
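Concretely, the merge the ValueJoiner has to perform can be sketched in plain Java, with maps standing in for the POJO classes (the enrich helper and the choice to drop the user_name join field from the enriched section are my assumptions, not the thread's actual classes):

```java
import java.util.HashMap;
import java.util.Map;

public class EnrichSketch {

    // Nest the cache record's fields under "custom.enriched" in the raw event.
    static Map<String, Object> enrich(Map<String, Object> raw,
                                      Map<String, Object> cache) {
        Object existing = raw.get("custom");
        @SuppressWarnings("unchecked")
        Map<String, Object> custom = (existing instanceof Map)
                ? (Map<String, Object>) existing
                : new HashMap<>();
        Map<String, Object> enriched = new HashMap<>(cache);
        enriched.remove("user_name"); // join key field; not repeated inside "enriched"
        custom.put("enriched", enriched);
        raw.put("custom", custom);
        return raw;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("user", "Joe");
        Map<String, Object> choice = new HashMap<>();
        choice.put("choice", "vegan");
        raw.put("custom", choice);

        Map<String, Object> cache = new HashMap<>();
        cache.put("user_name", "Joe");
        cache.put("location", "US");
        cache.put("gender", "male");

        System.out.println(enrich(raw, cache));
    }
}
```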


//

KStream<String, RawPOJOClass> raw = builder.stream(Serdes.String(),
        rawSerde, "raw");
KTable<String, CachePOJOClass> cache = builder.table("cache", "local-cache");

raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
        CachePOJOClass, RawPOJOClass>() {

    @Override
    public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {

        // left join: c is null when there is no matching cache record
        if (c == null) {
            return r;
        }
        String src = r.getSource();
        String cSrc = c.getSnowHost();
        Custom custom = new Custom();

        if (src.matches(cSrc)) {
            System.out.println("In apply code");
            custom.setAdditionalProperty("custom", c.getAll());
            r.setCustom(custom);
        }
        return r;
    }
}).to("parser");
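A KStream-KTable join matches records by key, so with null keys the ValueJoiner above is never invoked, which would explain the missing print statement. A minimal sketch of one possible fix, assuming Kafka Streams 0.10.1+, that the user name is the intended join key, and that cacheSerde exists (this fragment needs a running cluster, so it is untested here): re-key the stream with selectKey, and make sure the cache topic itself is produced with the key set, since a KTable is always read keyed.

```java
// Key the stream by user name so the join can match; re-keying the
// stream forces a repartition through an internal topic.
KStream<String, RawPOJOClass> keyedRaw =
    raw.selectKey((key, value) -> value.getUser());

// Declare the table's serdes explicitly so its keys are plain strings.
// The "cache" topic must already be produced with the user name as key.
KTable<String, CachePOJOClass> keyedCache =
    builder.table(Serdes.String(), cacheSerde, "cache", "local-cache");

keyedRaw.leftJoin(keyedCache, (r, c) -> {
    if (c != null) {            // left join: c is null when no cache match
        Custom custom = new Custom();
        custom.setAdditionalProperty("enriched", c.getAll());
        r.setCustom(custom);
    }
    return r;
}).to("parser");
```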

-------

I tried adding a key, but I get a serializer error. For example, a cache entry:
"joe", {"user_name": "Joe", "location": "US", "gender": "male"}

Raw entry:

"joe", {"user": "Joe", "custom": {"choice":"vegan"}}


Here is the error:

com.fasterxml.jackson.databind.JsonMappingException: Can not construct
instance of com.intuit.argos_streams.system.SnowServerPOJOClass: no
String-argument constructor/factory method to deserialize from String
value ('joe')
 at [Source: [B@5dfc53f4; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
    at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1456)
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1012)
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:370)
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:315)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1282)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:159)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:150)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
    at com.intuit.argos_streams.system.SnowServerDeserialzer.deserialize(SnowServerDeserialzer.java:40)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
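The stack trace shows the JSON value deserializer (SnowServerDeserialzer) receiving the bare string 'joe', i.e. the key bytes. That usually means either the key and value were swapped when producing, or the consuming side's key serde is the JSON serde instead of Serdes.String(). A minimal producer sketch of writing the cache record with "joe" as the key (kafka-clients API; the broker address and the use of a plain string value here are my assumptions, and this needs a running broker, so it is untested here):

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // key = "joe" (must match the stream's key), value = the JSON document
    producer.send(new ProducerRecord<>("cache", "joe",
        "{\"user_name\": \"Joe\", \"location\": \"US\", \"gender\": \"male\"}"));
}
```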
- Shekar


On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Shekar,
>
> Could you demonstrate your input data. More specifically, what are the key
> types of your input streams, and are they not-null values? It seems the
> root cause is similar to the other thread you asked on the mailing list.
>
> Also, could you provide your used Kafka Streams version?
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Hello,
> >
> > I am having trouble implementing streams to table join.
> >
> > I have 2 POJO's each representing streams and table data structures. raw
> > topic contains streams and cache topic contains table structure. The join
> > is not happening since the print statement is not being called.
> >
> > Appreciate any pointers.
> >
> > - Shekar
> >
> > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > CachePOJOClass,RawPOJOClass>() {
> >
> >     @Override
> >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> >
> >         String src=r.getSource();
> >         String cSrc=c.getSnowHost();
> >         Custom custom=new Custom();
> >
> >         if (src.matches(snowSrc)){
> >             System.out.println("In apply code");
> >             custom.setAdditionalProperty("custom",cSrc.getAll());
> >             r.setCustom(custom);
> >         }
> >         return r;
> >     }
> > }).to("parser");
> >
>
>
>
> --
> -- Guozhang
>
