Instead of using `reinterpretAsKeyedStream`, can you use `keyBy` and see if the behavior becomes deterministic?

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

> I'm not sure if the issue in [1] is relevant since it mentions the Table
> API, but it could be. Since stream1 and stream2 in my example have a long
> chain of operators behind them, I presume they might "run" at very
> different paces.
>
> Oh and, in the context of my unit tests, watermarks should be
> deterministic: the input file is sorted, and the watermark strategies
> should essentially behave like the monotonous generator.
>
> [1] https://issues.apache.org/jira/browse/FLINK-24466
>
> Regards,
> Alexis.
>
> ------------------------------
> *From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Sent:* Thursday, January 27, 2022 1:30 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Determinism of interval joins
>
> Hi everyone,
>
> I’m seeing a lack of determinism in unit tests when using an interval
> join. I am testing with both Flink 1.11.3 and 1.14.3, and the relevant
> bits of my pipeline look a bit like this:
>
> keySelector1 = …
> keySelector2 = …
>
> rightStream = stream1
>     .flatMap(…)
>     .keyBy(keySelector1)
>     .assignTimestampsAndWatermarks(strategy1)
>
> leftStream = stream2
>     .keyBy(keySelector2)
>     .assignTimestampsAndWatermarks(strategy2)
>
> joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, keySelector2)
>     .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, keySelector1))
>     .between(Time.minutes(-10L), Time.milliseconds(0L))
>     .lowerBoundExclusive()
>     .process(new IntervalJoinFunction(…))
>
> ---
>
> In my tests, I have a bounded source that loads demo data from a file and
> simulates the stream, with a sink that collects results in memory. In the
> specific case of my IntervalJoinFunction, I’m seeing that it’s called a
> different number of times in a non-deterministic way: sometimes I see 14
> calls to its processElement() method, other times 8, and other times none
> at all, so my output is empty. I count this by checking my logs with some
> tracing.
>
> Does anyone know why this is? Maybe I’m doing something wrong,
> particularly with reinterpretAsKeyedStream.
>
> Regards,
>
> Alexis.
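For reference when comparing expected and actual call counts: per the Flink interval join documentation, a left element `a` is paired with a right element `b` of the same key when `a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound`, and `lowerBoundExclusive()` turns the first `<=` into `<`. The plain-Java sketch below applies that predicate to `between(Time.minutes(-10L), Time.milliseconds(0L)).lowerBoundExclusive()` for a single key using a naive nested loop. It has no Flink dependency, and the class and method names are hypothetical. It only shows which pairs should match in principle; it does not model Flink's state buffering, watermarks, or handling of late elements, all of which a real run also depends on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of interval-join bounds; not Flink code.
class IntervalJoinBounds {
    // Mirrors between(Time.minutes(-10L), Time.milliseconds(0L)), in milliseconds.
    static final long LOWER_MS = -10L * 60_000L;
    static final long UPPER_MS = 0L;

    // True if a right element at tRight pairs with a left element at tLeft:
    // exclusive lower bound (lowerBoundExclusive()), inclusive upper bound.
    static boolean matches(long tLeft, long tRight) {
        return tRight > tLeft + LOWER_MS && tRight <= tLeft + UPPER_MS;
    }

    // Naive nested-loop join over the timestamps of ONE key; returns {left, right} pairs.
    static List<long[]> join(long[] left, long[] right) {
        List<long[]> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (matches(l, r)) {
                    out.add(new long[] {l, r});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] left = {600_000L, 1_200_000L};
        long[] right = {0L, 1L, 600_000L, 600_001L};
        for (long[] pair : join(left, right)) {
            System.out.println("left=" + pair[0] + " right=" + pair[1]);
        }
    }
}
```

Running `main` prints three pairs: `left=600000 right=1`, `left=600000 right=600000`, and `left=1200000 right=600001`. A right element at exactly ten minutes before the left element is excluded, while one with the same timestamp as the left element is included. With sorted, deterministic input, this fixed expected set is what the per-key `processElement()` call count should match.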