I tried the following, but it always runs for maxIterations. Could someone
review it?

    private int benchmarkCounter;
    private static int iterationCounter = 1;
    private static DataSet<GeoTimeDataCenter> oldCentroids;


    FlinkMain(int benchmarkCounter) {
        this.benchmarkCounter = benchmarkCounter;
    }

    public int getBenchmarkCounter() {
        return benchmarkCounter;
    }

    public void run() {
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
                    new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }

        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter()))
            .withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // check whether the centers have been updated
        DataSet<GeoTimeDataCenter> checkCentroids = null;
        if(this.getBenchmarkCounter()==0) {
            oldCentroids = newCentroids;
        }
        else {
            try {
                HashMap<Integer, GeoTimeDataTupel> oldMap =
                        new HashMap<Integer, GeoTimeDataTupel>();
                for (GeoTimeDataCenter i : oldCentroids.collect()) {
                    oldMap.put(i.getId(),
                            new GeoTimeDataTupel(i.getGeo(), i.getTime(), i.getidGDELT()));
                }
                HashMap<Integer, GeoTimeDataTupel> newMap =
                        new HashMap<Integer, GeoTimeDataTupel>();
                for (GeoTimeDataCenter i : newCentroids.collect()) {
                    newMap.put(i.getId(),
                            new GeoTimeDataTupel(i.getGeo(), i.getTime(), i.getidGDELT()));
                }
                if(!GeoTimeDataHelper.compareCentersMaps(oldMap, newMap)) {
                    checkCentroids = newCentroids;
                }
                else {
                    checkCentroids = null;
                }
                oldCentroids = newCentroids;
            }
            catch(Exception e) {
                e.printStackTrace();
            }

        }
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids =
                loop.closeWith(newCentroids, checkCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1))
            .withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
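
One way to compare old and new centers, following the suggestion in the
quoted reply below, is sketched here in plain Java without the Flink
dependency. ConvergenceCheck, its nested Center class, and the epsilon
threshold are hypothetical stand-ins and not part of the code above. In a
Flink job the constructor's work would presumably move into open() of a rich
function that reads the old centers from the broadcast set, and hasMoved()
would be called once per new center.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConvergenceCheck {

    // Hypothetical stand-in for GeoTimeDataCenter: an id plus two coordinates.
    static class Center {
        final int id;
        final double x;
        final double y;

        Center(int id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // Old centers indexed by id, so each new center is matched in O(1).
    private final Map<Integer, Center> oldById = new HashMap<>();
    // Assumed tolerance: movements up to this distance count as "unchanged".
    private final double epsilon;

    ConvergenceCheck(List<Center> oldCenters, double epsilon) {
        for (Center c : oldCenters) {
            oldById.put(c.id, c);
        }
        this.epsilon = epsilon;
    }

    // True if the center has no old counterpart or moved farther than epsilon.
    boolean hasMoved(Center updated) {
        Center old = oldById.get(updated.id);
        if (old == null) {
            return true;
        }
        double dx = updated.x - old.x;
        double dy = updated.y - old.y;
        return Math.sqrt(dx * dx + dy * dy) > epsilon;
    }

    // Keeps only the centers that still move; an empty list means converged.
    List<Center> movedCenters(List<Center> newCenters) {
        List<Center> moved = new ArrayList<>();
        for (Center c : newCenters) {
            if (hasMoved(c)) {
                moved.add(c);
            }
        }
        return moved;
    }
}
```

An empty result from movedCenters() means that no center moved by more than
epsilon, which is the condition under which the loop should stop.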

2015-07-20 12:58 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Use a broadcast set to distribute the old centers to a map function that
> takes the new centers as its regular input. Put the old centers in a
> HashMap in open() and check the distance to the new centers in map().
> On Jul 20, 2015 12:55 PM, "Pa Rö" <paul.roewer1...@googlemail.com> wrote:
>
>> Okay, I found it. How do I compare my old and new centers?
>>
>> 2015-07-20 12:16 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:
>>
>>> Gah. Sorry.
>>> In the closeWith call, give a second argument which determines if the
>>> iteration should be stopped.
>>>
>>> -- Sachin Goel
>>> Computer Science, IIT Delhi
>>> m. +91-9871457685
>>> On Jul 20, 2015 3:21 PM, "Pa Rö" <paul.roewer1...@googlemail.com> wrote:
>>>
>>>> I could not find the "iterateWithTermination" function, only "iterate"
>>>> and "iterateDelta". I use Flink 0.9.0 with Java.
>>>>
>>>> 2015-07-20 11:30 GMT+02:00 Sachin Goel <sachingoel0...@gmail.com>:
>>>>
>>>>> Hi
>>>>> You can use iterateWithTermination to terminate before max iterations.
>>>>> The feedback for iteration then would be (next solution, isConverged),
>>>>> where isConverged is an empty data set if you wish to terminate.
>>>>> However, this is something I have a pull request for:
>>>>> https://github.com/apache/flink/pull/918. Take a look.
>>>>>
>>>>> -- Sachin Goel
>>>>> Computer Science, IIT Delhi
>>>>> m. +91-9871457685
>>>>>
>>>>> On Mon, Jul 20, 2015 at 2:55 PM, Pa Rö <paul.roewer1...@googlemail.com> wrote:
>>>>>
>>>>>> Hello community,
>>>>>>
>>>>>> I have written a k-means app in Flink. Now I want to change my
>>>>>> termination condition from a maximum number of iterations to checking
>>>>>> whether the cluster centers still change, but I don't know how to break
>>>>>> out of the Flink loop. Here is my Flink execution code:
>>>>>>
>>>>>> public void run() {
>>>>>>         //load properties
>>>>>>         Properties pro = new Properties();
>>>>>>         FileSystem fs = null;
>>>>>>         try {
>>>>>>             pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
>>>>>>             fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
>>>>>>                     new org.apache.hadoop.conf.Configuration());
>>>>>>         } catch (Exception e) {
>>>>>>             e.printStackTrace();
>>>>>>         }
>>>>>>
>>>>>>         int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
>>>>>>         String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
>>>>>>         // set up execution environment
>>>>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>         // get input points
>>>>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>>>         DataSet<GeoTimeDataCenter> centroids = null;
>>>>>>         try {
>>>>>>             centroids = getCentroidDataSet(env);
>>>>>>         } catch (Exception e1) {
>>>>>>             e1.printStackTrace();
>>>>>>         }
>>>>>>         // set number of bulk iterations for KMeans algorithm
>>>>>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>>>>             // compute closest centroid for each point
>>>>>>             .map(new SelectNearestCenter(this.getBenchmarkCounter()))
>>>>>>             .withBroadcastSet(loop, "centroids")
>>>>>>             // count and sum point coordinates for each centroid
>>>>>>             .groupBy(0).reduceGroup(new CentroidAccumulator())
>>>>>>             // compute new centroids from point counts and coordinate sums
>>>>>>             .map(new CentroidAverager(this.getBenchmarkCounter()));
>>>>>>         // feed new centroids back into next iteration
>>>>>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>>>>             // assign points to final clusters
>>>>>>             .map(new SelectNearestCenter(-1))
>>>>>>             .withBroadcastSet(finalCentroids, "centroids");
>>>>>>         // emit result
>>>>>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>>>>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>>>>>         // execute program
>>>>>>         try {
>>>>>>             env.execute("KMeans Flink");
>>>>>>         } catch (Exception e) {
>>>>>>             e.printStackTrace();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> Is it possible to use a construct like: if(centroids equals
>>>>>> points){break the loop}?
>>>>>>
>>>>>> best regards,
>>>>>> paul
>>>>>>
>>>>>
>>>>>
>>>>
>>
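
The termination rule discussed in this thread (the loop ends early once the
data set passed as the second closeWith argument is empty) can be modeled
without Flink. The following plain-Java sketch runs k-means on 1-D points and
stops as soon as no center moves any more. BulkIterationSketch and its
methods step(), moved(), and run() are hypothetical names chosen for
illustration; the sketch only imitates the semantics and does not use the
Flink API.

```java
import java.util.ArrayList;
import java.util.List;

class BulkIterationSketch {

    // One k-means step: assign each point to its nearest center, then average.
    static double[] step(double[] points, double[] centers) {
        double[] sum = new double[centers.length];
        int[] count = new int[centers.length];
        for (double p : points) {
            int best = 0;
            for (int c = 1; c < centers.length; c++) {
                if (Math.abs(p - centers[c]) < Math.abs(p - centers[best])) {
                    best = c;
                }
            }
            sum[best] += p;
            count[best]++;
        }
        double[] next = new double[centers.length];
        for (int c = 0; c < centers.length; c++) {
            // A center without points keeps its old position.
            next[c] = count[c] == 0 ? centers[c] : sum[c] / count[c];
        }
        return next;
    }

    // Termination criterion: indices of centers that moved more than eps.
    static List<Integer> moved(double[] oldCenters, double[] newCenters, double eps) {
        List<Integer> result = new ArrayList<>();
        for (int c = 0; c < oldCenters.length; c++) {
            if (Math.abs(newCenters[c] - oldCenters[c]) > eps) {
                result.add(c);
            }
        }
        return result;
    }

    // Runs at most maxIterations steps and returns how many were executed.
    static int run(double[] points, double[] centers, int maxIterations, double eps) {
        int iterations = 0;
        while (iterations < maxIterations) {
            double[] next = step(points, centers);
            iterations++;
            // An empty criterion ends the loop, as with the second closeWith argument.
            boolean converged = moved(centers, next, eps).isEmpty();
            centers = next;
            if (converged) {
                break;
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 3, 10, 11, 12};
        double[] start = {0, 5};
        System.out.println(run(points, start, 100, 1e-9)); // prints 3
    }
}
```

With these inputs the centers settle at 2 and 11 after the second step, the
third step changes nothing, and the loop ends after 3 of the 100 allowed
iterations.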
