Hi everyone,

I'm seeing a lack of determinism in unit tests when using an interval join. I 
am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits of my 
pipeline look a bit like this:

keySelector1 = ...
keySelector2 = ...

rightStream = stream1
  .flatMap(...)
  .keyBy(keySelector1)
  .assignTimestampsAndWatermarks(strategy1)

leftStream = stream2
  .keyBy(keySelector2)
  .assignTimestampsAndWatermarks(strategy2)

joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, keySelector2)
  .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, keySelector1))
  .between(Time.minutes(-10L), Time.milliseconds(0L))
  .lowerBoundExclusive()
  .process(new IntervalJoinFunction(...))
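
To make the join bounds concrete, this is how I understand the condition 
implied by between(-10 min, 0 ms) plus lowerBoundExclusive(): a right element 
is paired with a left element when left.ts - 10 min < right.ts <= left.ts. 
The sketch below is plain Java, not Flink code, and the class and method 
names are made up for illustration only.

```java
import java.time.Duration;

// Hypothetical helper (not part of Flink) that mirrors the bounds used above.
class IntervalJoinBounds {
    // between(Time.minutes(-10L), Time.milliseconds(0L))
    static final long LOWER_MS = Duration.ofMinutes(-10).toMillis();
    static final long UPPER_MS = 0L;

    // True if a right element at rightTs falls in the join window of a
    // left element at leftTs (both timestamps in milliseconds).
    public static boolean joins(long leftTs, long rightTs) {
        return rightTs > leftTs + LOWER_MS    // lowerBoundExclusive(): strict
            && rightTs <= leftTs + UPPER_MS;  // upper bound stays inclusive
    }

    public static void main(String[] args) {
        long leftTs = Duration.ofMinutes(30).toMillis();
        System.out.println(joins(leftTs, leftTs));            // same timestamp
        System.out.println(joins(leftTs, leftTs - 599_999L)); // just inside
        System.out.println(joins(leftTs, leftTs - 600_000L)); // exactly 10 min earlier
        System.out.println(joins(leftTs, leftTs + 1L));       // right is later
    }
}
```

With my demo data I would therefore expect the same set of pairs on every 
run, since the condition depends only on the element timestamps.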

---

In my tests, a bounded source loads demo data from a file to simulate the 
stream, and a sink collects the results in memory. My IntervalJoinFunction is 
called a different number of times from run to run: sometimes I see 14 calls 
to its processElement() method, sometimes 8, and sometimes none at all, in 
which case my output is empty. I count the calls by checking my logs with 
some tracing.

Does anyone know why this is? Maybe I'm doing something wrong, particularly 
with reinterpretAsKeyedStream.

Regards,
Alexis.
