If it's acceptable to have null string values in the cases where D1.f1 != "a1" or D1.f1 != "a2", then a possible solution could look like this (with the Scala API):
val ds1: DataSet[(String, String, String)] = getDS1
val ds2: DataSet[(String, String, String)] = getDS2

ds1.coGroup(ds2).where(2).equalTo(0) {
  (left, right, collector: Collector[(String, String, String, String)]) => {
    if (right.isEmpty) {
      left foreach { element =>
        val value1 = if (element._2 == "a1") element._3 else null
        val value2 = if (element._2 == "a2") element._3 else null
        collector.collect((element._1, null, value1, value2))
      }
    } else {
      val array = right.toArray
      for (leftElement <- left) {
        val value1 = if (leftElement._2 == "a1") leftElement._3 else null
        val value2 = if (leftElement._2 == "a2") leftElement._3 else null
        for (rightElement <- array) {
          collector.collect((leftElement._1, rightElement._1, value1, value2))
        }
      }
    }
  }
}

Does this solve your problem?

On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Till,
> thanks for the reply.
> What I'd like to do is to merge D1 and D2 if there's a ref from D1 to D2
> (D1.f2==D2.f0).
> If this condition is true, I would like to produce a set of tuples with
> the matching elements at the first two places (D1.*f2*, D2.*f0*) and the
> other two values (if present) of the matching tuple in D1 when
> D1.f1==*"a1"* and D1.f1==*"a2"* (string values) respectively.
> (PS: For each value of D1.f0 you can have at most one value of a1 and a2)
>
> Is it more clear?
>
> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <till.rohrm...@gmail.com>
> wrote:
>
>> Hi Flavio,
>>
>> I don't really understand what you are trying to do. What does
>> D1.f2(D1.f1==p1) mean? What happens if the condition in D1.f2(if
>> D1.f1==p2) is false?
>>
>> Where do the values a1 and a2 in (A, X, a1, a2) come from when you join
>> [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
>> elaborate a bit more on your example.
>>
>> Cheers,
>>
>> Till
>>
>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> I cannot find a solution to my use case :(
>>> I have 2 datasets D1 and D2 like:
>>>
>>> D1:
>>> A,p1,a1
>>> A,p2,a2
>>> A,p3,X
>>> B,p3,Y
>>> B,p1,b1
>>>
>>> D2:
>>> X,s,V
>>> X,r,2
>>> Y,j,k
>>>
>>> I'd like to have a unique dataset D3 (Tuple4) like
>>>
>>> A,X,a1,a2
>>> B,Y,b1,null
>>>
>>> Basically filling with <D1.f0, D2.f0, D1.f2 (if D1.f1==p1), D1.f2 (if
>>> D1.f1==p2)> when D1.f2==D2.f0.
>>> Is that possible and how?
>>> Could you show me a simple snippet?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>>
>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> You can materialize the right input by creating an array out of it,
>>>> for example. Then you can iterate over it more than once.
>>>>
>>>> Cheers,
>>>> Till
>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>>> Hi Maximilian,
>>>>> I tried your solution but it doesn't work because the rightElements
>>>>> iterator cannot be used more than once:
>>>>>
>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>> Iterable can be iterated over only once. Only the first call to
>>>>> 'iterator()' will succeed.
>>>>>
>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <m...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> Here's a simple example of a Left Outer Join:
>>>>>> https://gist.github.com/mxm/c2e9c459a9d82c18d789
>>>>>>
>>>>>> As Stephan pointed out, this can be very easily modified to
>>>>>> construct a Right Outer Join (just exchange leftElements and
>>>>>> rightElements in the two loops).
>>>>>>
>>>>>> Here's an excerpt with the most important part, the coGroup function:
>>>>>>
>>>>>> public static class LeftOuterJoin implements
>>>>>>     CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
>>>>>>         Tuple2<Integer, Integer>> {
>>>>>>
>>>>>>   @Override
>>>>>>   public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>                       Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>                       Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>>>>>
>>>>>>     final int NULL_ELEMENT = -1;
>>>>>>
>>>>>>     for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>       boolean hadElements = false;
>>>>>>       for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
>>>>>>         hadElements = true;
>>>>>>       }
>>>>>>       if (!hadElements) {
>>>>>>         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I think this may be a great example to add as a utility function.
>>>>>>>
>>>>>>> Or actually add it as a function to the DataSet, internally realized
>>>>>>> as a special case of coGroup.
>>>>>>>
>>>>>>> We do not have a ready example of that, but it should be
>>>>>>> straightforward to realize. Similar to the join, coGroup on the join
>>>>>>> keys. Inside the coGroup function, emit the combination of all values
>>>>>>> from the two iterators. If one of them is empty (the one that is not
>>>>>>> outer), then emit all values from the outer side.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Do you have an already working example of it? :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <u...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier <pomperma...@okkam.it>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> >
>>>>>>>>> > Hi to all,
>>>>>>>>> > I have to join two datasets but I'd like to keep all data in the
>>>>>>>>> > left also if there's no right dataset.
>>>>>>>>> > How can you achieve that in Flink? Maybe I should use coGroup?
>>>>>>>>>
>>>>>>>>> Yes, currently you have to implement this manually with a coGroup.
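
For completeness, here is a minimal, self-contained sketch (Scala DataSet API, as in the snippet at the top) that puts the two points from this thread together: coGroup on the join key, and materialize the right-side iterator before the nested loop so that the TraversableOnceException Flavio ran into does not occur. The object name and the fromElements test data are just illustrative (the data is Flavio's D1/D2 example), and the output here only pairs D1.f0 with D2.f0; the a1/a2 columns would be filled in exactly as in the snippet at the top.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object LeftOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Flavio's example data: D1(f0, f1, f2) and D2(f0, f1, f2)
    val d1 = env.fromElements(
      ("A", "p1", "a1"), ("A", "p2", "a2"), ("A", "p3", "X"),
      ("B", "p3", "Y"), ("B", "p1", "b1"))
    val d2 = env.fromElements(
      ("X", "s", "V"), ("X", "r", "2"), ("Y", "j", "k"))

    // Left outer join on D1.f2 == D2.f0.
    // The right side is materialized into an array once per key group,
    // so it can be traversed again for every left element.
    val joined: DataSet[(String, String)] =
      d1.coGroup(d2).where(2).equalTo(0) {
        (left, right, out: Collector[(String, String)]) => {
          val rightArray = right.toArray
          for (leftElement <- left) {
            if (rightArray.isEmpty) {
              // no match on the right: keep the left element anyway
              out.collect((leftElement._1, null))
            } else {
              for (rightElement <- rightArray) {
                out.collect((leftElement._1, rightElement._1))
              }
            }
          }
        }
      }

    joined.print()
    // On older Flink versions you may additionally need env.execute().
  }
}

Materializing the right side assumes that the right-hand elements of one key group fit into memory, which should be fine for a lookup-style join like the one in this thread.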