Hi all,

I have a case where I don't understand why Flink is not able to optimize the join between two datasets.
My initial code was basically this:

DataSet<Tuple2<String, List<ThriftObj>>> bigDataset = ...; // 5,257,207 elements
DataSet<Tuple2<String, List<MyObject>>> attrToExpand = ...; // 65,000 elements

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> tmp =
    attrToExpand.joinWithHuge(bigDataset)
        .where(0).equalTo(0)
        .projectFirst(0, 1)
        .projectSecond(1);

This job wasn't able to complete on my local machine (running from Eclipse) because Flink gave me the following error:

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.

This was because in attrToExpand the List<MyObject> can be quite big. Indeed, changing that code to the following made everything work like a charm:

DataSet<Tuple2<String, List<ThriftObj>>> subset =
    attrToExpand.project(0)
        .joinWithHuge(bigDataset)
        .where(0).equalTo(0)
        .projectSecond(0, 1);

DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> atomSubset =
    attrToExpand.join(subset)
        .where(0).equalTo(0)
        .projectFirst(0, 1)
        .projectSecond(1);

Shouldn't it be possible for Flink to optimize my initial code into the second version? I was convinced that Flink performed the join on the keys alone before also pulling the other fields of the tuples into memory. Am I wrong?

Best,
Flavio
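PS: In case it helps to reproduce the difference, here is a minimal, self-contained sketch of the two-step pattern that works for me. It is only a sketch: plain strings stand in for my real List<MyObject>/List<ThriftObj> payloads and the data is tiny, so it shows the shape of the job rather than the memory behaviour at scale.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class TwoStepJoinSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy stand-ins: in the real job the payloads are List<MyObject> / List<ThriftObj>.
        DataSet<Tuple2<String, List<String>>> attrToExpand = env.fromElements(
                new Tuple2<>("k1", Arrays.asList("a1", "a2")),
                new Tuple2<>("k2", Arrays.asList("a3")));

        DataSet<Tuple2<String, List<String>>> bigDataset = env.fromElements(
                new Tuple2<>("k1", Arrays.asList("t1")),
                new Tuple2<>("k2", Arrays.asList("t2")),
                new Tuple2<>("k3", Arrays.asList("t3")));

        // Step 1: key-only semi-join -- only the join key of attrToExpand travels,
        // so the fat list payload never ends up in the hash table.
        DataSet<Tuple2<String, List<String>>> subset = attrToExpand
                .<Tuple1<String>>project(0)
                .joinWithHuge(bigDataset)
                .where(0).equalTo(0)
                .projectSecond(0, 1);

        // Step 2: join the reduced right side back to pick up both payloads.
        DataSet<Tuple3<String, List<String>, List<String>>> atomSubset = attrToExpand
                .join(subset)
                .where(0).equalTo(0)
                .projectFirst(0, 1)
                .projectSecond(1);

        atomSubset.print();
    }
}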
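PPS: If I read the Javadoc correctly, joinWithHuge is just the BROADCAST_HASH_FIRST join hint, so the hash table is built from attrToExpand together with its big lists. On versions that expose explicit join hints on DataSet.join(...), I suppose a sort-merge strategy would sidestep the hash table (and its duplicate-key recursion limit) in one step; an untested fragment, dropped in place of my original join:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

// Same shape as the original join, but with an explicit strategy hint:
// REPARTITION_SORT_MERGE sorts both inputs instead of building a hash
// table, so the "too many duplicate keys" recursion limit should not apply.
DataSet<Tuple3<String, List<MyObject>, List<ThriftObj>>> tmp = attrToExpand
    .join(bigDataset, JoinHint.REPARTITION_SORT_MERGE)
    .where(0).equalTo(0)
    .projectFirst(0, 1)
    .projectSecond(1);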