Hello,
I am having trouble implementing streams to table join.
I have 2 POJO's each representing streams and table data structures. raw
topic contains streams and cache topic contains table structure. The join
is not happening since the print statement is not being called.
Appreciate any pointers.
- Shekar
raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
CachePOJOClass,RawPOJOClass>() {
@Override
public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
String src=r.getSource();
String cSrc=c.getSnowHost();
Custom custom=new Custom();
if (src.matches(snowSrc)){
System.out.println("In apply code");
custom.setAdditionalProperty("custom",cSrc.getAll());
r.setCustom(custom);
}
return r;
}
}).to("parser");