Thanks for the response. But no, that's not the scenario. I'm familiar with late-arriving data. In my case, though, the record from table2 never arrives. I'm not positive, but it may only happen when three or more tables are joined in one statement.
On Sun, Nov 14, 2021, 7:32 PM Caizhi Weng <tsreape...@gmail.com> wrote:
> Hi!
>
> > sometimes it produces a result, and sometimes it doesn't
>
> Do you mean this scenario: consider a record from table1 with a.bId = 1.
> You know that table2 contains a record with b.Id = 1, but the join
> result is null at first, and is later updated with the correct
> result?
>
> This is a property of streaming joins. If the record with a.bId = 1
> arrives earlier than the record with b.Id = 1, the join operator will
> first output a null result (because there is nothing to join with yet) and
> keep that record in state. When the record with b.Id = 1 arrives later, the
> operator looks it up in state and updates the result. So the results
> are eventually correct.
>
> > I thought there might be a way to specify the partitioning.
>
> There is no need to specify partitioning for joins; Flink's SQL planner
> does this for you. If you look at the web UI, you'll see an arrow marked
> HASH pointing from the sources to the join operators. It means that records
> flowing along this arrow are distributed to the parallel instances
> according to the hash values of their join keys.
>
> Curt Buechter <tricksho...@gmail.com> wrote on Sun, Nov 14, 2021 at 11:52 PM:
>
>> Hi,
>> I'd like to understand a little more about how joins work. I have a fairly
>> simple LEFT JOIN query, and I'm seeing spotty results on the joins. I know
>> there is a record on the right side, but sometimes the join produces a
>> result, and sometimes it doesn't.
>>
>> Sample query:
>> SELECT a.id, b.val1, c.val2
>> FROM table1 a
>> LEFT JOIN table2 b ON a.bId = b.Id
>> LEFT JOIN table3 c ON a.cId = c.Id
>>
>> I'm using Flink 1.13.2. All tables are loaded from Kafka, using the
>> Table/SQL API.
>>
>> My suspicion is that the distributed nature of the join causes the
>> problem. If I reduce the parallelism to match the number of slots,
>> resulting in a single task manager, the joins always (I think) seem to
>> work. So my assumption is that records in table3 that should join to
>> table1 are not present on the same task manager, so the join produces null
>> values for table3.
>>
>> I thought there might be a way to specify the partitioning. If the tables
>> are from a multi-tenant database, I could specify the tenant-id as the
>> partition key, but the Flink SQL "PARTITION BY" statement doesn't seem to
>> work that way.
>>
>> Is there a way to confirm this? Does anyone know if this is a known
>> problem, and is there a solution?
>>
>> Thanks,
>> Curt
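A minimal Python sketch of the behavior Caizhi describes, offered as a toy model under stated assumptions and not as Flink's operator code. `StreamingLeftJoin`, `route`, `run` and `materialize` are hypothetical names; Python's `hash()` stands in for Flink's key hashing; and the "+I"/"-D" tags only borrow the short names of Flink's row kinds. Each parallel instance keeps both sides in state, records are routed by join key (the HASH arrow), and a right-side row that arrives late retracts the earlier null-padded result.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Toy model only (assumption): NOT Flink's implementation.
Row = Tuple[int, Optional[str]]   # (a.id, b.val) with None meaning "no match yet"
Change = Tuple[str, Row]          # ("+I", row) = insert, ("-D", row) = retract


class StreamingLeftJoin:
    """One parallel instance of a keyed LEFT JOIN that keeps both sides in state."""

    def __init__(self) -> None:
        self.left: Dict[int, List[int]] = defaultdict(list)   # join key -> left ids
        self.right: Dict[int, List[str]] = defaultdict(list)  # join key -> right values

    def on_left(self, a_id: int, key: int) -> List[Change]:
        self.left[key].append(a_id)
        matches = self.right.get(key)
        if not matches:
            # Nothing to join with yet: emit a null-padded row, keep the left row in state.
            return [("+I", (a_id, None))]
        return [("+I", (a_id, v)) for v in matches]

    def on_right(self, key: int, val: str) -> List[Change]:
        first_match = not self.right.get(key)
        self.right[key].append(val)
        out: List[Change] = []
        for a_id in self.left.get(key, []):
            if first_match:
                # Retract the earlier null-padded result before emitting the joined row.
                out.append(("-D", (a_id, None)))
            out.append(("+I", (a_id, val)))
        return out


def route(key: int, parallelism: int) -> int:
    # Equal join keys always land on the same instance, whatever the parallelism.
    return hash(key) % parallelism


def run(events: list, parallelism: int) -> List[Change]:
    instances = [StreamingLeftJoin() for _ in range(parallelism)]
    changelog: List[Change] = []
    for kind, *fields in events:
        if kind == "left":
            a_id, key = fields
            changelog += instances[route(key, parallelism)].on_left(a_id, key)
        else:
            key, val = fields
            changelog += instances[route(key, parallelism)].on_right(key, val)
    return changelog


def materialize(changelog: List[Change]) -> List[Row]:
    # Apply inserts and retractions to get the final result table.
    result: List[Row] = []
    for op, row in changelog:
        if op == "+I":
            result.append(row)
        else:
            result.remove(row)
    return sorted(result, key=lambda r: (r[0], r[1] or ""))


events = [("left", 1, 10), ("left", 2, 20), ("right", 10, "x"), ("left", 3, 30)]
print(run(events, 4))
print(materialize(run(events, 4)))
```

In this model the final materialized result is the same for parallelism 1 and 4, because routing depends only on the join key. In the three-table query, the second LEFT JOIN would typically be a separate keyed operator partitioned on `a.cId`/`c.Id`, consuming this operator's changelog, including its retractions.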