If you know that the group cardinality of one input is always 1 (or 0), you can cache that input in memory and stream the other input, which may have many more elements per group.
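
A minimal sketch of that idea, written in plain Java rather than against the Flink API so it runs on its own. The class `CachedSideOuterJoin`, the method `joinGroup` and the `emit` callback are hypothetical names for illustration; in a real coGroup function the callback would roughly correspond to the collector. It assumes the non-outer side has at most one element per key and fails loudly if that does not hold:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of Flink: handles the two sides of ONE key group.
class CachedSideOuterJoin {

    /**
     * Left outer join of a single group using single-pass iterators.
     * Assumption: atMostOne yields 0 or 1 elements, so caching it is cheap.
     * streamed is the outer side; it may be large and is consumed exactly once.
     */
    static void joinGroup(Iterator<String> streamed,
                          Iterator<String> atMostOne,
                          BiConsumer<String, String> emit) {
        // Cache the small side: its single element, or null if it is empty.
        String cached = atMostOne.hasNext() ? atMostOne.next() : null;
        if (atMostOne.hasNext()) {
            throw new IllegalStateException("expected at most one element on the cached side");
        }
        // Stream the outer side; nothing from it is buffered.
        while (streamed.hasNext()) {
            emit.accept(streamed.next(), cached);
        }
    }

    public static void main(String[] args) {
        joinGroup(
                Arrays.asList("left-1", "left-2").iterator(),
                Arrays.asList("right").iterator(),
                (l, r) -> System.out.println(l + " -> " + r));
    }
}
```

Only one element is ever held in memory, so memory use does not grow with the size of the streamed side.
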
2015-04-17 4:09 GMT-05:00 Flavio Pompermaier <pomperma...@okkam.it>:

> That would be very helpful...
>
> Thanks for the support,
> Flavio
>
> On Fri, Apr 17, 2015 at 10:04 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>
>> No, it's not, but at the moment there is afaik no other way around it.
>> There is an issue for proper outer join support [1]
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-687
>>
>> On Fri, Apr 17, 2015 at 10:01 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Could resolve the problem but the fact to accumulate stuff in a local
>>> variable is it safe if datasets are huge..?
>>>
>>> On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>>
>>>> If it's fine when you have null string values in the cases where
>>>> D1.f1!="a1" or D1.f2!="a2" then a possible solution could look like (with
>>>> Scala API):
>>>>
>>>> val ds1: DataSet[(String, String, String)] = getDS1
>>>> val ds2: DataSet[(String, String, String)] = getDS2
>>>>
>>>> ds1.coGroup(ds2).where(2).equalTo(0) {
>>>>   (left, right, collector: Collector[(String, String, String, String)]) => {
>>>>     if(right.isEmpty) {
>>>>       left foreach {
>>>>         element => {
>>>>           val value1 = if(element._2 == "a1") element._3 else null
>>>>           val value2 = if(element._2 == "a2") element._3 else null
>>>>           collector.collect((element._1, null, value1, value2))
>>>>         }
>>>>       }
>>>>     } else {
>>>>       val array = right.toArray
>>>>       for(leftElement <- left) {
>>>>         val value1 = if(leftElement._2 == "a1") leftElement._3 else null
>>>>         val value2 = if(leftElement._2 == "a2") leftElement._3 else null
>>>>
>>>>         for(rightElement <- array) {
>>>>           collector.collect((leftElement._1, rightElement._1, value1, value2))
>>>>         }
>>>>       }
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> Does this solve your problem?
>>>>
>>>> On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi Till,
>>>>> thanks for the reply.
>>>>> What I'd like to do is to merge D1 and D2 if there's a ref from D1 to
>>>>> D2 (D1.f2==D2.f0).
>>>>> If this condition is true, I would like to produce a set of tuples
>>>>> with the matching elements
>>>>> at the first two places (D1.*f2*, D2.*f0*) and the other two values
>>>>> (if present) of the matching tuple
>>>>> in D1 when D1.f1==*"a1"* and D1.f2=*"a2"* (string values)
>>>>> respectively.
>>>>> (PS: For each value of D1.f0 you can have at most one value of a1 and
>>>>> a2)
>>>>>
>>>>> Is it more clear?
>>>>>
>>>>> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> I don't really understand what you try to do. What does
>>>>>> D1.f2(D1.f1==p1) mean? What does happen if the condition in D1.f2(if
>>>>>> D1.f1==p2) is false?
>>>>>>
>>>>>> Where do the values a1 and a2 in (A, X, a1, a2) come from when you
>>>>>> join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe you can
>>>>>> elaborate a bit more on your example.
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Till
>>>>>>
>>>>>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> I cannot find a solution to my use case :(
>>>>>>> I have 2 datasets D1 and D2 like:
>>>>>>>
>>>>>>> D1:
>>>>>>> A,p1,a1
>>>>>>> A,p2,a2
>>>>>>> A,p3,X
>>>>>>> B,p3,Y
>>>>>>> B,p1,b1
>>>>>>>
>>>>>>> D2:
>>>>>>> X,s,V
>>>>>>> X,r,2
>>>>>>> Y,j,k
>>>>>>>
>>>>>>> I'd like to have a unique dataset D3(Tuple4) like
>>>>>>>
>>>>>>> A,X,a1,a2
>>>>>>> B,Y,b1,null
>>>>>>>
>>>>>>> Basically filling with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
>>>>>>> D1.f1==p2)> when D1.f2==D2.f0.
>>>>>>> Is that possible and how?
>>>>>>> Could you show me a simple snippet?
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> You can materialize the input of the right input by creating an
>>>>>>>> array out of it, for example. Then you can reiterate over it.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Hi Maximilian,
>>>>>>>>> I tried your solution but it doesn't work because the
>>>>>>>>> rightElements iterator cannot be used more than once:
>>>>>>>>>
>>>>>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>>>>>> Iterable can be iterated over only once. Only the first call to
>>>>>>>>> 'iterator()' will succeed.
>>>>>>>>>
>>>>>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Flavio,
>>>>>>>>>>
>>>>>>>>>> Here's a simple example of a Left Outer Join:
>>>>>>>>>> https://gist.github.com/mxm/c2e9c459a9d82c18d789
>>>>>>>>>>
>>>>>>>>>> As Stephan pointed out, this can be very easily modified to
>>>>>>>>>> construct a Right Outer Join (just exchange leftElements and
>>>>>>>>>> rightElements in the two loops).
>>>>>>>>>>
>>>>>>>>>> Here's an excerpt with the most important part, the coGroup
>>>>>>>>>> function:
>>>>>>>>>>
>>>>>>>>>> public static class LeftOuterJoin implements
>>>>>>>>>>     CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {
>>>>>>>>>>
>>>>>>>>>>   @Override
>>>>>>>>>>   public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>>>>>                       Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>>>>>                       Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>>>>>>>>>
>>>>>>>>>>     final int NULL_ELEMENT = -1;
>>>>>>>>>>
>>>>>>>>>>     for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>>>>>       boolean hadElements = false;
>>>>>>>>>>       for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>>>>>         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
>>>>>>>>>>         hadElements = true;
>>>>>>>>>>       }
>>>>>>>>>>       if (!hadElements) {
>>>>>>>>>>         out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think this may be a great example to add as a utility function.
>>>>>>>>>>>
>>>>>>>>>>> Or actually add as a function to the DataSet, internally
>>>>>>>>>>> realized as a special case of coGroup.
>>>>>>>>>>>
>>>>>>>>>>> We do not have a ready example of that, but it should be
>>>>>>>>>>> straightforward to realize. Similar as for the join, coGroup on the
>>>>>>>>>>> join keys. Inside the coGroup function, emit the combination of all
>>>>>>>>>>> values from the two iterators. If one of them is empty (the one that
>>>>>>>>>>> is not outer) then emit all values from the outer side.
>>>>>>>>>>>
>>>>>>>>>>> Greetings,
>>>>>>>>>>> Stephan
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Do you have an already working example of it? :)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>> > I have to join two datasets but I'd like to keep all data in
>>>>>>>>>>>>> > the left also if there's no right dataset.
>>>>>>>>>>>>> > How can you achieve that in Flink? maybe I should use coGroup?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, currently you have to implement this manually with a coGroup