Hello all,

My task is to cluster a stream of points around centroids. I am using DataStreamUtils to collect the centroid stream into a collection and pass it to the map function, which performs the assignment. Below is the code:
DataStream<Point> points = newDataStream.map(new getPoints());
DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
Collection<Centroid> collectionCentroids = Lists.newArrayList(iter);

DataStream<Centroid> newCentroids = points
        .map(new SelectNearestCenter(collectionCentroids))
        .map(new CountAppender())
        .keyBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());

Iterator<Centroid> iter1 = DataStreamUtils.collect(newCentroids);
Collection<Centroid> finalCentroidsCollection = Lists.newArrayList(iter1);

DataStream<Tuple2<String, Point>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter(finalCentroidsCollection));
clusteredPoints.print();


public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {

    private Collection<Centroid> centroids;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    public SelectNearestCenter(Collection<Centroid> centroids) {
        this.centroids = centroids;
    }

    @Override
    public Tuple2<String, Point> map(Point p) throws Exception {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        // ..................
        return new Tuple2<String, Point>(closestCentroidId, p);
    }
}

Cases:

1. After waiting around 10 minutes, clusteredPoints got printed, but with the centroid id '-1' for every point:

1> (-1, 121.86 121.87 121.8149 121.8149 60600.0)
4> (-1, 121.52 121.52 121.45 121.485 28800.0)
.........

After a certain time the execution ends, due to multiple executions, since there is already one inside DataStreamUtils:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

How can I get rid of this exception?

2. After waiting around 10 minutes, clusteredPoints got printed with the desired centroid ids, as shown below.
clusteredPoints keeps getting printed to the console in a streaming manner, no exception is thrown at all, and the streaming continues:

1> (Wed Jul 20 16:45:01 CEST 2016, 121.555 121.56 121.53 121.5385 69300.0)
1> (Wed Jul 20 18:19:00 CEST 2016, 121.8699 121.89 121.86 121.86 25700.0)
3> (Wed Jul 20 16:41:59 CEST 2016, 121.415 121.47 121.41 121.4658 38400.0)
1> (Wed Jul 20 18:13:59 CEST 2016, 121.86 121.87 121.8149 121.8149 60600.0)
4> (Wed Jul 20 16:43:59 CEST 2016, 121.52 121.52 121.45 121.485 28800.0)
3> (Wed Jul 20 18:16:59 CEST 2016, 121.8716 121.92 121.85 121.9141 64500.0)
4> (Wed Jul 20 18:15:00 CEST 2016, 121.92 121.92 121.88 121.88 53500.0)
4> (Wed Jul 20 18:12:04 CEST 2016, 121.82 121.82 121.74 121.74 43600.0)
..............

3. clusteredPoints is printed with the desired centroid ids in a streaming manner, but after a certain duration the same exception as in case 1 is thrown and the program ends abruptly.

Why is there so much variation in the results when executing the same code? In case 1, all clusteredPoints carry the same centroid id '-1' instead of a timestamp as in case 2, so I cannot perform any further operations on them. What could be the solution to this issue?

Best Regards,
Subash Basnet
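On the '-1' ids in case 1: the visible part of SelectNearestCenter.map() starts from minDistance = Double.MAX_VALUE and closestCentroidId = "-1". Assuming the elided body is an ordinary minimum-distance loop over the centroids field (that part of the code is not shown, so this is an assumption), the sentinel survives for every point whenever the collection handed to the constructor is empty. The plain-Java sketch below, with no Flink dependency, illustrates that behaviour; Point, Centroid, euclideanDistance and nearestCentroidId are hypothetical stand-ins, not the classes from the post.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class NearestCenterSketch {

    // Hypothetical stand-in for the post's Point type: a plain coordinate vector.
    static class Point {
        final double[] coords;

        Point(double... coords) {
            this.coords = coords;
        }

        // Euclidean distance between two points of equal dimension.
        double euclideanDistance(Point other) {
            double sum = 0.0;
            for (int i = 0; i < coords.length; i++) {
                double d = coords[i] - other.coords[i];
                sum += d * d;
            }
            return Math.sqrt(sum);
        }
    }

    // Hypothetical stand-in for the post's Centroid type: a String id plus a position.
    static final class Centroid extends Point {
        final String id;

        Centroid(String id, double... coords) {
            super(coords);
            this.id = id;
        }
    }

    // Returns the id of the closest centroid; if the collection is empty the loop
    // body never runs, so the "-1" sentinel is returned unchanged.
    static String nearestCentroidId(Point p, Collection<Centroid> centroids) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        for (Centroid c : centroids) {
            double distance = p.euclideanDistance(c);
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = c.id;
            }
        }
        return closestCentroidId;
    }

    public static void main(String[] args) {
        Point p = new Point(121.52, 121.52);

        List<Centroid> centroids = new ArrayList<>();
        centroids.add(new Centroid("a", 121.50, 121.50));
        centroids.add(new Centroid("b", 130.0, 130.0));

        System.out.println(nearestCentroidId(p, centroids));                 // prints a
        System.out.println(nearestCentroidId(p, new ArrayList<Centroid>())); // prints -1
    }
}
```

With a populated collection the point gets the id of its nearest centroid; with an empty one every point comes back as '-1', which matches the shape of the case 1 output.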