Hello,

I had to declare the iteration as a static field,
private static IterativeStream<Centroid> loop;
so that it acts as a global variable, because the DataStream API does not let
me broadcast it the way the DataSet API does.

I tried to close the iteration with closeWith in the DataStream API, as is
done in the DataSet API, but it throws an exception:
DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);


Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
close an iteration with a feedback DataStream that does not originate from
said iteration.
    at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
    at wikiedits.StockAnalysis.main(StockAnalysis.java:64)
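
For reference, the condition this message describes can be modeled in a few lines of plain Java. This is only a toy sketch: Node, Iteration, and canClose are hypothetical names invented for illustration, not Flink classes, and the real check inside IterativeStream.closeWith may differ in detail. It assumes the rule is "the feedback stream must have the iteration head somewhere upstream", which is what the exception text says. In the quoted StockAnalysis code, newCentroids is built from points.map(...) and never consumes loop, so it would fail such a check.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a dataflow graph; every name here is hypothetical, not a Flink class. */
public class FeedbackCheckSketch {

    /** A node standing in for a stream, together with its upstream inputs. */
    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, Node... inputs) {
            this.name = name;
            for (Node in : inputs) {
                this.inputs.add(in);
            }
        }

        /** Collects every node reachable by walking upstream, including this one. */
        Set<Node> transitivePredecessors() {
            Set<Node> seen = new HashSet<>();
            collect(this, seen);
            return seen;
        }

        private static void collect(Node n, Set<Node> seen) {
            if (seen.add(n)) {
                for (Node in : n.inputs) {
                    collect(in, seen);
                }
            }
        }
    }

    /** Stands in for an iteration head; closeWith accepts only streams derived from it. */
    static class Iteration extends Node {
        final List<Node> feedbackEdges = new ArrayList<>();

        Iteration(Node input) {
            super("iteration", input);
        }

        Node closeWith(Node feedback) {
            if (!feedback.transitivePredecessors().contains(this)) {
                throw new UnsupportedOperationException(
                        "feedback does not originate from this iteration");
            }
            feedbackEdges.add(feedback);
            return feedback;
        }
    }

    /** Returns true if the iteration accepts the given node as feedback. */
    static boolean canClose(Iteration loop, Node feedback) {
        try {
            loop.closeWith(feedback);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Node centroids = new Node("centroids");
        Node points = new Node("points");
        Iteration loop = new Iteration(centroids);

        // Mirrors the quoted StockAnalysis code: built from points only.
        Node fromPointsOnly = new Node("newCentroids", points);
        // A body that also consumes the iteration head.
        Node fromLoop = new Node("newCentroids", points, loop);

        System.out.println(canClose(loop, fromPointsOnly)); // prints false
        System.out.println(canClose(loop, fromLoop));       // prints true
    }
}
```

Under this reading, the step function would need to take loop as one of its inputs, for example by connecting points with loop, rather than reading loop through a static field inside open(). Note also that in the DataStream API the argument of iterate(long) is a maximum wait time in milliseconds, not an iteration count as in the DataSet API.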


Best Regards,
Subash Basnet


On Sat, May 14, 2016 at 4:26 PM, subash basnet <yasub...@gmail.com> wrote:

> Hello Aljoscha,
>
> Below is a shortened version of the StockAnalysis class, which is a
> DataStream adaptation of the KMeans.java DataSet code.
>
> public class StockAnalysis {
>     public static void main(String[] args) {
>         DataStream<Centroid> centroids =
>                 newCentroidDataStream.map(new TupleCentroidConverter());
>         loop = centroids.iterate(10);
>         DataStream<Centroid> newCentroids = points
>                 .map(new SelectNearestCenter())
>                 .map(new CountAppender())
>                 .keyBy(0)
>                 .reduce(new CentroidAccumulator())
>                 .map(new CentroidAverager());
>     }
>
>     public static final class SelectNearestCenter
>             extends RichMapFunction<Point, Tuple2<String, Point>> {
>         private Collection<Centroid> centroids;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             Iterator<Centroid> iter = DataStreamUtils.collect(loop);
>             this.centroids = Lists.newArrayList(iter);
>         }
>
>         @Override
>         public Tuple2<String, Point> map(Point p) throws Exception {
>             for (Centroid centroid : centroids) {
>             }...................
>         }
>     }
> }
>
>
> On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>>
>> Could you please post your code?
>>
>> On Sat, 7 May 2016 at 19:16 subash basnet <yasub...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> I am getting the error below when calling execute() on the
>>> StreamExecutionEnvironment.
>>>
>>>
>>> Caused by: java.lang.IllegalStateException: Iteration
>>> FeedbackTransformation{id=15, name='Feedback',
>>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
>>> edges.
>>> The run method inside the thread class of DataStreamUtils handles this
>>> exception:
>>> @Override
>>> public void run() {
>>>     try {
>>>         stream.getExecutionEnvironment().execute();
>>>     } catch (Exception e) {
>>>         throw new RuntimeException("Exception in execute()", e);
>>>     }
>>> }
>>>
>>> I cannot work out what this error message means or how to resolve it.
>>>
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>
>>
>
