Hi,
could you try pulling the problem apart, i.e. determine at which point in
the pipeline you first see duplicate data? Is it right after the source, in
the CoFlatMap, or in the Map after the reduce, for example?
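
One way to narrow this down, as a rough sketch only: log each record at
every stage together with the index of the parallel subtask that saw it,
then look for the first stage where the same record shows up on more than
one subtask. The class and method names below (DuplicateLocator, observe,
firstStageWithDuplicates) are made up for illustration and are not part of
Flink. In the actual job, the subtask index would presumably come from
getRuntimeContext().getIndexOfThisSubtask() inside a rich function, and the
stage order here is simply the order in which stages are first observed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not a Flink class: collects (stage, subtask, record)
// observations and reports where duplicates across subtasks first appear.
final class DuplicateLocator {
    // stage name -> record -> indices of the subtasks that saw that record
    private final Map<String, Map<String, Set<Integer>>> seen = new LinkedHashMap<>();

    // Record that the given subtask saw the given record at the given stage.
    void observe(String stage, int subtask, String record) {
        seen.computeIfAbsent(stage, s -> new LinkedHashMap<>())
            .computeIfAbsent(record, r -> new TreeSet<>())
            .add(subtask);
    }

    // First stage (in order of first observation) at which some record was
    // seen by more than one subtask, or null if there is no such stage.
    String firstStageWithDuplicates() {
        for (Map.Entry<String, Map<String, Set<Integer>>> stage : seen.entrySet()) {
            for (Set<Integer> subtasks : stage.getValue().values()) {
                if (subtasks.size() > 1) {
                    return stage.getKey();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        DuplicateLocator locator = new DuplicateLocator();
        // Assumed sample observations: distinct records at the source ...
        locator.observe("source", 0, "p1");
        locator.observe("source", 1, "p2");
        // ... but the same record on two subtasks after the CoFlatMap.
        locator.observe("coflatmap", 0, "p1");
        locator.observe("coflatmap", 1, "p1");
        System.out.println(locator.firstStageWithDuplicates()); // prints "coflatmap"
    }
}
```

If the first stage reported is already the source, the duplication happens
before any of the downstream operators run.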

Cheers,
Aljoscha

On Wed, 1 Jun 2016 at 17:11 Biplob Biswas <revolutioni...@gmail.com> wrote:

> Hi,
>
> Before giving the method you described above a try, I tried adding the
> timestamp to my data directly at the stream source.
>
> Following is my stream source:
>
> http://pastebin.com/AsXiStMC
>
> and I am using the stream source as follows:
>
> DataStream<Point> tuples =
>         env.addSource(new DataStreamGenerator(filePath, streamSpeed));
>
> ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster =
>         tuples.iterate()
>               .withFeedbackType(MicroCluster[].class);
> //mcStream.broadcast().global();
>
> DataStream<MicroCluster[]> updatedMicroCluster =
>         inputsAndMicroCluster
>               .flatMap(new MyCoFlatmap(k,tw))
>               .keyBy(1)
>               .reduce(new ReduceMC(k))
>               .map(new ReturnMC());
>
> inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());
>
> The problem is that when I execute this, all 4 partitions get the same
> data. I don't understand why the same data is sent to all 4 partitions,
> when 4 different data tuples should go to 4 different partitions.
>
> Can you maybe explain this behaviour?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
