Hi All,

I have the following code:


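import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

val sev = StreamExecutionEnvironment.getExecutionEnvironment
val socTextStream = sev.socketTextStream("localhost", 4444)

// Word count over 20 s windows sliding every 10 s, with a count trigger of 5
val counts = socTextStream.flatMap { _.split("\\s") }
  .map { (_, 1) }
  .keyBy(0)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
  .trigger(CountTrigger.of(5))
  .sum(1)

counts.print()
sev.execute()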

I am sending messages to port 4444 using nc -lk 4444.
This is my sample input:

a
a
a
a
a
b
b
b
b
b
c
c
c
c
c
d
d
d
d
d
e
e
e
e
e

I am sending 5 of each letter since I have a CountTrigger of 5. I was
expecting that for every 5 occurrences of a letter the code would print a
count of 5, i.e., (a,5), (b,5), etc. But the output I am getting is a little
confusing.
Output:

1> (a,5)
1> (a,5)
1> (b,5)
2> (c,5)
2> (c,5)
1> (d,5)
1> (e,5)
1> (e,5)

As you can see, for some letters (a, c, e) the count is printed twice, and
for others (b, d) it is printed only once. I am not able to figure out what
is going on. I think it may have something to do with
SlidingProcessingTimeWindows, but I am not sure.
Can someone explain to me what is going on?
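
To probe the sliding-window hunch, here is a minimal sketch in plain Scala (no Flink dependency) of how I understand the mechanics. It assumes that each element is assigned to every sliding window covering its timestamp (two windows for a 20 s size and 10 s slide) and that the count trigger keeps a separate counter per key and per window, firing and resetting when it reaches 5. The names SlidingCountTriggerSketch, windowStarts and simulate are made up for this illustration, and the explicit millisecond timestamps stand in for processing time; window cleanup at the end of a window is not modelled.

```scala
import scala.collection.mutable

object SlidingCountTriggerSketch {
  val sizeMs   = 20000L // window size: 20 s
  val slideMs  = 10000L // window slide: 10 s
  val maxCount = 5      // count trigger threshold

  // Start times of all sliding windows that contain the (non-negative) timestamp.
  def windowStarts(ts: Long): List[Long] = {
    val lastStart = ts - (ts % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs).takeWhile(_ > ts - sizeMs).toList
  }

  // Feeds (key, timestamp) events through the model and returns what would be printed.
  def simulate(events: Seq[(String, Long)]): List[(String, Int)] = {
    // Running sum and trigger counter, each kept per (key, window start).
    val sums   = mutable.Map.empty[(String, Long), Int].withDefaultValue(0)
    val counts = mutable.Map.empty[(String, Long), Int].withDefaultValue(0)
    val out    = mutable.ArrayBuffer.empty[(String, Int)]

    for ((key, ts) <- events; start <- windowStarts(ts)) {
      val id = (key, start)
      sums(id) += 1
      counts(id) += 1
      if (counts(id) >= maxCount) {
        counts(id) = 0             // the counter resets after firing
        out += ((key, sums(id)))   // the window emits its current sum
      }
    }
    out.toList
  }
}
```

In this model, five a's arriving at 1 s to 5 s all fall into the same two windows, so both windows reach 5 and (a,5) is emitted twice. Five b's arriving at 8 s to 12 s straddle a slide boundary, so only the window starting at 0 s sees all five and (b,5) is emitted once. If the model is roughly right, whether a letter is printed once or twice would depend only on where its five elements fall relative to the 10 s slide boundaries.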


Thanks and Regards,
Vishnu Viswanath
www.vishnuviswanath.com
