That could solve the problem, but is it safe to accumulate elements in a local variable if the datasets are huge?
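To make the concern concrete, here is a minimal plain-Java sketch of the buffering pattern (no Flink dependency; the class and method names are made up for illustration). As far as I understand, coGroup calls the function once per key, so the buffer would hold only the right-side elements of one key rather than the whole dataset, and my worry is mainly about keys with very many matches:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch, not Flink code: all names here are hypothetical.
class LeftOuterJoinSketch {

    // Mirrors what would run inside a coGroup function for ONE key:
    // both iterators hold only that key's elements and can be read once.
    static List<String[]> joinGroup(Iterator<String> left, Iterator<String> right) {
        // Buffer the right side so it can be re-read for every left element.
        // Only this key's right-side elements are kept in memory.
        List<String> rightBuffer = new ArrayList<>();
        while (right.hasNext()) {
            rightBuffer.add(right.next());
        }

        List<String[]> out = new ArrayList<>();
        while (left.hasNext()) {
            String l = left.next();
            if (rightBuffer.isEmpty()) {
                // No match on the right: keep the left element with a null partner.
                out.add(new String[] {l, null});
            } else {
                for (String r : rightBuffer) {
                    out.add(new String[] {l, r});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> pairs = joinGroup(Arrays.asList("A", "B").iterator(),
                                         Arrays.asList("X", "Y").iterator());
        for (String[] pair : pairs) {
            System.out.println(pair[0] + "," + pair[1]);
        }
    }
}
```

With left = [A, B] and right = [X, Y] this yields the four combinations; with an empty right side each left element is kept with a null partner.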
On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:

> If it's fine when you have null string values in the cases where
> D1.f1!="a1" or D1.f2!="a2" then a possible solution could look like (with
> Scala API):
>
> val ds1: DataSet[(String, String, String)] = getDS1
> val ds2: DataSet[(String, String, String)] = getDS2
>
> ds1.coGroup(ds2).where(2).equalTo(0) {
>   (left, right, collector: Collector[(String, String, String, String)]) =>
>   {
>     if(right.isEmpty) {
>       left foreach {
>         element => {
>           val value1 = if(element._2 == "a1") element._3 else null
>           val value2 = if(element._2 == "a2") element._3 else null
>           collector.collect((element._1, null, value1, value2))
>         }
>       }
>     } else {
>       val array = right.toArray
>       for(leftElement <- left) {
>         val value1 = if(leftElement._2 == "a1") leftElement._3 else null
>         val value2 = if(leftElement._2 == "a2") leftElement._3 else null
>
>         for(rightElement <- array) {
>           collector.collect((leftElement._1, rightElement._1, value1, value2))
>         }
>       }
>     }
>   }
> }
>
> Does this solve your problem?
>
> On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Hi Till,
>> thanks for the reply.
>> What I'd like to do is to merge D1 and D2 if there's a ref from D1 to D2
>> (D1.f2==D2.f0).
>> If this condition is true, I would like to produce a set of tuples with
>> the matching elements at the first two places (D1.*f2*, D2.*f0*) and the
>> other two values (if present) of the matching tuple
>> in D1 when D1.f1==*"a1"* and D1.f2=*"a2"* (string values) respectively.
>> (PS: For each value of D1.f0 you can have at most one value of a1 and a2)
>>
>> Is it more clear?
>>
>> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <till.rohrm...@gmail.com>
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I don't really understand what you try to do. What does D1.f2(D1.f1==p1)
>>> mean? What does happen if the condition in D1.f2(if D1.f1==p2) is false?
>>>
>>> Where do the values a1 and a2 in (A, X, a1, a2) come from when you
>>> join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
>>> elaborate a bit more on your example.
>>>
>>> Cheers,
>>>
>>> Till
>>>
>>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> I cannot find a solution to my use case :(
>>>> I have 2 datasets D1 and D2 like:
>>>>
>>>> D1:
>>>> A,p1,a1
>>>> A,p2,a2
>>>> A,p3,X
>>>> B,p3,Y
>>>> B,p1,b1
>>>>
>>>> D2:
>>>> X,s,V
>>>> X,r,2
>>>> Y,j,k
>>>>
>>>> I'd like to have a single dataset D3(Tuple4) like
>>>>
>>>> A,X,a1,a2
>>>> B,Y,b1,null
>>>>
>>>> Basically filling with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
>>>> D1.f1==p2)> when D1.f2==D2.f0.
>>>> Is that possible and how?
>>>> Could you show me a simple snippet?
>>>>
>>>> Thanks in advance,
>>>> Flavio
>>>>
>>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> You can materialize the right input by creating an array
>>>>> out of it, for example. Then you can reiterate over it.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <pomperma...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> Hi Maximilian,
>>>>>> I tried your solution but it doesn't work because the rightElements
>>>>>> iterator cannot be used more than once:
>>>>>>
>>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>>> Iterable can be iterated over only once. Only the first call to
>>>>>> 'iterator()' will succeed.
>>>>>>
>>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <m...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> Here's a simple example of a Left Outer Join:
>>>>>>> https://gist.github.com/mxm/c2e9c459a9d82c18d789
>>>>>>>
>>>>>>> As Stephan pointed out, this can be very easily modified to
>>>>>>> construct a Right Outer Join (just exchange leftElements and
>>>>>>> rightElements in the two loops).
>>>>>>>
>>>>>>> Here's an excerpt with the most important part, the coGroup function:
>>>>>>>
>>>>>>> public static class LeftOuterJoin implements
>>>>>>>     CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
>>>>>>>     Tuple2<Integer, Integer>> {
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>>                       Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>>                       Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>>>>>>
>>>>>>>     final int NULL_ELEMENT = -1;
>>>>>>>
>>>>>>>     for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>>       boolean hadElements = false;
>>>>>>>       for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>>         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
>>>>>>>         hadElements = true;
>>>>>>>       }
>>>>>>>       if (!hadElements) {
>>>>>>>         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
>>>>>>>       }
>>>>>>>     }
>>>>>>>
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think this may be a great example to add as a utility function.
>>>>>>>>
>>>>>>>> Or actually add as a function to the DataSet, internally realized
>>>>>>>> as a special case of coGroup.
>>>>>>>>
>>>>>>>> We do not have a ready example of that, but it should be
>>>>>>>> straightforward to realize. Similar as for the join, coGroup on the
>>>>>>>> join keys. Inside the coGroup function, emit the combination of all
>>>>>>>> values from the two iterators. If one of them is empty (the one that
>>>>>>>> is not outer) then emit all values from the outer side.
>>>>>>>>
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Do you have an already working example of it? :)
>>>>>>>>>
>>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <u...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> > Hi to all,
>>>>>>>>>> > I have to join two datasets but I'd like to keep all data in
>>>>>>>>>> > the left even if there's no right dataset.
>>>>>>>>>> > How can you achieve that in Flink? Maybe I should use coGroup?
>>>>>>>>>>
>>>>>>>>>> Yes, currently you have to implement this manually with a coGroup