Hi, if I print the centroids, all of them show up in the output. I have implemented k-means with MapReduce and with Spark; given the same input, both produce the same output. But with Flink I get a one-cluster output for this input set. (I use CSV files from the GDELT project.)
Here is my class:

public class FlinkMain {

    public static void main(String[] args) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int maxIteration = 1; //Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
        // set number of bulk iterations for the k-means algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            // pairwise-average the points assigned to each centroid
            .groupBy(0).reduce(new CentroidAccumulator())
            // convert each reduced tuple into a new centroid
            .map(new CentroidAverager());
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
        finalCentroids.writeAsText(outputPath + "/centers"); //print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class SelectNearestCenter
            extends RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
        private static final long serialVersionUID = -2729445046389350264L;
        private Collection<GeoTimeDataCenter> centroids;

        @Override
        public void open(Configuration parameters) throws Exception {
            this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
        }

        @Override
        public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
            double minDistance = Double.MAX_VALUE;
            int closestCentroidId = -1;
            // check all cluster centers
            for (GeoTimeDataCenter centroid : centroids) {
                // compute distance
                double distance = Distance.ComputeDist(point, centroid);
                // update nearest cluster if necessary
                if (distance < minDistance) {
                    minDistance = distance;
                    closestCentroidId = centroid.getId();
                }
            }
            // emit a new record with the center id and the data point
            return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
        }
    }

    // pairwise-averages the points assigned to each centroid
    private static final class CentroidAccumulator
            implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
        private static final long serialVersionUID = -4868797820391121771L;

        public Tuple2<Integer, GeoTimeDataTupel> reduce(
                Tuple2<Integer, GeoTimeDataTupel> val1,
                Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1, val2.f1));
        }
    }

    private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
        long time = (input1.getTime() + input2.getTime()) / 2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);
        return new GeoTimeDataTupel(geo, time, "POINT");
    }
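    // note: a ReduceFunction may combine partial results in any order,
    // so addAndDiv yields an order-dependent approximation of the mean
    // rather than an exact coordinate sum divided by a point count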
    // converts each reduced tuple into a new centroid
    private static final class CentroidAverager
            implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
        private static final long serialVersionUID = -2687234478847261803L;

        public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
            return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
        }
    }

    private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String inputFile = pro.getProperty("input");
        // map csv file
        return env.readCsvFile(inputFile)
            .ignoreInvalidLines()
            .fieldDelimiter('\u0009') //.fieldDelimiter("\t")
            //.lineDelimiter("\n")
            .includeFields(
                true, true, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, true, true,
                false, false, false, false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false)
            //.includeFields(true, true, true, true)
            .types(String.class, Long.class, Double.class, Double.class)
            .map(new TuplePointConverter());
    }

    private static final class TuplePointConverter
            implements MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
        private static final long serialVersionUID = 3485560278562719538L;

        public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
            return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
        }
    }

    private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String seedFile = pro.getProperty("seed.file");
        boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
        // get seed points from file, or generate them randomly
        if (seedFlag || !(new File(seedFile + "-1").exists())) {
            Seeding.randomSeeding();
        }
        // map csv file
        return env.readCsvFile(seedFile + "-1")
            .lineDelimiter("\n")
            .fieldDelimiter('\u0009') //.fieldDelimiter("\t")
            .includeFields(true, true, true, true)
            .types(Integer.class, Double.class, Double.class, Long.class)
            .map(new TupleCentroidConverter());
    }

    private static final class TupleCentroidConverter
            implements MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
        private static final long serialVersionUID = -1046538744363026794L;

        public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
            return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
        }
    }
}

2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Concerning your first problem, that you only see one resulting centroid:
> your code looks good, modulo the parts you haven't posted.
>
> However, your problem could simply be caused by a bad selection of initial
> centroids. If, for example, all centroids except for one don't get any
> points assigned, then only one centroid will survive the iteration step.
> How do you select your initial centroids?
>
> To check that all centroids are read, you can print the contents of the
> centroids DataSet. Furthermore, you can simply println the new centroids
> after each iteration step. In local mode you can then observe the
> computation.
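>
> For example, a minimal (untested) sketch, reusing the names from the main
> method you posted:
>
>     // print the seed centroids once, to verify that all of them are read
>     centroids.print();
>
>     // pipe the new centroids through an identity map that logs them,
>     // so you can watch each superstep in local mode
>     DataSet<GeoTimeDataCenter> newCentroids = points
>         .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>         .groupBy(0).reduce(new CentroidAccumulator())
>         .map(new CentroidAverager())
>         .map(new MapFunction<GeoTimeDataCenter, GeoTimeDataCenter>() {
>             public GeoTimeDataCenter map(GeoTimeDataCenter c) {
>                 System.out.println("new centroid: " + c);
>                 return c;
>             }
>         });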
>
> Cheers,
> Till
>
> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> This problem should not depend on any user code. There are no user-code
>> dependent actors in Flink.
>>
>> Is there more stack trace that you can send us? It looks like the core
>> exception that is causing the issue is not part of the stack trace.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1...@googlemail.com>
>> wrote:
>>
>>> hi flink community,
>>>
>>> I have implemented k-means for clustering temporal geo data. I use the
>>> following GitHub project together with my own data structure:
>>>
>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>
>>> Now I have the problem that Flink reads the centroids from a file and
>>> then continues working in parallel. Looking at the results, I have the
>>> feeling that the program loads only one centroid point.
>>>
>>> I work with Flink 0.8.1. If I update to 0.9 milestone 1, I get the
>>> following exception:
>>> ERROR actor.OneForOneStrategy: exception during creation
>>> akka.actor.ActorInitializationException: exception during creation
>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.reflect.InvocationTargetException
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>     ... 9 more
>>>
>>> How can I tell Flink to wait until the dataset is loaded, and what does
>>> this exception mean?
>>>
>>> best regards,
>>> paul