Hi Matthias ,

I did not get you, even if we use Co-Group we have to apply it on a key

sourceStream.coGroup(destStream)
.where(new ElementSelector())
.equalTo(new ElementSelector())
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.apply(new CoGroupFunction<Integer, Integer, Integer>() {
private static final long serialVersionUID = 6408179761497497475L;

@Override
public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer>
paramIterable1,
Collector<Integer> paramCollector) throws Exception {
Iterator<Integer> iterator = paramIterable.iterator();
while(iterator.hasNext()) {
}
}
});

when I debug this ,only the matched element from both stream will come in
the coGroup function.

What I want is how do I check for unmatched elements from both streams and
write it to sink.

Regards,
Vinay Patil

*+91-800-728-4749*

On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:

> You need to do an outer-join. However, there is no build-in support for
> outer-joins yet.
>
> You can use Window-CoGroup to implement the outer-join as an own operator.
>
>
> -Matthias
>
> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > Hi,
> >
> > I have a question regarding the join operation, consider the following
> > dummy example:
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > DataStreamSource<Integer> sourceStream =
> > env.fromElements(10,20,23,25,30,33,102,18);
> > DataStreamSource<Integer> destStream =
> env.fromElements(20,30,40,50,60,10);
> >
> > sourceStream.join(destStream)
> > .where(new ElementSelector())
> > .equalTo(new ElementSelector())
> > .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > .apply(new JoinFunction<Integer, Integer, Integer>() {
> >
> > private static final long serialVersionUID = 1L;
> >
> > @Override
> > public Integer join(Integer paramIN1, Integer paramIN2) throws Exception
> {
> > return paramIN1;
> > }
> > }).print();
> >
> > I perfectly get the elements that are matching in both the streams,
> however
> > my requirement is to write these matched elements and also the unmatched
> > elements to sink(S3)
> >
> > How do I get the unmatched elements from each stream ?
> >
> > Regards,
> > Vinay Patil
> >
>
>

Reply via email to