Hi, could you share the original SQL? If not, could you share the plan produced 
by 'EXPLAIN ...'? There should be an 'Exchange' node on both inputs of the 
'Join' in both "== Optimized Physical Plan ==" and "== Optimized 
Execution Plan ==".




--

    Best!
    Xuyang




在 2023-10-20 15:57:28,"Yaroslav Tkachenko" <yaros...@goldsky.com> 写道:

Hi Xuyang,


A shuffle by join key is what I'd expect, but I don't see it. The issue only 
happens with parallelism > 1. 


> do you mean that the one +I record and two +U records arrive at the sink in 
> random order?


Yes. 


On Fri, Oct 20, 2023 at 4:48 AM Xuyang <xyzhong...@163.com> wrote:

Hi. Actually, the records that arrive at the join are shuffled by join keys by design. 


In your test, do you mean that the one +I record and two +U records arrive at the sink 
in random order? What is the parallelism of these operators? It would be 
better if you could post an example that can be reproduced.


--

    Best!
    Xuyang




At 2023-10-20 04:31:09, "Yaroslav Tkachenko" <yaros...@goldsky.com> wrote:

Hi everyone,


I noticed that a simple INNER JOIN in Flink SQL behaves non-deterministically. 
I'd like to understand whether this is expected and whether an issue has 
already been created to address it.




In my example, I have the following query: 


SELECT a.funder, a.amounts_added, r.amounts_removed FROM table_a AS a JOIN 
table_b AS r ON a.funder = r.funder



Let's say I have three records with funder 12345 in table_a and a single 
record with funder 12345 in table_b. When I run this Flink job, I see 
an INSERT followed by two UPDATEs as my results (corresponding to the records from 
table_a), but their order is not deterministic. If I re-run the application 
several times, I see different results. 
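
To illustrate the reordering, here is a toy sketch (not Flink code; the
record labels are made up): if the three joined rows are emitted by different
parallel subtasks, a single sink can receive them in any interleaving of the
per-subtask streams, so the +I/+U order is not fixed.

```python
# Toy sketch: three joined rows produced on separate parallel subtasks can
# reach a parallelism-1 sink in any order, so the changelog order varies
# between runs.
import itertools

emitted = ["+I funder=12345 (row 1)",
           "+U funder=12345 (row 2)",
           "+U funder=12345 (row 3)"]
possible_orders = set(itertools.permutations(emitted))
print(len(possible_orders))  # 6 -- 3! possible arrival orders at the sink
```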


It looks like Flink uses a GlobalPartitioner in this case, which tells me that 
it doesn't perform a shuffle on the column used in the join condition.




I use Flink 1.17.1. I'd appreciate any insights here! 
