Never mind, Till, I figured out a way: instead of doing the aggregation in reduce, I moved that logic into the apply of the window function.
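
For anyone reading this thread later, the difference between the two approaches can be sketched in plain Scala collections. This is only an illustration under stated assumptions, not Flink code: `ReduceDemo`, `rollingReduce` and `windowAggregate` are hypothetical names used only here, and a `Seq` stands in for the stream and for the contents of one window.

```scala
object ReduceDemo {
  // Mimics a keyed reduce with no window: each incoming element updates the
  // running sum for its key, and every updated (partial) sum is emitted.
  def rollingReduce(input: Seq[(String, Int)]): Seq[(String, Int)] = {
    val state = scala.collection.mutable.Map.empty[String, Int]
    input.map { case (key, value) =>
      val sum = state.getOrElse(key, 0) + value
      state(key) = sum
      (key, sum)
    }
  }

  // Mimics aggregating inside a window function: all elements of one
  // completed window are visible at once, so each key yields one total.
  def windowAggregate(window: Iterable[(String, Int)]): Map[String, Int] =
    window.groupBy(_._1).map { case (key, elems) => key -> elems.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val input = Seq(("k1", 1), ("k1", 1), ("k2", 1))
    println(rollingReduce(input))   // partial results, one per input element
    println(windowAggregate(input)) // one total per key
  }
}
```

With the sample input from the original question, `rollingReduce` yields the three partial results that were observed, while `windowAggregate` yields the single total per key that was expected.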

On Thu, Mar 24, 2016 at 11:33 PM, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:

> Till,
>
> Thanks for your reply; maybe I should have given more details. The
> stream (a DataStream[(String, Int)]) is already windowed. Ideally I have
> all the data that I need in my data stream; all I'm trying to do is build
> something like a HashMap[String, Int] from the (String, Int) tuples. If
> reduce is not the best solution, can you please suggest another way to do
> the same?
>
> val source: DataStream[String] = someSource
> val stream = source.keyBy(_._1)
>   .window(TumblingEventTimeWindows.of(Time.minutes(xmin)))
>   .apply { (x: String, y: TimeWindow, z: Iterable[(String)], w: Collector[(String, Int)]) =>
>     mywindowfunc(x, y, z, w) }
> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
> keyedStream.print()
>
> Balaji
>
> On Thu, Mar 24, 2016 at 11:21 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Balaji,
>>
>> the output you see is the correct output, since you're computing a
>> continuous reduce of the incoming data. Since you haven't defined a time
>> frame for your reduce computation, you would either have to wait for all
>> eternity to output the final result, or output each new reduce result as
>> soon as it is generated (which is of course partial). Since the first
>> option is not very practical, Flink emits the partial reduce results.
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 24, 2016 at 6:21 PM, Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:
>>
>>> I have a keyed input stream on DataStream[(String, Int)] and wrote a
>>> reduce on the keyedStream. The reduce is a simple one, summing up the
>>> integer values of the same key.
>>>
>>> val stream: DataStream[(String, Int)] = ??? // input stream of (key, value) tuples
>>> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
>>> keyedStream.print()
>>>
>>> class MyReduceFunction extends ReduceFunction[(String, Int)] {
>>>   // Sum the integer values of two tuples that share the same key.
>>>   override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) = {
>>>     (in._1, in._2 + in1._2)
>>>   }
>>> }
>>>
>>> Here is my sample input stream.
>>> ("k1",1)
>>> ("k1",1)
>>> ("k2",1)
>>>
>>> I was expecting the output of the above program to return
>>> ("k1",2)
>>> ("k2",1)
>>>
>>> whereas I got this,
>>> ("k1",1)
>>> ("k1",2)
>>> ("k2",1)
>>>
>>> Isn't this an incorrect output?
>>>
>>> Balaji