Hi all,

Can anyone advise on this?
I wrote a simple test of the countWindow method (in Kotlin), shown below:

package aero.airlab.flinkjobs.headingreminder

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import kotlin.random.Random

object CountWindowTest {
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)

        // Pairs of (random key in 0..9, increasing index); the index doubles as the timestamp.
        val rand = Random(0)
        val data = (0..1000).map { Pair(rand.nextInt(10), it) }

        env.fromCollection(data)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
                    .withTimestampAssigner { e, _ -> e.second.toLong() })
            .keyBy { it.first }
            .countWindow(3L, 1)        // sliding count window: size 3, slide 1
            .reduce { a, b -> b }      // always keep the right-hand element
            .keyBy { it.first }
            .filter { it.first == 5 }
            .print()

        env.execute()
    }
}

The output begins as follows:

12> (5, 184)
12> (5, 18)
12> (5, 29)
12> (5, 37)
12> (5, 38)
12> (5, 112)
12> (5, 131)

The first line, (5, 184), is out of order relative to the rest. Is this a bug? The problem disappears if I remove the keyBy after the reduce.

Thanks.