Lilo created FLINK-36930: ---------------------------- Summary: Lookup join with JDBC connector fails when join condition does not include fields from the probe/left table Key: FLINK-36930 URL: https://issues.apache.org/jira/browse/FLINK-36930 Project: Flink Issue Type: Bug Affects Versions: 1.19.1, 1.18.1, 1.17.0 Environment: * Flink versions affected: 1.17.0 and later versions (Tested on 1.19.1) * Flink version where this worked: 1.16.2 * Connector: JDBC (tested with Apache Derby and MySQL ; likely reproducible with other JDBC drivers). Notably, this issue does not affect the Hive connector.
Reporter: Lilo I've encountered a regression in Flink 1.17.0 (and later versions) related to lookup joins when using the JDBC connector as the lookup source. The issue arises when the join condition does not include fields from the probe/left table. This scenario worked correctly in Flink 1.16.2 but now throws a `TableException`. The following SQL code demonstrates the problem. {code:sql} SET 'execution.target' = 'local'; -- Create the lookup table (using Apache Derby as an example) DROP TEMPORARY TABLE IF EXISTS lookup_table; CREATE TEMPORARY TABLE lookup_table ( id INT, sub_id INT, v STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:derby:memory:myInMemoryDB;create=true', 'table-name' = 'lookup_table' ); -- Create the main table (using datagen connector) DROP TEMPORARY TABLE IF EXISTS main_table; CREATE TEMPORARY TABLE main_table ( v STRING, some_id INT, some_sub_id INT, proctime AS PROCTIME() ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '2', 'fields.some_id.kind' = 'random', 'fields.some_id.min' = '1', 'fields.some_id.max' = '5', 'fields.some_sub_id.kind' = 'random', 'fields.some_sub_id.min' = '1', 'fields.some_sub_id.max' = '3', 'fields.v.length' = '10' ); -- This lookup join works correctly (join condition includes a field from main_table) EXPLAIN PLAN FOR SELECT t1.*, t2.v AS lookup_value FROM main_table t1 INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t2.id = 1 AND t2.sub_id = t1.some_sub_id; -- This lookup join FAILS in Flink 1.17.0+ (join condition only uses constants) EXPLAIN PLAN FOR SELECT t1.*, t2.v AS lookup_value FROM main_table t1 INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t2.id = 1 AND t2.sub_id = 1; -- Attempting to bypass the issue with a view also fails DROP VIEW IF EXISTS main_table_view; CREATE TEMPORARY VIEW main_table_view AS SELECT *, 1 AS fake_id FROM main_table; EXPLAIN PLAN FOR SELECT t1.*, t2.v AS lookup_value FROM main_table_view t1 INNER JOIN lookup_table FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t2.id = 1 AND t2.sub_id = t1.fake_id; {code} *Expected Behavior:* These queries should execute successfully, even when the join condition does not involve fields from the main table. This was the behavior in Flink 1.16.2. *Actual Behavior:* In Flink 1.17.0 and later, the second query (where the join condition only uses constants) throws the following exception (from Flink 1.17.2): {code:java} Caused by: org.apache.flink.table.api.TableException: Temporal table join requires an equality condition on fields of table [default_catalog.default_database.lookup_table]. at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validate(CommonExecLookupJoin.java:687) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:249) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:157) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)