I have fixed a bug in the input reading, but the results are still different.
I think I have localized the problem. In the other implementations I sum up all geo points/time points and divide by the counter, but in Flink I sum two points and divide by two, then sum the result with the next point and divide by two again, and so on. The method is the following:

// sums and counts point coordinates
private static final class CentroidAccumulator implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

    private static final long serialVersionUID = -4868797820391121771L;

    public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
        return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f0, val1.f1, val2.f1));
    }
}

private static GeoTimeDataTupel addAndDiv(int clusterid, GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
    long time = (input1.getTime() + input2.getTime()) / 2;
    List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
    list.add(input1.getGeo());
    list.add(input2.getGeo());
    LatLongSeriable geo = Geometry.getGeoCenterOf(list);
    return new GeoTimeDataTupel(geo, time, "POINT");
}

How can I sum up all points and divide by the counter only once?
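My current idea is to follow the pattern of the official Flink KMeans example and carry an explicit count through the reduce, so that the division happens exactly once at the end, replacing the current CentroidAccumulator and CentroidAverager. A rough, untested sketch of what I mean (it assumes LatLongSeriable exposes getLat()/getLong() getters, which may not be the real accessor names, and uses org.apache.flink.api.java.tuple.Tuple3 in addition to the existing imports):

// appends an initial count of 1 to every (centroid id, point) pair
private static final class CountAppender
        implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, Tuple3<Integer, GeoTimeDataTupel, Long>> {
    public Tuple3<Integer, GeoTimeDataTupel, Long> map(Tuple2<Integer, GeoTimeDataTupel> t) {
        return new Tuple3<Integer, GeoTimeDataTupel, Long>(t.f0, t.f1, 1L);
    }
}

// sums coordinates, times and counts; no division happens here
private static final class CentroidAccumulator
        implements ReduceFunction<Tuple3<Integer, GeoTimeDataTupel, Long>> {
    public Tuple3<Integer, GeoTimeDataTupel, Long> reduce(
            Tuple3<Integer, GeoTimeDataTupel, Long> v1, Tuple3<Integer, GeoTimeDataTupel, Long> v2) {
        LatLongSeriable geoSum = new LatLongSeriable(
                v1.f1.getGeo().getLat() + v2.f1.getGeo().getLat(),
                v1.f1.getGeo().getLong() + v2.f1.getGeo().getLong());
        long timeSum = v1.f1.getTime() + v2.f1.getTime();
        return new Tuple3<Integer, GeoTimeDataTupel, Long>(
                v1.f0, new GeoTimeDataTupel(geoSum, timeSum, "POINT"), v1.f2 + v2.f2);
    }
}

// divides the accumulated sums by the count exactly once
private static final class CentroidAverager
        implements MapFunction<Tuple3<Integer, GeoTimeDataTupel, Long>, GeoTimeDataCenter> {
    public GeoTimeDataCenter map(Tuple3<Integer, GeoTimeDataTupel, Long> v) {
        long count = v.f2;
        LatLongSeriable geo = new LatLongSeriable(
                v.f1.getGeo().getLat() / count, v.f1.getGeo().getLong() / count);
        return new GeoTimeDataCenter(v.f0, geo, v.f1.getTime() / count);
    }
}

The pipeline would then insert the count before the grouping, i.e. .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids").map(new CountAppender()).groupBy(0).reduce(new CentroidAccumulator()).map(new CentroidAverager()). Does that look like the right approach?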

2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:

> hi,
> if I print the centroids, all of them show up in the output. I have implemented k-means with MapReduce and Spark; with the same input I get the same output. But in Flink I get a one-cluster output with this input set. (I use CSV files from the GDELT project.)
>
> here is my class:
>
> public class FlinkMain {
>
>     public static void main(String[] args) {
>         // load properties
>         Properties pro = new Properties();
>         try {
>             pro.load(new FileInputStream("./resources/config.properties"));
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         int maxIteration = 1;//Integer.parseInt(pro.getProperty("maxiterations"));
>         String outputPath = pro.getProperty("flink.output");
>         // set up execution environment
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // get input points
>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>         // set number of bulk iterations for KMeans algorithm
>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>         DataSet<GeoTimeDataCenter> newCentroids = points
>             // compute closest centroid for each point
>             .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>             // count and sum point coordinates for each centroid
>             .groupBy(0).reduce(new CentroidAccumulator())
>             // compute new centroids from point counts and coordinate sums
>             .map(new CentroidAverager());
>         // feed new centroids back into next iteration
>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>             // assign points to final clusters
>             .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>         // emit result
>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>         // execute program
>         try {
>             env.execute("KMeans Flink");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     private static final class SelectNearestCenter extends RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>
>         private static final long serialVersionUID = -2729445046389350264L;
>         private Collection<GeoTimeDataCenter> centroids;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>         }
>
>         @Override
>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
>             double minDistance = Double.MAX_VALUE;
>             int closestCentroidId = -1;
>             // check all cluster centers
>             for(GeoTimeDataCenter centroid : centroids) {
>                 // compute distance
>                 double distance = Distance.ComputeDist(point, centroid);
>                 // update nearest cluster if necessary
>                 if(distance < minDistance) {
>                     minDistance = distance;
>                     closestCentroidId = centroid.getId();
>                 }
>             }
>             // emit a new record with the center id and the data point
>             return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>         }
>     }
>
>     // sums and counts point coordinates
>     private static final class CentroidAccumulator implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>
>         private static final long serialVersionUID = -4868797820391121771L;
>
>         public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1, val2.f1));
>         }
>     }
>
>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>         long time = (input1.getTime()+input2.getTime())/2;
>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>         list.add(input1.getGeo());
>         list.add(input2.getGeo());
>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>
>         return new GeoTimeDataTupel(geo, time, "POINT");
>     }
>
>     // computes new centroid from coordinate sum and count of points
>     private static final class CentroidAverager implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>
>         private static final long serialVersionUID = -2687234478847261803L;
>
>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>             return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
>         }
>     }
>
>     private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>         // load properties
>         Properties pro = new Properties();
>         try {
>             pro.load(new FileInputStream("./resources/config.properties"));
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         String inputFile = pro.getProperty("input");
>         // map csv file
>         return env.readCsvFile(inputFile)
>             .ignoreInvalidLines()
>             .fieldDelimiter('\u0009')
>             //.fieldDelimiter("\t")
>             //.lineDelimiter("\n")
>             .includeFields(true, true, false, false, false, false, false, false, false, false, false,
>                 false, false, false, false, false, false, false, false, false, false,
>                 false, false, false, false, false, false, false, false, false, false,
>                 false, false, false, false, false, false, false, false, true, true,
>                 false, false, false, false, false, false, false, false, false, false,
>                 false, false, false, false, false, false, false, false)
>             //.includeFields(true,true,true,true)
>             .types(String.class, Long.class, Double.class, Double.class)
>             .map(new TuplePointConverter());
>     }
>
>     private static final class TuplePointConverter implements MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>
>         private static final long serialVersionUID = 3485560278562719538L;
>
>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>         }
>     }
>
>     private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>         // load properties
>         Properties pro = new Properties();
>         try {
>             pro.load(new FileInputStream("./resources/config.properties"));
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         String seedFile = pro.getProperty("seed.file");
>         boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>         // get points from file or random
>         if(seedFlag || !(new File(seedFile+"-1").exists())) {
>             Seeding.randomSeeding();
>         }
>         // map csv file
>         return env.readCsvFile(seedFile+"-1")
>             .lineDelimiter("\n")
>             .fieldDelimiter('\u0009')
>             //.fieldDelimiter("\t")
>             .includeFields(true, true, true, true)
>             .types(Integer.class, Double.class, Double.class, Long.class)
>             .map(new TupleCentroidConverter());
>     }
>
>     private static final class TupleCentroidConverter implements MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>
>         private static final long serialVersionUID = -1046538744363026794L;
>
>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
>             return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>         }
>     }
> }
>
> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>
>> Concerning your first problem that you only see one resulting centroid: your code looks good modulo the parts you haven't posted.
>>
>> However, your problem could simply be caused by a bad selection of the initial centroids. If, for example, all centroids except for one don't get any points assigned, then only one centroid will survive the iteration step. How do you select them?
>>
>> To check that all centroids are read, you can print the contents of the centroids DataSet. Furthermore, you can simply println the new centroids after each iteration step. In local mode you can then observe the computation.
>>
>> Cheers,
>> Till
>>
>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> This problem should not depend on any user code. There are no user-code dependent actors in Flink.
>>>
>>> Is there more stack trace that you can send us? It looks like the core exception that is causing the issue is not part of the stack trace.
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1...@googlemail.com> wrote:
>>>
>>>> hi flink community,
>>>>
>>>> I have implemented k-means for clustering temporal geo data. I use the following GitHub project and my own data structure:
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>
>>>> Now I have the problem that Flink reads the centroids from the file and keeps working in parallel. If I look at the results, I have the feeling that the program loads only one centroid point.
>>>>
>>>> I work with Flink 0.8.1. If I update to 0.9 milestone 1, I get the following exception:
>>>>
>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>> akka.actor.ActorInitializationException: exception during creation
>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>     ... 9 more
>>>>
>>>> How can I tell Flink that it should wait for the dataset to be loaded, and what does this exception mean?
>>>>
>>>> best regards,
>>>> paul
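
PS: Following Till's suggestion above, I also want to print the new centroids after every iteration step. My plan is a small identity map chained after the CentroidAverager inside the iteration; a minimal, untested sketch (it assumes GeoTimeDataCenter has a readable toString()):

// identity map that only prints each new centroid, so the per-iteration
// development can be observed when the job runs in local mode
private static final class CentroidLogger implements MapFunction<GeoTimeDataCenter, GeoTimeDataCenter> {
    public GeoTimeDataCenter map(GeoTimeDataCenter centroid) {
        System.out.println(centroid);
        return centroid;
    }
}

It would be used as .map(new CentroidAverager()).map(new CentroidLogger()) when building newCentroids.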