Hi,
you have to ensure to filter the data that you send back on the feedback
edge, i.e. the loop.closeWith(newCentroids.broadcast()); statement needs to
take a stream that only has the centroids that you want to send back. And
you need to make sure to emit centroids with a good timestamp if you want
to preserve timestamps.

What you can also do is to union the stream of initial centroids with the
new centroids on the feedback edge, i.e:
loop.closeWith(newCentroids.union(initialCentroids).broadcast())

Cheers,
Aljoscha


On Mon, 18 Jul 2016 at 12:59 subash basnet <yasub...@gmail.com> wrote:

> Hello all,
>
> I am trying to cluster datastream points around a centroid. My input is
> stock data where the centroid id I have taken as the timestamp of the
> stock. The error I am facing is in getting *id *of the *centroid* within
> *flatMap2*. Below is my code if you could look:
>
> ConnectedIterativeStreams<Point, Centroid> loop =
> points.iterate().withFeedbackType(Centroid.class);
> DataStream<Centroid> newCentroids = loop.flatMap(new
> SelectNearestCenter(10)).map(new CountAppender()).keyBy(0)
> .reduce(new CentroidAccumulator()).map(new CentroidAverager());
> DataStream<Centroid> finalCentroids =
> loop.closeWith(newCentroids.broadcast());
>
> public static final class SelectNearestCenter implements
> CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
> private Centroid[] centroids;
> private int size = 0;
> private int count = 0;
> private boolean flag = true;
>
> public SelectNearestCenter(int size) {
> this.size = size;
> }
>
> @Override
> public void flatMap1(Point p, Collector<Tuple2<String, Point>> out) throws
> Exception {
> double minDistance = Double.MAX_VALUE;
> *String closestCentroidId = "-1";*
> if (centroids != null) {
> // let's assume minimum size 20 for now
> for (Centroid centroid : centroids) {
> // compute distance
> double distance = p.euclideanDistance(centroid);
> // update nearest cluster if necessary
> if (distance < minDistance) {
> minDistance = distance;
> closestCentroidId = centroid.id;
> }
> }
> }
> // emit a new record with the center id and the data point.
> out.collect(new Tuple2<String, Point>(closestCentroidId, p));
> }
>
> @Override
> public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out)
> throws Exception {
> if (flag) {
> centroids = new Centroid[size];
> flag = false;
> }
> if (count < size) {
> *System.out.println(value);*
> centroids[count] = value;
> count++;
> }
> }
> }
>
>
> The centroid datastreams looks as below with string timestamp as id.
> Fri Jul 15 15:30:55 CEST 2016  117.8818 117.9 117.8 117.835 1383700.0
> Fri Jul 15 15:31:58 CEST 2016  117.835 117.99 117.82 117.885 118900.0
>
> But now if I print the *centroid value *in *flatMap2* it shows with the
> id as '-1':
> -1  117.8818 117.9 117.8 117.835 1383700.0
> -1  117.5309 117.575 117.48245 117.52 707100.0
>
> This '-1' is from *flatMap1 *which get's assigned initially. To get rid
> of this if I put the out.collect statement within the if centroids is not
> null condition, it never goes inside the if condition as intially the
> centroids is null, hence the execution never comes out of *flatMap1*.
> It would be great if you could suggest what could be the probable problem
> or solution to the case.
>
>
> Best Regards,
> Subash Basnet
>

Reply via email to