Hi devs,

While working on https://issues.apache.org/jira/browse/FLINK-10050, I've found unstable behaviour in unit tests for unioned streams (which CoGroupedStreams/JoinedStreams use under the hood). Let's assume we have late elements in one of the streams. The thing is, we have no guarantee which source will be read first, nor in which order watermark alignment will be applied. So the following example produces different results on different invocations:
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// env: a StreamExecutionEnvironment using event time (set up elsewhere)

// Source 1: emits a1@1, then watermark 4, then a2@2
val s1 = env.addSource(new SourceFunction[(String, String)] {
  override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
    ctx.collectWithTimestamp(("a", "a1"), 1)
    //wmAllignmentLock.synchronized {
    //  wmAllignmentLock.wait()
    //}
    ctx.emitWatermark(new Watermark(4))
    ctx.collectWithTimestamp(("a", "a2"), 2)
  }
  override def cancel(): Unit = {}
})

// Source 2: emits b1@1, then watermark 4
val s2 = env.addSource(new SourceFunction[(String, String)] {
  override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
    ctx.collectWithTimestamp(("a", "b1"), 1)
    ctx.emitWatermark(new Watermark(4))
    //wmAllignmentLock.synchronized {
    //  wmAllignmentLock.notifyAll()
    //}
  }
  override def cancel(): Unit = {}
})

// Join on the first field within 3 ms tumbling event-time windows
val joined = s1.join(s2).where(_._1).equalTo(_._1)
  .window(TumblingEventTimeWindows.of(Time.milliseconds(3)))
  .apply((x, y) => s"$x:$y")

For some invocations (when Flink happens to process the 2nd source before the 1st), ("a", "a2") is considered late and dropped; in the other invocations it is kept. Here are the late counts over 1000 invocations (502 of them, roughly half, contain the late element):

Run JOIN periodic
iteration [50] contains late total = 22, this iter = 22
iteration [100] contains late total = 51, this iter = 29
iteration [150] contains late total = 78, this iter = 27
iteration [200] contains late total = 101, this iter = 23
iteration [250] contains late total = 124, this iter = 23
iteration [300] contains late total = 155, this iter = 31
iteration [350] contains late total = 184, this iter = 29
iteration [400] contains late total = 210, this iter = 26
iteration [450] contains late total = 233, this iter = 23
iteration [500] contains late total = 256, this iter = 23
iteration [550] contains late total = 274, this iter = 18
iteration [600] contains late total = 303, this iter = 29
iteration [650] contains late total = 338, this iter = 35
iteration [700] contains late total = 367, this iter = 29
iteration [750] contains late total = 393, this iter = 26
iteration [800] contains late total = 415, this iter = 22
iteration [850] contains late total = 439, this iter = 24
iteration [900] contains late total = 459, this iter = 20
iteration [950] contains late total = 484, this iter = 25
iteration [1000] contains late total = 502, this iter = 18
contains late = 502

It doesn't matter whether a periodic or a punctuated watermark assigner is used. Nor does the synchronization mechanism (commented out in the code snippet above) help to force the records into a particular order. While this behaviour is totally fine for the production case, I wonder how to write a stable unit test scenario that covers late element processing. I didn't find any suitable test harness among the test utilities.

Any feedback is appreciated!

Regards,
Eugen
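P.S. To make the ordering effect concrete, here is a minimal, Flink-free model of it. It assumes, as in the example above, a 3 ms tumbling window with zero allowed lateness, and that the join operator's event-time clock is the minimum of the watermarks seen on its two inputs. LatenessModel, Rec, Wm and lateValues are hypothetical names for illustration only, not Flink test utilities, and the model ignores timers and window firing. It just replays two possible arrival orders and reports which elements would be dropped as late.

```scala
// Hypothetical, Flink-free model (not Flink code): the operator's event-time
// clock is the MINIMUM of its per-input watermarks, and an element is dropped
// when the last timestamp of its tumbling window is already covered by that
// clock (allowed lateness = 0).
object LatenessModel {
  sealed trait Event
  final case class Rec(input: Int, value: String, ts: Long) extends Event
  final case class Wm(input: Int, ts: Long) extends Event

  val WindowSize = 3L

  // Last timestamp still belonging to the tumbling window that contains ts (ts >= 0).
  def windowMaxTimestamp(ts: Long): Long = ts - (ts % WindowSize) + WindowSize - 1

  /** Replays one arrival order and returns the values that would be dropped as late. */
  def lateValues(arrivalOrder: Seq[Event]): Seq[String] = {
    val perInputWm = Array(Long.MinValue, Long.MinValue)
    val dropped = scala.collection.mutable.ListBuffer.empty[String]
    arrivalOrder.foreach {
      case Wm(input, ts) =>
        perInputWm(input) = math.max(perInputWm(input), ts)
      case Rec(_, value, ts) =>
        // The combined watermark only advances once BOTH inputs have advanced.
        if (windowMaxTimestamp(ts) <= perInputWm.min) dropped += value
    }
    dropped.toList
  }

  // Per-source emission order from the example (input 0 = s1, input 1 = s2).
  val s1: Seq[Event] = Seq(Rec(0, "a1", 1), Wm(0, 4), Rec(0, "a2", 2))
  val s2: Seq[Event] = Seq(Rec(1, "b1", 1), Wm(1, 4))

  def main(args: Array[String]): Unit = {
    println(lateValues(s1 ++ s2)) // s1 drained first: List()
    println(lateValues(s2 ++ s1)) // s2 drained first: List(a2)
  }
}
```

In this model each source's own emission order is fixed, yet the result still depends on how the two inputs interleave at the operator: once s2's watermark 4 has arrived, s1's watermark 4 closes the window [0, 3) before a2 gets there.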