You need to do an outer-join. However, there is no built-in support for outer-joins yet.
You can use Window-CoGroup to implement the outer-join as your own operator.

-Matthias

On 06/13/2016 06:53 PM, Vinay Patil wrote:
> Hi,
>
> I have a question regarding the join operation, consider the following
> dummy example:
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> DataStreamSource<Integer> sourceStream =
> env.fromElements(10,20,23,25,30,33,102,18);
> DataStreamSource<Integer> destStream = env.fromElements(20,30,40,50,60,10);
>
> sourceStream.join(destStream)
> .where(new ElementSelector())
> .equalTo(new ElementSelector())
> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> .apply(new JoinFunction<Integer, Integer, Integer>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> return paramIN1;
> }
> }).print();
>
> I perfectly get the elements that are matching in both the streams, however
> my requirement is to write these matched elements and also the unmatched
> elements to sink (S3)
>
> How do I get the unmatched elements from each stream?
>
> Regards,
> Vinay Patil
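
To make the Window-CoGroup suggestion a bit more concrete, here is a rough sketch of the per-key, per-window logic such an outer-join operator would need. It is written as plain Java so it runs without Flink on the classpath; the class name OuterJoinSketch and the method outerJoin are made up for illustration and are not part of any Flink API. In a real job the same loop would presumably sit inside a CoGroupFunction's coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) method, emitting via out.collect(...) instead of returning a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not a Flink class: models what a CoGroupFunction
// receives for ONE key in ONE window, namely all elements from each side.
final class OuterJoinSketch {

    /**
     * Full outer join of the two groups.
     * Each result is a pair {left, right}; null marks the missing side.
     */
    static List<Integer[]> outerJoin(Iterable<Integer> first, Iterable<Integer> second) {
        // Buffer the right side because it is iterated once per left element.
        List<Integer> right = new ArrayList<>();
        for (Integer r : second) {
            right.add(r);
        }

        List<Integer[]> out = new ArrayList<>();
        boolean sawLeft = false;
        for (Integer l : first) {
            sawLeft = true;
            if (right.isEmpty()) {
                // Left element with no partner in this window.
                out.add(new Integer[] {l, null});
            } else {
                // Matched: same pairs an inner join would produce.
                for (Integer r : right) {
                    out.add(new Integer[] {l, r});
                }
            }
        }
        if (!sawLeft) {
            // Right elements with no partner in this window.
            for (Integer r : right) {
                out.add(new Integer[] {null, r});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Key 10 appears in both streams, 23 only on the left, 40 only on the right.
        List<Integer> none = Collections.<Integer>emptyList();
        for (Integer[] p : outerJoin(Arrays.asList(10), Arrays.asList(10))) {
            System.out.println(Arrays.toString(p)); // prints [10, 10]
        }
        for (Integer[] p : outerJoin(Arrays.asList(23), none)) {
            System.out.println(Arrays.toString(p)); // prints [23, null]
        }
        for (Integer[] p : outerJoin(none, Arrays.asList(40))) {
            System.out.println(Arrays.toString(p)); // prints [null, 40]
        }
    }
}
```

Wiring this in would mean replacing sourceStream.join(destStream) with sourceStream.coGroup(destStream), keeping the same where/equalTo/window calls, and passing a CoGroupFunction to apply. The output type would then need to carry both sides (for example a Tuple2 or a small POJO) so that unmatched elements can be told apart before writing to the sink.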