Hi, sure, sharing it again:

SELECT a.funder, a.amounts_added, r.amounts_removed FROM table_a AS a JOIN
table_b AS r ON a.funder = r.funder

and the Optimized Execution Plan:

Calc(select=[funder, vid AS a_vid, vid0 AS r_vid, amounts_added,
amounts_removed])
+- Join(joinType=[InnerJoin], where=[(funder = funder0)], select=[vid,
funder, amounts_added, vid0, funder0, amounts_removed],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[funder]])
   :  +- Calc(select=[vid, funder, amounts_added])
   :     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]],
fields=[vid, block_range, id, timestamp, fpmm, funder, amounts_added,
amounts_refunded, shares_minted, _gs_chain, _gs_gid])
   +- Exchange(distribution=[hash[funder]])
      +- Calc(select=[vid, funder, amounts_removed])
         +- TableSourceScan(table=[[*anonymous_datastream_source$2*]],
fields=[vid, block_range, id, timestamp, fpmm, funder, amounts_removed,
collateral_removed, shares_burnt, _gs_chain, _gs_gid])

So I see an Exchange, which makes sense. I'm still confused about how it
can be non-deterministic...

On Mon, Oct 23, 2023 at 5:51 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, Could you share the original SQL? If not, could you share the plan
> after executing 'EXPLAIN ...'. There should be one node 'Exchange' as the
> both inputs of the 'Join'  in both "== Optimized Physical Plan ==" and "==
> Optimized Execution Plan ==".
>
>
> --
>     Best!
>     Xuyang
>
>
> 在 2023-10-20 15:57:28,"Yaroslav Tkachenko" <yaros...@goldsky.com> 写道:
>
> Hi Xuyang,
>
> A shuffle by join key is what I'd expect, but I don't see it. The issue
> only happens with parallelism > 1.
>
> > do you mean the one +I record and two +U records arrive the sink with
> random order?
>
> Yes.
>
> On Fri, Oct 20, 2023 at 4:48 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi. Actually the results that arrive join are shuffled by join keys by
>> design.
>>
>> In your test, do you means the one +I record and two +U records arrive
>> the sink with random order? What is the parallelism of these operators ? It
>> would be better if you could post an example that can be reproduced.
>>
>>
>>
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> At 2023-10-20 04:31:09, "Yaroslav Tkachenko" <yaros...@goldsky.com>
>> wrote:
>>
>> Hi everyone,
>>
>> I noticed that a simple INNER JOIN in Flink SQL behaves
>> non-deterministicly. I'd like to understand if it's expected and whether an
>> issue is created to address it.
>>
>>
>> In my example, I have the following query:
>>
>> SELECT a.funder, a.amounts_added, r.amounts_removed FROM table_a AS a
>> JOIN table_b AS r ON a.funder = r.funder
>>
>> Let's say I have three records with funder 12345 in the table_a and a
>> single record with funder 12345 in the table_b. When I run this Flink job,
>> I can see an INSERT with two UPDATEs as my results (corresponding to the
>> records from table_a), but their order is not deterministic. If I re-run
>> the application several times, I can see different results.
>>
>> It looks like Flink uses a GlobalPartitioner in this case, which tells me
>> that it doesn't perform a shuffle on the column used in the join condition.
>>
>>
>> I use Flink 1.17.1. Appreciate any insights here!
>>
>>

Reply via email to