I have keyed input stream on DateStream(String,Int) and wrote a reduce on
the keyedStream. The reduce is simple one summing up the integer values of
the same key.

val stream = DataStream(String,Int)
val keyedStream = stream.keyBy(_._1).reduce( new MyReduceFunction)
keyedStream.print()

class MyReduceFunction extends ReduceFunction(String,Int) {
   override def reduce(in:(String,Int), in1:(String,Int) ) :(String,Int) = {
       (in._1, in._2+in1._2)
   }
}

Here is my sample input stream.
( "k1",1)
("k1",1)
("k2",1)

I was expecting the output of the above program to return
("k1",2)
("k2",1)

where as I got this,
("k1",1)
("k1",2)
("k2",1)

Isn't this a incorrect output.

Balaji

Reply via email to