If it's fine when you have null string values in the cases where
D1.f1!="a1" or D1.f2!="a2" then a possible solution could look like (with
Scala API):

val ds1: DataSet[(String, String, String)] = getDS1
val ds2: DataSet[(String, String, String)] = getDS2

ds1.coGroup(ds2).where(2).equalTo(0) {
  (left, right, collector: Collector[(String, String, String, String)]) => {
    if(right.isEmpty) {
      left foreach {
      element => {
      val value1 = if(element._2 == "a1") element._3 else null
      val value2 = if(element._2 == "a2") element._3 else null
      collector.collect((element._1, null, value1, value2))
        }
      }
    } else {
      val array = right.toArray
      for(leftElement <- left) {
      val value1 = if(leftElement._2 == "a1") leftElement._3 else null
    val value2 = if(leftElement._2 == "a2") leftElement._3 else null

    for(rightElement <- array) {
      collector.collect(leftElement._1, rightElement._1, value1, value2))
    }
      }
    }
  }
}

Does this solve your problem?

On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Till,
> thanks for the reply.
> What I'd like to do is to merge D1 and D2 if there's a ref from D1 to D2
> (D1.f2==D2.f0).
> If this condition is true, I would like to produce a set of tuples with
> the matching elements
> at the first to places (D1.*f2*, D2.*f0*) and the other two values (if
> present) of the matching tuple
> in D1 when D1.f1==*"a1"* and D1.f2=*"a2"* (string values) respectively.
> (PS: For each value of D1.f0 you can have at most one value of a1 and a2)
>
> Is it more clear?
>
> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <till.rohrm...@gmail.com>
> wrote:
>
>> Hi Flavio,
>>
>> I don't really understand what you try to do. What does D1.f2(D1.f1==p1)
>> mean? What does happen if the condition in D1.f2(if D1.f1==p2) is false?
>>
>> Where does the values a1 and a2 in (A, X, a1, a2) come from when you join
>> [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
>> elaborate a bit more on your example.
>>
>> Cheers,
>>
>> Till
>>
>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> I cannot find a solution to my use case :(
>>> I have 2 datasets D1 and D2 like:
>>>
>>> D1:
>>> A,p1,a1
>>> A,p2,a2
>>> A,p3,X
>>> B,p3,Y
>>> B,p1,b1
>>>
>>> D2:
>>> X,s,V
>>> X,r,2
>>> Y,j,k
>>>
>>> I'd like to have a unique dataset D3(Tuple4) like
>>>
>>> A,X,a1,a2
>>> B,Y,b1,null
>>>
>>> Basically filling with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
>>> D1.f1==p2)> when D1.f2==D2.f0.
>>> Is that possible and how?
>>> Could you show me a simple snippet?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>>
>>>
>>>
>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> You can materialize the input of the right input by creating an array
>>>> out of it, for example. Then you can reiterate over it.
>>>>
>>>> Cheers,
>>>> Till
>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>>> Hi Maximilian,
>>>>> I tried your solution but it doesn't work because the rightElements
>>>>> iterator cannot be used more than once:
>>>>>
>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>> Iterable can be iterated over only once. Only the first call to
>>>>> 'iterator()' will succeed.
>>>>>
>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <m...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> Here's an simple example of a Left Outer Join:
>>>>>> https://gist.github.com/mxm/c2e9c459a9d82c18d789
>>>>>>
>>>>>> As Stephan pointed out, this can be very easily modified to construct
>>>>>> a Right Outer Join (just exchange leftElements and rightElements in the 
>>>>>> two
>>>>>> loops).
>>>>>>
>>>>>> Here's an excerpt with the most important part, the coGroup function:
>>>>>>
>>>>>> public static class LeftOuterJoin implements 
>>>>>> CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, 
>>>>>> Tuple2<Integer, Integer>> {
>>>>>>
>>>>>>    @Override
>>>>>>    public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>                        Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>                        Collector<Tuple2<Integer, Integer>> out) throws 
>>>>>> Exception {
>>>>>>
>>>>>>       final int NULL_ELEMENT = -1;
>>>>>>
>>>>>>       for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>          boolean hadElements = false;
>>>>>>          for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, 
>>>>>> rightElem.f0));
>>>>>>             hadElements = true;
>>>>>>          }
>>>>>>          if (!hadElements) {
>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, 
>>>>>> NULL_ELEMENT));
>>>>>>          }
>>>>>>       }
>>>>>>
>>>>>>    }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I think this may be a great example to add as a utility function.
>>>>>>>
>>>>>>> Or actually add as an function to the DataSet, internally realized
>>>>>>> as a special case of coGroup.
>>>>>>>
>>>>>>> We do not have a ready example of that, but it should be
>>>>>>> straightforward to realize. Similar as for the join, coGroup on the join
>>>>>>> keys. Inside the coGroup function, emit the combination of all values 
>>>>>>> from
>>>>>>> the two iterators. If one of them is empty (the one that is not outer) 
>>>>>>> then
>>>>>>> emit all values from the outer side.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Do you have an already working example of it? :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <u...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> >
>>>>>>>>> > Hi to all,
>>>>>>>>> > I have to join two datasets but I'd like to keep all data in the
>>>>>>>>> left also if there' no right dataset.
>>>>>>>>> > How can you achieve that in Flink? maybe I should use coGroup?
>>>>>>>>>
>>>>>>>>> Yes, currently you have to implement this manually with a coGroup
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>
>
>

Reply via email to