Hi Francis, I'm afraid this is a rather strange bug that results from the interplay between pre-aggregation (an optimization that aggregates the elements of a window as they arrive) and the window size and slide size you chose. With some combinations of time values it works; with others it doesn't.
I'm sorry you ran into this problem. We are currently reworking the windowing logic, so in the next release (0.10) this should be rock solid.

Cheers,
Aljoscha

On Mon, 14 Sep 2015 at 09:17 Francis Aranda <frascuc...@gmail.com> wrote:

> Testing the Apache Flink stream API, I found something weird with a
> simple example.
>
> This code counts the words every 5 seconds under a window of 10 seconds.
> For the first 10 seconds the counts look right; after that, every print
> shows a wrong count: one per word. Is there something wrong in my code?
>
> Thanks in advance!
>
> def generateWords(ctx: SourceContext[String]) = {
>   val words = List("amigo", "brazo", "pelo")
>   while (true) {
>     Thread.sleep(300)
>     ctx.collect(words(Random.nextInt(words.length)))
>   }
> }
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
>
> val stream = env.addSource(generateWords _)
> val windowedStream = stream.map((_, 1))
>   .window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
>
> val wordCount = windowedStream
>   .groupBy("_1")
>   .sum("_2")
>
> wordCount
>   .getDiscretizedStream()
>   .print()
>
> env.execute("sum randoms")
>
> The output is:
>
> [(pelo,3), (brazo,1), (amigo,2)]  // first 5 seconds
> [(pelo,9), (brazo,5), (amigo,9)]  // first 10 seconds
> [(brazo,1)]
> [(amigo,1)]
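For reference, the semantics the program above expects can be sketched without Flink at all. This is a plain-Scala simulation, not Flink API: the function name `slidingCounts` and the hard-coded timestamps are mine, chosen only to illustrate that every emission of a 10-second window sliding every 5 seconds should count all elements from the last 10 seconds, so consecutive emissions overlap and counts should never collapse to 1 per word.

```scala
// Plain-Scala sketch (no Flink): what a size=10s, slide=5s count window
// should emit. A window ending at time `we` covers timestamps in
// [we - size, we), so adjacent windows overlap by (size - slide).
def slidingCounts(events: Seq[(Long, String)], // (timestampMillis, word)
                  size: Long,
                  slide: Long): Seq[Map[String, Int]] = {
  val last = events.map(_._1).max
  // Window end times: slide, 2*slide, ... until every event is covered.
  val windowEnds =
    Iterator.iterate(slide)(_ + slide).takeWhile(_ <= last + size).toSeq
  windowEnds.map { we =>
    events
      .filter { case (t, _) => t >= we - size && t < we }
      .groupBy(_._2)                          // group by word
      .map { case (word, es) => word -> es.size } // count per word
  }
}
```

With a handful of hand-placed events, e.g. `Seq((1000L, "amigo"), (4000L, "amigo"), (6000L, "brazo"), (9000L, "amigo"))`, the windows ending at 5, 10 and 15 seconds each re-count the overlapping elements, which is the behavior the buggy pre-aggregation path fails to reproduce after the first full window.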