Hello all,

loop.closewith(...) in below code works fine for the dataset, but the
adaptation of the same code for datastream throws exception.

For *DataSet: *
IterativeDataSet<Centroid> *loop* = centroids.iterate(numIterations);
DataSet<Centroid> *newCentroids* = points.map(new SelectNearestCenter()).
*withBroadcastSet*(*loop*, "*centroids*")
.map(new CountAppender()).groupBy(0).reduce(new CentroidAccumulator())
.map(new CentroidAverager());
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = *loop*.closeWith(*newCentroids*);


It's working fine, now if I do the same operation in *DataStream *as below
*: *
IterativeStream<Centroid>*loop* = centroids.iterate(numIterations);
DataStream<Centroid> *newCentroids* = points.map(new
SelectNearestCenter()).map(new CountAppender()).keyBy(0)
.reduce(new CentroidAccumulator()).map(new CentroidAverager());
DataStream<Centroid> finalCentroids = *loop*.closeWith(*newCentroids*);

I get the following exception as already mentioned in earlier emails:
Exception in thread "main" java.lang.UnsupportedOperationException: *Cannot
close an iteration with a feedback DataStream that does not originate from
said iteration.*
at
org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
at wikiedits.StockAnalysisKMeansOutlierDetection.main(
StockAnalysisKMeansOutlierDetection.java:98)

Could you please suggest me where my understanding is wrong here?

I couldn't infer much from the given explanation about *closewith(..)* with
in DataSet and DataStream as:
Closes the iteration. This method defines the end of the iterative program
part that will be fed back to the start of the iteration.A common usage
pattern for streaming iterations is to use output splitting to send a part
of the closing data stream to the
head. ----------------------------------------> For DataSet

Closes the iteration. This method defines the end of the iterative program
part.
Parameters:
iterationResult The data set that will be fed back to the next iteration.
Returns:
The DataSet that represents the result of the iteration, after the
computation has terminated.------------------------------------>For
DataStream

Best Regards,
Subash Basnet

Reply via email to