I am also a newbie, but from what I experienced during my experiments, the
same implementation doesn't work in the streaming context because:
1) In the streaming context the stream is assumed to be infinite, so the
iteration itself is unbounded, and the part with which you close your
iteration is sent back to the iteration head.
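
To illustrate the pattern, here is a rough sketch along the lines of the
basic iteration example in the Flink streaming documentation (the numbers
have nothing to do with your k-means job; it only shows how one part of the
stream is split off and fed back to the head while the rest leaves the loop):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

// the loop body
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        return value - 1;
    }
});

// the part that is fed back to the iteration head ...
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) {
        return value > 0;
    }
});
iteration.closeWith(stillGreaterThanZero);

// ... and the part that leaves the loop as the iteration's output
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) {
        return value <= 0;
    }
});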

Thus, your connected stream needs to look something like this:

IterativeStream.ConnectedIterativeStreams<Centroid, Centroid> loop =
    centroids.iterate(numIterations).withFeedbackType(Centroid.class);

as the feedback is distributed to the partitions again.
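
For what it is worth, here is a minimal sketch (untested, and the
CoFlatMapFunction bodies are only placeholders) of how such a connected
iteration is usually wired up with your Centroid type; the important part
is that the stream you hand to closeWith() is derived from loop. Note also
that in the streaming API the long argument of iterate() is a maximum wait
time in milliseconds, not an iteration count:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// 5000L = maximum time (ms) to wait for feedback before the iteration head may shut down
IterativeStream.ConnectedIterativeStreams<Centroid, Centroid> loop =
        centroids.iterate(5000L).withFeedbackType(Centroid.class);

// flatMap1 sees the original centroid stream, flatMap2 sees the fed-back centroids
DataStream<Centroid> newCentroids = loop.flatMap(
        new CoFlatMapFunction<Centroid, Centroid, Centroid>() {
            @Override
            public void flatMap1(Centroid initial, Collector<Centroid> out) {
                out.collect(initial);   // placeholder: forward the initial centroids
            }
            @Override
            public void flatMap2(Centroid feedback, Collector<Centroid> out) {
                out.collect(feedback);  // placeholder: the updated centroids arrive here
            }
        });

// accepted, because newCentroids originates from loop
loop.closeWith(newCentroids);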

Also, the newCentroids in "loop.closeWith(newCentroids);" cannot be replaced
by an arbitrary other DataStream, because it has to be the feedback stream
that originates from the said iteration.
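
To make that constraint concrete, here is a tiny sketch reusing the plain
IterativeStream<Centroid> loop and the newCentroids variable from your
quoted streaming code below; only the second variant is accepted, because
that stream starts at the iteration itself:

// rejected: newCentroids is built starting from points, so it does not
// originate from the iteration
// loop.closeWith(newCentroids);   // -> UnsupportedOperationException

// accepted: the fed-back stream is derived from loop itself
// (the identity filter is only a placeholder for the real per-iteration update)
DataStream<Centroid> feedback = loop.filter(c -> true);
loop.closeWith(feedback);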

Whether this solves your problem or not is something I can't say, as I have
been struggling with the concept of centroids in an iteration with broadcast
for some time myself, without much help from the community.


subash basnet wrote
> Hello all,
> 
> loop.closeWith(...) in the code below works fine for the DataSet, but the
> adaptation of the same code for the DataStream throws an exception.
> 
> For DataSet:
> 
> IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
> DataSet<Centroid> newCentroids = points.map(new SelectNearestCenter())
>         .withBroadcastSet(loop, "centroids")
>         .map(new CountAppender()).groupBy(0).reduce(new CentroidAccumulator())
>         .map(new CentroidAverager());
> // feed new centroids back into next iteration
> DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
> 
> 
> It's working fine. Now if I do the same operation in DataStream as below:
> IterativeStream<Centroid> loop = centroids.iterate(numIterations);
> DataStream<Centroid> newCentroids = points.map(new SelectNearestCenter())
>         .map(new CountAppender()).keyBy(0)
>         .reduce(new CentroidAccumulator()).map(new CentroidAverager());
> DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);
> 
> I get the following exception as already mentioned in earlier emails:
> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
> close an iteration with a feedback DataStream that does not originate from
> said iteration.
>     at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
>     at wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98)
> 
> Could you please suggest where my understanding is wrong here?
> 
> I couldn't infer much from the given explanation of closeWith(..) in
> DataSet and DataStream:
> 
> For DataStream:
> "Closes the iteration. This method defines the end of the iterative program
> part that will be fed back to the start of the iteration. A common usage
> pattern for streaming iterations is to use output splitting to send a part
> of the closing data stream to the head."
> 
> For DataSet:
> "Closes the iteration. This method defines the end of the iterative program
> part.
> Parameters:
> iterationResult - The data set that will be fed back to the next iteration.
> Returns:
> The DataSet that represents the result of the iteration, after the
> computation has terminated."
> 
> Best Regards,
> Subash Basnet




