Hi, sure, sharing it again:

SELECT a.funder, a.amounts_added, r.amounts_removed
FROM table_a AS a
JOIN table_b AS r ON a.funder = r.funder

and the Optimized Execution Plan:

Calc(select=[funder, vid AS a_vid, vid0 AS r_vid, amounts_added, amounts_removed])
+- Join(joinType=[InnerJoin], where=[(funder = funder0)], select=[vid, funder, amounts_added, vid0, funder0, amounts_removed], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[funder]])
   :  +- Calc(select=[vid, funder, amounts_added])
   :     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[vid, block_range, id, timestamp, fpmm, funder, amounts_added, amounts_refunded, shares_minted, _gs_chain, _gs_gid])
   +- Exchange(distribution=[hash[funder]])
      +- Calc(select=[vid, funder, amounts_removed])
         +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[vid, block_range, id, timestamp, fpmm, funder, amounts_removed, collateral_removed, shares_burnt, _gs_chain, _gs_gid])

So I see an Exchange on each join input, which makes sense. I'm still
confused about how the result can be non-deterministic...

On Mon, Oct 23, 2023 at 5:51 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, could you share the original SQL? If not, could you share the plan
> after executing 'EXPLAIN ...'? There should be an 'Exchange' node on
> both inputs of the 'Join' in both "== Optimized Physical Plan ==" and
> "== Optimized Execution Plan ==".
>
>
> --
> Best!
> Xuyang
>
>
> At 2023-10-20 15:57:28, "Yaroslav Tkachenko" <yaros...@goldsky.com> wrote:
>
> Hi Xuyang,
>
> A shuffle by join key is what I'd expect, but I don't see it. The issue
> only happens with parallelism > 1.
>
> > do you mean the one +I record and two +U records arrive at the sink in
> > random order?
>
> Yes.
>
> On Fri, Oct 20, 2023 at 4:48 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi. Actually, the records that arrive at the join are shuffled by join
>> key by design.
>>
>> In your test, do you mean the one +I record and two +U records arrive
>> at the sink in random order? What is the parallelism of these operators?
>> It would be better if you could post an example that can be reproduced.
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2023-10-20 04:31:09, "Yaroslav Tkachenko" <yaros...@goldsky.com>
>> wrote:
>>
>> Hi everyone,
>>
>> I noticed that a simple INNER JOIN in Flink SQL behaves
>> non-deterministically. I'd like to understand whether this is expected
>> and whether an issue has been created to address it.
>>
>>
>> In my example, I have the following query:
>>
>> SELECT a.funder, a.amounts_added, r.amounts_removed FROM table_a AS a
>> JOIN table_b AS r ON a.funder = r.funder
>>
>> Let's say I have three records with funder 12345 in table_a and a
>> single record with funder 12345 in table_b. When I run this Flink job,
>> I can see an INSERT followed by two UPDATEs as my results (corresponding
>> to the records from table_a), but their order is not deterministic. If I
>> re-run the application several times, I can see different results.
>>
>> It looks like Flink uses a GlobalPartitioner in this case, which tells me
>> that it doesn't perform a shuffle on the column used in the join condition.
>>
>>
>> I use Flink 1.17.1. Appreciate any insights here!
>>
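
The scenario from the original message (three table_a records and one table_b record for funder 12345) can be explored with a toy model. This is only a sketch under stated assumptions, not a description of Flink's internals: it assumes the join emits a row as soon as a record finds a match on the other side, and that the arrival order of records at the join is not fixed when they come from more than one upstream subtask. The function name `inner_join_stream` and the record values are hypothetical.

```python
from itertools import permutations


def inner_join_stream(events):
    """Toy streaming inner join keyed by funder.

    `events` is a list of (side, funder, value) tuples in arrival order,
    where side is "a" (table_a) or "b" (table_b). Each arriving record is
    stored, then joined against everything already stored on the other side.
    """
    left_state, right_state = {}, {}
    out = []
    for side, funder, value in events:
        if side == "a":
            left_state.setdefault(funder, []).append(value)
            for removed in right_state.get(funder, []):
                out.append((funder, value, removed))
        else:
            right_state.setdefault(funder, []).append(value)
            for added in left_state.get(funder, []):
                out.append((funder, added, value))
    return out


# Made-up values standing in for amounts_added / amounts_removed.
records = [
    ("a", 12345, "added_1"),
    ("a", 12345, "added_2"),
    ("a", 12345, "added_3"),
    ("b", 12345, "removed_1"),
]

# Enumerate every possible arrival order; a single run would see one of them.
orders = {tuple(inner_join_stream(list(p))) for p in permutations(records)}
result_sets = {frozenset(o) for o in orders}

print(len(result_sets))  # 1: the same three joined rows in every case
print(len(orders))       # 6: but the rows can be emitted in different orders
```

In this model the hash shuffle on funder guarantees that all four records meet in the same place, so the set of joined rows never changes. The order in which they are emitted simply follows the order in which the table_a records happened to arrive, which is the part that can differ between runs if that arrival order is not fixed.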