In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, Collector<Integer> out)`,
when both iter1 and iter2 are non-empty, they hold the matched elements
from both streams.
When one of iter1 and iter2 is empty, the elements in the other one are unmatched.
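
A minimal sketch of that check, written in plain Java so it runs without Flink on the classpath. The class name `OuterJoinSketch`, the output labels, and the use of `Consumer<String>` as a stand-in for Flink's `Collector` are assumptions made for illustration; in a real job the same branching would sit inside `CoGroupFunction.coGroup` and call `out.collect(...)`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Consumer;

class OuterJoinSketch {

    // Mirrors the body of a coGroup function for one key and one window.
    // "out" is a hypothetical stand-in for Flink's Collector.
    static void coGroup(Iterable<Integer> iter1,
                        Iterable<Integer> iter2,
                        Consumer<String> out) {
        boolean hasFirst = iter1.iterator().hasNext();
        boolean hasSecond = iter2.iterator().hasNext();

        if (hasFirst && hasSecond) {
            // Both sides are non-empty: these are the matched elements.
            for (Integer left : iter1) {
                for (Integer right : iter2) {
                    out.accept("matched: " + left + "," + right);
                }
            }
        } else if (hasFirst) {
            // Only the first side has elements: unmatched in the first stream.
            for (Integer left : iter1) {
                out.accept("unmatched in first stream: " + left);
            }
        } else {
            // Only the second side has elements: unmatched in the second stream.
            for (Integer right : iter2) {
                out.accept("unmatched in second stream: " + right);
            }
        }
    }

    public static void main(String[] args) {
        Consumer<String> print = System.out::println;
        // Key 10 appears in both streams, 23 only in the first, 40 only in the second.
        coGroup(Arrays.asList(10), Arrays.asList(10), print);
        coGroup(Arrays.asList(23), Collections.<Integer>emptyList(), print);
        coGroup(Collections.<Integer>emptyList(), Arrays.asList(40), print);
    }
}
```

The function is called once per key and window, so each call sees either a matched group or the leftover elements of one side.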


- Jark Wu (wuchong)

> On June 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
> 
> Hi Matthias ,
> 
> I did not follow you; even if we use CoGroup, we still have to apply it on a key:
> 
> sourceStream.coGroup(destStream)
>     .where(new ElementSelector())
>     .equalTo(new ElementSelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>         private static final long serialVersionUID = 6408179761497497475L;
>
>         @Override
>         public void coGroup(Iterable<Integer> paramIterable,
>                 Iterable<Integer> paramIterable1,
>                 Collector<Integer> paramCollector) throws Exception {
>             Iterator<Integer> iterator = paramIterable.iterator();
>             while (iterator.hasNext()) {
>             }
>         }
>     });
> 
> When I debug this, only the matched elements from both streams arrive in
> the coGroup function.
> 
> What I want to know is how to check for unmatched elements from both
> streams and write them to a sink.
> 
> Regards,
> Vinay Patil
> 
> *+91-800-728-4749*
> 
> On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
> 
>> You need to do an outer join. However, there is no built-in support for
>> outer joins yet.
>> 
>> You can use Window-CoGroup to implement the outer join as your own operator.
>> 
>> 
>> -Matthias
>> 
>> On 06/13/2016 06:53 PM, Vinay Patil wrote:
>>> Hi,
>>> 
>>> I have a question regarding the join operation, consider the following
>>> dummy example:
>>> 
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>> DataStreamSource<Integer> sourceStream =
>>>     env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
>>> DataStreamSource<Integer> destStream =
>>>     env.fromElements(20, 30, 40, 50, 60, 10);
>>> 
>>> sourceStream.join(destStream)
>>>     .where(new ElementSelector())
>>>     .equalTo(new ElementSelector())
>>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
>>> 
>>>         private static final long serialVersionUID = 1L;
>>> 
>>>         @Override
>>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
>>>             return paramIN1;
>>>         }
>>>     }).print();
>>> 
>>> I get the elements that match in both streams without any problem;
>>> however, my requirement is to write both these matched elements and the
>>> unmatched elements to a sink (S3).
>>> 
>>> How do I get the unmatched elements from each stream ?
>>> 
>>> Regards,
>>> Vinay Patil
>>> 
>> 
>> 
