Hello all,

How can I broadcast a variable in the DataStream API, or perform a similar operation, so that I can read its value the way I do in the DataSet API?

    IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
    DataSet<Centroid> newCentroids = points.map(new SelectNearestCenter())
        .withBroadcastSet(loop, "centroids")
    ...

Inside the map function:

    @Override
    public void open(...) {
        ...
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

Is defining 'loop' as a global variable the only way to use it in the map functions, or are there other possible methods? When I make loop a global variable and read it inside the map function via DataStreamUtils, as below:

    private static IterativeStream<Centroid> loop;
    ...
    loop = centroids.iterate(numIterations);
    ...

Inside the map function:

    @Override
    public void open(...) {
        Iterator<Centroid> iter = DataStreamUtils.collect(loop);
        this.centroids = Lists.newArrayList(iter);
    }

it throws the following exception upon execution:

    Exception in thread "Thread-13" java.lang.RuntimeException: Exception in execute()
        at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
    Caused by: java.lang.IllegalStateException: Iteration FeedbackTransformation{id=15, name='Feedback', outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String, pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback edges.
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformFeedback(StreamGraphGenerator.java:295)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSink(StreamGraphGenerator.java:441)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:158)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:127)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:119)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1197)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)
        at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:80)

Could you please suggest a possible cause of, and solution to, this exception? I cannot see any option other than a global variable, in the absence of variable broadcast in DataStream.

Best Regards,
Subash Basnet