I tried the following, but it always runs for maxIterations. Maybe someone can give me a review?
    private int benchmarkCounter;
    private static int iterationCounter = 1;
    private static DataSet<GeoTimeDataCenter> oldCentroids;

    FlinkMain(int benchmarkCounter) {
        this.benchmarkCounter = benchmarkCounter;
    }

    public int getBenchmarkCounter() {
        return benchmarkCounter;
    }

    public void run() {
        // load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
                    new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // check whether the centers have been updated
        DataSet<GeoTimeDataCenter> checkCentroids = null;
        if (this.getBenchmarkCounter() == 0) {
            oldCentroids = newCentroids;
        } else {
            try {
                HashMap<Integer, GeoTimeDataTupel> oldMap = new HashMap<Integer, GeoTimeDataTupel>();
                for (GeoTimeDataCenter i : oldCentroids.collect()) {
                    oldMap.put(i.getId(), new GeoTimeDataTupel(i.getGeo(), i.getTime(), i.getidGDELT()));
                }
                HashMap<Integer, GeoTimeDataTupel> newMap = new HashMap<Integer, GeoTimeDataTupel>();
                for (GeoTimeDataCenter i : newCentroids.collect()) {
                    // this loop must fill newMap (the original filled oldMap here,
                    // so the two maps always compared as different)
                    newMap.put(i.getId(), new GeoTimeDataTupel(i.getGeo(), i.getTime(), i.getidGDELT()));
                }
                if (!GeoTimeDataHelper.compareCentersMaps(oldMap, newMap)) {
                    checkCentroids = newCentroids;
                } else {
                    checkCentroids = null;
                }
                oldCentroids = newCentroids;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, checkCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
        finalCentroids.writeAsText(outputPath + "/centers"); //print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2015-07-20 12:58 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Use a broadcast set to distribute the old centers to a map which has the
> new centers as regular input. Put the old centers in a hashmap in open()
> and check the distance to the new centers in map().
>
> On Jul 20, 2015 12:55 PM, "Pa Rö" <paul.roewer1...@googlemail.com> wrote:
>
>> Okay, I have found it. How do I compare my old and new centers?
>>
>> 2015-07-20 12:16 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:
>>
>>> Gah. Sorry.
>>> In the closeWith call, give a second argument which determines if the
>>> iteration should be stopped.
>>>
>>> -- Sachin Goel
>>> Computer Science, IIT Delhi
>>> m. +91-9871457685
>>>
>>> On Jul 20, 2015 3:21 PM, "Pa Rö" <paul.roewer1...@googlemail.com> wrote:
>>>
>>>> I did not find the "iterateWithTermination" function, only "iterate"
>>>> and "iterateDelta". I use Flink 0.9.0 with Java.
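Fabian's suggestion, keep the old centers in a HashMap and check the distance to the new centers, can be sketched independently of Flink. This is a minimal illustration only: the class name, the `Map<Integer, double[]>` coordinate representation, and the epsilon threshold are assumptions, not the thread's actual GeoTimeData types or helper methods.

```java
import java.util.Map;

// Illustrative convergence check: the iteration may stop once every new
// center lies within `epsilon` (Euclidean distance) of the old center
// that has the same cluster id.
class CentroidConvergence {

    static boolean hasConverged(Map<Integer, double[]> oldCenters,
                                Map<Integer, double[]> newCenters,
                                double epsilon) {
        // a center that appeared or vanished means we are not converged
        if (!oldCenters.keySet().equals(newCenters.keySet())) {
            return false;
        }
        for (Map.Entry<Integer, double[]> e : newCenters.entrySet()) {
            double[] oldC = oldCenters.get(e.getKey());
            double[] newC = e.getValue();
            double sq = 0.0;
            for (int d = 0; d < newC.length; d++) {
                double diff = newC[d] - oldC[d];
                sq += diff * diff;
            }
            if (Math.sqrt(sq) > epsilon) {
                return false; // this center moved too far
            }
        }
        return true;
    }
}
```

Inside a Flink RichMapFunction this check would run in map() against the broadcast old centers loaded in open(), as Fabian describes.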
>>>>
>>>> 2015-07-20 11:30 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:
>>>>
>>>>> Hi
>>>>> You can use iterateWithTermination to terminate before max iterations.
>>>>> The feedback for the iteration then would be (next solution,
>>>>> isConverged), where isConverged is an empty data set if you wish to
>>>>> terminate.
>>>>> However, this is something I have a pull request for:
>>>>> https://github.com/apache/flink/pull/918. Take a look.
>>>>>
>>>>> -- Sachin Goel
>>>>> Computer Science, IIT Delhi
>>>>> m. +91-9871457685
>>>>>
>>>>> On Mon, Jul 20, 2015 at 2:55 PM, Pa Rö <paul.roewer1...@googlemail.com> wrote:
>>>>>
>>>>>> Hello community,
>>>>>>
>>>>>> I have written a k-means app in Flink. Now I want to change my
>>>>>> termination condition from max iterations to checking whether the
>>>>>> cluster centers have changed, but I don't know how I can break the
>>>>>> Flink loop. Here is my Flink execution code:
>>>>>>
>>>>>> public void run() {
>>>>>>     // load properties
>>>>>>     Properties pro = new Properties();
>>>>>>     FileSystem fs = null;
>>>>>>     try {
>>>>>>         pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
>>>>>>         fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
>>>>>>                 new org.apache.hadoop.conf.Configuration());
>>>>>>     } catch (Exception e) {
>>>>>>         e.printStackTrace();
>>>>>>     }
>>>>>>     int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
>>>>>>     String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
>>>>>>     // set up execution environment
>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>     // get input points
>>>>>>     DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>>>     DataSet<GeoTimeDataCenter> centroids = null;
>>>>>>     try {
>>>>>>         centroids = getCentroidDataSet(env);
>>>>>>     } catch (Exception e1) {
>>>>>>         e1.printStackTrace();
>>>>>>     }
>>>>>>     // set number of bulk iterations for KMeans algorithm
>>>>>>     IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>>>>     DataSet<GeoTimeDataCenter> newCentroids = points
>>>>>>         // compute closest centroid for each point
>>>>>>         .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
>>>>>>         // count and sum point coordinates for each centroid
>>>>>>         .groupBy(0).reduceGroup(new CentroidAccumulator())
>>>>>>         // compute new centroids from point counts and coordinate sums
>>>>>>         .map(new CentroidAverager(this.getBenchmarkCounter()));
>>>>>>     // feed new centroids back into next iteration
>>>>>>     DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>>>>     DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>>>>         // assign points to final clusters
>>>>>>         .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
>>>>>>     // emit result
>>>>>>     clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
>>>>>>     finalCentroids.writeAsText(outputPath + "/centers"); //print();
>>>>>>     // execute program
>>>>>>     try {
>>>>>>         env.execute("KMeans Flink");
>>>>>>     } catch (Exception e) {
>>>>>>         e.printStackTrace();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Is it possible to use a construct like: if(centroids equals
>>>>>> points){break the loop}?
>>>>>>
>>>>>> Best regards,
>>>>>> Paul
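The behaviour the thread is after, stopping as soon as the centers stop moving instead of always running maxIterations, can be illustrated with a plain, Flink-free loop. Everything below is a toy sketch (1-D points, illustrative epsilon); the `maxShift <= epsilon` exit corresponds to the termination-criterion DataSet becoming empty in Flink's two-argument closeWith:

```java
// Toy 1-D k-means that terminates early: it stops once no center moves
// more than `epsilon`, rather than exhausting maxIterations.
class KMeansLoop {

    // Runs k-means on `points`, updating `centers` in place.
    // Returns the number of iterations actually used.
    static int run(double[] points, double[] centers,
                   int maxIterations, double epsilon) {
        int iterations = 0;
        for (; iterations < maxIterations; iterations++) {
            double[] sums = new double[centers.length];
            int[] counts = new int[centers.length];
            // assign each point to its nearest center
            for (double p : points) {
                int best = 0;
                for (int c = 1; c < centers.length; c++) {
                    if (Math.abs(p - centers[c]) < Math.abs(p - centers[best])) {
                        best = c;
                    }
                }
                sums[best] += p;
                counts[best]++;
            }
            // average to get the new centers, tracking the largest shift
            double maxShift = 0.0;
            for (int c = 0; c < centers.length; c++) {
                double newCenter = counts[c] > 0 ? sums[c] / counts[c] : centers[c];
                maxShift = Math.max(maxShift, Math.abs(newCenter - centers[c]));
                centers[c] = newCenter;
            }
            if (maxShift <= epsilon) {   // "termination criterion is empty"
                iterations++;
                break;
            }
        }
        return iterations;
    }
}
```

In Flink terms, the `maxShift` comparison would be expressed as a DataSet of centers that moved more than epsilon (e.g. old and new centers joined by id and filtered); passing that DataSet as the second argument of closeWith ends the bulk iteration in the superstep where it is empty.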