I'm not sure if the issue in [1] is relevant since it mentions the Table API, but it could be. Since stream1 and stream2 in my example each have a long chain of operators behind them, I presume they might "run" at very different paces.
Oh, and in the context of my unit tests, watermarks should be deterministic: the input file is sorted, and the watermark strategies should essentially behave like the monotonous generator.

[1] https://issues.apache.org/jira/browse/FLINK-24466

Regards,
Alexis.

________________________________
From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Thursday, January 27, 2022 1:30 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Determinism of interval joins

Hi everyone,

I’m seeing a lack of determinism in unit tests when using an interval join. I am testing with both Flink 1.11.3 and 1.14.3, and the relevant bits of my pipeline look a bit like this:

keySelector1 = …
keySelector2 = …

rightStream = stream1
    .flatMap(…)
    .keyBy(keySelector1)
    .assignTimestampsAndWatermarks(strategy1)

leftStream = stream2
    .keyBy(keySelector2)
    .assignTimestampsAndWatermarks(strategy2)

joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, keySelector2)
    .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, keySelector1))
    .between(Time.minutes(-10L), Time.milliseconds(0L))
    .lowerBoundExclusive()
    .process(new IntervalJoinFunction(…))

---

In my tests, I have a bounded source that loads demo data from a file and simulates the stream with a sink that collects results in memory. In the specific case of my IntervalJoinFunction, I’m seeing that it’s called a different number of times in a non-deterministic way: sometimes I see 14 calls to its processElement() method, other times 8, and other times none at all, in which case my output is empty. I count the calls by checking my logs with some tracing.

Does anyone know why this is? Maybe I’m doing something wrong, particularly with reinterpretAsKeyedStream.

Regards,
Alexis.
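For reference, the time condition expressed by between(Time.minutes(-10L), Time.milliseconds(0L)) with lowerBoundExclusive() can be sketched in plain Java without any Flink dependency. This is only an illustration of which timestamp pairs are eligible to be joined; it does not model watermarks, state cleanup, or anything else that could explain the varying call counts. The class name IntervalJoinBounds and the method matches are hypothetical and not part of the pipeline above.

```java
import java.time.Duration;

/** Plain-Java sketch of the interval condition; names are hypothetical. */
public class IntervalJoinBounds {
    // Bounds copied from the pipeline: between(Time.minutes(-10L), Time.milliseconds(0L))
    static final long LOWER_MS = Duration.ofMinutes(-10).toMillis();
    static final long UPPER_MS = 0L;

    /**
     * True if a right element's timestamp falls in the join window of a left element.
     * With lowerBoundExclusive(): leftTs + LOWER_MS < rightTs <= leftTs + UPPER_MS.
     */
    static boolean matches(long leftTs, long rightTs) {
        return rightTs > leftTs + LOWER_MS && rightTs <= leftTs + UPPER_MS;
    }

    public static void main(String[] args) {
        long leftTs = Duration.ofMinutes(30).toMillis();
        System.out.println(matches(leftTs, leftTs));            // same timestamp
        System.out.println(matches(leftTs, leftTs - 599_999L)); // just inside the window
        System.out.println(matches(leftTs, leftTs - 600_000L)); // exactly on the excluded lower bound
        System.out.println(matches(leftTs, leftTs + 1L));       // right element is newer than left
    }
}
```

In other words, a right element is only a candidate if it is at most as recent as the left element and less than 10 minutes older than it.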