Hello, I ran into an issue where if `table.optimizer.join-reorder-enabled` is set to true and I execute a lookup join query the following error is shown:
``` Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: FlinkLogicalSink(table=[default_catalog.default_database.print_sink], fields=[id, name, info, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19]) +- FlinkLogicalCalc(select=[id0 AS id, name, info, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19]) +- FlinkLogicalJoin(condition=[=($21, $0)], joinType=[right]) :- FlinkLogicalSnapshot(period=[$cor0.proc_time]) : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, kudu_join]], fields=[id, info, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[id, name]) ``` Relevant stacktrace: ``` Caused by: org.apache.flink.table.api.TableException: Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table. at org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalJoinRule.matches(StreamPhysicalJoinRule.scala:52) ``` The optimizer flips the ordering of the join and although the new type is `right`, the validation above always checks if the left side is not of `FlinkLogicalSnapshot`. I'm wondering if this is a bug or intended behavior and if the latter, could there be a way to improve the planner to be smarter about it?