Instead of using `reinterpretAsKeyedStream`, can you use `keyBy` (i.e.
`leftStream.keyBy(keySelector2).intervalJoin(rightStream.keyBy(keySelector1))`)
and see if the behavior becomes deterministic?
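
To check the result independently of how the stream is keyed, it may help to
compute the expected number of matches outside Flink. Below is a minimal
plain-Java sketch of the join condition as I understand it for
`between(Time.minutes(-10L), Time.milliseconds(0L))` with
`lowerBoundExclusive()`: a right element matches a left element with the same
key when left.ts - 10 min < right.ts <= left.ts. The names
`IntervalJoinOracle`, `Event`, `matches`, `countMatches` and `demoCount` are
made up for illustration and are not part of Flink, and the sample timestamps
are invented.

```java
import java.util.List;

// Hypothetical helper, not part of Flink: a brute-force reference for the join.
final class IntervalJoinOracle {

    /** Stand-in for a keyed, timestamped record (hypothetical type). */
    static final class Event {
        final String key;
        final long timestampMillis;

        Event(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    // Mirrors between(Time.minutes(-10L), Time.milliseconds(0L)).
    static final long LOWER_BOUND_MILLIS = -10L * 60L * 1000L;
    static final long UPPER_BOUND_MILLIS = 0L;

    /** True when both keys are equal and right.ts lies in (left.ts - 10 min, left.ts]. */
    static boolean matches(Event left, Event right) {
        long diff = right.timestampMillis - left.timestampMillis;
        return left.key.equals(right.key)
                && diff > LOWER_BOUND_MILLIS   // strict, because of lowerBoundExclusive()
                && diff <= UPPER_BOUND_MILLIS; // upper bound stays inclusive
    }

    /** Counts matching pairs by comparing every left element with every right element. */
    static int countMatches(List<Event> lefts, List<Event> rights) {
        int count = 0;
        for (Event left : lefts) {
            for (Event right : rights) {
                if (matches(left, right)) {
                    count++;
                }
            }
        }
        return count;
    }

    /** Invented sample data covering both bounds and a key mismatch. */
    static int demoCount() {
        List<Event> lefts = List.of(new Event("a", 600_000L));
        List<Event> rights = List.of(
                new Event("a", 0L),        // exactly 10 min earlier: excluded
                new Event("a", 1L),        // just inside the lower bound: match
                new Event("a", 600_000L),  // same timestamp: match
                new Event("a", 600_001L),  // later than the left element: no match
                new Event("b", 300_000L)); // different key: no match
        return countMatches(lefts, rights);
    }

    public static void main(String[] args) {
        System.out.println(demoCount()); // prints 2
    }
}
```

If the count from your test run differs from such a reference on the same
input file, that would suggest elements are being dropped or routed
differently between runs rather than the join bounds being the cause.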

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> I'm not sure if the issue in [1] is relevant since it mentions the Table
> API, but it could be. Since stream1 and stream2 in my example each have a
> long chain of operators behind them, I presume they might "run" at very
> different paces.
>
> Oh and, in the context of my unit tests, watermarks should be
> deterministic: the input file is sorted, and the watermark strategies
> should essentially behave like the monotonous generator.
>
> [1] https://issues.apache.org/jira/browse/FLINK-24466
>
> Regards,
> Alexis.
>
> ------------------------------
> *From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Sent:* Thursday, January 27, 2022 1:30 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Determinism of interval joins
>
>
> Hi everyone,
>
>
>
> I’m seeing a lack of determinism in unit tests when using an interval
> join. I am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits
> of my pipeline look a bit like this:
>
>
>
> keySelector1 = …
> keySelector2 = …
>
> rightStream = stream1
>   .flatMap(…)
>   .keyBy(keySelector1)
>   .assignTimestampsAndWatermarks(strategy1)
>
> leftStream = stream2
>   .keyBy(keySelector2)
>   .assignTimestampsAndWatermarks(strategy2)
>
> joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, keySelector2)
>   .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, keySelector1))
>   .between(Time.minutes(-10L), Time.milliseconds(0L))
>   .lowerBoundExclusive()
>   .process(new IntervalJoinFunction(…))
>
>
>
> ---
>
>
>
> In my tests, I have a bounded source that loads demo data from a file and
> simulates the stream, with a sink that collects results in memory. In the
> specific case of my IntervalJoinFunction, I’m seeing that it’s called a
> different number of times in a non-deterministic way: sometimes I see 14
> calls to its processElement() method, sometimes 8, and sometimes none at
> all, leaving my output empty. I count this by checking my logs with some
> tracing.
>
>
>
> Does anyone know why this is? Maybe I’m doing something wrong,
> particularly with reinterpretAsKeyedStream.
>
>
>
> Regards,
>
> Alexis.
>
>
>
