You need to do an outer join. However, there is no built-in support for
outer joins yet.
You can use a window CoGroup to implement the outer join as your own operator.
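
As a rough sketch of the logic such a CoGroup body could carry (plain Java
so that it runs on its own; the class name OuterJoinSketch, the helper
outerJoin, and the MATCHED/LEFT_ONLY/RIGHT_ONLY tags are made up for
illustration and are not part of Flink):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of the per-key, per-window logic of an outer join.
// In Flink, CoGroupFunction.coGroup(Iterable<IN1> first, Iterable<IN2> second,
// Collector<O> out) receives both sides for one key and window, and is also
// invoked when one of the two sides is empty.
final class OuterJoinSketch {

    // Hypothetical helper: returns the records to emit for one key in one window.
    static List<String> outerJoin(Iterable<Integer> left, Iterable<Integer> right) {
        // Buffer the right side so it can be iterated once per left element.
        List<Integer> rightBuf = new ArrayList<>();
        for (Integer r : right) {
            rightBuf.add(r);
        }

        List<String> out = new ArrayList<>();
        boolean leftSeen = false;
        for (Integer l : left) {
            leftSeen = true;
            if (rightBuf.isEmpty()) {
                // No partner on the right side: unmatched left element.
                out.add("LEFT_ONLY:" + l);
            } else {
                for (Integer r : rightBuf) {
                    out.add("MATCHED:" + l + "," + r);
                }
            }
        }
        if (!leftSeen) {
            // Left side was empty: everything on the right is unmatched.
            for (Integer r : rightBuf) {
                out.add("RIGHT_ONLY:" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(outerJoin(Arrays.asList(20), Arrays.asList(20)));
        System.out.println(outerJoin(Arrays.asList(23), Collections.<Integer>emptyList()));
        System.out.println(outerJoin(Collections.<Integer>emptyList(), Arrays.asList(40)));
    }
}
```

In the job itself, the same branching would go into a CoGroupFunction passed
to sourceStream.coGroup(destStream).where(...).equalTo(...).window(...).apply(...),
writing to the Collector instead of returning a list. Tagging each record
lets you route matched and unmatched elements separately before the sink.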


-Matthias

On 06/13/2016 06:53 PM, Vinay Patil wrote:
> Hi,
> 
> I have a question regarding the join operation, consider the following
> dummy example:
> 
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> DataStreamSource<Integer> sourceStream =
>     env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> DataStreamSource<Integer> destStream =
>     env.fromElements(20, 30, 40, 50, 60, 10);
> 
> sourceStream.join(destStream)
>     .where(new ElementSelector())
>     .equalTo(new ElementSelector())
>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> 
>         private static final long serialVersionUID = 1L;
> 
>         @Override
>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
>             return paramIN1;
>         }
>     }).print();
> 
> I correctly get the elements that match in both streams. However, my
> requirement is to write both these matched elements and the unmatched
> elements to a sink (S3).
> 
> How do I get the unmatched elements from each stream?
> 
> Regards,
> Vinay Patil
> 
