Sachin is right that the filter has to be inverted. Furthermore, the join operation is not right here either: you have to do a kind of left outer join where you only keep the elements that join with NULL. With the current inner join on full equality, a changed centroid never joins with its old version at all, so even an inverted filter would see no pairs. Here is an example of how one could do it [1].
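Roughly, the idea looks like this. This is only a minimal sketch along the lines of [1] (not the exact code from there), expressed as a CoGroup because an inner join can never pair a changed centroid with its previous version; the name "changed" is mine:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

// centroids of the current iteration that have no identical counterpart in
// the previous solution; this set is empty exactly when nothing changed
DataSet<GeoTimeDataCenter> changed = newCentroids
    .coGroup(loop)
    .where("*").equalTo("*")
    .with(new CoGroupFunction<GeoTimeDataCenter, GeoTimeDataCenter, GeoTimeDataCenter>() {
        @Override
        public void coGroup(Iterable<GeoTimeDataCenter> current,
                            Iterable<GeoTimeDataCenter> previous,
                            Collector<GeoTimeDataCenter> out) {
            // emit only if the previous side is "NULL", i.e. empty
            if (!previous.iterator().hasNext()) {
                for (GeoTimeDataCenter c : current) {
                    out.collect(c);
                }
            }
        }
    });

// the iteration stops as soon as the termination criterion data set is empty
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, changed);

A sketch of the tolerance-based variant Sachin suggests is in the P.S. below the quoted thread.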
Cheers,
Till

[1] http://stackoverflow.com/questions/31558326/apache-flink-filter-as-termination-condition/31559947#31559947

On Wed, Jul 22, 2015 at 2:23 PM, Sachin Goel <sachingoel0...@gmail.com> wrote:

> It appears that you're returning true when the previous and current
> solutions are the same. You should instead return false in that case,
> because this is when the iteration should terminate.
> Further, instead of joining, it would be a good idea to broadcast the new
> solution to the old solution [or the other way around] and to use some
> tolerance value instead of an exact equality check.
>
> Cheers!
> Sachin
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
>
> On Jul 22, 2015 5:46 PM, "Stephan Ewen" <se...@apache.org> wrote:
>
>> Termination happens if the "termination criterion" data set is empty.
>>
>> Maybe your filter is too aggressive and filters out everything, or the
>> join is wrong and nothing joins...
>>
>> On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö <paul.roewer1...@googlemail.com> wrote:
>>
>>> Hello,
>>>
>>> I have defined a filter as the termination condition for k-means.
>>> If I run my app, it always computes only one iteration.
>>>
>>> I think the problem is here:
>>>
>>> DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
>>>     newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
>>>
>>> or maybe the filter function:
>>>
>>> public static final class MyFilter implements
>>>         FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
>>>
>>>     private static final long serialVersionUID = 5868635346889117617L;
>>>
>>>     public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
>>>             throws Exception {
>>>         if (tuple.f0.equals(tuple.f1)) {
>>>             return true;
>>>         } else {
>>>             return false;
>>>         }
>>>     }
>>> }
>>>
>>> Best regards,
>>> Paul
>>>
>>> My full code here:
>>>
>>> public void run() {
>>>     // load properties
>>>     Properties pro = new Properties();
>>>     FileSystem fs = null;
>>>     try {
>>>         pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
>>>         fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
>>>                 new org.apache.hadoop.conf.Configuration());
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>     }
>>>
>>>     int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
>>>     String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
>>>     // set up execution environment
>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     // get input points
>>>     DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>     DataSet<GeoTimeDataCenter> centroids = null;
>>>     try {
>>>         centroids = getCentroidDataSet(env);
>>>     } catch (Exception e1) {
>>>         e1.printStackTrace();
>>>     }
>>>     // set number of bulk iterations for the KMeans algorithm
>>>     IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>     DataSet<GeoTimeDataCenter> newCentroids = points
>>>         // compute closest centroid for each point
>>>         .map(new SelectNearestCenter(this.getBenchmarkCounter()))
>>>             .withBroadcastSet(loop, "centroids")
>>>         // count and sum point coordinates for each centroid
>>>         .groupBy(0).reduceGroup(new CentroidAccumulator())
>>>         // compute new centroids from point counts and coordinate sums
>>>         .map(new CentroidAverager(this.getBenchmarkCounter()));
>>>     // feed new centroids back into next iteration with termination condition
>>>     DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
>>>         newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
>>>     DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>         // assign points to final clusters
>>>         .map(new SelectNearestCenter(-1))
>>>             .withBroadcastSet(finalCentroids, "centroids");
>>>     // emit result
>>>     clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
>>>     finalCentroids.writeAsText(outputPath + "/centers"); // print();
>>>     // execute program
>>>     try {
>>>         env.execute("KMeans Flink");
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>     }
>>> }
>>>
>>> public static final class MyFilter implements
>>>         FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
>>>
>>>     private static final long serialVersionUID = 5868635346889117617L;
>>>
>>>     public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
>>>             throws Exception {
>>>         if (tuple.f0.equals(tuple.f1)) {
>>>             return true;
>>>         } else {
>>>             return false;
>>>         }
>>>     }
>>> }
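P.S. Regarding the tolerance check Sachin suggests above, here is a hypothetical sketch. It assumes GeoTimeDataCenter has a stable "id" field to join on and a distanceTo() method, neither of which is shown in the thread, so adapt it to the real API:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public static final class ToleranceFilter implements
        FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {

    private static final long serialVersionUID = 1L;

    // assumed convergence threshold; tune it to your coordinate scale
    private static final double TOLERANCE = 1e-6;

    public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
            throws Exception {
        // keep pairs whose centroid still moved; once nothing moves more than
        // the tolerance, the criterion is empty and the iteration terminates
        return tuple.f0.distanceTo(tuple.f1) > TOLERANCE; // distanceTo is assumed
    }
}

// join on the assumed id field instead of on all fields ("*"), so a moved
// centroid still pairs up with its previous version
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
    newCentroids.join(loop).where("id").equalTo("id").filter(new ToleranceFilter()));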