Hi! Thanks for the information. If possible, could you share your test case (including SQL and configurations) so that we can look deeper into this problem?

Curt Buechter <tricksho...@gmail.com> wrote on Mon, Nov 15, 2021 at 9:45 AM:

> Thanks for the response. But, no, that's not the scenario. I'm familiar
> with later arriving data. In my case, though, the record on table 2 never
> arrives.
> I'm not positive, but it may only be happening when there are three or
> more tables being joined in one statement.
>
> On Sun, Nov 14, 2021, 7:32 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> sometimes it produces a result, and sometimes it doesn't
>>
>>
>> Do you mean this scenario: Consider a record from table1 with a.bId = 1.
>> You know that in table2 there is a record with b.Id = 1, but the join
>> result is null at the beginning, and then it is updated with the correct
>> result?
>>
>> This is the property of streaming joins. If the records with a.bId = 1
>> arrive earlier than the records with b.Id = 1, the join operator will
>> first output a null result (because currently there is nothing to join) and
>> remember that record in state. When the record with b.Id = 1 arrives later,
>> the operator will look into the state and update the results. So the results
>> are eventually correct.
>>
>> I thought there might be a way to specify the partitioning.
>>
>>
>> There is no need to specify partitioning for joins. Flink's SQL planner
>> does this for you. If you look into the web UI you'll see an arrow marked
>> with HASH pointing from sources to join operators. It means that records
>> flowing through this arrow will be distributed to the corresponding
>> parallelism according to the hash values of their join keys.
>>
>> Curt Buechter <tricksho...@gmail.com> wrote on Sun, Nov 14, 2021 at 11:52 PM:
>>
>>> Hi,
>>> I'd like to understand a little more how joins work. I have a fairly
>>> simple LEFT JOIN query, and I'm seeing spotty results on the joins. I know
>>> there is a record on the right side, but sometimes it produces a result,
>>> and sometimes it doesn't.
>>>
>>> Sample query:
>>> SELECT a.id, b.val1, c.val2
>>> FROM table1 a
>>> LEFT JOIN table2 b ON a.bId = b.Id
>>> LEFT JOIN table3 c ON a.cId = c.Id
>>>
>>> I'm using Flink 1.13.2. All tables are loaded from kafka. Using the
>>> Table/SQL API.
>>>
>>> My suspicion is that the distributed nature of the join causes the
>>> problem. If I reduce the parallelism to match the number of slots,
>>> resulting in a single task manager, the joins always (I think) seem to
>>> work. So, my assumption is that records in table3 that should join to
>>> table1 are not present on the same task manager, so the join produces null
>>> values for table3.
>>>
>>> I thought there might be a way to specify the partitioning. If the
>>> tables are from a multi-tenant database, I could specify the tenant-id as
>>> the partition key, but the Flink SQL "PARTITION BY" statement doesn't seem
>>> to work that way.
>>>
>>> Is there a way to confirm this? Does anyone know if this is a known
>>> problem, and is there a solution?
>>>
>>> Thanks,
>>> Curt
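
To make the behaviour described in the quoted reply concrete, here is a rough Python simulation of a hash-partitioned streaming LEFT JOIN. It is only a sketch, not Flink code: the names `subtask_for`, `LeftJoinSubtask`, `run` and `materialize` are made up for illustration, the `+I`/`-D` labels are modelled on changelog row kinds, and Python's built-in `hash` stands in for whatever hashing Flink really applies to the join key. The point is that both sides are routed by the same key, so matching rows meet on the same subtask, and that a null-padded row is emitted first and replaced once the right side shows up.

```python
from collections import defaultdict


def subtask_for(key, parallelism):
    """Route a record by its join key (simplified stand-in for a HASH exchange)."""
    return hash(key) % parallelism


class LeftJoinSubtask:
    """One parallel instance of a LEFT JOIN that keeps both inputs in state."""

    def __init__(self):
        self.left = defaultdict(list)   # join key -> left rows seen so far
        self.right = defaultdict(list)  # join key -> right rows seen so far

    def on_left(self, key, row):
        self.left[key].append(row)
        matches = self.right[key]
        if not matches:
            # Nothing to join with yet: emit a null-padded result.
            return [("+I", row, None)]
        return [("+I", row, r) for r in matches]

    def on_right(self, key, row):
        first_match = not self.right[key]
        self.right[key].append(row)
        out = []
        for left_row in self.left[key]:
            if first_match:
                # Withdraw the null-padded row emitted earlier.
                out.append(("-D", left_row, None))
            out.append(("+I", left_row, row))
        return out


def run(events, parallelism=4):
    """Feed (side, key, row) events through the subtasks; return the changelog."""
    subtasks = [LeftJoinSubtask() for _ in range(parallelism)]
    changelog = []
    for side, key, row in events:
        task = subtasks[subtask_for(key, parallelism)]
        handler = task.on_left if side == "left" else task.on_right
        changelog.extend(handler(key, row))
    return changelog


def materialize(changelog):
    """Apply inserts and deletes to obtain the current result table."""
    result = []
    for kind, left_row, right_row in changelog:
        if kind == "+I":
            result.append((left_row, right_row))
        else:
            result.remove((left_row, right_row))
    return result
```

With `run([("left", 1, "a1"), ("right", 1, "b1")])` the changelog contains the null-padded insert, its withdrawal, and then the joined row, so a sink that only looks at the first message for a key would see a null. If the right-hand record never reaches the join at all, as reported above, the null-padded row is simply never replaced, which this model would not explain by partitioning alone.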