Hi,
you have to make sure to filter the data that you send back on the feedback edge, i.e. the loop.closeWith(newCentroids.broadcast()); statement needs to take a stream that only contains the centroids you want to send back. You also need to make sure to emit the centroids with a meaningful timestamp if you want to preserve timestamps.
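A minimal sketch of the filtering step, written in plain Java so that it runs without a Flink dependency. The Centroid class below is a hypothetical stand-in for the one in the question (only its id field matters here), and the sketch assumes that the centroids to drop are exactly those carrying the placeholder id "-1" that flatMap1 assigns before any centroid has arrived. In the actual job the same predicate would go into DataStream#filter before closing the loop, e.g. loop.closeWith(newCentroids.filter(c -> !"-1".equals(c.id)).broadcast()); this has not been tested against the original job.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch (no Flink): decides which centroids may go back on the
// feedback edge of the iteration.
class FeedbackFilterSketch {

    // Hypothetical stand-in for the Centroid class from the question;
    // only the id field is used by the filter.
    static final class Centroid {
        final String id;
        final double[] values;

        Centroid(String id, double... values) {
            this.id = id;
            this.values = values;
        }
    }

    // Placeholder id that SelectNearestCenter.flatMap1 assigns while it
    // has not yet received any centroids (taken from the question's code).
    static final String UNASSIGNED_ID = "-1";

    // True if the centroid is a real centroid that should be fed back.
    static boolean isRealCentroid(Centroid c) {
        return !UNASSIGNED_ID.equals(c.id);
    }

    // List-based analogue of newCentroids.filter(...) before loop.closeWith(...).
    static List<Centroid> feedbackCentroids(List<Centroid> newCentroids) {
        return newCentroids.stream()
                .filter(FeedbackFilterSketch::isRealCentroid)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Centroid> emitted = List.of(
                new Centroid("-1", 117.8818, 117.9, 117.8, 117.835, 1383700.0),
                new Centroid("Fri Jul 15 15:31:58 CEST 2016",
                        117.835, 117.99, 117.82, 117.885, 118900.0));
        // Only the centroid with a real (timestamp) id survives the filter.
        for (Centroid c : feedbackCentroids(emitted)) {
            System.out.println(c.id);
        }
    }
}
```

Note that with this filter alone nothing reaches the feedback edge as long as flatMap1 has no centroids, because every point is then tagged "-1"; that is why some initial centroids have to enter the loop from outside as well.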
What you can also do is union the stream of initial centroids with the new centroids on the feedback edge, i.e.:

loop.closeWith(newCentroids.union(initialCentroids).broadcast());

Cheers,
Aljoscha

On Mon, 18 Jul 2016 at 12:59 subash basnet <yasub...@gmail.com> wrote:

> Hello all,
>
> I am trying to cluster datastream points around a centroid. My input is
> stock data, and I have taken the timestamp of the stock as the centroid id.
> The error I am facing is in getting the *id* of the *centroid* within
> *flatMap2*. Below is my code, if you could take a look:
>
> ConnectedIterativeStreams<Point, Centroid> loop =
>     points.iterate().withFeedbackType(Centroid.class);
> DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter(10))
>     .map(new CountAppender()).keyBy(0)
>     .reduce(new CentroidAccumulator()).map(new CentroidAverager());
> DataStream<Centroid> finalCentroids =
>     loop.closeWith(newCentroids.broadcast());
>
> public static final class SelectNearestCenter implements
>         CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
>     private Centroid[] centroids;
>     private int size = 0;
>     private int count = 0;
>     private boolean flag = true;
>
>     public SelectNearestCenter(int size) {
>         this.size = size;
>     }
>
>     @Override
>     public void flatMap1(Point p, Collector<Tuple2<String, Point>> out)
>             throws Exception {
>         double minDistance = Double.MAX_VALUE;
>         String closestCentroidId = "-1";
>         if (centroids != null) {
>             // let's assume minimum size 20 for now
>             for (Centroid centroid : centroids) {
>                 // compute distance
>                 double distance = p.euclideanDistance(centroid);
>                 // update nearest cluster if necessary
>                 if (distance < minDistance) {
>                     minDistance = distance;
>                     closestCentroidId = centroid.id;
>                 }
>             }
>         }
>         // emit a new record with the center id and the data point.
>         out.collect(new Tuple2<String, Point>(closestCentroidId, p));
>     }
>
>     @Override
>     public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out)
>             throws Exception {
>         if (flag) {
>             centroids = new Centroid[size];
>             flag = false;
>         }
>         if (count < size) {
>             System.out.println(value);
>             centroids[count] = value;
>             count++;
>         }
>     }
> }
>
>
> The centroid datastreams look as below, with the string timestamp as id:
> Fri Jul 15 15:30:55 CEST 2016 117.8818 117.9 117.8 117.835 1383700.0
> Fri Jul 15 15:31:58 CEST 2016 117.835 117.99 117.82 117.885 118900.0
>
> But if I print the *centroid value* in *flatMap2*, it shows the id as '-1':
> -1 117.8818 117.9 117.8 117.835 1383700.0
> -1 117.5309 117.575 117.48245 117.52 707100.0
>
> This '-1' comes from *flatMap1*, where it gets assigned initially. To get rid
> of it, I tried putting the out.collect statement inside the "centroids is not
> null" condition, but then execution never enters that condition, because
> initially centroids is null, hence the execution never comes out of
> *flatMap1*. It would be great if you could suggest what the probable problem
> or solution could be.
>
>
> Best Regards,
> Subash Basnet