Hi, could you please provide a minimal example input, and perhaps also the outputs for parallelism=5 and parallelism=1, so that we can check?
-- aljoscha

On Mon, 4 Apr 2016 at 09:52 Lydia Ickler <ickle...@googlemail.com> wrote:

> Hi all,
>
> I have an issue regarding execution on 1 machine vs. 5 machines.
> If I execute the following code, the results are not the same, although I
> would expect them to be, since the input file is the same.
> Do you have any suggestions?
>
> Thanks in advance!
> Lydia
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setGlobalJobParameters(parameters);
>
> //read input file
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env,
>     parameters.get("input"));
> //Approximate EigenVector by PowerIteration
> //get initial vector - which equals matrixA * [1, ... , 1]
> DataSet<Tuple3<Integer, Integer, Double>> initial0 =
>     (matrixA.groupBy(0)).sum(2);
> DataSet<Tuple3<Integer, Integer, Double>> maximum = initial0.maxBy(2);
> //normalize by maximum value
> DataSet<Tuple3<Integer, Integer, Double>> initial =
>     (initial0.cross(maximum)).map(new normalizeByMax());
>
> //BulkIteration to find dominant eigenvector
> IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration =
>     initial.iterate(1);
>
> DataSet<Tuple3<Integer, Integer, Double>> intermediate =
>     ((((((matrixA.join(iteration).where(1).equalTo(0))
>         .map(new ProjectJoinResultMapper())).groupBy(0, 1)).sum(2)).groupBy(0)).sum(2))
>     .cross(((((((((matrixA.join(iteration).where(1).equalTo(0))
>         .map(new ProjectJoinResultMapper())).groupBy(0, 1)).sum(2))).groupBy(0)).sum(2)).sum(2)))
>     .map(new normalizeByMax());
>
> DataSet<Tuple3<Integer, Integer, Double>> diffs =
>     (iteration.join(intermediate).where(0).equalTo(0)).with(new deltaFilter());
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector =
>     iteration.closeWith(intermediate, diffs);
>
> eigenVector.writeAsCsv(parameters.get("output"));
> env.execute("Power Iteration");
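To have a fixed baseline when comparing the parallelism=1 and parallelism=5 outputs, a single-machine version of the same power iteration may help. The following is a minimal plain-Java sketch, not Flink code. The class and method names, the array-based (row, col, value) matrix layout with 0-based indices, and normalizing by the largest entry in every step (as the quoted job's initial step does with maxBy(2)) are all assumptions for illustration. The termination criterion built with deltaFilter is left out; the sketch simply runs a fixed number of steps.

```java
import java.util.Arrays;

// Hypothetical single-machine reference for the power iteration in the quoted job.
// Assumptions (not from the original post): the matrix is given as parallel arrays of
// (row, col, value) entries with 0-based indices, and every step normalizes by the
// largest entry, like the job's initial step does with maxBy(2).
class PowerIterationReference {

    // Computes y = A * x, where A is given by its (row, col, value) entries.
    static double[] multiply(int[] rows, int[] cols, double[] vals, double[] x, int n) {
        double[] y = new double[n];
        for (int k = 0; k < vals.length; k++) {
            y[rows[k]] += vals[k] * x[cols[k]];
        }
        return y;
    }

    // Divides every entry by the largest entry; assumes that entry is positive.
    static double[] normalizeByMax(double[] v) {
        double max = Arrays.stream(v).max().getAsDouble();
        double[] out = new double[v.length];
        for (int i = 0; i < v.length; i++) {
            out[i] = v[i] / max;
        }
        return out;
    }

    // Starts from A * [1, ..., 1] normalized by its maximum, then runs a fixed
    // number of power-iteration steps (no convergence check).
    static double[] powerIteration(int[] rows, int[] cols, double[] vals, int n, int iterations) {
        double[] ones = new double[n];
        Arrays.fill(ones, 1.0);
        double[] v = normalizeByMax(multiply(rows, cols, vals, ones, n));
        for (int it = 0; it < iterations; it++) {
            v = normalizeByMax(multiply(rows, cols, vals, v, n));
        }
        return v;
    }

    public static void main(String[] args) {
        // Toy 2x2 matrix diag(2, 1), stored as its two non-zero entries.
        int[] rows = {0, 1};
        int[] cols = {0, 1};
        double[] vals = {2.0, 1.0};
        // One step, mirroring the maximum of one iteration in initial.iterate(1).
        System.out.println(Arrays.toString(powerIteration(rows, cols, vals, 2, 1)));
        // prints [1.0, 0.25]
    }
}
```

This version adds up products in a fixed order, so it returns the same result on every run. In a distributed job the partial sums can be combined in an order that depends on the parallelism, and since floating-point addition is not associative, the last digits may differ slightly even if nothing else is wrong; larger differences would point elsewhere.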