While testing the Apache Flink streaming API, I found something weird with a simple example.
This code counts the words every 5 seconds over a window of 10 seconds. For the first 10 seconds the counts look right, but after that every print shows a wrong count: a single word with a count of 1. Is there something wrong in my code? Thanks in advance!

```scala
// Emits a random word from a fixed list every 300 ms, forever.
def generateWords(ctx: SourceContext[String]) = {
  val words = List("amigo", "brazo", "pelo")
  while (true) {
    Thread.sleep(300)
    ctx.collect(words(Random.nextInt(words.length)))
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.addSource(generateWords _)

// 10-second window, evaluated every 5 seconds.
val windowedStream = stream.map((_, 1))
  .window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))

val wordCount = windowedStream
  .groupBy("_1")
  .sum("_2")

wordCount
  .getDiscretizedStream()
  .print()

env.execute("sum randoms")
```

The output is:

```
[(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
[(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
[(brazo,1)]
[(amigo,1)]
```
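For comparison, here is a minimal plain-Scala sketch (no Flink involved) of what a 10-second window evaluated every 5 seconds should contain. It assumes a deterministic stand-in for the random source (the words cycle in a fixed order, one every 300 ms, with made-up timestamps) and half-open windows `[end - 10000, end)`; `Event` and `slidingCounts` are hypothetical names for illustration, not Flink API. Since one word arrives every 300 ms, every full window should hold roughly 10000 / 300 ≈ 33 words (33 or 34 depending on where the boundaries fall), so once the window has filled up, a print containing a single word with count 1 is not what a sliding count should produce.

```scala
object SlidingWordCount {
  // Hypothetical event: a word tagged with its emission time in milliseconds.
  final case class Event(timeMs: Long, word: String)

  // Counts words per sliding window of `sizeMs`, evaluated every `slideMs`
  // up to `untilMs`. The window ending at `end` covers [end - sizeMs, end).
  def slidingCounts(
      events: Seq[Event],
      sizeMs: Long,
      slideMs: Long,
      untilMs: Long): Seq[(Long, Map[String, Int])] =
    (slideMs to untilMs by slideMs).map { end =>
      val start = end - sizeMs
      val inWindow = events.filter(e => e.timeMs >= start && e.timeMs < end)
      end -> inWindow.groupBy(_.word).map { case (w, es) => w -> es.size }
    }

  def main(args: Array[String]): Unit = {
    val words = Vector("amigo", "brazo", "pelo")
    // Deterministic stand-in for the random source: one word every 300 ms.
    val events = (1 to 100).map(i => Event(i * 300L, words(i % words.length)))
    for ((end, counts) <- slidingCounts(events, sizeMs = 10000, slideMs = 5000, untilMs = 30000))
      println(s"window ending at $end ms: ${counts.toSeq.sortBy(_._1).mkString(", ")}")
  }
}
```

Only the first window (ending at 5000 ms) is partially filled here; every later window overlaps the previous one by 5 seconds, so its counts should stay in the same range rather than collapse to a single element.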