Hi, It's stating that you can't use a DataStream which was not part of the iteration. It works with `newCentroids` because it is part of the loop.
The only way to get the centroids DataStream in, is to union/join it with the `newCentroids` stream. Cheers, Max On Wed, Jul 20, 2016 at 11:33 AM, subash basnet <yasub...@gmail.com> wrote: > Hello all, > > When I execute the below streaming code: > DataStream<Centroid> centroids = newCentroidDataStream.map(new > TupleCentroidConverter()); > ConnectedIterativeStreams<Point, Centroid> loop = > points.iterate().withFeedbackType(Centroid.class); > DataStream<Centroid> newCentroids = loop.flatMap(new > SelectNearestCenter(10)).map(new CountAppender()).keyBy(0) > .reduce(new CentroidAccumulator()).map(new CentroidAverager()); > DataStream<Centroid> finalCentroids = loop.closeWith(centroids.broadcast()); > > the following exception arises: > Exception in thread "main" java.lang.UnsupportedOperationException: Cannot > close an iteration with a feedback DataStream that does not originate from > said iteration. > at > org.apache.flink.streaming.api.datastream.IterativeStream$ConnectedIterativeStreams.closeWith(IterativeStream.java:181) > > If I use loop.closeWith(newCentroids.broadcast()) it works fine. I am not > able to fully understand the error message. Could you explain it more in > depth the error message in relation to above code. > > Best Regards, > Subash Basnet