Hi, I have an issue with a for-loop. If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why. With 1, 2 or 3 it runs smoothly. I attached the code below and marked the loop with //PROBLEM.
Thanks in advance! Lydia package org.apache.flink.contrib.lifescience.examples; import edu.princeton.cs.algs4.Graph; import edu.princeton.cs.algs4.SymbolDigraph; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils; import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network; import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge; import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.util.Collector; import java.util.*; import static edu.princeton.cs.algs4.GraphGenerator.simple; public class PowerIteration { //path to input static String input = null; //path to output static String output = null; //number of iterations (default = 7) static int iterations = 7; //threshold static double delta = 0.01; public void run() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //read input file DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input); DataSet<Tuple3<Integer, Integer, Double>> eigenVector; DataSet<Tuple3<Integer, Integer, Double>> eigenValue; //initial: //Approximate EigenVector by PowerIteration eigenVector = PowerIteration_getEigenVector(matrixA); //Approximate EigenValue by PowerIteration eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector); //Deflate original matrix matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue); MyResult initial = new MyResult(eigenVector,eigenValue,matrixA); MyResult next = null; //PROBLEM!!! get i eigenvalue gaps for(int i=0;i<2;i++){ next = PowerIteration_routine(initial); initial = next; next.gap.print(); } env.execute("Power Iteration"); } public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env, String filePath) { CsvReader csvReader = env.readCsvFile(filePath); csvReader.fieldDelimiter(","); csvReader.includeFields("ttt"); return csvReader.types(Integer.class, Integer.class, Double.class); } public static final class ProjectJoinResultMapper implements MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, Tuple3<Integer, Integer, Double>> { @Override public Tuple3<Integer, Integer, Double> map( Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) throws Exception { Integer row = value.f0.f0; Integer column = value.f1.f1; Double product = value.f0.f2 * value.f1.f2; return new Tuple3<Integer, Integer, Double>(row, column, product); } } public static final class RQ implements MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, Tuple3<Integer, Integer, Double>> { @Override public Tuple3<Integer, Integer, Double> map( Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) throws Exception { return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2); } } public static void main(String[] args) throws Exception { if(args.length<2 || args.length > 4){ System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>"); System.exit(0); } input = args[0]; output = args[1]; if(args.length==3) { iterations = Integer.parseInt(args[2]); } if(args.length==4){ delta = Double.parseDouble(args[3]); } new PowerIteration2().run(); } public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>> { public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) { if(!(candidate.f2 == old.f2)){ out.collect(candidate); } //if(Math.abs(candidate.f2-old.f2) > delta){ // out.collect(candidate); //} } } public static final class normalizeByMax implements MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, Tuple3<Integer, Integer, Double>> { public Tuple3<Integer, Integer, Double> map( Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) throws Exception { return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2)); } } public static final class firstX implements MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, Tuple3<Integer, Integer, Double>> { public Tuple3<Integer, Integer, Double> map( Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) throws Exception { return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2)); } } public static final class resetIndex implements MapFunction<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> { public Tuple3<Integer, Integer, Double> map( Tuple3<Integer, Integer, Double>value) throws Exception { return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2); } } public static final class decBy1 implements MapFunction<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> { public Tuple3<Integer, Integer, Double> map( Tuple3<Integer, Integer, Double>value) throws Exception { return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2); } } public static final class resetIndex2 implements MapFunction<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> { public Tuple3<Integer, Integer, Double> map( Tuple3<Integer, Integer, Double>value) throws Exception { return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2); } } public static final class MatrixTimesValue implements MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, Tuple3<Integer, Integer, Double>> { @Override public Tuple3<Integer, Integer, Double> map( Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) throws Exception { return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2)); } } public static final class MatrixMinusMatrix implements MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, Tuple3<Integer, Integer, Double>> { @Override public Tuple3<Integer, Integer, Double> map( Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) throws Exception { Integer row = value.f0.f0; Integer column = value.f0.f1; Double result = value.f0.f2 - value.f1.f2; return new Tuple3<Integer, Integer, Double>(row, column, result); } } public static final class getGapCenter implements MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>, Tuple3<Integer, Integer, Double>> { @Override public Tuple3<Integer, Integer, Double> map( Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value) throws Exception { Integer row = value.f0.f0; Integer column = value.f0.f1; Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2))); return new Tuple3<Integer, Integer, Double>(row, column, result); } } public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws Exception { //get initial vector - which equals matrixA * [1, ... , 1] DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2); //normalize by maximum value DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new normalizeByMax()); //BulkIteration to find dominant eigenvector IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations); DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2). cross((matrixA.join(iteration).where(1).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2)) .map(new normalizeByMax()); DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter()); DataSet<Tuple3<Integer, Integer, Double>> eigenVector = iteration.closeWith(intermediate,diffs); return eigenVector; } public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) { //determine now EigenValue by approximating the Rayleigh Quotient: //get Ax DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2); //get Ax * x DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2); //now x * x DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0) .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2); return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2); } public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) { DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex()); DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() { public boolean filter(Tuple3<Integer, Integer, Double> value) { return value.f0 == 0; } }); firstVal = eigenValueReset.cross(firstVal).map(new firstX()); DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() { public boolean filter(Tuple3<Integer, Integer, Double> value) { return value.f0 == 0; } }); DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose()); DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0). map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2); matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix()); matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() { public boolean filter(Tuple3<Integer, Integer, Double> value) { return (value.f0 != 0) && (value.f1 != 0); } }); return matrixA.map(new decBy1()); } public MyResult PowerIteration_routine(MyResult initial) throws Exception { //Approximate EigenVector by PowerIteration DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA); //Approximate EigenValue by PowerIteration DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector); //get gap DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix()); //Deflate original matrix DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue); return new MyResult(eigenVector,eigenValue,matrixA,gap); } public class MyResult { DataSet<Tuple3<Integer, Integer, Double>> eigenVector; DataSet<Tuple3<Integer, Integer, Double>> eigenValue; DataSet<Tuple3<Integer, Integer, Double>> gap; DataSet<Tuple3<Integer, Integer, Double>> matrixA; public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){ this.eigenVector = eigenVector; this.eigenValue =eigenValue; this.matrixA = matrixA; } public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){ this.eigenVector = eigenVector; this.eigenValue =eigenValue; this.matrixA = matrixA; this.gap = gap; } } }