Hi!

> sometimes it produces a result, and sometimes it doesn't


Do you mean the following scenario? Consider a record from table1 with
a.bId = 1. You know that table2 contains a record with b.Id = 1, but the
join result is null at first and is only later updated with the correct
result.

This is a property of streaming joins. If a record with a.bId = 1 arrives
earlier than the record with b.Id = 1, the join operator will first output
a result with nulls on the right side (because at that moment there is
nothing to join) and remember that record in state. When the record with
b.Id = 1 arrives later, the operator will look into the state and update
the result. So the results are eventually correct.
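
To make the ordering effect concrete, here is a small Python sketch of a
stateful left join on a single key. It is only a toy model, not Flink's
operator: the class ToyLeftJoin, its method names, and the "+I" / "-D"
change markers are assumptions made for illustration.

```python
from collections import defaultdict


class ToyLeftJoin:
    """Toy model of a stateful streaming LEFT JOIN (not Flink's implementation)."""

    def __init__(self):
        # Both inputs are remembered in state, keyed by the join key.
        self.left_state = defaultdict(list)
        self.right_state = defaultdict(list)

    def on_left(self, key, row):
        """A left-side row arrives: join with what is known so far."""
        self.left_state[key].append(row)
        matches = self.right_state[key]
        if not matches:
            # Nothing to join yet, so emit the row padded with null.
            return [("+I", row, None)]
        return [("+I", row, right) for right in matches]

    def on_right(self, key, row):
        """A right-side row arrives: revisit left rows held in state."""
        first_match = not self.right_state[key]
        self.right_state[key].append(row)
        changes = []
        for left in self.left_state[key]:
            if first_match:
                # Withdraw the earlier null-padded result.
                changes.append(("-D", left, None))
            changes.append(("+I", left, row))
        return changes


join = ToyLeftJoin()
early = join.on_left(1, "a1")   # a.bId = 1 arrives first
late = join.on_right(1, "b1")   # b.Id = 1 arrives afterwards
print(early)
print(late)
```

In this model the first call yields a null-padded row, and the second call
withdraws it and emits the joined row, which is the "null first, correct
later" pattern described above.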

> I thought there might be a way to specify the partitioning.


There is no need to specify partitioning for joins. Flink's SQL planner
does this for you. If you look into the web UI you'll see an arrow marked
with HASH pointing from the sources to the join operators. It means that
records flowing through this arrow are distributed to the parallel
instances of the join operator according to the hash values of their join
keys, so records with equal join keys always reach the same instance.
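
As a rough illustration of why hash distribution keeps matching records
together, the Python sketch below routes a record by hashing its join key.
The function subtask_for and the use of CRC32 are assumptions for this
example only; Flink uses its own hashing scheme internally.

```python
import zlib


def subtask_for(join_key, parallelism):
    """Pick a parallel instance from a stable hash of the join key.

    Illustrative only: CRC32 stands in for whatever hash the engine uses.
    """
    digest = zlib.crc32(str(join_key).encode("utf-8"))
    return digest % parallelism


PARALLELISM = 4

# A record from table1 with a.bId = 1 and a record from table2 with
# b.Id = 1 carry the same join key, so they are routed identically.
left_target = subtask_for(1, PARALLELISM)
right_target = subtask_for(1, PARALLELISM)
print(left_target == right_target)
```

Because the target depends only on the key value and the parallelism, it
does not matter which source or task manager a record originally came from.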

Curt Buechter <tricksho...@gmail.com> wrote on Sun, Nov 14, 2021 at 11:52 PM:

> Hi,
> I'd like to understand a little more how joins work. I have a fairly
> simple LEFT JOIN query, and I'm seeing spotty results on the joins. I know
> there is a record on the right side, but sometimes it produces a result,
> and sometimes it doesn't.
>
> Sample query:
> SELECT a.id, b.val1, c.val2
> FROM table1 a
> LEFT JOIN table2 b ON a.bId = b.Id
> LEFT JOIN table3 c ON a.cId = c.Id
>
> I'm using Flink 1.13.2. All tables are loaded from kafka. Using the
> Table/SQL API.
>
> My suspicion is that the distributed nature of the join causes the
> problem. If I reduce the parallelism to match the number of slots,
> resulting in a single task manager, the joins always (I think) seem to
> work. So, my assumption is that records in table3 that should join to
> table1 are not present on the same task manager, so the join produces null
> values for table3.
>
> I thought there might be a way to specify the partitioning. If the tables
> are from a multi-tenant database, I could specify the tenant-id as the
> partition key, but the Flink SQL "PARTITION BY" statement doesn't seem to
> work that way.
>
> Is there a way to confirm this? Does anyone know if this is a known
> problem, and is there a solution?
>
> Thanks,
> Curt
>
