Hi all,

I have a case where I don't understand why Flink is not able to optimize
the join between two datasets.

My initial code was basically this:

DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...;   // 5,257,207 elements
DataSet<Tuple2<String, List<MyObject>>> attrToExpand = ...;  // 65,000 elements

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> tmp =
    attrToExpand.joinWithHuge(bigDataset).where(0).equalTo(0).projectFirst(0, 1).projectSecond(1);

This job wasn't able to complete on my local machine (from Eclipse) because
Flink was giving me the following error:

Hash join exceeded maximum number of recursions, without reducing
partitions enough to be memory resident. Probably cause: Too many duplicate
keys.

This was because the List<MyObject> values in attrToExpand can be quite big.
Indeed, changing that code to the following makes everything work like a
charm:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
    attrToExpand.project(0).joinWithHuge(bigDataset).where(0).equalTo(0).projectSecond(0, 1);

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
    attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0, 1).projectSecond(1);
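
As a side note, the DataSet API also accepts an explicit join strategy, so
(untested sketch on my side) hinting the optimizer towards a sort-merge join
should sidestep the hash-join recursion entirely, since neither input has to
become memory-resident:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

// Sketch: REPARTITION_SORT_MERGE sorts both inputs instead of building a
// hash table, so duplicate keys with big payloads no longer overflow memory.
DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> joined =
    attrToExpand.join(bigDataset, JoinHint.REPARTITION_SORT_MERGE)
        .where(0).equalTo(0).projectFirst(0, 1).projectSecond(1);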


Is it really impossible for Flink to optimize my initial code into the
second form? I was convinced that Flink performed the join on the keys only
before also pulling the other tuple fields into memory. Am I wrong?
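
To make explicit what I expected the optimizer to do by itself, here is the
manual "semi-join on the keys first" version of the rewrite above (just a
sketch; the distinct(0) on the projected keys is my own addition, to avoid
re-introducing duplicate keys on the filter side):

// 1. Reduce attrToExpand to its distinct keys (one Tuple1<String> per key).
DataSet<Tuple1<String>> keys = attrToExpand.<Tuple1<String>>project(0).distinct(0);

// 2. Semi-join: keep only the bigDataset records whose key occurs in keys.
DataSet<Tuple2<String, List<ThriftObj>>> subset =
    keys.joinWithHuge(bigDataset).where(0).equalTo(0).projectSecond(0, 1);

// 3. Only now join back the heavy List<MyObject> payloads.
DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
    attrToExpand.join(subset).where(0).equalTo(0).projectFirst(0, 1).projectSecond(1);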

Best,
Flavio
