Hi!

> sometimes it produces a result, and sometimes it doesn't

Do you mean this scenario: consider a record from table1 with a.bId = 1. You know that table2 contains a record with b.Id = 1, but the join result is null at first and is only later updated with the correct result?

This is a property of streaming joins. If the record with a.bId = 1 arrives earlier than the record with b.Id = 1, the join operator first emits a result with nulls for the table2 columns (because at that moment there is nothing to join with) and keeps the table1 record in state. When the record with b.Id = 1 arrives later, the operator looks up the stored record and updates the result: the null-padded row is retracted and replaced by the joined row. So the results are eventually correct.

> I thought there might be a way to specify the partitioning.

There is no need to specify partitioning for joins; Flink's SQL planner does this for you. If you look at the job graph in the web UI, you'll see an edge labeled HASH pointing from the sources to the join operators. It means that records flowing along this edge are distributed to the parallel instances of the join operator according to the hash values of their join keys.

On Sun, Nov 14, 2021 at 11:52 PM, Curt Buechter <tricksho...@gmail.com> wrote:

> Hi,
> I'd like to understand a little more how joins work. I have a fairly
> simple LEFT JOIN query, and I'm seeing spotty results on the joins. I know
> there is a record on the right side, but sometimes it produces a result,
> and sometimes it doesn't.
>
> Sample query:
> SELECT a.id, b.val1, c.val2
> FROM table1 a
> LEFT JOIN table2 b ON a.bId = b.Id
> LEFT JOIN table3 c ON a.cId = c.Id
>
> I'm using Flink 1.13.2. All tables are loaded from kafka. Using the
> Table/SQL API.
>
> My suspicion is that the distributed nature of the join causes the
> problem. If I reduce the parallelism to match the number of slots,
> resulting in a single task manager, the joins always (I think) seem to
> work. So, my assumption is that records in table3 that should join to
> table1 are not present on the same task manager, so the join produces null
> values for table3.
>
> I thought there might be a way to specify the partitioning. If the tables
> are from a multi-tenant database, I could specify the tenant-id as the
> partition key, but the Flink SQL "PARTITION BY" statement doesn't seem to
> work that way.
>
> Is there a way to confirm this? Does anyone know if this is a known
> problem, and is there a solution?
>
> Thanks,
> Curt
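
The streaming LEFT JOIN behavior described in the reply (a null-padded row first, corrected once the matching row arrives) can be illustrated with a toy model. This is a minimal sketch, not Flink's operator code. The class `ToyLeftJoin`, the `materialize` helper, and the `+I`/`-D` labels are assumptions made for illustration. The labels are borrowed loosely from Flink's changelog row kinds. Real Flink join state is keyed, checkpointed, and spread across subtasks.

```python
from collections import defaultdict


class ToyLeftJoin:
    """Hypothetical toy model of a streaming LEFT JOIN on one key (not Flink's code)."""

    def __init__(self):
        self.left_state = defaultdict(list)   # join key -> left rows seen so far
        self.right_state = defaultdict(list)  # join key -> right rows seen so far
        self.output = []                      # emitted changelog entries: (kind, row)

    def on_left(self, key, left_row):
        self.left_state[key].append(left_row)
        matches = self.right_state[key]
        if matches:
            for right_row in matches:
                self.output.append(("+I", (left_row, right_row)))
        else:
            # Nothing to join with yet: emit a null-padded row and remember the left row.
            self.output.append(("+I", (left_row, None)))

    def on_right(self, key, right_row):
        had_matches = bool(self.right_state[key])
        self.right_state[key].append(right_row)
        for left_row in self.left_state[key]:
            if not had_matches:
                # First match for this key: retract the earlier null-padded row.
                self.output.append(("-D", (left_row, None)))
            self.output.append(("+I", (left_row, right_row)))


def materialize(changelog):
    """Apply insert/delete entries in order to get the final result rows."""
    rows = []
    for kind, row in changelog:
        if kind == "+I":
            rows.append(row)
        else:
            rows.remove(row)
    return rows


join = ToyLeftJoin()
join.on_left(1, "a1")   # the table1 row with a.bId = 1 arrives first
join.on_right(1, "b1")  # the table2 row with b.Id = 1 arrives later
print(join.output)
# [('+I', ('a1', None)), ('-D', ('a1', None)), ('+I', ('a1', 'b1'))]
print(materialize(join.output))
# [('a1', 'b1')]
```

If the table2 row arrives first, the toy join emits only the joined row. The transient null row therefore depends on arrival order, not on which machine holds the data.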
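
The HASH distribution mentioned in the reply can also be modeled in a few lines. If both join inputs are routed by the hash of the join key, rows with equal keys always land on the same subtask. Parallelism alone therefore cannot make a match disappear. This is a minimal sketch. The helpers `route`, `partition`, `round_robin`, and `keys_co_located` are hypothetical, and `zlib.crc32` stands in for Flink's actual key-group hashing.

```python
import zlib


def route(key, parallelism):
    """Map a join key to a subtask index.

    crc32 is a stand-in chosen for a stable hash; Flink hashes keys into
    key groups with a different function.
    """
    return zlib.crc32(str(key).encode("utf-8")) % parallelism


def partition(rows, key_of, parallelism):
    """Distribute rows to subtasks by the hash of their join key."""
    subtasks = [[] for _ in range(parallelism)]
    for row in rows:
        subtasks[route(key_of(row), parallelism)].append(row)
    return subtasks


def round_robin(rows, parallelism):
    """Distribute rows ignoring the key (for contrast only)."""
    subtasks = [[] for _ in range(parallelism)]
    for i, row in enumerate(rows):
        subtasks[i % parallelism].append(row)
    return subtasks


def keys_co_located(left_parts, right_parts, left_key, right_key):
    """True if every left row whose key exists on the right side finds a
    right row with that key on its own subtask."""
    all_right_keys = {right_key(r) for part in right_parts for r in part}
    for left_part, right_part in zip(left_parts, right_parts):
        local_right_keys = {right_key(r) for r in right_part}
        for row in left_part:
            k = left_key(row)
            if k in all_right_keys and k not in local_right_keys:
                return False
    return True


table1 = [{"id": i, "bId": i % 5} for i in range(20)]
table2 = [{"Id": k, "val1": f"v{k}"} for k in range(5)]

hashed = keys_co_located(partition(table1, lambda r: r["bId"], 4),
                         partition(table2, lambda r: r["Id"], 4),
                         lambda r: r["bId"], lambda r: r["Id"])
print(hashed)  # True

scattered = keys_co_located(round_robin(table1, 4), round_robin(table2, 4),
                            lambda r: r["bId"], lambda r: r["Id"])
print(scattered)  # False
```

In the sample query, the second LEFT JOIN is keyed on a.cId = c.Id. The planner would therefore re-hash the first join's output by cId before it reaches the second join operator, so that join is co-located by its own key as well.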