I have a keyed input stream of type DataStream[(String, Int)] and wrote a reduce on the keyed stream. The reduce is a simple one that sums up the integer values for the same key.

    val stream: DataStream[(String, Int)] = ...
    val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
    keyedStream.print()

    class MyReduceFunction extends ReduceFunction[(String, Int)] {
      override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) = {
        (in._1, in._2 + in1._2)
      }
    }

Here is my sample input stream:

    ("k1", 1)
    ("k1", 1)
    ("k2", 1)

I was expecting the output of the above program to be

    ("k1", 2)
    ("k2", 1)

whereas I got this:

    ("k1", 1)
    ("k1", 2)
    ("k2", 1)

Isn't this an incorrect output?

Balaji
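
To make the difference between the two outputs concrete, here is a minimal plain-Scala sketch that does not use Flink at all. The object `ReduceSketch` and the helpers `sum`, `finalSums` and `rollingSums` are hypothetical names introduced only for illustration. `finalSums` produces one total per key after all input has been seen, while `rollingSums` produces one running total per input element.

```scala
object ReduceSketch {
  // Same logic as MyReduceFunction.reduce: keep the key, add the counts.
  def sum(a: (String, Int), b: (String, Int)): (String, Int) =
    (a._1, a._2 + b._2)

  // One result per key, computed after the whole input is available.
  def finalSums(in: Seq[(String, Int)]): Map[String, (String, Int)] =
    in.groupBy(_._1).map { case (k, vs) => k -> vs.reduce(sum) }

  // One result per input element: the running sum for that element's key.
  def rollingSums(in: Seq[(String, Int)]): Seq[(String, Int)] = {
    val state = scala.collection.mutable.Map.empty[String, (String, Int)]
    in.map { e =>
      // First element for a key is emitted unchanged; later ones are combined with the stored sum.
      val updated = state.get(e._1).map(sum(_, e)).getOrElse(e)
      state(e._1) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(("k1", 1), ("k1", 1), ("k2", 1))
    println(finalSums(input).values.toList.sortBy(_._1))
    println(rollingSums(input))
  }
}
```

On the sample input, `finalSums` yields the two records that were expected, and `rollingSums` yields the three records that were actually printed.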