Hi,
Regarding 1): thanks a lot for pointing me to ParallelSourceFunction. I
completely missed that I was implementing SourceFunction instead.
Regarding 2): the example works and I can see what is happening there. Now
when I increase the parallelism, I understand the corresponding change in
how the data is fed back to the iterator.
What I want to ask next: is there a way to send back a group of data points
at once, something like an array of some object? If yes, what type would I
pass to the 'withFeedbackType' parameter?
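To make the question concrete, this is roughly the grouping I have in mind,
written as a plain-Java sketch without any Flink types. The Point class, the
group method and the group size are placeholders I made up for illustration,
not anything from Flink. Each Point[] would be one record sent back at once,
and what I don't know is which type describes such an array for
'withFeedbackType'.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupedFeedbackSketch {

    /** Placeholder for the real data-point class; not part of Flink. */
    public static final class Point {
        public final double x;
        public final double y;

        public Point(double x, double y) {
            this.x = x;
            this.y = y;
        }
    }

    /**
     * Splits the points into consecutive groups of at most groupSize elements.
     * Each returned array stands for one record that would be sent back at once.
     */
    public static List<Point[]> group(List<Point> points, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        List<Point[]> groups = new ArrayList<>();
        for (int start = 0; start < points.size(); start += groupSize) {
            int end = Math.min(start + groupSize, points.size());
            groups.add(points.subList(start, end).toArray(new Point[0]));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Point> points = Arrays.asList(
                new Point(0, 0), new Point(1, 1), new Point(2, 2),
                new Point(3, 3), new Point(4, 4));
        // Five points in groups of two: two full groups and one with a single point.
        for (Point[] g : group(points, 2)) {
            System.out.println("group of " + g.length);
        }
    }
}
```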
Again, thank you so much for such a detailed explanation and example.
Regards,
Biplob Biswas
Aljoscha Krettek wrote
> Hi,
> regarding 1) the source needs to implement the ParallelSourceFunction or
> RichParallelSourceFunction interface to allow it to have a higher
> parallelism than 1.
>
> regarding 2) I wrote a small example that showcases how to do it:
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> DataStream<String> mainInput = env.fromElements("Hello", "Zwei", "drei");
> DataStream<String> initialIterateInput =
>     env.fromElements("bcast 1", "bcast 2", "bcast 3");
>
> IterativeStream.ConnectedIterativeStreams<String, String> iteration =
>     mainInput.iterate().withFeedbackType(BasicTypeInfo.STRING_TYPE_INFO);
>
> SingleOutputStreamOperator<String> iterateHead = iteration
>     .flatMap(new CoFlatMapFunction<String, String, String>() {
>         @Override
>         public void flatMap1(String value, Collector<String> out)
>                 throws Exception {
>             Thread.sleep(1000);
>             System.out.println("SEEING FROM INPUT 1: " + value);
>
>             out.collect(value);
>         }
>
>         @Override
>         public void flatMap2(String value, Collector<String> out)
>                 throws Exception {
>             Thread.sleep(1000);
>             System.out.println("SEEING FROM INPUT 2: " + value);
>
>             out.collect(value);
>         }
>     });
>
> iteration.closeWith(
>     initialIterateInput.broadcast().union(iterateHead.broadcast()));
>
> iterateHead.map(new MapFunction<String, String>() {
>     @Override
>     public String map(String value) throws Exception {
>         System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
>         return value;
>     }
> });
>
> I inserted Thread.sleep(1000) so that you can observe what is happening.
> If you remove them, it iterates too fast.
>
> Cheers,
> Aljoscha
>
>
> On Thu, 5 May 2016 at 20:43 Biplob Biswas <revolutionisme@> wrote:
>
>> Hi,
>>
>> I have 2 different questions, both influencing each other in a way.
>>
>> *1)* I am getting a stream of tuples from a data generator using the
>> following statement:
>> "env.addSource(new DataStreamGenerator(filePath));"
>>
>> This generator reads a line from the file, splits it into its
>> attributes, and returns the whole thing as an object.
>> My problem is that the parallelism of this data source is 1 by default,
>> and if I force it to change using setParallelism, I get the error message
>> "Source: 1 is not a parallel source". When I searched for it, I found this
>> on the Flink website:
>> "collection data sources can not be executed in parallel ( parallelism =
>> 1)."
>>
>> So my question is: can I read my data source (which is currently a file)
>> in any other way such that the parallelism is not restricted to 1?
>>
>> *2)* I need to connect 2 data sources over an iteration, for example:
>> "points.iterate().withFeedbackType(Centroid.class);"
>> and run a coflatmap transformation. My question is: can I already
>> broadcast some content of Centroid type before
>> ptct.closeWith(Centroid.broadcast()) sends the data back to the iterator?
>>
>> For example, I tried this:
>> /centroidStream.broadcast();
>> ConnectedIterativeStreams<Point, Centroid> ptct =
>> tuples.iterate().withFeedbackType(MicroCluster.class);
>> DataStream<Centroid> updatedcentroids = ptct.flatMap(new MyCoFlatmap())
>> inputsAndMicroCluster.closeWith(updatedcentroids.broadcast());/
>>
>>
>> but I can't see the centroids already broadcast by
>> centroidStream.broadcast() in the map functions.
>>
>> Any kind of help is hugely appreciated.
>>
>> Thanks and Regards
>> Biplob Biswas
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-coflatmap-transformation-tp6721.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-broadcast-over-iteration-in-Flink-Streaming-tp6721p6785.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.