Never mind, Till, I figured out a way: instead of doing the aggregation in
reduce, I moved that logic into the apply method of the window function.
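
For anyone finding this thread later, here is a minimal plain-Scala sketch of the difference, with no Flink dependency, so it only simulates the behaviour. The names ReduceVsWindowSketch, rollingReduce and sumWindow are hypothetical and used for illustration only. It assumes that the whole sample input falls into a single window, and that grouping by key stands in for keyBy.

```scala
object ReduceVsWindowSketch {

  // Simulates a rolling reduce on a keyed stream: for every incoming
  // element, emit the updated running sum for that element's key.
  def rollingReduce(events: Seq[(String, Int)]): Seq[(String, Int)] = {
    val running = scala.collection.mutable.Map.empty[String, Int]
    events.map { case (key, value) =>
      val updated = running.getOrElse(key, 0) + value
      running(key) = updated
      (key, updated)
    }
  }

  // Simulates the body of a window function's apply: it sees all elements
  // of one key in one window at once and emits a single sum.
  def sumWindow(key: String, elems: Iterable[(String, Int)]): (String, Int) =
    (key, elems.map(_._2).sum)

  def main(args: Array[String]): Unit = {
    val input = Seq(("k1", 1), ("k1", 1), ("k2", 1))

    // One output per input element, including the partial result ("k1", 1).
    println(rollingReduce(input))

    // Assumption: all three elements belong to the same window.
    // groupBy plays the role of keyBy; one result per key.
    val perWindow: Map[String, Int] =
      input.groupBy(_._1).map { case (key, elems) => sumWindow(key, elems) }
    println(perWindow)
  }
}
```

With the sample input from my first mail, rollingReduce yields three records (the partial results I was seeing), while the window-style aggregation yields one record per key, which is what I was after.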

> Thanks for your reply; maybe I should have given more details. val
> stream = DataStream(String,Int) is already windowed. Ideally I have all
> the data that I need in my data stream; all I'm trying to do is build
> something like a HashMap[String,Int] from tuples of (String,Int). If
> reduce is not the best solution, can you please suggest another way to
> do the same?
> val source: DataStream[(String, Int)] = someSource
> val stream = source
>   .keyBy(_._1)
>   .window(TumblingEventTimeWindows.of(Time.minutes(xmin)))
>   .apply { (x: String, y: TimeWindow, z: Iterable[(String, Int)], w: Collector[(String, Int)]) =>
>     mywindowfunc(x, y, z, w)
>   }
> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
> keyedStream.print()
> Balaji
>> Hi Balaji,
>> the output you see is correct, since you're computing a continuous
>> reduce over the incoming data. Because you haven't defined a time frame
>> for the reduce computation, you would either have to wait for all
>> eternity to output the final result, or output every new reduce result
>> as soon as it is generated (which is, of course, partial). Since the
>> first option is not very practical, Flink emits the partial reduce
>> results.
>> Cheers,
>> Till
>>> I have a keyed input stream of type DataStream[(String,Int)] and wrote a
>>> reduce on the keyedStream. The reduce is a simple one, summing up the
>>> integer values of the same key.
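>>> import org.apache.flink.api.common.functions.ReduceFunction
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> val stream: DataStream[(String, Int)] = someSource  // placeholder source
>>> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
>>> keyedStream.print()
>>>
>>> // Sums the Int values of two records that share the same key.
>>> class MyReduceFunction extends ReduceFunction[(String, Int)] {
>>>   override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) =
>>>     (in._1, in._2 + in1._2)
>>> }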
>>> Here is my sample input stream:
>>> ("k1",1)
>>> ("k1",1)
>>> ("k2",1)
>>> I was expecting the output of the above program to return
>>> ("k1",2)
>>> ("k2",1)
>>> whereas I got this:
>>> ("k1",1)
>>> ("k1",2)
>>> ("k2",1)
>>> Isn't this incorrect output?
>>> Balaji

