Hi Andra,

I haven't had a detailed look at Gelly and its functions, but Flink has only a few operators that can cause non-deterministic behavior. In general, user code should be implemented without side effects, i.e., the result of each function call may only depend on its arguments. This principle gives Flink the freedom to perform function calls in any order on any machine. The only built-in exception is the mapPartition operator, which receives a whole partition as input; the partitions themselves are not deterministically computed and depend on the input split assignment and the optimizer's strategy choices. Another source of non-deterministic results can be incorrect semantic properties, which can make the optimizer believe that data is already sorted or partitioned while it is not. In the case of your example, an explicit rebalance operation could reset these assumed properties and force the optimizer to deterministically reorganize the data.

I would have a look at the execution plans for both variants (the non-deterministic and the deterministic one). You can get them as a JSON string by calling ExecutionEnvironment.getExecutionPlan().
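For illustration, here is a rough, self-contained sketch with toy data (the Tuple5 layout just mirrors the one from your snippet; class name, values, and the output path are made up):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;

public class PlanSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy stand-in for the edge DataSet of the real job.
        DataSet<Tuple5<Long, Long, Double, Long, Long>> edges =
            env.fromElements(
                new Tuple5<Long, Long, Double, Long, Long>(1L, 2L, 0.5, 1L, 2L),
                new Tuple5<Long, Long, Double, Long, Long>(2L, 3L, 0.7, 2L, 3L));

        // Explicit round-robin repartitioning: it discards any partitioning
        // or ordering properties the optimizer assumed for 'edges', so all
        // downstream operators are planned from scratch.
        DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
            edges.rebalance();

        // A sink is needed so that the program describes a complete plan.
        rebalancedEdges.writeAsText("/tmp/rebalanced-edges");

        // Dump the optimizer's chosen plan as a JSON string instead of
        // executing the program.
        System.out.println(env.getExecutionPlan());
    }
}

Running this once with and once without the rebalance() call and diffing the two JSON plans should show where the optimizer's assumptions differ.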
Best,
Fabian

2015-02-14 18:55 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:

> Hey guys,
>
> As I previously said, I have had some problems getting this DMST algorithm
> to be fully functional, either with Flink 0.8 or with Flink 0.9.
>
> My latest problem (and I have been debugging it for quite some days) was
> that for the test I wrote extending MultipleProgramsTestBase, the
> Execution mode = CLUSTER test had very non-deterministic behaviour, i.e.,
> each time it produced a different number of actual lines, which were,
> obviously, not equal to the number of expected lines.
>
> The function that caused problems has the following header:
>
> public DataSet<Tuple5<Long, Long, Double, Long, Long>>
>         updateRootIdsForRealEdges(
>     DataSet<Tuple5<Long, Long, Double, Long, Long>> edges,
>     DataSet<Vertex<Long, Long>> verticesWithRootIDs) {
>
> After some classical "printf debugging", I saw that the
>
> DataSet<Tuple5<Long, Long, Double, Long, Long>> edges
>
> did not always act, when performing a join on it, as if it had all the
> values you would expect it to have.
>
> The code snippet that solved the problem raises some questions for me:
>
> DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
>     edges.map(new MapFunction<Tuple5<Long, Long, Double, Long, Long>,
>             Tuple5<Long, Long, Double, Long, Long>>() {
>         @Override
>         public Tuple5<Long, Long, Double, Long, Long> map(
>                 Tuple5<Long, Long, Double, Long, Long> edge) throws Exception {
>             // identity map: simply forward the input tuple
>             return edge;
>         }
>     });
>
> or
>
> DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
>     edges.rebalance();
>
> and then the join would be performed on those rebalancedEdges.
> And now the test passes (with either of the two solutions).
>
> So the question is:
> *Why* is this happening? *Is this normal?*
> Maybe it has something to do with the context, but then how come a simple
> map fixes everything?
>
> I am sorry if this may seem like one of those "Why is the sky blue?"
> questions, but I am here to learn :D
>
> Thank you!
> Andra