Re: Case of possible join optimization

2015-09-08 Thread Flavio Pompermaier
Ah.. Fortunately it seems to do what I need :) It efficiently filters the bigDataset, retaining only the needed elements and making the join feasible with little memory.. :) So is that a bug? What would be the right way to achieve that behaviour with Flink? On Tue, Sep 8, 2015 at 11:22 AM, Stephan Ewen w

Re: Case of possible join optimization

2015-09-08 Thread Stephan Ewen
The problem is the "getInput2()" call. It takes the input to the join, not the result of the join. That way, the first join never happens. On Tue, Sep 8, 2015 at 11:10 AM, Flavio Pompermaier wrote: > Obviously when trying to simplify my code I didn't substitute correctly > the variable of the jo
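Stephan's point, that chaining onto the join's *input* instead of its *result* silently drops the first join, can be illustrated with a plain-Java analogue (hypothetical toy data and method names, not the Flink API):

```java
import java.util.List;
import java.util.stream.Collectors;

public class WrongVariableBug {
    // First stage: a filter standing in for the first join.
    static List<String> firstStage(List<String> input) {
        return input.stream().filter(s -> !s.equals("b")).collect(Collectors.toList());
    }

    // Buggy second stage: operates on the original input, so the first
    // stage never takes effect (the "getInput2()" mistake in the thread).
    static List<String> buggy(List<String> input) {
        return input.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    // Correct second stage: operates on the first stage's result.
    static List<String> correct(List<String> input) {
        return firstStage(input).stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        System.out.println(buggy(data));   // [A, B, C] -- "b" survives, first stage lost
        System.out.println(correct(data)); // [A, C]
    }
}
```

The same shape applies in Flink: keep a reference to the operator you actually built, and chain the next operator onto that reference.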

Re: Case of possible join optimization

2015-09-08 Thread Flavio Pompermaier
Obviously, when trying to simplify my code I didn't correctly substitute the variable of the join.. it should be: DataSet, List>> atomSubset = attrToExpand.join(*subset* ).where(0).equalTo(0).projectFirst(0,1).projectSecond(1); Do you think that a JoinHint to create a sort-merge join is equiv
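For reference, the sort-merge join mentioned here sorts both inputs on the join key and then merges them with two cursors. A minimal plain-Java sketch over hypothetical String[] key/value pairs (an illustration of the algorithm, not Flink's operator):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortMergeJoin {
    // Join two lists of {key, value} pairs on the key.
    static List<String[]> join(List<String[]> left, List<String[]> right) {
        List<String[]> l = new ArrayList<>(left);
        List<String[]> r = new ArrayList<>(right);
        l.sort(Comparator.comparing(p -> p[0]));   // sort phase
        r.sort(Comparator.comparing(p -> p[0]));
        List<String[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < l.size() && j < r.size()) {      // merge phase
            int cmp = l.get(i)[0].compareTo(r.get(j)[0]);
            if (cmp < 0) i++;
            else if (cmp > 0) j++;
            else {
                // emit the cross product of the two matching key groups
                String key = l.get(i)[0];
                int jStart = j;
                while (i < l.size() && l.get(i)[0].equals(key)) {
                    for (j = jStart; j < r.size() && r.get(j)[0].equals(key); j++) {
                        out.add(new String[]{key, l.get(i)[1], r.get(j)[1]});
                    }
                    i++;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> left = List.of(new String[]{"a", "1"}, new String[]{"a", "2"},
                                      new String[]{"b", "3"});
        List<String[]> right = List.of(new String[]{"a", "x"}, new String[]{"c", "y"});
        System.out.println(join(left, right).size()); // 2 rows: a/1/x and a/2/x
    }
}
```

Note that a sort-merge strategy still touches every row of both inputs at least once, which is why it is not the same thing as a semi-join reduction.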

Re: Case of possible join optimization

2015-09-08 Thread Stephan Ewen
Hi Flavio! No, Flink does not join keys before full values. That is very often very inefficient, as it results effectively in two joins where one is typically about as expensive as the original join. One can do "semi-join-reduction", in case the join filters out many values (many elements from on
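The semi-join reduction Stephan describes can be sketched in plain Java with hypothetical key/value pairs (in a distributed setting this would be done with dataset operators, but the idea is the same): collect the small side's join keys, then filter the big side down to matching keys *before* the expensive join.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SemiJoinReduction {
    // Keep only the rows of `big` whose key also occurs in `small`.
    static List<String[]> reduce(List<String[]> big, List<String[]> small) {
        Set<String> keys = small.stream().map(p -> p[0]).collect(Collectors.toSet());
        return big.stream().filter(p -> keys.contains(p[0])).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> big = List.of(new String[]{"a", "1"}, new String[]{"b", "2"},
                                     new String[]{"c", "3"}, new String[]{"a", "4"});
        List<String[]> small = List.of(new String[]{"a", "x"}, new String[]{"c", "y"});
        System.out.println(reduce(big, small).size()); // 3 of 4 big-side rows survive
    }
}
```

The payoff depends on selectivity: if most big-side rows have no partner, the reduction shrinks the join input dramatically; if nearly all match, it just adds a pass over the data.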

Case of possible join optimization

2015-09-08 Thread Flavio Pompermaier
Hi to all, I have a case where I don't understand why Flink is not able to optimize the join between 2 datasets. My initial code was basically this: DataSet>> bigDataset = ...;//5.257.207 elements DataSet>> attrToExpand = ...;//65.000 elements DataSet> tmp = attrToExpand.joinWithHuge(subset).wh
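The shape of this problem, a ~65,000-element dataset joined against ~5.2 million elements, is what a broadcast-style hash join targets: build a hash table on the small side and stream the big side past it. A minimal single-machine sketch with hypothetical data (plain Java, illustrating the strategy rather than Flink's joinWithHuge internals):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoin {
    // Build a hash table on the small side, then probe it with each big-side row.
    static List<String[]> join(List<String[]> small, List<String[]> big) {
        Map<String, List<String>> table = new HashMap<>();
        for (String[] p : small)
            table.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        List<String[]> out = new ArrayList<>();
        for (String[] p : big)                                   // single streaming pass
            for (String v : table.getOrDefault(p[0], List.of())) // probe; usually 0 or 1 hits
                out.add(new String[]{p[0], v, p[1]});
        return out;
    }

    public static void main(String[] args) {
        List<String[]> small = List.of(new String[]{"a", "x"});
        List<String[]> big = List.of(new String[]{"a", "1"}, new String[]{"b", "2"},
                                     new String[]{"a", "3"});
        System.out.println(join(small, big).size()); // 2 matching rows
    }
}
```

Only the small side needs to fit in memory; the big side is streamed, which is the same intuition behind preferring the small dataset as the build side of the join.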