Hello, I have defined a filter for the termination condition of my k-means job. When I run my app, it always computes only one iteration.
I think the problem is here: DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter())); or maybe it is in the filter function: public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> { private static final long serialVersionUID = 5868635346889117617L; public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception { if(tuple.f0.equals(tuple.f1)) { return true; } else { return false; } } } Best regards, Paul. My full code is here: public void run() { //load properties Properties pro = new Properties(); FileSystem fs = null; try { pro.load(FlinkMain.class.getResourceAsStream("/config.properties")); fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration()); } catch (Exception e) { e.printStackTrace(); } int maxIteration = Integer.parseInt(pro.getProperty("maxiterations")); String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output"); // set up execution environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // get input points DataSet<GeoTimeDataTupel> points = getPointDataSet(env); DataSet<GeoTimeDataCenter> centroids = null; try { centroids = getCentroidDataSet(env); } catch (Exception e1) { e1.printStackTrace(); } // set number of bulk iterations for KMeans algorithm IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration); DataSet<GeoTimeDataCenter> newCentroids = points // compute closest centroid for each point .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids") // count and sum point coordinates for each centroid .groupBy(0).reduceGroup(new CentroidAccumulator()) // compute new centroids from point counts and coordinate sums .map(new CentroidAverager(this.getBenchmarkCounter())); // feed new centroids back into next iteration with termination condition
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter())); DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points // assign points to final clusters .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids"); // emit result clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " "); finalCentroids.writeAsText(outputPath+"/centers");//print(); // execute program try { env.execute("KMeans Flink"); } catch (Exception e) { e.printStackTrace(); } } public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> { private static final long serialVersionUID = 5868635346889117617L; public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception { if(tuple.f0.equals(tuple.f1)) { return true; } else { return false; } } }