Hello,

I ran into an issue where if `table.optimizer.join-reorder-enabled` is set
to true and I execute a lookup join query the following error is shown:

```
Exception in thread "main" org.apache.flink.table.api.TableException:
Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(table=[default_catalog.default_database.print_sink],
fields=[id, name, info, col_1, col_2, col_3, col_4, col_5, col_6, col_7,
col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16,
col_17, col_18, col_19])
+- FlinkLogicalCalc(select=[id0 AS id, name, info, col_1, col_2, col_3,
col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13,
col_14, col_15, col_16, col_17, col_18, col_19])
   +- FlinkLogicalJoin(condition=[=($21, $0)], joinType=[right])
      :- FlinkLogicalSnapshot(period=[$cor0.proc_time])
      :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
default_database, kudu_join]], fields=[id, info, col_1, col_2, col_3,
col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13,
col_14, col_15, col_16, col_17, col_18, col_19])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog,
default_database, datagen]], fields=[id, name])
```

Relevant stacktrace:

```
Caused by: org.apache.flink.table.api.TableException: Temporal table join
only support apply FOR SYSTEM_TIME AS OF on the right table.
at
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:52)
```

The optimizer flips the ordering of the join and although the new type is
`right`, the validation above always checks if the left side is not of
`FlinkLogicalSnapshot`.

I'm wondering if this is a bug or intended behavior and if the latter,
could there be a way to improve the planner to be smarter about it?

Reply via email to