Hi, could you share the original SQL? If not, could you share the plan printed
by 'EXPLAIN ...'? There should be an 'Exchange' node on both inputs of the
'Join' in both "== Optimized Physical Plan ==" and "== Optimized
Execution Plan ==".
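As a sketch, you can prefix the query from the original report with EXPLAIN; with a key-based shuffle you would expect an Exchange with hash distribution on the join key feeding each input of the Join (the exact node attributes may vary by Flink version):

```sql
-- Print the optimized plans for the join. With a shuffle on the join key,
-- each input of the Join should sit behind an Exchange node hashed on
-- 'funder' rather than a global/single partitioning.
EXPLAIN
SELECT a.funder, a.amounts_added, r.amounts_removed
FROM table_a AS a
JOIN table_b AS r ON a.funder = r.funder;
```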
--
Best!
Xuyang
On 2023-10-20 15:57:28, "Yaroslav Tkachenko" <[email protected]> wrote:
Hi Xuyang,
A shuffle by join key is what I'd expect, but I don't see it. The issue only
happens with parallelism > 1.
> do you mean the one +I record and two +U records arrive at the sink in random
> order?
Yes.
On Fri, Oct 20, 2023 at 4:48 AM Xuyang <[email protected]> wrote:
Hi. Actually, the records arriving at the join are shuffled by join key by
design. In your test, do you mean that the one +I record and two +U records
arrive at the sink in random order? What is the parallelism of these operators?
It would be better if you could post an example that can be reproduced.
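For instance, a self-contained reproduction might look something like the sketch below (the VALUES literals and the parallelism setting are illustrative assumptions, not taken from the original report):

```sql
-- Hypothetical minimal reproduction: three rows on one side and one row on
-- the other share the join key 'funder', and parallelism is greater than 1.
SET 'parallelism.default' = '4';

SELECT a.funder, a.amounts_added, r.amounts_removed
FROM (VALUES (12345, 10), (12345, 20), (12345, 30)) AS a(funder, amounts_added)
JOIN (VALUES (12345, 5)) AS r(funder, amounts_removed)
ON a.funder = r.funder;
```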
--
Best!
Xuyang
At 2023-10-20 04:31:09, "Yaroslav Tkachenko" <[email protected]> wrote:
Hi everyone,
I noticed that a simple INNER JOIN in Flink SQL behaves non-deterministically.
I'd like to understand whether this is expected and whether an issue has been
created to address it.
In my example, I have the following query:
SELECT a.funder, a.amounts_added, r.amounts_removed
FROM table_a AS a
JOIN table_b AS r ON a.funder = r.funder
Let's say I have three records with funder 12345 in table_a and a single
record with funder 12345 in table_b. When I run this Flink job, I see
an INSERT followed by two UPDATEs as my results (corresponding to the records
from table_a), but their order is not deterministic: if I re-run the
application several times, I see different results.
It looks like Flink uses a GlobalPartitioner in this case, which tells me that
it doesn't perform a shuffle on the column used in the join condition.
I use Flink 1.17.1. Appreciate any insights here!