Hi All,
I have the code below:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

    val sev = StreamExecutionEnvironment.getExecutionEnvironment
    val socTextStream = sev.socketTextStream("localhost", 4444)

    val counts = socTextStream
      .flatMap { _.split("\\s") }   // split each line into words
      .map { (_, 1) }               // pair each word with a count of 1
      .keyBy(0)                     // key by the word
      .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
      .trigger(CountTrigger.of(5))  // fire once a window has seen 5 elements for a key
      .sum(1)

    counts.print()
    sev.execute()

I am sending messages to port 4444 using

    nc -lk 4444

This is my sample input:

    a a a a a b b b b b c c c c c d d d d d e e e e e

I am sending 5 of each letter since I have a CountTrigger of 5. I was expecting that for every 5 occurrences of a character the code would print a count of 5, i.e., (a,5), (b,5), etc. But the output I am getting is a little confusing.

Output:

    1> (a,5)
    1> (a,5)
    1> (b,5)
    2> (c,5)
    2> (c,5)
    1> (d,5)
    1> (e,5)
    1> (e,5)

As you can see, for some characters the count is printed twice (a, c, e) and for some characters it is printed only once (b, d). I am not able to figure out what is going on. I think it may have something to do with the SlidingProcessingTimeWindows assigner, but I am not sure.

Can someone explain to me what is going on?

Thanks and Regards,
Vishnu Viswanath
www.vishnuviswanath.com
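
P.S. One way to probe the sliding-window hunch without running Flink is a small plain-Scala simulation. The sketch below is only a model, under these assumptions: each element is assigned to every 20-second window (sliding by 10 seconds) that contains its timestamp, and a separate count is kept per key and per window, firing at every 5th element. The object name `SlidingCountSketch`, the helpers `windowStarts` and `fires`, and the millisecond timestamps are all hypothetical; in the real job the timestamps would be whatever processing time each element happened to arrive at.

```scala
object SlidingCountSketch {
  val sizeMs  = 20000L // window size from the post: Time.seconds(20)
  val slideMs = 10000L // window slide from the post: Time.seconds(10)

  // Start times of all sliding windows that contain `ts`
  // (assumes non-negative timestamps and a zero offset).
  def windowStarts(ts: Long): List[Long] = {
    val lastStart = ts - (ts % slideMs)
    (lastStart until (ts - sizeMs) by -slideMs).toList
  }

  // Replays (key, timestamp) events, keeping one running total per
  // (key, window) pair. A result is emitted whenever such a total
  // reaches a multiple of `triggerCount` (modelled on a count trigger
  // that fires without purging the window contents).
  def fires(events: Seq[(String, Long)], triggerCount: Int = 5): List[(String, Int)] = {
    val totals = scala.collection.mutable.Map.empty[(String, Long), Int].withDefaultValue(0)
    val out = scala.collection.mutable.ArrayBuffer.empty[(String, Int)]
    for ((key, ts) <- events; start <- windowStarts(ts)) {
      val slot = (key, start)
      totals(slot) += 1
      if (totals(slot) % triggerCount == 0) out += ((key, totals(slot)))
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical case 1: all five "a" arrive within the same 10 s slide.
    val sameSlide = Seq.fill(5)(("a", 15000L))
    // Hypothetical case 2: the "b" elements straddle a slide boundary at 20 s.
    val straddling = Seq.fill(3)(("b", 19000L)) ++ Seq.fill(2)(("b", 21000L))

    println(fires(sameSlide))  // two results for "a" in this model
    println(fires(straddling)) // a single result for "b" in this model
  }
}
```

In this model, five elements that all land in the same slide are counted to 5 by both overlapping windows, so two results appear, while elements that straddle a slide boundary reach 5 in only one window. If the real assigner and trigger behave this way, that would be consistent with the mix of single and double outputs above, but I have not confirmed it against Flink itself.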