hello community, i have write a k-means app in flink, now i want change my terminate condition from max iteration to checking the changing of the cluster centers, but i don't know how i can break the flink loop. here my execution code of flink:
public void run() { //load properties Properties pro = new Properties(); FileSystem fs = null; try { pro.load(FlinkMain.class.getResourceAsStream("/config.properties")); fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration()); } catch (Exception e) { e.printStackTrace(); } int maxIteration = Integer.parseInt(pro.getProperty("maxiterations")); String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output"); // set up execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // get input points DataSet<GeoTimeDataTupel> points = getPointDataSet(env); DataSet<GeoTimeDataCenter> centroids = null; try { centroids = getCentroidDataSet(env); } catch (Exception e1) { e1.printStackTrace(); } // set number of bulk iterations for KMeans algorithm IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration); DataSet<GeoTimeDataCenter> newCentroids = points // compute closest centroid for each point .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids") // count and sum point coordinates for each centroid .groupBy(0).reduceGroup(new CentroidAccumulator()) // compute new centroids from point counts and coordinate sums .map(new CentroidAverager(this.getBenchmarkCounter())); // feed new centroids back into next iteration DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids); DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points // assign points to final clusters .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids"); // emit result clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " "); finalCentroids.writeAsText(outputPath+"/centers");//print(); // execute program try { env.execute("KMeans Flink"); } catch (Exception e) { e.printStackTrace(); } } is it possible to use a contruct like: if(centroids equals points){break the loop}??? best regards, paul