Hello Aljoscha,

For DataSet:

    IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
    DataSet<Centroid> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        .map(new CountAppender())
        .groupBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
    // feed new centroids back into next iteration
    DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

This works fine. If I do the same operation in DataStream, as below:

    IterativeDataStream<Centroid> loop = centroids.iterate(numIterations);
    DataStream<Centroid> newCentroids = points
        .map(new SelectNearestCenter())
        .map(new CountAppender())
        .keyBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
    DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

I get the following exception, as already mentioned earlier in this thread:

    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
        at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
        at wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98)

Could you please tell me where I am going wrong?

Best Regards,
Subash Basnet

On Mon, May 16, 2016 at 6:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> There you have your explanation. A loop actually has to be a loop for it
> to work in Flink.
>
> On Sat, 14 May 2016 at 16:35 subash basnet <yasub...@gmail.com> wrote:
>
>> Hello,
>>
>> I had to declare loop as a global variable,
>>     private static IterativeStream<Centroid> loop;
>> because in the DataStream API I cannot broadcast it the way the
>> DataSet API allows.
>>
>> I tried to use closeWith in DataStream, as in DataSet, but it throws
>> an exception:
>>     DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>
>> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
>> close an iteration with a feedback DataStream that does not originate from
>> said iteration.
>>     at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
>>     at wikiedits.StockAnalysis.main(StockAnalysis.java:64)
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Sat, May 14, 2016 at 4:26 PM, subash basnet <yasub...@gmail.com>
>> wrote:
>>
>>> Hello Aljoscha,
>>>
>>> Below is the shortened version of the StockAnalysis class, which is a
>>> DataStream adaptation of the KMeans.java DataSet code.
>>>
>>> public class StockAnalysis{
>>>   public static void main(String args[]){
>>>     DataStream<Centroid> centroids = newCentroidDataStream.map(new
>>>         TupleCentroidConverter());
>>>     loop = centroids.iterate(10);
>>>     DataStream<Centroid> newCentroids = points.map(new
>>>         SelectNearestCenter()).map(new CountAppender()).keyBy(0)
>>>         .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>>>   public static final class SelectNearestCenter extends
>>>       RichMapFunction<Point, Tuple2<String, Point>> {
>>>     private Collection<Centroid> centroids;
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>       Iterator<Centroid> iter = DataStreamUtils.collect(loop);
>>>       this.centroids = Lists.newArrayList(iter);
>>>     }
>>>     @Override
>>>     public Tuple2<String, Point> map(Point p) throws Exception {
>>>       for (Centroid centroid : centroids) {
>>>       }...................
>>>     }
>>>   }
>>> }
>>>
>>> }
>>>
>>>
>>> On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Could you please post your code?
>>>>
>>>> On Sat, 7 May 2016 at 19:16 subash basnet <yasub...@gmail.com> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I am getting the error below when executing the
>>>>> StreamExecutionEnvironment.
>>>>>
>>>>> Caused by: java.lang.IllegalStateException: Iteration
>>>>> FeedbackTransformation{id=15, name='Feedback',
>>>>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>>>>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any
>>>>> feedback edges.
>>>>>
>>>>> The run method inside the thread class of DataStreamUtils handles this
>>>>> exception:
>>>>>     @Override
>>>>>     public void run(){
>>>>>       try {
>>>>>         stream.getExecutionEnvironment().execute();
>>>>>       } catch (Exception e) {
>>>>>         throw new RuntimeException("Exception in execute()", e);
>>>>>       }
>>>>>     }
>>>>>
>>>>> I cannot work out what this error message is telling me, so I do not
>>>>> know how to fix it.
>>>>>
>>>>> Best Regards,
>>>>> Subash Basnet
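
To illustrate the remark that "a loop actually has to be a loop", here is a small plain-Java model of the rule that the exception message describes. It is only a sketch and does not use Flink: the `Node` class and its `then` and `closeWith` methods are hypothetical stand-ins written for this example, and the origin check is an assumption about what Flink does, inferred from the error text. The model shows why a `newCentroids` built only from `points` is rejected, and why a pipeline that also consumes `loop` is accepted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IterationOriginSketch {

    /** Toy stand-in for one operator in a dataflow graph (hypothetical, not a Flink class). */
    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();
        final List<Node> feedbackEdges = new ArrayList<>();

        Node(String name, Node... inputs) {
            this.name = name;
            for (Node in : inputs) {
                this.inputs.add(in);
            }
        }

        /** Derive a downstream node from this one, like chaining map() or reduce(). */
        Node then(String opName) {
            return new Node(opName, this);
        }

        /** Collect this node and every node reachable by walking input edges upstream. */
        Set<Node> transitivePredecessors() {
            Set<Node> seen = new HashSet<>();
            collect(this, seen);
            return seen;
        }

        private static void collect(Node n, Set<Node> seen) {
            if (seen.add(n)) {
                for (Node in : n.inputs) {
                    collect(in, seen);
                }
            }
        }

        /** Close the loop: accepted only if the feedback node descends from this head. */
        Node closeWith(Node feedback) {
            if (!feedback.transitivePredecessors().contains(this)) {
                throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream "
                        + "that does not originate from said iteration.");
            }
            feedbackEdges.add(feedback);
            return feedback;
        }
    }

    public static void main(String[] args) {
        Node points = new Node("points");
        Node centroids = new Node("centroids");
        Node loop = centroids.then("iterate"); // iteration head

        // Shape from the question: built from points only, never reads from loop.
        Node detached = points.then("selectNearest").then("reduce").then("average");
        try {
            loop.closeWith(detached);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // In the model, the fix is a step that consumes both points and loop.
        Node attached = new Node("selectNearest", points, loop).then("reduce").then("average");
        loop.closeWith(attached);
        System.out.println("accepted, feedback edges = " + loop.feedbackEdges.size());
    }
}
```

In the DataSet version, `withBroadcastSet(loop, ...)` is what ties `newCentroids` to the iteration. The DataStream version has no such link, so the feedback stream never passes through `loop`. One way to create that link in the DataStream API might be to connect the two streams, for example `points.connect(loop)` followed by a co-map, but the thread does not confirm that approach. A head whose `feedbackEdges` list stays empty in this model seems to correspond to the earlier "does not have any feedback edges" error.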