Use a broadcast set to distribute the old centers to a map function that takes the new centers as its regular input. Put the old centers into a HashMap in open() and check the distance to the new centers in map().

On Jul 20, 2015 12:55 PM, "Pa Rö" <paul.roewer1...@googlemail.com> wrote:
> Okay, I have found it. How do I compare my old and new centers?
>
> 2015-07-20 12:16 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:
>
>> Gah. Sorry.
>> In the closeWith call, pass a second argument that determines whether the
>> iteration should be stopped.
>>
>> -- Sachin Goel
>> Computer Science, IIT Delhi
>> m. +91-9871457685
>> On Jul 20, 2015 3:21 PM, "Pa Rö" <paul.roewer1...@googlemail.com> wrote:
>>
>>> I did not find the "iterateWithTermination" function, only "iterate" and
>>> "iterateDelta". I use Flink 0.9.0 with Java.
>>>
>>> 2015-07-20 11:30 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:
>>>
>>>> Hi,
>>>> You can use iterateWithTermination to terminate before the maximum number
>>>> of iterations is reached. The feedback for the iteration would then be
>>>> (next solution, isConverged), where isConverged is an empty data set if
>>>> you wish to terminate.
>>>> However, this is something I have a pull request for:
>>>> https://github.com/apache/flink/pull/918. Take a look.
>>>>
>>>> On Mon, Jul 20, 2015 at 2:55 PM, Pa Rö <paul.roewer1...@googlemail.com>
>>>> wrote:
>>>>
>>>>> Hello community,
>>>>>
>>>>> I have written a k-means app in Flink. Now I want to change my termination
>>>>> condition from a maximum iteration count to checking how much the cluster
>>>>> centers change, but I don't know how to break the Flink loop.
>>>>> Here is my Flink execution code:
>>>>>
>>>>> public void run() {
>>>>>     // load properties
>>>>>     Properties pro = new Properties();
>>>>>     FileSystem fs = null;
>>>>>     try {
>>>>>         pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
>>>>>         fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
>>>>>                 new org.apache.hadoop.conf.Configuration());
>>>>>     } catch (Exception e) {
>>>>>         e.printStackTrace();
>>>>>     }
>>>>>
>>>>>     int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
>>>>>     String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
>>>>>     // set up execution environment
>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     // get input points
>>>>>     DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>>     DataSet<GeoTimeDataCenter> centroids = null;
>>>>>     try {
>>>>>         centroids = getCentroidDataSet(env);
>>>>>     } catch (Exception e1) {
>>>>>         e1.printStackTrace();
>>>>>     }
>>>>>     // set number of bulk iterations for KMeans algorithm
>>>>>     IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>>>     DataSet<GeoTimeDataCenter> newCentroids = points
>>>>>         // compute closest centroid for each point
>>>>>         .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
>>>>>         // count and sum point coordinates for each centroid
>>>>>         .groupBy(0).reduceGroup(new CentroidAccumulator())
>>>>>         // compute new centroids from point counts and coordinate sums
>>>>>         .map(new CentroidAverager(this.getBenchmarkCounter()));
>>>>>     // feed new centroids back into next iteration
>>>>>     DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>>>     DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>>>         // assign points to final clusters
>>>>>         .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
>>>>>     // emit result
>>>>>     clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
>>>>>     finalCentroids.writeAsText(outputPath + "/centers"); // print();
>>>>>     // execute program
>>>>>     try {
>>>>>         env.execute("KMeans Flink");
>>>>>     } catch (Exception e) {
>>>>>         e.printStackTrace();
>>>>>     }
>>>>> }
>>>>>
>>>>> Is it possible to use a construct like: if (centroids equals points) { break the loop }?
>>>>>
>>>>> Best regards,
>>>>> Paul
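The comparison suggested in the top reply can be sketched in plain Java, outside Flink, to show the logic. This is a minimal sketch under stated assumptions: `Center` is a hypothetical stand-in for `GeoTimeDataCenter` (an integer id plus coordinates), the distance is plain Euclidean, and `epsilon` is a hypothetical convergence threshold. The constructor of `ConvergenceCheck` plays the role of `open()` (building the HashMap of old centers), and `movedCenters` plays the role of the map/flatMap over the new centers: it returns the ids of centers that moved more than `epsilon`, so an empty result means the centers have converged. In an actual Flink job, the old centers would presumably be read with `getRuntimeContext().getBroadcastVariable(...)` inside a rich function attached via `withBroadcastSet(loop, ...)`, and the resulting data set would be the second argument of `loop.closeWith(newCentroids, criterion)`, so the iteration stops once it is empty. The `iterate` helper only simulates that behavior with an ordinary loop; it is not Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for GeoTimeDataCenter: an integer id plus plain coordinates.
final class Center {
    final int id;
    final double[] coords;

    Center(int id, double... coords) {
        this.id = id;
        this.coords = coords;
    }
}

final class ConvergenceCheck {
    // Corresponds to the HashMap that open() would fill from the broadcast old centers.
    private final Map<Integer, Center> oldCenters = new HashMap<>();
    // Hypothetical threshold: a center that moves at most this far counts as stable.
    private final double epsilon;

    ConvergenceCheck(List<Center> oldCenterList, double epsilon) {
        for (Center c : oldCenterList) {
            oldCenters.put(c.id, c);
        }
        this.epsilon = epsilon;
    }

    // Corresponds to the map/flatMap over the new centers: returns the ids of centers
    // that moved more than epsilon or have no old counterpart. Empty means converged.
    List<Integer> movedCenters(List<Center> newCenters) {
        List<Integer> moved = new ArrayList<>();
        for (Center nc : newCenters) {
            Center oc = oldCenters.get(nc.id);
            if (oc == null || distance(oc, nc) > epsilon) {
                moved.add(nc.id);
            }
        }
        return moved;
    }

    // Euclidean distance; assumes both centers have the same number of coordinates.
    static double distance(Center a, Center b) {
        double sum = 0.0;
        for (int i = 0; i < a.coords.length; i++) {
            double d = a.coords[i] - b.coords[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Plain-Java simulation (not Flink's API) of a bulk iteration with a termination
    // criterion: run at most maxIterations steps, stop early once no center moved.
    static List<Center> iterate(List<Center> initial, UnaryOperator<List<Center>> step,
                                int maxIterations, double epsilon) {
        List<Center> current = initial;
        for (int i = 0; i < maxIterations; i++) {
            List<Center> next = step.apply(current);
            boolean converged = new ConvergenceCheck(current, epsilon).movedCenters(next).isEmpty();
            current = next;
            if (converged) {
                break;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Center> old = Arrays.asList(new Center(1, 0.0, 0.0), new Center(2, 5.0, 5.0));
        ConvergenceCheck check = new ConvergenceCheck(old, 0.01);
        // Center 2 moved by 1.0, so the criterion is non-empty: prints [2]
        System.out.println(check.movedCenters(
                Arrays.asList(new Center(1, 0.0, 0.0), new Center(2, 5.0, 6.0))));
        // No center moved more than 0.01, so the criterion is empty: prints []
        System.out.println(check.movedCenters(
                Arrays.asList(new Center(1, 0.001, 0.0), new Center(2, 5.0, 5.0))));
    }
}
```

For example, with `epsilon = 0.1` and a step that halves a one-dimensional center starting at 8.0, the simulated loop stops after the move from 0.125 to 0.0625, long before a cap of 100 iterations. Emitting only the centers that moved (rather than a boolean) matches the "empty data set means terminate" convention described in the thread.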