Hi,

Regarding 1): the source needs to implement the ParallelSourceFunction interface (or extend the RichParallelSourceFunction class) so that it can run with a parallelism higher than 1.
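To make the parallel-source idea a bit more concrete, here is a minimal sketch in plain Java, with no Flink dependency, of how each parallel instance could pick its own share of the file's lines so that no line is emitted twice. The class name LineSplitSketch and the round-robin split are assumptions for illustration only, not how your DataStreamGenerator has to work. In a real RichParallelSourceFunction the two integers would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the per-subtask logic of a parallel file source.
// Hypothetical helper for illustration; it is not part of Flink.
class LineSplitSketch {

    // Returns the lines that the subtask with the given index would emit.
    // Line i belongs to subtask (i % parallelism), so across all subtasks
    // every line is emitted exactly once.
    static List<String> linesForSubtask(List<String> lines, int subtaskIndex, int parallelism) {
        if (parallelism < 1 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<String> result = new ArrayList<>();
        for (int i = subtaskIndex; i < lines.size(); i += parallelism) {
            result.add(lines.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for the lines of the input file.
        List<String> lines = Arrays.asList("p1", "p2", "p3", "p4", "p5");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " emits "
                    + linesForSubtask(lines, subtask, parallelism));
        }
        // prints:
        // subtask 0 emits [p1, p3, p5]
        // subtask 1 emits [p2, p4]
    }
}
```

Inside the source's run() method, each instance would then read the file, skip the lines that belong to other subtasks, and call ctx.collect(...) only for its own lines.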
Regarding 2): I wrote a small example that showcases how to do it:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<String> mainInput = env.fromElements("Hello", "Zwei", "drei");
DataStream<String> initialIterateInput = env.fromElements("bcast 1", "bcast 2", "bcast 3");

// Input 1 of the connected iteration is mainInput, input 2 is the feedback stream.
IterativeStream.ConnectedIterativeStreams<String, String> iteration =
        mainInput.iterate().withFeedbackType(BasicTypeInfo.STRING_TYPE_INFO);

SingleOutputStreamOperator<String> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<String, String, String>() {
            @Override
            public void flatMap1(String value, Collector<String> out) throws Exception {
                Thread.sleep(1000);
                System.out.println("SEEING FROM INPUT 1: " + value);
                out.collect(value);
            }

            @Override
            public void flatMap2(String value, Collector<String> out) throws Exception {
                Thread.sleep(1000);
                System.out.println("SEEING FROM INPUT 2: " + value);
                out.collect(value);
            }
        });

// The feedback stream is the union of the initial broadcast input
// and the (broadcast) output of the iteration head.
iteration.closeWith(initialIterateInput.broadcast().union(iterateHead.broadcast()));

iterateHead.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
        return value;
    }
});

// Nothing runs until the job is actually executed.
env.execute();

I inserted the Thread.sleep(1000) calls so that you can observe what is happening. If you remove them, it iterates too fast to follow.

Cheers,
Aljoscha

On Thu, 5 May 2016 at 20:43 Biplob Biswas <revolutioni...@gmail.com> wrote:

> Hi,
>
> I have 2 different questions, both influencing each other in a way.
>
> *1)* I am getting a stream of tuples from a data generator using the
> following statement:
> "env.addSource(new DataStreamGenerator(filePath));"
>
> This generator reads a line from the file, splits it into different
> attributes and returns the entire thing as an object.
> My problem here is that the parallelism of this data source is 1 by default,
> and if I force it to change by using setParallelism, I get the error
> message
> "Source: 1 is not a parallel source". When I search for it, I find this on
> the Flink website:
> "collection data sources can not be executed in parallel (parallelism =
> 1)."
>
> So my question is, can I read my data source (which is currently a file) in
> any other way such that the parallelism is not restricted to 1?
>
> *2)* I need to connect 2 data sources over an iteration, for example:
> "points.iterate().withFeedbackType(Centroid.class);"
> and run a coflatmap transformation. My question is, can I already broadcast
> some content of Centroid type before the
> ptct.closewith(Centroid.broadcast()) sends the data back to the iterator?
>
> For example, I tried this:
> centroidStream.broadcast();
> ConnectedIterativeStreams<Point, Centroid> ptct=
> tuples.iterate().withFeedbackType(MicroCluster.class);
> DataStream<Centroid> updatedcentroids = ptct.flatMap(new MyCoFlatmap())
> inputsAndMicroCluster.closeWith(updatedcentroids .broadcast());
>
> but I can't see the centroids already broadcast by
> centroidStream.broadcast() in the map functions.
>
> Any kind of help is hugely appreciated.
>
> Thanks and Regards
> Biplob Biswas
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-coflatmap-transformation-tp6721.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.