Hi,

I have a question regarding the join operation. Consider the following dummy example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    DataStreamSource<Integer> sourceStream = env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
    DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);

    sourceStream.join(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
        .apply(new JoinFunction<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
                return paramIN1;
            }
        })
        .print();

I correctly get the elements that match in both streams. However, my requirement is to write both the matched elements and the unmatched elements to a sink (S3).

How do I get the unmatched elements from each stream?

Regards,
Vinay Patil
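
P.S. To make the requirement concrete, here is a minimal plain-Java sketch of the result I am after for the sample values above. It does not use Flink and ignores windowing entirely; the class name OuterJoinSketch and the methods matched and unmatched are hypothetical names used only for illustration, not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: batch-style description of the desired output,
// with no windows and no streaming semantics.
public class OuterJoinSketch {

    // Elements of `left` that also occur in `right`, in `left` order.
    static List<Integer> matched(List<Integer> left, List<Integer> right) {
        Set<Integer> rightSet = new HashSet<>(right);
        List<Integer> out = new ArrayList<>();
        for (Integer value : left) {
            if (rightSet.contains(value)) {
                out.add(value);
            }
        }
        return out;
    }

    // Elements of `left` that do not occur in `right`, in `left` order.
    static List<Integer> unmatched(List<Integer> left, List<Integer> right) {
        Set<Integer> rightSet = new HashSet<>(right);
        List<Integer> out = new ArrayList<>();
        for (Integer value : left) {
            if (!rightSet.contains(value)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18);
        List<Integer> dest = Arrays.asList(20, 30, 40, 50, 60, 10);

        System.out.println("matched:          " + matched(source, dest));   // [10, 20, 30]
        System.out.println("unmatched source: " + unmatched(source, dest)); // [23, 25, 33, 102, 18]
        System.out.println("unmatched dest:   " + unmatched(dest, source)); // [40, 50, 60]
    }
}
```

In other words, all three groups should reach the sink, whereas the join above only emits the first one.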