Hello all,

loop.closeWith(...) in the code below works fine for DataSet, but the same code adapted for DataStream throws an exception.

For *DataSet*:

    IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
    DataSet<Centroid> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        .map(new CountAppender())
        .groupBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
    // feed new centroids back into next iteration
    DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

This works fine. Now, if I do the same operation with *DataStream*, as below:

    IterativeStream<Centroid> loop = centroids.iterate(numIterations);
    DataStream<Centroid> newCentroids = points
        .map(new SelectNearestCenter())
        .map(new CountAppender())
        .keyBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
    DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);

I get the following exception, as already mentioned in earlier emails:

    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
        at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
        at wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98)

Could you please suggest where my understanding is wrong here? I couldn't infer much from the documented explanations of *closeWith(..)* for DataSet and DataStream, which read:

For DataStream:

    Closes the iteration. This method defines the end of the iterative program part that will be fed back to the start of the iteration. A common usage pattern for streaming iterations is to use output splitting to send a part of the closing data stream to the head.

For DataSet:

    Closes the iteration. This method defines the end of the iterative program part.
    Parameters: iterationResult - The data set that will be fed back to the next iteration.
    Returns: The DataSet that represents the result of the iteration, after the computation has terminated.

Best Regards,
Subash Basnet