Hi, I have a question regarding the Delta Iteration. I want to keep iterating as long as the previous set and the newly calculated set differ, and stop as soon as they are the same.
Right now I get a result set that contains entries with duplicate "row" indices, which should not be the case. I guess I am doing something wrong in iteration.closeWith(intermediate, diffs). Maybe I am sending only parts of the set, but for the multiplication (ProjectJoinResultMapper()) I need the whole DataSet. Could somebody please point me in the right direction? Thanks in advance!

This is what I have right now:

    DataSet<Tuple3<Integer, Integer, Double>> initial = matrixA.groupBy(0).sum();

    // normalize by maximum value
    initial = initial.cross(initial.sum(2)).map(new normalizeByMax());

    DeltaIteration<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> iteration =
        initial.iterateDelta(initial, 1, 0, 1);

    DataSet<Tuple3<Integer, Integer, Double>> intermediate =
        matrixA.join(iteration.getWorkset()).where(1).equalTo(0)
            .map(new ProjectJoinResultMapper())
            .groupBy(0, 1).sum(2)
            .groupBy(0).sum(2)
            .cross(matrixA.join(iteration.getWorkset()).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper())
                .groupBy(0, 1).sum(2)
                .groupBy(0).sum(2)
                .max(2))
            .map(new normalizeByMax());

    DataSet<Tuple3<Integer, Integer, Double>> diffs =
        intermediate.join(iteration.getSolutionSet()).where(0, 1).equalTo(0, 1)
            .with(new ComponentIdFilter());

    DataSet<Tuple3<Integer, Integer, Double>> result = iteration.closeWith(intermediate, diffs);

    public static final class ComponentIdFilter
            implements FlatJoinFunction<Tuple3<Integer, Integer, Double>,
                                        Tuple3<Integer, Integer, Double>,
                                        Tuple3<Integer, Integer, Double>> {

        public void join(Tuple3<Integer, Integer, Double> candidate,
                         Tuple3<Integer, Integer, Double> old,
                         Collector<Tuple3<Integer, Integer, Double>> out) {
            // emit only the entries whose value changed compared to the solution set
            if (!candidate.f2.equals(old.f2)) {
                out.collect(candidate);
            }
        }
    }
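
To make the stopping rule I am after concrete, here is a minimal plain-Java sketch without Flink. It is only an illustration under simplifying assumptions, not my actual job: the vector is keyed by a single row index (instead of the two key fields 0 and 1 above), the matrix is a small dense array with non-negative entries, and the names DeltaIterationSketch, step and run are made up for this example. Each pass computes a candidate set, keeps only the entries that differ from the current solution, merges them in by key, and stops once nothing differs.

```java
import java.util.HashMap;
import java.util.Map;

public class DeltaIterationSketch {

    // One pass: multiply the matrix with the current vector, then divide by the maximum entry.
    // Assumes non-negative values, so the maximum starts at 0.
    static Map<Integer, Double> step(double[][] a, Map<Integer, Double> current) {
        Map<Integer, Double> next = new HashMap<>();
        double max = 0.0;
        for (int row = 0; row < a.length; row++) {
            double sum = 0.0;
            for (int col = 0; col < a[row].length; col++) {
                sum += a[row][col] * current.getOrDefault(col, 0.0);
            }
            next.put(row, sum);
            max = Math.max(max, sum);
        }
        if (max != 0.0) {
            for (Map.Entry<Integer, Double> e : next.entrySet()) {
                e.setValue(e.getValue() / max);
            }
        }
        return next;
    }

    // Iterate until the candidate set equals the solution set, or maxIterations is reached.
    static Map<Integer, Double> run(double[][] a, Map<Integer, Double> initial, int maxIterations) {
        Map<Integer, Double> solution = new HashMap<>(initial);
        for (int i = 0; i < maxIterations; i++) {
            Map<Integer, Double> candidate = step(a, solution);

            // Keep only the entries whose value changed (same idea as ComponentIdFilter).
            Map<Integer, Double> delta = new HashMap<>();
            for (Map.Entry<Integer, Double> e : candidate.entrySet()) {
                if (!e.getValue().equals(solution.get(e.getKey()))) {
                    delta.put(e.getKey(), e.getValue());
                }
            }

            if (delta.isEmpty()) {
                break; // old and new set are the same: stop
            }
            // Merge by key: a changed entry replaces the old one, so no key appears twice.
            solution.putAll(delta);
        }
        return solution;
    }

    public static void main(String[] args) {
        double[][] a = {{1.0, 1.0}, {1.0, 1.0}};
        Map<Integer, Double> initial = new HashMap<>();
        initial.put(0, 1.0);
        initial.put(1, 0.5);
        System.out.println(run(a, initial, 10));
    }
}
```

With this toy input the loop changes one entry in the first pass, finds no difference in the second pass, and ends with both entries at 1.0. The map is what guarantees a single entry per key here; in the Flink job that role would fall to the key fields given to iterateDelta.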