I think Fabian has a good direction.

@Andra, are you using a mapPartition() operation? If so, it is inherently
non-deterministic unless you explicitly apply a partitioning before it
(rebalance() being a special case thereof).
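
For illustration, a minimal sketch of what I mean (hypothetical data set,
assuming an ExecutionEnvironment env, just to show the pattern of
partitioning explicitly before mapPartition()):

DataSet<Tuple2<Long, Long>> data = env.fromElements(
        new Tuple2<Long, Long>(1L, 2L), new Tuple2<Long, Long>(3L, 4L));

// hash-partition on field 0 first, so the contents of each partition
// no longer depend on input split assignment or execution order
data.partitionByHash(0)
    .mapPartition(new MapPartitionFunction<Tuple2<Long, Long>, Long>() {
        @Override
        public void mapPartition(Iterable<Tuple2<Long, Long>> values,
                                 Collector<Long> out) {
            long count = 0;
            for (Tuple2<Long, Long> ignored : values) {
                count++;
            }
            // e.g., emit the number of elements in this partition
            out.collect(count);
        }
    });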

If you are not using that, can you point us to the code and give us a bit
more input, such as which operations you are applying and which data set
you are using?

Thanks,
Stephan


On Sat, Feb 14, 2015 at 9:17 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Andra,
>
> I haven't had a detailed look at Gelly and its functions, but Flink has
> only a few operators that can cause non-deterministic behavior.
> In general, user code should be implemented without side effects, i.e., the
> result of each function call may only depend on its arguments. This
> principle gives Flink the freedom to perform function calls in any order on
> any machine.
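>
> For example (just a sketch): the first function below is side-effect free,
> the second keeps mutable state across calls, so its output depends on the
> order in which Flink happens to invoke it:
>
> // safe: the result depends only on the argument
> MapFunction<Long, Long> addOne = new MapFunction<Long, Long>() {
>     @Override
>     public Long map(Long value) { return value + 1; }
> };
>
> // unsafe: mutable state makes the result depend on call order
> MapFunction<Long, Long> numbered = new MapFunction<Long, Long>() {
>     private long callCount = 0;
>     @Override
>     public Long map(Long value) { return callCount++; }
> };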
> The only built-in exception is the mapPartition operator, which receives a
> whole partition as input. These partitions are not deterministically
> composed; they depend on the input split assignment and on the optimizer's
> strategy choices.
> Another source of non-deterministic results can be incorrect semantic
> properties, which can make the optimizer believe that data is already
> sorted or partitioned while it is not. In the case of your example, an
> explicit rebalance operation could reset these assumed properties and
> force the optimizer to deterministically reorganize the data.
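>
> To make the semantic properties point concrete (a sketch; the exact method
> name may differ between versions, and MyMapper is just a placeholder):
>
> edges
>     .map(new MyMapper())
>     // claims that field 0 passes through unchanged; if MyMapper actually
>     // modifies f0, the optimizer may wrongly reuse an existing
>     // partitioning or sort order on that field
>     .withForwardedFields("f0");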
>
> I would have a look at the execution plans of both variants (the
> non-deterministic and the deterministic one).
> You can get them as a JSON string by calling
> ExecutionEnvironment.getExecutionPlan().
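>
> Roughly like this (sketch):
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> // ... define sources, transformations, and sinks ...
> System.out.println(env.getExecutionPlan()); // the plan as a JSON string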
>
> Best, Fabian
>
>
> 2015-02-14 18:55 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:
>
> > Hey guys,
> >
> > As I previously said, I have had some problems getting this DMST
> > algorithm to be fully functional, either with Flink 0.8 or with Flink
> > 0.9.
> >
> > My latest problem (and I have been debugging this for quite a few days)
> > was that, for the test I wrote extending MultipleProgramsTestBase, the
> > Execution mode = CLUSTER test showed very non-deterministic behaviour
> > (i.e. each time it produced a different number of Actual lines, which
> > were, obviously, not equal to the number of Expected lines).
> >
> >
> > The function that caused problems has the following header:
> >
> > public DataSet<Tuple5<Long, Long, Double, Long, Long>>
> > updateRootIdsForRealEdges(
> >         DataSet<Tuple5<Long, Long, Double, Long, Long>> edges,
> >         DataSet<Vertex<Long, Long>> verticesWithRootIDs) {
> >
> > After some classical "printf debugging", I saw that
> >
> > DataSet<Tuple5<Long, Long, Double, Long, Long>> edges
> >
> > did not always act, when a join was performed on it, as if it contained
> > all the values you would expect it to have.
> >
> > The code snippet that solved the problem raises some questions for me:
> >
> > DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
> >     edges.map(new MapFunction<Tuple5<Long, Long, Double, Long, Long>,
> >             Tuple5<Long, Long, Double, Long, Long>>() {
> >         @Override
> >         public Tuple5<Long, Long, Double, Long, Long> map(
> >                 Tuple5<Long, Long, Double, Long, Long> edge)
> >                 throws Exception {
> >             // identity map: simply forwards its input unchanged
> >             return edge;
> >         }
> >     });
> >
> >  or
> >
> > DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
> > edges.rebalance();
> >
> > and then the join would be performed on those rebalancedEdges.
> > And now the test passes (with either of the two solutions).
> >
> > So the question is:
> > *Why* is this happening? *Is this normal?*
> > Maybe it has something to do with the context, but then how come a simple
> > map fixes everything?
> >
> > I am sorry if this may seem like one of those "Why is the sky blue?"
> > questions, but I am here to learn :D
> >
> > Thank you!
> > Andra
> >
>
