Hi, Before giving the method u described above a try, i tried adding the timestamp with my data directly at the stream source.
Following is my stream source: http://pastebin.com/AsXiStMC and I am using the stream source as follows: DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed)); ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster = tuples.iterate() .withFeedbackType(MicroCluster[].class); //mcStream.broadcast().global(); DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster .flatMap(new MyCoFlatmap(k,tw)) .keyBy(1) .reduce(new ReduceMC(k)) .map(new ReturnMC()); inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast()); The problem is, when i execute this, all the 4 different partition gets the same data, I don't really understand how is the same data sent to all the 4 partitions when it should 4 different data tuple to 4 different partitions. Can you maybe explain this behaviour? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.