In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, Collector<Integer> out)`, when both iter1 and iter2 are non-empty, they hold the matched elements from both streams for that key and window. When one of iter1 and iter2 is empty, the elements in the other one are unmatched.
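A minimal sketch of that check, written as plain Java so it runs without Flink on the classpath. The class name `OuterJoinSketch`, the `List<String>` output (standing in for Flink's `Collector`), and the `matched:` / `unmatched-source:` / `unmatched-dest:` labels are all assumptions made for illustration; in a real job the same branching would sit inside the body of your `CoGroupFunction.coGroup`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class OuterJoinSketch {

    // Mirrors the body of a coGroup call for one key in one window.
    // "out" is a plain list here; in Flink it would be the Collector.
    public static void coGroup(Iterable<Integer> iter1,
                               Iterable<Integer> iter2,
                               List<String> out) {
        Iterator<Integer> left = iter1.iterator();
        Iterator<Integer> right = iter2.iterator();
        boolean hasLeft = left.hasNext();
        boolean hasRight = right.hasNext();

        if (hasLeft && hasRight) {
            // Both sides have elements for this key: matched.
            while (left.hasNext()) {
                out.add("matched:" + left.next());
            }
        } else if (hasLeft) {
            // Only the first stream has this key: unmatched on the source side.
            while (left.hasNext()) {
                out.add("unmatched-source:" + left.next());
            }
        } else {
            // Only the second stream has this key: unmatched on the dest side.
            while (right.hasNext()) {
                out.add("unmatched-dest:" + right.next());
            }
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        coGroup(Arrays.asList(10), Arrays.asList(10), out);                // key present in both
        coGroup(Arrays.asList(23), Collections.<Integer>emptyList(), out); // only in source
        coGroup(Collections.<Integer>emptyList(), Arrays.asList(40), out); // only in dest
        System.out.println(out);
    }
}
```

Each call in `main` plays the role of one invocation of `coGroup` by the window operator, using values from your example streams. Emitting a tagged record rather than a bare `Integer` is one way to tell the three cases apart downstream before writing to the sink.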
- Jark Wu (wuchong)

> On June 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> Hi Matthias ,
>
> I did not get you, even if we use Co-Group we have to apply it on a key
>
> sourceStream.coGroup(destStream)
> .where(new ElementSelector())
> .equalTo(new ElementSelector())
> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> private static final long serialVersionUID = 6408179761497497475L;
>
> @Override
> public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer>
> paramIterable1,
> Collector<Integer> paramCollector) throws Exception {
> Iterator<Integer> iterator = paramIterable.iterator();
> while(iterator.hasNext()) {
> }
> }
> });
>
> when I debug this ,only the matched element from both stream will come in
> the coGroup function.
>
> What I want is how do I check for unmatched elements from both streams and
> write it to sink.
>
> Regards,
> Vinay Patil
>
> *+91-800-728-4749*
>
> On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
>
>> You need to do an outer-join. However, there is no build-in support for
>> outer-joins yet.
>>
>> You can use Window-CoGroup to implement the outer-join as an own operator.
>>
>>
>> -Matthias
>>
>> On 06/13/2016 06:53 PM, Vinay Patil wrote:
>>> Hi,
>>>
>>> I have a question regarding the join operation, consider the following
>>> dummy example:
>>>
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>> DataStreamSource<Integer> sourceStream =
>>> env.fromElements(10,20,23,25,30,33,102,18);
>>> DataStreamSource<Integer> destStream =
>> env.fromElements(20,30,40,50,60,10);
>>>
>>> sourceStream.join(destStream)
>>> .where(new ElementSelector())
>>> .equalTo(new ElementSelector())
>>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>>> .apply(new JoinFunction<Integer, Integer, Integer>() {
>>>
>>> private static final long serialVersionUID = 1L;
>>>
>>> @Override
>>> public Integer join(Integer paramIN1, Integer paramIN2) throws Exception
>> {
>>> return paramIN1;
>>> }
>>> }).print();
>>>
>>> I perfectly get the elements that are matching in both the streams,
>> however
>>> my requirement is to write these matched elements and also the unmatched
>>> elements to sink(S3)
>>>
>>> How do I get the unmatched elements from each stream ?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>
>>