Hi,
regarding 1)  the source needs to implement the ParallelSourceFunction or
RichParallelSourceFunction interface to allow it to have a higher
parallelism than 1.

regarding 2) I wrote a small example that showcases how to do it:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<String> mainInput = env.fromElements("Hello", "Zwei", "drei");
DataStream<String> initialIterateInput = env.fromElements("bcast 1", "bcast
2", "bcast 3");


IterativeStream.ConnectedIterativeStreams<String, String> iteration =

mainInput.iterate().withFeedbackType(BasicTypeInfo.STRING_TYPE_INFO);

SingleOutputStreamOperator<String> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<String, String, String>() {
            @Override
            public void flatMap1(String value, Collector<String> out)
throws Exception {
                Thread.sleep(1000);
                System.out.println("SEEING FROM INPUT 1: " + value);

                out.collect(value);
            }

            @Override
            public void flatMap2(String value, Collector<String> out)
throws Exception {
                Thread.sleep(1000);
                System.out.println("SEEING FROM INPUT 2: " + value);

                out.collect(value);

            }
        });

iteration.closeWith(initialIterateInput.broadcast().union(iterateHead.broadcast()));

iterateHead.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
        return value;
    }
});

I inserted Thread.sleep(1000) so that you can observe what is happening. If
you remove them it iterates too fast.

Cheers,
Aljoscha


On Thu, 5 May 2016 at 20:43 Biplob Biswas <revolutioni...@gmail.com> wrote:

> Hi,
>
> I have 2 different questions, both influencing each other in a way.
>
> *1)* I am getting a stream of tuples from a data generator using the
> following statements,
> "env.addSource(new DataStreamGenerator(filePath));"
>
> This generator reads a line from the file and splits it into different
> attributes and returns the entire thing as an object.
> My problem here is that the parallelism of this data source is by default 1
> and if i force it to change by using setParallelism, i get the error
> message
> "Source: 1 is not a parallel source" so when I search for I get this from
> the flink website
> "collection data sources can not be executed in parallel ( parallelism =
> 1)."
>
> So my question is, can I read my data source(which is currently a file) in
> any other such that the parallelism is not restricted to 1?
>
> *2)* I need to connect 2 datasources over an iteration, for example :
> "points.iterate().withFeedbackType(Centroid.class);"
> and run coflatmap transformation, my question is, can I already broadcast
> some content of centroid type before the
> ptct.closewith(Centroid.broadcast()) send the data back to the iterator?
>
> For example, I tried this but i cant see anything in the map functions
> /centroidStream.broadcast();
> ConnectedIterativeStreams<Point, Centroid> ptct=
> tuples.iterate().withFeedbackType(MicroCluster.class);
> DataStream<Centroid> updatedcentroids = ptct.flatMap(new MyCoFlatmap())
> inputsAndMicroCluster.closeWith(updatedcentroids .broadcast());/
>
>
> but i can't see the centroids already broadcasted by
> centroidStream.broadcast() in the map functions.
>
> Any kind of help is hugely appreciated.
>
> Thanks and Regards
> Biplob Biswas
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Regarding-source-Parallelism-and-usage-of-coflatmap-transformation-tp6721.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Reply via email to