Hi everyone, I'm seeing a lack of determinism in unit tests when using an interval join. I am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits of my pipeline look a bit like this:
keySelector1 = ...
keySelector2 = ...

rightStream = stream1
    .flatMap(...)
    .keyBy(keySelector1)
    .assignTimestampsAndWatermarks(strategy1)

leftStream = stream2
    .keyBy(keySelector2)
    .assignTimestampsAndWatermarks(strategy2)

joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, keySelector2)
    .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, keySelector1))
    .between(Time.minutes(-10L), Time.milliseconds(0L))
    .lowerBoundExclusive()
    .process(new IntervalJoinFunction(...))

---

In my tests, a bounded source loads demo data from a file to simulate the stream, and a sink collects the results in memory. My IntervalJoinFunction is called a different number of times from one run to the next: sometimes its processElement() method is called 14 times, sometimes 8, and sometimes not at all, in which case my output is empty. I count these calls by adding trace logging and checking my logs.

Does anyone know why this happens? Maybe I'm doing something wrong, particularly with reinterpretAsKeyedStream.

Regards,
Alexis.
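For reference, my reading of the Flink interval join documentation is that `.between(Time.minutes(-10L), Time.milliseconds(0L)).lowerBoundExclusive()` pairs a right element with a same-key left element only if the right timestamp falls in the half-open range (left - 10 min, left]. The sketch below models just that timestamp condition in plain Java, with no Flink dependency. The class and method names are hypothetical. It does not model watermarks, late-element handling or state cleanup, so it cannot by itself explain the varying call counts.

```java
import java.time.Duration;

// Hypothetical plain-Java model of the pairing condition for
// leftStream.intervalJoin(rightStream)
//     .between(Time.minutes(-10L), Time.milliseconds(0L))
//     .lowerBoundExclusive()
// Assumes both elements already share the same key.
public class IntervalJoinBounds {
    static final long LOWER_BOUND_MS = Duration.ofMinutes(-10).toMillis(); // -600_000 ms
    static final long UPPER_BOUND_MS = 0L;

    // True if a right element at rightTs would pair with a left element at leftTs.
    static boolean joins(long leftTs, long rightTs) {
        // lowerBoundExclusive(): strictly greater than leftTs + lower bound
        boolean aboveLower = rightTs > leftTs + LOWER_BOUND_MS;
        // the upper bound keeps the default inclusive behaviour
        boolean belowUpper = rightTs <= leftTs + UPPER_BOUND_MS;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        long left = 1_000_000L;
        System.out.println(joins(left, left));            // same timestamp: inclusive upper bound
        System.out.println(joins(left, left - 600_000L)); // exactly 10 minutes earlier: excluded
        System.out.println(joins(left, left - 599_999L)); // just inside the window
        System.out.println(joins(left, left + 1L));       // right element later than left: excluded
    }
}
```

Running `main` should print `true`, `false`, `true`, `false`.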