There is no caching mechanism.
To do the left outer join as in Till's implementation, you need to collect
all elements of one iterator in memory. If you know that one of the two
iterators contains at most one element, you should collect that one in memory
and stream the elements of the other iterator.
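
As a rough illustration of "collect one side, stream the other", here is a
minimal sketch in plain Java. It deliberately does not use the Flink API: the
class name, the method name leftOuterJoinGroup, and the BiConsumer standing in
for Flink's Collector are all hypothetical. It assumes the right iterator is
the small side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

public class StreamingLeftOuterJoin {

    /**
     * Left outer join for the elements of a single key group.
     * The right side is buffered in memory (assumed to be small, ideally
     * zero or one element); the left side is consumed exactly once.
     * Hypothetical helper, not part of the Flink API.
     */
    public static <L, R> void leftOuterJoinGroup(Iterator<L> left,
                                                 Iterator<R> right,
                                                 BiConsumer<L, R> out) {
        // Collect the small side so it can be re-read for every left element.
        List<R> buffered = new ArrayList<>();
        while (right.hasNext()) {
            buffered.add(right.next());
        }

        // Stream the potentially large side without storing it.
        while (left.hasNext()) {
            L leftElem = left.next();
            if (buffered.isEmpty()) {
                // No match on the right: emit the left element with null.
                out.accept(leftElem, null);
            } else {
                for (R rightElem : buffered) {
                    out.accept(leftElem, rightElem);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("A", "B");
        List<String> right = Collections.singletonList("X");
        leftOuterJoinGroup(left.iterator(), right.iterator(),
                (l, r) -> System.out.println(l + " -> " + r));
    }
}
```

Inside a real coGroup function the same pattern would apply to the two
Iterables that Flink passes in; memory use is then bounded by the size of the
buffered side only.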

2015-04-17 6:18 GMT-05:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Could you explain a little more in detail this caching mechanism with a
> simple code snippet...?
>
> Thanks,
> Flavio
> On Apr 17, 2015 1:12 PM, "Fabian Hueske" <fhue...@gmail.com> wrote:
>
>> If you know that the group cardinality of one input is always 1 (or 0)
>> you can make that input the one to cache in memory and stream the other
>> input with potentially more group elements.
>>
>> 2015-04-17 4:09 GMT-05:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> That would be very helpful...
>>>
>>> Thanks for the support,
>>> Flavio
>>>
>>> On Fri, Apr 17, 2015 at 10:04 AM, Till Rohrmann <till.rohrm...@gmail.com
>>> > wrote:
>>>
>>>> No, it's not, but at the moment there is, afaik, no other way around it.
>>>> There is an issue for proper outer join support [1].
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-687
>>>>
>>>> On Fri, Apr 17, 2015 at 10:01 AM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> That could resolve the problem, but is accumulating data in a local
>>>>> variable safe if the datasets are huge?
>>>>>
>>>>> On Fri, Apr 17, 2015 at 9:54 AM, Till Rohrmann <
>>>>> till.rohrm...@gmail.com> wrote:
>>>>>
>>>>>> If it's fine when you have null string values in the cases where
>>>>>> D1.f1!="a1" or D1.f2!="a2" then a possible solution could look like (with
>>>>>> Scala API):
>>>>>>
>>>>>> import org.apache.flink.api.scala._
>>>>>> import org.apache.flink.util.Collector
>>>>>>
>>>>>> val ds1: DataSet[(String, String, String)] = getDS1
>>>>>> val ds2: DataSet[(String, String, String)] = getDS2
>>>>>>
>>>>>> ds1.coGroup(ds2).where(2).equalTo(0) {
>>>>>>   (left, right, collector: Collector[(String, String, String, String)]) => {
>>>>>>     if (right.isEmpty) {
>>>>>>       // No match on the right: emit the left elements with a null reference.
>>>>>>       for (element <- left) {
>>>>>>         val value1 = if (element._2 == "a1") element._3 else null
>>>>>>         val value2 = if (element._2 == "a2") element._3 else null
>>>>>>         collector.collect((element._1, null, value1, value2))
>>>>>>       }
>>>>>>     } else {
>>>>>>       // Materialize the right side so it can be re-read for every left element.
>>>>>>       val array = right.toArray
>>>>>>       for (leftElement <- left) {
>>>>>>         val value1 = if (leftElement._2 == "a1") leftElement._3 else null
>>>>>>         val value2 = if (leftElement._2 == "a2") leftElement._3 else null
>>>>>>
>>>>>>         for (rightElement <- array) {
>>>>>>           collector.collect((leftElement._1, rightElement._1, value1, value2))
>>>>>>         }
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Does this solve your problem?
>>>>>>
>>>>>> On Fri, Apr 17, 2015 at 9:30 AM, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> thanks for the reply.
>>>>>>> What I'd like to do is to merge D1 and D2 if there's a ref from D1
>>>>>>> to D2 (D1.f2==D2.f0).
>>>>>>> If this condition is true, I would like to produce a set of tuples
>>>>>>> with the matching elements
>>>>>>> in the first two places (D1.*f2*, D2.*f0*) and the other two values
>>>>>>> (if present) of the matching tuple
>>>>>>> in D1 when D1.f1==*"a1"* and D1.f2==*"a2"* (string values)
>>>>>>> respectively.
>>>>>>> (PS: For each value of D1.f0 you can have at most one value of a1
>>>>>>> and a2)
>>>>>>>
>>>>>>> Is it clearer now?
>>>>>>>
>>>>>>> On Fri, Apr 17, 2015 at 9:03 AM, Till Rohrmann <
>>>>>>> till.rohrm...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> I don't really understand what you are trying to do. What does
>>>>>>>> D1.f2(D1.f1==p1) mean? What happens if the condition in D1.f2(if
>>>>>>>> D1.f1==p2) is false?
>>>>>>>>
>>>>>>>> Where do the values a1 and a2 in (A, X, a1, a2) come from when
>>>>>>>> you join [(A, p3, X), (X, s, V)] and [(A, p3, X), (X, r, 2)]? Maybe
>>>>>>>> you can elaborate a bit more on your example.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Apr 16, 2015 at 10:09 PM, Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> I cannot find a solution to my use case :(
>>>>>>>>> I have 2 datasets D1 and D2 like:
>>>>>>>>>
>>>>>>>>> D1:
>>>>>>>>> A,p1,a1
>>>>>>>>> A,p2,a2
>>>>>>>>> A,p3,X
>>>>>>>>> B,p3,Y
>>>>>>>>> B,p1,b1
>>>>>>>>>
>>>>>>>>> D2:
>>>>>>>>> X,s,V
>>>>>>>>> X,r,2
>>>>>>>>> Y,j,k
>>>>>>>>>
>>>>>>>>> I'd like to have a unique dataset D3(Tuple4) like
>>>>>>>>>
>>>>>>>>> A,X,a1,a2
>>>>>>>>> B,Y,b1,null
>>>>>>>>>
>>>>>>>>> Basically filling with <D1.f0,D2.f0,D1.f2(D1.f1==p1),D1.f2(if
>>>>>>>>> D1.f1==p2)> when D1.f2==D2.f0.
>>>>>>>>> Is that possible and how?
>>>>>>>>> Could you show me a simple snippet?
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Apr 16, 2015 at 9:48 PM, Till Rohrmann <
>>>>>>>>> trohrm...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> You can materialize the right input by creating an array out
>>>>>>>>>> of it, for example. Then you can iterate over it repeatedly.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>> On Apr 16, 2015 7:37 PM, "Flavio Pompermaier" <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Maximilian,
>>>>>>>>>>> I tried your solution but it doesn't work because the
>>>>>>>>>>> rightElements iterator cannot be used more than once:
>>>>>>>>>>>
>>>>>>>>>>> Caused by: org.apache.flink.util.TraversableOnceException: The
>>>>>>>>>>> Iterable can be iterated over only once. Only the first call to
>>>>>>>>>>> 'iterator()' will succeed.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 15, 2015 at 12:59 PM, Maximilian Michels <
>>>>>>>>>>> m...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>>
>>>>>>>>>>>> Here's a simple example of a Left Outer Join:
>>>>>>>>>>>> https://gist.github.com/mxm/c2e9c459a9d82c18d789
>>>>>>>>>>>>
>>>>>>>>>>>> As Stephan pointed out, this can be very easily modified to
>>>>>>>>>>>> construct a Right Outer Join (just exchange leftElements and
>>>>>>>>>>>> rightElements in the two loops).
>>>>>>>>>>>>
>>>>>>>>>>>> Here's an excerpt with the most important part, the coGroup
>>>>>>>>>>>> function:
>>>>>>>>>>>>
>>>>>>>>>>>> public static class LeftOuterJoin implements
>>>>>>>>>>>>       CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {
>>>>>>>>>>>>
>>>>>>>>>>>>    @Override
>>>>>>>>>>>>    public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
>>>>>>>>>>>>                        Iterable<Tuple2<Integer, String>> rightElements,
>>>>>>>>>>>>                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
>>>>>>>>>>>>
>>>>>>>>>>>>       final int NULL_ELEMENT = -1;
>>>>>>>>>>>>
>>>>>>>>>>>>       for (Tuple2<Integer, String> leftElem : leftElements) {
>>>>>>>>>>>>          boolean hadElements = false;
>>>>>>>>>>>>          for (Tuple2<Integer, String> rightElem : rightElements) {
>>>>>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
>>>>>>>>>>>>             hadElements = true;
>>>>>>>>>>>>          }
>>>>>>>>>>>>          if (!hadElements) {
>>>>>>>>>>>>             out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
>>>>>>>>>>>>          }
>>>>>>>>>>>>       }
>>>>>>>>>>>>
>>>>>>>>>>>>    }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 15, 2015 at 11:01 AM, Stephan Ewen <
>>>>>>>>>>>> se...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this may be a great example to add as a utility
>>>>>>>>>>>>> function.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Or actually add it as a function to the DataSet, internally
>>>>>>>>>>>>> realized as a special case of coGroup.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We do not have a ready example of that, but it should be
>>>>>>>>>>>>> straightforward to realize. As with the join, coGroup on the
>>>>>>>>>>>>> join keys. Inside the coGroup function, emit the combination of
>>>>>>>>>>>>> all values from the two iterators. If one of them is empty (the
>>>>>>>>>>>>> one that is not outer), then emit all values from the outer side.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Greetings,
>>>>>>>>>>>>> Stephan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Apr 15, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Do you have an already working example of it? :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Apr 15, 2015 at 10:32 AM, Ufuk Celebi <u...@apache.org
>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 15 Apr 2015, at 10:30, Flavio Pompermaier <
>>>>>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>>>> > I have to join two datasets but I'd like to keep all data
>>>>>>>>>>>>>>> from the left even if there's no matching data on the right.
>>>>>>>>>>>>>>> > How can you achieve that in Flink? Maybe I should use
>>>>>>>>>>>>>>> coGroup?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, currently you have to implement this manually with a
>>>>>>>>>>>>>>> coGroup
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
