Hi,

Regarding 1): thanks a lot for pointing me to ParallelSourceFunction. I had completely missed that I was implementing a plain SourceFunction instead.
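On point 1): a RichParallelSourceFunction runs one copy per parallel subtask. In the Flink 1.x API, each copy can call getRuntimeContext().getIndexOfThisSubtask() and getNumberOfParallelSubtasks() to find its own position. If every copy opens the same file, each record will be emitted several times unless the copies divide the work. A simple way to do that is a round-robin split by line number. Below is a minimal sketch of that split, written without any Flink dependency. It assumes every subtask can read the whole file, and the names LineSharding and linesForSubtask are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: decides which lines of a shared input file a given
 * parallel source instance should emit. Each instance keeps the lines whose
 * index modulo the parallelism equals its own subtask index. If every index
 * in [0, parallelism) is used once, every line is emitted exactly once.
 */
public class LineSharding {

    public static List<String> linesForSubtask(List<String> lines, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in [0, parallelism)");
        }
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            // Round-robin assignment of line i to subtask (i % parallelism).
            if (i % parallelism == subtaskIndex) {
                mine.add(lines.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("p1", "p2", "p3", "p4", "p5");
        for (int s = 0; s < 2; s++) {
            // prints "subtask 0: [p1, p3, p5]" then "subtask 1: [p2, p4]"
            System.out.println("subtask " + s + ": " + linesForSubtask(lines, s, 2));
        }
    }
}
```

Inside the source's run(SourceContext ctx) method, a subtask could pass its own index and the parallelism from the runtime context to linesForSubtask, parse each returned line into the generator's object, and hand it to ctx.collect(...). Reading the full file in every subtask keeps the sketch simple, but it does redundant I/O for large inputs.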
Regarding 2): the example works and I can see what is happening there. When I increase the parallelism, I also understand the corresponding change in how the data is fed back to the iteration. My next question is whether there is a way to send back a group of data points at once, for example an array of some object. If so, what type would be passed to 'withFeedbackType'?

Again, thank you so much for such a detailed explanation and example.

Regards
Biplob Biswas

Aljoscha Krettek wrote:
> Hi,
> regarding 1) the source needs to implement the ParallelSourceFunction or
> RichParallelSourceFunction interface to allow it to have a higher
> parallelism than 1.
>
> regarding 2) I wrote a small example that showcases how to do it:
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> DataStream<String> mainInput = env.fromElements("Hello", "Zwei", "drei");
> DataStream<String> initialIterateInput = env.fromElements("bcast 1", "bcast 2", "bcast 3");
>
> IterativeStream.ConnectedIterativeStreams<String, String> iteration =
>         mainInput.iterate().withFeedbackType(BasicTypeInfo.STRING_TYPE_INFO);
>
> SingleOutputStreamOperator<String> iterateHead = iteration
>         .flatMap(new CoFlatMapFunction<String, String, String>() {
>             @Override
>             public void flatMap1(String value, Collector<String> out) throws Exception {
>                 Thread.sleep(1000);
>                 System.out.println("SEEING FROM INPUT 1: " + value);
>                 out.collect(value);
>             }
>
>             @Override
>             public void flatMap2(String value, Collector<String> out) throws Exception {
>                 Thread.sleep(1000);
>                 System.out.println("SEEING FROM INPUT 2: " + value);
>                 out.collect(value);
>             }
>         });
>
> iteration.closeWith(initialIterateInput.broadcast().union(iterateHead.broadcast()));
>
> iterateHead.map(new MapFunction<String, String>() {
>     @Override
>     public String map(String value) throws Exception {
>         System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
>         return value;
>     }
> });
>
> I inserted Thread.sleep(1000) so that you can observe what is happening.
> If you remove them, it iterates too fast.
>
> Cheers,
> Aljoscha
>
> On Thu, 5 May 2016 at 20:43 Biplob Biswas <revolutionisme@> wrote:
>> Hi,
>>
>> I have 2 different questions, both influencing each other in a way.
>>
>> *1)* I am getting a stream of tuples from a data generator using the
>> following statement:
>> "env.addSource(new DataStreamGenerator(filePath));"
>>
>> This generator reads a line from the file, splits it into different
>> attributes and returns the entire thing as an object.
>> My problem here is that the parallelism of this data source is 1 by default,
>> and if I force it to change by using setParallelism, I get the error message
>> "Source: 1 is not a parallel source". When I searched for this, I found the
>> following on the Flink website:
>> "collection data sources can not be executed in parallel (parallelism = 1)."
>>
>> So my question is: can I read my data source (which is currently a file)
>> in any other way, such that the parallelism is not restricted to 1?
>>
>> *2)* I need to connect 2 data sources over an iteration, for example:
>> "points.iterate().withFeedbackType(Centroid.class);"
>> and run a CoFlatMap transformation. My question is: can I already broadcast
>> some content of Centroid type before ptct.closeWith(Centroid.broadcast())
>> sends the data back to the iteration?
>>
>> For example, I tried this, but I can't see anything in the map functions:
>>
>> centroidStream.broadcast();
>> ConnectedIterativeStreams<Point, Centroid> ptct =
>>         tuples.iterate().withFeedbackType(MicroCluster.class);
>> DataStream<Centroid> updatedcentroids = ptct.flatMap(new MyCoFlatmap());
>> inputsAndMicroCluster.closeWith(updatedcentroids.broadcast());
>>
>> but I can't see the centroids already broadcast by
>> centroidStream.broadcast() in the map functions.
>>
>> Any kind of help is hugely appreciated.
>>
>> Thanks and Regards
>> Biplob Biswas
>>
>> --
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-coflatmap-transformation-tp6721.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
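On the follow-up question about feeding back a group of data points at once: the feedback type does not have to be a single point. It can be any type Flink can serialize, including a wrapper object that holds an array. The sketch below shows such a wrapper. It follows Flink's POJO conventions: a public class, a public no-argument constructor, and public fields. CentroidBatch and the nested Centroid class are hypothetical stand-ins for the types in the original question, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical feedback record that carries a whole group of centroids at once.
 * Public class, public no-argument constructor and public fields follow
 * Flink's POJO conventions.
 */
public class CentroidBatch {

    /** Hypothetical centroid type, also written as a POJO. */
    public static class Centroid {
        public int id;
        public double x;
        public double y;

        public Centroid() {}

        public Centroid(int id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // The group of centroids sent back through the iteration as one record.
    public Centroid[] centroids;

    public CentroidBatch() {}

    public CentroidBatch(List<Centroid> group) {
        this.centroids = group.toArray(new Centroid[0]);
    }

    public static void main(String[] args) {
        List<Centroid> updated = new ArrayList<>();
        updated.add(new Centroid(0, 1.0, 2.0));
        updated.add(new Centroid(1, 3.5, -0.5));

        // One record instead of two: the feedback side would receive this as a unit.
        CentroidBatch batch = new CentroidBatch(updated);
        System.out.println("centroids in batch: " + batch.centroids.length);
    }
}
```

With a type like this, the iteration from the question could presumably be declared with withFeedbackType(CentroidBatch.class). The CoFlatMapFunction would then emit one CentroidBatch per update instead of individual centroids. Because withFeedbackType accepts a Class argument (as in the Centroid.class call above), passing an array class such as Centroid[].class should also work. A wrapper class leaves room to add metadata later, such as an iteration counter.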