Till,

  Thanks for your reply; maybe I should have given more details. val
stream = DataStream[(String, Int)] is already windowed. Ideally I have all
the data that I need in my data stream; all I am trying to do is build
something like a HashMap[String, Int] from the (String, Int) tuples. If
reduce is not the best solution, can you please suggest another way to do
the same?

val source: DataStream[(String, Int)] = someSource
val stream = source
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(xmin)))
  .apply { (x: String, y: TimeWindow, z: Iterable[(String, Int)], w: Collector[(String, Int)]) =>
    mywindowfunc(x, y, z, w)
  }
// The reduce below runs on the window output, which is a plain DataStream again.
val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
keyedStream.print()
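
To make the difference concrete, here is a minimal plain-Scala sketch (no Flink involved) that contrasts a rolling reduce, which emits a partial result after every element, with a bounded aggregation that yields one total per key. The object name `ReduceSemantics` and the functions `rollingReduce` and `finalTotals` are made up for illustration and are not part of any Flink API; the input is the sample from the original question.

```scala
object ReduceSemantics {
  // Sample input from the original question.
  val input: List[(String, Int)] = List(("k1", 1), ("k1", 1), ("k2", 1))

  // Rolling reduce: emit the updated sum for a key after every element,
  // which is what a reduce on an unbounded keyed stream has to do.
  def rollingReduce(events: Seq[(String, Int)]): List[(String, Int)] = {
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = events.foldLeft(start) {
      case ((state, out), (key, value)) =>
        val sum = state.getOrElse(key, 0) + value
        (state.updated(key, sum), out :+ (key -> sum))
    }
    emitted.toList
  }

  // Bounded aggregation: one total per key, computed only after all
  // elements of the (finite) input have been seen.
  def finalTotals(events: Seq[(String, Int)]): Map[String, Int] =
    events.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    println(rollingReduce(input)) // List((k1,1), (k1,2), (k2,1))
    println(finalTotals(input))   // a Map with k1 -> 2 and k2 -> 1
  }
}
```

In Flink terms, `finalTotals` corresponds roughly to running the reduce inside a window, so that one result per key is emitted when the window fires, while `rollingReduce` corresponds to a reduce on a keyed stream with no window around it.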

Balaji


On Thu, Mar 24, 2016 at 11:21 PM, Till Rohrmann <trohrm...@apache.org>
wrote:

> Hi Balaji,
>
> the output you see is the correct output since you're computing a
> continuous reduce of the incoming data. Since you haven't defined a time
> frame for your reduce computation, you would either have to wait for all
> eternity to output the final result, or output each new reduce result as
> soon as it is generated (which is of course a partial result).
> Since the first option is not very practical, Flink emits the partial
> reduce results.
>
> Cheers,
> Till
>
> On Thu, Mar 24, 2016 at 6:21 PM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> I have a keyed input stream on DataStream[(String, Int)] and wrote a reduce on
>> the keyedStream. The reduce is a simple one, summing up the integer values of
>> the same key.
>>
>> val stream: DataStream[(String, Int)] = ??? // source not shown
>> val keyedStream = stream.keyBy(_._1).reduce(new MyReduceFunction)
>> keyedStream.print()
>>
>> class MyReduceFunction extends ReduceFunction[(String, Int)] {
>>   // Sum the integer values of two records that share the same key.
>>   override def reduce(in: (String, Int), in1: (String, Int)): (String, Int) =
>>     (in._1, in._2 + in1._2)
>> }
>>
>> Here is my sample input stream.
>> ("k1",1)
>> ("k1",1)
>> ("k2",1)
>>
>> I was expecting the output of the above program to return
>> ("k1",2)
>> ("k2",1)
>>
>> whereas I got this:
>> ("k1",1)
>> ("k1",2)
>> ("k2",1)
>>
>> Isn't this an incorrect output?
>>
>> Balaji
>>
>
>
