Hi, How can I co-relate two streams of different types in Flink? Scenario: In stream1, I have data in pojo with a field user. In stream2, I have data in a different pojo which also contains the field user. (However, other than the user field, they have no common field).
Now what I want to do is relate the two streams such that for every event in stream1, I want to collect events from stream2 where the user is the same. Both stream1 and stream2 are unbounded. I tried using stream1.connect(stream2).process(new CoProcessFunction<Type1, Type2, Type2>) { private String user; public void processElement1(Type1 inp, CoProcessFunction<Type1, Type2, Type2>.Context ctx, Collector<Type2> out) { user = inp.getUser(); } public void processElement2(Type2 inp, CoProcessFunction<Type1, Type2, Type2>.Context ctx, Collector<Type2> out) { if (user.equals(inp.getUser())) { out.collect(inp); } } }); But this works only and only if both elements occur simultaneously. How can I collect the cases with history? Is using ListState required? Is there some better way to this in Flink? Requesting help, Abhinav