Till, Thanks for your reply, may be I should have given more details. val stream = DataStream(String,Int) is already windowed. Ideally I have all the data that I need in my data stream, all my trying to do is like HashMap[String,Int] from tuples(String,Int) , if reduce is not the best solution, can you please suggest another way to do the same.
val source: DataStream[String] = someSource val stream = source.keyBy(_._1).window(TumblingEventWindows.of(Time.minutes(xmin))).apply { x:String,y:TimeWindow,z:Iterable[(String),w:Collector[(String,Int)]=> mywindowfunc(x,y,z,w)} val keyedStream = stream.keyBy(_._1).reduce( new MyReduceFunction) keyedStream.print() Balaji On Thu, Mar 24, 2016 at 11:21 PM, Till Rohrmann <trohrm...@apache.org> wrote: > Hi Balaji, > > the output you see is the correct output since you're computing a > continuous reduce of the incoming data. Since you haven't defined a time > frame for your reduce computation you either would have to wait for all > eternity to output the final result or you output every time you've > generated a new reduce result this result (which is of course partial). > Since the first option is not very practical, Flink emits the partial > reduce results. > > Cheers, > Till > > On Thu, Mar 24, 2016 at 6:21 PM, Balaji Rajagopalan < > balaji.rajagopa...@olacabs.com> wrote: > >> I have keyed input stream on DateStream(String,Int) and wrote a reduce on >> the keyedStream. The reduce is simple one summing up the integer values of >> the same key. >> >> val stream = DataStream(String,Int) >> val keyedStream = stream.keyBy(_._1).reduce( new MyReduceFunction) >> keyedStream.print() >> >> class MyReduceFunction extends ReduceFunction(String,Int) { >> override def reduce(in:(String,Int), in1:(String,Int) ) :(String,Int) >> = { >> (in._1, in._2+in1._2) >> } >> } >> >> Here is my sample input stream. >> ( "k1",1) >> ("k1",1) >> ("k2",1) >> >> I was expecting the output of the above program to return >> ("k1",2) >> ("k2",1) >> >> where as I got this, >> ("k1",1) >> ("k1",2) >> ("k2",1) >> >> Isn't this a incorrect output. >> >> Balaji >> > >