Hi, could you share the original SQL? If not, could you share the plan printed by 'EXPLAIN ...'? Each of the two inputs of the 'Join' should be an 'Exchange' node, in both "== Optimized Physical Plan ==" and "== Optimized Execution Plan ==".
--
Best!
Xuyang

On 2023-10-20 15:57:28, "Yaroslav Tkachenko" <yaros...@goldsky.com> wrote:

Hi Xuyang,

A shuffle by join key is what I'd expect, but I don't see it. The issue only happens with parallelism > 1.

> Do you mean the one +I record and two +U records arrive at the sink in random order?

Yes.

On Fri, Oct 20, 2023 at 4:48 AM Xuyang <xyzhong...@163.com> wrote:

Hi. Actually, the records that arrive at the join are shuffled by join key by design. In your test, do you mean the one +I record and two +U records arrive at the sink in random order? What is the parallelism of these operators? It would be better if you could post an example that can be reproduced.

At 2023-10-20 04:31:09, "Yaroslav Tkachenko" <yaros...@goldsky.com> wrote:

Hi everyone,

I noticed that a simple INNER JOIN in Flink SQL behaves non-deterministically. I'd like to understand whether this is expected and whether an issue has been created to address it.

In my example, I have the following query:

    SELECT a.funder, a.amounts_added, r.amounts_removed
    FROM table_a AS a
    JOIN table_b AS r ON a.funder = r.funder

Let's say I have three records with funder 12345 in table_a and a single record with funder 12345 in table_b. When I run this Flink job, I see one INSERT followed by two UPDATEs in my results (corresponding to the three records from table_a), but their order is not deterministic: if I re-run the application several times, I get different results.

It looks like Flink uses a GlobalPartitioner in this case, which tells me that it doesn't perform a shuffle on the column used in the join condition.

I use Flink 1.17.1. I'd appreciate any insights here!
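Xuyang's point that join inputs are shuffled by join key, and the question of why the update order varies only with parallelism > 1, can be illustrated with a toy Python model. This is a sketch under stated assumptions, not Flink's implementation: `hash_partition` uses CRC32 as a stand-in for Flink's key-group hashing, `global_partition` mimics a global partitioner that sends every record to subtask 0, and the rows and the split of table_a across two source subtasks are hypothetical. The model relies on Flink's documented guarantee that ordering is preserved only between each pair of sending and receiving subtasks; whether that is what happened in this particular job was not confirmed in the thread.

```python
import zlib
from collections import defaultdict
from itertools import combinations


def hash_partition(key: str, parallelism: int) -> int:
    """Toy keyed shuffle: the same key always maps to the same subtask.

    Assumption: Flink really assigns keys via key groups and a murmur hash.
    CRC32 is used only because it is stable across Python runs (the built-in
    hash() on str is salted per process).
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def global_partition(key: str, parallelism: int) -> int:
    """Mimics a global partitioner: every record goes to subtask 0."""
    return 0


def route(records, partitioner, parallelism):
    """Group (side, funder, amount) rows by the downstream subtask they reach."""
    subtasks = defaultdict(list)
    for row in records:
        subtasks[partitioner(row[1], parallelism)].append(row)
    return dict(subtasks)


def interleavings(left, right):
    """All arrival orders that keep each upstream channel's own FIFO order.

    Order is only guaranteed per sender/receiver pair, so rows coming from
    two upstream channels may be merged in any of these ways.
    """
    n = len(left) + len(right)
    orders = []
    for left_slots in combinations(range(n), len(left)):
        li, ri, merged = iter(left), iter(right), []
        for pos in range(n):
            merged.append(next(li) if pos in left_slots else next(ri))
        orders.append(merged)
    return orders


# Hypothetical rows modeled on the thread: three table_a rows, one table_b row.
rows = [("a", "12345", 10), ("a", "12345", 20), ("a", "12345", 30), ("b", "12345", 5)]

print(len(route(rows, hash_partition, 4)))       # 1: all funder=12345 rows share a subtask
print(sorted(route(rows, global_partition, 4)))  # [0]: everything on the first subtask

# Assumption: the table_a rows were read by two different source subtasks.
channel_0 = [("a", "12345", 10), ("a", "12345", 30)]
channel_1 = [("a", "12345", 20)]
for order in interleavings(channel_0, channel_1):
    print([amount for _, _, amount in order])
# [10, 30, 20]
# [10, 20, 30]
# [20, 10, 30]
```

In this model a keyed exchange does bring every funder=12345 row to a single join subtask, but that alone does not fix their arrival order when they come from more than one upstream channel, which would be consistent with the +U order changing between runs only when parallelism > 1.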