Hi Jark,

I am able to get the non-matching elements in a stream now.

Of course, we can collect the matching elements in the same stream as well. However, I want to perform additional operations on the joined stream before writing it to S3, so I would have to include a separate join operator for the same two streams, right? Correct me if I am wrong.

I have pasted the dummy code that collects the non-matching records below (I have to do this on the actual data; correct me if I am doing it wrong).

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, Integer>() {
      private static final long serialVersionUID = 6408179761497497475L;

      @Override
      public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
          Collector<Integer> paramCollector) throws Exception {
        long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
        long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
        if (exactSizeIfKnown == 0) {
          // Nothing on the source side: the element exists only in destStream.
          paramCollector.collect(paramIterable1.iterator().next());
        } else if (exactSizeIfKnown2 == 0) {
          // Nothing on the dest side: the element exists only in sourceStream.
          paramCollector.collect(paramIterable.iterator().next());
        }
      }
    }).print();

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:

> You are right. I debugged it for all elements, and I can do that now.
> Thanks a lot.
>
> Regards,
> Vinay Patil
>
> On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com>
> wrote:
>
>> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
>> Collector<Integer> out)`, when both iter1 and iter2 are non-empty, it
>> means they are matched elements from both streams.
>> When one of iter1 and iter2 is empty, it means the elements are unmatched.
>>
>>
>> - Jark Wu (wuchong)
>>
>> > On June 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>> >
>> > Hi Matthias,
>> >
>> > I did not get you. Even if we use Co-Group, we have to apply it on a key:
>> >
>> > sourceStream.coGroup(destStream)
>> >     .where(new ElementSelector())
>> >     .equalTo(new ElementSelector())
>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>> >       private static final long serialVersionUID = 6408179761497497475L;
>> >
>> >       @Override
>> >       public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
>> >           Collector<Integer> paramCollector) throws Exception {
>> >         Iterator<Integer> iterator = paramIterable.iterator();
>> >         while (iterator.hasNext()) {
>> >           iterator.next(); // advance the iterator so the loop terminates
>> >         }
>> >       }
>> >     });
>> >
>> > When I debug this, only the matched elements from both streams come
>> > into the coGroup function.
>> >
>> > What I want to know is how to check for unmatched elements from both
>> > streams and write them to the sink.
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > *+91-800-728-4749*
>> >
>> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
>> >
>> >> You need to do an outer join. However, there is no built-in support for
>> >> outer joins yet.
>> >>
>> >> You can use Window-CoGroup to implement the outer join as your own
>> >> operator.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
>> >>> Hi,
>> >>>
>> >>> I have a question regarding the join operation, consider the following
>> >>> dummy example:
>> >>>
>> >>> StreamExecutionEnvironment env =
>> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>> >>> DataStreamSource<Integer> sourceStream =
>> >>>     env.fromElements(10,20,23,25,30,33,102,18);
>> >>> DataStreamSource<Integer> destStream =
>> >>>     env.fromElements(20,30,40,50,60,10);
>> >>>
>> >>> sourceStream.join(destStream)
>> >>>     .where(new ElementSelector())
>> >>>     .equalTo(new ElementSelector())
>> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
>> >>>
>> >>>       private static final long serialVersionUID = 1L;
>> >>>
>> >>>       @Override
>> >>>       public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
>> >>>         return paramIN1;
>> >>>       }
>> >>>     }).print();
>> >>>
>> >>> I perfectly get the elements that are matching in both the streams, however
>> >>> my requirement is to write these matched elements and also the unmatched
>> >>> elements to sink(S3)
>> >>>
>> >>> How do I get the unmatched elements from each stream ?
>> >>>
>> >>> Regards,
>> >>> Vinay Patil
>> >>>
>> >>
>> >>
>> >
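
For reference, here is a minimal sketch of the outer-join logic discussed in this thread, written as plain Java with no Flink dependency so it can be run on its own. It is only an illustration under assumptions: `OuterJoinSketch`, `Side`, `Tagged`, and `classify` are hypothetical names, not part of Flink. The idea is that a single coGroup call can emit both matched and unmatched records, each tagged with where it was seen, so a second join operator on the same two streams may not be needed. It also avoids two fragile points in the coGroup snippet at the top of the thread: `getExactSizeIfKnown()` returns -1 when the spliterator is not SIZED, and `iterator().next()` emits only the first element of a group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not a Flink class): classifies one key's co-grouped window contents. */
class OuterJoinSketch {

    /** Illustrative tag describing on which side(s) a key was present. */
    enum Side { BOTH, LEFT_ONLY, RIGHT_ONLY }

    /** A value together with its tag. */
    static final class Tagged<T> {
        final Side side;
        final T value;

        Tagged(Side side, T value) {
            this.side = side;
            this.value = value;
        }

        @Override
        public String toString() {
            return side + ":" + value;
        }
    }

    /**
     * Mirrors the body of a coGroup call: 'left' and 'right' hold the elements
     * of the two inputs that share one key within one window.
     */
    static <T> List<Tagged<T>> classify(Iterable<T> left, Iterable<T> right) {
        // hasNext() works for any Iterable, whether or not its size is known.
        boolean leftEmpty = !left.iterator().hasNext();
        boolean rightEmpty = !right.iterator().hasNext();

        List<Tagged<T>> out = new ArrayList<>();
        if (!leftEmpty && !rightEmpty) {
            // Key present on both sides: emit each left element once (a design choice).
            for (T l : left) {
                out.add(new Tagged<>(Side.BOTH, l));
            }
        } else if (rightEmpty) {
            // Key present only on the left side; emit every element, not just the first.
            for (T l : left) {
                out.add(new Tagged<>(Side.LEFT_ONLY, l));
            }
        } else {
            // Key present only on the right side.
            for (T r : right) {
                out.add(new Tagged<>(Side.RIGHT_ONLY, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(classify(Arrays.asList(10), Arrays.asList(10)));
        System.out.println(classify(Arrays.asList(23), Collections.<Integer>emptyList()));
        System.out.println(classify(Collections.<Integer>emptyList(), Arrays.asList(40)));
    }
}
```

In a Flink job, the same branching would presumably sit inside `CoGroupFunction.coGroup`, with each tagged record passed to the `Collector` instead of being added to a list. Downstream operators could then filter on the tag to route matched and unmatched records separately before writing to S3. Note that for matched keys this sketch emits one record per left element, whereas a windowed join emits one per matching pair; adjust that branch if pair semantics are needed.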