Can you add a flag to each element emitted by the CoGroupFunction that indicates whether it was joined or not? Then you can use split to distinguish between both cases and handle both streams differently.
Best, Fabian 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.pa...@gmail.com>: > Hi Jark, > > I am able to get the non-matching elements in a stream :, > > Of-course we can collect the matching elements in the same stream as well, > however I want to perform additional operations on the joined stream before > writing it to S3, so I would have to include a separate join operator for > the same two streams, right ? > Correct me if I am wrong. > > I have pasted the dummy code which collects the non-matching records (i > have to perform this on the actual data, correct me if I am dong wrong). > > sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new > ElementSelector()) > .window(TumblingEventTimeWindows.of(Time.seconds(30))) > .apply(new CoGroupFunction<Integer, Integer, Integer>() { > > private static final long serialVersionUID = 6408179761497497475L; > > @Override > public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> > paramIterable1, > Collector<Integer> paramCollector) throws Exception { > long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown(); > long exactSizeIfKnown2 = > paramIterable1.spliterator().getExactSizeIfKnown(); > if(exactSizeIfKnown == 0 ) { > paramCollector.collect(paramIterable1.iterator().next()); > } else if (exactSizeIfKnown2 == 0) { > paramCollector.collect(paramIterable.iterator().next()); > } > } > }).print(); > > Regards, > Vinay Patil > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com> > wrote: > > > You are right, debugged it for all elements , I can do that now. > > Thanks a lot. > > > > Regards, > > Vinay Patil > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> > > wrote: > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, > >> Collector<Integer> out)` , when both iter1 and iter2 are not empty, it > >> means they are matched elements from both stream. > >> When one of iter1 and iter2 is empty , it means that they are unmatched. > >> > >> > >> - Jark Wu (wuchong) > >> > >> > 在 2016年6月14日,下午12:46,Vinay Patil <vinay18.pa...@gmail.com> 写道: > >> > > >> > Hi Matthias , > >> > > >> > I did not get you, even if we use Co-Group we have to apply it on a > key > >> > > >> > sourceStream.coGroup(destStream) > >> > .where(new ElementSelector()) > >> > .equalTo(new ElementSelector()) > >> > .window(TumblingEventTimeWindows.of(Time.seconds(30))) > >> > .apply(new CoGroupFunction<Integer, Integer, Integer>() { > >> > private static final long serialVersionUID = 6408179761497497475L; > >> > > >> > @Override > >> > public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> > >> > paramIterable1, > >> > Collector<Integer> paramCollector) throws Exception { > >> > Iterator<Integer> iterator = paramIterable.iterator(); > >> > while(iterator.hasNext()) { > >> > } > >> > } > >> > }); > >> > > >> > when I debug this ,only the matched element from both stream will come > >> in > >> > the coGroup function. > >> > > >> > What I want is how do I check for unmatched elements from both streams > >> and > >> > write it to sink. > >> > > >> > Regards, > >> > Vinay Patil > >> > > >> > *+91-800-728-4749* > >> > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> > >> wrote: > >> > > >> >> You need to do an outer-join. However, there is no build-in support > for > >> >> outer-joins yet. > >> >> > >> >> You can use Window-CoGroup to implement the outer-join as an own > >> operator. > >> >> > >> >> > >> >> -Matthias > >> >> > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote: > >> >>> Hi, > >> >>> > >> >>> I have a question regarding the join operation, consider the > following > >> >>> dummy example: > >> >>> > >> >>> StreamExecutionEnvironment env = > >> >>> StreamExecutionEnvironment.getExecutionEnvironment(); > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); > >> >>> DataStreamSource<Integer> sourceStream = > >> >>> env.fromElements(10,20,23,25,30,33,102,18); > >> >>> DataStreamSource<Integer> destStream = > >> >> env.fromElements(20,30,40,50,60,10); > >> >>> > >> >>> sourceStream.join(destStream) > >> >>> .where(new ElementSelector()) > >> >>> .equalTo(new ElementSelector()) > >> >>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) > >> >>> .apply(new JoinFunction<Integer, Integer, Integer>() { > >> >>> > >> >>> private static final long serialVersionUID = 1L; > >> >>> > >> >>> @Override > >> >>> public Integer join(Integer paramIN1, Integer paramIN2) throws > >> Exception > >> >> { > >> >>> return paramIN1; > >> >>> } > >> >>> }).print(); > >> >>> > >> >>> I perfectly get the elements that are matching in both the streams, > >> >> however > >> >>> my requirement is to write these matched elements and also the > >> unmatched > >> >>> elements to sink(S3) > >> >>> > >> >>> How do I get the unmatched elements from each stream ? > >> >>> > >> >>> Regards, > >> >>> Vinay Patil > >> >>> > >> >> > >> >> > >> > >> > > >