Hello community,

I have written a k-means app in Flink. Now I want to change my termination
condition from a maximum number of iterations to a check on whether the
cluster centers still change, but I don't know how to break out of the Flink
loop. Here is my Flink execution code:

public void run() {
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {

            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
                    new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }

        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter()))
            .withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1))
            .withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Is it possible to use a construct like: if (centroids equals points) { break
the loop }?
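
To make the intended check concrete, here is a minimal plain-Java sketch of the kind of test I have in mind. It is not Flink code: the class `ConvergenceCheck`, its methods, and the `epsilon` threshold are hypothetical names for illustration, and it assumes each center can be viewed as a `double[]` and that old and new centers are stored in the same order.

```java
/** Plain-Java sketch of a k-means convergence test; not part of the Flink job. */
public class ConvergenceCheck {

    /** Euclidean distance between two points of equal dimension. */
    public static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    /**
     * Returns true when no center moved farther than epsilon between two
     * iterations. Assumes oldCenters[i] and newCenters[i] belong to the
     * same cluster.
     */
    public static boolean hasConverged(double[][] oldCenters,
                                       double[][] newCenters,
                                       double epsilon) {
        if (oldCenters.length != newCenters.length) {
            throw new IllegalArgumentException("center counts differ");
        }
        for (int i = 0; i < oldCenters.length; i++) {
            if (distance(oldCenters[i], newCenters[i]) > epsilon) {
                return false; // at least one center is still moving
            }
        }
        return true;
    }
}
```

If I read the DataSet API correctly, `closeWith` also has a two-argument overload whose second data set acts as a termination criterion (the loop stops once that data set is empty). A filter that keeps only the centers that moved more than `epsilon` might be what feeds it, but that mapping is an assumption on my part.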

best regards,
paul
