Hi,

Regarding 1): thanks a lot for pointing out ParallelSourceFunction. I
completely missed that I was using a SourceFunction instead.

Regarding 2): the example works and I can see what is happening there. Now,
when I increase the parallelism, I understand the corresponding change in
how the data is fed back to the iterator.

What I want to ask next is: is there a way to send back a group of
data points at once, something like an array of some object? If yes, what
type would be passed to 'withFeedbackType'?
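
To make concrete what I mean by a group, here is a minimal sketch in plain
Java. PointBatcher is a made-up helper of mine, not a Flink class, and the
batch size is arbitrary. My idea would be to call something like this inside
flatMap1/flatMap2 and pass the whole list to out.collect(...) once it is
full; the open question is what 'withFeedbackType' should receive for such a
list or array element type.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink): buffers single elements and
 * hands them back as one group once batchSize elements have arrived.
 */
class PointBatcher<T> {
    private final int batchSize;
    private List<T> buffer = new ArrayList<>();

    PointBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Adds one element; returns the full group when ready, otherwise null. */
    List<T> add(T item) {
        buffer.add(item);
        if (buffer.size() < batchSize) {
            return null;
        }
        List<T> full = buffer;
        buffer = new ArrayList<>();   // start a fresh group
        return full;
    }

    /** Returns whatever is buffered so far (possibly empty) and resets. */
    List<T> flush() {
        List<T> rest = buffer;
        buffer = new ArrayList<>();
        return rest;
    }
}
```

With a batch size of 2, the first add(...) would return null and the second
would return a list holding both elements, which is the "group" I would like
to feed back in one go.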

Again, thank you so much for such a detailed explanation and example.

Regards
Biplob Biswas


Aljoscha Krettek wrote
> Hi,
> regarding 1)  the source needs to implement the ParallelSourceFunction or
> RichParallelSourceFunction interface to allow it to have a higher
> parallelism than 1.
> 
> regarding 2) I wrote a small example that showcases how to do it:
> 
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> 
> DataStream<String> mainInput = env.fromElements("Hello", "Zwei", "drei");
> DataStream<String> initialIterateInput =
>         env.fromElements("bcast 1", "bcast 2", "bcast 3");
> 
> IterativeStream.ConnectedIterativeStreams<String, String> iteration =
>         mainInput.iterate().withFeedbackType(BasicTypeInfo.STRING_TYPE_INFO);
> 
> SingleOutputStreamOperator<String> iterateHead = iteration
>         .flatMap(new CoFlatMapFunction<String, String, String>() {
>             @Override
>             public void flatMap1(String value, Collector<String> out)
>                     throws Exception {
>                 Thread.sleep(1000);
>                 System.out.println("SEEING FROM INPUT 1: " + value);
> 
>                 out.collect(value);
>             }
> 
>             @Override
>             public void flatMap2(String value, Collector<String> out)
>                     throws Exception {
>                 Thread.sleep(1000);
>                 System.out.println("SEEING FROM INPUT 2: " + value);
> 
>                 out.collect(value);
>             }
>         });
> 
> iteration.closeWith(initialIterateInput.broadcast().union(iterateHead.broadcast()));
> 
> iterateHead.map(new MapFunction<String, String>() {
>     @Override
>     public String map(String value) throws Exception {
>         System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
>         return value;
>     }
> });
> 
> I inserted Thread.sleep(1000) so that you can observe what is happening.
> If you remove the sleeps, it iterates too fast to follow.
> 
> Cheers,
> Aljoscha
> 
> 
> On Thu, 5 May 2016 at 20:43 Biplob Biswas <revolutionisme@> wrote:
> 
>> Hi,
>>
>> I have 2 different questions, both influencing each other in a way.
>>
>> *1)* I am getting a stream of tuples from a data generator using the
>> following statement:
>> "env.addSource(new DataStreamGenerator(filePath));"
>>
>> This generator reads a line from the file, splits it into its
>> attributes, and returns the whole thing as an object.
>> My problem is that the parallelism of this data source is 1 by default,
>> and if I force it to change using setParallelism, I get the error message
>> "Source: 1 is not a parallel source". When I searched for it, I found this
>> on the Flink website:
>> "collection data sources can not be executed in parallel ( parallelism =
>> 1)."
>>
>> So my question is: can I read my data source (which is currently a file)
>> in any other way such that the parallelism is not restricted to 1?
>>
>> *2)* I need to connect 2 data sources over an iteration, for example:
>> "points.iterate().withFeedbackType(Centroid.class);"
>> and run a coflatmap transformation. My question is: can I already broadcast
>> some content of Centroid type before
>> ptct.closewith(Centroid.broadcast()) sends the data back to the iterator?
>>
>> For example, I tried this, but I can't see anything in the map functions:
>>
>> centroidStream.broadcast();
>> ConnectedIterativeStreams<Point, Centroid> ptct =
>>         tuples.iterate().withFeedbackType(MicroCluster.class);
>> DataStream<Centroid> updatedcentroids = ptct.flatMap(new MyCoFlatmap());
>> inputsAndMicroCluster.closeWith(updatedcentroids.broadcast());
>>
>> but I can't see the centroids already broadcast by
>> centroidStream.broadcast() in the map functions.
>>
>> Any kind of help is hugely appreciated.
>>
>> Thanks and Regards
>> Biplob Biswas
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-coflatmap-transformation-tp6721.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> at Nabble.com.
>>





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-broadcast-over-iteration-in-Flink-Streaming-tp6721p6785.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
