Hi Matthias,

I did not follow. Even if we use CoGroup, we still have to apply it on a key:

    sourceStream.coGroup(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new CoGroupFunction<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 6408179761497497475L;

            @Override
            public void coGroup(Iterable<Integer> paramIterable,
                    Iterable<Integer> paramIterable1,
                    Collector<Integer> paramCollector) throws Exception {
                Iterator<Integer> iterator = paramIterable.iterator();
                while (iterator.hasNext()) {
                    // Advance the iterator; hasNext() alone would loop
                    // forever on a non-empty input.
                    iterator.next();
                }
            }
        });

When I debug this, only the elements that match in both streams reach the coGroup function.

What I want to know is how to detect the unmatched elements from both streams and write them to the sink.

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:

> You need to do an outer-join. However, there is no built-in support for
> outer-joins yet.
>
> You can use Window-CoGroup to implement the outer-join as your own operator.
>
> -Matthias
>
> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > Hi,
> >
> > I have a question regarding the join operation. Consider the following
> > dummy example:
> >
> >     StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> >     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> >     DataStreamSource<Integer> sourceStream =
> >         env.fromElements(10,20,23,25,30,33,102,18);
> >     DataStreamSource<Integer> destStream =
> >         env.fromElements(20,30,40,50,60,10);
> >
> >     sourceStream.join(destStream)
> >         .where(new ElementSelector())
> >         .equalTo(new ElementSelector())
> >         .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> >         .apply(new JoinFunction<Integer, Integer, Integer>() {
> >
> >             private static final long serialVersionUID = 1L;
> >
> >             @Override
> >             public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> >                 return paramIN1;
> >             }
> >         }).print();
> >
> > I get exactly the elements that match in both streams. However, my
> > requirement is to write these matched elements and also the unmatched
> > elements to the sink (S3).
> >
> > How do I get the unmatched elements from each stream?
> >
> > Regards,
> > Vinay Patil
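
A minimal sketch of the outer-join idea suggested in this thread, written in plain Java so it runs without a Flink dependency. It assumes (this is my understanding of the windowed CoGroup, not something confirmed in the thread) that the function is invoked once per key and window with the elements from each side, and that a key present on only one side arrives with the other Iterable empty. The class `OuterJoinSketch`, the driver `outerJoin`, and the `matched:`/`source-only:`/`dest-only:` labels are hypothetical names for illustration; `Consumer<String>` stands in for Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;

class OuterJoinSketch {

    // Stand-in for the body of a CoGroupFunction.coGroup implementation.
    // An empty side is taken to mean "no match for this key on that side".
    static void coGroup(Iterable<Integer> left, Iterable<Integer> right,
                        Consumer<String> out) {
        boolean hasLeft = left.iterator().hasNext();
        boolean hasRight = right.iterator().hasNext();
        if (hasLeft && hasRight) {
            // Inner-join part: emit one record per matching pair.
            for (Integer l : left) {
                for (Integer r : right) {
                    out.accept("matched:" + l);
                }
            }
        } else if (hasLeft) {
            // Present only in the source stream.
            for (Integer l : left) {
                out.accept("source-only:" + l);
            }
        } else {
            // Present only in the destination stream.
            for (Integer r : right) {
                out.accept("dest-only:" + r);
            }
        }
    }

    // Hypothetical driver that imitates keyed grouping for a single window:
    // each element is its own key, as with the ElementSelector in the thread.
    static List<String> outerJoin(List<Integer> source, List<Integer> dest) {
        Map<Integer, List<Integer>> leftByKey = new TreeMap<>();
        Map<Integer, List<Integer>> rightByKey = new TreeMap<>();
        for (Integer v : source) {
            leftByKey.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        for (Integer v : dest) {
            rightByKey.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        // Union of keys seen on either side, in ascending order.
        Set<Integer> keys = new TreeSet<>(leftByKey.keySet());
        keys.addAll(rightByKey.keySet());

        List<String> out = new ArrayList<>();
        for (Integer key : keys) {
            coGroup(leftByKey.getOrDefault(key, Collections.emptyList()),
                    rightByKey.getOrDefault(key, Collections.emptyList()),
                    out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        // Sample values taken from the dummy example in the thread.
        List<Integer> source = Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18);
        List<Integer> dest = Arrays.asList(20, 30, 40, 50, 60, 10);
        outerJoin(source, dest).forEach(System.out::println);
    }
}
```

In a real job the three branches would call the Collector instead, possibly emitting a tagged record so that matched and unmatched elements can be routed to different sinks. If only matched elements show up in the debugger, it may be worth checking whether both inputs actually fall into the same window.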