Hey guys,

As I previously said, I have had some problems getting this DMST algorithm fully functional, with either Flink 0.8 or Flink 0.9.
My latest problem (and I have been debugging this for quite a few days) was with the test I wrote that extends MultipleProgramsTestBase. In the CLUSTER execution mode, the test behaved very non-deterministically: each run produced a different number of actual lines, which obviously did not match the number of expected lines.

The function that caused the problems has the following header:

    public DataSet<Tuple5<Long, Long, Double, Long, Long>> updateRootIdsForRealEdges(
            DataSet<Tuple5<Long, Long, Double, Long, Long>> edges,
            DataSet<Vertex<Long, Long>> verticesWithRootIDs) {

After some classic "printf debugging", I saw that when I performed a join on the edges DataSet (DataSet<Tuple5<Long, Long, Double, Long, Long>> edges), it did not always behave as if it contained all the values you would expect it to have.

The code snippet that solved the problem raises some questions for me:

    DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges = edges.map(
            new MapFunction<Tuple5<Long, Long, Double, Long, Long>, Tuple5<Long, Long, Double, Long, Long>>() {
                @Override
                public Tuple5<Long, Long, Double, Long, Long> map(
                        Tuple5<Long, Long, Double, Long, Long> longLongDoubleLongLongTuple5) throws Exception {
                    return longLongDoubleLongLongTuple5;
                }
            });

or

    DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges = edges.rebalance();

and then performing the join on rebalancedEdges instead. Now the test passes (with either of the two solutions).

So the question is: *why* is this happening? *Is this normal?* Maybe it has something to do with the context, but then how come a simple map that just returns its input fixes everything?

I am sorry if this seems like one of those "Why is the sky blue?" questions, but I am here to learn :D

Thank you!
Andra