Hello all,

When I execute the below streaming code:
DataStream<Centroid> *centroids* = newCentroidDataStream.map(new
TupleCentroidConverter());
ConnectedIterativeStreams<Point, Centroid> loop =
 *points*.iterate().withFeedbackType(Centroid.class);
DataStream<Centroid> *newCentroids* = loop.flatMap(new
SelectNearestCenter(10)).map(new CountAppender()).keyBy(0)
.reduce(new CentroidAccumulator()).map(new CentroidAverager());
DataStream<Centroid> finalCentroids = loop.closeWith(*centroids*
.broadcast());

the following exception arises:
Exception in thread "main" java.lang.UnsupportedOperationException:* Cannot
close an iteration with a feedback DataStream that does not originate from
said iteration.*
at
org.apache.flink.streaming.api.datastream.IterativeStream$ConnectedIterativeStreams.closeWith(IterativeStream.java:181)

If I use loop.closeWith(newCentroids.broadcast()) it works fine. I am not
able to fully understand the error message. Could you explain it more in
depth the error message in relation to above code.

Best Regards,
Subash Basnet

Reply via email to