While testing the Apache Flink streaming API, I found something weird with a
simple example.

This code counts words every 5 seconds over a sliding window of 10 seconds.
During the first 10 seconds the counts look right, but after that every print
shows a wrong count: one per word. Is there something wrong in my code?
Thanks in advance!

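// Imports assume the Flink 0.9.x Scala streaming API (policy-based windowing).
import java.util.concurrent.TimeUnit.SECONDS
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.helper.Time
import scala.util.Random

// Source: emits one random word every 300 ms, forever.
def generateWords(ctx: SourceContext[String]): Unit = {
  val words = List("amigo", "brazo", "pelo")
  while (true) {
    Thread.sleep(300)
    ctx.collect(words(Random.nextInt(words.length)))
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

val stream = env.addSource(generateWords _)

// 10-second window, evaluated every 5 seconds
val windowedStream = stream.map((_, 1))
  .window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))

// Per-word count within each window
val wordCount = windowedStream
  .groupBy("_1")
  .sum("_2")

wordCount
  .getDiscretizedStream()
  .print()

env.execute("sum randoms")

The output is: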

[(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
[(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
[(brazo,1)] 
[(amigo,1)]
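
To make the expected numbers concrete, here is a plain Scala sketch with no Flink dependency (`SlidingWindowSketch` and `slidingCounts` are hypothetical names, not part of any library). It simulates a 10-second window sliding every 5 seconds over one word every 300 ms, assuming windows are aligned to multiples of the slide and cover the half-open interval [end - size, end); Flink's policy-based windows may trigger and align slightly differently.

```scala
// Hypothetical helper, not Flink code: simulates sliding-window word counts
// over (timestampMillis, word) events to show what each emission should contain.
object SlidingWindowSketch {
  /** For each window end (every `slideMs` up to `untilMs`), counts the words
    * whose timestamp falls in [end - sizeMs, end). */
  def slidingCounts(
      events: Seq[(Long, String)],
      sizeMs: Long,
      slideMs: Long,
      untilMs: Long
  ): Seq[(Long, Map[String, Int])] =
    (slideMs to untilMs by slideMs).map { end =>
      val start = end - sizeMs
      // keep only the events inside this window
      val inWindow = events.collect { case (t, w) if t >= start && t < end => w }
      end -> inWindow.groupBy(identity).map { case (w, ws) => w -> ws.size }
    }

  def main(args: Array[String]): Unit = {
    val words = Vector("amigo", "brazo", "pelo")
    val rnd = new scala.util.Random(42)
    // one word every 300 ms for 20 seconds, mimicking generateWords
    val events = (0L until 20000L by 300L).map(t => t -> words(rnd.nextInt(words.length)))
    slidingCounts(events, 10000L, 5000L, 20000L).foreach { case (end, counts) =>
      val shown = counts.toSeq.sortBy(_._1).mkString(", ")
      println(s"window ending at $end ms: $shown (total ${counts.values.sum})")
    }
  }
}
```

With one word every 300 ms, every window after the first one covers about 33 words in total, so per-word counts of 1 cannot be right for a 10-second window.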
