No, it's not, but at the moment there is, afaik, no other way around it. There
is an issue for proper outer join support [1].


On Fri, Apr 17, 2015 at 10:01 AM, Flavio Pompermaier <>
wrote:

> That could solve the problem, but is it safe to accumulate stuff in a local
> variable if the datasets are huge?
> On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann <>
> wrote:
>> If it's fine to have null string values in the cases where
>> D1.f1!="a1" or D1.f1!="a2", then a possible solution could look like this
>> (with the Scala API):
>> import org.apache.flink.api.scala._
>> import org.apache.flink.util.Collector
>>
>> val ds1: DataSet[(String, String, String)] = getDS1
>> val ds2: DataSet[(String, String, String)] = getDS2
>> ds1.coGroup(ds2).where(2).equalTo(0) {
>>   (left, right, collector: Collector[(String, String, String, String)]) => {
>>     if (right.isEmpty) {
>>       // No matching D2 element: emit the D1 elements with a null D2 key.
>>       left foreach { element =>
>>         val value1 = if (element._2 == "a1") element._3 else null
>>         val value2 = if (element._2 == "a2") element._3 else null
>>         collector.collect((element._1, null, value1, value2))
>>       }
>>     } else {
>>       // Materialize the right side so it can be traversed once per left element.
>>       val array = right.toArray
>>       for (leftElement <- left) {
>>         val value1 = if (leftElement._2 == "a1") leftElement._3 else null
>>         val value2 = if (leftElement._2 == "a2") leftElement._3 else null
>>         for (rightElement <- array) {
>>           collector.collect((leftElement._1, rightElement._1, value1, value2))
>>         }
>>       }
>>     }
>>   }
>> }
>> Does this solve your problem?
>> On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <>
>> wrote:
>>> Hi Till,
>>> thanks for the reply.
>>> What I'd like to do is to merge D1 and D2 if there's a ref from D1 to D2
>>> (D1.f2==D2.f0).
>>> If this condition is true, I would like to produce a set of tuples with
>>> the matching elements in the first two places (D1.*f2*, D2.*f0*) and the
>>> other two values (if present) of the matching tuple in D1 when
>>> D1.f1==*"a1"* and D1.f1==*"a2"* (string values), respectively.
>>> (PS: For each value of D1.f0 you can have at most one value of a1 and a2.)
>>> Is it clearer now?
>>> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <>
>>> wrote:
>>>> Hi Flavio,
>>>> I don't really understand what you're trying to do. What does
>>>> D1.f2(D1.f1==p1) mean? What happens if the condition in D1.f2(if
>>>> D1.f1==p2) is false?
>>>> Where do the values a1 and a2 in (A, X, a1, a2) come from when you
>>>> join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
>>>> elaborate a bit more on your example.
>>>> Cheers,
>>>> Till
>>>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <>
>>>> wrote:
>>>>> I cannot find a solution to my use case :(
>>>>> I have 2 datasets D1 and D2 like:
>>>>> D1:
>>>>> A,p1,a1
>>>>> A,p2,a2
>>>>> A,p3,X
>>>>> B,p3,Y
>>>>> B,p1,b1
>>>>> D2:
>>>>> X,s,V
>>>>> X,r,2
>>>>> Y,j,k
>>>>> I'd like to get a single dataset D3 (Tuple4) like:
>>>>> A,X,a1,a2
>>>>> B,Y,b1,null
>>>>> Basically filling with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
>>>>> D1.f1==p2)> when D1.f2==D2.f0.
>>>>> Is that possible and how?
>>>>> Could you show me a simple snippet?
>>>>> Thanks in advance,
>>>>> Flavio
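For reference, here is a plain-Java sketch (no Flink involved) of the D3 described above, computed in memory from the sample D1/D2 rows. It assumes that a D1 row is the "ref" row when its f2 appears as an f0 value in D2, that each D1.f0 has at most one ref, one p1 and one p2 row, and that subjects without a ref are dropped. D3Sketch, buildD3 and row are hypothetical names. In an actual Flink job the grouping would be done with groupBy/coGroup instead of in-memory maps.

```java
import java.util.*;

// Hypothetical helper: computes D3 in memory from D1/D2 rows (no Flink).
class D3Sketch {

    // Input rows are String[3] {f0, f1, f2}; output rows are String[4].
    static List<String[]> buildD3(List<String[]> d1, List<String[]> d2) {
        // Collect the keys present on the D2 side (D2.f0).
        Set<String> d2Keys = new HashSet<>();
        for (String[] row : d2) {
            d2Keys.add(row[0]);
        }
        // Group D1 by f0; LinkedHashMap keeps the input order of subjects.
        Map<String, String[]> bySubject = new LinkedHashMap<>();
        for (String[] row : d1) {
            String[] out = bySubject.computeIfAbsent(row[0], k -> new String[] {k, null, null, null});
            if (d2Keys.contains(row[2])) {
                out[1] = row[2];          // assumption: D1.f2 == D2.f0 marks the ref row
            } else if ("p1".equals(row[1])) {
                out[2] = row[2];          // value of the p1 row
            } else if ("p2".equals(row[1])) {
                out[3] = row[2];          // value of the p2 row
            }
        }
        // Keep only subjects that reference a D2 key.
        List<String[]> d3 = new ArrayList<>();
        for (String[] out : bySubject.values()) {
            if (out[1] != null) {
                d3.add(out);
            }
        }
        return d3;
    }

    static String[] row(String... fields) {
        return fields;
    }

    public static void main(String[] args) {
        List<String[]> d1 = Arrays.asList(
                row("A", "p1", "a1"), row("A", "p2", "a2"), row("A", "p3", "X"),
                row("B", "p3", "Y"), row("B", "p1", "b1"));
        List<String[]> d2 = Arrays.asList(
                row("X", "s", "V"), row("X", "r", "2"), row("Y", "j", "k"));
        for (String[] r : buildD3(d1, d2)) {
            System.out.println(String.join(",", r)); // null fields print as "null"
        }
    }
}
```

Running main prints A,X,a1,a2 and B,Y,b1,null, i.e. the D3 shown above.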
>>>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <>
>>>>> wrote:
>>>>>> You can materialize the right input by creating an array out of it,
>>>>>> for example. Then you can iterate over it as often as you like.
>>>>>> Cheers,
>>>>>> Till
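Here is a minimal illustration of this suggestion outside Flink. OnceIterable is a hypothetical stand-in for the iterables a coGroup function receives. Those allow only one call to iterator() (see the TraversableOnceException quoted below). Copying the elements into an ArrayList first lets the inner loop scan them once per left element. This also bears on the concern about huge datasets raised later in the thread: the coGroup function is called once per key, so the copy holds only the right-hand elements of the current key group, not the whole dataset.

```java
import java.util.*;

// Hypothetical stand-in for an Iterable that may be traversed only once,
// mimicking the behavior behind Flink's TraversableOnceException.
class OnceIterable<T> implements Iterable<T> {
    private final List<T> data;
    private boolean consumed = false;

    OnceIterable(List<T> data) {
        this.data = data;
    }

    @Override
    public Iterator<T> iterator() {
        if (consumed) {
            throw new IllegalStateException("Only the first call to iterator() will succeed.");
        }
        consumed = true;
        return data.iterator();
    }
}

class MaterializeSketch {
    // Copy a once-only iterable into a list that can be re-iterated freely.
    static <T> List<T> materialize(Iterable<T> once) {
        List<T> copy = new ArrayList<>();
        for (T t : once) {
            copy.add(t);
        }
        return copy;
    }

    // Nested loop over left x right, using the materialized right side.
    static List<String> pairs(Iterable<String> left, Iterable<String> right) {
        List<String> rightCopy = materialize(right);
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : rightCopy) {   // safe: rightCopy is an ArrayList
                out.add(l + "-" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Iterable<String> left = new OnceIterable<>(Arrays.asList("a", "b"));
        Iterable<String> right = new OnceIterable<>(Arrays.asList("x", "y"));
        System.out.println(pairs(left, right)); // [a-x, a-y, b-x, b-y]
    }
}
```

Iterating right directly inside the outer loop instead of rightCopy would throw on the second left element.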
>>>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <>
>>>>>> wrote:
>>>>>>> Hi Maximilian,
>>>>>>> I tried your solution but it doesn't work because the rightElements
>>>>>>> iterator cannot be used more than once:
>>>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>>>> Iterable can be iterated over only once. Only the first call to
>>>>>>> 'iterator()' will succeed.
>>>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <>
>>>>>>> wrote:
>>>>>>>> Hi Flavio,
>>>>>>>> Here's a simple example of a Left Outer Join:
>>>>>>>> As Stephan pointed out, this can be very easily modified to
>>>>>>>> construct a Right Outer Join (just exchange leftElements and
>>>>>>>> rightElements in the two loops).
>>>>>>>> Here's an excerpt with the most important part, the coGroup
>>>>>>>> function:
>>>>>>>> public static class LeftOuterJoin implements CoGroupFunction<
>>>>>>>>       Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {
>>>>>>>>    @Override
>>>>>>>>    public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>>>                        Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>>>                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>>>>>>>       final int NULL_ELEMENT = -1;
>>>>>>>>       for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>>>          boolean hadElements = false;
>>>>>>>>          for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
>>>>>>>>             hadElements = true;
>>>>>>>>          }
>>>>>>>>          if (!hadElements) {
>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
>>>>>>>>          }
>>>>>>>>       }
>>>>>>>>    }
>>>>>>>> }
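The nested loop above starts a new for-each over rightElements, and therefore calls iterator() again, for every left element. That is what triggers the TraversableOnceException reported in this thread. Below is a plain-Java sketch of the same coGroup body with the right side materialized first. To keep it runnable without Flink, Map.Entry<Integer, String> stands in for Tuple2<Integer, String> and java.util.function.Consumer stands in for Flink's Collector. Those substitutions and the class name are assumptions for illustration. In a real Flink job the list would hold the Tuple2 objects directly. That should be fine as long as object reuse is disabled (the default). With object reuse enabled, the elements would likely need to be copied.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical plain-Java version of LeftOuterJoin.coGroup: Map.Entry replaces
// Flink's Tuple2 and Consumer replaces Flink's Collector.
class LeftOuterJoinSketch {
    static final int NULL_ELEMENT = -1;

    static void coGroup(Iterable<Map.Entry<Integer, String>> leftElements,
                        Iterable<Map.Entry<Integer, String>> rightElements,
                        Consumer<int[]> out) {
        // Iterate the right side exactly once and keep its elements in a list.
        List<Map.Entry<Integer, String>> rights = new ArrayList<>();
        for (Map.Entry<Integer, String> rightElem : rightElements) {
            rights.add(rightElem);
        }
        for (Map.Entry<Integer, String> leftElem : leftElements) {
            if (rights.isEmpty()) {
                // No match on the right: keep the left element, padded with the marker.
                out.accept(new int[] {leftElem.getKey(), NULL_ELEMENT});
            } else {
                // One output pair per matching right element.
                for (Map.Entry<Integer, String> rightElem : rights) {
                    out.accept(new int[] {leftElem.getKey(), rightElem.getKey()});
                }
            }
        }
    }

    // Convenience constructor for a (key, value) element.
    static Map.Entry<Integer, String> e(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        List<int[]> results = new ArrayList<>();
        // Key 1 exists on both sides: 2 left x 1 right = 2 pairs.
        coGroup(Arrays.asList(e(1, "l1"), e(1, "l2")), Arrays.asList(e(1, "r1")), results::add);
        // Key 2 exists only on the left: one padded pair.
        coGroup(Arrays.asList(e(2, "l3")), Collections.<Map.Entry<Integer, String>>emptyList(), results::add);
        for (int[] pair : results) {
            System.out.println(pair[0] + "," + pair[1]); // 1,1 then 1,1 then 2,-1
        }
    }
}
```

Swapping the roles of the two inputs gives the Right Outer Join mentioned above.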
>>>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <>
>>>>>>>> wrote:
>>>>>>>>> I think this may be a great example to add as a utility function,
>>>>>>>>> or actually to add as a function on the DataSet, internally realized
>>>>>>>>> as a special case of coGroup.
>>>>>>>>> We do not have a ready example of that, but it should be
>>>>>>>>> straightforward to realize. Similar to the join, coGroup on the join
>>>>>>>>> keys. Inside the coGroup function, emit all combinations of values
>>>>>>>>> from the two iterators. If one of them is empty (the one that is not
>>>>>>>>> outer), then emit all values from the outer side.
>>>>>>>>> Greetings,
>>>>>>>>> Stephan
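Here is Stephan's recipe, sketched for a single key group in plain Java and extended to a full outer join. The sketch assumes that, as with Flink's coGroup, the function is called once per key, and that one of the two iterables is empty when the key exists on only one side. BiConsumer stands in for Flink's Collector, null marks the missing side, and OuterCoGroupSketch and fullOuter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical helper: full outer "coGroup" over the elements of one key.
class OuterCoGroupSketch {

    static <L, R> void fullOuter(Iterable<L> left, Iterable<R> right, BiConsumer<L, R> out) {
        // Materialize the right side so it can be scanned once per left element.
        List<R> rights = new ArrayList<>();
        for (R r : right) {
            rights.add(r);
        }
        boolean leftEmpty = true;
        for (L l : left) {
            leftEmpty = false;
            if (rights.isEmpty()) {
                out.accept(l, null);          // key only on the left
            } else {
                for (R r : rights) {
                    out.accept(l, r);         // key on both sides: all combinations
                }
            }
        }
        if (leftEmpty) {
            for (R r : rights) {
                out.accept(null, r);          // key only on the right
            }
        }
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        BiConsumer<String, String> collect = (l, r) -> rows.add(l + "|" + r);
        fullOuter(Arrays.asList("a1", "a2"), Arrays.asList("b1"), collect);
        fullOuter(Arrays.asList("c1"), Collections.<String>emptyList(), collect);
        fullOuter(Collections.<String>emptyList(), Arrays.asList("d1"), collect);
        System.out.println(rows); // [a1|b1, a2|b1, c1|null, null|d1]
    }
}
```

Dropping the final leftEmpty branch turns this into a left outer join. Swapping the roles of the two inputs gives a right outer join.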
>>>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <>
>>>>>>>>> wrote:
>>>>>>>>>> Do you have an already working example of it? :)
>>>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <>
>>>>>>>>>> wrote:
>>>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier <>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi to all,
>>>>>>>>>>> > I have to join two datasets, but I'd like to keep all data in the
>>>>>>>>>>> > left one even if there's no matching data in the right one.
>>>>>>>>>>> > How can I achieve that in Flink? Maybe I should use coGroup?
>>>>>>>>>>> Yes, currently you have to implement this manually with a coGroup.
