Hi, the exception occurs with version 0.9. With version 0.8.1 there is no exception, but the results are wrong.
Here is my main method:

public static void main(String[] args) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(new FileInputStream("./resources/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }

    int maxIteration = 2; // Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = pro.getProperty("flink.output");

    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);

    // set number of bulk iterations for KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);

    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduce(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager());

    // feed new centroids back into next iteration
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);

    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers"); // print();

    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

2015-05-21 11:28 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Paul,
>
> could you share your code with us so that we see whether there is any
> error.
>
> Does this error also occur with 0.9-SNAPSHOT?
>
> Cheers,
> Till
>
> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1...@googlemail.com>
> wrote:
>
>> Hi Flink community,
>>
>> I have implemented k-means for clustering temporal geo data. I use the
>> following GitHub project and my own data structure:
>>
>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>
>> Now I have the problem that Flink reads the centroids from a file and
>> continues to work in parallel. When I look at the results, I have the
>> feeling that the program loads only one centroid point.
>>
>> I work with Flink 0.8.1. If I update to 0.9 milestone 1, I get the
>> following exception:
>>
>> ERROR actor.OneForOneStrategy: exception during creation
>> akka.actor.ActorInitializationException: exception during creation
>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.reflect.InvocationTargetException
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>     at akka.actor.Props.newActor(Props.scala:337)
>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>     ... 9 more
>>
>> How can I tell Flink that it should wait for the dataset to load, and
>> what does this exception mean?
>>
>> Best regards,
>> Paul
>>
>