Hi,

Before trying the method you described above, I tried attaching the
timestamp to my data directly at the stream source.

Following is my stream source:

http://pastebin.com/AsXiStMC

and I am using the stream source as follows:

DataStream<Point> tuples =
        env.addSource(new DataStreamGenerator(filePath, streamSpeed));

ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster =
        tuples.iterate()
              .withFeedbackType(MicroCluster[].class);
//mcStream.broadcast().global();

DataStream<MicroCluster[]> updatedMicroCluster =
        inputsAndMicroCluster
              .flatMap(new MyCoFlatmap(k, tw))
              .keyBy(1)
              .reduce(new ReduceMC(k))
              .map(new ReturnMC());

inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

The problem is that when I execute this, all 4 partitions get the same
data. I don't understand why the same data is sent to all 4 partitions,
when 4 different tuples should go to 4 different partitions.
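
To make the expectation concrete, here is a small plain-Java sketch. It is
not Flink code, and every name in it (PartitionSketch, roundRobin,
broadcast) is made up purely for illustration. It only contrasts what I
expected (each tuple goes to exactly one of the 4 partitions) with what I
observe (every partition sees every tuple); it does not claim to show how
Flink distributes records internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: models "partitions" as plain lists.
public class PartitionSketch {

    // Expected behaviour: each element is handed to exactly one partition.
    static List<List<Integer>> roundRobin(List<Integer> data, int partitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < data.size(); i++) {
            parts.get(i % partitions).add(data.get(i));
        }
        return parts;
    }

    // Observed behaviour: every partition receives a full copy of the data.
    static List<List<Integer>> broadcast(List<Integer> data, int partitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            parts.add(new ArrayList<>(data));
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4);
        System.out.println("expected: " + roundRobin(tuples, 4));
        System.out.println("observed: " + broadcast(tuples, 4));
    }
}
```

With 4 input tuples and 4 partitions, the first method leaves one tuple in
each partition, while the second leaves all 4 tuples in every partition,
which is what I am seeing.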

Could you explain this behaviour?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
