Hi!

Thanks for the information. If possible, could you share your test case
(including the SQL and configuration) so that we can look into this
problem more deeply?

Curt Buechter <tricksho...@gmail.com> wrote on Mon, Nov 15, 2021 at 9:45 AM:

> Thanks for the response, but no, that's not the scenario. I'm familiar
> with late-arriving data. In my case, though, the record from table 2
> never arrives.
> I'm not positive, but it may only happen when three or more tables
> are joined in one statement.
>
> On Sun, Nov 14, 2021, 7:32 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> sometimes it produces a result, and sometimes it doesn't
>>
>>
>> Do you mean the following scenario? Consider a record from table1 with
>> a.bId = 1. You know that table2 contains a record with b.Id = 1, but the
>> join result is null at first and is later updated with the correct
>> result?
>>
>> This is a property of streaming joins. If the record with a.bId = 1
>> arrives earlier than the record with b.Id = 1, the join operator will
>> first output a null-padded result (because there is nothing to join with
>> yet) and keep that record in state. When the record with b.Id = 1 arrives
>> later, the operator looks it up in state and updates the result. So the
>> results are eventually correct.
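
A minimal sketch of the behaviour described above, written as a toy Python model rather than Flink's actual operator code. It assumes a single LEFT JOIN on one key, state that is kept forever (no TTL), and a changelog of (op, row) pairs where "+I" and "-D" are borrowed loosely from Flink's row-kind notation. `ToyLeftJoin` and `materialize` are hypothetical names.

```python
from collections import defaultdict


class ToyLeftJoin:
    """Hypothetical model of a regular streaming LEFT JOIN on one key."""

    def __init__(self):
        self.left_state = defaultdict(list)   # join key -> left rows seen so far
        self.right_state = defaultdict(list)  # join key -> right rows seen so far

    def on_left(self, key, left_row):
        self.left_state[key].append(left_row)
        matches = self.right_state[key]
        if not matches:
            # Nothing to join with yet: emit the left row padded with NULL.
            return [("+I", (left_row, None))]
        return [("+I", (left_row, r)) for r in matches]

    def on_right(self, key, right_row):
        out = []
        first_match = not self.right_state[key]
        self.right_state[key].append(right_row)
        for left_row in self.left_state[key]:
            if first_match:
                # Retract the NULL-padded row emitted earlier for this left row.
                out.append(("-D", (left_row, None)))
            out.append(("+I", (left_row, right_row)))
        return out


def materialize(changelog):
    """Apply a changelog to an empty table and return the surviving rows."""
    rows = []
    for op, row in changelog:
        if op == "+I":
            rows.append(row)
        else:
            rows.remove(row)
    return rows


join = ToyLeftJoin()
changes = join.on_left(1, "a1")     # table1 row with a.bId = 1 arrives first
changes += join.on_right(1, "b1")   # table2 row with b.Id = 1 arrives later
print(changes)
print(materialize(changes))
```

Feeding the two rows in the opposite order yields a single "+I" with no retraction, so the materialized result is the same either way; only a consumer that looks at the intermediate changelog sees the transient NULL.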
>>
>> I thought there might be a way to specify the partitioning.
>>
>>
>> There is no need to specify partitioning for joins; Flink's SQL planner
>> does this for you. If you look at the web UI you'll see an arrow marked
>> HASH pointing from the sources to the join operators. It means that
>> records flowing along this arrow are distributed to the downstream
>> parallel subtasks according to the hash values of their join keys.
>>
Curt Buechter <tricksho...@gmail.com> wrote on Sun, Nov 14, 2021 at 11:52 PM:
>>
>>> Hi,
>>> I'd like to understand a little more about how joins work. I have a
>>> fairly simple LEFT JOIN query, and I'm seeing spotty join results. I know
>>> there is a record on the right side, but sometimes it produces a result,
>>> and sometimes it doesn't.
>>>
>>> Sample query:
>>> SELECT a.id, b.val1, c.val2
>>> FROM table1 a
>>> LEFT JOIN table2 b ON a.bId = b.Id
>>> LEFT JOIN table3 c ON a.cId = c.Id
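
For this three-table query the planner will most likely build two binary join operators: table1 LEFT JOIN table2 keyed on bId, whose output is then re-hashed on cId into a second LEFT JOIN with table3. The toy Python model below (hypothetical class name and tuple layout, insert-only right sides, no state TTL) shows that when the table2 row arrives last, the first join's retraction has to flow through the second join for the final result to be correct. In this model, a consumer that ignored the "-D" rows would keep the stale NULL-padded rows.

```python
from collections import defaultdict


class ChangelogLeftJoin:
    """Hypothetical streaming LEFT JOIN that consumes and emits a changelog.

    Rows are tuples; left_key / right_key extract the join key from a row.
    Output rows are left_row + right_row, or left_row padded with NULLs.
    """

    def __init__(self, left_key, right_key, right_width):
        self.left_key, self.right_key = left_key, right_key
        self.nulls = (None,) * right_width
        self.left = defaultdict(list)   # join key -> live left rows
        self.right = defaultdict(list)  # join key -> right rows (insert-only)

    def on_left(self, op, row):
        k = self.left_key(row)
        if op == "+I":
            self.left[k].append(row)
        else:  # "-D": an upstream operator retracted this row
            self.left[k].remove(row)
        matches = self.right[k] or [self.nulls]
        # Forward the same op for every joined (or NULL-padded) row.
        return [(op, row + r) for r in matches]

    def on_right(self, row):
        k = self.right_key(row)
        out = []
        if not self.right[k]:
            # First match for this key: retract the NULL-padded rows.
            out += [("-D", lrow + self.nulls) for lrow in self.left[k]]
        self.right[k].append(row)
        out += [("+I", lrow + row) for lrow in self.left[k]]
        return out


def materialize(changelog):
    rows = []
    for op, row in changelog:
        if op == "+I":
            rows.append(row)
        else:
            rows.remove(row)
    return rows


# table1 rows: (id, bId, cId); table2 rows: (Id, val1); table3 rows: (Id, val2)
join_b = ChangelogLeftJoin(lambda r: r[1], lambda r: r[0], right_width=2)
join_c = ChangelogLeftJoin(lambda r: r[2], lambda r: r[0], right_width=2)


def through_join_c(changes):
    """Route every change emitted by the first join into the second one."""
    out = []
    for op, row in changes:
        out += join_c.on_left(op, row)
    return out


sink = []
sink += through_join_c(join_b.on_left("+I", (1, 10, 20)))  # table1 row first
sink += join_c.on_right((20, "y"))                          # then table3
sink += through_join_c(join_b.on_right((10, "x")))          # table2 row last
print(materialize(sink))
```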
>>>
>>> I'm using Flink 1.13.2. All tables are loaded from Kafka, using the
>>> Table/SQL API.
>>>
>>> My suspicion is that the distributed nature of the join causes the
>>> problem. If I reduce the parallelism to match the number of slots,
>>> resulting in a single task manager, the joins seem to always work (I
>>> think). So my assumption is that records in table3 that should join to
>>> table1 are not present on the same task manager, so the join produces
>>> null values for table3.
>>>
>>> I thought there might be a way to specify the partitioning. If the
>>> tables are from a multi-tenant database, I could specify the tenant-id as
>>> the partition key, but the Flink SQL "PARTITION BY" statement doesn't seem
>>> to work that way.
>>>
>>> Is there a way to confirm this? Does anyone know if this is a known
>>> problem, and is there a solution?
>>>
>>> Thanks,
>>> Curt
>>>
>>
