I think Fabian is pointing in the right direction. @Andra, are you using a mapPartition() operation? If so, it is indeed non-deterministic, unless you explicitly apply a partitioning beforehand (rebalancing being a special case thereof).
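For illustration, pinning the partitioning down explicitly would look roughly like this (a sketch only; the Tuple2 element type and the key field 0 are placeholders, not the actual DMST code):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hash-partition on field 0 first, so that each mapPartition() call sees
// a deterministic group of records instead of whatever the input split
// assignment happens to produce.
DataSet<Tuple2<Long, Long>> partitioned = input
    .partitionByHash(0)
    .mapPartition(
        new MapPartitionFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Long, Long>> values,
                                     Collector<Tuple2<Long, Long>> out) {
                for (Tuple2<Long, Long> value : values) {
                    out.collect(value);   // forward each record unchanged
                }
            }
        });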
If you are not using that, can you point us to the code and give us a bit
more input, like which operations you are applying and what data set you
are using?

Thanks,
Stephan

On Sat, Feb 14, 2015 at 9:17 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Andra,
>
> I haven't had a detailed look at Gelly and its functions, but Flink has
> only a few operators that can cause non-deterministic behavior.
> In general, user code should be implemented without side effects, i.e.,
> the result of each function call may only depend on its arguments. This
> principle gives Flink the freedom to perform function calls in any order
> on any machine.
> The only built-in exception is the mapPartition operator, which receives
> a whole partition as input; these partitions are not deterministically
> computed and depend on the input split assignment and the optimizer's
> strategy choices.
> Another source of non-deterministic results can be incorrect semantic
> properties, which can make the optimizer believe that data is already
> sorted or partitioned while it is not. In the case of your example, an
> explicit rebalance operation could reset these assumed properties and
> force the optimizer to deterministically reorganize the data.
>
> I would have a look at the execution plans for both variants (the
> non-deterministic and the deterministic one).
> You can get them as a JSON string by calling
> ExecutionEnvironment.getExecutionPlan().
>
> Best,
> Fabian
>
> 2015-02-14 18:55 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:
>
> > Hey guys,
> >
> > As I previously said, I have had some problems getting this DMST
> > algorithm to be fully functional, either with Flink 0.8 or with
> > Flink 0.9.
> >
> > My latest problem (and I have been debugging this for quite some days)
> > was that, for the test I wrote that extends MultipleProgramsTestBase,
> > the Execution mode = CLUSTER test had a very non-deterministic
> > behaviour (i.e., each time it produced a different number of Actual
> > lines, which were obviously not equal to the number of Expected lines).
> >
> > The function that caused problems has the following header:
> >
> > public DataSet<Tuple5<Long, Long, Double, Long, Long>>
> >         updateRootIdsForRealEdges(
> >     DataSet<Tuple5<Long, Long, Double, Long, Long>> edges,
> >     DataSet<Vertex<Long, Long>> verticesWithRootIDs) {
> >
> > After some classical "printf debugging", I saw that the
> >
> > DataSet<Tuple5<Long, Long, Double, Long, Long>> edges
> >
> > did not always act, when a join was performed on it, as if it had all
> > the values you would expect it to have.
> >
> > The code snippet that solved the problem raises some questions for me:
> >
> > DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
> >     edges.map(new MapFunction<Tuple5<Long, Long, Double, Long, Long>,
> >             Tuple5<Long, Long, Double, Long, Long>>() {
> >         @Override
> >         public Tuple5<Long, Long, Double, Long, Long> map(
> >                 Tuple5<Long, Long, Double, Long, Long> edge)
> >                 throws Exception {
> >             // identity map: forwards every edge unchanged
> >             return edge;
> >         }
> >     });
> >
> > or
> >
> > DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
> >     edges.rebalance();
> >
> > and then the join would be performed on those rebalancedEdges.
> > Now the test passes (with either of the two solutions).
> >
> > So the question is:
> > *Why* is this happening? *Is this normal?*
> > Maybe it has something to do with the context, but then how come a
> > simple map fixes everything?
> >
> > I am sorry if this may seem like one of those "Why is the sky blue?"
> > questions, but I am here to learn :D
> >
> > Thank you!
> > Andra
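For reference, the execution-plan comparison Fabian suggests could be scripted roughly like this (a sketch, not the actual test code; the generateSequence() source stands in for the real edge DataSet, and the file paths are made up):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PlanDump {

    public static void main(String[] args) throws Exception {
        // One environment per variant, so the two JSON plans can be
        // compared side by side.
        System.out.println(planFor(false));   // non-deterministic variant
        System.out.println(planFor(true));    // variant with rebalance()
    }

    private static String planFor(boolean rebalance) throws Exception {
        ExecutionEnvironment env =
            ExecutionEnvironment.createLocalEnvironment();

        // Stand-in for the real edge DataSet from the DMST test.
        DataSet<Long> edges = env.generateSequence(1, 1000);
        DataSet<Long> input = rebalance ? edges.rebalance() : edges;

        // A data sink must be defined before a plan can be generated.
        input.writeAsText("/tmp/edges-" + rebalance);

        // Returns the optimizer's plan as a JSON string without
        // executing the job.
        return env.getExecutionPlan();
    }
}

Diffing the two JSON strings (or loading them into Flink's plan visualizer) should make any differing partitioning and ship strategies easy to spot.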